RabbitMQ基础操作指南
RabbitMQ
2.收发信息的步骤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| 生产者: 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); 需要设置参数: .setHost(String); .setPort(int); .setUsername(String); .setPassword(String); .setVirtualHost(String); 2.通过工厂对象创建连接 Connection connection = factory.newConnection(); 3.通过连接对象创建通道 Channel channel = connection.createChannel(); 3.1队列声明
.queueDeclare(String,boolean,boolean,boolean,Map); 3.2消息发布
.basicPublish("",QUEUE,null,message.getBytes());
消费者: 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); 2.通过工厂对象创建连接 Connection connection = factory.newConnection(); 3.通过连接对象创建通道 Channel channel = connection.createChannel(); 3.1队列声明 3.2创建消费方法
DefaultConsumer consumer = new DefaultComsumer(channel){ @Override handleDelivery(); }; 3.3进行监听
channel.basicConsume(QUEUE,true,consumer);
|
1、发送端操作流程
- 1)创建连接
- 2)创建通道
- 3)声明队列
- 4)发送消息
2、接收端
- 1)创建连接
- 2)创建通道
- 3)声明队列
- 4)监听队列
- 5)接收消息
- 6)ack回复
3.简单队列
1.模型
2.获取连接的工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class ConnectionUtils {
public static final String HOST = "127.0.0.1";
public static final Integer PORT = 5672;
public static final String USERNAME = "guest";
public static final String PASSWORD = "guest";
public static final String VIRTUALHOST = "/";
public static Connection getConnection(){ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost(VIRTUALHOST);
Connection connection = null; try { connection = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return connection; } }
|
3.生产者生产消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public class Producer01 { private static final String QUEUE = "helloworld"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = null; Channel channel = null; try{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); connection = factory.newConnection(); channel = connection.createChannel();
channel.queueDeclare(QUEUE,true,false,false,null); String message = "helloworld小明:"+System.currentTimeMillis();
channel.basicPublish("",QUEUE,null, message.getBytes()); System.out.println("Send Message is:"+message);
}catch (Exception e){ e.printStackTrace(); }finally { if (channel != null){ channel.close(); } if (connection != null){ connection.close(); } } } }
|
4.消费者消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class Consumer01 { private static final String QUEUE = "helloworld"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE,true,false,false,null); DefaultConsumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String exchange = envelope.getExchange(); String routingKey = envelope.getRoutingKey(); long deliveryTag = envelope.getDeliveryTag(); String message = new String(body,"utf-8"); boolean isRedeliver = envelope.isRedeliver(); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); System.out.println("deliveryTag:"+deliveryTag); System.out.println("isRedeliver:"+isRedeliver); System.out.println("message:"+message); } };
channel.basicConsume(QUEUE,true,consumer); } }
|
5.简单队列的不足
耦合性高,生产者一一对应消费者(如何我想要有多个消费者消费队列中消息,这时候就不行了),队列名变更,这时候得同事变更。
4.work queues 工作队列模式
1.模型
为什么会出现工作队列
simple队列是一一对应的,而且实际开发,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接受到消息之后就需要处理,可能需要花费时间,这时候队列就会积压了很多消息
2.生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class Send {
public static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 0; i < 50; i++) { String message = "MQ "+i; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("send message: "+message); Thread.sleep(i*20); } channel.close(); connection.close(); } }
|
3.消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class Receive1 { public static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[1] get message :"+ new String(body,"utf-8")); try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] over"); } } };
boolean autoAck = true; channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
|
4.消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class Receive2 { public static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[2] get message :"+ new String(body,"utf-8")); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] over"); } } };
boolean autoAck = true; channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
|
5.现象
消费者1和消费者2处理的消息是一样的。
消费者1:偶数
消费者2:奇数
这种方式叫做轮询分发(roun-robin)结果就是:不管谁忙活着谁清闲 都不会多给一个消息任务
任务消息总是平均分配。(你一个我一个)
5.公平分发 fair depatch
1.说明
使用公平分发,必须关闭自动应答ack,改为手动
2.生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class Send {
public static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null);
int prefetchCount = 1; channel.basicQos(prefetchCount); for (int i = 0; i < 50; i++) { String message = "MQ "+i; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("send message: "+message); Thread.sleep(i*20); } channel.close(); connection.close(); } }
|
3.消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class Receive1 { public static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); int prefetchCount = 1; channel.basicQos(prefetchCount); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[1] get message :"+ new String(body,"utf-8")); try { Thread.sleep(1000*2); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(),false); System.out.println("[1] over"); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
|
4.消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class Receive2 { public static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); int prefetchCount = 1; channel.basicQos(prefetchCount); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[2] get message :"+ new String(body,"utf-8")); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(),false); System.out.println("[2] over"); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
|
5.与work queues的差别之处
5.1 生产者
5.2 消费者
6.现象
消费者2处理的消息比消费者1多(能者多劳,公平分发)
6.消息应答 与 消息持久化
6.1消息应答
自动确认模式:
RabbitMQ一旦将消息分发非消费之后,就会从内存中删除这个消息
现象:
这种情况下,如果杀死(kill)当前正在执行的消费者,就会丢失正在执行的消息。
1 2
| boolean autoAck = true; channel.basicConsume(QUEUE_NAME,autoAck,consumer);
|
手动确认模式:
1 2 3
| boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
|
如果有一个消费者挂掉,就会交付给其他消费者。RabbitMQ支持消息应答,消费者发送一个消息应答,告诉RabbitMQ这个消息我已经处理完成,RabbitMQ可以将这个消息从内存中删除了。
(message acknowledgment)消息应答模式(Ack)是打开的, false。
如果RabbitMQ挂了,消息仍然会丢失。
6.2消息持久化
1 2 3 4 5 6 7 8 9 10
|
boolean durable = true; channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
|
我们将程序中的boolean durable = true; 改为false是不可以的,会报错。尽管代码是正确的,但是该队列应该声明定义好了,就不可以再进行修改了。(RabbitMQ不允许用不同的参数重新定义一个已经存在的队列(可以先删除再创建))
7.Exchange(交换机,转发器)
一方面是接收生产者的消息,另一方面是向队列推送消息。
匿名转发:“”;
7.1 Fanout Exchange(不处理路由键)
只需要将生产者与exchange进行bind,就会把exchange中的信息转发到与exchange绑定的所有Queue中。
1 2 3 4 5 6 7 8 9
| 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
1.可以理解为路由表的模式
2.这种模式不需要RouteKey
3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
|
7.2 Direct Exchange 处理路径键
1 2 3 4 5 6 7 8 9
| 任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1.一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。
2.这种模式下不需要将Exchange进行任何绑定(binding)操作
3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
|
7.3 Topic Exchange
1 2 3 4 5 6 7 8 9 10 11 12
| 任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
|
性能排序:fanout > direct >> topic。比例大约为11:10:6
8.订阅模式Publish/Subscribe(fanout)
1.模型
2.解读
- 一个生产者,多个消费者。
- 每一个消费者都有自己对应的队列。
- 生产者没有直接把消息发送到队列 而是发送到了交换机 (转发器exchange)
- 每个队列都要绑定到交换机上
- 生产者发送的消息经过交换机到达队列 ,就能实现一个消息就可以被多个消费者消费。
1 2 3 4
| 发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息
|
注册 –> 邮件 –> 短信
3.发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class Send { public static final String QUEUE_NAME = "test_queue_fanout"; public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String message = "hello publish/subscribe"; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("send message : "+message); channel.close(); connection.close(); } }
|
4.exchange图示
消息去哪里了??丢失了,因为交换机没有存储的能力,在RabbitMQ里面只有队列有存储能力。因为此时没有把交换机和相应的队列进行绑定,所以数据就丢失了。
5.发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class Send { public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String message = "hello publish/subscribe"; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("send message : "+message); channel.close(); connection.close(); } }
|
6.消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class Receive1 { public static final String QUEUE_NAME = "test_queue_fanout_email"; public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("[1] receive message:"+new String(body,"utf-8")); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] over"); channel.basicAck(envelope.getDeliveryTag(),false); } } };
channel.basicConsume(QUEUE_NAME,false,consumer); } }
|
7.消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class Receive2 { public static final String QUEUE_NAME = "test_queue_fanout_sms"; public static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("[2] receive message:"+new String(body,"utf-8")); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[2] over"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; channel.basicConsume(QUEUE_NAME,false,consumer); } }
|
8.管理界面
9.Routing(direct)
1.路由模型
2.生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class Send { public static final String EXCHANGE_NAME = "test_exchange_direct"; public static final String ROUTING_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"direct"); String message = "hello direct!"+ROUTING_KEY; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,message.getBytes()); System.out.println("send message:"+message); channel.close(); connection.close(); } }
|
3.消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Receive1 { public static final String QUEUE_NAME = "test_queue_direct_1"; public static final String EXCHANGE_NAME = "test_exchange_direct"; public static final String ROUTING_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); DefaultConsumer consumer = new MyDefaultConsumer(channel,"1"); channel.basicConsume(QUEUE_NAME,false,consumer); } }
|
4.消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class Receive2 { public static final String QUEUE_NAME = "test_queue_direct_2"; public static final String EXCHANGE_NAME = "test_exchange_direct"; public static final String ROUTING_KEY1 = "info"; public static final String ROUTING_KEY2 = "error"; public static final String ROUTING_KEY3 = "warning"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY3); DefaultConsumer consumer = new MyDefaultConsumer(channel,"2"); channel.basicConsume(QUEUE_NAME,false,consumer); } }
|
5.MyDefaultConsumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class MyDefaultConsumer extends DefaultConsumer { private Channel channel = null; private String name = ""; public MyDefaultConsumer(Channel channel,String name) { super(channel); this.channel = channel; this.name = name; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(name+" receive message: "+new String(body,"utf-8")); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(name+" over"); channel.basicAck(envelope.getDeliveryTag(),false); } } }
|
5.Exchanges图示
10.Topic(topic)
1.模型
1 2 3 4 5
| .“#”表示0个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
Goods.insert | Goods.update | ==> Goods.# Goods.delete |
|
2.生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Send { public static final String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String message = "商品。。。"; channel.basicPublish(EXCHANGE_NAME,"goods.delete",null,message.getBytes()); System.out.println("topic send message:"+message); channel.close(); connection.close(); } }
|
3.消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Receive1 { public static final String QUEUE_NAME = "test_queue_topic_1"; public static final String EXCHANGE_NAME = "test_exchange_topic"; public static final String ROUTING_KEY = "goods.add"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); DefaultConsumer consumer = new MyDefaultConsumer(channel,"topic1"); channel.basicConsume(QUEUE_NAME,false,consumer); } }
|
4.消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Receive2 { public static final String QUEUE_NAME = "test_queue_topic_2"; public static final String EXCHANGE_NAME = "test_exchange_topic"; public static final String ROUTING_KEY = "goods.#"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); channel.basicQos(1); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); DefaultConsumer consumer = new MyDefaultConsumer(channel,"topic2"); channel.basicConsume(QUEUE_NAME,false,consumer); } }
|
11.RabbitMQ的消息确认机制(事务+confirm)
在rabbitMQ中 我们可以通过持久化数据,解决rabbitmq服务器异常的数据丢失问题。
问题:生产者将消息发送出来之后,消息到底有没有到RabbitMQ服务器,默认的情况是不知道的。
两种方式:
AMQP实现了事务机制
Confirm模式
事务机制:
txSelect txCommit txRollback
txSelect:用户将当前channel设置成transaction模式、
txCommit:用于提交事务
txRollback:回滚事务
1事务机制
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class TxSend {
private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); String msgString = "hello tx message!";
System.out.println("send message:"+msgString); try{ channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msgString.getBytes()); int i = 1/0; channel.txCommit(); }catch (Exception e){ channel.txRollback(); System.out.println("send mesage txRollback"); } channel.close(); connection.close(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class TxReceive { private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("receive:"+new String(body,"utf-8")); } };
channel.basicConsume(QUEUE_NAME,true,consumer);
} }
|
2.Confirm模式
生产者端confirm模式的实现原理
1 2 3
| 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会指派成一个唯一的id(从1开 始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消 息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
|
Confirm模式最大的好处就在于它是异步的、
Nack;
开启confirm模式。
Channel.confirmSelect();
编程模式:
1.普通 发一条 waitForConfirms()
2.批量 发一批 waitForConfirms()
3.异步 Confirm模式:提供一个回调的方法,
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
public class Send2 {
private static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
String msgString = "Hello confirm message batch"; for (int i = 0; i < 10; i++) { channel.basicPublish("",QUEUE_NAME,null,msgString.getBytes()); } if(!channel.waitForConfirms()){ System.out.println("message send failed"); }else{ System.out.println("message send ok"); }
channel.close(); connection.close(); } }
|
异步模式:
1
| Channel对象提供的confirmListener()回调方法值包deliveryTag(当前Channel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序列集合,每publish一条数据,集合中元素加1.每回到一次handleAck方法,unconfirm集合删除相应的一条(multiple=false)或多条(multiple=true),从程序的运行效率来看,这个unconfirm集合最好采用有序集合sortedset存储结构。
|
publish-subscribe和work queues的区别
1 2 3 4 5 6 7 8 9 10 11 12 13
| 1、publish/subscribe与work queues有什么区别。 区别: 1)work queues不用定义交换机,而publish/subscribe需要定义交换机。 2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默 认交换机)。 3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑 定到默认的交换机 。 相同点: 所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。 2、实质工作用什么 publish/subscribe还是work queues。 建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换 机。
|
4.4.4思考
1、本案例的需求使用Routing工作模式能否实现?
使用Routing模式也可以实现本案例,共设置三个 routingkey,分别是email、sms、all,email队列绑定email和
all,sms队列绑定sms和all,这样就可以实现上边案例的功能,实现过程比topics复杂。
Topic模式更多加强大,它可以实现Routing、publish/subscirbe模式的功能。
Routing 与 Topic的区别
1 2 3 4
| 使用Routing模式时,生产者生产信息是带有特殊的rountingKey,一条消息只会发送到一个Queue中,消费者通过唯 一的routingKey来监听指定的Queue
使用Topic模式时,生产者生产消息时带有通用的routingKey,一条信息可以发送到符合条件的Queue中,消费者通过配置带有#通配符的routingKey来监听满足条件的Queue。
|
1 2 3 4 5 6 7 8
| Headers类型的Exchanges是不处理路由键的,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与 Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对 进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型,而fanout,direct,topic的路由必须都需要字符串形式的。
匹配规则x-match有下列两种类型: x-match=all:表示所有的键值对都可匹配才可以接收到消息。 x-match=any:表示只有有键值对匹配就可以接收到到消息。
|
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配
队列。
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种
通知类型都接收的则两种通知都有效。
代码:
1)生产者
队列与交换机绑定的代码与之前不同,如下:
1 2 3 4 5 6
| Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_type", "email"); Map<String, Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
|
通知:
1 2 3 4 5 6 7 8
| String message = "email inform to user"+i; Map<String,Object> headers = new Hashtable<String, Object>(); headers.put("inform_type", "email");
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers);
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
|
消费者:
1 2 3 4 5 6 7
| channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); Map<String, Object> headers_email = new Hashtable<String, Object>(); headers_email.put("inform_email", "email");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
|
7.RPC
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
- 1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
- 2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
- 3、服务端将RPC方法 的结果发送到RPC响应队列
- 4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。