阿里开源的消息队列框架
创建SpringBoot项目设置pom.xml
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
|
设置application.properties
1 2 3 4 5 6 7 8 9 10
|
jtxyh.producer.groupName=jtxyh_producer
jtxyh.producer.namesrvAddr=192.168.1.210:9876
jtxyh.consumer.gourpName=jtxyh_consumer jtxyh.consumer.NamesrvAddr=192.168.1.210:9876 jtxyh.consumer.topic=Topic_Jtxyh
|
设置生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @SpringBootConfiguration public class MQProducerConfiguration {
@Value("${jtxyh.producer.groupName}") private String groupName; @Value("${jtxyh.producer.namesrvAddr}") private String namesrvAddr;
@Bean public DefaultMQProducer getRocketMQProducer(){ DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } }
|
设置消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @SpringBootConfiguration public class MQConsumerConfiguration {
@Value("${jtxyh.consumer.NamesrvAddr}") private String namesrvAddr; @Value("${jtxyh.consumer.gourpName}") private String groupName; @Value("${jtxyh.consumer.topic}") private String topic; @Autowired private GiftSendListener giftSendListener;
@Bean public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.registerMessageListener(giftSendListener); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); try { consumer.subscribe(topic,"*"); consumer.start(); }catch (MQClientException e){ e.printStackTrace(); } return consumer; } }
|
设置消费者监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if(msgs == null || msgs.isEmpty()){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = msgs.get(0); System.out.println(messageExt.toString()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
|
测试调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @RequestMapping("/message") @Controller public class MessageHandler { @Autowired private DefaultMQProducer defaultMQProducer;
@RequestMapping("send") @ResponseBody public String send(){ String msg = "Hello World!!!"; Message sendMsg = new Message("Topic_Jtxyh","DemoTag",msg.getBytes()); SendResult sendResult = null; try { sendResult = defaultMQProducer.send(sendMsg); }catch (Exception e) { e.printStackTrace(); } return sendResult.toString(); } }
|
相关文章
数据库连接池
SpringIOC
Junit和Spring
Tomcat
Servlet
Request,Response和ServletContext
Cookie和Session
JSP和EL和Jstl
Filter和Listener
Mybatis