/**
 * Hooks this subscriber up to the child: registers an unsubscribe callback and installs a
 * producer that converts downstream requests into upstream requests scaled by the window size.
 */
void init() {
    // On child unsubscription, unsubscribe this subscriber only when the noWindow flag is set.
    final Action0 onChildUnsubscribed = new Action0() {
        public void call() {
            if (InexactSubscriber.this.noWindow) {
                InexactSubscriber.this.unsubscribe();
            }
        }
    };
    this.child.add(Subscriptions.create(onChildUnsubscribed));

    final Producer scalingProducer = new Producer() {
        public void request(long n) {
            if (n <= 0) {
                return;
            }
            final long windowSize = (long) OperatorWindowWithSize.this.size;
            long total = n * windowSize;
            // A product that does not fit in 31 bits and does not divide back to the window
            // size has overflowed; cap the request at Long.MAX_VALUE in that case.
            final boolean fitsIn31Bits = (total >>> 31) == 0;
            if (!fitsIn31Bits && total / n != windowSize) {
                total = Long.MAX_VALUE;
            }
            InexactSubscriber.this.requestMore(total);
        }
    };
    this.child.setProducer(scalingProducer);
}
/**
 * Waits for the wrapped future and forwards its value, followed by completion, to the subscriber.
 * The future is cancelled (with interruption) when the subscriber unsubscribes.
 *
 * @param subscriber the downstream subscriber receiving the future's result or error
 */
public void call(Subscriber<? super T> subscriber) {
    final Action0 cancelFuture = new Action0() {
        public void call() {
            ToObservableFuture.this.that.cancel(true);
        }
    };
    subscriber.add(Subscriptions.create(cancelFuture));

    try {
        // Skip waiting on the future entirely when nobody is listening any more.
        if (subscriber.isUnsubscribed()) {
            return;
        }
        // Without a time unit the wait is unbounded; otherwise the configured timeout applies.
        subscriber.onNext(this.unit == null ? this.that.get() : this.that.get(this.time, this.unit));
        subscriber.onCompleted();
    } catch (Throwable e) {
        // Errors are dropped once the subscriber has unsubscribed.
        if (subscriber.isUnsubscribed()) {
            return;
        }
        Exceptions.throwOrReport(e, subscriber);
    }
}
public static <T> Observable.Transformer<T, T> applySchedulers(final BaseView view) { return new Observable.Transformer<T, T>() { @Override public Observable<T> call(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() {//显示进度条 view.showLoading(); } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) .doAfterTerminate(new Action0() { @Override public void call() { view.hideLoading();//隐藏进度条 } }).compose(RxUtils.<T>bindToLifecycle(view)); } }; }
/**
 * Bridges the pending result to the subscriber: the single result is emitted and the stream
 * completed when the callback fires, and the pending result is cancelled if the subscriber
 * unsubscribes before completion.
 *
 * @param subscriber the downstream subscriber
 */
@Override
public void call(final Subscriber<? super T> subscriber) {
    final ResultCallback<T> forwardResult = new ResultCallback<T>() {
        @Override
        public void onResult(T t) {
            subscriber.onNext(t);
            // Mark completion before onCompleted so the unsubscribe hook does not cancel.
            complete = true;
            subscriber.onCompleted();
        }
    };
    result.setResultCallback(forwardResult);

    final Action0 cancelIfPending = new Action0() {
        @Override
        public void call() {
            if (complete) {
                return;
            }
            result.cancel();
        }
    };
    subscriber.add(Subscriptions.create(cancelIfPending));
}
/**
 * Emits an increasing counter (starting at 0) to the child on a periodic schedule.
 *
 * <p>A worker is created from the scheduler and registered with the child so that child
 * unsubscription stops the periodic task. If delivering a value throws, the worker is
 * unsubscribed and the error is thrown (if fatal) or reported to the child.
 *
 * @param child the subscriber receiving the tick values
 */
public void call(final Subscriber<? super Long> child) {
    final Worker worker = this.scheduler.createWorker();
    child.add(worker);
    worker.schedulePeriodically(new Action0() {
        long counter;

        public void call() {
            try {
                child.onNext(this.counter++);
            } catch (Throwable e) {
                // The error is reported only on failure, and even if stopping the worker
                // itself throws; hence the nested try/finally inside the catch.
                try {
                    worker.unsubscribe();
                } finally {
                    Exceptions.throwOrReport(e, child);
                }
            }
        }
    }, this.initialDelay, this.period, this.unit);
}
/**
 * Creates a tracing subscriber that delegates each notification to the given actions.
 *
 * @param onNext        action invoked for every emitted item; must not be null
 * @param onError       action invoked on error; must not be null
 * @param onCompleted   action invoked on completion; must not be null
 * @param operationName operation name passed to the superclass
 * @param tracer        tracer passed to the superclass
 * @throws IllegalArgumentException if any of the three actions is null
 */
public TracingActionSubscriber(Action1<? super T> onNext,
                               Action1<Throwable> onError,
                               Action0 onCompleted,
                               String operationName,
                               Tracer tracer) {
    super(operationName, tracer);

    // Validate all callbacks up front, in declaration order, before storing any of them.
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    if (onCompleted == null) {
        throw new IllegalArgumentException("onComplete can not be null");
    }

    this.onNext = onNext;
    this.onError = onError;
    this.onCompleted = onCompleted;
}
/**
 * Builds an {@code Observer} whose three notifications are forwarded to the given actions.
 *
 * @param onNext     action invoked for every item; must not be null
 * @param onError    action invoked on error; must not be null
 * @param onComplete action invoked on completion; must not be null
 * @param <T>        the item type
 * @return an observer delegating to the supplied actions
 * @throws IllegalArgumentException if any argument is null
 */
public static final <T> Observer<T> create(final Action1<? super T> onNext,
                                           final Action1<Throwable> onError,
                                           final Action0 onComplete) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    if (onComplete == null) {
        throw new IllegalArgumentException("onComplete can not be null");
    }

    return new Observer<T>() {
        public final void onNext(T args) {
            onNext.call(args);
        }

        public final void onError(Throwable e) {
            onError.call(e);
        }

        public final void onCompleted() {
            onComplete.call();
        }
    };
}
public void runCode() { // DoOnTerminate会在Observable结束前触发回调,无论是正常还是异常终止; Observable.range(0,4).doOnTerminate(new Action0() { @Override public void call() { println("------>doOnTerminate()"); } }).subscribe(new Observer<Integer>() { @Override public void onCompleted() { println("------>onCompleted()"); } @Override public void onError(Throwable e) { println("------>onError()" + e); } @Override public void onNext(Integer integer) { println("------->onNext()"); } }); }
public void runCode() { // doOnUnSubscribe则会在Subscriber进行反订阅的时候触发回调。 // 当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。 Observable observable = Observable.just(1, 2).doOnUnsubscribe(new Action0() { @Override public void call() { println("I'm be unSubscribed!"); } }); Subscription subscribe1 = observable.subscribe(); Subscription subscribe2 = observable.subscribe(); subscribe1.unsubscribe(); subscribe2.unsubscribe(); }
/**
 * Requests the top-250 movie list starting at the given offset and relays the request
 * lifecycle to the callback.
 *
 * @param start    offset passed to the movie endpoint
 * @param callBack receiver of before-request, success, error and completion notifications
 * @return the subscription for the running request
 */
@Override
public Subscription requestTopMovie(int start, final RequestCallBack callBack) {
    final Action0 notifyBeforeRequest = new Action0() {
        @Override
        public void call() {
            callBack.beforeRequest();
        }
    };

    final Observer<List<MovieItemBean>> relay = new Observer<List<MovieItemBean>>() {
        @Override
        public void onNext(List<MovieItemBean> data) {
            callBack.requestSuccess(data);
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(e.getLocalizedMessage());
            // The callback receives the localized message followed by the throwable's string form.
            callBack.requestError(e.getLocalizedMessage() + "\n" + e);
        }

        @Override
        public void onCompleted() {
            callBack.requestComplete();
        }
    };

    return RetrofitHttpClient.getMovieTop250(start)
            .doOnSubscribe(notifyBeforeRequest)
            .subscribe(relay);
}
/**
 * Enqueues the action for execution on the executor.
 *
 * <p>The action is tracked in {@code tasks} and queued; the executor is asked to run this
 * worker only when the work-in-progress counter was zero before the increment.
 *
 * @param action the action to run
 * @return the subscription for the scheduled action, or an already-unsubscribed subscription
 *         if this worker is unsubscribed
 * @throws RejectedExecutionException if the executor rejects the work; the action is untracked,
 *         the counter is rolled back and the error is passed to the plugin error handler first
 */
public Subscription schedule(Action0 action) {
    if (isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    Subscription task = new ScheduledAction(action, this.tasks);
    this.tasks.add(task);
    this.queue.offer(task);

    final boolean wasIdle = this.wip.getAndIncrement() == 0;
    if (wasIdle) {
        try {
            this.executor.execute(this);
        } catch (RejectedExecutionException rejected) {
            // Undo the bookkeeping done above before surfacing the rejection.
            this.tasks.remove(task);
            this.wip.decrementAndGet();
            RxJavaPlugins.getInstance().getErrorHandler().handleError(rejected);
            throw rejected;
        }
    }
    return task;
}
/**
 * Requests the reviews of a movie starting at the given offset and relays the request
 * lifecycle to the callback.
 *
 * @param id       identifier passed to the reviews endpoint
 * @param start    offset passed to the reviews endpoint
 * @param callBack receiver of before-request, success, error and completion notifications
 * @return the subscription for the running request
 */
@Override
public Subscription requestMovieReviews(String id, int start, final RequestCallBack callBack) {
    final Action0 notifyBeforeRequest = new Action0() {
        @Override
        public void call() {
            callBack.beforeRequest();
        }
    };

    final Observer<MovieReviewBean> relay = new Observer<MovieReviewBean>() {
        @Override
        public void onNext(MovieReviewBean data) {
            callBack.requestSuccess(data);
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(e.getLocalizedMessage());
            // The callback receives the localized message followed by the throwable's string form.
            callBack.requestError(e.getLocalizedMessage() + "\n" + e);
        }

        @Override
        public void onCompleted() {
            callBack.requestComplete();
        }
    };

    return RetrofitHttpClient.getMovieReviews(id, start)
            .doOnSubscribe(notifyBeforeRequest)
            .subscribe(relay);
}
/**
 * Runs a fire-and-forget task: an optional pre-execute step on subscribe, the background
 * action while the single placeholder item passes through the IO scheduler, and an optional
 * finish step delivered on the Android main thread.
 *
 * @param preExecute     action to run when the task is subscribed; may be null
 * @param doInBackground action to run in the background
 * @param doOnFinish     action to run when the job is done (observed on the main thread);
 *                       may be null
 * @param onError        action to run when an exception is thrown; when null,
 *                       {@code RxActions.onError()} is used instead
 * @return the subscription of the task
 */
public static Subscription asyncTask(final Action0 preExecute,
                                     @NonNull final Action0 doInBackground,
                                     final Action0 doOnFinish,
                                     Action1<Throwable> onError) {
    final Action0 runPreExecute = new Action0() {
        @Override
        public void call() {
            if (preExecute != null) {
                preExecute.call();
            }
        }
    };

    final Action1<String> runOnFinish = new Action1<String>() {
        @Override
        public void call(String s) {
            if (doOnFinish != null) {
                doOnFinish.call();
            }
        }
    };

    return Observable.just("Hey nerd! This is an async task.")
            .subscribeOn(Schedulers.io())
            .doOnSubscribe(runPreExecute)
            .observeOn(Schedulers.io())
            .doOnNext(Actions.toAction1(doInBackground))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(runOnFinish, onError == null ? RxActions.onError() : onError);
}
/**
 * Verifies that updating a DNS zone with a non-matching explicit ETag is rejected, while
 * updating it with the zone's current ETag is applied.
 */
@Test
public void canUpdateZoneWithExplicitETag() throws Exception {
    final Region region = Region.US_EAST;
    final String topLevelDomain = "www.contoso" + generateRandomResourceName("z", 10) + ".com";

    final DnsZone dnsZone = zoneManager.zones()
            .define(topLevelDomain)
            .withNewResourceGroup(RG_NAME, region)
            .withETagCheck()
            .create();
    Assert.assertNotNull(dnsZone.eTag());

    // An update carrying a deliberately altered ETag is expected to raise the ETag exception.
    Action0 updateWithWrongETag = new Action0() {
        @Override
        public void call() {
            dnsZone.update()
                    .withETagCheck(dnsZone.eTag() + "-foo")
                    .apply();
        }
    };
    ensureETagExceptionIsThrown(updateWithWrongETag);

    // The same update with the zone's own ETag is then applied.
    dnsZone.update()
            .withETagCheck(dnsZone.eTag())
            .apply();
}
/**
 * Runs a mapping job: an optional pre-execute step on subscribe, the mapper applied to a
 * single placeholder string after hopping to the IO scheduler, and an optional finish step
 * receiving the mapped value on the Android main thread.
 *
 * @param preExecute action to run when the job is subscribed; may be null
 * @param mapper     function producing the result from the placeholder string
 * @param doOnFinish action receiving the mapped value (observed on the main thread); may be null
 * @param onError    action to run when an exception is thrown; when null,
 *                   {@code RxActions.onError()} is used instead
 * @param <T>        the type produced by the mapper
 * @return the subscription of the whole mapping job
 */
public static <T> Subscription asyncMap(final Action0 preExecute,
                                        @NonNull final Func1<String, T> mapper,
                                        final Action1<T> doOnFinish,
                                        Action1<Throwable> onError) {
    final Action0 runPreExecute = new Action0() {
        @Override
        public void call() {
            if (preExecute != null) {
                preExecute.call();
            }
        }
    };

    final Action1<T> deliverResult = new Action1<T>() {
        @Override
        public void call(T t) {
            if (doOnFinish != null) {
                doOnFinish.call(t);
            }
        }
    };

    return Observable.just("Hey nerd! This is an async map.")
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .doOnSubscribe(runPreExecute)
            .map(mapper)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(deliverResult, onError == null ? RxActions.onError() : onError);
}
/**
 * Hooks this subscriber up to the child: registers an unsubscribe callback and installs a
 * producer that converts downstream requests into upstream requests scaled by the window size.
 */
void init() {
    // On child unsubscription, unsubscribe this subscriber only when the noWindow flag is set.
    final Action0 onChildUnsubscribed = new Action0() {
        public void call() {
            if (ExactSubscriber.this.noWindow) {
                ExactSubscriber.this.unsubscribe();
            }
        }
    };
    this.child.add(Subscriptions.create(onChildUnsubscribed));

    final Producer scalingProducer = new Producer() {
        public void request(long n) {
            if (n <= 0) {
                return;
            }
            final long windowSize = (long) OperatorWindowWithSize.this.size;
            long total = n * windowSize;
            // A product that does not fit in 31 bits and does not divide back to the window
            // size has overflowed; cap the request at Long.MAX_VALUE in that case.
            final boolean fitsIn31Bits = (total >>> 31) == 0;
            if (!fitsIn31Bits && total / n != windowSize) {
                total = Long.MAX_VALUE;
            }
            ExactSubscriber.this.requestMore(total);
        }
    };
    this.child.setProducer(scalingProducer);
}
/** * 添加线程管理并订阅 * @param ob * @param subscriber * @param cacheKey 缓存kay * @param event Activity 生命周期 * @param lifecycleSubject * @param isSave 是否缓存 * @param forceRefresh 是否强制刷新 */ public void toSubscribe(Observable ob, final ProgressSubscriber subscriber, String cacheKey, final ActivityLifeCycleEvent event, final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject, boolean isSave, boolean forceRefresh) { //数据预处理 Observable.Transformer<HttpResult<Object>, Object> result = RxHelper.handleResult(event,lifecycleSubject); Observable observable = ob.compose(result) .doOnSubscribe(new Action0() { @Override public void call() { //显示Dialog和一些其他操作 subscriber.showProgressDialog(); } }); RetrofitCache.load(cacheKey,observable,isSave,forceRefresh).subscribe(subscriber); }
/**
 * Schedules the action on the shared executor pool, immediately when the delay is not
 * positive and after the delay otherwise.
 *
 * @param action    the action to run
 * @param delayTime the delay before running; values of zero or less mean "submit now"
 * @param unit      the unit of {@code delayTime}
 * @return the scheduled action, or an empty subscription if the inner subscription is
 *         already unsubscribed
 */
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isUnsubscribed()) {
        return Subscriptions.empty();
    }

    final ScheduledAction task = new ScheduledAction(action, operationQueue);
    final ScheduledExecutorService pool = IOSScheduledExecutorPool.getInstance();

    final Future<?> pending = delayTime > 0
            ? pool.schedule(task, delayTime, unit)
            : pool.submit(task);

    // Tie the future and the parent subscription to the returned action.
    task.add(Subscriptions.from(pending));
    task.addParent(innerSubscription);
    return task;
}
/**
 * Delivers a successful response that carries data.
 *
 * <p>From a main-thread worker, builds an observable that emits the value produced by
 * {@code dataMethod}, delays it by the response's {@code delayMillis}, subscribes on the IO
 * scheduler and observes on the main thread with an {@code ApiSubcriber} wrapping the callback.
 *
 * @param dataMethod supplier of the data to emit
 * @param callback   callback wrapped by the subscriber
 * @param response   response whose {@code delayMillis} sets the delay
 * @param <T>        the data type
 */
public <T> void giveSuccessResult(final Func0<T> dataMethod,
                                  final DataApiCallback<T> callback,
                                  final Response response) {
    final Action0 emitOnMainWorker = new Action0() {
        @Override
        public void call() {
            final Observable.OnSubscribe<T> source = new Observable.OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> subscriber) {
                    Log.d("MOCK", "onNext Thread = " + Thread.currentThread().getName());
                    subscriber.onNext(dataMethod.call());
                    subscriber.onCompleted();
                }
            };

            Observable.create(source)
                    .delay(response.delayMillis, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new ApiSubcriber(callback));
        }
    };
    AndroidSchedulers.mainThread().createWorker().schedule(emitOnMainWorker);
}
/**
 * Installs the single allowed subscriber and drains the buffered notifications to it.
 *
 * <p>If another subscriber is already installed, the new one receives an
 * {@code IllegalStateException}. Otherwise an unsubscribe hook resets the state to the empty
 * observer, and the caller that wins the {@code emitting} flag replays the buffer.
 *
 * @param s the subscriber attempting to subscribe
 */
public void call(Subscriber<? super T> s) {
    if (!this.state.casObserverRef(null, s)) {
        s.onError(new IllegalStateException("Only one subscriber allowed!"));
        return;
    }

    final Action0 resetToEmptyObserver = new Action0() {
        public void call() {
            OnSubscribeAction.this.state.set(BufferUntilSubscriber.EMPTY_OBSERVER);
        }
    };
    s.add(Subscriptions.create(resetToEmptyObserver));

    // Claim the emitting flag under the guard; only the claimant drains the buffer.
    boolean claimed = false;
    synchronized (this.state.guard) {
        if (!this.state.emitting) {
            this.state.emitting = true;
            claimed = true;
        }
    }
    if (!claimed) {
        return;
    }

    NotificationLite<T> nl = NotificationLite.instance();
    for (;;) {
        Object next = this.state.buffer.poll();
        if (next == null) {
            // Re-check emptiness under the guard before releasing the emitting flag.
            synchronized (this.state.guard) {
                if (this.state.buffer.isEmpty()) {
                    this.state.emitting = false;
                    return;
                }
            }
            continue;
        }
        nl.accept((Observer) this.state.get(), next);
    }
}
/**
 * Clears the create-mode flag after a successful run and returns a completable that
 * normalizes the properties when executed.
 *
 * @param succeeded whether the preceding run succeeded
 * @return a completable wrapping {@code normalizeProperties()}
 */
@Override
public Completable afterPostRunAsync(boolean succeeded) {
    if (succeeded) {
        isInCreateMode = false;
    }

    final Action0 normalize = new Action0() {
        @Override
        public void call() {
            normalizeProperties();
        }
    };
    return Completable.fromAction(normalize);
}
@Override public void getData(boolean isRefresh) { // 因为网易这个原接口参数一大堆,我只传了部分参数,返回的数据会出现图片重复的情况,请不要在意这个问题- - Observable.from(mPhotoList) .doOnSubscribe(new Action0() { @Override public void call() { mView.showLoading(); } }) .compose(mTransformer) .subscribe(new Subscriber<List<BeautyPhotoInfo>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Logger.e(e.toString()); } @Override public void onNext(List<BeautyPhotoInfo> photoList) { mView.loadData(photoList); } }); }
/**
 * Loads the selected channel list on the IO scheduler, initializing the channel database on
 * first use, and relays the request lifecycle to the callback on the main thread.
 *
 * @param callBack receiver of before-request, success, error and completion notifications
 * @return the subscription for the running load
 */
@Override
public Subscription getChannelList(final RequestCallBack<List<GankTypeInfo>> callBack) {
    final Observable.OnSubscribe<List<GankTypeInfo>> loadChannels =
            new Observable.OnSubscribe<List<GankTypeInfo>>() {
                @Override
                public void call(Subscriber<? super List<GankTypeInfo>> subscriber) {
                    // Seed the database once, guarded by the "initDb" preference flag.
                    if (!SpUtil.readBoolean("initDb")) {
                        ChannelTypeDb.getInstance().initData();
                        SpUtil.writeBoolean("initDb", true);
                    }
                    subscriber.onNext(ChannelTypeDb.getInstance().getSelectChannelList());
                    subscriber.onCompleted();
                }
            };

    final Action0 notifyBeforeRequest = new Action0() {
        @Override
        public void call() {
            callBack.beforeRequest();
        }
    };

    final Subscriber<List<GankTypeInfo>> relay = new Subscriber<List<GankTypeInfo>>() {
        @Override
        public void onNext(List<GankTypeInfo> gankTypeInfos) {
            callBack.requestSuccess(gankTypeInfos);
        }

        @Override
        public void onError(Throwable e) {
            callBack.requestError(e.getLocalizedMessage() + "\n" + e);
        }

        @Override
        public void onCompleted() {
            callBack.requestComplete();
        }
    };

    return Observable.create(loadChannels)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(notifyBeforeRequest)
            .subscribe(relay);
}
@Override public void getData(boolean isRefresh) { // 因为网易这个原接口参数一大堆,我只传了部分参数,返回的数据会出现图片重复的情况,请不要在意这个问题- - RetrofitService.getBeautyPhoto(mPage) .doOnSubscribe(new Action0() { @Override public void call() { mView.showLoading(); } }) .compose(mView.<List<BeautyPhotoInfo>>bindToLife()) .subscribe(new Subscriber<List<BeautyPhotoInfo>>() { @Override public void onCompleted() { mView.hideLoading(); } @Override public void onError(Throwable e) { Logger.e(e.toString()); mView.showNetError(); } @Override public void onNext(List<BeautyPhotoInfo> photoList) { mView.loadData(photoList); mPage++; } }); }
/**
 * Fetches the current page of videos for {@code mVideoId}, bound to the view's lifecycle, and
 * advances the page counter after each delivered list.
 *
 * @param isRefresh not read by this implementation
 */
@Override
public void getData(boolean isRefresh) {
    final Action0 showLoading = new Action0() {
        @Override
        public void call() {
            mView.showLoading();
        }
    };

    final Subscriber<List<VideoInfo>> videoSubscriber = new Subscriber<List<VideoInfo>>() {
        @Override
        public void onNext(List<VideoInfo> videoList) {
            mView.loadData(videoList);
            mPage++;
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(e.toString());
            mView.showNetError();
        }

        @Override
        public void onCompleted() {
            mView.hideLoading();
        }
    };

    RetrofitService.getVideoList(mVideoId, mPage)
            .doOnSubscribe(showLoading)
            .compose(mView.<List<VideoInfo>>bindToLife())
            .subscribe(videoSubscriber);
}
/**
 * Wraps the downstream subscriber in a pass-through parent whose unsubscription is performed
 * on a worker of the configured scheduler.
 *
 * @param subscriber the downstream subscriber
 * @return the parent subscriber to be subscribed upstream
 */
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
    final Subscriber<T> parent = new Subscriber<T>() {
        public void onNext(T t) {
            subscriber.onNext(t);
        }

        public void onError(Throwable e) {
            subscriber.onError(e);
        }

        public void onCompleted() {
            subscriber.onCompleted();
        }
    };

    // When downstream unsubscribes, hop to a fresh worker to unsubscribe the parent, then
    // release that worker.
    final Action0 unsubscribeOnWorker = new Action0() {
        public void call() {
            final Worker inner = OperatorUnsubscribeOn.this.scheduler.createWorker();
            final Action0 releaseParentAndWorker = new Action0() {
                public void call() {
                    parent.unsubscribe();
                    inner.unsubscribe();
                }
            };
            inner.schedule(releaseParentAndWorker);
        }
    };
    subscriber.add(Subscriptions.create(unsubscribeOnWorker));

    return parent;
}
/**
 * Schedules the emission of a value on the inner scheduler after the given delay.
 *
 * @param v         the value to emit
 * @param delayTime the delay in milliseconds
 */
public void onNext(final T v, long delayTime) {
    final Action0 emitValue = new Action0() {
        public void call() {
            TestSubject.this._onNext(v);
        }
    };
    this.innerScheduler.schedule(emitValue, delayTime, TimeUnit.MILLISECONDS);
}
/**
 * Schedules {@code startNewChunk()} periodically on the worker, using the operator's
 * {@code timeshift} as both the initial delay and the period.
 */
void scheduleChunk() {
    final Action0 openNextChunk = new Action0() {
        public void call() {
            InexactSubscriber.this.startNewChunk();
        }
    };
    this.worker.schedulePeriodically(
            openNextChunk,
            OperatorWindowWithTime.this.timeshift,
            OperatorWindowWithTime.this.timeshift,
            OperatorWindowWithTime.this.unit);
}
/**
 * Creates a transformer that runs the source on the IO scheduler, observes on the Android
 * main thread, shows a progress dialog when the stream is requested and dismisses it when
 * the stream terminates.
 *
 * <p>The dialog is shown at most once per run: {@code doOnRequest} may fire several times,
 * and showing a new dialog on each call would leave the earlier ones undismissed. The
 * terminate hook tolerates the case where no dialog was ever shown.
 *
 * @return the progress-dialog transformer
 */
public Observable.Transformer createProgressDialog() {
    return new Observable.Transformer<Object, Object>() {
        // The dialog currently shown by this transformer, or null when none is showing.
        ProgressDialog pd;

        @Override
        public Observable<Object> call(final Observable<Object> observable) {
            return observable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnRequest(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            println("doOnRequest");
                            // Only the first request opens a dialog; later requests reuse it.
                            if (pd == null) {
                                pd = ProgressDialog.show(getActivity(), "1111", "222");
                            }
                        }
                    })
                    .doOnTerminate(new Action0() {
                        @Override
                        public void call() {
                            println("doOnTerminate");
                            // Guard against termination without a preceding request, and
                            // clear the field so the transformer can be reused.
                            if (pd != null) {
                                pd.dismiss();
                                pd = null;
                            }
                        }
                    });
        }
    };
}
public void runCode() { //强制Observable按次序发射数据并且功能是有效的 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onCompleted(); subscriber.onNext(3); subscriber.onCompleted(); } }) .cast(Integer.class) .serialize(); observable.doOnUnsubscribe(new Action0() { @Override public void call() { println("Unsubscribed"); } }) .unsafeSubscribe(new Subscriber<Integer>() { @Override public void onCompleted() { println("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { } }); }
/**
 * Creates a buffering subscriber in front of the given child.
 *
 * @param child      the downstream subscriber
 * @param capacity   the buffer capacity, or null; a null capacity leaves the capacity
 *                   counter unset
 * @param onOverflow the action stored for overflow handling
 */
public BufferSubscriber(Subscriber<? super T> child, Long capacity, Action0 onOverflow) {
    this.child = child;
    this.baseCapacity = capacity;
    if (capacity == null) {
        this.capacity = null;
    } else {
        this.capacity = new AtomicLong(capacity.longValue());
    }
    this.onOverflow = onOverflow;
    this.manager = new BackpressureDrainManager(this);
}
/**
 * Subscribes to the local observable of the remoter echo service and checks that exactly one
 * non-proxy {@code EchoImpl} instance is delivered before completion.
 */
@Test
public void testRemoterObservable() throws Exception {
    RemoteObservable<IEcho> remoteObservable = sampleService.getRemoterObservable();
    Observable<IEcho> observable = remoteObservable.getLocalObservable();
    expectingClose = false;
    eventsReceived = 0;

    final Action1<IEcho> onData = new Action1<IEcho>() {
        @Override
        public void call(IEcho data) {
            Assert.assertFalse(expectingClose);
            // The local observable is expected to hand out the implementation, not a proxy.
            Assert.assertFalse(data instanceof IEcho_Proxy);
            Assert.assertTrue(data instanceof EchoImpl);
            eventsReceived++;
            Log.v(TAG, "Remoter data " + data + " " + data.echo("Hello"));
            Assert.assertEquals("1", data.echo("1"));
            expectingClose = true;
        }
    };
    final Action1<Throwable> onFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            Assert.fail("Unexpected observable exception");
        }
    };
    final Action0 onDone = new Action0() {
        @Override
        public void call() {
            Log.v(TAG, "Remoter data onComplete");
            Assert.assertTrue(expectingClose);
        }
    };

    Subscription subscription1 = observable.subscribe(onData, onFailure, onDone);

    // Give the notifications time to arrive before checking the count.
    Thread.sleep(3000);
    Assert.assertEquals(1, eventsReceived);
}
/**
 * Schedules a timeout notification for the given sequence id on the supplied worker.
 *
 * @param timeoutSubscriber the subscriber to notify when the timeout elapses
 * @param seqId             the sequence id passed to {@code onTimeout}
 * @param t                 not read by this implementation
 * @param inner             the worker on which the timeout is scheduled
 * @return the subscription of the scheduled timeout
 */
public Subscription call(final TimeoutSubscriber<T> timeoutSubscriber, final Long seqId, T t, Worker inner) {
    final Action0 fireTimeout = new Action0() {
        public void call() {
            timeoutSubscriber.onTimeout(seqId.longValue());
        }
    };
    return inner.schedule(fireTimeout, this.val$timeout, this.val$timeUnit);
}
/**
 * Subscribes to the local observable of the list-of-parceler service and checks that a single
 * two-element list with data values 1 and 2 is delivered before completion.
 */
@Test
public void testListOfParceler() throws Exception {
    RemoteObservable<List<CustomData>> remoteObservable = sampleService.getRemoterObservableOfListOfParceler();
    Observable<List<CustomData>> observable = remoteObservable.getLocalObservable();
    expectingClose = false;
    eventsReceived = 0;

    final Action1<List<CustomData>> onData = new Action1<List<CustomData>>() {
        @Override
        public void call(List<CustomData> data) {
            Assert.assertFalse(expectingClose);
            eventsReceived++;
            Log.v(TAG, "List data " + data);
            Assert.assertNotNull(data);
            Assert.assertEquals(2, data.size());
            // Element at index i - 1 is expected to carry the value i.
            for (int i = 1; i <= 2; i++) {
                Assert.assertEquals(i, data.get(i - 1).getData());
            }
            expectingClose = true;
        }
    };
    final Action1<Throwable> onFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            Assert.fail("Unexpected observable exception");
        }
    };
    final Action0 onDone = new Action0() {
        @Override
        public void call() {
            Log.v(TAG, "List data onComplete");
            Assert.assertTrue(expectingClose);
        }
    };

    Subscription subscription1 = observable.subscribe(onData, onFailure, onDone);

    // Give the notifications time to arrive before checking the count.
    Thread.sleep(3000);
    Assert.assertEquals(1, eventsReceived);
}
/**
 * Subscribes to the observable of the remoter echo service and checks that exactly one echo
 * instance is delivered, that it echoes its input, and that completion follows the data.
 */
@Test
public void testRemoterObservable() throws Exception {
    RemoteObservable<IEcho> remoteObservable = sampleService.getRemoterObservable();
    Observable<IEcho> observable = remoteObservable.getObservable();
    expectingClose = false;
    eventsReceived = 0;

    final Action1<IEcho> onData = new Action1<IEcho>() {
        @Override
        public void call(IEcho data) {
            Assert.assertFalse(expectingClose);
            eventsReceived++;
            Log.v(TAG, "Remoter data " + data + " " + data.echo("Hello"));
            Assert.assertEquals("1", data.echo("1"));
            expectingClose = true;
        }
    };
    final Action1<Throwable> onFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            Assert.fail("Unexpected observable exception");
        }
    };
    final Action0 onDone = new Action0() {
        @Override
        public void call() {
            Log.v(TAG, "Remoter data onComplete");
            Assert.assertTrue(expectingClose);
        }
    };

    Subscription subscription1 = observable.subscribe(onData, onFailure, onDone);

    // Give the notifications time to arrive before checking the count.
    Thread.sleep(3000);
    Assert.assertEquals(1, eventsReceived);
}
/**
 * Subscribes with three callbacks, one per notification type.
 *
 * @param onNext      action invoked for every item; must not be null
 * @param onError     action invoked on error; must not be null
 * @param onCompleted action invoked on completion; must not be null
 * @return the subscription for this subscriber
 * @throws IllegalArgumentException if any argument is null
 */
public final Subscription subscribe(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    if (onCompleted == null) {
        throw new IllegalArgumentException("onComplete can not be null");
    }
    return subscribe(new ActionSubscriber(onNext, onError, onCompleted));
}
/**
 * Subscribes to the observable of the list-of-parceler service (with debug enabled) and checks
 * that a single two-element list with data values 1 and 2 is delivered before completion.
 */
@Test
public void testListOfParceler() throws Exception {
    RemoteObservable<List<CustomData>> remoteObservable = sampleService.getRemoterObservableOfListOfParceler();
    remoteObservable.setDebug(true);
    Observable<List<CustomData>> observable = remoteObservable.getObservable();
    expectingClose = false;
    eventsReceived = 0;

    final Action1<List<CustomData>> onData = new Action1<List<CustomData>>() {
        @Override
        public void call(List<CustomData> data) {
            Assert.assertFalse(expectingClose);
            eventsReceived++;
            Log.v(TAG, "List<CustomData> data " + data );
            Assert.assertNotNull(data);
            Assert.assertEquals(2, data.size());
            // Element at index i - 1 is expected to carry the value i.
            for (int i = 1; i <= 2; i++) {
                Assert.assertEquals(i, data.get(i - 1).getData());
            }
            expectingClose = true;
        }
    };
    final Action1<Throwable> onFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            Assert.fail("Unexpected observable exception");
        }
    };
    final Action0 onDone = new Action0() {
        @Override
        public void call() {
            Log.v(TAG, "List data onComplete");
            Assert.assertTrue(expectingClose);
        }
    };

    Subscription subscription1 = observable.subscribe(onData, onFailure, onDone);

    // Give the notifications time to arrive before checking the count.
    Thread.sleep(3000);
    Assert.assertEquals(1, eventsReceived);
}
/**
 * Creates the service client and prepares the "next article" action, which clears the
 * adapter, notifies removal of the first item and loads a random article.
 */
@Override
protected void afterViewAttached() {
    service = ServiceGen.create(OtherService.class);

    final Action0 showNextArticle = new Action0() {
        @Override
        public void call() {
            getAdapter().clear();
            getAdapter().notifyItemRemoved(0);
            loadRandomArticle();
        }
    };
    onNextArticle = showNextArticle;
}