mermaid-framework-rabbitmq基于RabbitMQ开发,提供了RabbitMQ访问服务。其能力如下:
<dependency>
<groupId>com.mermaid.framework</groupId>
<artifactId>mermaid-rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
# 自动将RabbitMessageListener类型的Bean监听到对应的队列
mermaid.framework.rabbitmq.autoListen=true
# rabbitmq服务IP
mermaid.framework.rabbitmq.host=127.0.0.1
# rabbitmq服务端口
mermaid.framework.rabbitmq.port=5672
# rabbitmq用户名
mermaid.framework.rabbitmq.userName=guest
# rabbitmq密码
mermaid.framework.rabbitmq.password=guest
# 集群
mermaid.framework.rabbitmq.addresses=
mermaid.framework.rabbitmq.vhost=/
mermaid.framework.rabbitmq.maxChannels=25
@RestController
public class MyController {
@Autowired
private RabbitMQService rabbitMQService;
@RequestMapping("/rabbitmq-test")
public String rabbitMQTest() {
return rabbitMQService.send("queue-name", "Hello");
}
}
@Test
public void simpleSendTest() throws Exception{
//发送消息,不指定ExchangeType,默认是ExchangeType.DIRECT
rabbitService.send("q.shiguiming", 100);
rabbitService.send("q.zhanglei", 200);
TimeUnit.SECONDS.sleep(5);
Assert.assertEquals(100, MQTestListener.value);
Assert.assertEquals(200, MQTestListener2.value);
}
@Test
public void directTargetTest() throws Exception{
RabbitMQMessageTarget mqTarget = RabbitMQMessageTarget.createDirectTarget("q.shiguiming");
rabbitService.send(mqTarget,300);
TimeUnit.SECONDS.sleep(5);
Assert.assertEquals(300, MQTestListener.value);
}
@Test
public void fanoutTargetTest() throws Exception {
RabbitMQMessageTarget mqTarget = RabbitMQMessageTarget.createFanoutTarget("q.fanout", new String[]{"q.fanout1","q.fanout2"});
rabbitService.send(mqTarget, 400);
TimeUnit.SECONDS.sleep(5);
Assert.assertEquals(400,MQTestListener8.value);
Assert.assertEquals(400,MQTestListener9.value);
rabbitService.send(mqTarget, 500);
TimeUnit.SECONDS.sleep(5);
Assert.assertEquals(500,MQTestListener8.value);
Assert.assertEquals(500,MQTestListener9.value);
}
@Test
public void topicTargetTest() throws Exception {
RabbitMQMessageTarget mqTarget = RabbitMQMessageTarget.createTopicTarget("q.topic", "q.routingKey", "queue1", "queue2");
rabbitService.send(mqTarget, 400);
TimeUnit.SECONDS.sleep(5);
Assert.assertEquals(400,MQTestListener8.value);
Assert.assertEquals(400,MQTestListener9.value);
rabbitService.send(mqTarget, 500);
TimeUnit.SECONDS.sleep(5);
Assert.assertEquals(500,MQTestListener8.value);
Assert.assertEquals(500,MQTestListener9.value);
}
@Service
public class MQTestListener extends AbstractRabbitMessageListener {
public static int value = 0;
public MQTestListener() {
super("q.shiguiming", 0);
}
/**处理监听到的消息*/
@Override
public void handleMessage(Object object) {
System.out.println("MQTestListener works");
value = (int) object;
}
}
@Test
public void sendTest() throws Exception{
rabbitService.listen(new MQTestListener());
}
public class MQTestListener extends AbstractRabbitMessageListener {
public static int value = 0;
public MQTestListener() {
super("q.shiguiming", 0);
}
@Override
public void handleMessage(Object object) {
System.out.println("MQTestListener works");
value = (int) object;
}
}