RxJS 如何助力业务开发?
豆皮粉儿们,大家好,又见面了。
这次又给大家带来了 RxJS 的相关文章,本文将结合实际项目需求和示例来给大家讲解 RxJS 的优缺点和最佳实践。
本文作者:阳羡
⚠️注意啦:字节跳动春招开始了,大家ready了吗?欢迎大家找我内推,最快方式进入到部门筛选,筛选方式可以在公众号后台回复关键字,详情见本文末❤️
背景/目的
rxjs 在我们实际项目中被大量使用,并证明了收益。
本文与大家分享收益与沉淀的 best practice。
优势 - 副作用管理
个人认为 rxjs 最强的优势就是副作用管理。什么是副作用管理呢?
你未看此花时,此花与汝心同归于寂。你来看此花时,则此花颜色一时明白起来 --王阳明
一旦一个 Subscribable 未被 subscribe ,其逻辑会在执行完当前 operator 后停止。
为了最大限度的利用这个特性,个人建议把逻辑拆成更小的函数,并用更多的 operator 连接。
如果不需要这个特性,就没有使用 rxjs 的必要性,仅仅徒增复杂度而已,如果使用了 rxjs,但是却没有利用这个特性,我建议还不如直接使用 Promise
。
举一个实际的例子来说明 rxjs 是如何进行副作用管理的,比如说常见的 polling 的需求,大家一般会选择如下的写法:
const resultId = await getResultId(dsl);
history.push(`?resultId=${resultId}`);
const result = await polling(resultId);
setState(result);
以上写法有问题吗?
其实几乎每句都有问题😂
•第二句话:如果用户发起了请求后立刻切走了路由,那么第二句话会把路由切回去•第三句话:如果用户发起了新的查询请求,虽然不需要上次的请求了,但是并不会取消上次的查询,浪费服务端资源与前端请求数。•第四句话:如果组件被卸载了,此时再去 setState
会报一个 warning。但这不是关键问题,关键问题是 竞速问题[2]。如果前一个轮询较慢,而最新的轮询较快,用户很可能看见的结果是前一个轮询的结果。
这时候有同学说,的确,那我给每一行加上检测是否 unmount
或者 AbortController
不就行了。
你以为你在写 go 啊
这样会有两个问题:
•代码变得异常臃肿,降低了可维护性、可读性•其次把这种机械性的逻辑交给人来写,一方面浪费人力与时间,另一方面也不可靠,万一忘了呢
而 rxjs 就零成本的解决了这些问题:
useEffect(() => {
const subscription = dslInput$.pipe(
switchMap(dsl => getResultId(dsl)),
tap(resultId => history.push(`?resultId=${resultId}`)),
switchMap(resultId => polling(resultId)),
).subscribe(result => setState(result));
return () => subscription.unsubscribe()
}, [])
当组件卸载,调用 subscription.unsubscribe()
,副作用则会立刻停止,不再执行之后的逻辑。当然还有其他优点,不过不是 rxjs 特有的优点,这里就不展开了。
缺点 - 函数式/声明式
个人认为 rxjs 没有什么特别大的缺点,它最大的缺点就是函数式/声明式的通病,就是不方便处理生命周期长的变量。
举个例子,比如说你要给上面的代码加上埋点,看看这些请求一共耗时多少,于是代码就膨胀成了这样:
dslInput$.pipe(
map(dsl => ({dsl, startTime: new Date()}))
switchMap(async ({dsl, startTime}) => ({resultId: await getResultId(dsl), startTime})),
tap(({resultId}) => history.push(`?resultId=${resultId}`)),
switchMap(async ({resultId, startTime}) => ({result: await polling(resultId), startTime})),
tap(({startTime}) => Tracker.collect(Date.now() - startTime.getTime())),
).subscribe(({result}) => setState(result));
啊这,好乱啊,真心看不下去。
如果有大佬有解决方案,希望能带带我。
其实 Promise
本身也算是属于函数式,所以它也有这个问题,那么它是怎么解决的呢?
升级成了 async/await
的语法糖🍬。
基本概念
流
我在 Rxjs TENET 问题解析 中已经写过一个最简单的 Subject
了,没有什么 magic:
class Subject<T> {
private fns: Array<(value: T) => void> = []
public next(value: T) {
this.fns.forEach(fn => fn(value))
}
public subscribe(fn: (value: T) => void) {
this.fns.push(fn)
return () => {
this.fns = this.fns.filter(x => x !== fn)
}
}
}
副作用管理也没有什么 magic,它依赖于第4行,当没有任何 subscription
时,便不会调用任何函数,即逻辑不会继续往下执行。
Observable
就是没有 next
的 Subject
。通常是由单一的数据源进行控制,一般来说,不需要开发者自己去调用构造函数,最常见的是来自于 pipe
的返回值。
ReplaySubject
内部有一个保存 value
的数组,在第4行前会记录一下当前的 value
,每当有一个新的 Subscription
,会把最近 n 项的 value
重播给订阅方。
BehaviorSubject
相当于 n 为 1 的 ReplaySubject
。因为比 ReplaySubject
常用,所以单独成为了一个类。我还真没怎么用过 ReplaySubject
。
pipe
用于连接 Observable
,一般主要逻辑都会放在其中执行。一般来说,其中逻辑是否执行有两个必要条件:
•输出流在 pipe
之后产生值•BehaviorSubject
和 ReplaySubject
在有新的 subscription
的时候会发出当前值,所以作为输入时必产生值•有订阅(subscribe
)方•例外:shareReplay
pipe
的原理是链式(reduce
)调用 innerSubscribe
函数,和 subscribe
的区别在于,不会触发副作用的执行。
对于输入流的每次 next
调用,pipe
中逻辑的默认执行次数为订阅方的数量。在绝大多数情况下,不希望逻辑执行多遍,于是需要使用多播。很多文档把多播说的非常复杂,个人觉得没有必要,懂这两个 operator
就行:share
、shareReplay
。把这两个 operator
放在 pipe
的最后一个位置,即可保证前面的逻辑执行次数与订阅方数量无关。不同的是,shareReplay
不管有没有订阅方,都会执行逻辑,而 share
必须存在第一个订阅方之后才开始执行逻辑。
使用 shareReplay
的又叫做热流,其他叫做冷流。在有订阅时,冷流开始执行逻辑,在无订阅时,冷流终止逻辑,冷流具有 rxjs 的核心优势:副作用管理。热流与是否订阅无关,在没有订阅时,也不会终止逻辑。因此,个人很少把 shareReplay
与异步结合使用。
参考资料:可以深入了解[1]
引用链接
[1]
可以深入了解: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html#hot-vs-cold-observables
综上,使用 rxjs 包装逻辑的基本结构如下:
const input$ = new Subject();
const output$ = new BehaviorSubject(initValue);
const subscription = input$.pipe(
operator1(),
operator2(),
).subscribe(output => output$.next(output));
•约定:在 UI 层消费数据时,input$ 结尾的流仅可使用 next
;output$ 结尾的流仅可使用 subscribe
•一般消费 output$
使用 hook:const output = useSubscribable(output$)
•记得把 subscription
传出去,在组件卸载时 unscribable
有同学问了,为什么不像这么写:
const input$ = new Subject();
const output$ = input$.pipe(
operator1(),
operator2(),
);
•第二种写法需要考虑多播。当冷流、热流混搭,最后到底什么执行了,什么没执行;或者什么副作用能被清理,什么无法清理,就乱掉了。而第一种写法根本不用考虑复杂的多播。You may not need broadcast.md。•在第二种写法中,output$
的类型是 Observable
。•当组件在数据发出后订阅的话,会丢失当前的数据。•没有初始值,调用时需要 useSubscribable(output$, initValue)
,初始值散落在各个 UI 组件里,降低了可维护性。startsWith
可以部分解决这个问题。•没有 next
方法,当需求变得复杂时,通常需要对 output$
进行处理,这时候又会改造成第一种写法。
一句话:第一种写法就不用考虑乱七八糟的概念问题
值得注意的是,一个流 error
或者 complete
后,无论如何订阅方都不会再收到新值了,并且不再执行任何逻辑。
input$.pipe(tap(() => throw new Error())).subscribe(x => {
// never reach here
})
所以需要自己注意错误处理,在我们项目中,封装了一些自定义 operator 将异步返回值包装为:
loading: boolean
error?: Error
data?: T
input?: any
从而防止因为出现错误,而导致流异常终止,无法继续执行的问题。
Rxjs 中有个特殊的 Observable
: NEVER
。它从不发出任何值,常用于提前终止逻辑,类似于普通函数中的 return
。
input$.pipe(
switchMap(x => {
if(!x) {
return NEVER
}
return x
}),
operator1()
).subscribe(x => {
// x will never be falsy
})
Operator
按好用度排序:
•switchMap: 超好用的 op。在 subscribe
下个流前,自动 unscbscribe
上个流,从而完美解决竞速问题。和异步有关的用它就对了。•debounce:一般个人喜欢与 pair
之类的 op 结合使用,通过判断哪个值改变从而决定 debounce
多少时间。•distinctUntilChanged:浅比较,如果两个值相等则不发出新值。一般传入深比较函数来使用。•withLatestFrom:获得目标流的最新值而不触发副作用。和 combineLatest
一样的是,如果某个流没有值,则会卡住,这个挺坑的。不同的是,combineLatest
参数中的流发出新值不会执行 pipe
中的逻辑。•exhaustMap。在当前流 complete
前,忽视其他流。一般用于和下载功能结合。值得注意的是,需要记得把上一个流 complete
,否则下一次逻辑永远不会执行。编写自定义 operator 时,一定要思考如何取消副作用。个人建议组合已有 operator 来编写。
赋能业务 - 以 form-result 为例
接下来看看 rxjs 是如何帮助业务快速迭代的。在我们的业务中,最常见的是查询分析页,由查询条件与结果展示两块组成,这样的结构我称之为 form-result。
但是不管是怎样的界面,都可以划分为如下几层:
一个业务组件通常结构如下:
const { input$, output$ } = useXxx(); // 从 context 中取数据
const handleClick = useCallback(() => input$.next(someValue)); // 使用 input$
const output = useSubscrible(output$); // 使用output$,大部分情况下为 BehaviorSubject,此时不需要初始值
return <div onClick={handleClick}>{output}</div> // 渲染界面
所有业务组件最外层是一个 Provider
const value = useXxxProvider();
return <XxxContext.Provider value={value}>{children}</XxxContext.Provider>
几乎所有的业务逻辑都在 useXxxProvider
里,useXxxProvider
会调用 createXxx
去生成各个子业务的逻辑,最后把所有逻辑进行 merge
并返回。
function useXxxProvider() {
const { observables, subscriptions } = useMemo(() => {
// 创建 form 有关的流
const {
observables: formObservables,
subscriptions: formSubscriptions
} = createForm();
// 创建 result 有关的流
const {
observables: resultObservables,
subscriptions: resultSubscriptions
} = createResult(formObservables.formOutput$);
// 把流和订阅返回
return {
observables: {
...formObservables,
...resultObservables
},
subscriptions: [...formSubscriptions, ...resultSubscriptions]
}
}, [])
// 一般没有任何依赖项,如有,需保证不变,或以 ref 形式传入
// 在组件卸载时销毁订阅
useEffect(() => {
return () => subscriptions.forEach(sub => sub.unsubscribe())
}, []);
// 返回生成的流
return observables;
}
其中,每一小块业务逻辑都被包装在 createXxx
里,其基本结构如下:
1. 创建有关的流,一般分为 input$
和 output$
2. 使用 pipe
连接两条流,包装主要业务逻辑3. 把流与订阅返回
function createXxx() {
// 创建有关的 Subject
const someSubject$ = new Subject();
// 连接流与业务逻辑
const subscription = someSubject$.pipe(...).subscribe()
// 返回流与订阅
return {
observables: { someSubject$ }
subscriptions: [subscription]
}
}
具体到我们的业务上,其中表单逻辑的代码如下:
1. 输入流的类型为 Partial<T>
,因为在表单 onChange
的时候,通常不会关心所有表单的值,大部分情况下只要把自己的值调用 formInput$.next()
就行了2. 把输入流中的值与原值进行 merge
,并根据实际业务添加参数校验与表单联动的逻辑后,就形成了输出流
function createForm() {
const formInput$ = new Subject<Pratial<Form>>();
const formOutput$ = new BehaviorSubject<Form>(INIT_FORM);
const subscription = formInput$.pipe(
withLatestFrom(formOutput$),
map(([input, output]) => {
// 如有其他的参数校验、表单联动等逻辑,可以加在这里
const extra = {}
if('someThing' in input) {
extra.otherThing = someValue
}
return {...output, ...input, ...extra}
})
).subscribe(output => formOutput$.next(output));
return {
observables: {
formInput$,
formOutput$
},
subscriptions: [subscription]
}
}
结果业务的代码如下:
1. 根据实际业务场景生成请求流,如遇请求参数与 UI state 差别很大的,可以在这里使用 map
进行转换2. 当产生请求流时,发送请求,生成结果流3. 把结果流和有关订阅返回
function createResult(formOutput$){
// 情况一:无查询按钮,修改表单则自动发请求
const request$ = formOutput$.pipe(
pair(),// 自定义op,第一次返回[undefined, T],其他和pairwise一致
debounce(([prev])=> timer(prev?0:800)),// 首次立刻发请求
distinctUntilChanged(equals) // 根据业务需要,可以增加这个,注意的是不传入深比较函数则为浅比较
);
// 情况二:点击查询按钮再发请求
const requestButtonInput$ = new Subject();
const request$ = requestButtonInput$.pipe(
withLatestFrom(formOutput$),
map(([,form]) => form)
);
// 情况三:两者兼具
const requestButtonInput$ = new Subject();
const request$ = merge(formOutput$.pipe(
pair(),// 自定义op,第一次返回[undefined, T],其他和pairwise一致
debounce(([prev])=> timer(prev?0:800))// 首次立刻发请求
), requestButtonInput$.pipe(
withLatestForm(formOutput$),
map(([, form]) => form)
));
// 准备完毕请求流,开始发请求
const resultOutput$ = new BehaviorSubject(INIT_RESULT)
const subscription = request$.pipe(
switchMap(request => getResult(request))
).subscribe(output => resultOutput$.next(output));
return {
observables: {
requestButtonInput$,
resultOutput$
},
subscriptions: [subscription]
}
}
useSubscribable
附上 useSubscribable
代码:
import useRefWrapper from 'hooks/useRefWrapper'
import { useState, useEffect } from 'react'
import { Subscribable, BehaviorSubject } from 'rxjs'
function getInitValue<T, R>(
subscribable: Subscribable<T>,
selector: (value: T) => R,
initValue?: R
) {
if (initValue !== undefined) {
return initValue
}
if (subscribable instanceof BehaviorSubject) {
return selector((subscribable as any)._value)
}
return undefined
}
export default function useSubscribable<T>(subscribable: BehaviorSubject<T>): T
export default function useSubscribable<T, R>(
subscribable: BehaviorSubject<T>,
selector: (value: T) => R
): R
export default function useSubscribable<T>(
subscribable: Subscribable<T>,
initValue: T
): T
export default function useSubscribable<T, R>(
subscribable: Subscribable<T>,
selector: (value: T) => R,
initValue: R
): R
export default function useSubscribable<T>(
subscribable: Subscribable<T>
): T | undefined
export default function useSubscribable<T, R>(
subscribable: Subscribable<T>,
selector: (value: T) => R
): R | undefined
export default function useSubscribable<T, R = T>(
subscribable: Subscribable<T>,
selectorOrInitValue?: (value: T) => R,
initValue?: R
): R | undefined {
const innerInitValue =
typeof selectorOrInitValue === 'function' ? initValue : selectorOrInitValue
const innerSelector =
typeof selectorOrInitValue === 'function'
? selectorOrInitValue
: (x: T) => (x as unknown) as R
const innerSelectorRef = useRefWrapper(innerSelector)
const [state, setState] = useState(() =>
getInitValue(subscribable, innerSelector, innerInitValue)
)
useEffect(() => {
const subscription = subscribable.subscribe((x) =>
setState(innerSelectorRef.current(x))
)
return () => subscription.unsubscribe()
}, [innerSelectorRef, subscribable])
return state
}
总结
•rxjs 的优势是副作用管理•如果使用 input$、output$,然后再使用 pipe 连接的形式,就不需要考虑多播、冷热等复杂概念也可轻松上手•注意错误处理
引用链接
[1]
竞速问题: https://en.wikipedia.org/wiki/Race_condition
[2]
可以深入了解: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html#hot-vs-cold-observables
The End
如果你觉得这篇文章对你有帮助,有启发,我想请你帮我2个小忙:
1、点个「在看」,让更多的人也能看到这篇文章内容;
2、关注公众号「豆皮范儿」,公众号后台回复「加群」 加入我们一起学习;
关注公众号的福利持续更新,公众号后台送学习资料:
1、豆皮范儿后台回复「vis」,还可以获取更多可视化免费学习资料。
2、豆皮范儿后台回复「webgl」,还可以获取webgl免费学习资料。
3、豆皮范儿后台回复「算法」,还可以获取算法的学习资料。
4、豆皮范儿后台回复「校招」,获取校招内推码
5、豆皮范儿后台回复「社招」,获取内推二维码;
6、豆皮范儿后台回复「实习生」,获取内推二维码