RabbitMQ实战代码-NodeJS

NodeJS下使用

使用amqplib操作RabbitMQ

安装 amqplib

1
npm install amqplib --save

注意以下几点

建立连接后库会自己发送心跳包保活,不用开发者处理。

发送消息

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class RabbitMQ {
constructor(opts) {
this.opts = Object.assign({
"protocol": "amqp",
"hostname": "localhost",
"port": 5672,
"username": "guest",
"password": "guest"
}, opts);
this.trynum = 0;
this.maxtrynum = 9;
}

async sendQueueMsg(queueName, msg, resultCallback, retry) {
if (!retry) {
this.trynum = 0;
}
const amqp = require('amqplib');
let channel = null;
let conn = null;
let self = this;

function retryAction() {
setTimeout(() => {
try {
if (channel) {
channel.close();
if (conn) {
conn.close();
}
}
} catch (e) {
console.info("关闭失败")
}
}, 1000);

if (self.trynum < self.maxtrynum) {
self.trynum += 1;
console.info("MQ失败尝试", "第" + self.trynum + "次", msg)
setTimeout(() => {
self.sendQueueMsg(queueName, msg, resultCallback, true);
}, 10000)
} else {
resultCallback && resultCallback("Fail: " + msg);
}
}

try {
let msgBuffer = Buffer.from(msg, "utf8");
conn = await amqp.connect(this.opts);
conn.on('error', function (err) {
console.info("conn error")
});
channel = await conn.createChannel();
await channel.assertQueue(queueName);
let data = await channel.sendToQueue(queueName, msgBuffer, {
persistent: true
});
if (data) {
resultCallback && resultCallback("Success: " + msg);
setTimeout(() => {
try {
if (channel) {
channel.close();
if (conn) {
conn.close();
}
}
} catch (e) {
console.info("关闭失败")
}
}, 1000);
} else {
retryAction();
}
} catch (e) {
retryAction();
}
}
}

exports.RabbitMQ = RabbitMQ;

注意

发送完成后关闭的时候一定要添加延时,否则可能还没发送出去,就被关闭了。

调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
const {
RabbitMQ
} = require("./server/RabbitMQ")
const {config} = require("./server/config/config");

function getDateStr() {
let date = new Date();
let year = date.getFullYear();
let month = date.getMonth() + 1;
let day = date.getDate();
let hour = date.getHours();
let minute = date.getMinutes();
let sec = date.getSeconds();
month = month < 10 ? "0" + month : month;
day = day < 10 ? "0" + day : day;
hour = hour < 10 ? "0" + hour : hour;
minute = minute < 10 ? "0" + minute : minute;
sec = sec < 10 ? "0" + sec : sec;
return year + "-" + month + "-" + day + " " + hour + ":" + minute + ":" + sec;
}
let mq = new RabbitMQ({
"hostname": config.mq_hostname,
"port": config.mq_port,
"username": config.mq_username,
"password": config.mq_password
});
let i = 0;
function sendmq() {
if (i < 100) {
let msg = {
"num": i,
};
mq.sendQueueMsg("psvmc.mq_queuename", JSON.stringify(msg), (error) => {
console.log(getDateStr(), "发送MQ结果:", error);
setTimeout(() => {
i += 1;
sendmq()
}, 2000)
})
}
}
sendmq()

加入了失败重发机制

接收消息

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const amqp = require('amqplib');

class RabbitMQ {
constructor(opts) {
let m_opts = Object.assign({
"protocol": "amqp",
"hostname": "localhost",
"port": 5672,
"username": "guest",
"password": "guest"
}, opts);
this.opts = m_opts;
this.open = amqp.connect(m_opts);
}

async receiveQueueMsg(queueName, receiveCallBack) {
let channel = null;
try {
let conn = await this.open;
channel = await conn.createChannel();
channel.consume(queueName, function (msg) {
if (msg !== null) {
let data = msg.content.toString();
channel.ack(msg);
receiveCallBack && receiveCallBack(data);
}
})
} catch (e) {
console.error(e);
if (channel) {
channel.close();
}
}
}
}

exports.RabbitMQ = RabbitMQ;

调用

1
2
3
4
5
6
7
8
9
10
11
const {RabbitMQ} = require("./src/utils/RabbitMQ")
let mq = new RabbitMQ({
"hostname": "110.110.110.110",
"port": 5672,
"username": "psvmc",
"password": "123456"
});

mq.receiveQueueMsg("xhkjedu.xhschool.livequeue", data => {
console.info("receiveQueueMsg:", data)
})