Rxjs基础

我们知道在Rxjs中有几类基础组件,它们分别是:
1、Observables
2、Observers
3、Operators
4、Subjects
5、Schedulers
以上这些组件是Rxjs的核心构建块。我们需要理解它们如何协同工作以便提供强大的功能。一旦对它们有了更高层次的理解,使用起来将会得心应手。那么,让我们一一来看看它们。先来看一个例子:
//import { from } from "rxjs";
let from = require("rxjs").from;
let nums = [1, 2, 4, 34, 56, 789];
let numsObservable$ = from(nums); //Observable
let observer = {
next: num => console.log(num),
error: err => console.error(err),
complete: () => console.log("Execution completed")
};
numsObservable$.subscribe(observer); //Observer subscribed
说明
1、从rxjs中导入必要的函数from
2、定义nums 是一个数字数组,然后将其转换为 Observable。
3、创建了一个名为numObservable$
的 rxjs Observable,借助它可以将数组转换为Observable。注意在 Observables 末尾附加 $ 是一种命名约定。
4、当我们只创建一个 observable 时,什么都不会被执行或调用。要执行任何操作,您需要订阅 Observable。
5、numsObservable$
订阅了一个名为observer
的对象。注意,这个observer
有 3 个属性,next、error 和 complete。一旦订阅,马上执行,因此订阅是执行的触发器。
6、执行完成后,您可以在控制台中看到“执行完成”消息。
什么是Observable
Observable 是可以随时间到达的事件流或数据流。使用RxJS函数,您可以从几乎任何来源(事件、Web 服务、套接字数据等)创建 Observable。换句话说,Observable是未来数据的集合。
在上面的示例中,我使用 from
运算符从数组创建 Observable。同样,也有 fromEvent 操作符来创建 JavaScript 事件。
Observables 可以是Lazy或Cold。除非调用 subscribe 方法,否则它们不会激活任何producer。
下面的代码展示了如何使用 fromEvent
从点击事件创建一个 Observable。
import { fromEvent } from "rxjs";
let clicksObservable$ = fromEvent(document, "click");
什么是Observer
Observable 表示可被观察的数据源,任何对数据源感兴趣可以订阅 Observables。Observer是通过订阅,注册对数据流感兴趣的代码。(因此Observer应该是可执行的,是有行为的,比如成功会怎么样,失败会怎么样,什么时候算结束,结束后还需要执行啥?)
Observer提供了迭代Observable上的数据流(序列)。
import { Observer } from './types';
import { config } from './config';
import { hostReportError } from './util/hostReportError';
export const empty: Observer = {
closed: true,
next(value: any): void { /* noop */},
error(err: any): void {
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
} else {
hostReportError(err);
}
},
complete(): void { /*noop*/ }
};
以上是一个empty Observer,基本上,Observer有 3 种方法,next、error 和 complete。理解它们中的每一个很重要。
let observer = {
next: num => console.log(num),
error: err => console.error(err),
complete: () => console.log("Execution completed")
};
next – 接受一个参数(值)。这是 Observable 正在推送/生成的数据。所以每次 observable 推送数据时 next(value) 都会收到该值。
error – 接受一个参数作为错误。如果Observer抛出错误,则执行此方法。
complete - 不接受任何参数。一旦 Observable 完成其操作/执行,此方法就会被执行。
什么是Operator
运算符是允许操纵 Observables 产生的值的函数。它们是每个人都喜欢使用 RxJS 的另一个原因。我将使用下面的代码示例向您解释运算符的概念。
import { from } from "rxjs";
import { map } from "rxjs/operators";
const nObservable$ = from([1, 2, 3, 5, 56, 567, 89]);
nObservable$.pipe(
map(val => val * 2)
).subscribe(
value => console.log(value)
);
上面的代码将存储在数组中的数字加倍。重要的是要注意 map
函数的使用,您可以将其与 Array.prototype.map()
相关联。正如您所注意到的,map
运算符放在管道方法中。
让我们看看另一个捕获鼠标点击并记录 x 坐标的示例。我将使用 pluck
运算符来执行此操作。
import { fromEvent } from "rxjs";
import { pluck } from "rxjs/operators";
const clicksObservable$ = fromEvent(document, "click");
clicksObservable$
.pipe(pluck("clientX"))
.subscribe({
next: clientX => console.log(clientX),
error: err => console.error(err),
complete: () => console.log("Execution completed")
});
rxjs/operators 中有很多 Operator。可以说,学习Rxjs的使用就是学习这些Operator,但是想要深入理解Rxjs还是需要深入学习基本的概念。
什么是Rxjs中的Subject
Subject 将数据推送到多个Observer,Observable一次将数据推送给单个Observer。 Subject 允许您发出新值,这与订阅 Observables 的情况下被动等待数据不同。
Subject用于多播,Observable用于单播。
如果您是 RxJS 的新手,请稍事休息,喝杯茶、咖啡或任何您喜欢的东西,然后查看下面的代码示例以从高层次上理解Subject。
如果您在 GitHub 上查看 Subject 的源代码,可以看到允许我们在subject调用.next,.error,.complete
等。
让我们看看下面的例子,我只是向你展示了调用 next()
,你也可以在需要时调用 error(err)
和 complete()
。
import { Subject } from "rxjs";
const mySubject$ = new Subject();
mySubject$.subscribe({
next : (num) => console.log("ObserverA: " + num)
});
mySubject$.subscribe({
next : (num) => console.log("ObserverB: " + num)
});
mySubject$.next(5);
mySubject$.next(101);
输出如下:
ObserverA: 5
ObserverB: 5
ObserverA: 101
ObserverB: 101
如上所示,您可以使用 next(value)
向多个Observer发出下一个值。当值的发出完成时,您可以调用 complete()
如下。
mySubject$.complete();
什么是Rxjs中的Schedulers
Schedulers是 rxjs 构建块的另一个重要部分。Schedulers控制订阅何时开始以及何时传递通知。有几个开箱即用的Schedulers,如 queueScheduler、asyncScheduler、asapScheduler 等。
在一个简单的项目中,您甚至可能不需要触摸这部分,但是,当您需要对上下文进行精确控制时,它很有用。看下面的例子:
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x)
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
});
console.log('just after subscribe');
输出如下:
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
总结:
以上就是对Rxjs中的基本构建块有了大致的理解和分析,记录一下方便以后查阅。