流行的消息中间件
使用
docker安装
1
| docker run -d --name rabbitmq3.8 -p 5672:5672 -p 15672:15672 rabbitmq:3.8-management
|
简介
组成
消息发布流程
工作模式
Work queues工作队列
一个生产者将消息发给队列,多个消费者共同监听一个队列的消息
消息不能被重复消费
rabbitMQ采用轮询的方式将消息平均发送给消费者
Publish/Subscribe发布订阅
一个生产者把消息发送给交换机,与交换机绑定的有多个队列,每个消费者监听自己的队列
生产者将消息发给交换机,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
如果消息发给没有绑定队列的交换机上消息将丢失
Routing路由
交换机类型设置为:direct
一个交换机绑定多个队列,每个队列设置routingkey,并且一个队列可以设置多个routingKey
每个消费者监听自己的队列
生产者将消息发给交换机,发送消息时需要指定routingkey的值,交换机来判断改routingkey的值和哪个队列的routingkey相等,如果相等则将消息转发给该队列
Topics通配符
交换机类型设置为:topic
一个交换机绑定多个队列,交换机根据routingkey的值来匹配队列,匹配时采用通配符方式,匹配成功的将消息转发到指定的队列
#号统配符:匹配一个或多个词,多个词中间用.分割
1 2 3 4
| 匹配:info.# info.1 info.1.2 info.1.2.3
|
\*号统配符:匹配一个词
1 2 3 4
| 匹配:info.* info.1 info.2 info.3
|
header取消了routingkey,使用header中的key/value匹配队列
RPC远程调用
使用MQ实现RPC的异步调用,基于Direct交换机实现
客户端既是生产者也是消费者,像RPC请求队列发送RPC调用消息,同时监听RPC响应队列
服务端监听RPC请求队列的消息,收到消息后执行服务端方法,得到方法返回的结果,服务端将RPC的结果发送到RPC响应队列
SpringBoot使用
pom
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
yml
1 2 3 4 5 6 7
| spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /
|
发送消息类型转json
1 2 3 4
| @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
|
定义配置类
DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
name->交换机名称,durable->是否持久化,autoDelete->是否自动删除,arguments->指定一些参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Configuration public class DirectRabbitConfig {
@Bean DirectExchange directExchange(){ return new DirectExchange("directExchange"); }
@Bean Queue infoQueue(){ return new Queue("infoMsgQueue"); }
@Bean Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) { return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg"); }
@Bean TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); }
@Bean Queue errorQueue(){ return new Queue("errorMsgQueue"); }
@Bean Binding infoToExchangeBinging2(Queue errorQueue, TopicExchange topicExchange) { return BindingBuilder.bind(errorQueue).to(topicExchange).with("error-#"); } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Component public class DirectSender {
@Resource private AmqpTemplate rabbitTemplate;
public void sendInfo() { String content = "发送到directExchange交换机,指定routing key为info-msg"; this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content, new CorrelationData(UUID.randomUUID().toString())); }
public void sendError() { String content = "I am Error msg!"; this.rabbitTemplate.convertAndSend("topicExchange", "error.msg", content); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Component public class DirectReceiver1 {
@RabbitListener(queues = "infoMsgQueue") public void process(Message message,String data) { System.out.println("########### 消费者消费:" + data); } }
@Component
@RabbitListener(queues = "errorMsgQueue") public class DirectReceiver3 {
@RabbitHandler public void process(Message message,String data) { System.out.println("消费error消息:" + data); }
@RabbitHandler public void process(Message message,UserInfo data) { System.out.println("消费error消息:" + data); } }
|
开启发送接收确认模式
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| spring: rabbitmq: host: 192.168.0.18 port: 5672 username: guest password: guest virtual-host: / publisher-returns: true template: mandatory: true publisher-confirm-type: correlated listener: simple: acknowledge-mode: manual retry: enabled: true
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { } });
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { } }); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @RabbitListener(queues = RabbitMqConfig.ORDER_TIMEOUT_QUEUE, concurrency = "4-10") public void consumeTimeOutQueue(@Payload String orderId, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消息ID=" + orderId); if (orderId.equals("1")) { int i = 1 / 0; } channel.basicAck(deliveryTag, false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { System.out.println("消息已重复处理失败,拒绝再次接收..."); channel.basicReject(deliveryTag, false); } else { System.out.println("消息即将再次返回队列处理..."); channel.basicAck(deliveryTag, false); channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes("《报错后发送的新消息》")); } } }
|
延迟队列
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
public static final String ORDER_QUEUE = "woniu.order.queue";
public static final String ORDER_TIMEOUT_QUEUE = "woniu.order.timeout.queue";
public static final String ORDER_EXCHANGE = "woniu.order.exchage";
@Bean public Queue orderQueue() {
Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", ORDER_EXCHANGE); arguments.put("x-dead-letter-routing-key", ORDER_TIMEOUT_QUEUE); return new Queue(ORDER_QUEUE, true, false, false, arguments); }
@Bean public Queue orderTimeoutQueue() { return new Queue(ORDER_TIMEOUT_QUEUE, true, false, false); }
@Bean public Binding orderQueueBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_QUEUE); }
@Bean public Binding orderTimeoutQueueBinding() { return BindingBuilder.bind(orderTimeoutQueue()).to(orderExchange()).with(ORDER_TIMEOUT_QUEUE); }
@Bean public DirectExchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false, null); }
|
使用
1 2 3 4 5 6 7 8 9 10
| @Autowired private RabbitTemplate rabbitTemplate;
public void sendOrder(String orderId) { rabbitTemplate.convertAndSend(RabbitMqConfig.ORDER_EXCHANGE, RabbitMqConfig.ORDER_QUEUE, orderId, message -> { message.getMessageProperties().setExpiration("2000"); return message; },new CorrelationData(UUID.randomUUID().toString())); }
|
相关文章
数据库连接池
SpringIOC
Junit和Spring
Tomcat
Servlet
Request,Response和ServletContext
Cookie和Session
JSP和EL和Jstl
Filter和Listener
Mybatis