Golang RabbitMQ 生产者、消费者发送消息和重连

1. 生产者重连

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package rabbitmq

import (
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
)

// Producer publishes messages to a RabbitMQ queue and keeps a background
// loop running that re-dials the broker whenever the channel closes.
type Producer struct {
	// queue is declared on connect and used as the routing key when
	// publishing; addr is the URL handed to amqp.Dial; exchange is the
	// exchange messages are published to.
	queue, addr, exchange string

	// conn and channel are the most recently established connection and
	// channel; both are replaced on every successful reconnect.
	conn    *amqp.Connection
	channel *amqp.Channel

	// notifyClose is registered on the current channel via NotifyClose and
	// wakes the reconnect loop.
	notifyClose chan *amqp.Error
	// notifyConfirm is registered on the current channel via NotifyPublish
	// and is read by SendMessage.
	notifyConfirm chan amqp.Confirmation

	// done is sent to by Close to stop the reconnect loop.
	done chan bool

	// isConnected is true between a successful connect and the next
	// reconnect attempt or Close; isDone is set once the reconnect loop has
	// received from done.
	isConnected, isDone bool
}

// reconnectDelay is the pause between two failed connection attempts.
const reconnectDelay = 5 * time.Second

// resendDelay is how long SendMessage waits for a broker confirmation before
// publishing the message again.
const resendDelay = time.Second

// resendTime is the number of failed publish attempts after which
// SendMessage gives up.
const resendTime = 10

// producerNotConnected is returned by push while no broker connection is
// established.
var producerNotConnected = errors.New("not connected to the producer")

// producerAlreadyClosed is returned by Close when the producer is not
// connected.
var producerAlreadyClosed = errors.New("already closed: not connected to the producer")

// NewProducer creates a Producer for the given broker address, exchange and
// queue, starts its background reconnect loop, and blocks until the first
// connection has been established.
func NewProducer(addr string, exchange string, queue string) *Producer {
	// pollInterval is how often the connection flag is re-checked while
	// waiting for the first successful connect.
	const pollInterval = 10 * time.Millisecond

	producer := &Producer{
		exchange: exchange,
		queue:    queue,
		addr:     addr,
		done:     make(chan bool),
	}
	go producer.reconnect()

	// Sleep between checks instead of spinning on an empty loop body, which
	// would pin a CPU core for as long as the broker is unreachable.
	// NOTE(review): isConnected is still read without synchronization here;
	// making it race-free needs a change to the Producer struct.
	for !producer.isConnected {
		time.Sleep(pollInterval)
	}
	return producer
}

// reconnect runs for the lifetime of the producer: it dials until a
// connection is established, then sleeps until either the channel reports a
// close (dial again) or done is signalled (mark the producer done and exit).
func (producer *Producer) reconnect() {
	for {
		producer.isConnected = false
		fmt.Println("attempting to connect")

		// Keep dialing, pausing between attempts, until connect succeeds.
		for {
			if producer.connect() {
				break
			}
			fmt.Println("failed to connect. Retrying...")
			time.Sleep(reconnectDelay)
		}

		stop := false
		select {
		case <-producer.notifyClose:
			// The channel went away; loop around and connect again.
		case <-producer.done:
			stop = true
		}
		if stop {
			fmt.Println("producer is done")
			producer.isDone = true
			return
		}
	}
}

// connect dials the broker, opens a channel in publisher-confirm mode and
// declares the durable queue. On success the new connection is installed via
// changeConnection and true is returned. On any failure the cause is printed,
// the half-built connection is closed, and false is returned.
func (producer *Producer) connect() bool {
	conn, err := amqp.Dial(producer.addr)
	if err != nil {
		fmt.Println("dial failed:", err)
		return false
	}

	// Until the connection has been handed to changeConnection it is owned
	// by this function; close it on every failure path so repeated retries
	// do not leak broker connections.
	established := false
	defer func() {
		if !established {
			_ = conn.Close() // best effort: the attempt has already failed
		}
	}()

	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("open channel failed:", err)
		return false
	}

	// SendMessage relies on publisher confirms, so failing to enable them
	// must fail the connection attempt instead of being ignored.
	if err := ch.Confirm(false); err != nil {
		fmt.Println("enable confirm mode failed:", err)
		return false
	}

	_, err = ch.QueueDeclare(
		producer.queue, // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		fmt.Println("declare queue failed:", err)
		return false
	}

	producer.changeConnection(conn, ch)
	established = true
	return true
}

// changeConnection installs conn and channel as the producer's current
// connection, registers fresh close and publish-confirm listeners on the
// channel, and marks the producer as connected.
func (producer *Producer) changeConnection(conn *amqp.Connection, channel *amqp.Channel) {
	// confirmBacklog is how many confirmations may be queued unread.
	// SendMessage publishes again after a confirm timeout without having
	// consumed the earlier confirmation, so more than one can be outstanding.
	// With an unbuffered listener the library blocks while delivering the
	// late confirmation, which can stall the next Publish.
	const confirmBacklog = 64

	producer.conn = conn
	producer.channel = channel

	// The previous listener channels do not need to be closed explicitly;
	// once nothing references them the garbage collector reclaims them.
	// The close listener gets a one-slot buffer so the library can deliver
	// the close error even when nobody is receiving at that instant.
	producer.notifyClose = make(chan *amqp.Error, 1)
	producer.notifyConfirm = make(chan amqp.Confirmation, confirmBacklog)
	producer.channel.NotifyClose(producer.notifyClose)
	producer.channel.NotifyPublish(producer.notifyConfirm)
	producer.isConnected = true
}

// SendMessage publishes message and waits for the broker to confirm it.
//
// A failed publish is retried after a pause; once resendTime publishes have
// failed, the last publish error is returned. A publish that is not
// acknowledged within resendDelay (or is negatively acknowledged) is
// published again. It returns nil once an acknowledgement is received and an
// error if the producer has been shut down.
//
// NOTE(review): retries caused by missing confirmations are not bounded, and
// a late confirmation for an earlier attempt is indistinguishable from one
// for the current attempt.
func (producer *Producer) SendMessage(message interface{}) error {
	failedPushes := 0
	for {
		if producer.isDone {
			return errors.New("producer is done")
		}

		if err := producer.push(message); err != nil {
			fmt.Println("push failed. Retrying...")
			time.Sleep(resendDelay)
			failedPushes++
			if failedPushes >= resendTime {
				return err
			}
			// Nothing was published, so no confirmation can belong to this
			// attempt; waiting for one would only pick up a stale result.
			continue
		}

		// A one-shot timer that is stopped as soon as it is no longer
		// needed, instead of a ticker that is never released.
		timeout := time.NewTimer(resendDelay)
		select {
		case confirm := <-producer.notifyConfirm:
			timeout.Stop()
			if confirm.Ack {
				fmt.Println("push confirmed!")
				return nil
			}
		case <-timeout.C:
		}
		fmt.Println("push didn't confirm. Retrying...")
	}
}

// push JSON-encodes message and publishes it once as a persistent message to
// the producer's exchange, using the queue name as routing key. It does not
// wait for a broker confirmation.
//
// It returns producerNotConnected while no connection is established, the
// encoding error if message cannot be marshalled, or the error reported by
// Publish.
func (producer *Producer) push(message interface{}) error {
	if !producer.isConnected {
		return producerNotConnected
	}

	// Surface encoding failures instead of silently publishing an empty body.
	data, err := json.Marshal(message)
	if err != nil {
		return err
	}

	return producer.channel.Publish(
		producer.exchange, // Exchange
		producer.queue,    // Routing key
		false,             // Mandatory
		false,             // Immediate
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         data,
			Timestamp:    time.Now(),
		},
	)
}

// Close stops the background reconnect loop and then closes the channel and
// the connection. It returns producerAlreadyClosed if the producer is not
// currently connected, otherwise the first error reported while closing.
func (producer *Producer) Close() error {
	if !producer.isConnected {
		return producerAlreadyClosed
	}

	// Stop the reconnect loop before touching the channel. Closing the
	// channel first fires its close notification, which makes the loop dial
	// a brand-new connection that nothing would ever close.
	producer.done <- true
	producer.isConnected = false

	// Always attempt both closes so a channel error cannot leave the
	// connection open; report the first failure.
	channelErr := producer.channel.Close()
	connErr := producer.conn.Close()
	if channelErr != nil {
		return channelErr
	}
	return connErr
}

2. 消费者重连

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package rabbitmq

import (
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
)

// Consumer receives messages from a RabbitMQ queue and reconnects to the
// broker when its channel closes.
type Consumer struct {
	// queue is the queue declared on connect and consumed from; addr is the
	// URL handed to amqp.Dial.
	queue, addr string

	// conn and channel are the most recently established connection and
	// channel; both are replaced on every successful reconnect.
	conn    *amqp.Connection
	channel *amqp.Channel

	// notifyClose is registered on the current channel via NotifyClose and
	// is watched by ReceiveMessage.
	notifyClose chan *amqp.Error

	// done is sent to by Close to stop the receive loop.
	done chan bool

	// isConnected is true between a successful connect and the next
	// reconnect attempt or Close; isDone is set once the receive loop has
	// received from done.
	isConnected, isDone bool
}

// consumerNotConnected is printed by ReceiveMessage when the channel reports
// that it has closed.
var consumerNotConnected = errors.New("not connected to the consumer")

// consumerAlreadyClosed is returned by Close when the consumer is not
// connected.
var consumerAlreadyClosed = errors.New("already closed: not connected to the consumer")

// NewConsumer creates a Consumer for the given broker address and queue and
// blocks until the first connection has been established.
func NewConsumer(addr string, queue string) *Consumer {
	consumer := &Consumer{
		queue: queue,
		addr:  addr,
		done:  make(chan bool),
	}

	// reconnect only returns once connect has succeeded, so calling it
	// directly gives the same "block until connected" behaviour as starting
	// it in a goroutine and spinning on isConnected, without burning a CPU
	// core or reading the flag from a second goroutine.
	consumer.reconnect()
	return consumer
}

// reconnect marks the consumer as disconnected and dials the broker
// repeatedly, pausing reconnectDelay between attempts, until connect
// succeeds.
func (consumer *Consumer) reconnect() {
	consumer.isConnected = false
	fmt.Println("attempting to connect")
	for {
		if consumer.connect() {
			return
		}
		fmt.Println("failed to connect. Retrying...")
		time.Sleep(reconnectDelay)
	}
}


// connect dials the broker, opens a channel and declares the durable queue.
// On success the new connection is installed via changeConnection and true is
// returned. On any failure the cause is printed, the half-built connection is
// closed, and false is returned.
func (consumer *Consumer) connect() bool {
	conn, err := amqp.Dial(consumer.addr)
	if err != nil {
		fmt.Println("dial failed:", err)
		return false
	}

	// Until the connection has been handed to changeConnection it is owned
	// by this function; close it on every failure path so repeated retries
	// do not leak broker connections.
	established := false
	defer func() {
		if !established {
			_ = conn.Close() // best effort: the attempt has already failed
		}
	}()

	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("open channel failed:", err)
		return false
	}

	// NOTE(review): nothing in the consumer listens for publish confirms, so
	// confirm mode looks unnecessary here; it is kept to preserve behaviour,
	// but its error is no longer ignored.
	if err := ch.Confirm(false); err != nil {
		fmt.Println("enable confirm mode failed:", err)
		return false
	}

	_, err = ch.QueueDeclare(
		consumer.queue, // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		fmt.Println("declare queue failed:", err)
		return false
	}

	consumer.changeConnection(conn, ch)
	established = true
	return true
}


// changeConnection installs conn and channel as the consumer's current
// connection, registers a fresh close listener on the channel, and marks the
// consumer as connected.
func (consumer *Consumer) changeConnection(conn *amqp.Connection, channel *amqp.Channel) {
	consumer.conn = conn
	consumer.channel = channel

	// One slot of buffer lets the library deliver the close error even when
	// the receive loop is not waiting on this channel at that instant (for
	// example while it is handling a message, or before ReceiveMessage has
	// been started).
	consumer.notifyClose = make(chan *amqp.Error, 1)
	consumer.channel.NotifyClose(consumer.notifyClose)
	consumer.isConnected = true
}

// ReceiveMessage subscribes to the consumer's queue with auto-ack and prints
// the body of every delivery. It blocks until Close signals done. Whenever
// subscribing fails, the channel reports a close, or the delivery stream
// ends, it reconnects and subscribes again.
func (consumer *Consumer) ReceiveMessage() {
	// Each pass of the outer loop is one subscription. Resubscribing by
	// looping, not by calling ReceiveMessage recursively, keeps the stack
	// flat no matter how many reconnects happen.
	for !consumer.isDone {
		msgs, err := consumer.channel.Consume(
			consumer.queue, // queue
			"",             // consumer
			true,           // auto-ack
			false,          // exclusive
			false,          // no-local
			false,          // no-wait
			nil,            // args
		)
		if err != nil {
			fmt.Println(err)
			consumer.reconnect()
			continue
		}

		for resubscribe := false; !resubscribe && !consumer.isDone; {
			select {
			case <-consumer.done:
				fmt.Println("consumer is done")
				consumer.isDone = true
			case <-consumer.notifyClose:
				fmt.Println(consumerNotConnected)
				consumer.reconnect()
				resubscribe = true
			case msg, ok := <-msgs:
				if ok {
					fmt.Println(string(msg.Body))
				} else {
					// The delivery channel was closed. Receiving from it
					// again would yield empty deliveries in a tight loop,
					// so treat it like a lost connection.
					fmt.Println(consumerNotConnected)
					consumer.reconnect()
					resubscribe = true
				}
			}
		}
	}
}


// Close stops the receive loop (if one is running) and then closes the
// channel and the connection. It returns consumerAlreadyClosed if the
// consumer is not currently connected, otherwise the first error reported
// while closing.
func (consumer *Consumer) Close() error {
	// stopTimeout bounds how long Close waits for ReceiveMessage to pick up
	// the stop signal, so Close cannot hang when no receive loop is running.
	const stopTimeout = time.Second

	if !consumer.isConnected {
		return consumerAlreadyClosed
	}

	// Stop the receive loop before touching the channel. Closing the channel
	// first fires its close notification, which makes the loop reconnect and
	// leaves a brand-new connection that nothing would ever close.
	select {
	case consumer.done <- true:
	case <-time.After(stopTimeout):
		// Nobody is receiving; carry on and release the resources anyway.
	}
	consumer.isConnected = false

	// Always attempt both closes so a channel error cannot leave the
	// connection open; report the first failure.
	channelErr := consumer.channel.Close()
	connErr := consumer.conn.Close()
	if channelErr != nil {
		return channelErr
	}
	return connErr
}