@Test public void statusExceptionTriggersHandler() { ClientResponseObserver<Object, Object> delegate = mock(ClientResponseObserver.class); final AtomicBoolean called = new AtomicBoolean(false); CancellableStreamObserver<Object, Object> observer = new CancellableStreamObserver<Object, Object>(delegate, new Runnable() { @Override public void run() { called.set(true); } }); observer.onError(Status.CANCELLED.asException()); assertThat(called.get()).isTrue(); }
@Test public void statusRuntimeExceptionTriggersHandler() { ClientResponseObserver<Object, Object> delegate = mock(ClientResponseObserver.class); final AtomicBoolean called = new AtomicBoolean(false); CancellableStreamObserver<Object, Object> observer = new CancellableStreamObserver<Object, Object>(delegate, new Runnable() { @Override public void run() { called.set(true); } }); observer.onError(Status.CANCELLED.asRuntimeException()); assertThat(called.get()).isTrue(); }
@Test public void otherExceptionDoesNotTriggersHandler() { ClientResponseObserver<Object, Object> delegate = mock(ClientResponseObserver.class); final AtomicBoolean called = new AtomicBoolean(false); CancellableStreamObserver<Object, Object> observer = new CancellableStreamObserver<Object, Object>(delegate, new Runnable() { @Override public void run() { called.set(true); } }); observer.onError(Status.INTERNAL.asRuntimeException()); assertThat(called.get()).isFalse(); }
@Test public void testCallsAreForwardedAndOnReadyHandlerBound() { @SuppressWarnings("unchecked") StreamObserver<Object> delegateObserver = mock(StreamObserver.class); @SuppressWarnings("unchecked") ClientCallStreamObserver<Object> callStreamObserver = mock(ClientCallStreamObserver.class); Runnable onReadyHandler = new Runnable() { @Override public void run() { } }; ClientResponseObserver<Object, Object> observer = new ForwardingClientResponseObserver<>(delegateObserver, onReadyHandler); observer.onNext("A"); verify(delegateObserver).onNext("A"); Throwable t = new RuntimeException(); observer.onError(t); verify(delegateObserver).onError(t); observer.onCompleted(); verify(delegateObserver).onCompleted(); observer.beforeStart(callStreamObserver); verify(callStreamObserver).setOnReadyHandler(onReadyHandler); verifyNoMoreInteractions(delegateObserver, callStreamObserver); }
StreamObserverToCallListenerAdapter(StreamObserver<Response> observer, CallToStreamObserverAdapter<Request, Response> adapter, boolean streamingResponse) { this.observer = observer; this.streamingResponse = streamingResponse; this.adapter = adapter; if (observer instanceof ClientResponseObserver) { @SuppressWarnings("unchecked") ClientResponseObserver<Request, Response> clientResponseObserver = (ClientResponseObserver<Request, Response>) observer; clientResponseObserver.beforeStart(adapter); } adapter.freeze(); }
public CancellableStreamObserver(ClientResponseObserver<TRequest, TResponse> delegate, Runnable onCanceledHandler) { this.delegate = delegate; this.onCanceledHandler = onCanceledHandler; }