# my-rxjs **Repository Path**: bibinocode/my-rxjs ## Basic Information - **Project Name**: my-rxjs - **Description**: 仿写rxjs - **Primary Language**: TypeScript - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 2 - **Created**: 2023-01-04 - **Last Updated**: 2023-07-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 仿生Rxjs - rxjs是一个可观察序列组合异步和基于事件的库。 - 提供了核心类型,Observable(被观察者),Observer(观察者),Schedulers(调度),Subjects(订阅者)和一些操作符。 - 可用于优雅的处理异步,如封装拖拽,redux类似的库 - `ReactiveX` 结合了观察者模式和迭代器模式,并将函数式编程与集合相结合,以满足对理想管理事件序列的需求 - Observable 表示可调用的未来值或事件的集合的想法 - Observer 是一组回调,知道如何监听 Observable 传递的值 - Subscription 表示 `Observable` 的执行,主要用于取消执行 - Operators 是纯函数,使用操作(如 `map、`filter、`concat`、`reduce` 等)处理集合时具有函数式编程风格 - Subject 等同于 `EventEmitter`,是将值或事件多播到多个 `Observer` 的唯一方法 - Schedulers 是集中式调度程序,用于控制并发,允许我们协调计算发生在例如 setTimeout、requestAnimationFrame 或其他位置的时间 - [官方文档](https://rxjs.dev/) - [入门指南](https://rxjs.dev/guide/overview) - [例子](https://rxjs.dev/examples) - [常见问题](https://rxjs.dev/faq) - [中文文档](https://cn.rx.js.org/) - [github源码](https://github.com/ReactiveX/rxjs) - [rxjs弹珠图](https://rxmarbles.com/) - [rxjs可视化](https://rxviz.com/) - [explorer](https://reactive.how/rxjs/explorer) ## Observable - 可观察对象是懒惰的多个值的`Push`集合,可观察对象是一种异步数据流,它可以在将来推送多个值。它们是惰性的,因为它们不会立即开始发送值,直到有人订阅它们。 ### pull 和 push - `pull`和`push`是两种不同的协议,用于描述数据生产者如何与消费者进行通信。 - 在`pull`系统中,消费者决定何时从数据生产者索取数据。生产者本身并不知道何时向消费者提供数据。 - 每个`javascript` 函数都是`pull`系统。函数是数据生产者,而调用函数的代码通过`拉出`单个返回值从其调用中消费它。 例如: ```javascript // 有一个生成器函数 作为数据生产者 function* generator(){ yield 1; yield 2; yield 3; } // 生产者无法主动提供数据,需要消费者调用消费 const it = generator() console.log(it.next().value) // 1 console.log(it.next().value) // 2 console.log(it.next().value) // 3 ``` - `push` 由生产者决定何时推送数据,消费者不知何时会收到数据。 - `Promise` 是js中常见的推送系统,`Promise` 向已注册的回调函数提供已解析的值。 - RxJS引入了可观察对象,这是一种新的 JavaScript 推送系统。可观察对象是多个值的生产者,将它们推送到观察者(Consumers) 例如: ```javascript function observable(){ return Promise((resolve,reject)=>{ setTimeout(()=>{ resolve(1) },2000) }) } observable().then((res)=>console.log(res)) button.addEventListener('click', function() { console.log('Button was clicked!'); }); ``` | 生产者 | 消费者 | | | :----- | :------------------------- | -------------------------- | | Pull | 被动:在请求时生成数据 | 主动:决定何时请求数据 | | Push | 主动:以自己的速度生成数据 | 被动:对收到的数据做出反应 | ### 使用Rxjs中的Observeable ![img](README.assets/stream2_1671959921595.png) - `Observer(观察者)` 是由可观察对象传递的值的消费者。观察者仅仅是一组回调,每种类型的通知由可观察对象传递:`next`,`error` 和 `complete` - 要使用`Observer(观察者)`,请将其提供给可观察对象的`subscribe` - 观察者只是带有三个回调的对象,每种类型的通知都有一个回调,可观察对象可能传递这些通知 例: ```javascript const observer = { next:(value)=>console.log(value), error:(err)=>console.log(err), complete:()=>console.log('done') } observable.subscribe(observer) ``` - `RxJS`中的观察者也可能是部分可选的。如果不提供其中一个回调,可观察对象的执行仍然会正常进行,但是某些类型的通知将被忽略,因为观察者中没有相应的回调 - 在订阅可观察对象时,您也可以将`next`回调作为参数提供,而不必附加到观察者对象上,在 `observable.subscribe` 内部,它将使用回调参数作为`next`处理程序创建观察者对象 例: ```javascript // 只传递next单回调 observable.subscribe(value=>console.log(value)) ``` - `调用`或`订阅`是一个隔离的操作:两次函数调用会触发两个单独的副作用,两次可观察对象订阅会触发两个单独的副作用。与`事件发射器(EventEmitters)`不同,事件发射器共享副作用并且无论是否存在订阅者都有急切执行,而可观察对象没有共享的执行并且是懒惰的 - `Observables` 可以使用 `new Observable` 或创建操作符创建,使用观察者订阅,执行以向观察者发送 `next` / `error` / `complete` 通知,并且可以对其执行进行处理 - Observable 的核心关注点 - 创建 `Observables` - 订阅 `Observables` - 执行 `Observables` - 处理 `Observables` 例: ```javascript import {Observable} from 'rxjs' const observable = new Observable((subscribe)=>{ subscribe.next(1) subscribe.next(2) subscribe.next(3) subscribe.complete() // or subscribe.error('err') }) const observer = { next:(value)=>console.log(value), error:(err)=>console.log(err), complete:()=>console.log('done') } observable.subscribe(observer) ``` ## Operators - 在Rxjs中`Observable`控制流的状态是基层,但是最有用的是`operator`(操作符),操作符允许复杂的异步代码以声明的形式进行组合成基础单元。操作符主要作用于操作,组合,流中的数据 - 操作符本质上是一个纯函数,接收一个`Observable` 创建出一个新的`Observable`,期间不产生副作用,作为参数传入的`Observable`保持不变。订阅输出 Observable 同样会订阅输入 Observable - 操作符类型 - Creation Operators 创建操作符,它们用于创建新的 Observable。这些操作符可以从各种不同的数据源(如数组、对象、Promise 等)创建 Observable,并可以控制 Observable 的行为(如发出值的频率、顺序等),像`of`、`from`、`timer`、`interval`和`fromEvent`等 - Transformation Operators 转换操作符是 RxJS 中的一类特殊的操作符,它们用于将输入 Observable 转换为新的输出 Observable。这些操作符可以对输入 Observable 中的值进行转换、过滤、合并等操作,以便在输出 Observable 中呈现出所需的信息。像`map`,`filter`等 - Combination Operators 组合操作符用于将多个 `Observable` 合并成一个新的 `Observable`。这些操作符可以帮助你创建复杂的数据流,并控制它们之间的关系,像`merge`和`concat`等 - Filtering Operators 过滤操作符用于过滤输入 Observable 中的值,只返回符合特定条件的值。这些操作符可以帮助你创建精确的数据流,并且非常实用。像`filter`等 - Multicasting Operators 多播操作符用于将单个`Observable`共享给多个观察者(Observer)。这些操作符可以帮助你控制 Observable 的行为,并有效地利用资源。像`share`等 - [rxmarbles](https://rxmarbles.com/) ## of && form - RxJS 的 `of` 操作符允许你创建一个`Observable`,它发出一组项目,然后完成 - 你可以使用它来将任何值发送到一个`Observable`中 - 例如,可以传入一个数值数组来转换为`Observable` ```javascript import {of} from 'rxjs' // of内部会将参数转为数组形式 const arrayLike = of(1,2,3) arrayLike.subscribe({ next:value => console.log(value), complete:()=>console.log("done") }) ``` - `of` 操作符是同步的,意味着它会立即发出所有值,并立即完成,如果你需要异步发出值,你可以使用`from`操作符 - RxJS 的 from 操作符允许你将多种不同的数据类型转换为 Observable,包括数组、类数组对象(如arguments对象)、迭代器和可观察对象 - `from` 操作符是异步的,意味着它会在内部使用内置的调度 ```javascript import {from} from 'rxjs' const promiseLike = from(Promise.resolve(4)) promiseLike.subscribe({ next:(value)=>console.log(value), error:error=>console.log(error), complete:()=>console.log('done') }) ``` - `of`的内部是调用了`from`操作符进行实现 ![image-20230105194529087](README.assets/image-20230105194529087.png) - 根据源码整理依赖关系 of -> from -> innerFrom ## fromEvent - Rxjs的`fromEvent`函数允许你将浏览器事件转为`Observable` - 返回`subscriber`对象挂载`unsubscribe`取消订阅 - 函数接收两个参数 - 参数1:事件目标,例如`DOM`元素或者`window`对象 - 参数2:事件名称,例如`click`或`scroll` - 使用: ```javascript import {fromEvent} from 'rxjs' const source = fromEvent(document,'click') const subscriber = source.subscribe(console.log) ``` ## Subscription - `Subscription`用于取消`Observable`的执行(取消订阅) - `Subscriber` 继承于`Subscription` - 调用`observable.subscribe()`返回的`subscriber`上挂载了一个`unsubscribe`方法用于取消订阅 ```javascript import { fromEvent } from './rxjs' const source = fromEvent(document, 'click'); const subscriber = source.subscribe(console.log) setTimeout(() => { subscriber.unsubscribe(); }, 1000) ``` ## map && filter - `map`操作符,允许你对一个`Observable`中的每个值进行转换,并返回一个新的`Observable`。它接收一个函数作为参数,该函数定义如何转换值。 - `filter`操作符,允许你选择性过滤`Observable`中的值。接收一个函数作为参数,函数内部定义如何过滤值。 - 使用 ```javascript import {of,map,filter} from 'rxjs' of(1,2,3) .pipe(map(value=>value * 2)) // 2 4 6 .pipe(filter(v=>v > 3)) // 4 6 .pip(map(x=>x + 1)) // 5 7 .subscribe(console.log) ``` ![img](README.assets/imageoperator_1671960903708.png) ![img](README.assets/imageoperator2_1671960942520.png) ## pipe - `pipe`作为`Observable`对象上的方法,允许你将多个操作符链接在一起,在单个表达式中执行复杂的数据处理流程。 - 例 ```javascript import { of, map, filter } from 'rxjs' const subscriber = of(1, 2, 3) .pipe( map(val => val * 2), filter(val => val > 3), map(data => data + 1) ) subscriber.subscribe(console.log) ``` ## asyncScheduler - 使用 setTimeout(task,duration)调度任务 - async 调度器通过将任务放在 JavaScript 事件循环队列中异步地调度任务。它最适用于延迟任务的执行或定期执行任务 - 类似setTimeout,但是可以在内部重复调用 ```js import { asyncScheduler } from './rxjs' function task(state) { console.log('state: ', state); if (state < 5) { this.schedule(state + 1, 1000); } } asyncScheduler.schedule(task, 1000, 0); ``` ## timer [Observable | RxJS 中文文档](https://cn.rx.js.org/class/es6/Observable.js~Observable.html#static-method-timer) - `timer`是一个工厂函数,创建一个 发出数字的`Observable`,该 `Observable` 在初始延时(`initialDelay`)之后开始发送并且在每个时间周期( `period`)后发出自增的数字。 > 类似于interval,但是可以指定什么时候开始发送 - 每隔一秒发出自增的数字,3秒后开始发送: ```javascript import {timer} from 'rxjs' const numbers = timer(3000, 1000); numbers.subscribe(x => console.log(x)); ``` - 五秒后发出一个数字 ```js var numbers = timer(5000); numbers.subscribe(x => console.log(x)); ``` ## interval - `interval` 返回一个发出无限自增的序列整数, 你可以选择固定的时间间隔进行发送。 第一次并 没有立马去发送, 而是第一个时间段过后才发出。 默认情况下, 这个操作符使用 async 调度器来 提供时间的概念,但也可以给它传递任意调度器。 - `interval` 函数会一直发送数字,直到你取消订阅。你可以使用 take 操作符限制发送的数字数量 - 每一秒发出一个自增数 ```js var numbers = interval(1000); numbers.subscribe(x => console.log(x)); ``` ## take - `take` 返回的 Observable 只发出源 Observable 最初发出的的N个值 (N = `count`)。 如果源发出值的数量小于 `count` 的话,那么它的所有值都将发出。然后它便完成,无论源 Observable 是否完成。 ```js // 获取时间间隔为1秒的 interval Observable 的最初的5秒 import {interval,take} from 'rxjs' interval(1000) .pipe(take(5)) .subscribe(console.log) // 0 1 2 3 4 ``` ## Subject - `Subject`是`Observable`的特殊类型,它可以将值广播给多个观察者,就像`EventEmitter`一样。 - 每个`Subject`都是一个`Observable`和一个`Observer`。可以订阅`Subject`来调用`next`,`error`,`complete`。因为它是继承`Observable`。 - 冷推流和热推流 - 冷推流:在每一个订阅者订阅时重新发送数据,并且获取到的数据是独立的。 ```javascript import {Observable} from 'rxjs' const observer = new Observable(subscribe=>{ subscribe.next(1) subscribe.next(2) subscribe.next(3) }) observer.subscribe(console.log) observer.subscribe(console.log) // 两个订阅者拿到的都是完整的数据,并且是独立的 ``` - 热推流:每次订阅后只有一个观察者,下一个观察者要进行订阅时是一次新的数据流程。因此热推流和`Observer`是一对一关系。 ```javascript import {Subject} from 'rxjs' const subject = new Subject() subject.next(1) subject.subscribe({next:(x)=>console.log("a:",x)}) subject.next(2) subject.subscribe({next:(x)=>console.log("b:",x)}) subject.next(3) // a: 2 a:3 b:3 // 第一次next调用时并没有订阅者 ``` ## buffer - 两个`buffer` - bufferTime `bufferTime`是一个静态操作符,它会按照指定间隔时间将`Observable`发出的值存储在数组中,然后将这些数组作为单独的值发出。 ```javascript import {interval,bufferTime} from 'rxjs' // 没间隔一秒输出自增数值 interval(1000) .pipe(bufferTime(3000)) // 三秒内存储的数据进行存储 .subscribe(value=>{console.log(value)}) // [0,1,2] ``` - bufferCount `bufferCount`它会将 Observable 中的值按照指定的数量缓存在数组中,然后将这些数组作为单独的值发出。 ```javascript import {interval,bufferCount} from 'rxjs' // 没间隔一秒输出自增数值 interval(1000) .pipe(bufferCount(3)) // 存储三个值后发出 .subscribe(value=>{console.log(value)}) // [0,1,2] ``` ## switchMap - `switchMap`转换操作符通常用于将一个`Obervable`的输出映射到另一个`Observable`,并将新的`Observable`的输出发送到输出流中 - `switchMap` 的行为非常类似于 `map` 操作符,但有一个重要的区别:它会取消订阅之前的 `Observable`,并订阅最新的 `Observable`。这意味着,如果有多个 `Observable` 输出,只会发出最新的 `Observable` 的输出 ```javascript import {interval,switchMap,from,take} from 'rxjs' //在这个例子中,它会每隔 1000 毫秒发出一个数字 const source$ = interval(1000).pipe(take(3)) //使用 take 操作符限制了 source$ 只会发出 3 个值(0、1、2) const seitch$ = source.pipe( //switchMap 操作符。它会将每个值映射成一个新的可观察对象,并且在这个新的可观察对象发出值之前,会取消订阅之前的可观察对象 //在这个例子中,每个值都会被映射成一个 Promise,该 Promise 在 2 秒后被解析 //最后使用 from 操作符将 Promise 转换为可观察对象 switchMap(n => from(new Promise(resolve => { setTimeout(() => resolve(n), 2000) }))) ) // take发出[0,1,2]然后switchMap转换内部from转为Promise返回值,因此会一直生成新的observable,然后取消订阅之前的观察对象。因此一直替换直到2 switch$.subscribe(n => console.log(n)); // 2 ``` ## mergeMap - `mergeMap`可以将源`Observable`的每个值映射到一个新的`Observable`中,并将它们合并到一个单独的输出`Observable`中。这个操作符可以用来执行多个异步操作,并将它们的结果合并到一起 - `map`操作符会把一个数据流转换成另一个数据流,但是它的转换函数必须是同步的,并且只能返回一个值 - `mergeMap`操作符也可以把一个数据流转换成另一个数据流,但是它的转换函数可以是异步的,会并发执行这些转换函数,因此值的顺序不确定 ``` source$.pipe( mergeMap(project: function(value: T, index: number): ObservableInput, concurrent: number): Observable) ``` - `source$`是源Observable - `project`函数接受源Observable的每个值和索引,并返回一个Observable - `concurrent`参数是可选的,用于指定最多有多少个内部Observable可以并发执行 ```javascript import {interval,mergeMap,from,take} from 'rxjs' const source$ = interval(1000) .pipe(take(3)) .pipe( // 异步谁先完成谁先输出,顺序不固定 mergeMap(val=>from(new Promise(resolve=>{ setTimeout(()=>resolve(val),1000 * val) }))) source$.subscribe(console.log) // 输出值不确定顺序 ``` ## concatMap - 将源值投射为一个合并到输出 Observable 的 Observable,以串行的方式等待前一个完成再合并下一个 Observable。 ```javascript import {interval,concatMap,from,take} from 'rxjs' const source$ = interval(1000) .pipe(take(3)) .pipe( // 会等待上一个Obsrvable发出值再执行下一个 concatMap(val=>from(new Promise(resolve=>{ setTimeout(()=>resolve(val),1000 * val) }))) source$.subscribe(console.log) // 0 1 2 ``` ## takeUntil - `takeUntil`是RxJS中的一个变换操作符,它会取消订阅源 Observable,并停止发出值,直到另一个 Observable 发出值 ```javascript import { interval ,takeUntil} from 'rxjs'; const source$ = interval(1000); const stop$ = new Subject(); //这段代码会每隔 1000 毫秒在控制台输出一个数字,直到 stop$ Observable 发出值 //一旦 stop$ Observable 发出值,就会停止发出数字,并取消订阅源 Observable。 const result$ = source$.pipe( takeUntil(stop$) ); result$.subscribe(x => console.log(x)); // 三秒后发出数据产生了值,输出就会停止 setTimeout(()=>{ stop$.next(100); },3000) ``` ## withLatestFrom - `withLatestFrom` 是 RxJS 中的一个变换操作符,它会在源 Observable 发出值时,取最新的值从另一个 Observable 中发出的值合并 ```js first$: -----0-----1-----2-----3-----4-----5| second$: -----------------0-----------------1| result$: -----------------[2,0]--[3,0]-[4,0]-[5,1]| ``` ```javascript import {interval } from 'rxjs'; import { withLatestFrom } from 'rxjs/operators'; const first$ = interval(1000); const second$ = interval(3000); // first每1秒产生一个新的值 // second美3秒产生新的值 // 当第second产生新值时,就会将first最新的值进行合并为数组 // 然后等待下一次新值的产生才会进行变化,期间first的值会一直变化 const result$ = first$.pipe( withLatestFrom(second$) ); result$.subscribe(([first, second]) => console.log(first, second)); ``` ## debounce - `debounceTime`操作符会在指定时间内忽略掉`Observable`发出的新值,如果在这段时间内源 Observable 再次发出值,则会重新计算这段时间。(防抖) ```javascript import { debounceTime } from 'rxjs/operators'; // 假设我们有一个名为 input$ 的 Observable,代表文本框的输入事件 input$.pipe(debounceTime(500)).subscribe(val => { // 在用户输入完文本后的 500 毫秒内,如果没有新的输入事件发生, // 就会在这里执行搜索请求 search(val); }); ``` - debounce - `debounce` 和 `debounceTime` 操作符类似,但是它允许你使用自定义函数来决定忽略掉源 `Observable` 发出的值的时间 ```javascript import { debounce, timer } from 'rxjs'; input$.pipe(debounce(() => timer(500))).subscribe(val => { // 在用户输入完文本后的 500 毫秒内,如果没有新的输入事件发生, // 就会在这里执行搜索请求 search(val); }); ``` ## lastValueFrom - `lastValueFrom` 是一种 RxJS 操作符,它是用于 `Observable` 序列的。它会返回一个新的 `Observable`,该 `Observable` 在源 `Observable` 完成(即不再发出任何项)时发出源 `Observable` 的最后一个值。 - 如果源 `Observable` 从未完成,则 `lastValue Observable` 将永远不会发出任何值。因此,通常需要与其他操作符(例如 `take` 或 `takeUntil`)结合使用,以便在源 `Observable` 完成之前终止观察 ```javascript import { lastValueFrom,interval,take } from 'rxjs'; const source = interval(1000); const lastValue = lastValueFrom(source.pipe( take(5), )); lastValue.then(console.log); ``` ## share - `share` 可以让多个观察者订阅同一个`Observable`的方法。通常用于避免在多个观察之间重复执行相同的数据请求。 - 假设有一个`Observable`,它可以执行一个HTTP请求以获取数据。如果在不同的组件中订阅了这个`Observable`,它就会执行多次HTTP请求、这样可能会导致性能问题。 - 这时,我们就可以使用`share` 方法来共享这个`Observable`,以避免多次执行相同Http请求。 - 具体来说,`share`方法会将`Observable`转换成一个`ConnectableObservable`,它可以让多个观察者订阅同一个`Observable` ,但是实际上只会一次数据请求。 ```js import {share,fromFetch} from 'rxjs' cosnt shareObservable = fromFetch('http:localhost:80/user').pipe(share()) shareObservabble.subscribe(res=>res.json()).then(res=>console.log(res)) shareObservable.subscribe(res=>res.json()).then(res=>console.log(res)) ``` ## fromFetch - `fromFetch`是rxjs封装的请求方法,获取得到资源作为`Observable`发出 ```js fromFetch('http://localhost:8080/api/user/1') .pipe( switchMap(response => { if (response.ok) { return response.json(); } else { throw new Error('Api request failed'); } }), ) .subscribe({ next: response => console.log(response), error: error => console.error(error), }); ``` ## merge - `merge` 将多个`Observable` 合并成一个`Observable` ,并将这些`Observable`中的值按执行时间顺序依次发出 ```js const first = of(1, 2, 3); const second = of(4, 5, 6); merge(first, second).subscribe(value => console.log(value)); // Output: 1, 4, 2, 5, 3, 6 ``` ## error - `catchError`和`throwError`都是用来处理`Observable`中发生的错误的 - `catchError`操作符允许你捕获一个`Observable`中发生的错误,并返回一个新的`Observable`来取代原来的`Observable` - `catchError` 的方法是将它作为 Observable 的链式调用的一部分,并传入一个回调函数作为参数。回调函数接收一个错误对象作为参数,并返回一个新的 Observable。这个新的 Observable 将会取代原来的 Observable,并继续执行后续的操作。 - `throwError` 操作符则是用来显式地抛出一个错误的。它返回一个不包含任何值的 Observable,并立即终止。通常,你可能会使用 `throwError` 来表示一个不可恢复的错误,例如网络连接 catchError ```js import { Observable ,of} from 'rxjs'; import { catchError } from 'rxjs/operators'; const source$ = new Observable(subscriber=>{ setTimeout(()=>{ subscriber.error(new Error('发生了错误')); },1000); }); source$.pipe( catchError(error => of('正常值')), ).subscribe({ next: value => console.log('next',value), error: error => console.error('error',error), complete: () => console.log('complete'), }); import { Observable ,of, throwError} from 'rxjs'; import { catchError } from 'rxjs/operators'; const source$ = new Observable(subscriber=>{ subscriber.error({success:false}); }); source$.pipe( catchError(error => { //return ["hello"] return throwError(()=>error); }), ).subscribe({ next: value => console.log('next',value), error: error => console.error('error',error), complete: () => console.log('complete'), }); ``` ## throwIfEmpty - `throwIfEmpty`操作符用于在源 Observable 完成后,如果没有发出任何值,就抛出一个错误 ```js import { Observable, throwIfEmpty } from 'rxjs'; const source$ = new Observable(subscriber => { subscriber.next(1); subscriber.complete(); }); source$ .pipe(throwIfEmpty()) .subscribe({ next: user => console.log(user), error: error => console.error(error), }) ```