Rabbitmq深度学习一

rabbitmq 深度学习一

简介

rabbitmq是目前使用最多的一个消息中间件,配合微服务的使用可以使业务模块化,便于之后的维护。同时使用rabbitmq可以将许多业务异步化,提高系统的性能。

队列的创建原则

但是在使用rabbitmq 的时候需要注意点就是到底是生产者来建立队列和交换机还是由消费者来建立交换机和队列。一般的情况是由消费者来建立队列,但是假如消费者挂掉了,导致生产者发出的消息被交换机路由到了一个不存在的队列,那么此时 rabbitmq会忽略该条消息。所以对于重要的消息。即不允许该消息丢失,那么此时最好是由生产者和消费者一起船创建一个队列。

在rabbit里面,假设生产者和消费者同时创建一个队列,如果队列的各项参数都相同的话,rabbitmq是不会有问题的,假设生产者和消费者同时创建一个队列,但是后创建的和前面创建的参数不同,那么此时rabbitmq就会报错。

特殊的队列:死信队列

死信队列适用于当某一个队列的消息被拒绝的时候,或队列的消息大于最大TTL时以及队列大于最大值。此时该消息会被路由到死信交换器。
其一般的使用方式是建立一个死信交换机和一个死信队列,然后监听死信队列即可。

代码示例:

生产者端建立队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
@ConfigurationProperties(ignoreUnknownFields = false, prefix = "somersames.rabbitmq")
@Data
public class RabbitmqConfig {

private String laGouFailQueue = "laGouFailQueue";
private String laGouQueue = "laGouQueue";
private String laGouFailExchange = "laGouFailExchange";
private String laGouFailExchangeRoutingKey = "laGouFailExchangeRoutingKey";
public static final String DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";

public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
@Bean
public Queue laGouFailQueue() {
return new Queue(laGouFailQueue);
}
@Bean
public DirectExchange laGouFailExchange() {
return new DirectExchange(laGouFailExchange);
}
@Bean
public Binding bindingLagouFailExchange(Queue laGouFailQueue, DirectExchange laGouFailExchange) {
return BindingBuilder.bind(laGouFailQueue).to(laGouFailExchange).with(laGouFailExchangeRoutingKey);
}
@Bean
public Queue laGouQueue() {
Map<String, Object> map = new HashMap<>();
map.put(DEAD_LETTER_EXCHANGE, laGouFailExchange);//设置死信交换机
map.put(DEAD_LETTER_ROUTING_KEY, laGouFailExchangeRoutingKey);//设置死信routingKey
return new Queue(laGouQueue, true, false, false, map);
}
}

消费者端建立队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Configuration
public class RabbitConfig {

private String laGouQueue = "laGouQueue";

private String laGouExchange = "laGouExchange";

private String laGouFailQueue = "laGouFailQueue";
private String laGouFailExchange = "laGouFailExchange";
private String laGouFailExchangeRoutingKey = "laGouExchangeRoutingKey";


public static final String DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";

public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

@Bean
public Queue laGouQueue() {
Map<String, Object> map = new HashMap<>();
map.put(DEAD_LETTER_EXCHANGE, laGouFailExchange);//设置死信交换机
map.put(DEAD_LETTER_ROUTING_KEY, laGouFailExchangeRoutingKey);//设置死信routingKey
return new Queue(laGouQueue, true, false, false, map);
}
@Bean
public Queue laGouFailQueue() {
return new Queue(laGouFailQueue);
}
@Bean
public DirectExchange laGouFailExchange() {
return new DirectExchange(laGouFailExchange);
}


@Bean
public SimpleMessageListenerContainer laGouListenerContainer(
ConnectionFactory connectionFactory,
LagouRabbitMqListener lagouRabbitMqListener,
Queue laGouQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setMessageListener(lagouRabbitMqListener);
container.setQueues(laGouQueue);
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(10);
container.setMaxConcurrentConsumers(20);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
}

@Service
public class LagouRabbitMqListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] bytes = message.getBody();
System.out.println(new String(bytes));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}

rabbitmq的路由方式

一般来讲rabbitmq的路由方式分为三种,一种是fanout,即该交换机会将此条消息推送到机绑定在该交换所有队列之中。

topic:该路由器可以支持通配符来进行消息的匹配,

1
2
3
4
@Bean
public Binding bindingPublicFundFailExchange(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("*.key");
}

#可以匹配多个关键字
*只可以匹配一个关键字

direct:该路由器会精确匹配队列的key,从而路由器会将指定的消息发送到相应的队列

1
2
3
4
@Bean
public Binding bindingPublicFundFailExchange(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("error.key");
}

topic是fanoutdiret居中的一种模式,当topic#的时候就类似于fanout,当topic的设置不带有*# 的时候,就是一个directe 了。

vhost多租户模式

在一个企业中,肯定不会是一个业务系统会使用rabbitmq,那么假设每一个业务系统都搭建一个自己的rabbitmq服务,此时就会造成极大的浪费。最好的解决办法是搭建一个集团一起使用的rabbbitmq集群,然后通过rabbitmq的vhost来进行一个隔离。

vhost模式类似于docker环境,一个vhost就是一个docker镜像,每一个vhost里面的交换机和队列都是互相隔离的。在rabbitmq中,默认的vhost就是一个\

作者

Somersames

发布于

2018-11-07

更新于

2021-12-05

许可协议

评论