查看原文
其他

RxJS 如何助力业务开发?

阳羡 豆皮范儿 2022-11-07

豆皮粉儿们,大家好,又见面了。
这次又给大家带来了 RxJS 的相关文章,本文将结合实际项目需求和示例来给大家讲解 RxJS 的优缺点和最佳实践。

本文作者:阳羡

⚠️注意啦:字节跳动春招开始了,大家ready了吗?欢迎大家找我内推,最快方式进入到部门筛选,筛选方式可以在公众号后台回复关键字,详情见本文末❤️

背景/目的

rxjs 在我们实际项目中被大量使用,并证明了收益。
本文与大家分享收益与沉淀的 best practice。

优势 - 副作用管理

个人认为 rxjs 最强的优势就是副作用管理。什么是副作用管理呢?

你未看此花时,此花与汝心同归于寂。你来看此花时,则此花颜色一时明白起来 --王阳明

一旦一个 Subscribable 未被 subscribe ,其逻辑会在执行完当前 operator 后停止。

为了最大限度的利用这个特性,个人建议把逻辑拆成更小的函数,并用更多的 operator 连接。

如果不需要这个特性,就没有使用 rxjs 的必要性,仅仅徒增复杂度而已,如果使用了 rxjs,但是却没有利用这个特性,我建议还不如直接使用 Promise 。

举一个实际的例子来说明 rxjs 是如何进行副作用管理的,比如说常见的 polling 的需求,大家一般会选择如下的写法:

const resultId = await getResultId(dsl);history.push(`?resultId=${resultId}`);const result = await polling(resultId);setState(result);

以上写法有问题吗?

其实几乎每句都有问题😂

•第二句话:如果用户发起了请求后立刻切走了路由,那么第二句话会把路由切回去•第三句话:如果用户发起了新的查询请求,虽然不需要上次的请求了,但是并不会取消上次的查询,浪费服务端资源与前端请求数。•第四句话:如果组件被卸载了,此时再去 setState 会报一个 warning。但这不是关键问题,关键问题是 竞速问题[2]。如果前一个轮询较慢,而最新的轮询较快,用户很可能看见的结果是前一个轮询的结果。

这时候有同学说,的确,那我给每一行加上检测是否 unmount 或者 AbortController 不就行了。

你以为你在写 go 啊

这样会有两个问题:

•代码变得异常臃肿,降低了可维护性、可读性•其次把这种机械性的逻辑交给人来写,一方面浪费人力与时间,另一方面也不可靠,万一忘了呢

而 rxjs 就零成本的解决了这些问题:

useEffect(() => { const subscription = dslInput$.pipe( switchMap(dsl => getResultId(dsl)), tap(resultId => history.push(`?resultId=${resultId}`)), switchMap(resultId => polling(resultId)), ).subscribe(result => setState(result)); return () => subscription.unsubscribe()}, [])

当组件卸载,调用 subscription.unsubscribe(),副作用则会立刻停止,不再执行之后的逻辑。当然还有其他优点,不过不是 rxjs 特有的优点,这里就不展开了。

缺点 - 函数式/声明式

个人认为 rxjs 没有什么特别大的缺点,它最大的缺点就是函数式/声明式的通病,就是不方便处理生命周期长的变量。

举个例子,比如说你要给上面的代码加上埋点,看看这些请求一共耗时多少,于是代码就膨胀成了这样:

dslInput$.pipe( map(dsl => ({dsl, startTime: new Date()})) switchMap(async ({dsl, startTime}) => ({resultId: await getResultId(dsl), startTime})), tap(({resultId}) => history.push(`?resultId=${resultId}`)), switchMap(async ({resultId, startTime}) => ({result: await polling(resultId), startTime})), tap(({startTime}) => Tracker.collect(Date.now() - startTime.getTime())),).subscribe(({result}) => setState(result));

啊这,好乱啊,真心看不下去。

如果有大佬有解决方案,希望能带带我。

其实 Promise 本身也算是属于函数式,所以它也有这个问题,那么它是怎么解决的呢?

升级成了 async/await 的语法糖🍬。

基本概念

我在 Rxjs TENET 问题解析 中已经写过一个最简单的 Subject 了,没有什么 magic:

class Subject<T> { private fns: Array<(value: T) => void> = [] public next(value: T) { this.fns.forEach(fn => fn(value)) } public subscribe(fn: (value: T) => void) { this.fns.push(fn) return () => { this.fns = this.fns.filter(x => x !== fn) } }}

副作用管理也没有什么 magic,它依赖于第4行,当没有任何 subscription 时,便不会调用任何函数,即逻辑不会继续往下执行。

Observable 就是没有 next 的 Subject。通常是由单一的数据源进行控制,一般来说,不需要开发者自己去调用构造函数,最常见的是来自于 pipe 的返回值。

ReplaySubject 内部有一个保存 value 的数组,在第4行前会记录一下当前的 value,每当有一个新的 Subscription,会把最近 n 项的 value 重播给订阅方。

BehaviorSubject 相当于 n 为 1 的 ReplaySubject。因为比 ReplaySubject 常用,所以单独成为了一个类。我还真没怎么用过 ReplaySubject

pipe 用于连接 Observable,一般主要逻辑都会放在其中执行。一般来说,其中逻辑是否执行有两个必要条件:

•输出流在 pipe 之后产生值•BehaviorSubject 和 ReplaySubject 在有新的 subscription 的时候会发出当前值,所以作为输入时必产生值•有订阅(subscribe)方•例外:shareReplay

pipe 的原理是链式(reduce)调用 innerSubscribe 函数,和 subscribe 的区别在于,不会触发副作用的执行。

对于输入流的每次 next 调用,pipe 中逻辑的默认执行次数为订阅方的数量。在绝大多数情况下,不希望逻辑执行多遍,于是需要使用多播。很多文档把多播说的非常复杂,个人觉得没有必要,懂这两个 operator 就行:shareshareReplay。把这两个 operator 放在 pipe 的最后一个位置,即可保证前面的逻辑执行次数与订阅方数量无关。不同的是,shareReplay 不管有没有订阅方,都会执行逻辑,而 share 必须存在第一个订阅方之后才开始执行逻辑。

使用 shareReplay 的又叫做热流,其他叫做冷流。在有订阅时,冷流开始执行逻辑,在无订阅时,冷流终止逻辑,冷流具有 rxjs 的核心优势:副作用管理。热流与是否订阅无关,在没有订阅时,也不会终止逻辑。因此,个人很少把 shareReplay 与异步结合使用。

参考资料:可以深入了解[1]

引用链接

[1] 可以深入了解: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html#hot-vs-cold-observables

综上,使用 rxjs 包装逻辑的基本结构如下:

const input$ = new Subject();const output$ = new BehaviorSubject(initValue);const subscription = input$.pipe( operator1(), operator2(),).subscribe(output => output$.next(output));

•约定:在 UI 层消费数据时,input$ 结尾的流仅可使用 next;output$ 结尾的流仅可使用 subscribe•一般消费 output$ 使用 hook:const output = useSubscribable(output$)•记得把 subscription 传出去,在组件卸载时 unscribable

有同学问了,为什么不像这么写:

const input$ = new Subject();const output$ = input$.pipe( operator1(), operator2(),);

•第二种写法需要考虑多播。当冷流、热流混搭,最后到底什么执行了,什么没执行;或者什么副作用能被清理,什么无法清理,就乱掉了。而第一种写法根本不用考虑复杂的多播。You may not need broadcast.md。•在第二种写法中,output$ 的类型是 Observable。•当组件在数据发出后订阅的话,会丢失当前的数据。•没有初始值,调用时需要 useSubscribable(output$, initValue),初始值散落在各个 UI 组件里,降低了可维护性。startsWith 可以部分解决这个问题。•没有 next 方法,当需求变得复杂时,通常需要对 output$ 进行处理,这时候又会改造成第一种写法。

一句话:第一种写法就不用考虑乱七八糟的概念问题

值得注意的是,一个流 error 或者 complete 后,无论如何订阅方都不会再收到新值了,并且不再执行任何逻辑。

input$.pipe(tap(() => throw new Error())).subscribe(x => { // never reach here})

所以需要自己注意错误处理,在我们项目中,封装了一些自定义 operator 将异步返回值包装为:

loading: booleanerror?: Errordata?: Tinput?: any

从而防止因为出现错误,而导致流异常终止,无法继续执行的问题。

Rxjs 中有个特殊的 ObservableNEVER。它从不发出任何值,常用于提前终止逻辑,类似于普通函数中的 return

input$.pipe( switchMap(x => { if(!x) { return NEVER } return x }), operator1()).subscribe(x => { // x will never be falsy})

Operator

按好用度排序:

•switchMap: 超好用的 op。在 subscribe 下个流前,自动 unscbscribe 上个流,从而完美解决竞速问题。和异步有关的用它就对了。•debounce:一般个人喜欢与 pair 之类的 op 结合使用,通过判断哪个值改变从而决定 debounce 多少时间。•distinctUntilChanged:浅比较,如果两个值相等则不发出新值。一般传入深比较函数来使用。•withLatestFrom:获得目标流的最新值而不触发副作用。和 combineLatest 一样的是,如果某个流没有值,则会卡住,这个挺坑的。不同的是,combineLatest 参数中的流发出新值不会执行 pipe 中的逻辑。•exhaustMap。在当前流 complete 前,忽视其他流。一般用于和下载功能结合。值得注意的是,需要记得把上一个流 complete,否则下一次逻辑永远不会执行。编写自定义 operator 时,一定要思考如何取消副作用。个人建议组合已有 operator 来编写。

赋能业务 - 以 form-result 为例

接下来看看 rxjs 是如何帮助业务快速迭代的。在我们的业务中,最常见的是查询分析页,由查询条件与结果展示两块组成,这样的结构我称之为 form-result。

但是不管是怎样的界面,都可以划分为如下几层:

一个业务组件通常结构如下:

const { input$, output$ } = useXxx(); // 从 context 中取数据const handleClick = useCallback(() => input$.next(someValue)); // 使用 input$const output = useSubscrible(output$); // 使用output$,大部分情况下为 BehaviorSubject,此时不需要初始值return <div onClick={handleClick}>{output}</div> // 渲染界面

所有业务组件最外层是一个 Provider

const value = useXxxProvider();return <XxxContext.Provider value={value}>{children}</XxxContext.Provider>

几乎所有的业务逻辑都在 useXxxProvider 里,useXxxProvider 会调用 createXxx 去生成各个子业务的逻辑,最后把所有逻辑进行 merge 并返回。

function useXxxProvider() { const { observables, subscriptions } = useMemo(() => { // 创建 form 有关的流 const { observables: formObservables, subscriptions: formSubscriptions } = createForm(); // 创建 result 有关的流 const { observables: resultObservables, subscriptions: resultSubscriptions } = createResult(formObservables.formOutput$); // 把流和订阅返回 return { observables: { ...formObservables, ...resultObservables }, subscriptions: [...formSubscriptions, ...resultSubscriptions] } }, []) // 一般没有任何依赖项,如有,需保证不变,或以 ref 形式传入 // 在组件卸载时销毁订阅 useEffect(() => { return () => subscriptions.forEach(sub => sub.unsubscribe()) }, []); // 返回生成的流 return observables;}

其中,每一小块业务逻辑都被包装在 createXxx 里,其基本结构如下:

1. 创建有关的流,一般分为 input$ 和 output$2. 使用 pipe 连接两条流,包装主要业务逻辑3. 把流与订阅返回

function createXxx() { // 创建有关的 Subject const someSubject$ = new Subject(); // 连接流与业务逻辑 const subscription = someSubject$.pipe(...).subscribe() // 返回流与订阅 return { observables: { someSubject$ } subscriptions: [subscription] }}

具体到我们的业务上,其中表单逻辑的代码如下:

1. 输入流的类型为 Partial<T>,因为在表单 onChange 的时候,通常不会关心所有表单的值,大部分情况下只要把自己的值调用 formInput$.next() 就行了2. 把输入流中的值与原值进行 merge,并根据实际业务添加参数校验与表单联动的逻辑后,就形成了输出流

function createForm() { const formInput$ = new Subject<Pratial<Form>>(); const formOutput$ = new BehaviorSubject<Form>(INIT_FORM); const subscription = formInput$.pipe( withLatestFrom(formOutput$), map(([input, output]) => { // 如有其他的参数校验、表单联动等逻辑,可以加在这里 const extra = {} if('someThing' in input) { extra.otherThing = someValue } return {...output, ...input, ...extra} }) ).subscribe(output => formOutput$.next(output)); return { observables: { formInput$, formOutput$ }, subscriptions: [subscription] }}

结果业务的代码如下:

1. 根据实际业务场景生成请求流,如遇请求参数与 UI state 差别很大的,可以在这里使用 map 进行转换2. 当产生请求流时,发送请求,生成结果流3. 把结果流和有关订阅返回

function createResult(formOutput$){ // 情况一:无查询按钮,修改表单则自动发请求 const request$ = formOutput$.pipe( pair(),// 自定义op,第一次返回[undefined, T],其他和pairwise一致 debounce(([prev])=> timer(prev?0:800)),// 首次立刻发请求 distinctUntilChanged(equals) // 根据业务需要,可以增加这个,注意的是不传入深比较函数则为浅比较 ); // 情况二:点击查询按钮再发请求 const requestButtonInput$ = new Subject(); const request$ = requestButtonInput$.pipe( withLatestFrom(formOutput$), map(([,form]) => form) ); // 情况三:两者兼具 const requestButtonInput$ = new Subject(); const request$ = merge(formOutput$.pipe( pair(),// 自定义op,第一次返回[undefined, T],其他和pairwise一致 debounce(([prev])=> timer(prev?0:800))// 首次立刻发请求 ), requestButtonInput$.pipe( withLatestForm(formOutput$), map(([, form]) => form) )); // 准备完毕请求流,开始发请求 const resultOutput$ = new BehaviorSubject(INIT_RESULT) const subscription = request$.pipe( switchMap(request => getResult(request)) ).subscribe(output => resultOutput$.next(output)); return { observables: { requestButtonInput$, resultOutput$ }, subscriptions: [subscription] }}

useSubscribable

附上 useSubscribable代码:

import useRefWrapper from 'hooks/useRefWrapper'import { useState, useEffect } from 'react'import { Subscribable, BehaviorSubject } from 'rxjs'function getInitValue<T, R>( subscribable: Subscribable<T>, selector: (value: T) => R, initValue?: R) { if (initValue !== undefined) { return initValue } if (subscribable instanceof BehaviorSubject) { return selector((subscribable as any)._value) } return undefined}export default function useSubscribable<T>(subscribable: BehaviorSubject<T>): Texport default function useSubscribable<T, R>( subscribable: BehaviorSubject<T>, selector: (value: T) => R): Rexport default function useSubscribable<T>( subscribable: Subscribable<T>, initValue: T): Texport default function useSubscribable<T, R>( subscribable: Subscribable<T>, selector: (value: T) => R, initValue: R): Rexport default function useSubscribable<T>( subscribable: Subscribable<T>): T | undefinedexport default function useSubscribable<T, R>( subscribable: Subscribable<T>, selector: (value: T) => R): R | undefinedexport default function useSubscribable<T, R = T>( subscribable: Subscribable<T>, selectorOrInitValue?: (value: T) => R, initValue?: R): R | undefined { const innerInitValue = typeof selectorOrInitValue === 'function' ? initValue : selectorOrInitValue const innerSelector = typeof selectorOrInitValue === 'function' ? selectorOrInitValue : (x: T) => (x as unknown) as R const innerSelectorRef = useRefWrapper(innerSelector) const [state, setState] = useState(() => getInitValue(subscribable, innerSelector, innerInitValue) ) useEffect(() => { const subscription = subscribable.subscribe((x) => setState(innerSelectorRef.current(x)) ) return () => subscription.unsubscribe() }, [innerSelectorRef, subscribable]) return state}

总结

•rxjs 的优势是副作用管理•如果使用 input$、output$,然后再使用 pipe 连接的形式,就不需要考虑多播、冷热等复杂概念也可轻松上手•注意错误处理

引用链接

[1] 竞速问题: https://en.wikipedia.org/wiki/Race_condition

[2] 可以深入了解: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html#hot-vs-cold-observables

The     End

如果你觉得这篇文章对你有帮助,有启发,我想请你帮我2个小忙:

1、点个「在看」,让更多的人也能看到这篇文章内容;

2、关注公众号「豆皮范儿」,公众号后台回复「加群」 加入我们一起学习;


关注公众号的福利持续更新,公众号后台送学习资料:

1、豆皮范儿后台回复「vis」,还可以获取更多可视化免费学习资料。

2、豆皮范儿后台回复「webgl」,还可以获取webgl免费学习资料。

3、豆皮范儿后台回复「算法」,还可以获取算法的学习资料。

4、豆皮范儿后台回复「校招」,获取校招内推码

5、豆皮范儿后台回复「社招」,获取内推二维码;

6、豆皮范儿后台回复「实习生」,获取内推二维码



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存