mq总结

docker 安装 rabbitMQ

1
2
3
4
5
6
docker pull rabbitmq:management

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rannitmq:management

键入 http://localhost:15672
其中账号密码均为 guest

rabbitMQ的分析

优点:

  1. 应用解耦,提高系统的容错性
  2. 异步通讯,提高系统的吞吐量
  3. 流量消锋,提高系统的并发能力

缺点:

  1. 系统的可用性降低,系统引入的外部依赖越多,系统的稳定性越差,一旦MQ宕机,就会对业务造成影响
  2. 系统更多复杂度提高

rabbitMQ的架构

producer:负责消息的发送

exchange:交换机,负责转发消息

queue:队列,负责存储消息

binding:把交换机和队列进行关联

consumer:负责消费消息

virtual host:虚拟主机,在一个rabbitMQ中可以有多个虚拟主机,虚拟主机中有exchange、binding、queue;每个虚拟主机之间相互隔离

队列模型

simpe queue简单队列模型
work queue工作队列模型:distribute tasks among workers

image-20240503100502904

发布订阅模型:
  • fanout:广播

  • direct:路由,指定具体的routing key

  • topic:主题,可以使用通配符,来简化routing key的填写。其中 # 代表0个或多个单词; *表示一个单词

image-20240503100602528

当有多个队列和一个交换机绑定时

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"awesomeProject1/utils"
"github.com/streadway/amqp"
"log"
"strconv"
)

func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
utils.FailOnErr(err, "failed to create connection")
defer conn.Close()

ch, err := conn.Channel()
utils.FailOnErr(err, "failed to create channel")
defer ch.Close()

err = ch.ExchangeDeclare("complex_topic_logs", "topic", true,
false, false, false, nil)
utils.FailOnErr(err, "failed to declare an exchange")

queues := map[string][]string{
"queue_orange": {"*.orange.*"},
"queue_lazy": {"lazy.#"},
"queue_all": {"#"},
}
index := 0
//!!!
for queueName, bindingKeys := range queues {
q, err := ch.QueueDeclare(queueName, true, false, false, false, nil)
utils.FailOnErr(err, "failed to declare a queue")

for _, bindingKey := range bindingKeys {
err = ch.QueueBind(q.Name, bindingKey, "complex_topic_logs", false, nil)
utils.FailOnErr(err, "failed to bind queue")
}
msgs, err := ch.Consume(q.Name, "consumer"+strconv.Itoa(index), false,
false, false, false, nil)
index++
go func(queueName string, msgs <-chan amqp.Delivery) {
for d := range msgs {
log.Printf(" [%s] Received a message: %s", queueName, d.Body)
}
}(queueName, msgs)
}
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}

producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"awesomeProject1/utils"
"github.com/streadway/amqp"
"log"
"time"
)

func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
utils.FailOnErr(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
utils.FailOnErr(err, "Failed to open a channel")
defer ch.Close()

err = ch.ExchangeDeclare(
"complex_topic_logs", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
messages := map[string]string{
"quick.orange.rabbit": "Message for quick.orange.rabbit",
"lazy.orange.elephant": "Message for lazy.orange.elephant",
"quick.brown.fox": "Message for quick.brown.fox",
"lazy.brown.dog": "Message for lazy.brown.dog",
"quick.orange.fox": "Message for quick.orange.fox",
"lazy.orange.rabbit": "Message for lazy.orange.rabbit",
}

for routingKey, body := range messages {
err = ch.Publish(
"complex_topic_logs", // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
utils.FailOnErr(err, "Failed to publish a message")
log.Printf(" [x] Sent %s: %s", routingKey, body)
time.Sleep(1 * time.Second) // 模拟发送消息的时间间隔
}
}

rabbitMQ中设置 qos的作用

Qos 方法来设置 Quality of Service(QoS),可以控制消息的分发方式。

  • prefetch count: 指定在收到确认之前,消费者可以同时预取的最大消息数目。
  • prefetch size: 指定在收到确认之前,消费者可以同时预取的最大消息大小(以字节为单位)。通常设置为 0 表示忽略此限制。
  • global: 如果设置为 true,则 Qos 设置会对所有消费者生效。如果设置为 false,则 Qos 设置只对当前的消费者生效。

QoS 的作用和优点

  1. 避免消息淹没消费者: 通过设置 prefetch count,可以确保在处理当前消息之前不会向消费者发送更多消息。例如,在上面的代码中,prefetch count 设置为 1,意味着消费者在处理并确认当前消息之前不会接收更多消息。这可以防止消费者在无法及时处理消息时被淹没。
  2. 公平调度: 使用 QoS 可以实现公平调度。假设有多个消费者并且 prefetch count 设置为 1,那么 RabbitMQ 会确保每个消费者在处理并确认当前消息之前不会收到更多消息,从而实现消息的公平分配。
  3. 减少未确认消息的积压: 通过限制未确认消息的数量,可以减少消息积压的风险,从而提高系统的稳定性和可靠性。

注意事项

  • 适当设置 prefetch count: 根据消费者处理消息的能力,合理设置 prefetch count,可以平衡消息处理的吞吐量和消费者的负载。
  • 全局与非全局: 全局(global)设置会影响到所有的消费者,而非全局设置只影响当前的消费者。根据实际需求选择合适的设置

如何保障消息的可靠性

可能发生消息丢失的三个阶段:

  1. 生产者发送消息的阶段
    • exchangeName有误
    • routingKey有误

生产者确认机制:确保消息正常发送给了交换机

生产者回退机制:确保消息正常发给了队列;其中,若队列返回nack,可以将异常数据写入mysql,兜底处理

  1. rabbitMQ存储消息的阶段
    • 默认情况下,消息存储于内存中,如果rabbitMQ宕机,内存中的消息就会丢失
  2. 消费者消费消息的阶段
    • 消费者获得消息后,会自动给rabbitMQ返回ACK,rabbitMQ服务端此时会把这个消息从队列中删除。若获取消息后,业务逻辑处理完前,rabbitMQ宕机,消息丢失。

死信队列

DLQ dead letter queue 消息无法被消费者成功消费而被重新投递到另一个队列中

  1. 消息被拒绝
  2. 消息过期
  3. 队列满了
  4. 消息路由失败

延迟队列

rabbitMQ中没有真正意义上的延迟队列。可以使用ttl(time to live)+死信队列实现

适用场景:

  • 10min内未支付,则关闭订单
  • 用户注册后,三日未上线,则短信提醒

Golang 实现 RabbitMQ 的延迟队列 | Go 技术论坛 (learnku.com)

go - 高可用延迟队列设计与实现 - 个人文章 - SegmentFault 思否

消息的重复消费问题

为保证消息百分百被消费成功,可能出现消息重复消费的情况。

image-20240503162436164

生产端:无法解决

消费端:保证消息的幂等性,就算多次消费,也不会影响最终的效果

使用数据库唯一约束保证幂等性

Kafka

为何读写速度极快

  • 使用批量处理的方式来提升系统吞吐能力。 网络IO 速度
  • 基于磁盘文件高性能顺序读写的特性来设计的存储结构。磁盘IO 速度
  • 利用操作系统的PageCache来缓存数据,减少IO并提升读性能。
  • 使用零拷贝技术加速消费流程。

mq总结
http://example.com/2024/04/06/mq总结/
作者
Forrest
发布于
2024年4月6日
许可协议