基于阅读此问题:SubscribeOn和ObserveOn有什么区别
ObserveOn设置Subscribe执行处理程序中代码的位置:
ObserveOn
Subscribe
stream.Subscribe(_ => { // this code here });
该SubscribeOn方法设置在哪个线程上完成流的设置。
SubscribeOn
我被领会到,如果未明确设置这些参数,则使用TaskPool。
现在我的问题是,让我说这样的事情:
Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));
鉴于在分配器上执行,Where predicate并且SelectMany lots_of在哪里some_action执行?
Where
predicate
SelectMany
lots_of
some_action
有关SubscribeOn和的信息有很多误导性ObserveOn。
IObservable<T>
Dispose
IDisposable
IObserver<T>
OnNext
OnCompleted
OnError
该声明
ObserveOn设置在订阅处理程序中执行代码的位置:
比帮助更令人困惑。您所说的“订阅处理程序”实际上是一个OnNext处理程序。请记住,Subscribe方法IObservable接受一个IObserver有OnNext,OnCompleted和OnError方法,但扩展方法提供了接受lambda表达式,并建立一个方便重载IObserver实现你。
IObservable
IObserver
不过让我来修饰这个词。我认为“订阅处理程序”是在被调用时被调用 的可观察 代码Subscribe。这样,上面的描述更类似于的目的SubscribeOn。
SubscribeOn导致Subscribe可观察的方法在指定的调度程序或上下文上异步执行。当您不想Subscribe在正在运行的任何线程上的可观察对象上调用该方法时,可以使用它- 通常是因为它可以长时间运行并且您不想阻塞调用线程。
调用时Subscribe,您正在调用一个可观察对象,它可能是一连串可观察对象的一部分。它只是可观察SubscribeOn到的效果。现在可能情况是链中的所有可观察对象将立即在同一线程上订阅- 并非必须如此。考虑一下Concat-仅在前一个流完成后才订阅每个连续的流,通常,这将发生在从前一个流调用的任何线程上OnCompleted。
Concat
因此,SubscribeOn在您的呼叫Subscribe与要订阅的可观察对象之间,拦截该呼叫并使它异步。
它还影响订阅的处置。Subscribe返回IDisposable用于退订的句柄。SubscribeOn确保Dispose在提供的调度程序上调度对的调用。
想了解什么,当混乱的公共点SubscribeOn确实是在Subscribe一个可观察的处理程序可能调用OnNext,OnCompleted或OnError在这同一线程。但是,其目的不是影响这些调用。在Subscribe方法返回之前完成流的情况并不少见。Observable.Return例如,这样做。让我们来看看。
Observable.Return
如果使用我编写的Spy方法,并运行以下代码:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned");
您将得到以下输出(线程ID当然可能有所不同):
Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned
您可以看到整个订阅处理程序在同一线程上运行,并在返回之前完成。
让我们使用它SubscribeOn来异步运行它。我们将同时Return观察可观察者和SubscribeOn可观察者:
Return
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned");
输出(我添加的行号):
01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 SubscribeOn: Observable obtained on Thread: 1 04 SubscribeOn: Subscribed to on Thread: 1 05 SubscribeOn: Subscription completed. 06 Subscribe returned 07 Return: Subscribed to on Thread: 2 08 Return: OnNext(1) on Thread: 2 09 SubscribeOn: OnNext(1) on Thread: 2 10 Return: OnCompleted() on Thread: 2 11 SubscribeOn: OnCompleted() on Thread: 2 12 Return: Subscription completed.
01-主要方法在线程1上运行。
02- Return可观察对象在调用线程上求值。我们只是到达IObservable这里,没有任何订阅。
03- SubscribeOn可观察对象在调用线程上求值。
04-现在终于可以调用的Subscribe方法了SubscribeOn。
05-该Subscribe方法异步完成…
06-…并且线程1返回到main方法。 这就是SubscribeOn的作用!
07-同时,SubscribeOn将默认调度程序上的呼叫调度为Return。在这里它在线程2上被接收。
08 -作为Return呢,它调用OnNext的上Subscribe线…
09- SubscribeOn现在只是一个传递。
10,11-同样 OnCompleted
12-最后,所有Return订阅处理程序都已完成。
希望这能弄清目的和效果SubscribeOn!
如果您认为SubscribeOn作为一个拦截器Subscribe是通过调用方法上不同的线程,然后ObserveOn做同样的工作,但对于OnNext,OnCompleted和OnError电话。
回想一下我们原来的例子:
给出了以下输出:
现在让我们改变它来使用ObserveOn:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned");
我们得到以下输出:
01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 ObserveOn: Observable obtained on Thread: 1 04 ObserveOn: Subscribed to on Thread: 1 05 Return: Subscribed to on Thread: 1 06 Return: OnNext(1) on Thread: 1 07 ObserveOn: OnNext(1) on Thread: 2 08 Return: OnCompleted() on Thread: 1 09 Return: Subscription completed. 10 ObserveOn: Subscription completed. 11 Subscribe returned 12 ObserveOn: OnCompleted() on Thread: 2
02-和以前一样,Return可观察对象在调用线程上求值。我们只是到达IObservable这里,没有任何订阅。
03- ObserveOn可观察对象也在调用线程上评估。
04-现在我们再次在调用线程上订阅ObserveOn可观察的对象…
05-…然后将呼叫转接到Return可观察对象。
06-现在Return调用OnNext其Subscribe处理程序。
07- 这是的效果ObserveOn。我们可以看到,该OnNext线程是在线程2上异步调度的。
08-同时Return调用OnCompleted线程1 …
09-And Return的订阅处理程序完成…
10-然后ObserveOn订阅处理程序也是如此…
11-将控制权返回给main方法
12-同时,ObserveOn已将Return的OnCompleted调用移至线程2。在09-11期间的任何时候都可能发生,因为它异步运行。碰巧的是,它现在终于被调用了。
SubscribeOn当您需要Subscribe长时间运行的可观察对象并希望尽快离开调度程序线程时,通常会在GUI中看到它的使用- 也许是因为您知道它是在订阅处理程序中完成所有工作的那些可观察对象之一。在可观察链的末尾应用它,因为这是您订阅时调用的第一个可观察物。
ObserveOn当您想要确保时,您最经常会在GUI中看到它的使用OnNext,OnCompleted并将OnError调用编组回调度程序线程。在可观察链的末尾应用它,以尽可能快地过渡回来。
希望你可以看到,回答你的问题是,ObserveOnDispatcher将没有任何区别的线程Where和SelectMany执行上- 这一切都取决于什么的线程 流 从美其名曰!流的订阅处理程序将调用线程上调用,但它不可能说在哪里Where,并SelectMany会不知道如何运行stream实现。
ObserveOnDispatcher
stream
到目前为止,我们一直在专门研究Observable.Return。Return在Subscribe处理程序中完成其流。这不是典型的情况,但是流超出Subscribe处理程序的寿命也很普遍。看Observable.Timer例如:
Observable.Timer
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.Subscribe(); Console.WriteLine("Subscribe returned");
这将返回以下内容:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2
你可以清楚地看到认购完成后OnNext并OnCompleted正在对不同的线程以后调用。
请注意,对于选择并在哪个线程或调度程序上进行调用的任何组合, 都不 会对SubscribeOn或ObserveOn产生 任何影响 。Timer``OnNext``OnCompleted
Timer``OnNext``OnCompleted
当然,您可以使用SubscribeOn确定Subscribe线程:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned");
(我故意更改为NewThreadScheduler此处,以防止在Timer碰巧获得与相同的线程池线程的情况下的混乱SubscribeOn)
NewThreadScheduler
Timer
给予:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 SubscribeOn: Observable obtained on Thread: 1 SubscribeOn: Subscribed to on Thread: 1 SubscribeOn: Subscription completed. Subscribe returned Timer: Subscribed to on Thread: 2 Timer: Subscription completed. Timer: OnNext(0) on Thread: 3 SubscribeOn: OnNext(0) on Thread: 3 Timer: OnCompleted() on Thread: 3 SubscribeOn: OnCompleted() on Thread: 3
在这里,您可以清楚地看到线程(1)上的主线程在Subscribe调用之后返回,但是Timer预订获得了自己的线程(2),但是OnNextand OnCompleted调用在线程(3)上运行。
现在,对于ObserveOn,我们将代码更改为(对于代码中的后续代码,请使用nuget包rx-wpf):
var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned");
这段代码有些不同。第一行确保我们有一个调度程序,并且我们也引入了ObserveOnDispatcher-这就像ObserveOn,除了它指定我们应该使用对 求值DispatcherScheduler 的任何线程ObserveOnDispatcher。
DispatcherScheduler
此代码提供以下输出:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 ObserveOn: OnNext(0) on Thread: 1 Timer: OnCompleted() on Thread: 2 ObserveOn: OnCompleted() on Thread: 1
请注意,调度程序(和主线程)是线程1。Timer它仍在调用OnNext并OnCompleted在其选择的线程(2)上进行- 但是,ObserveOnDispatcher正在将编组的调用调回到调度程序线程(线程1)上。
还要注意,如果我们要阻塞调度程序线程(用表示Thread.Sleep),您会看到ObserveOnDispatcher将阻塞(此代码在LINQPad main方法内效果最好):
Thread.Sleep
var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); Console.WriteLine("Blocking the dispatcher"); Thread.Sleep(2000); Console.WriteLine("Unblocked");
然后您将看到如下输出:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Blocking the dispatcher Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 Unblocked ObserveOn: OnNext(0) on Thread: 1 ObserveOn: OnCompleted() on Thread: 1
随着通话的通过,ObserveOnDispatcher只有一旦Sleep运行就能够下车。
Sleep
请记住,Reactive Extensions本质上是一个自由线程库,它会尽其所能在其运行的线程上尽可能地保持惰性- 您必须故意干预ObserveOn,SubscribeOn并将特定的调度程序传递给接受它们进行更改的运算符,这对您很有用这个。
Observable的使用者无法做任何事情来控制它在内部执行的操作ObserveOn,SubscribeOn而装饰器将观察者和Observables的表面积包装起来,以封送跨线程的调用。希望这些例子已经清楚了。