RxJS排程器

2020-09-25 16:18 更新

什么是调度程序?调度程序控制何时开始订阅以及何时传递通知。它由三个部分组成。

  • 调度程序是一种数据结构。它知道如何根据优先级或其他条件存储和排队任务。
  • 调度程序是一个执行上下文。它表示执行任务的位置和时间(例如,立即执行或在其他回调机制中执行,例如 setTimeout 或 process.nextTick 或动画帧)。
  • 调度程序具有(虚拟)时钟。它通过 now()调度程序上的 getter 方法提供“时间”的概念。在特定的调度程序上调度的任务将仅遵守该时钟指示的时间。

调度程序使您可以定义 Observable 在哪个执行上下文中将通知传递给其 Observer。

在下面的例子中,我们采取了一贯的简约可观察到发射值 123同步,并使用运营商 observeOn指定的 async调度用于提供这些值。

在 Stackblitz 上查看

  1. import { Observable, asyncScheduler } from 'rxjs';
  2. import { observeOn } from 'rxjs/operators';
  3. const observable = new Observable((observer) => {
  4. observer.next(1);
  5. observer.next(2);
  6. observer.next(3);
  7. observer.complete();
  8. }).pipe(
  9. observeOn(asyncScheduler)
  10. );
  11. console.log('just before subscribe');
  12. observable.subscribe({
  13. next(x) {
  14. console.log('got value ' + x)
  15. },
  16. error(err) {
  17. console.error('something wrong occurred: ' + err);
  18. },
  19. complete() {
  20. console.log('done');
  21. }
  22. });
  23. console.log('just after subscribe');

使用输出执行:

  1. just before subscribe
  2. just after subscribe
  3. got value 1
  4. got value 2
  5. got value 3
  6. done

请注意,之后的通知 got value...是如何传递的 just after subscribe,这与到目前为止我们看到的默认行为不同。这是因为在最终观察者 observeOn(asyncScheduler)之间引入了代理 new Observable观察者。让我们重命名一些标识符,以使该区别在示例代码中显而易见:

  1. import { Observable, asyncScheduler } from 'rxjs';
  2. import { observeOn } from 'rxjs/operators';
  3. var observable = new Observable((proxyObserver) => {
  4. proxyObserver.next(1);
  5. proxyObserver.next(2);
  6. proxyObserver.next(3);
  7. proxyObserver.complete();
  8. }).pipe(
  9. observeOn(asyncScheduler)
  10. );
  11. var finalObserver = {
  12. next(x) {
  13. console.log('got value ' + x)
  14. },
  15. error(err) {
  16. console.error('something wrong occurred: ' + err);
  17. },
  18. complete() {
  19. console.log('done');
  20. }
  21. };
  22. console.log('just before subscribe');
  23. observable.subscribe(finalObserver);
  24. console.log('just after subscribe');

proxyObserver中创建 observeOn(asyncScheduler),其 next(val)功能大致如下:

  1. const proxyObserver = {
  2. next(val) {
  3. asyncScheduler.schedule(
  4. (x) => finalObserver.next(x),
  5. 0 /* delay */,
  6. val /* will be the x for the function above */
  7. );
  8. },
  9. // ...
  10. }

async 计划与经营 setTimeout或者 setInterval,即使给定 delay为零。像往常一样,在 JavaScript 中,setTimeout(fn, 0)已知 fn最早在下一个事件循环迭代时运行该函数。这解释了为什么 got value 1交付到finalObserver后来 just after subscribe发生。

schedule()调度程序的方法带有一个 delay参数,该参数指的是相对于调度程序自身内部时钟的时间量。调度程序的时钟与实际的挂钟时间没有任何关系。像这样的时间运算符 delay不是按实际时间而是按调度程序的时钟指示的时间进行操作。这在测试中特别有用,在该测试中,虚拟时间调度程序可用于伪造壁钟时间,而实际上却是同步执行调度的任务。

调度程序类型

async 计划是一个内置的 RxJS 提供调度。可以使用 Scheduler 对象的静态属性来创建和返回每个对象。

排程器 目的
null 通过不传递任何调度程序,可以同步和递归地传递通知。将此用于恒定时间操作或尾部递归操作。
queueScheduler 在当前事件框架中的队列上进行调度(蹦床调度程序)。使用它进行迭代操作。
asapScheduler 微型任务队列上的计划,这与用于承诺的队列相同。基本上在当前工作之后,但是在下一个工作之前。使用它进行异步转换。
asyncScheduler 时间表与配合使用 setInterval。将此用于基于时间的操作。
animationFrameScheduler 计划将在下一次浏览器内容重新绘制之前发生的任务。可用于创建流畅的浏览器动画。

使用调度程序

您可能已经在 RxJS 代码中使用了调度程序,而没有明确说明要使用的调度程序的类型。这是因为所有处理并发的 Observable 运算符都有可选的调度程序。如果不提供调度程序,则 RxJS 将使用最小并发原理选择默认调度程序。这意味着将选择调度程序,该调度程序将引入最少的并发量以满足操作员的需求。例如,对于返回有限且少量消息的 Observable 的运算符,RxJS 不使用 Scheduler,即 nullor undefined。对于返回大量或无限数量消息的操作员,使用 queueScheduler。对于使用计时器的操作员,async 将使用。

由于 RxJS 使用最少的并发调度程序,因此如果您出于性能目的引入并发,则可以选择其他调度程序。要指定特定的调度程序,可以使用采用调度程序的那些运算符方法,例如 from([10, 20, 30], asyncScheduler)

静态创建运算符通常将 Scheduler 作为参数。例如,from(array, scheduler)让您指定在传递从转换的每个通知时要使用的 Scheduler array。它通常是运算符的最后一个参数。以下静态创建运算符采用 Scheduler 参数:

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

使用 subscribeOn预定计划在什么情况下会在 subscribe()呼叫发生。默认情况下,subscribe()对 Observable 的调用将立即同步进行。但是,您可以使用 instance operator subscribeOn(scheduler)(其中scheduler 提供的参数)来延迟或调度实际订阅在给定 Scheduler 上发生。

使用 observeOn预定计划在什么情况下会通知交付。正如我们在上面的示例中看到的那样,实例运算符 observeOn(scheduler)在源 Observable 和目标 Observer 之间引入了一个中介者 Observer,其中中介者使用您的给定调度对目标 Observer 的调用 scheduler

实例运算符可以将Scheduler作为参数。

时间相关的运营商如 bufferTimedebounceTimedelayauditTimesampleTimethrottleTimetimeIntervaltimeouttimeoutWithwindowTime 都将一个调度程序作为最后一个参数,否则默认情况下的操作 asyncScheduler

其他实例操作符采取调度作为参数:cachecombineLatestconcatexpandmergepublishReplaystartWith

注意两个cachepublishReplay 接受调度,因为他们利用 ReplaySubject。ReplaySubjects 的构造函数将可选的 Scheduler 作为最后一个参数,因为 ReplaySubject 可能会处理时间,这仅在 Scheduler 的上下文中才有意义。默认情况下,ReplaySubject 使用 queue 调度程序提供时钟。

以上内容是否对您有帮助:
在线笔记
App下载
App下载

扫描二维码

下载编程狮App

公众号
微信公众号

编程狮公众号