RxJS 可观察对象

2020-09-25 17:17 更新

可观察值是多个值的惰性 Push 集合。它们填补了下表中的缺失点:

Function Iterator
Promise Observable

例。以下是可观察到的该按压值 123 立即(同步)时所订阅,并且该值 4 在一秒之后自从订阅呼叫通过,则完成:

import { Observable } from 'rxjs';


const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

要调用 Observable 并查看这些值,我们需要订阅它:

import { Observable } from 'rxjs';


const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});


console.log('just before subscribe');
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');

可以在控制台上这样执行:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

拉与推

PullPush 是两种不同的协议,它们描述了数据生产者如何与数据使用者通信。

什么是拉力?在拉式系统中,使用者确定何时从数据生产者接收数据。生产者本身并不知道何时将数据传递给消费者。

每个 JavaScript 函数都是一个 Pull 系统。该函数是数据的生产者,并且调用该函数的代码通过从调用中“拉出” 单个返回值来使用它。

ES2015 引入了生成器函数和迭代器function*),这是另一种Pull系统。调用的代码 iterator.next()是使用者,从迭代器(生产者)中“抽出” 多个值。

制片人 消费者
被动:在需要时产生数据。 有效:决定何时请求数据。
活动:按自己的节奏生成数据。 被动:对收到的数据做出反应。

什么是推?在推送系统中,生产者确定何时将数据发送给消费者。消费者不知道何时接收该数据。

承诺是当今 JavaScript 中最常见的 Push 系统类型。Promise(生产者)将已解析的值传递给已注册的回调(消费者),但与函数不同的是,Promise 负责精确确定何时将该值“推送”到回调中。

RxJS 引入了 Observables,这是一个用于 JavaScript 的新 Push 系统。一个 Observable 是多个值的生产者,将它们“推送”到观察者(消费者)。

  • 函数是一个懒惰地评估计算该同步返回上调用的单个值。
  • 发生器是一个懒惰地评估计算该同步返回零至(潜在地)无穷大的值上迭代。
  • 一个承诺是一个计算可能(也可能不会)最终返回一个值。
  • 一个可观察是懒洋洋地评估计算,并可以同步或异步从它开始被调用时返回零(潜在的)无限值。

可观测值作为功能的概括

与流行的说法相反,Observables 不像 EventEmitters,也不像是多个值的 Promises。在某些情况下,即使用 RxJS 主题对可观察对象进行多播时,可观察对象的行为可能类似于 EventEmitters,但通常它们并不类似于 EventEmitters。

可观察值就像带有零参数的函数,但是将其概括化以允许多个值。

考虑以下:

function foo() {
  console.log('Hello');
  return 42;
}


const x = foo.call(); // same as foo()
console.log(x);
const y = foo.call(); // same as foo()
console.log(y);

我们希望将其视为输出:

"Hello"
42
"Hello"
42

您可以使用 Observables 编写与上面相同的行为:

import { Observable } from 'rxjs';


const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
});


foo.subscribe(x => {
  console.log(x);
});
foo.subscribe(y => {
  console.log(y);
});

和输出是相同的:

"Hello"
42
"Hello"
42

发生这种情况是因为函数和 Observables 都是惰性计算。如果不调用该函数,console.log('Hello')则不会发生。同样在Observables 中,如果您不使用(用 subscribe)“调用”它,console.log('Hello')则不会发生。另外,“调用”或“订阅”是一个独立的操作:两个函数调用触发两个单独的副作用,两个 Observable 订阅触发两个单独的副作用。与 EventEmitter 具有共同的副作用并且无论订阅者的存在如何都渴望执行,与之相反,Observables 没有共享的执行并且很懒。

订阅 Observable 类似于调用 Function。

有人声称 Observable 是异步的。那是不对的。如果在函数调用中包含日志,如下所示:

console.log('before');
console.log(foo.call());
console.log('after');

您将看到输出:

"before"
"Hello"
42
"after"

这与 Observables 相同:

console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');

输出为:

"before"
"Hello"
42
"after"

证明订阅 foo 就像功能一样完全同步。

可观察对象能够同步或异步传递值。

一个 Observable 和一个函数有什么区别?可观察对象可以随着时间的推移“返回”多个值,而某些功能则不能。您不能这样做:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

函数只能返回一个值。但是,可观察对象可以做到这一点:

import { Observable } from 'rxjs';


const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100); // "return" another value
  subscriber.next(200); // "return" yet another
});


console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');

带有同步输出:

"before"
"Hello"
42
100
200
"after"

但是您也可以异步“返回”值:

import { Observable } from 'rxjs';


const foo = new Observable(subscriber => {
  console.log('Hello');
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
  setTimeout(() => {
    subscriber.next(300); // happens asynchronously
  }, 1000);
});


console.log('before');
foo.subscribe(x => {
  console.log(x);
});
console.log('after');

输出:

"before"
"Hello"
42
100
200
"after"
300

结论:

  • func.call()意思是“ 同步给我一个价值
  • observable.subscribe()意思是“ 给我同步或异步提供任意数量的值

可观察的解剖

观测量创建使用 new Observable 或产生算符,被订阅为与观察,执行交付 next/ error/ complete通知给观察者,和它们的执行可能布置。这四个方面都在 Observable 实例中编码,但是其中一些方面与其他类型(例如 Observer 和 Subscription)有关。

可观察的核心问题:

  • 创建可观察物
  • 订阅可观察物
  • 执行可观察的
  • 处置可观察物

创建可观察物

Observable构造函数有一个参数:subscribe 功能。

下面的示例创建一个 Observable,以 'hi' 每秒将字符串发送给订阅者。

import { Observable } from 'rxjs';


const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi')
  }, 1000);
});

可以使用创建可观察对象 new Observable。最常见的是,使用创建的功能,如创建观测offrominterval,等。

在上面的示例中,subscribe 函数是描述 Observable 的最重要部分。让我们看看订阅的含义。

订阅可观察物

observable可以订阅示例中的 Observable ,如下所示:

observable.subscribe(x => console.log(x));

这不是一个巧合 observable.subscribe,并 subscribenew Observable(function subscribe(subscriber) {...})具有相同的名称。在库中,它们是不同的,但是出于实际目的,您可以在概念上将它们视为相等。

这显示了如何 subscribe在同一 Observable 的多个 Observer 之间不共享调用。当 observable.subscribe 使用 Observer 进行调用时,subscribein 中的功能 new Observable(function subscribe(subscriber) {...})针对该给定的订户运行。每次呼叫都会 observable.subscribe 触发给定用户的独立设置。

订阅 Observable 就像调用一个函数,提供将数据传递到的回调。

addEventListener/之类的事件处理程序 API 完全不同 removeEventListener。使用 observable.subscribe,给定的 Observer 不会在 Observable 中注册为侦听器。Observable 甚至不维护附加的 Observers 的列表。

一个 subscribe 呼叫是简单地启动一个“可观察执行”,并提供价值或事件到执行的观测方式。

执行可观察物

里面的代码 new Observable(function subscribe(subscriber) {...})表示“可观察的执行”,这是一种惰性计算,仅对每个订阅的观察者发生。执行会随时间同步或异步产生多个值。

可观察的执行可以提供三种类型的值:

  • “下一个”通知:发送一个值,例如数字,字符串,对象等。
  • “错误”通知:发送 JavaScript 错误或异常。
  • “完成”通知:不发送值。

“下一个”通知是最重要和最常见的类型:它们表示正在传递给订户的实际数据。“错误”和“完成”通知在可观察的执行期间只能发生一次,并且只能有一个。

这些约束最好以正则表达式形式写在所谓的“ 可观察语法”或“ 契约”中

next*(error|complete)?

在可观察的执行中,可能会传递零到无限的 Next 通知。如果传递了错误或完成通知,则此后将无法传递其他任何东西。

以下是 Observable 执行的示例,该示例传递三个Next 通知,然后完成:

import { Observable } from 'rxjs';


const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observable 严格遵守 Observable Contract,因此以下代码不会传递 Next 通知4

import { Observable } from 'rxjs';


const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Is not delivered because it would violate the contract
});

一个好主意是subscribetry/ catch块包装任何代码,如果捕获到异常,它们将传递错误通知:

import { Observable } from 'rxjs';


const observable = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // delivers an error if it caught one
  }
});

处置可观察的执行

因为可观察的执行可能是无限的,并且观察者想在有限的时间内中止执行是很常见的,所以我们需要一个 API 来取消执行。由于每次执行仅对一个观察者专有,因此一旦观察者完成接收值,它就必须有一种停止执行的方式,以避免浪费计算能力或内存资源。

observable.subscribe 被调用时,观察员被连接到新创建的可观察的执行。此调用还返回一个对象 Subscription

const subscription = observable.subscribe(x => console.log(x));

订阅代表正在进行的执行,并且具有最小的 API,可让您取消该执行。在 Subscription此处 阅读有关类型的更多信息。随着 subscription.unsubscribe()您可以取消正在进行的执行:

import { from } from 'rxjs';


const observable = from([10, 20, 30]);
const subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

订阅后,您将获得一个 Subscription,代表正在进行的执行。只需调用 unsubscribe()即可取消执行。

当我们使用创建 Observable 时,每个 Observable 必须定义如何处置该执行的资源 create()。您可以通过 unsubscribe 从中返回自定义函数来实现 function subscribe()

例如,这就是我们清除带有的间隔执行集的方式 setInterval

const observable = new Observable(function subscribe(subscriber) {
  // Keep track of the interval resource
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);


  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalId);
  };
});

就像相似 observable.subscribe一样 new Observable(function subscribe() {...})unsubscribesubscribe概念上我们返回的等于 subscription.unsubscribe。实际上,如果删除围绕这些概念的 ReactiveX 类型,则会剩下相当简单的 JavaScript。

function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);


  return function unsubscribe() {
    clearInterval(intervalId);
  };
}


const unsubscribe = subscribe({next: (x) => console.log(x)});


// Later:
unsubscribe(); // dispose the resources

我们使用诸如 Observable,Observer 和 Subscription 之类的 Rx 类型的原因是为了获得安全性(例如 Observable Contract)和与操作员的可组合性。

以上内容是否对您有帮助:
在线笔记
App下载
App下载

扫描二维码

下载编程狮App

公众号
微信公众号

编程狮公众号