Java 类io.grpc.stub.ServerCallStreamObserver 实例源码

项目:reactive-grpc    文件:ServerCalls.java   
/**
 * Adapts a unary-request / streaming-response call to RxJava: the request is wrapped in a
 * {@link Single}, passed to {@code delegate}, and the {@link Flowable} it returns is subscribed with a
 * {@link ReactivePublisherBackpressureOnReadyHandler} built around the response observer.
 *
 * <p>Any {@link Throwable} raised while wiring this up (including a {@code null} result from
 * {@code delegate}) is converted with {@code prepareError} and reported through
 * {@code responseObserver.onError}.
 *
 * @param request the single request message
 * @param responseObserver the gRPC response stream; it is cast to {@link ServerCallStreamObserver}
 * @param delegate maps the request to the stream of responses
 */
public static <TRequest, TResponse> void oneToMany(
        TRequest request, StreamObserver<TResponse> responseObserver,
        Function<Single<TRequest>, Flowable<TResponse>> delegate) {
    try {
        Flowable<TResponse> responses =
                Preconditions.checkNotNull(delegate.apply(Single.just(request)));

        // The handler needs the server-side observer API, hence the downcast.
        ServerCallStreamObserver<TResponse> serverObserver =
                (ServerCallStreamObserver<TResponse>) responseObserver;
        ReactivePublisherBackpressureOnReadyHandler<TResponse> handler =
                new ReactivePublisherBackpressureOnReadyHandler<TResponse>(serverObserver);
        responses.subscribe(handler);
    } catch (Throwable throwable) {
        responseObserver.onError(prepareError(throwable));
    }
}
项目:reactive-grpc    文件:ReactivePublisherBackpressureOnReadyHandler.java   
/**
 * Attaches this handler to a server-side stream: it becomes the stream's on-ready handler and
 * installs a cancel handler that cancels {@code subscription}.
 *
 * @param requestStream the server call stream observer to attach to; must not be {@code null}
 */
public ReactivePublisherBackpressureOnReadyHandler(ServerCallStreamObserver<T> requestStream) {
    this.requestStream = Preconditions.checkNotNull(requestStream);

    final Runnable cancelUpstream = new Runnable() {
        @Override
        public void run() {
            // NOTE(review): subscription is presumably assigned when this handler is subscribed;
            // confirm it cannot still be unset when a cancellation arrives.
            subscription.cancel();
        }
    };

    requestStream.setOnReadyHandler(this);
    requestStream.setOnCancelHandler(cancelUpstream);
}
项目:reactive-grpc    文件:ServerCalls.java   
/**
 * Adapts a unary-request / streaming-response call to Reactor: the request is wrapped in a
 * {@link Mono}, passed to {@code delegate}, and the {@link Flux} it returns is subscribed with a
 * {@link ReactivePublisherBackpressureOnReadyHandler} built around the response observer.
 *
 * <p>Any {@link Throwable} raised while wiring this up (including a {@code null} result from
 * {@code delegate}) is converted with {@code prepareError} and reported through
 * {@code responseObserver.onError}.
 *
 * @param request the single request message
 * @param responseObserver the gRPC response stream; it is cast to {@link ServerCallStreamObserver}
 * @param delegate maps the request to the stream of responses
 */
public static <TRequest, TResponse> void oneToMany(
        TRequest request, StreamObserver<TResponse> responseObserver,
        Function<Mono<TRequest>, Flux<TResponse>> delegate) {
    try {
        Flux<TResponse> responses =
                Preconditions.checkNotNull(delegate.apply(Mono.just(request)));

        // The handler needs the server-side observer API, hence the downcast.
        ServerCallStreamObserver<TResponse> serverObserver =
                (ServerCallStreamObserver<TResponse>) responseObserver;
        ReactivePublisherBackpressureOnReadyHandler<TResponse> handler =
                new ReactivePublisherBackpressureOnReadyHandler<>(serverObserver);
        responses.subscribe(handler);
    } catch (Throwable throwable) {
        responseObserver.onError(prepareError(throwable));
    }
}
项目:factcast    文件:BlockingStreamObserver.java   
/**
 * Wraps the given server call observer. Both its on-ready and on-cancel callbacks are routed to
 * {@code wakeup()}, and gzip message compression is switched on for the call.
 *
 * @param id identifier stored on this observer
 * @param delegate the underlying gRPC server call observer
 */
BlockingStreamObserver(String id, ServerCallStreamObserver<T> delegate) {
    this.id = id;
    this.delegate = delegate;

    // Readiness and cancellation both trigger wakeup().
    delegate.setOnReadyHandler(() -> wakeup());
    delegate.setOnCancelHandler(() -> wakeup());

    // Compress outgoing messages with gzip.
    delegate.setCompression("gzip");
    delegate.setMessageCompression(true);
}
项目:factcast    文件:FactStoreGrpcService.java   
/**
 * Handles a subscription request: converts the proto request, resets its debug info, wraps the
 * response observer in a {@link BlockingStreamObserver} and subscribes to the store through a
 * {@link GrpcObserverAdapter}.
 *
 * <p>Depending on {@code req.idOnly()}, each fact is turned into a notification carrying either
 * only its id or the whole fact.
 *
 * @param request the incoming subscription request
 * @param responseObserver the gRPC response stream; it is cast to
 *        {@link ServerCallStreamObserver}
 */
@Override
public void subscribe(@NonNull MSG_SubscriptionRequest request,
        @NonNull StreamObserver<MSG_Notification> responseObserver) {
    SubscriptionRequestTO req = converter.fromProto(request);
    resetDebugInfo(req);

    // Parameterized cast (rather than the raw type) keeps the constructor call type-checked.
    BlockingStreamObserver<MSG_Notification> resp = new BlockingStreamObserver<>(req.toString(),
            (ServerCallStreamObserver<MSG_Notification>) responseObserver);

    final boolean idOnly = req.idOnly();

    store.subscribe(req, new GrpcObserverAdapter(req.toString(), resp, f -> idOnly ? converter
            .createNotificationFor(f.id()) : converter.createNotificationFor(f)));
}
项目:factcast    文件:FactStoreGrpcService0Test.java   
/** Checks that {@code fetchById} on the service passes the decoded id on to the backend. */
@Test
public void testFetchById() throws Exception {
    final UUID id = UUID.randomUUID();
    // Raw mock, as the response stream itself is not inspected by this test.
    final ServerCallStreamObserver responseObserver = mock(ServerCallStreamObserver.class);

    uut.fetchById(protoConverter.toProto(id), responseObserver);

    verify(backend).fetchById(eq(id));
}
项目:factcast    文件:FactStoreGrpcService0Test.java   
/**
 * Checks that subscribing with a facts request reaches the backend and that the captured request
 * is not id-only.
 */
@Test
public void testSubscribeFacts() throws Exception {
    final SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.forMark()).fromNowOn();
    when(backend.subscribe(this.reqCaptor.capture(), any())).thenReturn(null);

    final ProtoConverter conv = new ProtoConverter();
    // Raw mock, as the response stream itself is not inspected by this test.
    final ServerCallStreamObserver responseObserver = mock(ServerCallStreamObserver.class);
    uut.subscribe(conv.toProto(SubscriptionRequestTO.forFacts(req)), responseObserver);

    verify(backend).subscribe(any(), any());
    assertFalse(reqCaptor.getValue().idOnly());
}
项目:factcast    文件:FactStoreGrpcService0Test.java   
/**
 * Checks that subscribing with an ids request reaches the backend and that the captured request
 * is id-only.
 */
@Test
public void testSubscribeIds() throws Exception {
    final SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.forMark()).fromNowOn();
    when(backend.subscribe(this.reqCaptor.capture(), any())).thenReturn(null);

    final ProtoConverter conv = new ProtoConverter();
    // Raw mock, as the response stream itself is not inspected by this test.
    final ServerCallStreamObserver responseObserver = mock(ServerCallStreamObserver.class);
    uut.subscribe(conv.toProto(SubscriptionRequestTO.forIds(req)), responseObserver);

    verify(backend).subscribe(any(), any());
    assertTrue(reqCaptor.getValue().idOnly());
}
项目:grpc    文件:Backend.java   
/**
 * Keeps sending numbered {@link Result} messages for the request's query until the call is
 * reported as cancelled, pausing via {@code sleepUpToMiilis(1000)} before each message.
 *
 * <p>The cancel handler is installed before streaming starts. Registering it only after the loop
 * (as was done previously) meant it was not in place while messages were being sent; per the
 * grpc-java documentation, {@code onNext()} on a cancelled call may throw unless an on-cancel
 * handler has been set.
 *
 * @param req the watch request; its query is echoed in every result title
 * @param responseObserver the gRPC response stream; it is cast to
 *        {@link ServerCallStreamObserver}
 */
@Override
public void watch(Request req, StreamObserver<Result> responseObserver) {
    logger.info("Start watching: " + req.getQuery());

    final ServerCallStreamObserver<Result> serverObserver =
            (ServerCallStreamObserver<Result>) responseObserver;
    // Must be registered up front so it is active for the whole lifetime of the stream.
    serverObserver.setOnCancelHandler(() -> logger.warning("Request canceled!"));

    int responseNo = 0;
    while (!serverObserver.isCancelled()) {
        sleepUpToMiilis(1000);
        serverObserver.onNext(Result.newBuilder()
                .setTitle(format("result %d for [%s] from backend %d", responseNo++, req.getQuery(), id))
                .build());
    }
}
项目:armeria    文件:GrpcServiceServerTest.java   
/**
 * Answers {@code REQUEST_MESSAGE} with {@code RESPONSE_MESSAGE}, after enabling gzip message
 * compression on the call. Any other request is rejected with an
 * {@link IllegalArgumentException} sent through {@code onError}.
 */
@Override
public void staticUnaryCallSetsMessageCompression(SimpleRequest request,
                                                  StreamObserver<SimpleResponse> responseObserver) {
    if (request.equals(REQUEST_MESSAGE)) {
        final ServerCallStreamObserver<SimpleResponse> serverObserver =
                (ServerCallStreamObserver<SimpleResponse>) responseObserver;
        serverObserver.setCompression("gzip");
        serverObserver.setMessageCompression(true);
        serverObserver.onNext(RESPONSE_MESSAGE);
        serverObserver.onCompleted();
    } else {
        responseObserver.onError(new IllegalArgumentException("Unexpected request: " + request));
    }
}
项目:bazel    文件:GrpcServerImpl.java   
/**
 * Creates a sink around the given response observer. Cancel and ready notifications from gRPC are
 * turned into {@code SinkThreadAction} entries on an internal queue, and {@code call} is submitted
 * to {@code executor}.
 *
 * @param rpcCommandName name of the RPC, used only in the cancellation log message
 * @param observer the server call observer whose cancel/ready callbacks are hooked
 * @param executor executor on which {@code call} is scheduled
 */
@VisibleForTesting
GrpcSink(
    final String rpcCommandName,
    ServerCallStreamObserver<RunResponse> observer,
    ExecutorService executor) {
  // This queue is intentionally unbounded: we always act on it fairly quickly so filling up
  // RAM is not a concern but we don't want to block in the gRPC cancel/onready handlers.
  this.actionQueue = new LinkedBlockingQueue<>();
  this.exchanger = new Exchanger<>();
  this.observer = observer;

  observer.setOnCancelHandler(
      () -> {
        // Interrupt the command thread, if one is currently recorded, before signalling the
        // disconnect.
        Thread toInterrupt = GrpcSink.this.commandThread.get();
        if (toInterrupt != null) {
          String message =
              String.format(
                  "Interrupting thread %s due to the streaming %s call being cancelled "
                      + "(likely client hang up or explicit gRPC-level cancellation)",
                  toInterrupt.getName(), rpcCommandName);
          logger.info(message);
          toInterrupt.interrupt();
        }

        actionQueue.offer(SinkThreadAction.DISCONNECT);
      });
  observer.setOnReadyHandler(() -> actionQueue.offer(SinkThreadAction.READY));

  this.future = executor.submit(GrpcSink.this::call);
}
项目:grpc-java    文件:ProtoReflectionService.java   
/**
 * Starts a reflection stream: creates the inbound {@link ProtoReflectionStreamObserver}, registers
 * it as the on-ready handler of the response stream, disables automatic inbound flow control and
 * requests the first message.
 *
 * @param responseObserver the gRPC response stream; it is cast to
 *        {@link ServerCallStreamObserver}
 * @return the observer that receives the client's reflection requests
 */
@Override
public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
    final StreamObserver<ServerReflectionResponse> responseObserver) {
  final ServerCallStreamObserver<ServerReflectionResponse> serverObserver =
      (ServerCallStreamObserver<ServerReflectionResponse>) responseObserver;
  final ProtoReflectionStreamObserver inbound =
      new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverObserver);

  serverObserver.setOnReadyHandler(inbound);
  // Inbound messages are pulled manually from here on, starting with a single one.
  serverObserver.disableAutoInboundFlowControl();
  serverObserver.request(1);

  return inbound;
}
项目:grpc-java    文件:AsyncServer.java   
/**
 * Bidirectional ping-pong: every inbound request is answered with {@code Utils.makeResponse}.
 * Once {@code shutdown} is set, the next inbound request completes the response stream instead.
 *
 * <p>The response stream is terminated at most once. Previously every request that arrived after
 * shutdown called {@code onCompleted()} again, and a later inbound {@code onCompleted}/
 * {@code onError} would try to close the already-closed stream.
 *
 * @param observer the gRPC response stream; it is cast to {@link ServerCallStreamObserver}
 * @return the observer receiving the client's requests
 */
@Override
public StreamObserver<Messages.SimpleRequest> streamingCall(
    final StreamObserver<Messages.SimpleResponse> observer) {
  final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
      (ServerCallStreamObserver<Messages.SimpleResponse>) observer;
  // TODO(spencerfang): flow control to stop reading when !responseObserver.isReady
  return new StreamObserver<Messages.SimpleRequest>() {
    // True once onCompleted/onError has been sent on the response stream.
    // NOTE(review): plain field on the assumption that gRPC delivers the callbacks of one
    // inbound observer serially - confirm against the StreamObserver contract.
    private boolean responseClosed;

    @Override
    public void onNext(Messages.SimpleRequest value) {
      if (responseClosed) {
        // Stream already terminated; drop late requests.
        return;
      }
      if (shutdown.get()) {
        responseClosed = true;
        responseObserver.onCompleted();
        return;
      }
      responseObserver.onNext(Utils.makeResponse(value));
    }

    @Override
    public void onError(Throwable t) {
      // other side closed with non OK
      if (responseClosed) {
        return;
      }
      responseClosed = true;
      responseObserver.onError(t);
    }

    @Override
    public void onCompleted() {
      // other side closed with OK
      if (responseClosed) {
        return;
      }
      responseClosed = true;
      responseObserver.onCompleted();
    }
  };
}
项目:grpc-java    文件:AsyncServer.java   
/**
 * Server-streaming benchmark call: builds one response from the request and keeps offering that
 * same response to {@code StreamObservers.copyWithFlowControl} until {@code shutdown} is set or
 * the call is cancelled.
 *
 * @param request the request from which the repeated response is built
 * @param observer the gRPC response stream; it is cast to {@link ServerCallStreamObserver}
 */
@Override
public void streamingFromServer(
    final Messages.SimpleRequest request,
    final StreamObserver<Messages.SimpleResponse> observer) {
  final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
      (ServerCallStreamObserver<Messages.SimpleResponse>) observer;
  // send forever, until the client cancels or we shut down
  final Messages.SimpleResponse response = Utils.makeResponse(request);

  final Iterator<Messages.SimpleResponse> endlessResponses =
      new Iterator<Messages.SimpleResponse>() {
        @Override
        public boolean hasNext() {
          return !(shutdown.get() || responseObserver.isCancelled());
        }

        @Override
        public Messages.SimpleResponse next() {
          return response;
        }

        @Override
        public void remove() {
          throw new UnsupportedOperationException();
        }
      };

  // If the client cancels, copyWithFlowControl takes care of calling
  // responseObserver.onCompleted() for us
  StreamObservers.copyWithFlowControl(endlessResponses, responseObserver);
}
项目:reactive-grpc    文件:ReactiveStreamObserverPublisher.java   
/**
 * Registers the subscriber and hands it a {@link Subscription} that forwards demand and
 * cancellation to {@code callStreamObserver}.
 *
 * <p>{@code request} retries up to {@code MAX_REQUEST_RETRIES} times when the underlying stream
 * throws {@link IllegalStateException}, sleeping 1 ms between attempts. If the thread is
 * interrupted while waiting, the retries continue and the interrupt status is restored afterwards
 * instead of being silently discarded.
 *
 * @param subscriber the subscriber to attach; must not be {@code null}
 */
@Override
public void subscribe(Subscriber<? super T> subscriber) {
    Preconditions.checkNotNull(subscriber);
    subscriber.onSubscribe(new Subscription() {
        private static final int MAX_REQUEST_RETRIES = 20;

        @Override
        public void request(long l) {
            // Some Reactive Streams implementations use Long.MAX_VALUE to indicate "all messages"; gRPC uses Integer.MAX_VALUE.
            int i = (int) Math.min(l, Integer.MAX_VALUE);

            // Remember an interruption seen during back-off so it can be re-asserted for the caller.
            boolean interrupted = false;
            try {
                // Very rarely, request() gets called before the client has finished setting up its stream. If this
                // happens, wait momentarily and try again.
                for (int j = 0; j < MAX_REQUEST_RETRIES; j++) {
                    try {
                        callStreamObserver.request(i);
                        break;
                    } catch (IllegalStateException ex) {
                        if (j == MAX_REQUEST_RETRIES - 1) {
                            throw ex;
                        }
                        try {
                            Thread.sleep(1);
                        } catch (InterruptedException e) {
                            // Keep retrying, but do not lose the interrupt.
                            interrupted = true;
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        @Override
        public void cancel() {
            // Don't cancel twice if the server is already canceled
            if (callStreamObserver instanceof ServerCallStreamObserver
                    && ((ServerCallStreamObserver<?>) callStreamObserver).isCancelled()) {
                return;
            }

            isCanceled = true;
            if (callStreamObserver instanceof ClientCallStreamObserver) {
                ((ClientCallStreamObserver<?>) callStreamObserver).cancel("Client canceled request", null);
            } else {
                callStreamObserver.onError(Status.CANCELLED.withDescription("Server canceled request").asRuntimeException());
            }
        }
    });
    this.subscriber = subscriber;

    subscribed.countDown();
}
项目:armeria    文件:TestServiceImpl.java   
/**
 * Replies at once with a payload whose type and size are dictated by the request.
 *
 * <p>The response compression is chosen from {@code req.getResponseCompression()} (DEFLATE is
 * served as gzip). An unknown compression value ends the call with {@code INVALID_ARGUMENT}; an
 * {@link IllegalArgumentException} from {@code setCompression} ends it with
 * {@code UNIMPLEMENTED}. If the request carries a response status, that status is returned as an
 * error instead of the payload.
 */
@Override
public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
    final ServerCallStreamObserver<SimpleResponse> serverObserver =
            (ServerCallStreamObserver<SimpleResponse>) responseObserver;
    final SimpleResponse.Builder reply = SimpleResponse.newBuilder();

    try {
        switch (req.getResponseCompression()) {
            case DEFLATE: // served as gzip
            case GZIP:
                serverObserver.setCompression("gzip");
                break;
            case NONE:
                serverObserver.setCompression("identity");
                break;
            case UNRECOGNIZED: // handled like any other unknown value
            default:
                serverObserver.onError(
                        Status.INVALID_ARGUMENT
                              .withDescription("Unknown: " + req.getResponseCompression())
                              .asRuntimeException());
                return;
        }
    } catch (IllegalArgumentException e) {
        serverObserver.onError(
                Status.UNIMPLEMENTED
                      .withDescription("compression not supported.")
                      .withCause(e)
                      .asRuntimeException());
        return;
    }

    final int requestedSize = req.getResponseSize();
    if (requestedSize != 0) {
        final boolean compressable = compressableResponse(req.getResponseType());
        final ByteString source = compressable ? compressableBuffer : uncompressableBuffer;
        final PayloadType payloadType =
                compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE;
        // For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
        // TODO(wonderfly): whether or not this is a good approach needs further discussion.
        final int offset = random.nextInt(source.size());
        final ByteString body = generatePayload(source, offset, requestedSize);
        reply.getPayloadBuilder()
             .setType(payloadType)
             .setBody(body);
    }

    if (req.hasResponseStatus()) {
        // The client asked for a specific status: answer with it rather than with the payload.
        serverObserver.onError(
                Status.fromCodeValue(req.getResponseStatus().getCode())
                      .withDescription(req.getResponseStatus().getMessage())
                      .asRuntimeException());
        return;
    }

    serverObserver.onNext(reply.build());
    serverObserver.onCompleted();
}
项目:grpc-java    文件:TestServiceImpl.java   
/**
 * Replies at once with a payload whose type and size are dictated by the request.
 *
 * <p>gzip is used when the request explicitly asks for a compressed response, "identity"
 * otherwise; an {@link IllegalArgumentException} from {@code setCompression} ends the call with
 * {@code UNIMPLEMENTED}. If the request carries a response status, that status is returned as an
 * error instead of the payload.
 */
@Override
public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
  final ServerCallStreamObserver<SimpleResponse> serverObserver =
      (ServerCallStreamObserver<SimpleResponse>) responseObserver;
  final SimpleResponse.Builder reply = SimpleResponse.newBuilder();

  try {
    final boolean wantsCompression =
        req.hasResponseCompressed() && req.getResponseCompressed().getValue();
    serverObserver.setCompression(wantsCompression ? "gzip" : "identity");
  } catch (IllegalArgumentException e) {
    serverObserver.onError(
        Status.UNIMPLEMENTED
            .withDescription("compression not supported.")
            .withCause(e)
            .asRuntimeException());
    return;
  }

  final int requestedSize = req.getResponseSize();
  if (requestedSize != 0) {
    final boolean compressable = compressableResponse(req.getResponseType());
    final ByteString source = compressable ? compressableBuffer : uncompressableBuffer;
    final PayloadType payloadType =
        compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE;
    // For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
    // TODO(wonderfly): whether or not this is a good approach needs further discussion.
    final int offset = random.nextInt(source.size());
    final ByteString body = generatePayload(source, offset, requestedSize);
    reply.getPayloadBuilder()
        .setType(payloadType)
        .setBody(body);
  }

  if (req.hasResponseStatus()) {
    // The client asked for a specific status: answer with it rather than with the payload.
    serverObserver.onError(
        Status.fromCodeValue(req.getResponseStatus().getCode())
            .withDescription(req.getResponseStatus().getMessage())
            .asRuntimeException());
    return;
  }

  serverObserver.onNext(reply.build());
  serverObserver.onCompleted();
}
项目:grpc-java    文件:CascadingTest.java   
/**
 * Starts an in-process server whose unary call re-issues the same call through
 * {@code blockingStub}, producing a chain of client-to-server calls that can be cancelled from
 * the top.
 *
 * <p>Each incoming call registers a cancel handler that counts down
 * {@code receivedCancellations}. The call that brings the number of started calls to
 * {@code depthThreshold} stops the recursion and completes the returned future.
 *
 * @param depthThreshold number of nested calls after which the chain stops growing
 * @return a Future that completes when call chain is created
 */
private Future<?> startChainingServer(final int depthThreshold) throws IOException {
  final AtomicInteger startedCalls = new AtomicInteger();
  final SettableFuture<Void> chainReady = SettableFuture.create();

  class ChainingService extends TestServiceGrpc.TestServiceImplBase {
    @Override
    public void unaryCall(final SimpleRequest request,
        final StreamObserver<SimpleResponse> responseObserver) {
      final ServerCallStreamObserver<SimpleResponse> serverObserver =
          (ServerCallStreamObserver<SimpleResponse>) responseObserver;
      serverObserver.setOnCancelHandler(new Runnable() {
        @Override
        public void run() {
          receivedCancellations.countDown();
        }
      });

      if (startedCalls.incrementAndGet() == depthThreshold) {
        // Stop recursion
        chainReady.set(null);
        return;
      }

      final Runnable callNextHop = new Runnable() {
        @Override
        public void run() {
          try {
            blockingStub.unaryCall(request);
          } catch (StatusRuntimeException e) {
            if (e.getStatus().getCode() != Status.Code.CANCELLED) {
              responseObserver.onError(e);
            } else {
              observedCancellations.countDown();
            }
          }
        }
      };
      Context.currentContextExecutor(otherWork).execute(callNextHop);
    }
  }

  server = InProcessServerBuilder.forName("channel")
      .executor(otherWork)
      .addService(new ChainingService())
      .build()
      .start();
  return chainReady;
}
项目:grpc-java    文件:ProtoReflectionService.java   
/**
 * Creates the inbound observer for one reflection stream.
 *
 * @param serverReflectionIndex the index stored for use by this observer
 * @param serverCallStreamObserver the response stream; must not be {@code null}
 */
ProtoReflectionStreamObserver(
    ServerReflectionIndex serverReflectionIndex,
    ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver) {
  // Reject a missing observer before storing anything.
  this.serverCallStreamObserver = checkNotNull(serverCallStreamObserver, "observer");
  this.serverReflectionIndex = serverReflectionIndex;
}
项目:grpc-java    文件:AsyncServer.java   
/**
 * Bidirectional benchmark call in which both directions run independently: {@code BIDI_RESPONSE}
 * is offered to {@code StreamObservers.copyWithFlowControl} until {@code shutdown} is set or the
 * call is cancelled, while inbound requests are ignored.
 *
 * @param observer the gRPC response stream; it is cast to {@link ServerCallStreamObserver}
 * @return an observer that discards every inbound request
 */
@Override
public StreamObserver<Messages.SimpleRequest> streamingBothWays(
    final StreamObserver<Messages.SimpleResponse> observer) {
  // receive data forever and send data forever until client cancels or we shut down.
  final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
      (ServerCallStreamObserver<Messages.SimpleResponse>) observer;

  final Iterator<Messages.SimpleResponse> endlessResponses =
      new Iterator<Messages.SimpleResponse>() {
        @Override
        public boolean hasNext() {
          return !(shutdown.get() || responseObserver.isCancelled());
        }

        @Override
        public Messages.SimpleResponse next() {
          return BIDI_RESPONSE;
        }

        @Override
        public void remove() {
          throw new UnsupportedOperationException();
        }
      };

  // If the client cancels, copyWithFlowControl takes care of calling
  // responseObserver.onCompleted() for us
  StreamObservers.copyWithFlowControl(endlessResponses, responseObserver);

  final StreamObserver<Messages.SimpleRequest> ignoringRequests =
      new StreamObserver<Messages.SimpleRequest>() {
        @Override
        public void onNext(final Messages.SimpleRequest request) {
          // noop
        }

        @Override
        public void onError(Throwable t) {
          // other side cancelled
        }

        @Override
        public void onCompleted() {
          // Should never happen, because clients should cancel this call in order to stop
          // the operation. Also because copyWithFlowControl hogs the inbound network thread
          // via the handler for onReady, we would never expect this callback to be able to
          // run anyways.
          log.severe("clients should CANCEL the call to stop bidi streaming");
        }
      };
  return ignoringRequests;
}