Fastify+TS实现基础IM服务(四)fastify/

本文主要内容均为@fastifywebsocket入门内容,这部分可以直接去看NPM上的@fastify/websocket #readme

不同的是本文的代码示例都是TS的,如果已经有了这部分基础可以直接跳到改造js+ws聊天室代码部分

Fastify 的 WebSocket 支持,基于 ws@8 构建。

安装

1
2
3
bash复制代码npm i @fastify/websocket
# 或者
yarn add @fastify/websocket

如果你是 TypeScript 用户,这个包内置了自己的 TypeScript 类型,但你还需要安装 ws 包的类型:

1
2
3
bash复制代码npm i @types/ws -D
# 或者
yarn add -D @types/ws

使用方法

注册这个插件后,你可以选择 WS 服务器响应哪些路由。这可以通过在 fastify 的 .get 路由上添加 websocket: true 属性到 routeOptions 来实现。在这种情况下,两个参数将被传递给处理程序,socket 连接和 fastify 请求对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typescript复制代码import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';

const app = fastify();

app.register(fastifyWebsocket);

app.register(async function (fastify) {
fastify.get('/', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server');
});
});
});

app.listen({ port: 3000 }, err => {
if (err) {
app.log.error(err);
process.exit(1);
}
});

在这种情况下,它将在每个未注册的路由上响应一个 404 错误,关闭传入的升级连接请求。

然而,你仍然可以定义一个通配符路由,它将被用作默认处理程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
typescript复制代码import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';

const app = fastify();

app.register(fastifyWebsocket, {
options: { maxPayload: 1048576 }
});

app.register(async function (fastify) {
fastify.get('/*', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from wildcard route');
});
});

fastify.get('/', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server');
});
});
});

app.listen({ port: 3000 }, err => {
if (err) {
app.log.error(err);
process.exit(1);
}
});

附加事件处理程序

重要的是,WebSocket 路由处理程序在处理程序执行期间同步附加事件处理程序,以避免意外丢弃消息。如果你想在你的 WebSocket 处理程序中进行任何异步工作,比如认证用户或从数据存储加载数据,请确保在触发这个异步工作之前附加任何 on(‘message’) 处理程序。否则,消息可能会在这个异步工作进行时到达,如果没有处理程序监听这个数据,它将被默默丢弃。

下面是一个例子,展示了如何在仍然访问异步资源的同时同步附加消息处理程序。我们将一个异步事情的 promise 存储在一个局部变量中,同步附加消息处理程序,然后使消息处理程序本身异步,以获取异步数据并进行一些处理:

1
2
3
4
5
6
7
8
typescript复制代码app.get('/*', { websocket: true }, (connection, request) => {
const sessionPromise = request.getSession(); // 示例异步会话获取器,同步调用以返回一个 promise

connection.socket.on('message', async (message) => {
const session = await sessionPromise;
// 使用消息和会话做一些事情
});
});

使用钩子

使用 @fastify/websocket 注册的路由遵循 Fastify 插件封装上下文,因此将运行已注册的任何钩子。这意味着你可能用于认证或错误处理的普通 HTTP 处理程序的相同路由钩子也适用于 websocket 处理程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
typescript复制代码app.addHook('preValidation', async (request, reply) => {
// 检查请求是否已认证
if (!request.isAuthenticated()) {
await reply.code(401).send("未认证");
}
});

app.get('/', { websocket: true }, (connection, req) => {
// 仅为认证的传入请求打开连接
connection.socket.on('message', message => {
// ...
});
});

自定义错误处理程序

你可以选择性地提供一个自定义 errorHandler,它将用于处理已建立的 websocket 连接的任何清理工作。如果在建立连接后你的 websocket 路由处理程序抛出任何错误,将调用 errorHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
typescript复制代码import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';

const app = fastify();

app.register(fastifyWebsocket, {
errorHandler: function (error, connection, req, reply) {
// 做些事情
// 销毁/关闭连接
connection.socket.terminate();
},
options: {
maxPayload: 1048576, // 我们将允许的最大消息大小设置为 1 MiB(1024 字节 * 1024 字节)
verifyClient: function (info, next) {
if (info.req.headers['x-fastify-header'] !== 'fastify is awesome !') {
return next(false); // 连接不被允许
}
next(true); // 连接被允许
}
}
});

app.get('/', { websocket: true }, (connection, req) => {
connection.socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server');
});
});

app.listen({ port: 3000 }, err => {
if (err) {
app.log.error(err);
process.exit(1);
}
});

自定义 preClose 钩子

默认情况下,当服务器关闭时,所有 ws 连接都将关闭。如果你希望修改这种行为,你可以传递你自己的 preClose 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typescript复制代码import fastify from 'fastify';
import fastifyWebsocket from '@fastify/websocket';

const app = fastify();

app.register(fastifyWebsocket, {
preClose: (done) => { // 注意:也可以使用 async 风格,不使用 done-callback
const server = this.websocketServer;

for (const socket of server.clients) {
socket.close(1001, 'WS 服务器以自定义方式离线,发送代码 + 消息');
}

server.close(done);
}
});

测试

测试 ws 处理程序可能相当棘手,幸运的是 fastify-websocket 为 fastify 实例装饰了 injectWS。它允许轻松测试 websocket 端点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typescript复制代码import Fastify from 'fastify';
import FastifyWebSocket from '@fastify/websocket';
import ws from 'ws';

const app = Fastify();
await app.register(FastifyWebSocket);

app.get('/', { websocket: true }, (connection, req) => {
const stream = ws.createWebSocketStream(connection.socket, { /* 选项 */ });
stream.setEncoding('utf8');
stream.write('hello client');

stream.on('data', function (data) {
// 确保设置了数据处理程序或以其他方式读取所有传入的数据,否则流背压将导致底层 WebSocket 对象被暂停。
});
});

await app.listen({ port: 3000 });

App.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typescript复制代码import Fastify from 'fastify';
import FastifyWebSocket from '@fastify/websocket';

const App = Fastify();

App.register(FastifyWebSocket);

App.register(async function(fastify) {
fastify.addHook('preValidation', async (request, reply) => {
if (request.headers['api-key'] !== 'some-random-key') {
return reply.code(401).send();
}
});

fastify.get('/', { websocket: true }, (connection) => {
connection.socket.on('message', message => {
connection.socket.send('hi from server');
});
});
});

export default App;

App.test.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
typescript复制代码import { test } from 'tap';
import Fastify from 'fastify';
import App from './app';

test('connect to /', async (t) => {
t.plan(1);

const fastify = Fastify();
fastify.register(App);
t.teardown(fastify.close.bind(fastify));

const ws = await fastify.injectWS('/', {headers: { "api-key" : "some-random-key" }});
let resolve;
const promise = new Promise(r => { resolve = r; });

ws.on('message', (data) => {
resolve(data.toString());
});
ws.send('hi from client');

t.assert(await promise, 'hi from server');
// 记得最后关闭 ws
ws.terminate();
});

注意!!!

  • 测试结束时需要手动关闭 Websocket。
  • 需要等待 fastify.ready() 以确保 fastify 已被装饰。
  • 如果你需要处理服务器响应,需要在发送消息之前注册事件监听器。

选项

@fastify/websocket 接受以下 ws 的选项:

  • host - 绑定服务器的主机名。
  • port - 绑定服务器的端口。
  • backlog - 等待连接的队列的最大长度。
  • server - 一个预创建的 Node.js HTTP/S 服务器。
  • verifyClient - 用于验证传入连接的函数。
  • handleProtocols - 用于处理 WebSocket 子协议的函数。
  • clientTracking - 指定是否跟踪客户端。
  • perMessageDeflate - 启用/禁用 permessage-deflate。
  • maxPayload - 允许的最大消息大小(以字节为单位)。

有关更多信息,你可以查看 ws 选项文档。

注意:默认情况下,如果你不提供 server 选项,@fastify/websocket 将把你的 websocket 服务器实例绑定到作用域内的 fastify 实例。

注意:ws 的 path 选项不应提供,因为路由由 fastify 自身处理

注意:ws 的 noServer 选项不应提供,因为 @fastify/websocket 的目的是在 fastify 服务器上监听。如果你想要一个自定义服务器,你可以使用 server 选项,如果你想要更多控制,你可以直接使用 ws 库

ws 不允许你将 objectMode 或 writableObjectMode 设置为 true

完整的 TypeScript 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
typescript复制代码/* eslint-disable @typescript-eslint/no-unused-vars */
import Fastify from 'fastify'
import FastifyWebsocket, { WebSocket } from '@fastify/websocket'
import { FastifyRequest, FastifyReply } from 'fastify'

// 创建 Fastify 实例
const app = Fastify()

// 注册 @fastify/websocket 插件
app.register(FastifyWebsocket, {
options: {
maxPayload: 1048576, // 设置允许的最大消息大小为 1MB
verifyClient: (info, done) => {
// 示例:客户端验证逻辑
if (info.req.headers['x-fastify-header'] === 'fastifyrocks') {
done(true) // 验证通过
} else {
done(false) // 验证失败,连接被拒绝
}
},
clientTracking: true, // 启用客户端跟踪
perMessageDeflate: true // 启用 permessage-deflate 压缩
},
errorHandler: (error, connection, _req, _reply) => {
// 错误处理逻辑
console.error('WebSocket 错误:', error)
connection.terminate() // 终止连接
},
preClose: (done) => {
// 服务器关闭前的清理逻辑
console.log('正在关闭 WebSocket 连接')
done() // 完成清理操作
}
})

// 添加认证钩子
app.addHook(
'preValidation',
async (request: FastifyRequest, _reply: FastifyReply) => {
if (!request.headers['authorization']) {
throw new Error('未授权') // 抛出未授权错误
}
}
)

// 定义 WebSocket 路由
app.get(
'/ws',
{ websocket: true },
(connection: WebSocket, _req: FastifyRequest) => {
connection.on('message', (message) => {
console.log('收到消息:', message.toString())
connection.send('消息已接收') // 向客户端发送响应
})

connection.on('close', () => {
console.log('WebSocket 已关闭')
})
}
)

// 启动服务器
app.listen({ port: 3000 }, (err, address) => {
if (err) {
app.log.error(err)
process.exit(1) // 遇到错误,退出程序
}
console.log(`服务器正在监听地址 ${address}`)
})

关键点解释

  • 注册插件:通过 app.register(FastifyWebsocket, {...}) 注册 @fastify/websocket 插件,并传递配置选项。
  • 选项配置options 对象中包含了 ws 库的配置选项,如 maxPayload, verifyClient, clientTracking, 和 perMessageDeflate
+ **`maxPayload`**:定义了 WebSocket 消息的最大大小(以字节为单位)。这是一个重要的安全特性,用于防止恶意用户通过发送大量数据来尝试耗尽服务器资源。
+ **`verifyClient`**:这是一个函数,用于在 WebSocket 握手阶段验证连接客户端。它接收两个参数:`info` 和 `done`。`info` 对象包含了请求相关的信息,如请求头和请求源等,而 `done` 是一个回调函数,用于基于验证结果接受或拒绝请求。这个验证步骤是实现自定义认证逻辑的理想场所。
+ **`clientTracking`**:当设置为 `true` 时,WebSocket 服务器将会跟踪连接到服务器的客户端。这使得服务器能够保持对所有活动 WebSocket 连接的引用,便于管理这些连接,如广播消息。
+ **`perMessageDeflate`**:这个选项启用了 permessage-deflate 压缩,可以减少通过 WebSocket 发送的数据的大小。这对于减少带宽使用和提高传输效率非常有用,特别是在发送大量数据时。除了这些选项,`@fastify/websocket` 插件还提供了其他几个重要的配置点:


+ **`errorHandler`**:这是一个函数,用于处理 WebSocket 连接过程中的任何错误。这允许开发者自定义错误处理逻辑,例如记录错误或关闭出现问题的连接。
+ **`preClose`**:这是一个函数,在服务器即将关闭前调用,允许进行清理工作,如优雅地关闭 WebSocket 连接。
  • 错误处理errorHandler 函数用于处理 WebSocket 连接过程中的错误。
  • 认证钩子preValidation 钩子用于在建立 WebSocket 连接之前进行认证检查。
  • WebSocket 路由:通过 app.get('/ws', {websocket: true}, handler) 定义 WebSocket 路由。在路由处理函数中,可以使用 connection.socket 来发送和接收消息。
  • 启动服务器:通过 app.listen({ port: 3000 }, callback) 启动服务器。

这个示例展示了如何在 Fastify 应用中使用 TypeScript 来集成和配置 WebSocket 功能,包括如何处理客户端消息、执行认证和错误处理。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%