WebSocket 协议 RFC 6455 提供了一个标准化的 在客户端和服务器之间建立全双工双向通信通道的方法 通过单个 TCP 连接。它是与HTTP不同的TCP协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重用现有防火墙规则。
WebSocket交互以HTTP请求开始,该请求使用HTTPUpgrade标头来升级,或者在本例中,切换到WebSocket协议。以下示例显示了这样的交互:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080(1)Upgrade标头。
(2)Upgrade使用连接。
支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:
HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp(1)协议切换
握手成功后,HTTP 升级请求的 TCP 套接字将保留 为客户端和服务器打开以继续发送和接收消息。
有关 WebSocket 工作原理的完整介绍超出了本文档的范围。
请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要将其配置为将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请检查 与 WebSocket 支持相关的云提供商的说明。
即使 WebSocket 被设计为与 HTTP 兼容并且以 HTTP 请求开始, 重要的是要了解这两种协议导致非常不同的 体系结构和应用程序编程模型。
在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端访问这些 URL,请求-响应样式。服务器将请求路由到 基于 HTTP URL、方法和标头的适当处理程序。
相比之下,在 WebSocket 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一种完全不同的异步、事件驱动的消息传递体系结构。
WebSocket也是一种低级传输协议,与HTTP不同,它没有规定 消息内容的任何语义。这意味着无法路由或处理 消息,除非客户端和服务器在消息语义上达成一致。
WebSocket客户端和服务器可以通过HTTP握手请求上的Sec WebSocket protocol标头协商使用更高级别的消息传递协议(例如STOMP)。如果没有这一点,他们需要制定自己的惯例。
WebSockets可以使网页具有动态性和交互性。但是,在许多情况下, AJAX 和 HTTP 流或长轮询的组合可以提供简单和 有效的解决方案。
例如,新闻、邮件和社交源需要动态更新,但可能是 完全可以每隔几分钟这样做一次。协作、游戏和金融应用, 另一方面,需要更接近实时。
延迟本身并不是决定性因素。如果消息量相对较低(例如, 监控网络故障)HTTP 流或轮询可以提供有效的解决方案。 低延迟、高频和高容量的组合使最佳 使用 WebSocket 的案例。
还请记住,在Internet上,超出您控制范围的限制性代理可能会阻止WebSocket交互,因为它们未配置为传递Upgrade标头,或者因为它们关闭了看起来空闲的长期连接。这意味着,与面向公共的应用程序相比,在防火墙内部应用程序中使用WebSocket是一个更直接的决定。
Spring 框架提供了一个 WebSocket API,您可以使用它来编写客户端和 处理 WebSocket 消息的服务器端应用程序
要创建WebSocket服务器,首先可以创建WebSocketHandler。以下示例显示了如何执行此操作:
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;public class MyWebSocketHandler implements WebSocketHandler {@Overridepublic Mono handle(WebSocketSession session) {// ...}
}
然后,您可以将其映射到 URL:
@Configuration
class WebConfig {@Beanpublic HandlerMapping handlerMapping() {Map map = new HashMap<>();map.put("/path", new MyWebSocketHandler());int order = -1; // before annotated controllersreturn new SimpleUrlHandlerMapping(map, order);}
}
如果使用WebFlux配置,则无需进一步操作,否则如果不使用WebFlush配置,则需要声明WebSocketHandlerAdapter,如下所示:
@Configuration
class WebConfig {// ...@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
WebSocketHandler的handle方法接受WebSocketSession并返回Mono<Void>以指示会话的应用程序处理何时完成。会话通过两个流处理,一个用于入站消息,另一个用于出站消息。下表描述了处理流的两种方法:
WebSocketSession方法 | 描述 |
Flux | 提供对入站消息流的访问,并在连接关闭时完成。 |
Mono | 获取传出消息的源,编写消息,并返回一个Mono<Void>,该消息在源完成并完成编写时完成。 |
WebSocketHandler必须将入站流和出站流组合成一个统一的流,并返回反映该流完成情况的Mono<Void>。根据应用程序要求,统一流程在以下情况下完成:
入站或出站消息流完成。
入站流完成(即连接关闭),而出站流是无限的。
在选定的点上,通过WebSocketSession的close方法。
当入站和出站消息流组合在一起时,无需检查连接是否打开,因为Reactive streams表示结束活动。入站流接收完成或错误信号,出站流接收取消信号。
处理程序的最基本实现是处理入站流的实现。以下示例显示了这样的实现:
class ExampleHandler implements WebSocketHandler {@Overridepublic Mono handle(WebSocketSession session) {return session.receive() (1).doOnNext(message -> {// ... (2)}).concatMap(message -> {// ... (3)}).then(); (4)}
}
(1)访问入站消息流。
(2)对每条消息做些什么。
(3)执行使用消息内容的嵌套异步操作。
(4)返回接收完成时完成的Mono<Void>。
以下实现组合了入站和出站流:
class ExampleHandler implements WebSocketHandler {@Overridepublic Mono handle(WebSocketSession session) {Flux output = session.receive() (1).doOnNext(message -> {// ...}).concatMap(message -> {// ...}).map(value -> session.textMessage("Echo " + value)); (2)return session.send(output); (3)}
}
(1)处理入站消息流。
(2)创建出站消息,生成组合流。
(3)返回在我们继续接收时未完成的Mono<Void>。
入站和出站流可以是独立的,并且仅在完成时才加入, 如以下示例所示:
class ExampleHandler implements WebSocketHandler {@Overridepublic Mono handle(WebSocketSession session) {Mono input = session.receive() (1).doOnNext(message -> {// ...}).concatMap(message -> {// ...}).then();Flux source = ... ;Mono output = session.send(source.map(session::textMessage)); (2)return Mono.zip(input, output).then(); (3)}
}
(1)处理入站消息流。
(2)发送传出消息。
(3)加入这些流,并返回一个Mono<Void>,在任何一个流结束时完成。
DataBuffer是WebFlux中字节缓冲区的表示。参考资料的Spring Core部分在数据缓冲区和编解码器部分有更多内容。要理解的关键点是,在某些服务器(如Netty)上,字节缓冲区被合并并进行引用计数,并且在使用时必须释放,以避免内存泄漏。
在Netty上运行时,应用程序必须使用DataBufferUtils.retain(dataBuffer),如果它们希望保留输入数据缓冲区,以确保它们不会被释放,然后在缓冲区被消耗时使用DataBufferUtils.release(dataBuffer)。
WebSocketHandlerAdapter委托给WebSocketService。默认情况下,这是HandshakeWebSocketService的一个实例,它对WebSocket请求执行基本检查,然后对正在使用的服务器使用RequestUpgradeStrategy。目前,有对Reactor Netty、Tomcat、Jetty和Undertow的内置支持。
HandshakeWebSocketService公开了一个sessionAttributePredicate属性,该属性允许设置Predicate<String>以从WebSession中提取属性并将其插入WebSocketSession的属性中。
每个服务器的RequestUpgradeStrategy公开了特定于底层WebSocket服务器引擎的配置。使用WebFlux Java配置时,您可以自定义WebFluxConfig相应部分中所示的财产,否则,如果不使用WebFlex配置,请使用以下选项:
@Configuration
class WebConfig {@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter(webSocketService());}@Beanpublic WebSocketService webSocketService() {TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();strategy.setMaxSessionIdleTimeout(0L);return new HandshakeWebSocketService(strategy);}
}
检查服务器的升级策略以查看可用的选项。现在 只有Tomcat和Jetty暴露了这样的选择。
配置CORS并限制对WebSocket端点的访问的最简单方法是让WebSocketHandler实现CorsConfigurationSource,并返回一个具有允许的源、头和其他详细信息的CorsCoonfiguration。如果无法做到这一点,还可以在SimpleUrlHandler上设置corsConfigurations属性,以按URL模式指定CORS设置。如果同时指定了两者,则使用CorsConfiguration上的组合方法组合它们。
Spring WebFlux提供了一个WebSocketClient抽象,其中包含Reactor Netty、Tomcat、Jetty、Undertow和标准Java(即JSR-356)的实现。
要启动WebSocket会话,可以创建客户端的实例并使用其执行方法:
WebSocketClient client = new ReactorNettyWebSocketClient();URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->session.receive().doOnNext(System.out::println).then());
有些客户端(如Jetty)实施生命周期,需要停止并启动才能使用它们。所有客户端都具有与底层WebSocket客户端的配置相关的构造函数选项。
org.springframework.boot spring-boot-starter-websocket org.webjars webjars-locator-core org.webjars sockjs-client 1.0.2 org.webjars stomp-websocket 2.3.3 org.webjars bootstrap 3.3.7 org.webjars jquery 3.1.1-1 org.springframework.boot spring-boot-starter-test test
package com.example.messagingstompwebsocket;public class HelloMessage {private String name;public HelloMessage() {}public HelloMessage(String name) {this.name = name;}public String getName() {return name;}public void setName(String name) {this.name = name;}
}
在 Spring 处理 STOMP 消息传递的方法中,STOMP 消息可以路由到@Controller类。例如,将 (from ) 映射为处理到目标的消息,如以下清单所示:
package com.example.messagingstompwebsocket;import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import org.springframework.web.util.HtmlUtils;@Controller
public class GreetingController {@MessageMapping("/hello")@SendTo("/topic/greetings")public Greeting greeting(HelloMessage message) throws Exception {Thread.sleep(1000); // simulated delayreturn new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!");}}
这个控制器简洁而简单,但仍在继续。我们一步一步地将其分解。
@MessageMapping注释确保,如果消息发送到/hello目标,则调用greeting()方法。
消息的有效负载被绑定到HelloMessage对象,该对象被传递到greeting()中。
在内部,该方法的实现通过使线程休眠一秒钟来模拟处理延迟。这是为了证明,在客户机发送消息后,服务器可以花多长时间来异步处理消息。客户机可以继续进行它需要做的任何工作,而无需等待响应。
延迟一秒后,greeting()方法创建一个greeting对象并将其返回。返回值将广播给/ttopic/greetings的所有订阅者,如@SendTo注释中所指定的。请注意,输入消息中的名称是经过净化的,因为在这种情况下,它将被回显并在客户端的浏览器DOM中重新呈现。
现在已经创建了服务的基本组件,您可以配置Spring以启用WebSocket和STOMP消息传递。
package com.example.messagingstompwebsocket;import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {config.enableSimpleBroker("/topic");config.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/gs-guide-websocket").withSockJS();}}
WebSocketConfig用@Configuration注释,表示它是一个Spring配置类。它还带有@EnableWebSocketMessageBroker注释。顾名思义,@EnableWebSocketMessageBroker启用WebSocket消息处理,并由消息代理支持。
configureMessageBroker()方法实现WebSocketMessageBrokerConfigurer中的默认方法来配置消息代理。它首先调用enableSimpleBroker(),以启用一个基于内存的简单消息代理,在以/topic为前缀的目的地将问候消息传送回客户端。它还为绑定到用@MessageMapping注释的方法的消息指定/app前缀。此前缀将用于定义所有消息映射。例如,/app/hello是GreetingController.greeting()方法映射到的端点。
registerTompEndpoints()方法注册/gs-guide-websocket端点,启用SockJS回退选项,以便在websocket不可用时使用备用传输。SockJS客户端将尝试连接到/gs-guide-websocket并使用最佳可用传输(websocket、xhr流、xhr轮询等)。