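Overview

WebSocket

Most application-layer protocols, such as HTTP, SMTP, and POP3, are built directly on a transport-layer protocol (TCP or UDP). WebSocket is different in that it relies on both HTTP and TCP:

  • The client first sends a special HTTP request carrying an Upgrade: websocket header to perform the handshake with the server;
  • Once the handshake succeeds, the connection is upgraded to the WebSocket protocol and stays open for long-lived, two-way communication;
  • In short: the small hammer (HTTP) pries open the crack, and the big hammer (WebSocket) finishes the job.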
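
To make the handshake step concrete, here is a minimal sketch of the one computation the server performs during the upgrade: deriving the Sec-WebSocket-Accept response header from the client's Sec-WebSocket-Key, as defined in RFC 6455. The class and method names (HandshakeSketch, acceptKey) are made up for illustration; in a Spring application the framework does this for you.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, for illustration only.
final class HandshakeSketch {

    // Fixed GUID that RFC 6455 appends to the client's key.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Sec-WebSocket-Accept = Base64(SHA-1(Sec-WebSocket-Key + GUID))
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-1.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

The client checks this value before treating the connection as upgraded, which is how it knows the server really understood the WebSocket request rather than answering as a plain HTTP server.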

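Why not use long-lived HTTP connections for real-time messaging? In fact, that is exactly how it was done before WebSocket, with techniques such as Comet. That approach has the following drawbacks:

  • The original HTTP/1.1 specification (RFC 2616) recommended that a client keep no more than two connections to the same server, so additional connections were blocked. Later revisions dropped the fixed number, but browsers still limit the number of connections per host.
  • On the server side, each long-lived connection occupies a thread. Before NIO and asynchronous programming, this made the server-side cost too high.

Why not program against sockets directly and keep a long-lived TCP connection for real-time messaging?

  • Socket programming targets the client/server (C/S) model, whereas browsers follow the browser/server (B/S) model and cannot open raw sockets. That is why browsers were eventually given a socket of their own: WebSocket (the protocol is specified by the IETF in RFC 6455, the browser API originally by the W3C).
  • With the plain new WebSocket() API, a site served over http uses a ws:// URL and a site served over https uses wss://.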
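
The scheme mapping in the last bullet can be sketched as a small helper. This is only an illustration with a made-up name (WsUriSketch.toWebSocketUri); note that SockJS clients are given the http(s) URL and pick the scheme themselves, so a helper like this is only relevant for the raw WebSocket API.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, for illustration only.
final class WsUriSketch {

    // Maps http -> ws and https -> wss, keeping host, port, path and query.
    static URI toWebSocketUri(URI httpUri) {
        String scheme = httpUri.getScheme();
        String wsScheme;
        if ("https".equalsIgnoreCase(scheme)) {
            wsScheme = "wss";
        } else if ("http".equalsIgnoreCase(scheme)) {
            wsScheme = "ws";
        } else {
            throw new IllegalArgumentException("Expected an http or https URI: " + httpUri);
        }
        try {
            return new URI(wsScheme, httpUri.getUserInfo(), httpUri.getHost(), httpUri.getPort(),
                    httpUri.getPath(), httpUri.getQuery(), httpUri.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toWebSocketUri(URI.create("https://example.com/chat")));
    }
}
```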

SockJS/Socket.IO

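The HTML5 specification defines a native WebSocket API, but not every browser supports it well. When a browser lacks WebSocket support, the application should fall back automatically to alternatives such as Ajax long polling or SSE. In practice, therefore, we usually rely on a library that wraps WebSocket together with its fallbacks: SockJS or Socket.IO.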
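
The fallback idea can be pictured as "pick the best transport both sides support". The sketch below is a simplified model, not how SockJS or Socket.IO is actually implemented; the class name is invented and the transport names are only illustrative labels.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of transport fallback.
final class TransportFallbackSketch {

    // Preference order, best transport first (illustrative labels).
    static final List<String> PREFERENCE =
            Arrays.asList("websocket", "xhr-streaming", "xhr-polling");

    // Returns the most preferred transport the client supports, if any.
    static Optional<String> choose(Set<String> supportedByClient) {
        return PREFERENCE.stream()
                .filter(supportedByClient::contains)
                .findFirst();
    }
}
```

A client that reports only the two XHR transports would end up on xhr-streaming, while a modern browser would get websocket.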

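If your server is written in Java and you happen to use Spring Framework, SockJS is the recommended choice: Spring Framework is the Java server implementation recommended by SockJS itself, and it also provides a Java client implementation.

If your server runs on Node.js, Socket.IO is the obvious choice, since it originated on Node.js. Java server implementations exist as well, such as engine.io-server-java, and you can even use netty-socketio.

Note: whichever one you choose, the client and the server must both support it.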

STOMP

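STOMP (Simple (or Streaming) Text Orientated Messaging Protocol) is a simple text/stream-oriented messaging protocol. It defines an interoperable wire format so that any STOMP client can talk to any STOMP message broker, which gives simple and widespread messaging interoperability across languages, platforms, and brokers.

STOMP can run on top of any reliable bidirectional streaming network protocol, such as TCP or WebSocket. Although STOMP is a text-oriented protocol, the message payload can be either text or binary.

STOMP is a frame-based protocol whose frames are modelled on HTTP messages: a command line, a set of headers, a blank line, and a body terminated by a NUL byte.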
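
To show what such a frame looks like on the wire, here is a minimal sketch of encoding and decoding one. It assumes the basic STOMP layout (command, header lines, blank line, body, NUL terminator) and deliberately ignores header escaping and content-length; the class name StompFrameSketch is invented for this example and is not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified STOMP frame model (no header escaping, no content-length).
final class StompFrameSketch {
    final String command;
    final Map<String, String> headers;
    final String body;

    StompFrameSketch(String command, Map<String, String> headers, String body) {
        this.command = command;
        this.headers = headers;
        this.body = body;
    }

    // COMMAND \n header:value \n ... \n \n body NUL
    static String encode(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    static StompFrameSketch decode(String frame) {
        int nul = frame.indexOf('\0');
        String text = nul >= 0 ? frame.substring(0, nul) : frame;
        int blank = text.indexOf("\n\n"); // the blank line separates headers from the body
        String head = blank >= 0 ? text.substring(0, blank) : text;
        String body = blank >= 0 ? text.substring(blank + 2) : "";
        String[] lines = head.split("\n");
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                // If a header is repeated, the first occurrence wins.
                headers.putIfAbsent(lines[i].substring(0, colon), lines[i].substring(colon + 1));
            }
        }
        return new StompFrameSketch(lines[0], headers, body);
    }
}
```

For example, a client message to a server-side handler would be a SEND frame whose destination header carries the target path and whose body carries the JSON payload.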

Using it with Spring Boot

Add the dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- WebSocket dependency -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!-- Security / authentication (JWT) -->
    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt</artifactId>
        <version>0.7.0</version>
    </dependency>
</dependencies>

Configuration

1. Config

import cn.lauy.websocket.handler.WebSocketDecoratorFactory;
import cn.lauy.websocket.handler.WebsocketHandShakeInterceptor;
import cn.lauy.websocket.handler.WebsocketTextHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

@EnableWebSocketMessageBroker
@Configuration
// public class WebSocketConfig implements WebSocketConfigurer {
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketConfig.class);

    // @Override
    // public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    //     registry.addHandler(buildWebsocketTextHandler(), "/project/service/workOrderCreateCallback/*", "/demo")
    //             .addInterceptors(new WebsocketHandShakeInterceptor())
    //             .addInterceptors(new HttpSessionHandshakeInterceptor())
    //             .setAllowedOrigins("*")
    //             .withSockJS();
    // }

    /**
     * The front end connects with:
     * var socket = new SockJS(host+'/project/websocket/createOrderCallback' + '?token=' + token);
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/project/websocket/createOrderCallback")
                .addInterceptors(new WebsocketHandShakeInterceptor())
                .setHandshakeHandler(buildWebsocketTextHandler())
                .setAllowedOrigins("*")
                .withSockJS();
    }

    /**
     * /queue: point-to-point destinations
     * /topic: broadcast destinations
     * user destination prefix: prefix for per-user (point-to-point) delivery
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Destination prefixes the simple broker serves for server-to-client messages
        registry.enableSimpleBroker("/queue", "/topic");
        // Prefix for one-to-one (per-user) destinations
        registry.setUserDestinationPrefix("/project/websocket");
        // Prefix for messages handled by @MessageMapping methods; it has to match the
        // prefix the client uses in stompClient.send("/project/service/...")
        registry.setApplicationDestinationPrefixes("/project/service");
    }

    @Autowired
    private WebSocketDecoratorFactory webSocketDecoratorFactory;

    /*
     * 1. setMessageSizeLimit: maximum size of an incoming message, in bytes
     * 2. setSendBufferSizeLimit: maximum amount of data buffered when sending to a session, in bytes
     * 3. setSendTimeLimit: maximum time allowed for sending a message, in milliseconds
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(webSocketDecoratorFactory)
                .setMessageSizeLimit(10240)
                .setSendBufferSizeLimit(10240)
                .setSendTimeLimit(60 * 1000);
    }

    @Bean
    public WebsocketTextHandler buildWebsocketTextHandler() {
        return new WebsocketTextHandler();
    }
}
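
The three prefixes in configureMessageBroker decide where a STOMP destination ends up. The sketch below is a deliberately simplified model of that decision, not Spring's actual implementation; the class and enum names are invented, and it assumes the application prefix /project/service that the client snippets in this article send to.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of prefix-based destination routing.
final class DestinationRoutingSketch {

    enum Target { APPLICATION, USER, BROKER, NONE }

    private final String applicationPrefix;
    private final String userPrefix;
    private final List<String> brokerPrefixes;

    DestinationRoutingSketch(String applicationPrefix, String userPrefix, List<String> brokerPrefixes) {
        this.applicationPrefix = applicationPrefix;
        this.userPrefix = userPrefix;
        this.brokerPrefixes = brokerPrefixes;
    }

    Target classify(String destination) {
        if (destination.startsWith(applicationPrefix)) {
            return Target.APPLICATION; // handled by a @MessageMapping method
        }
        if (destination.startsWith(userPrefix)) {
            return Target.USER;        // resolved to a destination private to one user
        }
        for (String prefix : brokerPrefixes) {
            if (destination.startsWith(prefix)) {
                return Target.BROKER;  // delivered to every subscriber of that destination
            }
        }
        return Target.NONE;            // nobody handles this destination
    }

    public static void main(String[] args) {
        DestinationRoutingSketch routing = new DestinationRoutingSketch(
                "/project/service", "/project/websocket", Arrays.asList("/queue", "/topic"));
        System.out.println(routing.classify("/project/service/sendMyUser"));
        System.out.println(routing.classify("/project/websocket/queue/sendUser"));
        System.out.println(routing.classify("/topic/sendTopic"));
    }
}
```

The practical consequence is that a client must send to the application prefix, subscribe with the user prefix for private messages, and subscribe to the bare /topic or /queue destination for broadcasts.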

2. Security check

Here, before the WebSocket handshake completes, you can attach a unique identifier to the connection as its Principal.

import cn.lauy.websocket.helper.JwtUtil;
import io.jsonwebtoken.JwtException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpServletRequest;
import java.security.Principal;
import java.util.Map;

public class WebsocketTextHandler extends DefaultHandshakeHandler {

    private static final Logger log = LoggerFactory.getLogger(WebsocketTextHandler.class);

    /**
     * Decide here, according to your needs, how to obtain a value that uniquely
     * identifies the connection. The returned Principal is later available on the
     * connection as WebSocketSession.getPrincipal().getName().
     * You can also provide your own Principal implementation.
     */
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        if (!(request instanceof ServletServerHttpRequest)) {
            return null;
        }
        HttpServletRequest httpRequest = ((ServletServerHttpRequest) request).getServletRequest();
        final String token = httpRequest.getParameter("token");
        // Check the token before parsing it: the JWT parser rejects null or empty input
        if (StringUtils.isEmpty(token)) {
            return null;
        }
        try {
            Map<String, Object> verify = JwtUtil.verify(JwtUtil.DEFAULT_SECRET, token);
            // Integer accountId = (Integer) verify.get("accountId");
            String orderNumber = (String) verify.get("orderNumber");
            if (!StringUtils.isEmpty(orderNumber)) {
                return () -> orderNumber;
            }
        } catch (JwtException | IllegalArgumentException e) {
            log.warn("WebsocketTextHandler determineUser: token rejected", e);
        }
        return null;
    }

}
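
The handler above returns the Principal as a lambda. If you prefer a named type, for example to get meaningful equals, hashCode, and toString, a small immutable class is enough. This is only a sketch; OrderPrincipal is a made-up name and is not part of the original project.

```java
import java.security.Principal;
import java.util.Objects;

// Hypothetical Principal implementation keyed by the order number.
final class OrderPrincipal implements Principal {

    private final String orderNumber;

    OrderPrincipal(String orderNumber) {
        this.orderNumber = Objects.requireNonNull(orderNumber, "orderNumber");
    }

    // This is the value later returned by WebSocketSession.getPrincipal().getName().
    @Override
    public String getName() {
        return orderNumber;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof OrderPrincipal
                && orderNumber.equals(((OrderPrincipal) other).orderNumber);
    }

    @Override
    public int hashCode() {
        return orderNumber.hashCode();
    }

    @Override
    public String toString() {
        return "OrderPrincipal[" + orderNumber + "]";
    }
}
```

In determineUser you would then return new OrderPrincipal(orderNumber) instead of the lambda.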

3. Create a WebSocket handshake interceptor for authentication

import cn.lauy.websocket.helper.JwtUtil;
import io.jsonwebtoken.JwtException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.Cookie;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

public class WebsocketHandShakeInterceptor implements HandshakeInterceptor {

    private static final Logger log = LoggerFactory.getLogger(WebsocketHandShakeInterceptor.class);

    private static final String SECRET = JwtUtil.DEFAULT_SECRET;

    /**
     * Checks permissions before the handshake establishes the connection
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) {
        boolean result = handleByQueryString(serverHttpRequest);
        if (!result) {
            serverHttpResponse.setStatusCode(HttpStatus.FORBIDDEN);
        }
        return result;
    }

    private boolean handleByQueryString(ServerHttpRequest serverHttpRequest) {
        if (serverHttpRequest instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) serverHttpRequest;
            String authToken = servletServerHttpRequest.getServletRequest().getParameter("token");
            return isValidToken(authToken);
        }
        return false;
    }

    // Alternative to handleByQueryString: read the token from a cookie named "token"
    private boolean handleByCookie(ServerHttpRequest serverHttpRequest) {
        if (serverHttpRequest instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) serverHttpRequest;
            Cookie[] cookies = servletServerHttpRequest.getServletRequest().getCookies();
            // getCookies() returns null when the request carries no cookies
            if (cookies == null) {
                return false;
            }
            Optional<Cookie> cookie = Arrays.stream(cookies).filter(c -> "token".equals(c.getName())).findFirst();
            return cookie.isPresent() && isValidToken(cookie.get().getValue());
        }
        return false;
    }

    // A token is accepted only if it verifies and carries a non-empty orderNumber claim
    private boolean isValidToken(String authToken) {
        if (StringUtils.isEmpty(authToken)) {
            return false;
        }
        try {
            Map<String, Object> verify = JwtUtil.verify(SECRET, authToken);
            String orderNumber = (String) verify.get("orderNumber");
            log.info("WebsocketHandShakeInterceptor isValidToken: orderNumber = {}", orderNumber);
            return !StringUtils.isEmpty(orderNumber);
        } catch (JwtException | IllegalArgumentException e) {
            // A malformed, expired, or wrongly signed token is answered with 403 instead of an error
            log.warn("WebsocketHandShakeInterceptor isValidToken: token rejected", e);
            return false;
        }
    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
        log.debug("WebsocketHandShakeInterceptor afterHandshake: {}", serverHttpRequest.getPrincipal());
    }

}

4. Use a decorator to cache the binding between each Principal and its WebSocketSession

import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;

import java.security.Principal;

@Component
public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory {
    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    // Authentication succeeded: cache the session under the Principal name
                    SessionManager.put(principal.getName(), session);
                }
                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    // The connection was authenticated: remove its cached session
                    SessionManager.remove(principal.getName());
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}

WebSocketSession utility class

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.WebSocketSession;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SessionManager {

    private static final Logger logger = LoggerFactory.getLogger(SessionManager.class);

    // Principal name -> WebSocket session; ConcurrentHashMap because connections open and close on different threads
    private static final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    private SessionManager() {
    }

    public static void put(String key, WebSocketSession webSocketSession) {
        sessions.put(key, webSocketSession);
        logger.info("Websocket SessionManager session size after put: size = {}, putKey = {}", getSessionSize(), key);
    }

    public static Set<String> sessions() {
        return sessions.keySet();
    }

    public static WebSocketSession get(String key) {
        return sessions.get(key);
    }

    public static void remove(String key) {
        sessions.remove(key);
        logger.info("Websocket SessionManager session size after remove: size = {}, removeKey = {}", getSessionSize(), key);
    }

    public static Integer getSessionSize() {
        return sessions.size();
    }
}
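
One thing to watch with a registry keyed only by the Principal name: if the same user reconnects before the old connection's close callback runs, removing by key alone would also drop the new session. A possible variation, sketched here with a generic session type so it runs without Spring, removes an entry only if it still maps to the session being closed. SessionRegistrySketch is a hypothetical name, not part of the original project.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical variation of the session registry with a conditional remove.
final class SessionRegistrySketch<S> {

    private final ConcurrentMap<String, S> sessions = new ConcurrentHashMap<>();

    // Registers the session and returns the one it replaced, or null if there was none.
    S put(String key, S session) {
        return sessions.put(key, session);
    }

    // Removes the entry only if the key is still mapped to this exact session.
    boolean remove(String key, S session) {
        return sessions.remove(key, session);
    }

    S get(String key) {
        return sessions.get(key);
    }

    int size() {
        return sessions.size();
    }
}
```

With this shape, the close callback would call remove(principal.getName(), session), and a stale close event from an earlier connection leaves the newer session in place.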

JWT

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jws;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.springframework.util.StringUtils;

import javax.servlet.http.HttpServletRequest;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
* @author Blue
*/
public class JwtUtil {

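// Token lifetime: 24 hours, in milliseconds
public static final long EXPIRE = 1000 * 60 * 60 * 24;
// Both secrets are left blank here; set your own. signWith(SignatureAlgorithm, String) expects a Base64-encoded key.
// Tokens are signed with APP_SECRET, while the WebSocket handlers verify with DEFAULT_SECRET,
// so the two values must be identical for those tokens to be accepted.
public static final String APP_SECRET = "";
public static final String DEFAULT_SECRET = "";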

public static String getJwtToken(String id, String nickname){

String JwtToken = Jwts.builder()
.setHeaderParam("typ", "JWT")
.setHeaderParam("alg", "HS256")
.setSubject("JWT生成的名称")
.setIssuedAt(new Date())
.setExpiration(new Date(System.currentTimeMillis() + EXPIRE))
.claim("id", id)
.claim("nickname", nickname)
.signWith(SignatureAlgorithm.HS256, APP_SECRET)
.compact();

return JwtToken;
}

public static String sign(Map<String, Object> map){

String JwtToken = Jwts.builder()
.setHeaderParam("typ", "JWT")
.setHeaderParam("alg", "HS256")
// setClaims replaces the whole claims set, so it must come before the individual setters;
// otherwise the subject, issued-at, and expiration set earlier are discarded
.setClaims(map)
.setSubject("JWT生成的名称")
.setIssuedAt(new Date())
.setExpiration(new Date(System.currentTimeMillis() + EXPIRE))
//.claim("id", id)
//.claim("nickname", nickname)
.signWith(SignatureAlgorithm.HS256, APP_SECRET)
.compact();

return JwtToken;
}

public static void main(String[] args) {
HashMap<String, Object> map = new HashMap<String, Object>();
map.put("orderNumber", "");
String token = sign(map);
System.out.println(token);

//Jws<Claims> claimsJws = Jwts.parser().setSigningKey(APP_SECRET).parseClaimsJws(token);
//Claims claims = claimsJws.getBody();
System.out.println(verify(DEFAULT_SECRET, token));
}

public static Map<String, Object> verify(String secret, String token) {
Jws<Claims> claimsJws = Jwts.parser().setSigningKey(secret).parseClaimsJws(token);
Claims claims = claimsJws.getBody();
HashMap<String, Object> map = new HashMap<String, Object>();
map.put("orderNumber", claims.get("orderNumber"));
return map;
}

/**
* Checks whether the token is present and valid
* @param jwtToken
* @return
*/
public static boolean checkToken(String jwtToken) {
if(StringUtils.isEmpty(jwtToken)) return false;
try {
Jwts.parser().setSigningKey(APP_SECRET).parseClaimsJws(jwtToken);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}

/**
* Checks, from the request, whether the token is present and valid
* @param request
* @return
*/
public static boolean checkToken(HttpServletRequest request) {
try {
String jwtToken = request.getHeader("token");
if(StringUtils.isEmpty(jwtToken)) return false;
Jwts.parser().setSigningKey(APP_SECRET).parseClaimsJws(jwtToken);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}

/**
* Gets the member id from the token
* @param request
* @return
*/
public static String getMemberIdByJwtToken(HttpServletRequest request) {
String jwtToken = request.getHeader("token");
if(StringUtils.isEmpty(jwtToken)) return "";
Jws<Claims> claimsJws = Jwts.parser().setSigningKey(APP_SECRET).parseClaimsJws(jwtToken);
Claims claims = claimsJws.getBody();
return (String)claims.get("id");
}
}
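
It helps to remember that a JWT is signed, not encrypted: anyone holding the token can read its claims. The sketch below decodes the payload segment with nothing but the JDK; it does not verify the signature, and the class name JwtPayloadSketch is invented for this example. The token in main is the sample token used by the test page in this article.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: decodes (does NOT verify) the payload of a JWT.
final class JwtPayloadSketch {

    static String decodePayload(String jwt) {
        String[] parts = jwt.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected header.payload.signature");
        }
        // The payload is Base64URL-encoded JSON.
        return new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9"
                + ".eyJvcmRlck51bWJlciI6IndlYnNvY2tldF90b2tlbiJ9"
                + ".BTYW_dp7nuDEzbR3QcbveHE45IdYr-MKlvw-bOgDKKg";
        // Prints {"orderNumber":"websocket_token"}
        System.out.println(decodePayload(token));
    }
}
```

Because the claims are readable, do not put secrets in them, and always verify the signature (as JwtUtil.verify does) before trusting any value.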

API

import cn.lauy.websocket.handler.SessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.socket.WebSocketSession;

import javax.websocket.server.ServerEndpoint;
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;


// @ServerEndpoint(value = "/project/service/workOrderCreateCallback/{orderNumber}", encoders = {WebSocketEncoder.class})

@RestController
public class CreateWorkOrderWebsocket {

    private static final Logger logger = LoggerFactory.getLogger(CreateWorkOrderWebsocket.class);

    /**
     * Maximum session idle time, in milliseconds
     */
    private static final long SESSION_TIMEOUT = 5 * 60 * 1000;

    @Autowired
    private SimpMessagingTemplate template;

    /**
     * The server pushes to one specific user. The front end must already be connected, e.g.
     * var socket = new SockJS(host+'/myUrl' + '?token=1234');
     * The parameter is the key the session was cached under, i.e. the Principal name.
     */
    @RequestMapping("/sendUser")
    public void sendUser(String token) {
        logger.info("token = {}, sending greeting", token);
        WebSocketSession webSocketSession = SessionManager.get(token);
        // Only send while the session is still registered, mainly to avoid a broken pipe
        if (webSocketSession != null) {
            template.convertAndSendToUser(token, "/queue/sendUser", "Hello");
        }
    }

    /**
     * Broadcast: the server pushes to all connected clients on its own initiative
     */
    @RequestMapping("/sendTopic")
    public void sendTopic() {
        template.convertAndSend("/topic/sendTopic", "Good evening, everyone");
    }

    /**
     * The client sends a message and the server receives it.
     * @MessageMapping plays the role that @RequestMapping plays for HTTP.
     *
     * @param message
     */
    @MessageMapping("/sendServer")
    public void sendServer(String message) {
        logger.info("message:{}", message);
    }

    /**
     * The client sends a message and everyone receives it, like speaking in a live stream
     *
     * @param message
     * @return
     */
    @MessageMapping("/sendAllUser")
    @SendTo("/topic/sendTopic")
    public String sendAllUser(String message) {
        // The template could be used here instead
        return message;
    }

    /**
     * A Spring WebSocket channel still begins with an HTTP handshake. Once the handshake
     * succeeds, the WebSocket channel between browser and server is open, and the
     * authentication information of the HTTP request, i.e. the java.security.Principal,
     * is bound to the WebSocket session.
     */
    @MessageMapping("/sendMyUserAnn")
    @SendToUser(value = "/queue/sendUserAnn", broadcast = false)
    public Map<String, Object> sendMyUser(String msg, Principal principal) {
        Map<String, Object> map = new HashMap<>();
        // Look up the sender's own session instead of a hard-coded order number
        WebSocketSession webSocketSession = SessionManager.get(principal.getName());
        if (webSocketSession != null) {
            map.put("time", System.currentTimeMillis());
            map.put("result", "Targeted push, delivered only to " + principal.getName());
        }
        return map;
    }

    // The unannotated parameter is bound from the message payload
    // (@RequestBody only applies to HTTP handler methods)
    @MessageMapping("/sendMyUser")
    public void sendMyUser(Map<String, String> map) {
        String orderNumber = map.get("orderNumber");
        WebSocketSession webSocketSession = SessionManager.get(orderNumber);
        if (webSocketSession != null) {
            String user = webSocketSession.getPrincipal().getName();
            // template.convertAndSendToUser(map.get("name"), "/queue/sendUser", map.get("message"));
            map.put("time", UUID.randomUUID().toString());
            map.put("user", user);
            map.put("type", "queue");
            template.convertAndSendToUser(user, "/queue/sendUser", map);
        }
    }

    @MessageMapping("/sendUserAll")
    @SendTo(value = "/queue/sendUserAll")
    public Map<String, Object> sendUserAll(String msg) {
        Map<String, Object> map = new HashMap<>();
        map.put("time", System.currentTimeMillis());
        map.put("result", "Broadcast push, delivered to everyone! size = " + SessionManager.getSessionSize());
        map.put("type", "topic");
        // Broadcast under /topic
        template.convertAndSend("/topic/sendUserAll", map);
        // The return value goes to /queue/sendUserAll via @SendTo
        map.put("type", "queue");
        return map;
    }

}

Message push types

Establishing the WebSocket connection

The front end opens the connection

var socket = new SockJS(host+'/project/websocket/createOrderCallback' + '?token=' + token);

The server defines the endpoint path

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/project/websocket/createOrderCallback")
            .addInterceptors(new WebsocketHandShakeInterceptor())
            .setHandshakeHandler(buildWebsocketTextHandler())
            .setAllowedOrigins("*")
            .withSockJS();
}

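Successful example: [screenshot]

Targeted push using annotations

A Spring WebSocket channel still begins with an HTTP handshake. Once the handshake succeeds, the WebSocket channel between browser and server is open, and the authentication information of the HTTP request, i.e. the java.security.Principal, is bound to the WebSocket session.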

@MessageMapping("/sendMyUserAnn")
@SendToUser(value = "/queue/sendUserAnn", broadcast = false)
public Map<String, Object> sendMyUser(String msg, Principal principal) {
    Map<String, Object> map = new HashMap<>();
    // Reply only while the sender's session is still registered
    WebSocketSession webSocketSession = SessionManager.get(principal.getName());
    if (webSocketSession != null) {
        map.put("time", System.currentTimeMillis());
        map.put("result", "Targeted push, delivered only to " + principal.getName());
    }
    return map;
}

Front end: send

// Application destination prefix defined on the server: /project/service
stompClient.send("/project/service/sendMyUserAnn", {}, JSON.stringify({accountId:name,orderNumber:message}));

Front end: subscribe

stompClient.subscribe('/project/websocket/queue/sendUserAnn', function(response) {
    showResponse(response.body);
    console.log("subscribe ann response:" + response.body)
});

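Successful example: [screenshot]

Targeted message push

The server pushes to a specific user. The front end must first open a connection with var socket = new SockJS(host+'/myUrl' + '?token=1234')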

@RequestMapping("/sendUser")
public void sendUser(String token) {
    logger.info("token = {}, sending greeting", token);
    WebSocketSession webSocketSession = SessionManager.get(token);
    // Only send while the session is still registered, mainly to avoid a broken pipe
    if (webSocketSession != null) {
        template.convertAndSendToUser(token, "/queue/sendUser", "Hello");
    }
}

Front end: subscribe

// User destination prefix defined on the server: /project/websocket
stompClient.subscribe('/project/websocket/queue/sendUser', function(response) {
    showResponse(response.body);
    console.log("subscribe response:" + response.body)
});

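Successful example: [screenshot]

Message broadcast

Every connection subscribed to the destination receives the message. The handler below relays whatever a client sends to /topic/sendTopic; the front-end snippets that follow exercise the sendUserAll handler from the controller above, which publishes to /topic/sendUserAll.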

/**
 * The client sends a message and everyone receives it, like speaking in a live stream
 *
 * @param message
 * @return
 */
@MessageMapping("/sendAllUser")
@SendTo("/topic/sendTopic")
public String sendAllUser(String message) {
    // The template could be used here instead
    return message;
}

Front end: send

function sendUserAll() {
    // Application destination prefix defined on the server: /project/service
    stompClient.send("/project/service/sendUserAll", {}, JSON.stringify({accountId:name,orderNumber:message}));
}

Front end: subscribe

// Broadcast subscriptions use the bare /topic destination, without any server-defined prefix
stompClient.subscribe('/topic/sendUserAll', function(response) {
    showResponse(response.body);
    console.log("subscribe all topic response:" + response.body)
});

Successful example: [screenshot]

Complete HTML page

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8" />
<title>Spring Boot WebSocket + broadcast</title>
</head>
<body>
<noscript>
<h2 style="color:#ff0000">It looks like your browser does not support WebSocket</h2>
</noscript>
<div>
<div>
<button id="connect" onclick="connect()">Connect</button>
<button id="disconnect" onclick="disconnect();">Disconnect</button>
</div>
<div id="conversationDiv">
<label>Enter your user id</label> <input type="text" id="name" />
<br>
<label>Enter the work order number</label> <input type="text" id="messgae" value="websocket_token"/>
<button id="send" onclick="send();">Send</button>
<button id="sendAnn" onclick="sendAnn();">Send Ann</button>
<button id="sendUserAll" onclick="sendUserAll();">sendUserAll</button>
<p id="response"></p>
<hr/>
<p id="responseAnn"></p>
<p id="responseAll"></p>
</div>
</div>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script type="text/javascript">
var stompClient = null;
// var userId = 'IWOQC1021080610032F4'
var userId = 'websocket_token'
// var token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJvcmRlck51bWJlciI6IklXT1FDMTAyMTA4MDYxMDAzMkY0In0.R2nRVqY54dDJGE-YuTtfA9Q8r8e6sSz_SXF34PIEREg"
var token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJvcmRlck51bWJlciI6IndlYnNvY2tldF90b2tlbiJ9.BTYW_dp7nuDEzbR3QcbveHE45IdYr-MKlvw-bOgDKKg"
// Address of the gateway
var host="http://127.0.0.1:8082";
function setConnected(connected) {
    document.getElementById('connect').disabled = connected;
    document.getElementById('disconnect').disabled = !connected;
    document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
    // Clear the previous response
    $('#response').html('');
}
// SendUser ***********************************************
function connect() {
    // Host + endpoint path form the connection URL; the path must match addEndpoint in the config
    // var socket = new SockJS(host+'/myUrl' + '?token=' + token);
    var socket = new SockJS(host+'/project/websocket/createOrderCallback' + '?token=' + token);
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function(frame) {
        setConnected(true);
        console.log('Connected:' + frame);
        // Subscribed destinations and their callbacks
        // stompClient.subscribe('/user/' + userId + '/queue/sendUser', function(response) {
        stompClient.subscribe('/project/websocket/queue/sendUser', function(response) {
            showResponse(response.body);
            console.log("subscribe response:" + response.body)
        });
        stompClient.subscribe('/project/websocket/queue/sendUserAnn', function(response) {
            showResponse(response.body);
            console.log("subscribe ann response:" + response.body)
        });
        // For broadcast notifications, do not add the user prefix /project/websocket,
        // otherwise no message is received
        stompClient.subscribe('/project/websocket/queue/sendUserAll', function(response) {
            showResponse(response.body);
            console.log("subscribe all queue response:" + response.body)
        });
        stompClient.subscribe('/topic/sendUserAll', function(response) {
            showResponse(response.body);
            console.log("subscribe all topic response:" + response.body)
        });
    });
}
/*
function connect() {
    // Host + endpoint path form the connection URL; the path must match addEndpoint in the config
    var socket = new SockJS(host+'/myUrl');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function(frame) {
        setConnected(true);
        console.log('Connected:' + frame);
        // Subscribed destination and its callback
        stompClient.subscribe('/topic/sendTopic', function(response) {
            showResponse(response.body);
        });
    });
}*/
function disconnect() {
if (stompClient != null) {
stompClient.disconnect();
}
setConnected(false);
console.log("Disconnected");
}
function send() {
    var name = $('#name').val();
    var message = $('#messgae').val();
    /* // Destination for messages sent from the client to the server
    stompClient.send("/sendServer", {}, message);
    */
    /* // Sent to everyone subscribed to the sendTopic broadcast: the client speaks and everyone receives,
    // like speaking in a live stream. Note: the connection must subscribe to /topic/sendTopic
    stompClient.send("/sendAllUser", {}, message);
    */
    /* Note: to test this, open several copies of this page with different tokens, e.g. token=1234 and token=4567.
     * You can then target a specific user by entering that token as the name.
     */
    stompClient.send("/project/service/sendMyUser", {}, JSON.stringify({accountId:name,orderNumber:message}));
}
function sendAnn() {
    var name = "Ann" + $('#name').val();
    var message = $('#messgae').val();
    stompClient.send("/project/service/sendMyUserAnn", {}, JSON.stringify({accountId:name,orderNumber:message}));
}
function sendUserAll() {
    var name = "sendUserAll" + $('#name').val();
    var message = $('#messgae').val();
    stompClient.send("/project/service/sendUserAll", {}, JSON.stringify({accountId:name,orderNumber:message}));
}
function showResponse(message) {
    var response = $('#responseAnn');
    response.html('Ann' + message);
}
</script>
</body>
</html>
