1. Publish模式(订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取)

Publish模式 - 图1

  • X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
  • 相关场景:邮件群发,群聊天,广播(广告)

目录结构

Publish模式 - 图2

kuteng-RabbitMQ

-RabbitMQ

—rabitmq.go //这个是RabbitMQ的封装

-Pub

—mainPub.go //Publish 先启动

-Sub

—mainSub.go

-Sub2

—mainSub.go

rabitmq.go代码

  1. package RabbitMQ
  2. import (
  3. "fmt"
  4. "log"
  5. "github.com/streadway/amqp"
  6. )
  7. //连接信息
  8. const MQURL = "amqp://kuteng:kuteng@127.0.0.1:5672/kuteng"
  9. //rabbitMQ结构体
  10. type RabbitMQ struct {
  11. conn *amqp.Connection
  12. channel *amqp.Channel
  13. //队列名称
  14. QueueName string
  15. //交换机名称
  16. Exchange string
  17. //bind Key 名称
  18. Key string
  19. //连接信息
  20. Mqurl string
  21. }
  22. //创建结构体实例
  23. func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
  24. return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
  25. }
  26. //断开channel 和 connection
  27. func (r *RabbitMQ) Destory() {
  28. r.channel.Close()
  29. r.conn.Close()
  30. }
  31. //错误处理函数
  32. func (r *RabbitMQ) failOnErr(err error, message string) {
  33. if err != nil {
  34. log.Fatalf("%s:%s", message, err)
  35. panic(fmt.Sprintf("%s:%s", message, err))
  36. }
  37. }
  38. //订阅模式创建RabbitMQ实例
  39. func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
  40. //创建RabbitMQ实例
  41. rabbitmq := NewRabbitMQ("", exchangeName, "")
  42. var err error
  43. //获取connection
  44. rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
  45. rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
  46. //获取channel
  47. rabbitmq.channel, err = rabbitmq.conn.Channel()
  48. rabbitmq.failOnErr(err, "failed to open a channel")
  49. return rabbitmq
  50. }
  51. //订阅模式生产
  52. func (r *RabbitMQ) PublishPub(message string) {
  53. //1.尝试创建交换机
  54. err := r.channel.ExchangeDeclare(
  55. r.Exchange,
  56. "fanout",
  57. true,
  58. false,
  59. //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
  60. false,
  61. false,
  62. nil,
  63. )
  64. r.failOnErr(err, "Failed to declare an excha"+
  65. "nge")
  66. //2.发送消息
  67. err = r.channel.Publish(
  68. r.Exchange,
  69. "",
  70. false,
  71. false,
  72. amqp.Publishing{
  73. ContentType: "text/plain",
  74. Body: []byte(message),
  75. })
  76. }
  77. //订阅模式消费端代码
  78. func (r *RabbitMQ) RecieveSub() {
  79. //1.试探性创建交换机
  80. err := r.channel.ExchangeDeclare(
  81. r.Exchange,
  82. //交换机类型
  83. "fanout",
  84. true,
  85. false,
  86. //YES表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
  87. false,
  88. false,
  89. nil,
  90. )
  91. r.failOnErr(err, "Failed to declare an exch"+
  92. "ange")
  93. //2.试探性创建队列,这里注意队列名称不要写
  94. q, err := r.channel.QueueDeclare(
  95. "", //随机生产队列名称
  96. false,
  97. false,
  98. true,
  99. false,
  100. nil,
  101. )
  102. r.failOnErr(err, "Failed to declare a queue")
  103. //绑定队列到 exchange 中
  104. err = r.channel.QueueBind(
  105. q.Name,
  106. //在pub/sub模式下,这里的key要为空
  107. "",
  108. r.Exchange,
  109. false,
  110. nil)
  111. //消费消息
  112. messges, err := r.channel.Consume(
  113. q.Name,
  114. "",
  115. true,
  116. false,
  117. false,
  118. false,
  119. nil,
  120. )
  121. forever := make(chan bool)
  122. go func() {
  123. for d := range messges {
  124. log.Printf("Received a message: %s", d.Body)
  125. }
  126. }()
  127. fmt.Println("退出请按 CTRL+C\n")
  128. <-forever
  129. }

mainPub.go代码

  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  7. )
  8. func main() {
  9. rabbitmq := RabbitMQ.NewRabbitMQPubSub("" +
  10. "newProduct")
  11. for i := 0; i < 100; i++ {
  12. rabbitmq.PublishPub("订阅模式生产第" +
  13. strconv.Itoa(i) + "条" + "数据")
  14. fmt.Println("订阅模式生产第" +
  15. strconv.Itoa(i) + "条" + "数据")
  16. time.Sleep(1 * time.Second)
  17. }
  18. }

mainSub.go代码(两个消费者代码是一样的)

  1. package main
  2. import "github.com/student/kuteng-RabbitMQ/RabbitMQ"
  3. func main() {
  4. rabbitmq := RabbitMQ.NewRabbitMQPubSub("" +
  5. "newProduct")
  6. rabbitmq.RecieveSub()
  7. }