服务器推送消息到前端实现页面数据实时刷新-分布式Websoc

背景

项目上有个新的需求,需要在系统数据发生改变时,前端页面要实时刷新页面数据。

简单的方案一:

  最简单的方式就是直接在前端页面使用定时器定时刷新数据。

  这个方案除非是定时的时间设置很短,否则还是会存在页面刷新不及时的情况。但是如果定时时间设置得过短,一旦客户端使用量变多,整个系统的请求数量会变的非常多,需要消耗许多服务器资源。故放弃这个方案。

方案二:

  服务端推送的方式,通过使用Websocket,进入页面时,前端就与服务端建立起socket通道,当系统数据发生改变时,在服务端选中需要刷新的页面的socket会话,主动发送消息到前端,通知前端重新请求数据。

  这个方案能达到实时刷新的需求,但考虑到的是客户端数量增长上来,建立的socket太多,会不会对占用太多的服务器资源。然后经过自己开发环境的简单测试,建立几千个socket对服务器资源消耗不大,就暂时决定用这个方案了。最终效果如何还是要 看生产环境的表现的。

实现

前端

前端使用的是vue框架

在methods中添加websocket相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
javascript复制代码    data(){
return {
ws:null
}
},

methods: {
websocket() {
//建立socket通道
//process.env.VUE_APP_URL为服务端地址
//code为自定义的参数,主要是用于服务端标识当前会话是哪个用户
this.ws = new WebSocket(
'ws:' +
process.env.VUE_APP_URL +
'/websocket?identity=identity'
);
//socket连接成功后的回调函数
this.ws.onopen = () => {
console.log('websocket连接成功!');
//若项目中没有使用nginx转发请求则忽略这步
//设置定时器,每过55秒传一次数据
//以防长时间不通信被nginx自动关闭会话
//也可以通过修改nginx配置文件延长关闭时间
setInterval(() => {
this.keepAlive(ws);
}, 55000);
};
//接收来自服务端消息的回调函数
//fluseData() 为自定义的数据刷新函数
this.ws.onmessage = evt => {
console.log('已接收来自后台的消息:', evt);
// 刷新数据
this.fluseData();
};
//关闭socket的回调函数
this.ws.onclose = function() {
// 关闭 websocket
console.log('连接已关闭...');
};
// 路由跳转时结束websocket链接
this.$router.afterEach(function() {
this.ws.close();
});
},
//持续向后台发送消息,用于维护socket通道不被nginx关闭
keepAlive(webSocket) {
if (webSocket) {
if (webSocket.readyState == webSocket.OPEN) {
webSocket.send('');
}
}
}
}

在页面加载时调用函数建立socket连接

1
2
3
javascript复制代码mounted() {
this.websocket();
}

在页面关闭或销毁时关闭socket

1
2
3
javascript复制代码beforeDestroy() {
this.ws.close();
},

后端

引入jar包

1
2
3
4
5
java复制代码    <dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-websocket</artifactId>
<version>9.0.38</version>
</dependency>

设置websocket配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码
@Configuration
public class MyWebsocketConfig implements ServerApplicationConfig {
@Override
public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {
return null;
}

@Override
public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {
return set;
}


}

设置MyWebsocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
java复制代码
@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class MyWebsocket {

/**
* 存放每个客户端对应的Session对象。
*/
private static Map<String,Session> socketMap = new ConcurrentHashMap<>();


public MyWebsocket() {
}


/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
String key = getKey(session);
if(key != null){
// 存放 session
socketMap.put(key, session);
sendMessageToUser(key, "后台建立成功!key:"+ key);
log.info("连接成功");
}else{
log.error("socket链接session保存失败!当前key为空");
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 根据session获取key,自定义,用于标示每个session
* @param session
* @return
*/
public String getKey(Session session){
String key = null;
try {
String identity = String.valueOf(session.getRequestParameterMap().get("identity").get(0));
key = identity
} catch (Exception e) {
log.error("根据session获取key失败:",e.getMessage());
e.printStackTrace();
}
return key;
}

/**
* 根据identity获取key
* @return
*/
public String getKey(String identity) {
return identity;
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
try {
String key = getKey(session);
if(key != null){
socketMap.remove(key);
}
log.info("有一连接关闭 ");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
String key = getKey(session);
log.info("收到客户端:" + key + "的消息:" + message + " ,当前socket数量:" + socketMap.size());
}


/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
onClose(session);
log.error("websocket连接发生错误",e.getMessage());
}

/**
* 发送信息给指定用户
* @param key
* @param message
* @return
*/
public boolean sendMessageToUser(String key) {
if(key == null){
return false;
}
Session session = socketMap.get(key);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.info("发送消息失败----->{}",e.getMessage());
}
return true;
}
}

  由于socket的session是不支持序列化的,所以不能将session存在redis,在线上有多台服务器的情况下就无法共享session。

  所以这里采用redis的消息订阅功能,当有一台服务器监听到系统数据发生变更,需要向前端发送消息时,会向redis发送消息,然后每台服务器的websocket那边会收到redis的 消息,检查自己拥有的session是否有满足相关条件的,若满足条件则向前端发送消息。

redis配置

redis消息监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码@Configuration
public class RedisPublishConfig {
/**
* redis消息监听器容器
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("testChannel"));
return container;
}

/**
* 消息监听器适配器
* @param redisMsg
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
return new MessageListenerAdapter(redisMsg, "receiveMessage");
}
}

redis消息接收接口

1
2
3
4
5
6
7
8
9
java复制代码
@Component
public interface RedisMsg {
/**
* 接收信息
* @param message
*/
public void receiveMessage(String message);
}

在MyWebsocket中继承RedisMsg接口

1
java复制代码public class MyWebsocket implements RedisMsg

在MyWebsocket中实习RedisMsg接口的receiveMessage函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码
/**
* 广播接收信息
* @param message
*/
@Override
public void receiveMessage(String message) {
JSONObject jsonObject = JSONObject.parseObject(message);
String identity = jsonObject.getString("identity");
String news = jsonObject.getString("message");
if(StringUtils.isNotBlank(identity)){
String key = getKey(identity);
sendMessageToUser(key, news);
}else {
socketMap.forEach((k, v) -> {
sendMessageToUser(k, news);
});
}
}

MyWebsocket类的完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
java复制代码
@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class MyWebsocket implements RedisMsg {
private static CopyOnWriteArraySet<Session> sessionSet=new CopyOnWriteArraySet<>();
// 这里使用静态,让 service 属于类
private static RedisTemplate redisTemplate;

// 注入的时候,给类的 service 注入
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
MyWebsocket.redisTemplate = redisTemplate;
}


/**
* concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
*/
private static Map<String,Session> socketMap = new ConcurrentHashMap<>();


public MyWebsocket() {
}


/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
String key = getKey(session);
if(key != null){
// 存放 session
socketMap.put(key, session);
sendMessageToUser(key, "后台建立成功!key:"+ key + " ,当前socket数量:" + socketMap.size());
log.info("连接成功");
}else{
log.error("socket链接session保存失败!当前key为空");
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 根据session获取key
* @param session
* @return
*/
public String getKey(Session session){
String key = null;
try {
String token = String.valueOf(session.getRequestParameterMap().get("token").get(0));
String pageType = String.valueOf(session.getRequestParameterMap().get("pageType").get(0));
Map<String, String> loginContext = SsoStoreManageUtil.getInstance().get(token);
if(loginContext != null){
String userId = loginContext.get("userId");
key = pageType + '_' + userId;
}
} catch (Exception e) {
log.error("根据session获取key失败:",e.getMessage());
e.printStackTrace();
}
return key;
}

/**
* 根据页面类型和用户id获取key
* @param pageType
* @param userId
* @return
*/
public String getKey(String pageType,String userId) {
return pageType + '_' + userId;
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
try {
String key = getKey(session);
if(key != null){
socketMap.remove(key);
}
log.info("有一连接关闭 ,当前socket数量:" + socketMap.size());
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
String key = getKey(session);
log.info("收到客户端:" + key + "的消息:" + message + " ,当前socket数量:" + socketMap.size());
}


/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误",error.getMessage());
onClose(session);
error.printStackTrace();
}

/**
* 发送信息给指定用户
* @param key
* @param message
* @return
*/
public boolean sendMessageToUser(String key, String message) {
if(key == null){
return false;
}
Session session = socketMap.get(key);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.info("发送消息失败----->{}",e.getMessage());
}
return true;
}

/**
* 广播接收信息
* @param message
*/
@Override
public void receiveMessage(String message) {
JSONObject jsonObject = JSONObject.parseObject(message);
String identity = jsonObject.getString("identity");
String news = jsonObject.getString("message");
if(StringUtils.isNotBlank(identity)){
String key = getKey(identity);
sendMessageToUser(key, news);
}else {
socketMap.forEach((k, v) -> {
sendMessageToUser(k, news);
});
}
}
}

业务场景下的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class Test{

@Autowired
RedisTemplate redisTemplate;


public void webSocketTest() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("identity","");
jsonObject.put("message","");
// 业务逻辑
//广播消息到各个订阅者 testChannel
redisTemplate.convertAndSend("testChannel", jsonObject.toJSONString());

}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%