springBoot聚合websocket如何实现单机10万

1、springboot项目聚合websockert代码。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
java复制代码/**
* 开启WebSocket支持
*
*/
@Configuration
public class WebSocketConfig {
/**
* 扫描并注册带有@ServerEndpoint注解的所有服务端
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}



package com.zntech.cpms.script.provider.config;

import com.zntech.cpms.script.provider.constant.ConstantUtils;
import com.zntech.cpms.script.provider.enums.MessageTypeEnum;
import com.zntech.cpms.script.provider.enums.RunStatusEnum;
import com.zntech.cpms.script.provider.pojo.ClientInfo;
import com.zntech.cpms.script.provider.pojo.MessageRequest;
import com.zntech.cpms.script.provider.pojo.ProjectTask;
import com.zntech.cpms.script.provider.pojo.TaskFailInfo;
import com.zntech.cpms.script.provider.service.ClientInfoService;
import com.zntech.cpms.script.provider.service.ProjectTaskService;
import com.zntech.cpms.script.provider.service.TaskFailInfoService;
import com.zntech.cpms.script.provider.service.impl.MessageHandlerServiceImpl;
import com.zntech.cpms.script.provider.util.CollectionsUtil;
import com.zntech.cpms.script.provider.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;


import javax.validation.constraints.Max;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
* websocket服务端,多例的,一次websocket连接对应一个实例
* @ServerEndpoint 注解的值为URI,映射客户端输入的URL来连接到WebSocket服务器端
*/
@Component
@ServerEndpoint("/{name}")
@Slf4j
public class WebSocketServe {
/** 用来记录当前在线连接数。设计成线程安全的。*/
private static AtomicInteger onlineCount = new AtomicInteger(0);
/** 用于保存uri对应的连接服务,{uri:WebSocketServer},设计成线程安全的 */
private static ConcurrentHashMap<String, WebSocketServe> webSocketServerMAP = new ConcurrentHashMap<>();
private Session session;// 与某个客户端的连接会话,需要通过它来给客户端发送数据
private String name; //客户端消息发送者
private String uri; //连接的uri

/**
* 连接建立成功时触发,绑定参数
* @param session
* 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
* @param name
* @param toName
* @throws IOException
*/
@OnOpen
public void onOpen(Session session, @PathParam("name") String name, @PathParam("toName") String toName) throws IOException {
this.session = session;
this.name = name;
this.uri = session.getRequestURI().toString();
WebSocketServe webSocketServer = webSocketServerMAP.get(uri);
if(webSocketServer != null){ //同样业务的连接已经在线,则把原来的挤下线。
webSocketServer.session.getBasicRemote().sendText(uri + "重复连接被挤下线了");
webSocketServer.session.close();//关闭连接,触发关闭连接方法onClose()
}
webSocketServerMAP.put(uri, this);//保存uri对应的连接服务
addOnlineCount(); // 在线数加1

}

/**
* 连接关闭时触发,注意不能向客户端发送消息了
* @throws IOException
*/
@OnClose
public void onClose() throws IOException {
webSocketServerMAP.remove(uri);//删除uri对应的连接服务
reduceOnlineCount(); // 在线数减1
}

/**
* 收到客户端消息后触发
*
* @param message
* 客户端发送过来的消息
* @throws IOException
*/
@OnMessage
public void onMessage(String message) {
log.info("收到消息:" + message);

}

/**
* 通信发生错误时触发
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
try {
log.info("{}:通信发生错误,连接关闭",name);
webSocketServerMAP.remove(uri);//删除uri对应的连接服务
}catch (Exception e){
}
}

/**
* 获取在线连接数
* @return
*/
public static int getOnlineCount() {
return onlineCount.get();
}

/**
* 原子性操作,在线连接数加一
*/
public static void addOnlineCount() {
onlineCount.getAndIncrement();
}

/**
* 原子性操作,在线连接数减一
*/
public static void reduceOnlineCount() {
onlineCount.getAndDecrement();
}
}

我用的是gradle项目。当然别忘记了引入websocket的jar

附上gradle的jar.

1
2
3
arduino复制代码 implementation ("org.springframework.boot:spring-boot-starter-websocket:2.3.0.RELEASE") {
exclude module: "spring-boot-starter-tomcat"
}

maven项目 jar

1
xml复制代码<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-websocket</artifactId>    <version>${websocket.version}</version>    <exclusions>        <exclusion>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-tomcat</artifactId>        </exclusion>    </exclusions></dependency>

下面开始重点总结:

一开始这个项目发布到服务器上面,只能支持一万的长连接。top命令看了linux的参数,发现CPU内存都还有很大空闲。那么就瓶颈就不在CPU。又看了网络带宽之类的参数,都没有问题。最终锁定发现springboot项目默认的启动容器是tomcat,而tomcat默认支持1W的连接数量。超过就会拒绝。既然问题出在tomcat,那么现在就有两个方案。1、调大tomcat的连接数量。2、容器换成jetty。对于需要保持数十万的长连接,jetty无疑更适合作为启动容器。

启动容器替换成jetty,只需要在jar引用的的时候排除掉tomcat,并且加上jetty的jar.附上jar引用代码

gradle项目jar

1
2
3
4
5
6
7
sql复制代码 implementation("org.springframework.boot:spring-boot-starter-web") {
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-tomcat'
}
implementation 'org.springframework.boot:spring-boot-starter-jetty:2.3.1.RELEASE'
implementation ("org.springframework.boot:spring-boot-starter-websocket:2.3.0.RELEASE") {
exclude module: "spring-boot-starter-tomcat"
}

maven项目 jar

1
2
3
xml复制代码<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-websocket</artifactId>    <version>${websocket.version}</version>    <exclusions>        <exclusion>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-tomcat</artifactId>        </exclusion>    </exclusions></dependency>
<!-- 引入jetty作为启动容器 --><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jetty</artifactId> <version>2.3.1.RELEASE</version></dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>${websocket.version}</version> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions></dependency>

这里有个坑,引入websocket jar的时候一定也要排除tomcat的依赖。不然启动容器依然是tomcat。

现在这个代码放上linux服务器。按理来说应该可以支持起码好几万的长连接了吧。但是事与愿违,经过测试,发现居然只能建立1.6W+长连接。但是服务器的内存跟cpu明显还有空余。那么问题出在哪里?既然替换了容器,这个项目现在支持的长连接应该是看服务器的配置的。问题应该不在于框架了。那会不会是linux本身有什么限制。导致只能维持1.6W+的长连接呢?

ulimit -a 参看linux的各种参数限制。

在这里插入图片描述

max locked memory (kbytes, -l) 16384

这个参数跟长连接的数量及其相似,初步猜测是不是这个参数的问题。

ulimit -l unlimited 把这个参数调成了无限制。现在再来测试长连接的数量。

已经可以达到5W+。因为测试工具的原因。没有做更高的压测,但是初步观察。保持10万的长连接应该是支持的。

附上永久修改 max locked memory 的命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
markdown复制代码  1)、解除 Linux 系统的最大进程数和最大文件打开数限制:

​ vi /etc/security/limits.conf

​ \# 添加如下的行

root soft nofile 1048576

root hard nofile 1048576

* soft nofile 1048576

* hard nofile 1048576



root soft memlock 102400

root hard memlock 102400

* soft memlock 102400

* hard memlock 102400

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%