/**
 * Handles a call whose requests arrive as a stream by invoking the target service method.
 *
 * <p>Stores the remote address read from the RPC context, wraps {@code responseObserver} in a
 * {@code PoJo2ProtoStreamObserver}, invokes {@code method} on {@code serviceToInvoke} with that
 * wrapper, and wraps the observer returned by the service in a {@code Proto2PoJoStreamObserver}
 * bound to the method's request type.
 *
 * <p>Any failure is logged and reported to the caller through
 * {@code responseObserver.onError} as an {@code UNAVAILABLE} status whose description is the
 * stack trace of the caught throwable.
 *
 * @param responseObserver observer through which responses (or the error status) are sent
 * @return the wrapped request observer, or {@code null} when the invocation failed
 */
@Override
public StreamObserver<Message> invoke(StreamObserver<Message> responseObserver) {
    try {
        this.remote = RpcContext.getContext().getAttachment(Constants.REMOTE_ADDRESS);
        Class<?> requestType = grpcMethodType.requestType();
        PoJo2ProtoStreamObserver serverResponseObserver =
            PoJo2ProtoStreamObserver.newObserverWrap(responseObserver);
        Object result = method.invoke(serviceToInvoke, serverResponseObserver);
        return Proto2PoJoStreamObserver.newObserverWrap((StreamObserver<Object>) result, requestType);
    } catch (Throwable e) {
        // A reflective call reports exceptions thrown by the target wrapped in an
        // InvocationTargetException whose own message is null, so take the log message
        // from the underlying failure instead of logging "null".
        Throwable failure = e;
        if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
            failure = e.getCause();
        }
        String logMessage = failure.getMessage() != null ? failure.getMessage() : failure.toString();
        // The description sent to the caller is still built from the caught throwable,
        // so the full cause chain is preserved.
        String stackTrace = ThrowableUtil.stackTraceToString(e);
        log.error(logMessage, e);
        StatusRuntimeException statusException =
            Status.UNAVAILABLE.withDescription(stackTrace).asRuntimeException();
        responseObserver.onError(statusException);
    } finally {
        log.debug(String.format("Service: %s Method: %s RemoteAddress: %s",
            providerUrl.getServiceInterface(), method.getName(), this.remote));
    }
    // NOTE(review): reached only after a failure has been reported via onError; confirm the
    // caller tolerates a null request observer in that case.
    return null;
}
/**
 * Invokes the target service method for a single request whose responses are streamed.
 *
 * <p>Converts the protobuf {@code request} to the method's request type, wraps
 * {@code responseObserver} in a {@code PoJo2ProtoStreamObserver}, and invokes {@code method}
 * on {@code serviceToInvoke} with both as arguments.
 *
 * <p>Any failure is logged and reported to the caller through
 * {@code responseObserver.onError} as an {@code UNAVAILABLE} status whose description is the
 * stack trace of the caught throwable.
 *
 * @param request protobuf request message to convert and pass to the service
 * @param responseObserver observer through which responses (or the error status) are sent
 */
private void streamCall(Message request, StreamObserver<Message> responseObserver) {
    try {
        Class<?> requestType = grpcMethodType.requestType();
        Object reqPojo = SerializerUtil.protobuf2Pojo(request, requestType);
        Object[] requestParams =
            new Object[] {reqPojo, PoJo2ProtoStreamObserver.newObserverWrap(responseObserver)};
        method.invoke(serviceToInvoke, requestParams);
    } catch (Throwable e) {
        // A reflective call reports exceptions thrown by the target wrapped in an
        // InvocationTargetException whose own message is null, so take the log message
        // from the underlying failure instead of logging "null".
        Throwable failure = e;
        if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
            failure = e.getCause();
        }
        String logMessage = failure.getMessage() != null ? failure.getMessage() : failure.toString();
        // The description sent to the caller is still built from the caught throwable,
        // so the full cause chain is preserved.
        String stackTrace = ThrowableUtil.stackTraceToString(e);
        log.error(logMessage, e);
        StatusRuntimeException statusException =
            Status.UNAVAILABLE.withDescription(stackTrace).asRuntimeException();
        responseObserver.onError(statusException);
    } finally {
        log.debug(String.format("Service: %s Method: %s RemoteAddress: %s",
            providerUrl.getServiceInterface(), method.getName(), this.remote));
    }
}
/**
 * Converts the given POJO to a protobuf message and forwards it to the wrapped observer.
 *
 * <p>If a {@code ProtobufException} is raised, the wrapped observer receives an
 * {@code UNAVAILABLE} status error whose description is the exception's stack trace.
 *
 * @param value the POJO to convert and forward
 */
@Override
public void onNext(Object value) {
    try {
        Message protoMessage = SerializerUtil.pojo2Protobuf(value);
        streamObserver.onNext(protoMessage);
    } catch (ProtobufException conversionFailure) {
        String description = ThrowableUtil.stackTraceToString(conversionFailure);
        streamObserver.onError(
            Status.UNAVAILABLE.withDescription(description).asRuntimeException());
    }
}
/**
 * Converts the given protobuf message to a POJO of {@code poJoType} and forwards it to the
 * wrapped observer.
 *
 * <p>If a {@code ProtobufException} is raised, the wrapped observer receives an
 * {@code UNAVAILABLE} status error whose description is the exception's stack trace.
 *
 * @param value the protobuf message to convert and forward
 */
@Override
public void onNext(Message value) {
    try {
        Object pojo = SerializerUtil.protobuf2Pojo(value, poJoType);
        streamObserver.onNext(pojo);
    } catch (ProtobufException conversionFailure) {
        String description = ThrowableUtil.stackTraceToString(conversionFailure);
        streamObserver.onError(
            Status.UNAVAILABLE.withDescription(description).asRuntimeException());
    }
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if (ctx.channel().isActive()) { // Create the response status error. Response.Status status = cause instanceof ServerException ? ((ServerException) cause).getResponseStatus() : INTERNAL_SERVER_ERROR; // Create the response error body. String newLine = "\r\n"; StringJoiner content = new StringJoiner(newLine); content.add("statusCode: " + status.getStatusCode()); content.add("statusMessage: " + status.getReasonPhrase()); content.add("statusFamily: " + status.getFamily()); if (cause.getMessage() != null) { content.add("errorMessage: " + cause.getMessage()); } if (cause.getCause() != null) { content.add("detailErrorMessage: " + cause.getCause().getMessage()); } content.add("stackTraceMessage: " + ThrowableUtil.stackTraceToString(cause)); // Write the response error. write(ctx, Response .status(status) .content(content.toString()) .type(MediaType.TEXT_UTF8) .build(), false); } ctx.close(); }