/**
 * Sends the payload of {@code request} to the event bus address {@code target}
 * and relays the outcome back to the request.
 *
 * <p>A successful reply body is written to the request. A {@link ReplyException}
 * is dispatched to the handler registered in {@code exceptionHandlers} for its
 * failure type; any other failure is passed to {@code request.error(..)}.
 *
 * @param request the request supplying the payload and the send timeout
 * @param target  the event bus address to send to
 */
protected void send(Request request, String target) {
    DeliveryOptions deliveryOptions = new DeliveryOptions().setSendTimeout(request.timeout());

    core.bus().send(target, request.data(), deliveryOptions, reply -> {
        if (reply.failed()) {
            Throwable cause = reply.cause();
            if (cause instanceof ReplyException) {
                ReplyFailure failure = ((ReplyException) cause).failureType();
                exceptionHandlers.get(failure).accept(request);
            } else {
                request.error(cause);
            }
            return;
        }
        request.write(reply.result().body());
    });
}
/**
 * Requesting an API with a random name over the event bus must fail with a
 * {@link ReplyException} carrying the RESOURCE_NOT_FOUND code.
 */
@Test
public void testGetUndefinedApiEventbus(TestContext testContext) {
    Async done = testContext.async();
    JsonObject query = new JsonObject()
        .put("name", Randoms.randomAlphabet(10))
        .put("namespace", namespace);

    vertx.eventBus().<JsonObject>send("api.get", query, reply -> {
        if (reply.failed()) {
            Throwable cause = reply.cause();
            testContext.assertTrue(cause instanceof ReplyException);
            ReplyException replyException = (ReplyException) cause;
            testContext.assertEquals(DefaultErrorCode.RESOURCE_NOT_FOUND.getNumber(),
                replyException.failureCode());
            done.complete();
        } else {
            testContext.fail();
        }
    });
}
/**
 * Sending an empty document to {@code api.delete} must be rejected with the
 * INVALID_ARGS failure code.
 */
@Test
public void testMissNameShouldThrowValidationException(TestContext testContext) {
    AtomicBoolean replied = new AtomicBoolean();

    vertx.eventBus().<JsonObject>send("api.delete", new JsonObject(), reply -> {
        if (reply.failed()) {
            Throwable cause = reply.cause();
            cause.printStackTrace();
            testContext.assertTrue(cause instanceof ReplyException);
            testContext.assertEquals(DefaultErrorCode.INVALID_ARGS.getNumber(),
                ((ReplyException) cause).failureCode());
            replied.set(true);
        } else {
            testContext.fail();
        }
    });

    // Block the test thread until the failure branch has run its assertions.
    Awaitility.await().until(() -> replied.get());
}
/**
 * Looking up an API by a freshly generated UUID name must fail with the
 * RESOURCE_NOT_FOUND failure code.
 */
@Test
public void testGetApiByUndefinedNameShouldFailed(TestContext testContext) {
    String unknownName = UUID.randomUUID().toString();
    JsonObject query = new JsonObject()
        .put("namespace", namespace)
        .put("name", unknownName);
    AtomicBoolean replied = new AtomicBoolean();

    vertx.eventBus().<JsonObject>send("api.get", query, reply -> {
        if (reply.failed()) {
            Throwable cause = reply.cause();
            cause.printStackTrace();
            testContext.assertTrue(cause instanceof ReplyException);
            testContext.assertEquals(DefaultErrorCode.RESOURCE_NOT_FOUND.getNumber(),
                ((ReplyException) cause).failureCode());
            replied.set(true);
        } else {
            testContext.fail();
        }
    });

    // Block the test thread until the failure branch has run its assertions.
    Awaitility.await().until(() -> replied.get());
}
/**
 * Sending an empty document to {@code api.list} must be rejected with the
 * INVALID_ARGS failure code.
 */
@Test
public void testMissNameShouldThrowValidationException(TestContext testContext) {
    AtomicBoolean replied = new AtomicBoolean();

    vertx.eventBus().<JsonObject>send("api.list", new JsonObject(), reply -> {
        if (reply.failed()) {
            Throwable cause = reply.cause();
            cause.printStackTrace();
            testContext.assertTrue(cause instanceof ReplyException);
            testContext.assertEquals(DefaultErrorCode.INVALID_ARGS.getNumber(),
                ((ReplyException) cause).failureCode());
            replied.set(true);
        } else {
            testContext.fail();
        }
    });

    // Block the test thread until the failure branch has run its assertions.
    Awaitility.await().until(() -> replied.get());
}
/**
 * Translates a {@link ReplyException} into a JSON object with {@code code} and
 * {@code message} entries.
 *
 * <p>If the failure code maps to a {@link DefaultErrorCode}, that code's number
 * and message are used. Otherwise the failure type decides: NO_HANDLERS maps to
 * SERVICE_UNAVAILABLE, TIMEOUT to TIME_OUT, RECIPIENT_FAILURE passes the
 * exception's own code and message through, and anything else maps to UNKOWN.
 *
 * @param ex the event bus failure to describe
 * @return the JSON description of the failure
 */
private static JsonObject replyJson(ReplyException ex) {
    JsonObject body = new JsonObject();

    DefaultErrorCode known = DefaultErrorCode.getCode(ex.failureCode());
    if (known != null) {
        body.put("code", known.getNumber())
            .put("message", known.getMessage());
        return body;
    }

    ReplyFailure failureType = ex.failureType();
    if (failureType == ReplyFailure.NO_HANDLERS) {
        body.put("code", DefaultErrorCode.SERVICE_UNAVAILABLE.getNumber())
            .put("message", DefaultErrorCode.SERVICE_UNAVAILABLE.getMessage());
        return body;
    }
    if (failureType == ReplyFailure.TIMEOUT) {
        body.put("code", DefaultErrorCode.TIME_OUT.getNumber())
            .put("message", DefaultErrorCode.TIME_OUT.getMessage());
        return body;
    }
    if (failureType == ReplyFailure.RECIPIENT_FAILURE) {
        body.put("code", ex.failureCode())
            .put("message", ex.getMessage());
        return body;
    }

    body.put("code", DefaultErrorCode.UNKOWN.getNumber())
        .put("message", DefaultErrorCode.UNKOWN.getMessage());
    return body;
}
/**
 * Fails {@code completed} with an exception derived from {@code throwable}.
 *
 * <p>A {@link ReplyException} of type NO_HANDLERS becomes a SERVICE_UNAVAILABLE
 * {@code SystemException} with a "No handlers" detail. Any other
 * {@link ReplyException} is wrapped in a {@code SystemException} built from its
 * failure code and message with status 400. Other throwables are passed through.
 *
 * @param completed the future to fail
 * @param throwable the cause of the failure
 */
private void failed(Future<RpcResponse> completed, Throwable throwable) {
    if (!(throwable instanceof ReplyException)) {
        completed.fail(throwable);
        return;
    }

    ReplyException replyException = (ReplyException) throwable;
    if (replyException.failureType() != ReplyFailure.NO_HANDLERS) {
        ErrorCode customCode =
            CustomErrorCode.create(replyException.failureCode(), replyException.getMessage(), 400);
        SystemException wrapped = SystemException.create(customCode);
        completed.fail(wrapped);
        return;
    }

    SystemException unavailable = SystemException.create(DefaultErrorCode.SERVICE_UNAVAILABLE)
        .set("details", "No handlers");
    completed.fail(unavailable);
}
/**
 * Builds the trace annotations for the response of the current routing context.
 *
 * <p>The HTTP status code is always reported. When the context has failed, the
 * failure message is added (empty if there is none), and for a
 * {@link ReplyException} the failure type and failure code are added as well.
 *
 * @return the annotations describing the response
 */
@Override
public Collection<KeyValueAnnotation> responseAnnotations() {
    final Collection<KeyValueAnnotation> annotations = new ArrayList<>(4);
    final HttpServerResponse resp = context.response();
    annotations.add(KeyValueAnnotation.create(TraceKeys.HTTP_STATUS_CODE,
        String.valueOf(resp.getStatusCode())));

    if (context.failed()) {
        // A context failed with only a status code has no throwable, so
        // failure() may be null here and must not be dereferenced blindly.
        final Throwable failure = context.failure();
        if (failure instanceof ReplyException) {
            final ReplyException err = (ReplyException) failure;
            annotations.add(KeyValueAnnotation.create(VertxKeys.ReplyFailureType.getName(),
                err.failureType().name()));
            annotations.add(KeyValueAnnotation.create(VertxKeys.ReplyFailureCode.getName(),
                Integer.toString(err.failureCode())));
        }
        final String msg = failure != null ? failure.getMessage() : null;
        annotations.add(KeyValueAnnotation.create(VertxKeys.ReplyFailureMessage.getName(),
            msg != null ? msg : ""));
    }
    return annotations;
}
/**
 * Produces the JSON error body for a failed request.
 *
 * <p>If the message derived from the throwable is itself a JSON object, it is
 * returned as is. Otherwise it is wrapped by {@code ServerAPIException.toJson}:
 * as an HTTP error for an {@code HttpException} that is not a
 * {@link ReplyException}, and as a generic error in every other case.
 *
 * @param throwable the cause of the error
 * @return the JSON string
 */
protected static String errorResponse(Throwable throwable) {
    String msg = throwableToMessage(throwable, "");
    try {
        return new JsonObject(msg).toString();
    } catch (Exception ignored) {
        // The message is not a JSON document; fall through and wrap it below.
    }

    boolean httpError = !(throwable instanceof ReplyException)
        && throwable instanceof HttpException;
    if (httpError) {
        return ServerAPIException.toJson(ServerAPIException.HTTP_ERROR, msg)
            .toString();
    }
    return ServerAPIException.toJson(ServerAPIException.GENERIC_ERROR, msg)
        .toString();
}
@Override public void delete(String search, String path, Handler<AsyncResult<Void>> handler) { get(search, path, ar -> { if (ar.failed()) { Throwable cause = ar.cause(); if (cause instanceof ReplyException) { // Cast to get access to the failure code ReplyException ex = (ReplyException)cause; if (ex.failureCode() == 404) { handler.handle(Future.succeededFuture()); return; } } handler.handle(Future.failedFuture(ar.cause())); } else { StoreCursor cursor = ar.result(); Queue<String> paths = new ArrayDeque<>(); AtomicLong remaining = new AtomicLong(cursor.getInfo().getTotalHits()); doDelete(cursor, paths, remaining, handler); } }); }
/**
 * Register the proxy handle on the event bus.
 *
 * <p>Each interceptor wraps the handler built so far, so the last interceptor
 * in the list runs first. If an interceptor's future fails, the message is
 * failed instead of being passed on: with the failure code and message of a
 * {@link ReplyException}, or with code 500 and the cause's message for any
 * other throwable.
 *
 * @param eventBus     the event bus
 * @param address      the proxy address
 * @param interceptors the interceptors to run before the proxy handler, may be {@code null}
 * @return the registered message consumer
 */
public MessageConsumer<JsonObject> register(EventBus eventBus, String address,
        List<Function<Message<JsonObject>, Future<Message<JsonObject>>>> interceptors) {
    Handler<Message<JsonObject>> handler = this::handle;
    if (interceptors != null) {
        for (Function<Message<JsonObject>, Future<Message<JsonObject>>> interceptor : interceptors) {
            Handler<Message<JsonObject>> prev = handler;
            handler = msg -> {
                Future<Message<JsonObject>> fut = interceptor.apply(msg);
                fut.setHandler(ar -> {
                    if (ar.succeeded()) {
                        prev.handle(msg);
                        return;
                    }
                    Throwable cause = ar.cause();
                    if (cause instanceof ReplyException) {
                        ReplyException exception = (ReplyException) cause;
                        msg.fail(exception.failureCode(), exception.getMessage());
                    } else {
                        // An interceptor may fail with any throwable. Casting it
                        // blindly would throw and leave the sender without a reply.
                        msg.fail(500, cause != null ? cause.getMessage() : null);
                    }
                });
            };
        }
    }
    consumer = eventBus.consumer(address, handler);
    return consumer;
}
/**
 * A proxy built without a token must have its call rejected with failure code 401.
 */
@Test
public void testWithoutToken() {
    serviceProxyBuilder.setToken(null);
    proxy = serviceProxyBuilder.build(OKService.class);

    proxy.ok(outcome -> {
        assertTrue(outcome.failed());
        ReplyException rejection = (ReplyException) outcome.cause();
        assertEquals(401, rejection.failureCode());
        testComplete();
    });

    await();
}
@Test public void testCallWithMessageParamWrongType() { JsonObject message = new JsonObject(); message.put("object", new JsonObject().put("foo", "bar")); message.put("str", 76523); message.put("i", 1234); message.put("char", (int)'X'); // chars are mapped to ints message.put("enum", SomeEnum.BAR.toString()); // enums are mapped to strings vertx.eventBus().send("someaddress", message, new DeliveryOptions().addHeader("action", "invokeWithMessage").setSendTimeout(500), onFailure(t -> { assertTrue(t instanceof ReplyException); ReplyException re = (ReplyException) t; // This will as operation will fail to be invoked assertEquals(ReplyFailure.RECIPIENT_FAILURE, re.failureType()); testComplete(); })); await(); }
/**
 * Writes an {@code err} frame describing {@code failure} to {@code handler}.
 *
 * @param address      the address the original message was sent to, emitted as {@code sourceAddress}
 * @param replyAddress the address the error is reported on, emitted as {@code address}
 * @param failure      the failure whose code, type and message are reported
 * @param handler      the stream the frame is written to
 */
public static void sendErrFrame(String address, String replyAddress, ReplyException failure, WriteStream<Buffer> handler) {
    final JsonObject frame = new JsonObject();
    frame.put("type", "err");
    frame.put("address", replyAddress);
    frame.put("sourceAddress", address);
    frame.put("failureCode", failure.failureCode());
    frame.put("failureType", failure.failureType().name());
    frame.put("message", failure.getMessage());

    writeFrame(frame, handler);
}
/** * Use to terminate the application using a HTTP Post * It requires an AdminKey header to work */ private void shutdownHandler(final RoutingContext ctx) { // check for AdminKey header String adminKey = this.config().getString("AdminKey"); if (adminKey == null || adminKey.equals(ctx.request().getHeader("AdminKey"))) { // TODO: check the body for the right credentials this.shutdownExecution(ctx.response()); } else { ctx.fail(new ReplyException(ReplyFailure.RECIPIENT_FAILURE, 401, "Sucker nice try!")); } }
/**
 * Ends the HTTP response with a status derived from a failed event bus reply.
 *
 * <p>The failure code of a {@link ReplyException} is used as the status when it
 * lies in the HTTP status range (100-599). Any other failure code, and any
 * cause that is not a {@link ReplyException}, results in a 500.
 *
 * @param routingContext the context whose response is ended
 * @param reply          the failed reply
 */
private void forwardErrorCode(RoutingContext routingContext, AsyncResult<Message<Object>> reply) {
    Throwable cause = reply.cause();
    if (cause != null) {
        cause.printStackTrace();
    }

    int statusCode = 500;
    if (cause instanceof ReplyException) {
        int failureCode = ((ReplyException) cause).failureCode();
        // Timeouts and missing handlers carry -1, which is not a valid HTTP
        // status and would make setStatusCode throw before the response ends.
        if (failureCode >= 100 && failureCode <= 599) {
            statusCode = failureCode;
        }
    }

    HttpServerResponse response = routingContext.response();
    response.setStatusCode(statusCode);
    response.end();
}
/**
 * Sending an empty document to {@code api.get} must be rejected with the
 * INVALID_ARGS failure code.
 */
@Test
public void testMissNameShouldThrowValidationException(TestContext testContext) {
    AtomicBoolean replied = new AtomicBoolean();

    vertx.eventBus().<JsonObject>send("api.get", new JsonObject(), reply -> {
        if (reply.failed()) {
            Throwable cause = reply.cause();
            testContext.assertTrue(cause instanceof ReplyException);
            testContext.assertEquals(DefaultErrorCode.INVALID_ARGS.getNumber(),
                ((ReplyException) cause).failureCode());
            replied.set(true);
        } else {
            testContext.fail();
        }
    });

    // Block the test thread until the failure branch has run its assertions.
    Awaitility.await().until(() -> replied.get());
}
/**
 * Starts an HTTP server whose {@code /ex/*} routes each throw a different kind
 * of exception, with {@code FailureHandler} installed as the failure handler.
 */
@Before
public void createServer(TestContext testContext) {
    vertx = Vertx.vertx();

    Router routes = Router.router(vertx);
    routes.route().handler(BodyHandler.create());
    routes.route().handler(BaseHandler.create());

    routes.get("/ex/sys").handler(ctx -> {
        throw SystemException.create(DefaultErrorCode.MISSING_ARGS);
    });
    routes.get("/ex/validate").handler(ctx -> {
        Multimap<String, String> noErrors = ArrayListMultimap.create();
        throw new ValidationException(noErrors);
    });
    routes.get("/ex/unkown").handler(ctx -> {
        throw new RuntimeException("xxxx");
    });
    routes.get("/ex/reply").handler(ctx -> {
        throw new ReplyException(
            ReplyFailure.TIMEOUT,
            DefaultErrorCode.INVALID_REQ.getNumber(),
            DefaultErrorCode.INVALID_REQ.getMessage());
    });
    routes.get("/ex/reply2").handler(ctx -> {
        throw new ReplyException(ReplyFailure.RECIPIENT_FAILURE);
    });

    routes.route().failureHandler(FailureHandler.create());

    vertx.createHttpServer()
        .requestHandler(routes::accept)
        .listen(port, testContext.asyncAssertSuccess());
}
/**
 * Ends {@code response} with a status derived from {@code cause}.
 *
 * <p>If the failure code passes {@code isExistingHttStatusCode}, it becomes the
 * status code and a non-empty exception message becomes the status message.
 * Otherwise the response is a 500 Internal Server Error.
 *
 * @param cause    the event bus failure
 * @param response the HTTP response to complete
 */
private static void manageError(ReplyException cause, HttpServerResponse response) {
    if (!isExistingHttStatusCode(cause.failureCode())) {
        response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
        response.end();
        return;
    }

    response.setStatusCode(cause.failureCode());
    if (StringUtils.isNotEmpty(cause.getMessage())) {
        response.setStatusMessage(cause.getMessage());
    }
    response.end();
}
/**
 * Builds the trace annotations for the reply message.
 *
 * <p>Only a body that is a {@link ReplyException} produces annotations: its
 * failure type, failure code and message (empty when absent). Any other body
 * yields an empty list.
 *
 * @return the annotations describing the reply
 */
@Override
public Collection<KeyValueAnnotation> responseAnnotations() {
    if (!(msg.body() instanceof ReplyException)) {
        return Collections.emptyList();
    }

    final ReplyException failure = (ReplyException) msg.body();
    final KeyValueAnnotation type = KeyValueAnnotation.create(
        VertxKeys.ReplyFailureType.getName(), failure.failureType().name());
    final KeyValueAnnotation code = KeyValueAnnotation.create(
        VertxKeys.ReplyFailureCode.getName(), Integer.toString(failure.failureCode()));
    final KeyValueAnnotation message = KeyValueAnnotation.create(
        VertxKeys.ReplyFailureMessage.getName(),
        failure.getMessage() == null ? "" : failure.getMessage());

    return Arrays.asList(type, code, message);
}
/**
 * Observes events on {@code TEST_BUS_TIMEOUT}, forwards "foo" to the slow
 * handler with a 10 ms send timeout, and records "timeout" in
 * {@code SYNCHRONIZER} when the send fails with a TIMEOUT failure.
 *
 * @param event the observed event
 */
public void consumerSendTimeout(@Observes @VertxConsumer(TEST_BUS_TIMEOUT) VertxEvent event) {
    assertEquals(TEST_BUS_TIMEOUT, event.getAddress());

    event.messageTo(TEST_SLOW_HANDLER)
        .setDeliveryOptions(new DeliveryOptions().setSendTimeout(10))
        .send("foo", (reply) -> {
            if (!reply.failed()) {
                return;
            }
            ReplyException failure = (ReplyException) reply.cause();
            boolean timedOut = failure.failureType().equals(ReplyFailure.TIMEOUT);
            if (timedOut) {
                SYNCHRONIZER.add("timeout");
            }
        });
}
/**
 * Maps a throwable to an HTTP status code.
 *
 * <p>The checks are applied in this order: a {@link ReplyException} yields its
 * failure code, an {@link IllegalArgumentException} yields 400, a
 * {@link FileNotFoundException} yields 404, and an {@code HttpException} or
 * {@code HTTPException} yields its own status code. Everything else is 500.
 *
 * @param t the throwable to convert
 * @return the HTTP status code
 */
public static int throwableToCode(Throwable t) {
    if (t instanceof ReplyException) {
        return ((ReplyException) t).failureCode();
    }
    if (t instanceof IllegalArgumentException) {
        return 400;
    }
    if (t instanceof FileNotFoundException) {
        return 404;
    }
    if (t instanceof HttpException) {
        return ((HttpException) t).getStatusCode();
    }
    if (t instanceof HTTPException) {
        return ((HTTPException) t).getStatusCode();
    }
    return 500;
}
/**
 * The failure code of a {@link ReplyException} must be returned unchanged by
 * {@code ThrowableHelper.throwableToCode}.
 */
@Test
public void testThrowableToCodeReplyException() {
    final int failureCode = 505;
    final Throwable replyFailure =
        new ReplyException(ReplyFailure.NO_HANDLERS, failureCode, "Message");

    assertEquals(failureCode, ThrowableHelper.throwableToCode(replyFailure));
}
@Test public void testEventLogRepository_getEventCount() throws Exception { final Vertx vertx = vertxService.getVertx(); final RunRightFastVerticleId verticleId = EventLogRepository.VERTICLE_ID; final long timeout = 60000L; final ProtobufMessageProducer<GetEventCount.Request> getEventCountMessageProducer = new ProtobufMessageProducer<>( vertx.eventBus(), EventBusAddress.eventBusAddress(verticleId, GetEventCount.class), new ProtobufMessageCodec<>(GetEventCount.Request.getDefaultInstance()), metricRegistry ); // because the verticles are deployed asynchronously, the EventLogRepository verticle may not yet be deployed yet // the message consumer for the Verticle only gets registered, while the verticle is starting. Thus, the message consumer may not yet be registered. while (true) { final CompletableFuture<GetEventCount.Response> getEventCountFuture = new CompletableFuture<>(); getEventCountMessageProducer.send(GetEventCount.Request.getDefaultInstance(), responseHandler(getEventCountFuture, GetEventCount.Response.class)); try { getEventCountFuture.get(timeout, TimeUnit.MILLISECONDS); break; } catch (final ExecutionException e) { if (e.getCause() instanceof ReplyException) { final ReplyException replyException = (ReplyException) e.getCause(); if (replyException.failureType() == NO_HANDLERS) { log.log(WARNING, "Waiting for EventLogRepository ... ", e); Thread.sleep(5000L); continue; } } throw e; } } }
/**
 * Sends {@code body} to {@code address}, retrying while the send fails with
 * NO_HANDLERS. Each retry decrements {@code times}, and the assertion at the
 * top fails once it is no longer positive.
 *
 * @param context the test context used for the attempt-count assertion
 * @param address the event bus address
 * @param body    the message body
 * @param options the delivery options
 * @param times   the number of attempts left
 */
private void assertSend(TestContext context, String address, Object body, DeliveryOptions options, int times) {
    context.assertTrue(times > 0);

    vertx.eventBus().send(address, body, options, reply -> {
        if (reply.succeeded()) {
            return;
        }
        ReplyException failure = (ReplyException) reply.cause();
        if (failure.failureType() == NO_HANDLERS) {
            assertSend(context, address, body, options, times - 1);
        }
    });
}
/**
 * {@code failingMethod} must fail with a RECIPIENT_FAILURE carrying the
 * message "wibble".
 */
@Test
public void testFailingMethod() {
    proxy.failingMethod(onFailure(failure -> {
        assertTrue(failure instanceof ReplyException);
        ReplyException replyException = (ReplyException) failure;
        assertEquals(ReplyFailure.RECIPIENT_FAILURE, replyException.failureType());
        assertEquals("wibble", replyException.getMessage());
        testComplete();
    }));

    await();
}
@Test public void testCallWithMessageInvalidAction() { JsonObject message = new JsonObject(); message.put("object", new JsonObject().put("foo", "bar")); message.put("str", "blah"); message.put("i", 1234); vertx.eventBus().send("someaddress", message, new DeliveryOptions().addHeader("action", "yourmum").setSendTimeout(500), onFailure(t -> { assertTrue(t instanceof ReplyException); ReplyException re = (ReplyException) t; // This will as operation will fail to be invoked assertEquals(ReplyFailure.RECIPIENT_FAILURE, re.failureType()); testComplete(); })); await(); }
@Test public void testConnectionTimeout() { consumer.unregister(); long timeoutSeconds = 2; consumer = ProxyHelper.registerService(TestService.class, vertx, service, SERVICE_ADDRESS, timeoutSeconds); proxy.createConnection("foo", onSuccess(conn -> { long start = System.nanoTime(); conn.startTransaction(onSuccess(res -> { assertEquals("foo", res); vertx.eventBus().consumer("closeCalled").handler(msg -> { assertEquals("blah", msg.body()); long duration = System.nanoTime() - start; assertTrue(String.valueOf(duration), duration >= SECONDS.toNanos(timeoutSeconds)); // Should be closed now conn.startTransaction(onFailure(cause -> { assertNotNull(cause); assertTrue(cause instanceof ReplyException); assertFalse(cause instanceof ServiceException); ReplyException re = (ReplyException) cause; assertEquals(ReplyFailure.NO_HANDLERS, re.failureType()); testComplete(); })); }); })); })); await(); }
@Test public void testConnectionWithCloseFutureTimeout() { consumer.unregister(); long timeoutSeconds = 2; consumer = ProxyHelper.registerService(TestService.class, vertx, service, SERVICE_ADDRESS, timeoutSeconds); long start = System.currentTimeMillis(); proxy.createConnectionWithCloseFuture(onSuccess(conn -> { vertx.eventBus().consumer("closeCalled").handler(msg -> { assertEquals("blah", msg.body()); long now = System.currentTimeMillis(); assertTrue(now - start > timeoutSeconds * 1000); // Should be closed now conn.someMethod(onFailure(cause -> { assertNotNull(cause); assertTrue(cause instanceof ReplyException); assertFalse(cause instanceof ServiceException); ReplyException re = (ReplyException) cause; assertEquals(ReplyFailure.NO_HANDLERS, re.failureType()); testComplete(); })); }); })); await(); }
/**
 * {@code longDeliveryFailed} on the long-delivery proxy must fail with a plain
 * {@link ReplyException} of type TIMEOUT.
 */
@Test
public void testLongDelivery2() {
    TestService longDeliveryProxy = TestService.createProxyLongDelivery(vertx, SERVICE_ADDRESS);

    longDeliveryProxy.longDeliveryFailed(onFailure(failure -> {
        assertNotNull(failure);
        assertTrue(failure instanceof ReplyException);
        assertFalse(failure instanceof ServiceException);
        ReplyException replyException = (ReplyException) failure;
        assertEquals(ReplyFailure.TIMEOUT, replyException.failureType());
        testComplete();
    }));

    await();
}
/**
 * Tile retrieval event handler.
 * Responds with a <code>application/octet-stream</code> body on success
 * based on the <code>pixelsId</code>, <code>z</code>, <code>c</code>,
 * and <code>t</code> encoded in the URL or HTTP 404 if the {@link Pixels}
 * does not exist or the user does not have permissions to access it.
 * The content type is <code>image/png</code> or <code>image/tiff</code>
 * when the requested format is <code>png</code> or <code>tif</code>.
 * When the event bus reply fails with a {@link ReplyException}, its failure
 * code is used as the HTTP status if it lies in the range 100-599, otherwise
 * the response is a 500.
 * @param event Current routing context.
 */
private void getTile(RoutingContext event) {
    log.info("Get tile");
    HttpServerRequest request = event.request();
    TileCtx tileCtx = new TileCtx(
            request.params(), event.get("omero.session_key"));
    final HttpServerResponse response = event.response();

    vertx.eventBus().<byte[]>send(
            PixelBufferVerticle.GET_TILE_EVENT,
            Json.encode(tileCtx), result -> {
        try {
            if (result.failed()) {
                Throwable t = result.cause();
                int statusCode = 404;
                if (t instanceof ReplyException) {
                    int failureCode = ((ReplyException) t).failureCode();
                    // Timeouts and missing handlers carry -1, which is not a
                    // valid HTTP status and would make setStatusCode throw.
                    statusCode = (failureCode >= 100 && failureCode <= 599)
                            ? failureCode : 500;
                }
                response.setStatusCode(statusCode);
                return;
            }
            byte[] tile = result.result().body();
            String contentType = "application/octet-stream";
            // Constant-first comparison: a missing format keeps the default
            // content type instead of throwing.
            if ("png".equals(tileCtx.format)) {
                contentType = "image/png";
            }
            if ("tif".equals(tileCtx.format)) {
                contentType = "image/tiff";
            }
            response.headers().set(
                    "Content-Type", contentType);
            response.headers().set(
                    "Content-Length", String.valueOf(tile.length));
            response.headers().set(
                    "Content-Disposition", String.format(
                            "attachment; filename=\"%s\"",
                            result.result().headers().get("filename")));
            response.write(Buffer.buffer(tile));
        } finally {
            // The response is always ended, whatever the outcome above.
            response.end();
            log.debug("Response ended");
        }
    });
}
/**
 * A session window query whose key serde name is not a serde class must be
 * rejected with failure code 400.
 */
@Test
public void noValidKeySerde(TestContext context){
    KafkaStreams streams = mock(KafkaStreams.class);
    SessionWindowQueryVerticle verticle = new SessionWindowQueryVerticle("host", streams);

    rule.vertx().deployVerticle(verticle, context.asyncAssertSuccess(deploymentId -> {
        String valueSerde = Serdes.String().getClass().getName();
        KeyBasedQuery query = new KeyBasedQuery("store", "i am not a serde", "key".getBytes(), valueSerde);

        rule.vertx().eventBus().send(Config.SESSION_QUERY_ADDRESS_PREFIX + "host", query,
            context.asyncAssertFailure(failure -> {
                context.assertTrue(failure instanceof ReplyException);
                ReplyException replyException = (ReplyException) failure;
                context.assertEquals(400, replyException.failureCode());
            }));
    }));
}
/**
 * A session window query whose value serde name is not a serde class must be
 * rejected with failure code 400.
 */
@Test
public void noValidValueSerde(TestContext context){
    KafkaStreams streams = mock(KafkaStreams.class);
    SessionWindowQueryVerticle verticle = new SessionWindowQueryVerticle("host", streams);

    rule.vertx().deployVerticle(verticle, context.asyncAssertSuccess(deploymentId -> {
        String keySerde = Serdes.String().getClass().getName();
        KeyBasedQuery query = new KeyBasedQuery("store", keySerde, "key".getBytes(), "i am not a serde");

        rule.vertx().eventBus().send(Config.SESSION_QUERY_ADDRESS_PREFIX + "host", query,
            context.asyncAssertFailure(failure -> {
                context.assertTrue(failure instanceof ReplyException);
                ReplyException replyException = (ReplyException) failure;
                context.assertEquals(400, replyException.failureCode());
            }));
    }));
}
/**
 * When obtaining the store throws {@code InvalidStateStoreException}, the
 * session window query must fail with failure code 500.
 */
@Test
public void illegalStateStoreExceptionOnStoreInitialization(TestContext context){
    KafkaStreams streams = mock(KafkaStreams.class);
    when(streams.store(eq("store"), any(QueryableStoreType.class)))
        .thenThrow(InvalidStateStoreException.class);
    SessionWindowQueryVerticle verticle = new SessionWindowQueryVerticle("host", streams);

    rule.vertx().deployVerticle(verticle, context.asyncAssertSuccess(deploymentId -> {
        KeyBasedQuery query = new KeyBasedQuery(
            "store",
            Serdes.String().getClass().getName(),
            "key".getBytes(),
            Serdes.String().getClass().getName());

        rule.vertx().eventBus().send(Config.SESSION_QUERY_ADDRESS_PREFIX + "host", query,
            context.asyncAssertFailure(failure -> {
                context.assertTrue(failure instanceof ReplyException);
                ReplyException replyException = (ReplyException) failure;
                context.assertEquals(500, replyException.failureCode());
            }));
    }));
}
/**
 * When fetching from the store throws {@code InvalidStateStoreException}, the
 * session window query must fail with failure code 500.
 */
@Test
public void illegalStateStoreExceptionOnQuery(TestContext context){
    KafkaStreams streams = mock(KafkaStreams.class);
    ReadOnlySessionStore<Object, Object> sessionStore = mock(ReadOnlySessionStore.class);
    when(streams.store(eq("store"), any(QueryableStoreType.class))).thenReturn(sessionStore);
    when(sessionStore.fetch(any())).thenThrow(InvalidStateStoreException.class);
    SessionWindowQueryVerticle verticle = new SessionWindowQueryVerticle("host", streams);

    rule.vertx().deployVerticle(verticle, context.asyncAssertSuccess(deploymentId -> {
        KeyBasedQuery query = new KeyBasedQuery(
            "store",
            Serdes.String().getClass().getName(),
            "key".getBytes(),
            Serdes.String().getClass().getName());

        rule.vertx().eventBus().send(Config.SESSION_QUERY_ADDRESS_PREFIX + "host", query,
            context.asyncAssertFailure(failure -> {
                context.assertTrue(failure instanceof ReplyException);
                ReplyException replyException = (ReplyException) failure;
                context.assertEquals(500, replyException.failureCode());
            }));
    }));
}
/**
 * When fetching from the store throws {@code IllegalArgumentException}, the
 * session window query must fail with failure code 500.
 */
@Test
public void unexpectedExceptionOnQuery(TestContext context){
    KafkaStreams streams = mock(KafkaStreams.class);
    ReadOnlySessionStore<Object, Object> sessionStore = mock(ReadOnlySessionStore.class);
    when(streams.store(eq("store"), any(QueryableStoreType.class))).thenReturn(sessionStore);
    when(sessionStore.fetch(any())).thenThrow(IllegalArgumentException.class);
    SessionWindowQueryVerticle verticle = new SessionWindowQueryVerticle("host", streams);

    rule.vertx().deployVerticle(verticle, context.asyncAssertSuccess(deploymentId -> {
        KeyBasedQuery query = new KeyBasedQuery(
            "store",
            Serdes.String().getClass().getName(),
            "key".getBytes(),
            Serdes.String().getClass().getName());

        rule.vertx().eventBus().send(Config.SESSION_QUERY_ADDRESS_PREFIX + "host", query,
            context.asyncAssertFailure(failure -> {
                context.assertTrue(failure instanceof ReplyException);
                ReplyException replyException = (ReplyException) failure;
                context.assertEquals(500, replyException.failureCode());
            }));
    }));
}
/**
 * An all-key-values query whose key serde name is not a serde class must be
 * rejected with failure code 400.
 */
@Test
public void noValidKeySerde(TestContext context){
    KafkaStreams streams = mock(KafkaStreams.class);
    AllKeyValuesQueryVerticle verticle = new AllKeyValuesQueryVerticle("host", streams);

    rule.vertx().deployVerticle(verticle, context.asyncAssertSuccess(deploymentId -> {
        String valueSerde = Serdes.String().getClass().getName();
        StoreWideQuery query = new StoreWideQuery("store", "i am not a serde", valueSerde);

        rule.vertx().eventBus().send(Config.ALL_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query,
            context.asyncAssertFailure(failure -> {
                context.assertTrue(failure instanceof ReplyException);
                ReplyException replyException = (ReplyException) failure;
                context.assertEquals(400, replyException.failureCode());
            }));
    }));
}
/**
 * An all-key-values query whose value serde name is not a serde class must be
 * rejected with failure code 400.
 */
@Test
public void noValidValueSerde(TestContext context){
    KafkaStreams streams = mock(KafkaStreams.class);
    AllKeyValuesQueryVerticle verticle = new AllKeyValuesQueryVerticle("host", streams);

    rule.vertx().deployVerticle(verticle, context.asyncAssertSuccess(deploymentId -> {
        String keySerde = Serdes.String().getClass().getName();
        StoreWideQuery query = new StoreWideQuery("store", keySerde, "i am not a serde");

        rule.vertx().eventBus().send(Config.ALL_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query,
            context.asyncAssertFailure(failure -> {
                context.assertTrue(failure instanceof ReplyException);
                ReplyException replyException = (ReplyException) failure;
                context.assertEquals(400, replyException.failureCode());
            }));
    }));
}