SpringBoot之webSocket

SpringBoot之webSocket

Scroll Down

简介

WebSocket是一种网络传输协议,可在单个TCP连接上进行全双工通信,位于OSI模型的应用层。WebSocket协议在2011年由IETF标准化为RFC 6455,后由RFC 7936补充规范。Web IDL中的WebSocket API由W3C标准化。

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。--来源WIKI

实现

maven依赖

<dependency>  
   <groupId>org.springframework.boot</groupId>  
   <artifactId>spring-boot-starter-websocket</artifactId>  
</dependency> 

WebSocketConfig 配置

/**
 * <p>
 *     webSocket
 * </P>
 *
 * @author bing_huang
 * @since V1.0
 */
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator {
    private static final String HttpSession = null;
    /* 修改握手,就是在握手协议建立之前修改其中携带的内容 */
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        /*如果没有监听器,那么这里获取到的HttpSession是null*/
        StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession();
        if (ssf != null) {
            javax.servlet.http.HttpSession session = (HttpSession) request.getHttpSession();
            sec.getUserProperties().put("sessionid", session);
        }
        //sec.getUserProperties().put("name", "小强");
        super.modifyHandshake(sec, request, response);
    }
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

WebSocketServer 服务端

/**
 * <p>
 * messageWebSocket 消息通知
 * </P>
 *
 * @author bing_huang
 * @since V1.0
 */
@Component
@ServerEndpoint(value = "/messageSocket/{userId}", configurator = WebSocketConfig.class)
public class MessageWebSocket {
    private static final Logger logger = LoggerFactory.getLogger(MessageWebSocket.class);

    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;

    /**
     * key: userId value: sessionIds
     */
    private static ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> userSessionMap = new ConcurrentHashMap<String, ConcurrentLinkedQueue<String>>();

    /**
     * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, MessageWebSocket> websocketMap = new ConcurrentHashMap<>();

    /**
     * key: sessionId value: userId
     */
    private static ConcurrentHashMap<String, String> sessionUserMap = new ConcurrentHashMap<String, String>();

    /**
     * 当前连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    /**
     * <p>
     * 连接建立成功调用的方法
     * </p>
     *
     * @param session Session
     * @param userId  用户id
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        try {
            this.session = session;
            String sessionId = session.getId();
            //建立userId和sessionId的关系
            if (userSessionMap.containsKey(userId)) {
                userSessionMap.get(userId).add(sessionId);
            } else {
                ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
                queue.add(sessionId);
                userSessionMap.put(userId, queue);
            }
            sessionUserMap.put(sessionId, userId);
            //建立sessionId和websocket引用的关系
            if (!websocketMap.containsKey(sessionId)) {
                websocketMap.put(sessionId, this);
                addOnlineCount();           //在线数加1
            }
        } catch (Exception e) {
            logger.error("连接失败");
            String es = ExceptionUtils.getFullStackTrace(e);
            logger.error(es);
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        String sessionId = this.session.getId();
        //移除userId和sessionId的关系
        String userId = sessionUserMap.get(sessionId);
        sessionUserMap.remove(sessionId);
        if (userId != null) {
            ConcurrentLinkedQueue<String> sessionIds = userSessionMap.get(userId);
            if (sessionIds != null) {
                sessionIds.remove(sessionId);
                if (sessionIds.size() == 0) {
                    userSessionMap.remove(userId);
                }
            }
        }
        //移除sessionId和websocket的关系
        if (websocketMap.containsKey(sessionId)) {
            websocketMap.remove(sessionId);
            subOnlineCount();           //在线数减1
        }
    }

    /**
     * <p>
     * 收到客户端消息后调用的方法
     * </p>
     *
     * @param messageStr 客户端发送过来的消息
     * @param session    session
     * @param userId     用户id
     */
    @OnMessage
    public void onMessage(String messageStr, Session session, @PathParam("userId") Integer userId) {

    }

    /**
     * @param session session
     * @param error   当连接发生错误时的回调
     */
    @OnError
    public void onError(Session session, Throwable error) {
        String es = ExceptionUtils.getFullStackTrace(error);
        logger.error(es);
    }


    /**
     * <p>
     * 实现服务器主动推送
     * </P>
     *
     * @param message  消息
     * @param toUserId 用户id
     * @throws IOException IOException
     */
    public void sendMessage(String message, String toUserId) throws IOException {
        if (toUserId != null && !StringUtil.isEmpty(message.trim())) {
            ConcurrentLinkedQueue<String> sessionIds = userSessionMap.get(toUserId);
            if (sessionIds != null) {
                for (String sessionId : sessionIds) {
                    MessageWebSocket socket = websocketMap.get(sessionId);
                    socket.session.getBasicRemote().sendText(message);
                }
            }
        } else {
            logger.error("未找到接收用户连接,该用户未连接或已断开");
        }
    }

    /**
     * <p>
     * 服务器推送
     * </p>
     *
     * @param message 消息
     * @param session session
     * @throws IOException IOException
     */
    public void sendMessage(String message, Session session) throws IOException {
        session.getBasicRemote().sendText(message);
    }

    /**
     * <p>
     * 消息推送 广播方式
     * </p>
     *
     * @param message 消息
     */
    public void sendMessage(String message) {
        if (StringUtils.isEmpty(message)) {
            return;
        }
        ArrayList<MessageWebSocket> webSockets = new ArrayList<>(websocketMap.values());
        webSockets.forEach(webSocket -> {
            try {
                webSocket.session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();

            }
        });
    }

    /**
     * 获取在线人数
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 在线人数加一
     */
    public static synchronized void addOnlineCount() {
        MessageWebSocket.onlineCount++;
    }

    /**
     * 在线人数减一
     */
    public static synchronized void subOnlineCount() {
        MessageWebSocket.onlineCount--;
    }
}

请求监听

/**
 * <p>
 * 请求监听
 * 将所有的request携带httpSession
 * </P>
 *
 * @author bing_huang
 * @since V1.0
 */
@Component
public class RequestListener implements ServletRequestListener {
    @Override
    public void requestInitialized(ServletRequestEvent sre) {
        //将所有request请求都携带上httpSession
        HttpSession session = ((HttpServletRequest) sre.getServletRequest()).getSession();
    }

    @Override
    public void requestDestroyed(ServletRequestEvent sre) {

    }

    public RequestListener() {
    }
}

前端界面

前端界面采用的vue方式

 mounted() {
  if ('WebSocket' in window) {
        let url = ""
        this.websocket = new WebSocket(url);
        this.initWebSocket()
      } else {
        alert('当前浏览器 Not support websocket')
      }
 },
 initWebSocket() {
        // 连接错误
        this.websocket.onerror = this.setErrorMessage;

        // 连接成功
        this.websocket.onopen = this.setOnopenMessage;

        // 收到消息的回调
        this.websocket.onmessage = this.setOnmessageMessage;

        // 连接关闭的回调
        this.websocket.onclose = this.setOncloseMessage;

        // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = this.onbeforeunload
      },
      setErrorMessage() {
      },
      setOnopenMessage() {
      },
      setOnmessageMessage(event) {
        // 根据服务器推送的消息做自己的业务处理
        console.log('服务端返回:' + event.data);
      },
      setOncloseMessage() {
      },
      onbeforeunload() {
        this.closeWebSocket()
      },
      closeWebSocket() {
        this.websocket.close()
      }

说明

  1. 如果将其部署服务时采用代理(nginx)时,代理(nginx)需要特别配合
 #webSocket
 location 访问路径 {
   proxy_pass http://后台地址;
   proxy_http_version 1.1;
   proxy_set_header Upgrade $http_upgrade;
   proxy_set_header Connection "Upgrade";
   proxy_set_header Host $host;
 }
  1. 如何访问,因为Websocket是ws协议 wss协议,因此在前端界面url="wss://@ServerEndpoin(value)"|url="ws://@ServerEndpoin(value)"