RxJS webSocket
浏览器提供的与 w3c 兼容的 WebSocket 对象的包装。
webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig
<T>): WebSocketSubject
<T>
参量
urlConfigOrSource | WebSocket 端点作为 url 或带有 配置和其他观察者。 |
---|---|
退货
WebSocketSubject<T>
:允许通过 WebSocket 连接发送和接收消息的主题。
描述
Subject
通过 WebSocket 与服务器通信的服务器
webSocket
是产生一个工厂函数 WebSocketSubject
, 可以用来与任意端点建立 WebSocket 连接。 webSocket
接受带有 WebSocket 端点 url 的字符串作为参数,或者接受 WebSocketSubjectConfig
用于提供其他配置的对象,如 以及用于跟踪 WebSocket 连接生命周期的观察者。
当 WebSocketSubject
订阅,它试图让一个 socket 连接, 除非已经有一个。 这意味着许多订户将始终收听 在同一套接字上,从而节省了资源。 但是,如果两个实例由组成 WebSocketSubject
, 即使这两个网址具有相同的网址,它们也会尝试将其分开 连接。 使用者时 WebSocketSubject
取消订阅的 ,套接字连接会关闭, 仅当没有更多订户仍在收听时。 如果一段时间后消费者开始 再次订阅,将重新建立连接。
建立连接后,每当服务器收到新消息时, WebSocketSubject
都会发出该消息。 消息作为流中的值。 默认情况下,通过解析来自套接字的消息 JSON.parse
。 如果你 想要自定义反序列化的处理方式(如果有的话),您可以提供自定义 resultSelector
在 WebSocketSubject
。 如果连接关闭,则流将完成,前提是发生了 任何错误。 如果在任何时候(启动,维护或关闭连接)出现错误, 无论抛出什么 WebSocket API,流都将出错。
由于是 Subject
,因此 WebSocketSubject
允许从服务器接收和发送消息。 为了 与所连接的端点,用于通信 next
, error
和 complete
方法。 next
将值发送到服务器,因此请记住 该值将不会预先序列化。 因此, JSON.stringify
必须手动调用一个值, 在调用 之前 next
结果 。 还要注意,如果在下一个价值时刻 没有套接字连接(例如,没有人在订阅),这些值将被缓冲,并在连接时发送 终于成立了。 complete
方法关闭套接字连接。 error
一样 并通过状态代码和字符串以及发生的详细信息通知服务器发生了问题。 由于 WebSocket API 需要状态码, WebSocketSubject
因此不允许使用,例如常规 Subject
, 将任意值传递给该 error
方法。 需要使用具有以下内容的对象进行调用 code
具有状态码编号的 可选 reason
属性和具有描述细节的字符串的 属性 错误。
通话 next
不会影响的订户 WebSocketSubject
-他们没有 某些信息已发送到服务器的信息(当然,除非服务器 以某种方式响应消息)。 另一方面,由于调用 complete
触发器 尝试关闭套接字连接。 如果该连接已关闭且没有任何错误,则流将 完成,从而通知所有订户。 由于通话 error
关闭 如果关闭自身,套接字连接也将带有服务器的不同状态代码 没有错误,已订阅的 Observable 不会像人们期望的那样出错,但会像往常一样完成。 在两种情况下 (调用 complete
或 error
),如果关闭套接字连接的过程导致某些错误, 则 流 会出错。
多路复用
WebSocketSubject
还有其他操作符,在其他主题中找不到。 这就是所谓的 multiplex
,它是 用于模拟打开多个套接字连接,而实际上仅维护一个。 例如,一个应用程序同时具有聊天面板和有关体育新闻的实时通知。 由于这是两个不同的功能, 每个有两个独立的连接是有意义的。 也许 WebSocket 可能有两个单独的服务 端点,在单独的计算机上运行,只有 GUI 将它们组合在一起。 具有套接字连接 因为每个功能都可能变得资源过于昂贵。 单身是一种常见的模式 WebSocket 端点,充当其他服务(在本例中为聊天和体育新闻服务)的网关。 即使客户端应用程序中只有一个连接,也可以像处理流一样操作流 需要两个单独的插座。 这消除了手动在网关中注册和注销 提供服务并过滤出感兴趣的消息。 这正是该 multiplex
方法的目的。
方法接受三个参数。 前两个是返回订阅和取消订阅消息的函数 分别。 这些消息将在产生 Observable 的使用者使用时发送到服务器 订阅和取消订阅。 服务器可以使用它们来验证某种消息应该启动还是停止 被转发给客户。 对于上述示例应用程序,在获得带有正确标识符的订阅消息后, 网关服务器可以决定应连接到真实的体育新闻服务并开始从中转发消息。 请注意,这两个消息都将按函数返回的方式发送,默认情况下,它们使用 JSON.stringify 进行序列化,只是 作为通过推送的消息 next
。 还请记住,这些消息将在 发送, 每次 订阅时 并且 取消订阅。 这是潜在的危险,因为 Observable 的一个使用者可能会取消订阅,并且服务器 由于收到取消订阅的消息,可能会停止发送消息。 这需要处理 在服务器上或 使用 publish
在从“多路复用”返回的 Observable 上 。
的最后一个参数 multiplex
是 的 messageFilter
应返回布尔值 函数。 用于过滤邮件 由服务器发送给仅属于模拟 WebSocket 流的服务器。 例如,服务器可能会标记这些 消息对象上带有某种字符串标识符的消息,并且 messageFilter
将返回 true
如果套接字发出的对象上有这样的标识符。 消息返回 false
在 messageFilter
简单地跳过, 并且不会顺流而下。
返回值 multiplex
是 Observable,其中包含来自模拟套接字连接的消息。 请注意 不是 WebSocketSubject
,因此调用 next
或 multiplex
再次失败。 用于将值推向 服务器,使用 root WebSocketSubject
。
例子
侦听来自服务器的消息
import { webSocket } from "rxjs/webSocket";
const subject = webSocket("ws://localhost:8081");
subject.subscribe(
msg => console.log('message received: ' + msg), // Called whenever there is a message from the server.
err => console.log(err), // Called if at any point WebSocket API signals some kind of error.
() => console.log('complete') // Called when connection is closed (for whatever reason).
);
将消息推送到服务器
import { webSocket } from "rxjs/webSocket";
const subject = webSocket('ws://localhost:8081');
subject.subscribe();
// Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent,
// since no connection was established!
subject.next({message: 'some message'});
// This will send a message to the server once a connection is made. Remember value is serialized with JSON.stringify by default!
subject.complete(); // Closes the connection.
subject.error({code: 4000, reason: 'I think our app just broke!'});
// Also closes the connection, but let's the server know that this closing is caused by some error.
多路 WebSocket
import { webSocket } from "rxjs/webSocket";
const subject = webSocket('ws://localhost:8081');
const observableA = subject.multiplex(
() => ({subscribe: 'A'}), // When server gets this message, it will start sending messages for 'A'...
() => ({unsubscribe: 'A'}), // ...and when gets this one, it will stop.
message => message.type === 'A' // If the function returns `true` message is passed down the stream. Skipped if the function returns false.
);
const observableB = subject.multiplex( // And the same goes for 'B'.
() => ({subscribe: 'B'}),
() => ({unsubscribe: 'B'}),
message => message.type === 'B'
);
const subA = observableA.subscribe(messageForA => console.log(messageForA));
// At this moment WebSocket connection is established. Server gets '{"subscribe": "A"}' message and starts sending messages for 'A',
// which we log here.
const subB = observableB.subscribe(messageForB => console.log(messageForB));
// Since we already have a connection, we just send '{"subscribe": "B"}' message to the server. It starts sending messages for 'B',
// which we log here.
subB.unsubscribe();
// Message '{"unsubscribe": "B"}' is sent to the server, which stops sending 'B' messages.
subA.unsubscribe();
// Message '{"unsubscribe": "A"}' makes the server stop sending messages for 'A'. Since there is no more subscribers to root Subject,
// socket connection closes.
更多建议: