简介
WebSocket是一种网络传输协议,可在单个TCP连接上进行全双工通信,位于OSI模型的应用层。WebSocket协议在2011年由IETF标准化为RFC 6455,后由RFC 7936补充规范。Web IDL中的WebSocket API由W3C标准化。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。--来源WIKI
实现
maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocketConfig 配置
/**
* <p>
* webSocket
* </P>
*
* @author bing_huang
* @since V1.0
*/
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator {
private static final String HttpSession = null;
/* 修改握手,就是在握手协议建立之前修改其中携带的内容 */
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
/*如果没有监听器,那么这里获取到的HttpSession是null*/
StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession();
if (ssf != null) {
javax.servlet.http.HttpSession session = (HttpSession) request.getHttpSession();
sec.getUserProperties().put("sessionid", session);
}
//sec.getUserProperties().put("name", "小强");
super.modifyHandshake(sec, request, response);
}
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WebSocketServer 服务端
/**
* <p>
* messageWebSocket 消息通知
* </P>
*
* @author bing_huang
* @since V1.0
*/
@Component
@ServerEndpoint(value = "/messageSocket/{userId}", configurator = WebSocketConfig.class)
public class MessageWebSocket {
private static final Logger logger = LoggerFactory.getLogger(MessageWebSocket.class);
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* key: userId value: sessionIds
*/
private static ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> userSessionMap = new ConcurrentHashMap<String, ConcurrentLinkedQueue<String>>();
/**
* concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象。
*/
private static ConcurrentHashMap<String, MessageWebSocket> websocketMap = new ConcurrentHashMap<>();
/**
* key: sessionId value: userId
*/
private static ConcurrentHashMap<String, String> sessionUserMap = new ConcurrentHashMap<String, String>();
/**
* 当前连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* <p>
* 连接建立成功调用的方法
* </p>
*
* @param session Session
* @param userId 用户id
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
try {
this.session = session;
String sessionId = session.getId();
//建立userId和sessionId的关系
if (userSessionMap.containsKey(userId)) {
userSessionMap.get(userId).add(sessionId);
} else {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.add(sessionId);
userSessionMap.put(userId, queue);
}
sessionUserMap.put(sessionId, userId);
//建立sessionId和websocket引用的关系
if (!websocketMap.containsKey(sessionId)) {
websocketMap.put(sessionId, this);
addOnlineCount(); //在线数加1
}
} catch (Exception e) {
logger.error("连接失败");
String es = ExceptionUtils.getFullStackTrace(e);
logger.error(es);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
String sessionId = this.session.getId();
//移除userId和sessionId的关系
String userId = sessionUserMap.get(sessionId);
sessionUserMap.remove(sessionId);
if (userId != null) {
ConcurrentLinkedQueue<String> sessionIds = userSessionMap.get(userId);
if (sessionIds != null) {
sessionIds.remove(sessionId);
if (sessionIds.size() == 0) {
userSessionMap.remove(userId);
}
}
}
//移除sessionId和websocket的关系
if (websocketMap.containsKey(sessionId)) {
websocketMap.remove(sessionId);
subOnlineCount(); //在线数减1
}
}
/**
* <p>
* 收到客户端消息后调用的方法
* </p>
*
* @param messageStr 客户端发送过来的消息
* @param session session
* @param userId 用户id
*/
@OnMessage
public void onMessage(String messageStr, Session session, @PathParam("userId") Integer userId) {
}
/**
* @param session session
* @param error 当连接发生错误时的回调
*/
@OnError
public void onError(Session session, Throwable error) {
String es = ExceptionUtils.getFullStackTrace(error);
logger.error(es);
}
/**
* <p>
* 实现服务器主动推送
* </P>
*
* @param message 消息
* @param toUserId 用户id
* @throws IOException IOException
*/
public void sendMessage(String message, String toUserId) throws IOException {
if (toUserId != null && !StringUtil.isEmpty(message.trim())) {
ConcurrentLinkedQueue<String> sessionIds = userSessionMap.get(toUserId);
if (sessionIds != null) {
for (String sessionId : sessionIds) {
MessageWebSocket socket = websocketMap.get(sessionId);
socket.session.getBasicRemote().sendText(message);
}
}
} else {
logger.error("未找到接收用户连接,该用户未连接或已断开");
}
}
/**
* <p>
* 服务器推送
* </p>
*
* @param message 消息
* @param session session
* @throws IOException IOException
*/
public void sendMessage(String message, Session session) throws IOException {
session.getBasicRemote().sendText(message);
}
/**
* <p>
* 消息推送 广播方式
* </p>
*
* @param message 消息
*/
public void sendMessage(String message) {
if (StringUtils.isEmpty(message)) {
return;
}
ArrayList<MessageWebSocket> webSockets = new ArrayList<>(websocketMap.values());
webSockets.forEach(webSocket -> {
try {
webSocket.session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
});
}
/**
* 获取在线人数
*/
public static synchronized int getOnlineCount() {
return onlineCount;
}
/**
* 在线人数加一
*/
public static synchronized void addOnlineCount() {
MessageWebSocket.onlineCount++;
}
/**
* 在线人数减一
*/
public static synchronized void subOnlineCount() {
MessageWebSocket.onlineCount--;
}
}
请求监听
/**
* <p>
* 请求监听
* 将所有的request携带httpSession
* </P>
*
* @author bing_huang
* @since V1.0
*/
@Component
public class RequestListener implements ServletRequestListener {
@Override
public void requestInitialized(ServletRequestEvent sre) {
//将所有request请求都携带上httpSession
HttpSession session = ((HttpServletRequest) sre.getServletRequest()).getSession();
}
@Override
public void requestDestroyed(ServletRequestEvent sre) {
}
public RequestListener() {
}
}
前端界面
前端界面采用的vue方式
mounted() {
if ('WebSocket' in window) {
let url = ""
this.websocket = new WebSocket(url);
this.initWebSocket()
} else {
alert('当前浏览器 Not support websocket')
}
},
initWebSocket() {
// 连接错误
this.websocket.onerror = this.setErrorMessage;
// 连接成功
this.websocket.onopen = this.setOnopenMessage;
// 收到消息的回调
this.websocket.onmessage = this.setOnmessageMessage;
// 连接关闭的回调
this.websocket.onclose = this.setOncloseMessage;
// 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = this.onbeforeunload
},
setErrorMessage() {
},
setOnopenMessage() {
},
setOnmessageMessage(event) {
// 根据服务器推送的消息做自己的业务处理
console.log('服务端返回:' + event.data);
},
setOncloseMessage() {
},
onbeforeunload() {
this.closeWebSocket()
},
closeWebSocket() {
this.websocket.close()
}
说明
- 如果将其部署服务时采用代理(nginx)时,代理(nginx)需要特别配合
#webSocket
location 访问路径 {
proxy_pass http://后台地址;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;
}
- 如何访问,因为Websocket是ws协议 wss协议,因此在前端界面
url="wss://@ServerEndpoin(value)"
|url="ws://@ServerEndpoin(value)"