/** * 取消监听 * * @param tag * @param observable * @return */ @SuppressWarnings("rawtypes") public RxBus unregister(@NonNull Object tag, @NonNull Observable<?> observable) { if (null == observable) return $(); List<Subject> subjects = subjectMapper.get(tag); if (null != subjects) { subjects.remove((Subject<?, ?>) observable); if (isEmpty(subjects)) { subjectMapper.remove(tag); LogUtil.d("unregister", tag + " size:" + subjects.size()); } } return $(); }
/** * 取消监听 * * @param tag * @param observable * @return */ @SuppressWarnings("rawtypes") public RxBus unregister(@NonNull Object tag, @NonNull Observable<?> observable) { if (null == observable) return getInstance(); List<Subject> subjects = subjectMapper.get(tag); if (null != subjects) { subjects.remove((Subject<?, ?>) observable); if (isEmpty(subjects)) { subjectMapper.remove(tag); LogUtils.logd("unregister"+ tag + " size:" + subjects.size()); } } return getInstance(); }
public void onNext(T1 args) { try { int id; Subject<T2, T2> subj = PublishSubject.create(); Observer<T2> subjSerial = new SerializedObserver(subj); synchronized (ResultManager.this.guard) { ResultManager resultManager = ResultManager.this; id = resultManager.leftIds; resultManager.leftIds = id + 1; ResultManager.this.leftMap.put(Integer.valueOf(id), subjSerial); } Observable<T2> window = Observable.create(new WindowObservableFunc(subj, ResultManager.this.cancel)); Observable<D1> duration = (Observable) OnSubscribeGroupJoin.this.leftDuration.call(args); Subscriber<D1> d1 = new LeftDurationObserver(id); ResultManager.this.group.add(d1); duration.unsafeSubscribe(d1); R result = OnSubscribeGroupJoin.this.resultSelector.call(args, window); synchronized (ResultManager.this.guard) { List<T2> rightMapValues = new ArrayList(ResultManager.this.rightMap.values()); } ResultManager.this.subscriber.onNext(result); for (T2 t2 : rightMapValues) { subjSerial.onNext(t2); } } catch (Throwable t) { Exceptions.throwOrReport(t, this); } }
/** * 取消监听 */ @SuppressWarnings("rawtypes") public RxBus unregister(@NonNull Object tag, @NonNull Observable<?> observable) { if (null == observable) { return $(); } List<Subject> subjects = subjectMapper.get(tag); if (null != subjects) { subjects.remove((Subject<?, ?>) observable); if (isEmpty(subjects)) { subjectMapper.remove(tag); LogUtils.d(tag + " size:" + subjects.size()); } } return $(); }
@NonNull public UploadManager build() { if (uploadService == null) { throw new IllegalArgumentException("Must provide a valid upload service"); } if (uploadDataStore == null) { throw new IllegalArgumentException("Must provide a valid upload data store"); } if (uploadErrorAdapter == null) { throw new IllegalArgumentException("Must provide a valid upload error adapter"); } final Subject<Job, Job> jobSubject = PublishSubject.<Job>create().toSerialized(); final Subject<Status, Status> statusSubject = PublishSubject.<Status>create().toSerialized(); final Uploader uploader = Uploader.create(uploadService); final UploadInteractor uploadInteractor = UploadInteractorImpl.create(uploader, uploadDataStore, uploadErrorAdapter); return new UploadManager(uploadInteractor, uploadErrorAdapter, jobSubject, statusSubject, deleteRecordOnComplete); }
protected void emitEvent(ConfigChangeEvent<String> event) { checkNotNull(event); final Optional<String> oldEventValue = this.lastEmittedValues.put(event.getName(), event.getValueOpt()); if (!event.getValueOpt().equals(oldEventValue)) { Subject<ConfigChangeEvent<String>, ConfigChangeEvent<String>> subject = subjectMap.get(event.getName()); if (subject == null) { log.warn("Event Subject was not initialized for key {} !", event.getName()); return; } log.trace("EMIT {} - value {}", event.getName(), event.getValueOpt()); subject.onNext(event); } else { log.trace("NOT EMITTING key {} value {} - no change from previous value.", event.getName(), event.getValueOpt()); } }
@Nonnull public static <T> Observable.Transformer<? super T, ? extends T> behaviorRefCount() { return new Observable.Transformer<T, T>() { @Override public Observable<T> call(Observable<T> tObservable) { return new OperatorMulticast<>(tObservable, new Func0<Subject<? super T, ? extends T>>() { @Override public Subject<? super T, ? extends T> call() { return BehaviorSubject.<T>create(); } }).refCount(); } }; }
private void getThread(ExecutorService executor, Subject<Object, Object> subject) { executor.execute(() -> { for (int i = 0; i < 10; i++) { System.out.println("onNext " + i); subject.onNext(i); try { Thread.sleep(500); } catch (InterruptedException e) { subject.onError(e); } } System.out.println("completing"); subject.onCompleted(); }); }
/** * 取消监视,删除被监视者. * * @param event * @param register */ public void unregister(final int event, Object register) { String key = event + ""; String tag = getUniqueTag(event, register); Subject subject = single_map.get(tag); List<Subject> subjects = subject_map.get(key); if (null != subjects) { if (null != subject) { subjects.remove(subject); single_map.remove(tag); } if (subjects.size() == 0) { subject_map.remove(key); } } Log.i(TAG, "unregister:" + register.getClass().getSimpleName() + "," + "single_map size:" + single_map.size() + ",subject_map size:" + subject_map.size()); }
/** * 取消监听 * * @param tag * @param observable * @return */ @SuppressWarnings("rawtypes") public RxBus unregister(@NonNull Object tag, @NonNull Observable<?> observable) { if (null == observable) return $(); List<Subject> subjects = subjectMapper.get(tag); if (null != subjects) { subjects.remove((Subject<?, ?>) observable); if (isEmpty(subjects)) { subjectMapper.remove(tag); JLog.d("unregister", tag + " size:" + subjects.size()); } } return $(); }
public Subject<R, R> Method(SD<R> body) { depth++; body.accept(null, r -> { depth--; if (depth==0) result.onCompleted(); }, () -> { depth--; if (depth==0) result.onCompleted(); }, label -> { }, label -> { }, ex -> { result.onError(ex); }) ; return result; }
/** * Get the standard error callback. * * @param responseEmitter Channel to send the error response to * @param druidQuery Query for which we got an error * * @return the standard error callback */ public HttpErrorCallback getStandardError( final Subject responseEmitter, final DruidAggregationQuery<?> druidQuery ) { return new HttpErrorCallback() { @Override public void invoke(int statusCode, String reason, String responseBody) { LOG.error(ErrorMessageFormat.ERROR_FROM_DRUID.logFormat(responseBody, statusCode, reason, druidQuery)); responseEmitter.onError(new ResponseException( statusCode, reason, responseBody, druidQuery, null, getObjectMappers().getMapper().writer() )); } }; }
/** * Get the standard failure callback. * * @param responseEmitter Channel to send the response to * @param druidQuery Query for which we got a failure * * @return the standard failure callback */ public FailureCallback getStandardFailure( final Subject responseEmitter, final DruidAggregationQuery<?> druidQuery ) { return new FailureCallback() { @Override public void invoke(Throwable error) { LOG.error(ErrorMessageFormat.FAILED_TO_SEND_QUERY_TO_DRUID.logFormat(druidQuery), error); responseEmitter.onError(new ResponseException( Status.INTERNAL_SERVER_ERROR, druidQuery, error, objectMappers.getMapper().writer() )); } }; }
/** * 取消监听 * * @param tag * @param observable * @return */ @SuppressWarnings("rawtypes") public RxBus unSubscribe(@NonNull Object tag, @NonNull Observable<?> observable) { if (null == observable) return $(); List<Subject> subjects = subjectMapper.get(tag); if (null != subjects) { subjects.remove((Subject<?, ?>) observable); if (isEmpty(subjects)) { subjectMapper.remove(tag); //LogUtils.d("unregister:"+tag + " size:" + subjects.size()); } } return $(); }
private OperatorMulticast(final Object guard, final AtomicReference<Subject<? super T, ? extends R>> connectedSubject, final List<Subscriber<? super R>> waitingForConnect, Observable<? extends T> source, final Supplier<? extends Subject<? super T, ? extends R>> subjectFactory) { super(s -> { synchronized (guard) { Subject<? super T, ? extends R> subject = connectedSubject.get(); if (subject == null) { // not connected yet, so register waitingForConnect.add(s); } else { subject.unsafeSubscribe(s); } } }); this.guard = guard; this.connectedSubject = connectedSubject; this.waitingForConnect = waitingForConnect; this.source = source; this.subjectFactory = subjectFactory; }
@Test public void testNextWithError() { Subject<String, String> obs = PublishSubject.create(); Iterator<String> it = next(obs).iterator(); fireOnNextInNewThread(obs, "one"); assertTrue(it.hasNext()); assertEquals("one", it.next()); fireOnErrorInNewThread(obs); try { it.hasNext(); fail("Expected an TestException"); } catch (TestException e) { } assertErrorAfterObservableFail(it); }
@Test public void testOnErrorInNewThread() { Subject<String, String> obs = PublishSubject.create(); Iterator<String> it = next(obs).iterator(); fireOnErrorInNewThread(obs); try { it.hasNext(); fail("Expected an TestException"); } catch (TestException e) { // successful } assertErrorAfterObservableFail(it); }
@Test public void testNextWithOnlyUsingNextMethod() { Subject<String, String> obs = PublishSubject.create(); Iterator<String> it = next(obs).iterator(); fireOnNextInNewThread(obs, "one"); assertEquals("one", it.next()); fireOnNextInNewThread(obs, "two"); assertEquals("two", it.next()); obs.onComplete(); try { it.next(); fail("At the end of an iterator should throw a NoSuchElementException"); } catch (NoSuchElementException e) { } }
@Test public void testNextWithCallingHasNextMultipleTimes() { Subject<String, String> obs = PublishSubject.create(); Iterator<String> it = next(obs).iterator(); fireOnNextInNewThread(obs, "one"); assertTrue(it.hasNext()); assertTrue(it.hasNext()); assertTrue(it.hasNext()); assertTrue(it.hasNext()); assertEquals("one", it.next()); obs.onComplete(); try { it.next(); fail("At the end of an iterator should throw a NoSuchElementException"); } catch (NoSuchElementException e) { } }
@Test public void testMulticast() { Subject<String, String> source = PublishSubject.create(); ConnectableObservable<String> multicasted = new OperatorMulticast<String, String>(source, new PublishSubjectFactory()); @SuppressWarnings("unchecked") Observer<String> observer = mock(Observer.class); multicasted.subscribe(observer); source.onNext("one"); source.onNext("two"); multicasted.connect(); source.onNext("three"); source.onNext("four"); source.onComplete(); verify(observer, never()).onNext("one"); verify(observer, never()).onNext("two"); verify(observer, times(1)).onNext("three"); verify(observer, times(1)).onNext("four"); verify(observer, times(1)).onComplete(); }
@Test public void testMulticastConnectTwice() { Subject<String, String> source = PublishSubject.create(); ConnectableObservable<String> multicasted = new OperatorMulticast<String, String>(source, new PublishSubjectFactory()); @SuppressWarnings("unchecked") Observer<String> observer = mock(Observer.class); multicasted.subscribe(observer); source.onNext("one"); multicasted.connect(); multicasted.connect(); source.onNext("two"); source.onComplete(); verify(observer, never()).onNext("one"); verify(observer, times(1)).onNext("two"); verify(observer, times(1)).onComplete(); }
@Test public void testMostRecent() { Subject<String, String> s = PublishSubject.create(); Iterator<String> it = mostRecent(s, "default").iterator(); assertTrue(it.hasNext()); assertEquals("default", it.next()); assertEquals("default", it.next()); s.onNext("one"); assertTrue(it.hasNext()); assertEquals("one", it.next()); assertEquals("one", it.next()); s.onNext("two"); assertTrue(it.hasNext()); assertEquals("two", it.next()); assertEquals("two", it.next()); s.onComplete(); assertFalse(it.hasNext()); }
/** * Invoke all service instances with a given request message with a given service name and method name. expected * headers in request: ServiceHeaders.SERVICE_REQUEST the logical name of the service. ServiceHeaders.METHOD the * method name to invoke. retrieves routes from router by calling router.routes and send async to each endpoint once a * response is returned emit the response to the observable. * * @param request request with given headers. * @param duration of the response before TimeException is returned. * @return Observable with stream of results for each service call dispatching result. */ public Observable<Message> invokeAll(final Message request, final Duration duration) { final Subject<Message, Message> responsesSubject = PublishSubject.<Message>create().toSerialized(); Collection<ServiceInstance> instances = router.routes(request); instances.forEach(instance -> { invoke(request, duration).whenComplete((resp, error) -> { if (resp != null) { responsesSubject.onNext(resp); } else { responsesSubject.onNext(Messages.asError(error, request.correlationId(), instance.memberId())); } }); }); return responsesSubject.onBackpressureBuffer().asObservable(); }
@Test public void testIndirectUsage() { // 1. Define action (a biz facade points from where 'real' action) Subject<Float,Float> actionf0 = PublishSubject.create(); // 2. map InputEvents --to--> actions (setup InputMapper's mappings) sut.mappings.clear(); sut.map(tmplKeyInputEvent(KeyInput.KEY_0), InputMapperHelpers.isPressedAsOne, actionf0); // 3. map actions --to--> subscribe listener/observer @SuppressWarnings("unchecked") Observer<Float> observer = mock(Observer.class); actionf0.subscribe(observer); sut.onEvent(new KeyInputEvent(KeyInput.KEY_0, '0', true, false)); sut.onEvent(new KeyInputEvent(KeyInput.KEY_0, '0', false, false)); InOrder inOrder1 = inOrder(observer); inOrder1.verify(observer, times(1)).onNext(1.0f); inOrder1.verify(observer, times(1)).onNext(0.0f); verify(observer, Mockito.never()).onCompleted(); }
/** * Creates a new {@link AbstractSubdocRequest}. * * @param key the key of the document. * @param path the subdocument path to consider inside the document. * @param bucket the bucket of the document. * @param observable the observable which receives responses. * @param restOfContent the optional remainder of the {@link #content()} of the final protocol message, or null if not applicable * @throws NullPointerException if the path is null (see {@link #EXCEPTION_NULL_PATH}) */ public AbstractSubdocRequest(String key, String path, String bucket, Subject<CouchbaseResponse, CouchbaseResponse> observable, ByteBuf... restOfContent) { super(key, bucket, null, null, observable); this.path = path; ByteBuf pathByteBuf; if (path == null || path.isEmpty()) { pathByteBuf = Unpooled.EMPTY_BUFFER; } else { pathByteBuf = Unpooled.wrappedBuffer(path.getBytes(CharsetUtil.UTF_8)); } this.pathLength = pathByteBuf.readableBytes(); this.content = createContent(pathByteBuf, restOfContent); //checking nullity here allows to release all of restOfContent through cleanUpAndThrow releasing content() if (this.path == null) { cleanUpAndThrow(EXCEPTION_NULL_PATH); } }
/** * Publishes a response with the attached observable. * * @param response the response to publish. * @param observable pushing into the event sink. */ protected void publishResponse(final CouchbaseResponse response, final Subject<CouchbaseResponse, CouchbaseResponse> observable) { if (response.status() != ResponseStatus.RETRY && observable != null) { if (moveResponseOut) { Scheduler scheduler = env().scheduler(); if (scheduler instanceof CoreScheduler) { scheduleDirect((CoreScheduler) scheduler, response, observable); } else { scheduleWorker(scheduler, response, observable); } } else { completeResponse(response, observable); } } else { responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, response, observable); } }
/** * Dispatches the response on a generic scheduler through creating a worker. */ private static void scheduleWorker(Scheduler scheduler, final CouchbaseResponse response, final Subject<CouchbaseResponse, CouchbaseResponse> observable) { final Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(new Action0() { @Override public void call() { try { observable.onNext(response); observable.onCompleted(); } catch (Exception ex) { LOGGER.warn("Caught exception while onNext on observable", ex); observable.onError(ex); } finally { worker.unsubscribe(); } } }); }
@Test public void shouldSendProposedConfigToProvider() throws Exception { ClusterFacade clusterMock = mock(ClusterFacade.class); ConfigurationProvider providerMock = mock(ConfigurationProvider.class); ResponseHandler handler = new ResponseHandler(ENVIRONMENT, clusterMock, providerMock); ByteBuf config = Unpooled.copiedBuffer("{\"json\": true}", CharsetUtil.UTF_8); ResponseEvent retryEvent = new ResponseEvent(); retryEvent.setMessage(new InsertResponse(ResponseStatus.RETRY, KeyValueStatus.ERR_TEMP_FAIL.code(), 0, "bucket", config, null, mock(InsertRequest.class))); retryEvent.setObservable(mock(Subject.class)); handler.onEvent(retryEvent, 1, true); verify(providerMock, times(1)).proposeBucketConfig("bucket", "{\"json\": true}"); assertEquals(0, config.refCnt()); assertNull(retryEvent.getMessage()); assertNull(retryEvent.getObservable()); }
@Test public void shouldIgnoreInvalidConfig() throws Exception { ClusterFacade clusterMock = mock(ClusterFacade.class); ConfigurationProvider providerMock = mock(ConfigurationProvider.class); ResponseHandler handler = new ResponseHandler(ENVIRONMENT, clusterMock, providerMock); ByteBuf config = Unpooled.copiedBuffer("Not my Vbucket", CharsetUtil.UTF_8); ResponseEvent retryEvent = new ResponseEvent(); retryEvent.setMessage(new InsertResponse(ResponseStatus.RETRY, KeyValueStatus.ERR_TEMP_FAIL.code(), 0, "bucket", config, null, mock(InsertRequest.class))); retryEvent.setObservable(mock(Subject.class)); handler.onEvent(retryEvent, 1, true); verify(providerMock, never()).proposeBucketConfig("bucket", "Not my Vbucket"); assertEquals(0, config.refCnt()); assertNull(retryEvent.getMessage()); assertNull(retryEvent.getObservable()); }
@Test public void shouldFailWhenUsedAgainstMemcacheBucket() { Locator locator = new ViewLocator(0); ClusterConfig config = mock(ClusterConfig.class); when(config.bucketConfig("default")).thenReturn(mock(MemcachedBucketConfig.class)); CouchbaseRequest request = mock(ViewQueryRequest.class); Subject<CouchbaseResponse, CouchbaseResponse> response = AsyncSubject.create(); when(request.bucket()).thenReturn("default"); when(request.observable()).thenReturn(response); TestSubscriber<CouchbaseResponse> subscriber = new TestSubscriber<CouchbaseResponse>(); response.subscribe(subscriber); locator.locateAndDispatch(request, Collections.<Node>emptyList(), config, null, null); subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS); List<Throwable> errors = subscriber.getOnErrorEvents(); assertEquals(1, errors.size()); assertTrue(errors.get(0) instanceof ServiceNotAvailableException); }
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) { writeHeaders(response); final Subject<Void, Void> subject = PublishSubject.create(); final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription(); Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation()) .subscribe(new Action1<Long>() { @Override public void call(Long tick) { if (!response.getChannel().isOpen()) { subscription.unsubscribe(); return; } try { writeMetric(JsonMapper.toJson(metrics), response); } catch (Exception e) { subject.onError(e); } } }); subscription.set(actionSubscription); return subject; }
private OperatorMulticast(final Object guard, final AtomicReference<Subject<? super T, ? extends R>> connectedSubject, final List<Subscriber<? super R>> waitingForConnect, Observable<? extends T> source, final Func0<? extends Subject<? super T, ? extends R>> subjectFactory) { super(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> subscriber) { synchronized (guard) { if (connectedSubject.get() == null) { // not connected yet, so register waitingForConnect.add(subscriber); } else { // we are already connected so subscribe directly connectedSubject.get().unsafeSubscribe(subscriber); } } } }); this.guard = guard; this.connectedSubject = connectedSubject; this.waitingForConnect = waitingForConnect; this.source = source; this.subjectFactory = subjectFactory; }
/** * 注册事件源 * * @param tag * @return */ @SuppressWarnings({"rawtypes"}) public <T> Observable<T> register(@NonNull Object tag) { List<Subject> subjectList = subjectMapper.get(tag); if (null == subjectList) { subjectList = new ArrayList<Subject>(); subjectMapper.put(tag, subjectList); } Subject<T, T> subject; subjectList.add(subject = PublishSubject.create()); LogUtil.d("register", tag + " size:" + subjectList.size()); return subject; }
@SuppressWarnings("rawtypes") public void unregister(@NonNull Object tag) { List<Subject> subjects = subjectMapper.get(tag); if (null != subjects) { subjectMapper.remove(tag); } }
/** * 触发事件 * * @param content */ @SuppressWarnings({"unchecked", "rawtypes"}) public void post(@NonNull Object tag, @NonNull Object content) { LogUtil.d("post", "eventName: " + tag); List<Subject> subjectList = subjectMapper.get(tag); if (!isEmpty(subjectList)) { for (Subject subject : subjectList) { subject.onNext(content); LogUtil.d("onEvent", "eventName: " + tag); } } }
/** * 注册事件源 * * @param tag * @return */ @SuppressWarnings({"rawtypes"}) public <T> Observable<T> register(@NonNull Object tag) { List<Subject> subjectList = subjectMapper.get(tag); if (null == subjectList) { subjectList = new ArrayList<Subject>(); subjectMapper.put(tag, subjectList); } Subject<T, T> subject; subjectList.add(subject = PublishSubject.create()); LogUtils.logd("register"+tag + " size:" + subjectList.size()); return subject; }