Java 类rx.subjects.Subject 实例源码

项目:GitHub    文件:RxBus.java   
/**
 * Stops listening: detaches {@code observable} from the subjects stored under {@code tag}.
 *
 * <p>When the list for {@code tag} is empty after the removal, the mapping for the tag is
 * dropped as well and a debug line is logged.
 *
 * @param tag        key under which the subject list is stored in {@code subjectMapper}
 * @param observable the observable to detach; it is cast to {@code Subject} before removal
 * @return the value of {@code $()} (presumably the shared bus instance, for chaining)
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
                        @NonNull Observable<?> observable) {
    if (observable == null) {
        return $();
    }
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        // Nothing was ever registered for this tag.
        return $();
    }
    registered.remove((Subject<?, ?>) observable);
    if (isEmpty(registered)) {
        // Last listener is gone: forget the tag entirely.
        subjectMapper.remove(tag);
        LogUtil.d("unregister", tag + "  size:" + registered.size());
    }
    return $();
}
项目:MyFire    文件:RxBus.java   
/**
 * Stops listening: detaches {@code observable} from the subjects stored under {@code tag}.
 *
 * <p>When the list for {@code tag} is empty after the removal, the mapping for the tag is
 * dropped as well and a debug line is logged.
 *
 * @param tag        key under which the subject list is stored in {@code subjectMapper}
 * @param observable the observable to detach; it is cast to {@code Subject} before removal
 * @return the value of {@code getInstance()}
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
                        @NonNull Observable<?> observable) {
    if (observable == null) {
        return getInstance();
    }
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        // Nothing was ever registered for this tag.
        return getInstance();
    }
    registered.remove((Subject<?, ?>) observable);
    if (isEmpty(registered)) {
        // Last listener is gone: forget the tag entirely.
        subjectMapper.remove(tag);
        LogUtils.logd("unregister"+ tag + "  size:" + registered.size());
    }
    return getInstance();
}
项目:boohee_v5.6    文件:OnSubscribeGroupJoin.java   
/**
 * Handles one value from the left source.
 *
 * <p>Steps, in order: a new {@code PublishSubject} is created and wrapped in a
 * {@code SerializedObserver}; the wrapper is stored in {@code leftMap} under a freshly
 * allocated id (under {@code guard}); a window observable is built on top of the subject;
 * a {@code LeftDurationObserver} is subscribed to the duration observable obtained from
 * {@code leftDuration}; the result is computed through {@code resultSelector}; a snapshot of
 * the current {@code rightMap} values is taken under {@code guard}; the result is emitted to
 * the subscriber and finally the snapshot is replayed into the serialized subject.
 *
 * <p>Any {@link Throwable} raised along the way is handed to
 * {@code Exceptions.throwOrReport}.
 *
 * @param args the value received from the left source
 */
public void onNext(T1 args) {
    try {
        int id;
        Subject<T2, T2> subj = PublishSubject.create();
        Observer<T2> subjSerial = new SerializedObserver(subj);
        synchronized (ResultManager.this.guard) {
            ResultManager resultManager = ResultManager.this;
            id = resultManager.leftIds;
            resultManager.leftIds = id + 1;
            ResultManager.this.leftMap.put(Integer.valueOf(id), subjSerial);
        }
        Observable<T2> window = Observable.create(new WindowObservableFunc(subj, ResultManager.this.cancel));
        Observable<D1> duration = (Observable) OnSubscribeGroupJoin.this.leftDuration.call(args);
        Subscriber<D1> d1 = new LeftDurationObserver(id);
        ResultManager.this.group.add(d1);
        duration.unsafeSubscribe(d1);
        R result = OnSubscribeGroupJoin.this.resultSelector.call(args, window);
        // Declared before the synchronized block: the snapshot is taken under the lock but
        // replayed after the lock has been released, so it must outlive the block's scope.
        List<T2> rightMapValues;
        synchronized (ResultManager.this.guard) {
            rightMapValues = new ArrayList(ResultManager.this.rightMap.values());
        }
        ResultManager.this.subscriber.onNext(result);
        for (T2 t2 : rightMapValues) {
            subjSerial.onNext(t2);
        }
    } catch (Throwable t) {
        Exceptions.throwOrReport(t, this);
    }
}
项目:MixUtils    文件:RxBus.java   
/**
 * Stops listening: detaches {@code observable} from the subjects stored under {@code tag}.
 *
 * <p>When the list for {@code tag} is empty after the removal, the mapping for the tag is
 * dropped as well and a debug line is logged.
 *
 * @param tag        key under which the subject list is stored in {@code subjectMapper}
 * @param observable the observable to detach; it is cast to {@code Subject} before removal
 * @return the value of {@code $()} (presumably the shared bus instance, for chaining)
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
    @NonNull Observable<?> observable) {
  if (observable != null) {
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered != null) {
      registered.remove((Subject<?, ?>) observable);
      if (isEmpty(registered)) {
        // Last listener is gone: forget the tag entirely.
        subjectMapper.remove(tag);
        LogUtils.d(tag + "  size:" + registered.size());
      }
    }
  }
  return $();
}
项目:RxUploader    文件:UploadManager.java   
/**
 * Assembles an {@link UploadManager} from the configured collaborators.
 *
 * <p>Two serialized {@code PublishSubject}s (one for jobs, one for statuses) are created and
 * passed to the manager together with an interactor built from the upload service, the data
 * store and the error adapter.
 *
 * @return the newly constructed manager
 * @throws IllegalArgumentException if the upload service, the upload data store or the
 *                                  upload error adapter has not been provided
 */
@NonNull
public UploadManager build() {
    // Validate required collaborators in a fixed order so the first missing one is reported.
    if (uploadService == null) {
        throw new IllegalArgumentException("Must provide a valid upload service");
    }
    if (uploadDataStore == null) {
        throw new IllegalArgumentException("Must provide a valid upload data store");
    }
    if (uploadErrorAdapter == null) {
        throw new IllegalArgumentException("Must provide a valid upload error adapter");
    }

    final Subject<Job, Job> jobs = PublishSubject.<Job>create().toSerialized();
    final Subject<Status, Status> statuses = PublishSubject.<Status>create().toSerialized();

    final UploadInteractor interactor = UploadInteractorImpl.create(
            Uploader.create(uploadService), uploadDataStore, uploadErrorAdapter);

    return new UploadManager(interactor, uploadErrorAdapter, jobs, statuses,
            deleteRecordOnComplete);
}
项目:ice    文件:AbstractDynamicConfigSource.java   
/**
 * Publishes a configuration change to the subject registered for the event's name, but only
 * when the value differs from the one recorded by the previous call for that name.
 *
 * <p>The event's value is always stored in {@code lastEmittedValues}, whether or not the
 * event ends up being emitted. If no subject exists for the name, a warning is logged and
 * nothing is emitted.
 *
 * @param event the change to publish; checked with {@code checkNotNull}
 */
protected void emitEvent(ConfigChangeEvent<String> event)
{
    checkNotNull(event);

    final Optional<String> previous = this.lastEmittedValues.put(event.getName(), event.getValueOpt());
    if (event.getValueOpt().equals(previous)) {
        log.trace("NOT EMITTING key {} value {} - no change from previous value.", event.getName(), event.getValueOpt());
        return;
    }

    final Subject<ConfigChangeEvent<String>, ConfigChangeEvent<String>> target = subjectMap.get(event.getName());
    if (target == null) {
        log.warn("Event Subject was not initialized for key {} !", event.getName());
        return;
    }

    log.trace("EMIT {} - value {}", event.getName(), event.getValueOpt());
    target.onNext(event);
}
项目:erlymon-monitor-android    文件:MoreObservables.java   
/**
 * Returns a transformer that multicasts the upstream observable through a
 * {@code BehaviorSubject} and applies {@code refCount()} to the resulting connectable.
 *
 * @param <T> element type
 * @return the transformer described above
 */
@Nonnull
public static <T> Observable.Transformer<? super T, ? extends T> behaviorRefCount() {
    return new Observable.Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> tObservable) {
            // A fresh BehaviorSubject is supplied each time the multicast operator asks for one.
            final Func0<Subject<? super T, ? extends T>> subjectFactory =
                    new Func0<Subject<? super T, ? extends T>>() {
                        @Override
                        public Subject<? super T, ? extends T> call() {
                            return BehaviorSubject.<T>create();
                        }
                    };
            return new OperatorMulticast<>(tObservable, subjectFactory).refCount();
        }
    };
}
项目:rxjava-snippets    文件:GeneralCodeSamples.java   
/**
 * Submits a task to {@code executor} that pushes the integers 0..9 into {@code subject},
 * sleeping 500 ms after each emission, and then completes the subject.
 *
 * <p>If the worker thread is interrupted while sleeping, the interrupt flag is restored, the
 * {@link InterruptedException} is delivered through {@code onError} and the task stops.
 * {@code onError} is a terminal notification, so no further {@code onNext} or
 * {@code onCompleted} may follow it.
 *
 * @param executor the executor that runs the emitting task
 * @param subject  the subject receiving the emissions
 */
private void getThread(ExecutorService executor, Subject<Object, Object> subject) {
  executor.execute(() ->
  {
    for (int i = 0; i < 10; i++) {
      System.out.println("onNext " + i);
      subject.onNext(i);
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        // Restore the flag so the owning executor can still observe the interruption.
        Thread.currentThread().interrupt();
        subject.onError(e);
        // The stream is terminated: emit nothing more and skip onCompleted.
        return;
      }
    }
    System.out.println("completing");
    subject.onCompleted();
  });
}
项目:RxBusDemo    文件:RxBus.java   
/**
 * Stops observing: removes the subject that belongs to the given event/registrant pair.
 *
 * <p>The subject is looked up in {@code single_map} by the unique tag derived from
 * {@code event} and {@code register}. It is removed from both maps only when a subject list
 * exists for the event; an empty list is then dropped from {@code subject_map}. The sizes of
 * both maps are logged afterwards.
 *
 * @param event    the event code; its decimal string is the key into {@code subject_map}
 * @param register the registrant whose subject is to be removed
 */
public void unregister(final int event, Object register) {
    final String eventKey = String.valueOf(event);
    final String uniqueTag = getUniqueTag(event, register);
    final Subject own = single_map.get(uniqueTag);
    final List<Subject> siblings = subject_map.get(eventKey);
    if (siblings != null) {
        if (own != null) {
            siblings.remove(own);
            single_map.remove(uniqueTag);
        }
        if (siblings.isEmpty()) {
            subject_map.remove(eventKey);
        }
    }
    Log.i(TAG, "unregister:" + register.getClass().getSimpleName() + "," +
            "single_map size:" + single_map.size() + ",subject_map size:" + subject_map.size());
}
项目:Elephant    文件:RxBus.java   
/**
 * Stops listening: detaches {@code observable} from the subjects stored under {@code tag}.
 *
 * <p>When the list for {@code tag} is empty after the removal, the mapping for the tag is
 * dropped as well and a debug line is logged.
 *
 * @param tag        key under which the subject list is stored in {@code subjectMapper}
 * @param observable the observable to detach; it is cast to {@code Subject} before removal
 * @return the value of {@code $()} (presumably the shared bus instance, for chaining)
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
                        @NonNull Observable<?> observable) {
    if (observable == null) {
        return $();
    }
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        // Nothing was ever registered for this tag.
        return $();
    }
    registered.remove((Subject<?, ?>) observable);
    if (isEmpty(registered)) {
        // Last listener is gone: forget the tag entirely.
        subjectMapper.remove(tag);
        JLog.d("unregister", tag + "  size:" + registered.size());
    }
    return $();
}
项目:recaf    文件:StreamExt.java   
/**
 * Runs {@code body} and returns the {@code result} subject.
 *
 * <p>{@code depth} is incremented before the body runs. The first two callbacks handed to the
 * body each decrement {@code depth} and complete {@code result} once it reaches zero; the two
 * label callbacks do nothing; the last callback forwards its argument to
 * {@code result.onError}.
 *
 * @param body the body to execute
 * @return the {@code result} subject
 */
public Subject<R, R> Method(SD<R> body) {
    depth++;
    body.accept(null,
            value -> {
                if (--depth == 0) {
                    result.onCompleted();
                }
            },
            () -> {
                if (--depth == 0) {
                    result.onCompleted();
                }
            },
            unusedLabel -> { },
            unusedLabel -> { },
            failure -> result.onError(failure));
    return result;
}
项目:DailyNews    文件:RxBus.java   
/**
 * Stops listening: detaches {@code observable} from the subjects stored under {@code tag}.
 *
 * <p>When the list for {@code tag} is empty after the removal, the mapping for the tag is
 * dropped as well and a debug line is logged.
 *
 * @param tag        key under which the subject list is stored in {@code subjectMapper}
 * @param observable the observable to detach; it is cast to {@code Subject} before removal
 * @return the value of {@code getInstance()}
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
                        @NonNull Observable<?> observable) {
    if (observable == null) {
        return getInstance();
    }
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        // Nothing was ever registered for this tag.
        return getInstance();
    }
    registered.remove((Subject<?, ?>) observable);
    if (isEmpty(registered)) {
        // Last listener is gone: forget the tag entirely.
        subjectMapper.remove(tag);
        LogUtils.logd("unregister"+ tag + "  size:" + registered.size());
    }
    return getInstance();
}
项目:fili    文件:MappingResponseProcessor.java   
/**
 * Builds the standard HTTP error callback.
 *
 * <p>When invoked, the callback logs the error and then signals a {@code ResponseException}
 * carrying the status code, reason, response body and query through
 * {@code responseEmitter.onError}.
 *
 * @param responseEmitter  Channel that receives the error
 * @param druidQuery  Query whose execution produced the error
 *
 * @return the standard error callback
 */
public HttpErrorCallback getStandardError(
        final Subject responseEmitter,
        final DruidAggregationQuery<?> druidQuery
) {
    return new HttpErrorCallback() {
        @Override
        public void invoke(int statusCode, String reason, String responseBody) {
            LOG.error(ErrorMessageFormat.ERROR_FROM_DRUID.logFormat(responseBody, statusCode, reason, druidQuery));
            final ResponseException failure = new ResponseException(
                    statusCode,
                    reason,
                    responseBody,
                    druidQuery,
                    null,
                    getObjectMappers().getMapper().writer()
            );
            responseEmitter.onError(failure);
        }
    };
}
项目:fili    文件:MappingResponseProcessor.java   
/**
 * Builds the standard failure callback.
 *
 * <p>When invoked, the callback logs the failure together with its cause and then signals a
 * {@code ResponseException} with status {@code INTERNAL_SERVER_ERROR} through
 * {@code responseEmitter.onError}.
 *
 * @param responseEmitter  Channel that receives the error
 * @param druidQuery  Query that could not be sent
 *
 * @return the standard failure callback
 */
public FailureCallback getStandardFailure(
        final Subject responseEmitter,
        final DruidAggregationQuery<?> druidQuery
) {
    return new FailureCallback() {
        @Override
        public void invoke(Throwable error) {
            LOG.error(ErrorMessageFormat.FAILED_TO_SEND_QUERY_TO_DRUID.logFormat(druidQuery), error);
            final ResponseException failure = new ResponseException(
                    Status.INTERNAL_SERVER_ERROR,
                    druidQuery,
                    error,
                    objectMappers.getMapper().writer()
            );
            responseEmitter.onError(failure);
        }
    };
}
项目:Zatuji    文件:RxBus.java   
/**
 * Stops listening: detaches {@code observable} from the subjects stored under {@code tag}.
 *
 * <p>When the list for {@code tag} is empty after the removal, the mapping for the tag is
 * dropped as well.
 *
 * @param tag        key under which the subject list is stored in {@code subjectMapper}
 * @param observable the observable to detach; it is cast to {@code Subject} before removal
 * @return the value of {@code $()} (presumably the shared bus instance, for chaining)
 */
@SuppressWarnings("rawtypes")
public RxBus unSubscribe(@NonNull Object tag,
                        @NonNull Observable<?> observable) {
    if (observable == null) {
        return $();
    }
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        // Nothing was ever registered for this tag.
        return $();
    }
    registered.remove((Subject<?, ?>) observable);
    if (isEmpty(registered)) {
        // Last listener is gone: forget the tag entirely (no logging here).
        subjectMapper.remove(tag);
    }
    return $();
}
项目:Ydkd    文件:RxBus.java   
/**
 * Stops listening: detaches {@code observable} from the subjects stored under {@code tag}.
 *
 * <p>When the list for {@code tag} is empty after the removal, the mapping for the tag is
 * dropped as well and a debug line is logged.
 *
 * @param tag        key under which the subject list is stored in {@code subjectMapper}
 * @param observable the observable to detach; it is cast to {@code Subject} before removal
 * @return the value of {@code getInstance()}
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
                        @NonNull Observable<?> observable) {
    if (observable != null) {
        final List<Subject> registered = subjectMapper.get(tag);
        if (registered != null) {
            registered.remove((Subject<?, ?>) observable);
            if (isEmpty(registered)) {
                // Last listener is gone: forget the tag entirely.
                subjectMapper.remove(tag);
                LogUtils.logd("unregister"+ tag + "  size:" + registered.size());
            }
        }
    }
    return getInstance();
}
项目:RxJavaFlow    文件:OperatorMulticast.java   
private OperatorMulticast(final Object guard, 
        final AtomicReference<Subject<? super T, ? extends R>> connectedSubject, 
        final List<Subscriber<? super R>> waitingForConnect, 
        Observable<? extends T> source, 
        final Supplier<? extends Subject<? super T, ? extends R>> subjectFactory) {
    super(s -> {
        synchronized (guard) {
            Subject<? super T, ? extends R> subject = connectedSubject.get();
            if (subject == null) {
                // not connected yet, so register
                waitingForConnect.add(s);
            } else {
                subject.unsafeSubscribe(s);
            }
        }
    });
    this.guard = guard;
    this.connectedSubject = connectedSubject;
    this.waitingForConnect = waitingForConnect;
    this.source = source;
    this.subjectFactory = subjectFactory;
}
项目:RxJavaFlow    文件:BlockingOperatorNextTest.java   
/**
 * After one value has been consumed, an error fired on the subject must surface as a
 * {@code TestException} from {@code hasNext()}.
 */
@Test
public void testNextWithError() {
    Subject<String, String> subject = PublishSubject.create();
    Iterator<String> iterator = next(subject).iterator();

    // First a regular value ...
    fireOnNextInNewThread(subject, "one");
    assertTrue(iterator.hasNext());
    assertEquals("one", iterator.next());

    // ... then the failure.
    fireOnErrorInNewThread(subject);
    try {
        iterator.hasNext();
        fail("Expected an TestException");
    } catch (TestException expected) {
        // expected
    }

    assertErrorAfterObservableFail(iterator);
}
项目:RxJavaFlow    文件:BlockingOperatorNextTest.java   
/**
 * An error fired before any value must surface as a {@code TestException} from
 * {@code hasNext()}.
 */
@Test
public void testOnErrorInNewThread() {
    Subject<String, String> subject = PublishSubject.create();
    Iterator<String> iterator = next(subject).iterator();

    fireOnErrorInNewThread(subject);

    try {
        iterator.hasNext();
        fail("Expected an TestException");
    } catch (TestException expected) {
        // expected
    }

    assertErrorAfterObservableFail(iterator);
}
项目:RxJavaFlow    文件:BlockingOperatorNextTest.java   
/**
 * Consuming purely through {@code next()} must yield each fired value and throw
 * {@link NoSuchElementException} once the subject has completed.
 */
@Test
public void testNextWithOnlyUsingNextMethod() {
    Subject<String, String> subject = PublishSubject.create();
    Iterator<String> iterator = next(subject).iterator();

    fireOnNextInNewThread(subject, "one");
    assertEquals("one", iterator.next());

    fireOnNextInNewThread(subject, "two");
    assertEquals("two", iterator.next());

    subject.onComplete();
    try {
        iterator.next();
        fail("At the end of an iterator should throw a NoSuchElementException");
    } catch (NoSuchElementException expected) {
        // expected
    }
}
项目:RxJavaFlow    文件:BlockingOperatorNextTest.java   
/**
 * Repeated {@code hasNext()} calls must not consume the pending value; after completion
 * {@code next()} must throw {@link NoSuchElementException}.
 */
@Test
public void testNextWithCallingHasNextMultipleTimes() {
    Subject<String, String> subject = PublishSubject.create();
    Iterator<String> iterator = next(subject).iterator();

    fireOnNextInNewThread(subject, "one");
    // Four consecutive checks, all answered with true.
    assertTrue(iterator.hasNext());
    assertTrue(iterator.hasNext());
    assertTrue(iterator.hasNext());
    assertTrue(iterator.hasNext());
    assertEquals("one", iterator.next());

    subject.onComplete();
    try {
        iterator.next();
        fail("At the end of an iterator should throw a NoSuchElementException");
    } catch (NoSuchElementException expected) {
        // expected
    }
}
项目:RxJavaFlow    文件:OnSubscribeMulticastTest.java   
/**
 * Values emitted before {@code connect()} must not reach the observer; values and the
 * completion emitted afterwards must each arrive exactly once.
 */
@Test
public void testMulticast() {
    Subject<String, String> upstream = PublishSubject.create();

    ConnectableObservable<String> connectable =
            new OperatorMulticast<String, String>(upstream, new PublishSubjectFactory());

    @SuppressWarnings("unchecked")
    Observer<String> downstream = mock(Observer.class);
    connectable.subscribe(downstream);

    // Emitted before the connection is established.
    upstream.onNext("one");
    upstream.onNext("two");

    connectable.connect();

    // Emitted after the connection is established.
    upstream.onNext("three");
    upstream.onNext("four");
    upstream.onComplete();

    verify(downstream, never()).onNext("one");
    verify(downstream, never()).onNext("two");
    verify(downstream, times(1)).onNext("three");
    verify(downstream, times(1)).onNext("four");
    verify(downstream, times(1)).onComplete();

}
项目:RxJavaFlow    文件:OnSubscribeMulticastTest.java   
/**
 * Calling {@code connect()} twice must not duplicate deliveries: the value and the completion
 * emitted afterwards arrive exactly once, the earlier value never.
 */
@Test
public void testMulticastConnectTwice() {
    Subject<String, String> upstream = PublishSubject.create();

    ConnectableObservable<String> connectable =
            new OperatorMulticast<String, String>(upstream, new PublishSubjectFactory());

    @SuppressWarnings("unchecked")
    Observer<String> downstream = mock(Observer.class);
    connectable.subscribe(downstream);

    // Emitted before the connection is established.
    upstream.onNext("one");

    connectable.connect();
    connectable.connect();

    upstream.onNext("two");
    upstream.onComplete();

    verify(downstream, never()).onNext("one");
    verify(downstream, times(1)).onNext("two");
    verify(downstream, times(1)).onComplete();

}
项目:RxJavaFlow    文件:OperatorMulticastTest.java   
/**
 * Values emitted before {@code connect()} must not reach the observer; values and the
 * completion emitted afterwards must each arrive exactly once.
 */
@Test
public void testMulticast() {
    Subject<String, String> upstream = PublishSubject.create();

    ConnectableObservable<String> connectable =
            new OperatorMulticast<String, String>(upstream, new PublishSubjectFactory());

    @SuppressWarnings("unchecked")
    Observer<String> downstream = mock(Observer.class);
    connectable.subscribe(downstream);

    // Emitted before the connection is established.
    upstream.onNext("one");
    upstream.onNext("two");

    connectable.connect();

    // Emitted after the connection is established.
    upstream.onNext("three");
    upstream.onNext("four");
    upstream.onComplete();

    verify(downstream, never()).onNext("one");
    verify(downstream, never()).onNext("two");
    verify(downstream, times(1)).onNext("three");
    verify(downstream, times(1)).onNext("four");
    verify(downstream, times(1)).onComplete();

}
项目:RxJavaFlow    文件:BlockingOperatorMostRecentTest.java   
/**
 * The iterator must keep returning the default until a value is emitted, then keep returning
 * the latest emitted value, and report no more elements after completion.
 */
@Test
public void testMostRecent() {
    Subject<String, String> subject = PublishSubject.create();

    Iterator<String> latest = mostRecent(subject, "default").iterator();

    // Nothing emitted yet: the default is served repeatedly.
    assertTrue(latest.hasNext());
    assertEquals("default", latest.next());
    assertEquals("default", latest.next());

    subject.onNext("one");
    assertTrue(latest.hasNext());
    assertEquals("one", latest.next());
    assertEquals("one", latest.next());

    subject.onNext("two");
    assertTrue(latest.hasNext());
    assertEquals("two", latest.next());
    assertEquals("two", latest.next());

    subject.onComplete();
    assertFalse(latest.hasNext());

}
项目:scalecube    文件:ServiceCall.java   
/**
 * Dispatches the request once per route returned by {@code router.routes(request)} and
 * streams every outcome into a single observable.
 *
 * <p>For each route, {@code invoke(request, duration)} is called. When it completes with a
 * response, that response is emitted. Otherwise an error message built with
 * {@code Messages.asError} from the failure, the request's correlation id and the instance's
 * member id is emitted instead. The subject is never completed by this method.
 *
 * <p>NOTE(review): the iterated instance is not passed to {@code invoke}; confirm that each
 * call really targets a distinct endpoint.
 *
 * @param request request to send; presumably carries the service and method headers — verify
 *        against callers
 * @param duration passed through to {@code invoke}; presumably the response timeout
 * @return Observable emitting one message per dispatched call, with a backpressure buffer.
 */
public Observable<Message> invokeAll(final Message request, final Duration duration) {
  final Subject<Message, Message> outcomes = PublishSubject.<Message>create().toSerialized();
  final Collection<ServiceInstance> routes = router.routes(request);

  for (ServiceInstance instance : routes) {
    invoke(request, duration).whenComplete((resp, error) -> {
      if (resp == null) {
        outcomes.onNext(Messages.asError(error, request.correlationId(), instance.memberId()));
      } else {
        outcomes.onNext(resp);
      }
    });
  }
  return outcomes.onBackpressureBuffer().asObservable();
}
项目:jme3_skel    文件:InputMapperTest.java   
/**
 * Maps key 0 to a subject-backed action and checks that a press followed by a release is
 * delivered to the subscribed observer as 1.0f then 0.0f, without completion.
 */
@Test
public void testIndirectUsage() {
    // Step 1: the action is a subject that acts as a facade for the real handler.
    Subject<Float,Float> action = PublishSubject.create();

    // Step 2: route input events to the action through the mapper under test.
    sut.mappings.clear();
    sut.map(tmplKeyInputEvent(KeyInput.KEY_0), InputMapperHelpers.isPressedAsOne, action);

    // Step 3: attach the listener that consumes the action's values.
    @SuppressWarnings("unchecked")
    Observer<Float> listener = mock(Observer.class);
    action.subscribe(listener);

    // Press, then release.
    sut.onEvent(new KeyInputEvent(KeyInput.KEY_0, '0', true, false));
    sut.onEvent(new KeyInputEvent(KeyInput.KEY_0, '0', false, false));

    InOrder ordered = inOrder(listener);
    ordered.verify(listener, times(1)).onNext(1.0f);
    ordered.verify(listener, times(1)).onNext(0.0f);
    verify(listener, Mockito.never()).onCompleted();
}
项目:couchbase-jvm-core    文件:AbstractSubdocRequest.java   
/**
 * Creates a new {@link AbstractSubdocRequest}.
 *
 * <p>The path is encoded as UTF-8 (an empty buffer is used for a null or empty path), its
 * byte length is recorded, and the request content is assembled from the encoded path and
 * {@code restOfContent}.
 *
 * @param key           the key of the document.
 * @param path          the subdocument path inside the document.
 * @param bucket        the bucket of the document.
 * @param observable    the observable which receives responses.
 * @param restOfContent the optional remainder of the {@link #content()} of the final protocol message, or null if not applicable
 * @throws NullPointerException if the path is null (see {@link #EXCEPTION_NULL_PATH})
 */
public AbstractSubdocRequest(String key, String path, String bucket,
                             Subject<CouchbaseResponse, CouchbaseResponse> observable,
                             ByteBuf... restOfContent) {
    super(key, bucket, null, null, observable);
    this.path = path;

    final boolean noPath = path == null || path.isEmpty();
    final ByteBuf encodedPath = noPath
            ? Unpooled.EMPTY_BUFFER
            : Unpooled.wrappedBuffer(path.getBytes(CharsetUtil.UTF_8));
    this.pathLength = encodedPath.readableBytes();
    this.content = createContent(encodedPath, restOfContent);

    // The null check is deferred until content exists, so that cleanUpAndThrow can release
    // everything in restOfContent by releasing content().
    if (this.path == null) {
        cleanUpAndThrow(EXCEPTION_NULL_PATH);
    }
}
项目:couchbase-jvm-core    文件:AbstractGenericHandler.java   
/**
 * Publishes a response to its attached observable.
 *
 * <p>Responses with status {@code RETRY}, or without an observable, are pushed into
 * {@code responseBuffer}. All others are completed directly, or dispatched on the
 * environment's scheduler first when {@code moveResponseOut} is set.
 *
 * @param response the response to publish.
 * @param observable the subject the response is delivered to; may be null.
 */
protected void publishResponse(final CouchbaseResponse response,
    final Subject<CouchbaseResponse, CouchbaseResponse> observable) {
    if (response.status() == ResponseStatus.RETRY || observable == null) {
        responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, response, observable);
        return;
    }

    if (!moveResponseOut) {
        completeResponse(response, observable);
        return;
    }

    final Scheduler target = env().scheduler();
    if (target instanceof CoreScheduler) {
        scheduleDirect((CoreScheduler) target, response, observable);
    } else {
        scheduleWorker(target, response, observable);
    }
}
项目:couchbase-jvm-core    文件:AbstractGenericHandler.java   
/**
 * Delivers the response on a worker created from the given scheduler.
 *
 * <p>The scheduled action emits the response followed by completion; an {@link Exception}
 * thrown while doing so is logged and forwarded through {@code onError}. The worker is
 * unsubscribed afterwards in every case.
 *
 * @param scheduler  scheduler the worker is created from
 * @param response   the response to emit
 * @param observable the subject receiving the response
 */
private static void scheduleWorker(Scheduler scheduler, final CouchbaseResponse response,
    final Subject<CouchbaseResponse, CouchbaseResponse> observable) {
    final Scheduler.Worker dispatcher = scheduler.createWorker();
    final Action0 delivery = new Action0() {
        @Override
        public void call() {
            try {
                observable.onNext(response);
                observable.onCompleted();
            } catch (Exception ex) {
                LOGGER.warn("Caught exception while onNext on observable", ex);
                observable.onError(ex);
            } finally {
                // One-shot worker: release it once the delivery attempt is over.
                dispatcher.unsubscribe();
            }
        }
    };
    dispatcher.schedule(delivery);
}
项目:couchbase-jvm-core    文件:ResponseHandlerTest.java   
/**
 * A RETRY response whose content is JSON must be proposed to the configuration provider; the
 * content buffer must be released and the event cleared afterwards.
 */
@Test
public void shouldSendProposedConfigToProvider() throws Exception {
    ClusterFacade cluster = mock(ClusterFacade.class);
    ConfigurationProvider provider = mock(ConfigurationProvider.class);
    ResponseHandler underTest = new ResponseHandler(ENVIRONMENT, cluster, provider);
    ByteBuf payload = Unpooled.copiedBuffer("{\"json\": true}", CharsetUtil.UTF_8);

    ResponseEvent event = new ResponseEvent();
    event.setMessage(new InsertResponse(ResponseStatus.RETRY, KeyValueStatus.ERR_TEMP_FAIL.code(),
            0, "bucket", payload, null, mock(InsertRequest.class)));
    event.setObservable(mock(Subject.class));
    underTest.onEvent(event, 1, true);

    verify(provider, times(1)).proposeBucketConfig("bucket", "{\"json\": true}");
    assertEquals(0, payload.refCnt());
    assertNull(event.getMessage());
    assertNull(event.getObservable());
}
项目:couchbase-jvm-core    文件:ResponseHandlerTest.java   
/**
 * A RETRY response whose content is not a config must not be proposed to the configuration
 * provider; the content buffer must still be released and the event cleared.
 */
@Test
public void shouldIgnoreInvalidConfig() throws Exception {
    ClusterFacade cluster = mock(ClusterFacade.class);
    ConfigurationProvider provider = mock(ConfigurationProvider.class);
    ResponseHandler underTest = new ResponseHandler(ENVIRONMENT, cluster, provider);
    ByteBuf payload = Unpooled.copiedBuffer("Not my Vbucket", CharsetUtil.UTF_8);

    ResponseEvent event = new ResponseEvent();
    event.setMessage(new InsertResponse(ResponseStatus.RETRY, KeyValueStatus.ERR_TEMP_FAIL.code(),
            0, "bucket", payload, null, mock(InsertRequest.class)));
    event.setObservable(mock(Subject.class));
    underTest.onEvent(event, 1, true);

    verify(provider, never()).proposeBucketConfig("bucket", "Not my Vbucket");
    assertEquals(0, payload.refCnt());
    assertNull(event.getMessage());
    assertNull(event.getObservable());
}
项目:couchbase-jvm-core    文件:ViewLocatorTest.java   
/**
 * Dispatching a view query against a memcached bucket config must fail the request's
 * observable with exactly one {@code ServiceNotAvailableException}.
 */
@Test
public void shouldFailWhenUsedAgainstMemcacheBucket() {
    Locator underTest = new ViewLocator(0);

    ClusterConfig clusterConfig = mock(ClusterConfig.class);
    when(clusterConfig.bucketConfig("default")).thenReturn(mock(MemcachedBucketConfig.class));

    CouchbaseRequest viewRequest = mock(ViewQueryRequest.class);
    Subject<CouchbaseResponse, CouchbaseResponse> responseSubject = AsyncSubject.create();
    when(viewRequest.bucket()).thenReturn("default");
    when(viewRequest.observable()).thenReturn(responseSubject);

    TestSubscriber<CouchbaseResponse> probe = new TestSubscriber<CouchbaseResponse>();
    responseSubject.subscribe(probe);

    underTest.locateAndDispatch(viewRequest, Collections.<Node>emptyList(), clusterConfig, null, null);

    probe.awaitTerminalEvent(1, TimeUnit.SECONDS);
    List<Throwable> failures = probe.getOnErrorEvents();
    assertEquals(1, failures.size());
    assertTrue(failures.get(0) instanceof ServiceNotAvailableException);
}
项目:ReactiveLab    文件:HystrixMetricsStreamHandler.java   
/**
 * Writes the response headers and starts a periodic timer that writes the metrics as JSON to
 * the response on every tick.
 *
 * <p>Once the response channel is no longer open, the timer subscription is cancelled. An
 * exception raised while writing a metric is signalled through the returned subject; the
 * subject is not otherwise notified by this method.
 *
 * @param response the HTTP response to stream metrics to
 * @return the subject that reports write failures
 */
private Observable<Void> handleHystrixRequest(final HttpServerResponse<O> response) {
    writeHeaders(response);

    final Subject<Void, Void> failureChannel = PublishSubject.create();
    final MultipleAssignmentSubscription tickerHandle = new MultipleAssignmentSubscription();
    final Action1<Long> onTick = new Action1<Long>() {
        @Override
        public void call(Long tick) {
            if (response.getChannel().isOpen()) {
                try {
                    writeMetric(JsonMapper.toJson(metrics), response);
                } catch (Exception e) {
                    failureChannel.onError(e);
                }
            } else {
                // Client is gone: stop the periodic timer.
                tickerHandle.unsubscribe();
            }
        }
    };
    tickerHandle.set(
            Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
                    .subscribe(onTick));
    return failureChannel;
}
项目:org.openntf.domino    文件:OperatorMulticast.java   
/**
 * Internal constructor that wires the shared state into both the subscribe action and the
 * instance fields.
 *
 * <p>The subscribe action runs under {@code guard}: a subscriber is handed straight to the
 * connected subject when one is present, otherwise it is queued in
 * {@code waitingForConnect}. The reference is read exactly once, so the null check and the
 * subscription always see the same subject.
 *
 * @param guard             lock object protecting the connection state
 * @param connectedSubject  holder of the currently connected subject, or {@code null} inside
 * @param waitingForConnect subscribers that arrived while no subject was connected
 * @param source            the upstream observable
 * @param subjectFactory    factory of subjects
 */
private OperatorMulticast(final Object guard, final AtomicReference<Subject<? super T, ? extends R>> connectedSubject, final List<Subscriber<? super R>> waitingForConnect, Observable<? extends T> source, final Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
    super(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> subscriber) {
            synchronized (guard) {
                // Single read: avoids a check-then-act gap between two get() calls.
                final Subject<? super T, ? extends R> subject = connectedSubject.get();
                if (subject == null) {
                    // not connected yet, so register
                    waitingForConnect.add(subscriber);
                } else {
                    // we are already connected so subscribe directly
                    subject.unsafeSubscribe(subscriber);
                }
            }
        }
    });
    this.guard = guard;
    this.connectedSubject = connectedSubject;
    this.waitingForConnect = waitingForConnect;
    this.source = source;
    this.subjectFactory = subjectFactory;
}
项目:GitHub    文件:RxBus.java   
/**
 * Registers an event source: creates a new {@code PublishSubject}, appends it to the list
 * stored under {@code tag} (creating the list on first use) and returns it.
 *
 * @param tag key under which the subject list is stored in {@code subjectMapper}
 * @param <T> element type of the returned observable
 * @return the newly created subject
 */
@SuppressWarnings({"rawtypes"})
public <T> Observable<T> register(@NonNull Object tag) {
    List<Subject> bucket = subjectMapper.get(tag);
    if (bucket == null) {
        // First registration for this tag.
        bucket = new ArrayList<Subject>();
        subjectMapper.put(tag, bucket);
    }
    final Subject<T, T> created = PublishSubject.create();
    bucket.add(created);
    LogUtil.d("register", tag + "  size:" + bucket.size());
    return created;
}
项目:GitHub    文件:RxBus.java   
/**
 * Removes the mapping for {@code tag} from {@code subjectMapper}, provided a non-null subject
 * list is stored under it.
 *
 * @param tag key whose subject list is to be dropped
 */
@SuppressWarnings("rawtypes")
public void unregister(@NonNull Object tag) {
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        return;
    }
    subjectMapper.remove(tag);
}
项目:GitHub    文件:RxBus.java   
/**
 * Fires an event: pushes {@code content} into every subject stored under {@code tag}.
 *
 * <p>One debug line is logged on entry and one after each delivery. Nothing is delivered when
 * {@code isEmpty} reports the list for the tag as empty.
 *
 * @param tag     key under which the subject list is stored in {@code subjectMapper}
 * @param content the value handed to each subject's {@code onNext}
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void post(@NonNull Object tag, @NonNull Object content) {
    LogUtil.d("post", "eventName: " + tag);
    final List<Subject> targets = subjectMapper.get(tag);
    if (isEmpty(targets)) {
        return;
    }
    for (Subject target : targets) {
        target.onNext(content);
        LogUtil.d("onEvent", "eventName: " + tag);
    }
}
项目:MyFire    文件:RxBus.java   
/**
 * Registers an event source: creates a new {@code PublishSubject}, appends it to the list
 * stored under {@code tag} (creating the list on first use) and returns it.
 *
 * @param tag key under which the subject list is stored in {@code subjectMapper}
 * @param <T> element type of the returned observable
 * @return the newly created subject
 */
@SuppressWarnings({"rawtypes"})
public <T> Observable<T> register(@NonNull Object tag) {
    List<Subject> bucket = subjectMapper.get(tag);
    if (bucket == null) {
        // First registration for this tag.
        bucket = new ArrayList<Subject>();
        subjectMapper.put(tag, bucket);
    }
    final Subject<T, T> created = PublishSubject.create();
    bucket.add(created);
    LogUtils.logd("register"+tag + "  size:" + bucket.size());
    return created;
}
项目:MyFire    文件:RxBus.java   
/**
 * Removes the mapping for {@code tag} from {@code subjectMapper}, provided a non-null subject
 * list is stored under it.
 *
 * @param tag key whose subject list is to be dropped
 */
@SuppressWarnings("rawtypes")
public void unregister(@NonNull Object tag) {
    final List<Subject> registered = subjectMapper.get(tag);
    if (registered == null) {
        return;
    }
    subjectMapper.remove(tag);
}