RabbitMQ的工作队列和路由.docx

上传人:小飞机 文档编号:3062281 上传时间:2023-03-10 格式:DOCX 页数:5 大小:37.87KB
返回 下载 相关 举报
RabbitMQ的工作队列和路由.docx_第1页
第1页 / 共5页
RabbitMQ的工作队列和路由.docx_第2页
第2页 / 共5页
RabbitMQ的工作队列和路由.docx_第3页
第3页 / 共5页
RabbitMQ的工作队列和路由.docx_第4页
第4页 / 共5页
RabbitMQ的工作队列和路由.docx_第5页
第5页 / 共5页
亲,该文档总共5页,全部预览完了,如果喜欢就下载吧!
资源描述

《RabbitMQ的工作队列和路由.docx》由会员分享,可在线阅读,更多相关《RabbitMQ的工作队列和路由.docx(5页珍藏版)》请在三一办公上搜索。

1、RabbitMQ的工作队列和路由RabbitMQ的工作队列和路由 工作队列:Working Queue 工作队列这个概念与简单的发送/接收消息的区别就是:接收方接收到消息后,可能需要花费更长的时间来处理消息,这个过程就叫一个Work/Task。 几个概念 分配:多个接收端接收同一个Queue时,如何分配? 消息确认:Server端如何确定接收方的Work已经对消息进行了完整的处理? 消息持久化:发送方、服务端Queue如何对未处理的消息进行磁盘持久化? Round-robin分配 多个接收端接收同一个Queue时,采用了Round-robin分配算法,即轮叫调度依次分配给各个接收方。 消息确认

2、 默认开启了消息确认。消息接收方处理完消息后,向服务器发送消息确认,服务器再删除该消息。 对于耗时的work,可以先关闭自动消息确认,在work完成后,再手动发回确认。 channel.basicConsume(hello,false/*关闭自动消息确认*/,consumer); / .work完成后 channel.basicAck(delivery.getEnvelope.getDeliveryTag, false); 持久化 1. Server端的Queue持久化 注意的是,如果已经声明了同名非持久化的Queue,则再次声明无效。 发送方和接收方都需要指定该参数。 boolean dur

3、able = true; channel.queueDeclare(task_queue, durable, false, false, null); 2. Message持久化 channel.basicPublish(, task_queue, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes); 负载分配 为了解决各个接收端工作量相差太大的问题,突破Round-robin。 int prefetchCount = 1; channel.basicQos(prefetchCount); 意思为,最多为当前接收方发送一条消息。如

4、果接收方还未处理完毕消息,还没有回发确认,就不要再给他分配消息了,应该把当前消息分配给其它空闲接收方。 固定关键词路由:Routing 使用类型为direct的exchange,发送特定关键词的消息给订阅该关键词的Queue。 场景示例:消息发送方发送了类型为errorinfo的两种消息,写磁盘的消息接受者只接受error类型的消息,Console打印的接收两者。 (上图采用了不同颜色来作为routingKey) 发送方 ConnectionFactory factory = new ConnectionFactory; factory.setHost(localhost); Connecti

5、on connection = factory.newConnection; Channel channel = connection.createChannel; channel.exchangeDeclare(EXCHANGE_NAME, direct/*exchange类型为direct*/); channel.basicPublish(EXCHANGE_NAME, info/*关键词=info*/, null, message.getBytes); channel.close; connection.close; 接收方 ConnectionFactory factory = new

6、ConnectionFactory; factory.setHost(localhost); Connection connection = factory.newConnection; Channel channel = connection.createChannel; channel.exchangeDeclare(EXCHANGE_NAME, direct/*exchange类型为direct*/); / 创建匿名Queue String queueName = channel.queueDeclare.getQueue; / 订阅某个关键词,绑定到匿名Queue中 channel.q

7、ueueBind(quueName,EXCHANGE_NAME,error); channel.queueBind(quueName,EXCHANGE_NAME,info); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery; / Blocking. String message = new String(deli

8、very.getBody); String routingKey = delivery.getEnvelope.getRoutingKey; / 可获取路由关键词 关键词模式路由:Topics 这种模式可以看做对Routing的扩展。Routing只能使用固定关键词,而Topics模式可以订阅模糊关键词。 关键词必须是一组word,由点号分割。例如xxx.yyy.zzz,限定255bytes。 * 表示一个word; # 表示0个或者多个word; 发送方 ConnectionFactory factory = new ConnectionFactory; factory.setHost(lo

9、calhost); Connection connection = factory.newConnection; Channel channel = connection.createChannel; channel.exchangeDeclare(EXCHANGE_NAME, topic/*exchange类型*/); channel.basicPublish(EXCHANGE_NAME, xxx.yyy/*关键词routingKey*/, null, message.getBytes); channel.close; connection.close; 接收方 ConnectionFact

10、ory factory = new ConnectionFactory; factory.setHost(localhost); Connection connection = factory.newConnection; Channel channel = connection.createChannel; channel.exchangeDeclare(EXCHANGE_NAME, topic/*exchange类型*/); / 创建匿名Queue String queueName = channel.queueDeclare.getQueue; / 订阅某个关键词,绑定到匿名Queue中

11、 channel.queueBind(quueName,EXCHANGE_NAME,*.yyy); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery; / Blocking. String message = new String(delivery.getBody); String routingKey = delivery.getEnvelope.getRoutingKey; / 可获取路由关键词

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 生活休闲 > 在线阅读


备案号:宁ICP备20000045号-2

经营许可证:宁B2-20210002

宁公网安备 64010402000987号