每一行代码都是深思熟虑……
FromSlice FromChan Of Range Subject Timeout Interval Merge Concat Race CombineLatest Empty Never Throw
FromSlice
FromChan
Of
Range
Subject
Timeout
Interval
Merge
Concat
Race
CombineLatest
Empty
Never
Throw
Do Take TakeWhile TakeUntil Skip SkipWhile SkipUntil IgnoreElements Share StartWith Zip Filter Distinct DistinctUntilChanged Debounce DebounceTime Throttle ThrottleTime First Last Count Max Min Reduce Map MapTo MergeMap MergeMapTo SwitchMap SwitchMapTo
Do
Take
TakeWhile
TakeUntil
Skip
SkipWhile
SkipUntil
IgnoreElements
Share
StartWith
Zip
Filter
Distinct
DistinctUntilChanged
Debounce
DebounceTime
Throttle
ThrottleTime
First
Last
Count
Max
Min
Reduce
Map
MapTo
MergeMap
MergeMapTo
SwitchMap
SwitchMapTo
import ( . "github.com/langhuihui/RxGo/rx" ) func main(){ err := Of(1, 2, 3, 4).Take(2).Subscribe(ObserverFunc(func(event *Event) { })) }
import ( . "github.com/langhuihui/RxGo/rx" . "github.com/langhuihui/RxGo/pipe" ) func main(){ err := Of(1, 2, 3, 4).Pipe(Skip(1),Take(2)).Subscribe(ObserverFunc(func(event *Event) { })) }
管道模式相比链式模式,具有操作符 可扩展性 ,用户可以按照规则创建属于自己的操作符
type Operator func(Observable) Observable
操作符只需要返回Operator这个类型即可,例如 实现一个happy为false就立即完成的操作符
func MyOperator(happy bool) Operator { return func(source Observable) Observable { return func (sink *Observer) error { if happy{ return source(sink) } return nil } } }
在任何时候,您都可以创建自定义的Observable,用来发送任何事件
import ( . "github.com/langhuihui/RxGo/rx" ) func MyObservable (sink *Control) error { sink.Next("hello") return nil } func main(){ ob := Observable(MyObservable) ob.Subscribe(ObserverFunc(func(event *Event) { })) }
所谓Observable,就是一个可以被订阅,然后不断发送事件的事件源,见如下示意图
Observable
time --> (*)-------------(o)--------------(o)---------------(x)----------------|> | | | | | Start value value error Done
该示意图代表了,事件被订阅后(Start)开始不停发送事件的过程,直到发出error或者Done(完成)为止
有的Observable并不会发出完成事件,比如Never
参考网站: rxmarbles
实现Rx的关键要素,是要问几个问题
Channel
Observer
如何向普通用户解释复杂的概念
当用户需要订阅或者终止事件流,则进行链路传递,订阅或者终止所有中间过程中的事件源
Observable---------Operator----------Operator-----------Observer <| <| <| 订阅/取消 订阅/取消 订阅/取消
当事件流完成或者报错时,需要通知下游事件流的完成或者报错
Observable---------Operator----------Operator-----------Observer |> |> |> 完成/错误 完成/错误 完成/错误
实际情况远比这个复杂,后面会进行分析
Observable 被定义成为一个函数,该函数含有一个类型为*Observer的参数。
type Observable func(*Observer) error
任何事件源都是这样的一个函数,当调用该函数即意味着 订阅 了该事件源,入参为一个Observer,具体功能见下面
如果该函数返回nil,即意味着 事件流完成
否则意味着 事件流异常
type Stop chan bool type Observer struct { next NextHandler //缓存当前的NextHandler,后续可以被替换 dispose Stop //取消订阅的信号,只用来close complete Stop //用于发出完成信号 err error //缓存当前的错误 }
该控制器为一个结构体,其中next记录了当前的NextHandler,
在任何时候,如果关闭了dispose这个channel,就意味着 取消订阅 。
//Dispose 取消订阅 func (c *Observer) Dispose() { select { case <-c.dispose: default: close(c.dispose) } } //Aborted 判断是否已经取消订阅或者已完成 func (c *Observer) Aborted() bool { select { case <-c.dispose: return true case <-c.complete: return true default: return false } }
由于Channel的close可以引发所有读取该Channel的阻塞行为唤醒,所以可以在不同层级复用该channel
并且,由于已经close的channel可以反复读取以取得是否close的状态信息,所以不需要再额外记录
Observer对象为Observable和事件处理逻辑共同持有,是二者沟通的桥梁
type Event struct { Data interface{} Target *Observer } NextHandler interface { OnNext(*Event) }
NextHandler是一个接口,实现OnNext函数,当Observable数据推送到Observer中时,即调用了该函数
NextHandler
OnNext
Target属性用于存储当前发送事件的Observer对象,有两大重要使命
Target
这样做的好处是可以实现不同的观察者,比如函数或者channel
type( NextFunc func(*Event) NextChan chan *Event ) func (next NextFunc) OnNext(event *Event) { next(event) } func (next NextChan) OnNext(event *Event) { next <- event }
//TakeUntil 一直获取事件直到unitl传来事件为止 func (ob Observable) TakeUntil(until Observable) Observable { return func(sink *Observer) error { go until(sink.New3(NextFunc(func(event *Event) { //获取到任何数据就让下游完成 sink.Complete() //由于复用了complete信号,所以会导致所有复用complete的事件流完成 }))) return ob(sink) } }
TakeUnitl的用途是,传入一个until事件源,当这个until事件源接受到事件时,就会导致当前的事件源”完成”。相当于某种中断信号。
看似简短的代码,确考虑各种不同的情形
几大实现细节: