Midwayjs 使用RabbitMQ的使用过程 Midw

Midway.js 使用RabbitMQ的使用过程

前言

这段时间在做公司的一个项目,这个项目是基于midway.js为架构的一个项目。 主要是基于typescript的一个nodejs的web框架。 因为要用到rabbitmq,所以,在midway官网上调试了rabbitmq相关的demo,发现demo会出现生成者能够正常生产消息,但是消费者订阅后并没有收到生产者发出的消息的bug,所以经过我的的思考和查阅了相当大的一部分资料,顺利的吧把demo跑通了。以下就是我解决的一下方法和思路。

  1. 安装

官方参考链接

前提是已经部署好了rabbitmq服务和midway
1.1 安装依赖

1
2
ruby复制代码$ cnpm i @midwayjs/rabbitmq amqplib --save
$ cnpm i @types/amqplib --save-dev

1.2 创建服务要使用的文件

image.png

以上 server/index.js 消费者 用来监听rabbitmq中的队列, 直接用node运行

src/consumer/userConsumer.ts 不过没有用到这个文件

src/service/rabbitmq.ts rabbitmq服务

src/controller/home.ts 本身文件就有, 用这个文件来发起请求
src/config 配置文件,配置rabbitmq的链接信息

  1. 使用

server/index.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
javascript复制代码const amqp = require("amqplib/callback_api")

// 创建连接
amqp.connect('amqp://localhost', (err, conn) => {
// 创建通道
conn.createChannel((err, ch) => {
// 定义队列的名称
const q = 'task_queue'
// Declare the queue 声明队列
ch.assertQueue(q, { durable: true })

// Wait for Queue Messages 监听消息
console.log(` [*] Waiting for messages in ${q}. To exit press CTRL+C`)

ch.consume( q, msg => {
console.log(` [x] Received ${msg.content.toString()}`)
}, { noAck: true }
)
})
})

src/consumer/userConsumer.ts

1
复制代码没啥用,现在这里埋个坑

src/service/rabbitmq.ts

这里我没有根据官方的demo去写,自己重新定义了一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
kotlin复制代码import {Provide, Scope, ScopeEnum, Init, Config} from "@midwayjs/decorator";
import { Connection, Channel, Message, Options } from 'amqplib';
import * as rabbitmq from 'amqplib';

interface IRabbitConf {
hostname: string;
prot: number;
username: string;
password: string;
queueName: string[];
}

@Scope(ScopeEnum.Singleton) // singleon 单例模式 全局唯一 ,进程级别
@Provide('rabbitmqService')
export class RabbitmqService {

public config: Options.Connect;
public connection: Connection;
public channel: Channel;
private isAsert: Record<string, boolean>;
private queueNames: string[];
private isCheck: boolean;

// 注意这个地方, 使用的是config文件中的配置,所以记得要在config中配置
@Config('rabbit')
rabbitConf: IRabbitConf;


@Init()
async connect() {
console.log("正在连接rabbitmq")

this.config = Object.assign(
{
protocol: 'amqp',
hostname: 'localhost',
port: 5672,
username: 'fgc1101',
password: 'fgc19941030',
frameMax: 0,
heartbeat: 0,
vhost: '/',
},
this.rabbitConf
);

console.log(this.rabbitConf)
this.connection = await rabbitmq.connect(this.config);
await this.creatChannel();
this.queueNames = this.rabbitConf.queueName || [];
this.isAsert = {};
this.isCheck = false;
}

async checkQueue() {
if (!this.channel) {
await this.creatChannel();
}
for (const queueName of this.queueNames) {
const checkQueue = await this.channel.checkQueue(queueName);
if (checkQueue?.queue === queueName) this.isAsert[queueName] = true;
}
this.isCheck = true;
}

close() {
this.connection.close();
}

async creatChannel() {
this.channel = await this.connection.createChannel();
}

async push(
queueName: string,
msg: string,
option?: { priority?: number; durable?: boolean; siId?: string }
) {
const options = {
priority: 0,
durable: true,
headers: { time: 0, siId: option?.siId },
...option,
};
if (!this.queueNames.includes(queueName)){
console.log(`you did not define default queueName ${queueName}`)
return new Error(`you did not define default queueName ${queueName}`);
}

if (!this.isCheck) {
await this.checkQueue();
}
if (!this.isAsert[queueName]) {
this.channel.assertQueue(queueName, { durable: options.durable });
this.isAsert[queueName] = true;
}
this.channel.sendToQueue(queueName, Buffer.from(msg), options);
}

async pull(queueName: string): Promise<false | Message> {
if (!this.queueNames.includes(queueName))
new Error(`you did not default queueName ${queueName}`);

if (!this.isCheck) {
await this.checkQueue();
}
if (!this.isAsert[queueName]) {
await this.channel.assertQueue(queueName, { durable: true });
await this.channel.prefetch(1);
this.isAsert[queueName] = true;
}
return this.channel.get(queueName, { noAck: false });
}

async repush(msg: Message) {
const { fields, properties, content } = msg;
if (!this.isCheck) {
await this.checkQueue();
}
if (!this.isAsert[fields.routingKey]) {
this.channel.assertQueue(fields.routingKey, { durable: true });
this.channel.prefetch(1);
this.isAsert[fields.routingKey] = true;
}
const { headers } = properties;

this.channel.ack(msg);
headers.time++;
this.channel.sendToQueue(fields.routingKey, content, properties);
}

ack(msg: Message) {
this.channel.ack(msg);
}

nack(msg: Message) {
this.channel.nack(msg);
}

}

src/controller/home.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kotlin复制代码
import {Controller, Get, Inject, Provide} from '@midwayjs/decorator';
import {RabbitmqService} from "../service/rabbitmq";

@Provide()
@Controller('/')
export class HomeController {

@Inject()
rabbitmqService : RabbitmqService

@Get('/')
async home() {

console.log("测试连接rabbitmq服务")
await this.rabbitmqService.push('task_queue', 'world typeScript');

return {success: true, message: 'OK', data: {}};
}

}

src/config/config.default.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
arduino复制代码export default (appInfo: EggAppInfo) => {
// 数据库配置
config.sequelize = {
...
}

config.rabbit = {
hostname: '127.0.0.1',
prot: 5672,
username: 'fgc1101',
password: 'fgc19941030',
queueName: ['task_queue']
}

return config;
}
  1. 注意点

3.1 配置文件不要忘记写

3.2 监听队列的文件可以直接用js写

  1. 测试

4.1 启动服务

1
arduino复制代码npm run dev

image.png

4.2 根据控制器访问路径

image.png

4.3 进入的Server目录下执行消费者服务的代码

1
复制代码 node index.js

image.png

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%