敲碎时间的人的个人专栏
上一篇

RxJs——map,filter第二种实现

广告
选中文字可对指定文章内容进行评论啦,→和←可快速切换按钮,绿色背景文字可以点击查看评论额。
大纲

上一节我们实现了map和filter函数,我们将这些函数都挂载在MyObservable对象上,这里存在一个问题,类似map和filter这样的操作型函数很多,所以不可能将他们都挂载在MyObservable对象上,因此,这里出现了第二种实现。

 

这些操作函数能串联起来的本质就是能够形成嵌套调用,因此我想到了使用pipe,pipe的本质是接收一个 RxJS 操作符的运行结果作为参数,并返回一个 Observable 实例。

代码实例

map实现

export function map(fn) {
    return (observable)=>{
        return new MyObservable(observer => {
            observable.subscribe({
                next: val=> observer.next(fn(val)),
                error: err => observer.error(err),
                complete: () => observer.complete()
            });
        });
    }
}

filter实现

export function filter(fn) {
    return (observable)=>{
        return new MyObservable(observer => {
            observable.subscribe({
                next: val=> fn(val)? observer.next(val): ()=>{},
                error: err => observer.error(err),
                complete: () => observer.complete()
            });
        });
    }
}

从这里我们可以看出 RxJS 操作符的运行结果就是map或者filter执行后的返回函数,返回值就是内部的一个MyObservable对象实例。

pipe单参数实现

pipe(operation) {
  return operation(this);
}

注意这里的this实际上就是对应函数中的(observable)这个参数。然后调用operation(this)后返回的是一个新的Observable,同时这个参数observable会执行subscribe方法,这个方法会将这些Observable串起来调用。

 

pipe多参数实现

pipe(...operations) {
    return operations.reduce((prev, fn) => fn(prev), this);
}

以上这个函数实现的具体功能就是形成一个函数嵌套调用,并且方向是从左向右的。

这个函数的实现最经典的算是在redux中的一段源码啦,有兴趣的可以看看这个框架,本身代码不多,但是阅读起来不容易理解,感兴趣的可以去看看。

下面我用测试代码验证下这个函数:

const letObservable = of(1,2,3);
const a = interval(500).pipe(map((v) => 'a' + v), take(3));
const b = interval(500).pipe(map((v) => 'b' + v), take(3));
letObservable.pipe(merge(a, b)).subscribe((value) => console.log(value));

日志信息如下:

这里我实现了另外的take和merge方法,调用情况可以知道和RxJs的效果一致。这里我也贴出他们的实现。

take实现

export function take(num) {
    return (observable) => (
      new MyObservable(observer => {
        let times = 0;
        let subscription = observable.subscribe({
          next: val => {
            times++;
            if (num >= times) {
              observer.next(val)
            } else {
              observer.complete()
              //if (subscription)subscription.unsubscribe()         
            }
          },
          error: err => observer.error(err),
          complete: () => observer.complete(),
        });
      })
    )
}

tap实现

export function tap(fn) {
    return (observable) => {
        return new MyObservable(observer => {
            observable.subscribe({
                next: val => {
                    fn(val);
                    observer.next(val);
                },
                error: err => observer.error(err),
                complete: () => observer.complete(),
            });
        });
    };
}

merge实现

export function merge(...observables) {
    return (observable) => {
        let completeNum = 0;
        if (observable) {
            observables = [observable,...observables];
        }
        return new MyObservable(observer => {
            observables.forEach(observable => {

                observable.subscribe({
                    next: val => observer.next(val),
                    error: err => {
                        observables.forEach(observable.unsubscribe);
                        observer.error(err)
                    } ,
                    complete: () => {
                        completeNum++;
                        if (completeNum === observables.length) {
                            observer.complete();
                        }
                        
                    },
                });
            });
        });
    };
}

总结:

1、一些基本的操作(比如map或者filter)都是返回一个函数类型,而函数的参数实际上就是一个Observable     对象,并且返回一个新的Observable对象。

2、pipe管道符操作的参数是一个函数数组,而这个函数数组中的对象都是rxjs中的一些基本操作,这些操作     可以进行组合。

3、接以上第一点,返回的新的Observable对象与后续的Observable对象构成一个链式结构,这些对象接收     订阅后开始执行代码逻辑

好了,欢迎各位参与讨论。

 

 

 

版权声明:著作权归作者所有。

X

欢迎加群学习交流

联系我们