@Path("3") @GET public void hello3(@Suspended final AsyncResponse asyncResponse, // Inject the Vertx instance @Context Vertx vertx){ System.err.println("Creating client"); HttpClientOptions options = new HttpClientOptions(); options.setSsl(true); options.setTrustAll(true); options.setVerifyHost(false); HttpClient client = vertx.createHttpClient(options); client.getNow(443, "www.google.com", "/robots.txt", resp -> { System.err.println("Got response"); resp.bodyHandler(body -> { System.err.println("Got body"); asyncResponse.resume(Response.ok(body.toString()).build()); }); }); System.err.println("Created client"); }
/**
 * Resumes the suspended request from a freshly started thread with the string form of
 * {@code new MagicNumber(3)}.
 *
 * @param asyncResponse suspended JAX-RS response resumed by the worker thread
 */
@GET
@Path("/simple")
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
    final Runnable worker = new Runnable() {
        @Override
        public void run() {
            asyncResponse.resume(computeResult());
        }

        /** Produces the value handed back to the client. */
        private String computeResult() {
            return String.valueOf(new MagicNumber(3));
        }
    };
    final Thread workerThread = new Thread(worker);
    workerThread.start();
}
@Path("6") @GET public void hello6(@Suspended final AsyncResponse asyncResponse, // Inject the Vertx instance @Context Vertx vertx){ io.vertx.rxjava.core.Vertx rxVertx = io.vertx.rxjava.core.Vertx.newInstance(vertx); System.err.println("Creating client"); WebClientOptions options = new WebClientOptions(); options.setSsl(true); options.setTrustAll(true); options.setVerifyHost(false); WebClient client = WebClient.create(rxVertx, options); Single<HttpResponse<io.vertx.rxjava.core.buffer.Buffer>> responseHandler = client.get(443, "www.google.com", "/robots.txt").rxSend(); responseHandler.subscribe(body -> { System.err.println("Got body"); asyncResponse.resume(Response.ok(body.body().toString()).build()); }); System.err.println("Created client"); }
/**
 * Fetches Google's OpenID configuration document with the JAX-RS client and resumes the
 * suspended request with its body.
 *
 * <p>The outbound call is completed through an {@code InvocationCallback}, so the request
 * thread is no longer blocked waiting on the client future. Failures resume the suspended
 * request with the failure.
 *
 * @param asyncResponse suspended JAX-RS response that receives the document or the failure
 * @throws InterruptedException retained for signature compatibility; no longer thrown here
 * @throws ExecutionException   retained for signature compatibility; no longer thrown here
 */
@ApiOperation(value = "displays openid config of google async", hidden = true)
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/async")
public void async(@Suspended final AsyncResponse asyncResponse) throws InterruptedException, ExecutionException {
    jaxrsClient.target("https://accounts.google.com/.well-known/openid-configuration")
            .request()
            .header(javax.ws.rs.core.HttpHeaders.USER_AGENT, "curl/7.55.1")
            .async()
            .get(new javax.ws.rs.client.InvocationCallback<Response>() {
                @Override
                public void completed(final Response responseFromClient) {
                    try {
                        final String object = responseFromClient.readEntity(String.class);
                        asyncResponse.resume(object);
                    } catch (RuntimeException e) {
                        // Reading the entity failed: answer the caller instead of hanging.
                        asyncResponse.resume(e);
                    } finally {
                        responseFromClient.close();
                    }
                }

                @Override
                public void failed(final Throwable throwable) {
                    asyncResponse.resume(throwable);
                }
            });
}
/**
 * Returns all dashboards obtained from the configuration.
 *
 * <p>The suspended request is resumed with a JSON object holding {@code code}, {@code msg}
 * and {@code data}; {@code code} is {@code "01"} when the dashboard lookup returned an
 * empty result and {@code "00"} otherwise.
 *
 * @param response suspended JAX-RS response resumed with the JSON string
 */
@GET
@Path("dashboard/getdashboards")
@Produces(MediaType.APPLICATION_JSON + ";charset=utf-8")
public void getDashboards(@Suspended AsyncResponse response) {

    String result = dashboardManagement.getDashboards(this.cm);
    boolean missing = StringHelper.isEmpty(result);

    // Typed map instead of a raw Map, so no unchecked-warning suppression is needed.
    Map<String, String> resultMap = new HashMap<String, String>();
    resultMap.put("code", missing ? "01" : "00");
    resultMap.put("msg", missing ? "获取配置信息失败或配置信息不存在" : "获取配置信息成功");
    resultMap.put("data", result);

    response.resume(JSONHelper.toString(resultMap));
}
/**
 * Queries the most recent 1 min of MonitorData from the cache.
 *
 * <p>Posts a {@code monitor} message carrying {@code data} to {@code /hm/cache/q}; the reply
 * is handled by a {@code LoadMonitorDataFromCacheCB} configured with the arguments below.
 *
 * @param data     query payload sent as {@code cache.query.json} and handed to the callback
 * @param response suspended response handed to the callback
 * @param cacheKey cache key handed to the callback
 */
private void loadMonitorDataFromCache(String data, @Suspended AsyncResponse response, String cacheKey) {

    // callback that processes the health manager reply
    LoadMonitorDataFromCacheCB cacheCallback = new LoadMonitorDataFromCacheCB();
    cacheCallback.setCacheKey(cacheKey);
    cacheCallback.setData(data);
    cacheCallback.setGodeyeCacheRegion(godeyeCacheRegion);
    cacheCallback.setGodeyeFilterGroupCacheRegion(godeyeFilterGroupCacheRegion);
    cacheCallback.setResponse(response);

    // outgoing query message
    UAVHttpMessage query = new UAVHttpMessage();
    query.setIntent("monitor");
    query.putRequest("cache.query.json", data);

    this.doHttpPost("uav.app.godeye.healthmanager.http.addr", "/hm/cache/q", query, cacheCallback);
}
/** * 显示当前app的最近实现的日志信息, 默认查询指定时间内的日志记录 * * @return */ @GET @Path("log/q/hm/{appid}") @Deprecated public void getAppLog(@PathParam("appid") String appid, @QueryParam("timespan") String timespan, @QueryParam("psize") String pagesize, @Suspended AsyncResponse response) { try { // 默认:5分钟 long timeLength = DataConvertHelper.toLong(timespan, 1000 * 60 * 5); // 默认不传limit StringBuilder data = new StringBuilder("{\"appid\":\"").append(appid).append("\", \"endtime\":") .append(System.currentTimeMillis() - timeLength); if (pagesize != null && StringHelper.isNumeric(pagesize)) { data.append(",\"psize\":").append(Long.parseLong(pagesize)); } data.append("}"); queryAppLog(data.toString(), response); } catch (Exception e) { logger.err(this, "Error:" + e.getMessage(), e); } }
@SuppressWarnings({ "unchecked" }) @POST @Path("notify/q/stgy/hm") @Produces(MediaType.APPLICATION_JSON + ";charset=utf-8") public void noitifyStrategyQuery(String data, @Suspended AsyncResponse response) throws Exception { Map<String, Object> params = JSONHelper.toObject(data, Map.class); int pagesize = (int) params.get("pagesize"); int pageindex = (int) params.get("pageindex"); Map<String, String> strategyMap = new HashMap<String, String>(); strategyMap.put("keys", String.valueOf(params.get("inputValue"))); // 封装http请求数据 UAVHttpMessage message = new UAVHttpMessage(); message.putRequest("body", JSONHelper.toString(strategyMap)); message.setIntent("strategy.query"); NoitifyStrategyQuery callback = new NoitifyStrategyQuery(); callback.setResponse(response); callback.setPageindex(pageindex); callback.setPagesize(pagesize); doHttpPost("uav.app.godeye.notify.strategy.http.addr", "/rtntf/oper", message, callback); }
@POST @Path("notify/get/stgy/hm") @Produces(MediaType.APPLICATION_JSON + ";charset=utf-8") public void noitifyStrategyGet(String data, @Suspended AsyncResponse response) throws Exception { Map<String, String> strategyMap = new HashMap<String, String>(); strategyMap.put("keys", data); // 封装http请求数据 UAVHttpMessage message = new UAVHttpMessage(); message.putRequest("body", JSONHelper.toString(strategyMap)); message.setIntent("strategy.query"); NoitifyStrategyGetCB callback = new NoitifyStrategyGetCB(); callback.setResponse(response); doHttpPost("uav.app.godeye.notify.strategy.http.addr", "/rtntf/oper", message, callback); }
@POST @Path("notify/del/stgy/hm") @Produces(MediaType.APPLICATION_JSON + ";charset=utf-8") public void noitifyStrategyRemove(String data, @Suspended AsyncResponse response) throws Exception { // 封装http请求数据 UAVHttpMessage message = new UAVHttpMessage(); message.putRequest("body", data); message.setIntent("strategy.remove"); NoitifyStrategyRemoveCB callback = new NoitifyStrategyRemoveCB(); callback.setResponse(response); callback.setData(data); doHttpPost("uav.app.godeye.notify.strategy.http.addr", "/rtntf/oper", message, callback); }
/**
 * Removes a filter group entry from the cache.
 *
 * <p>Reads {@code emailListName} from the JSON request, looks it up in the filter group
 * hash and deletes it when present. The suspended request is resumed with a JSON message
 * whose {@code code} is {@code "00"} after a deletion and {@code "01"} when no entry exists.
 *
 * @param data     JSON text providing {@code emailListName}
 * @param response suspended response resumed with the result message
 */
@SuppressWarnings("unchecked")
@POST
@Path("filter/group/remove")
@Produces(MediaType.APPLICATION_JSON + ";charset=utf-8")
public void groupFilterRemove(String data, @Suspended AsyncResponse response) {

    Map<String, Object> request = JSONHelper.toObject(data, Map.class);
    String groupName = String.valueOf(request.get("emailListName"));

    Map<String, String> existing = cm.getHash(godeyeFilterGroupCacheRegion, godeyeFilterGroupCacheRegionKey,
            groupName);

    String resultMsg;
    if (existing.get(groupName) == null) {
        resultMsg = "{\"code\":\"01\",\"msg\":\"邮箱组不存在\"}";
    }
    else {
        cm.delHash(godeyeFilterGroupCacheRegion, godeyeFilterGroupCacheRegionKey, groupName);
        resultMsg = "{\"code\":\"00\",\"msg\":\"删除成功\"}";
    }

    response.resume(resultMsg);
}
/**
 * Loads all apps: sends a query with an empty {@code where} clause, sorted on
 * {@code createtime} with sort order {@code "-1"}, and lets an {@code AppHttpCallBack}
 * handle the reply.
 *
 * @param response suspended response handed to the callback
 */
@POST
@Path("loadAllApps")
public void loadAllApps(@Suspended AsyncResponse response) {

    Map<String, Object> query = new HashMap<>();

    // no filter criteria
    Map<String, Object> filter = new HashMap<>();
    query.put("where", filter);

    HashMap<String, Object> ordering = new HashMap<>();
    ordering.put("values", "createtime");
    ordering.put("sortorder", "-1");
    query.put("sort", ordering);

    Map<String, Object> jsonRequest = createHttpMapRequest(bussinessTypeQuery, query);
    Map<String, String> callBackInput = createInput(null, null, "query");
    AppHttpCallBack onReply = new AppHttpCallBack(response, callBackInput);

    manageHttpAsyncClientPost(manageTypeApp, jsonRequest, onReply);
}
/**
 * Creates an app record from the XSS-filtered fields of {@code appEntity}, stamped with the
 * current time and the user id stored in the HTTP session, and lets an
 * {@code AppHttpCallBack} handle the reply.
 *
 * @param appEntity app data supplying url, id and config path
 * @param response  suspended response handed to the callback
 * @throws Exception propagated from filtering or posting
 */
@POST
@Path("addApp")
public void addApp(AppEntity appEntity, @Suspended AsyncResponse response) throws Exception {

    Map<String, Object> record = new HashMap<>();

    String safeUrl = XSSFilter(appEntity.getAppurl());
    String safeId = XSSFilter(appEntity.getAppid());
    String safeConfigPath = XSSFilter(appEntity.getConfigpath());
    String now = df.format(new Date());

    record.put("appid", safeId);
    record.put("appurl", safeUrl);
    record.put("configpath", safeConfigPath);
    record.put("state", 1);
    record.put("createtime", now);
    record.put("operationtime", now);
    Object operator = request.getSession(false).getAttribute("apphub.gui.session.login.user.id");
    record.put("operationuser", operator);

    Map<String, Object> jsonRequest = createHttpMapRequest(bussinessTypeCreate, record);

    Map<String, String> callBackInput = createInput(safeId, safeUrl, "create");
    callBackInput.put("configpath", safeConfigPath);

    manageHttpAsyncClientPost(manageTypeApp, jsonRequest, new AppHttpCallBack(response, callBackInput));
}
/**
 * Creates a group record from the XSS-filtered fields of {@code groupEntity}, stamped with
 * the current time and the user id stored in the HTTP session, and lets a
 * {@code GroupHttpCallBack} handle the reply.
 *
 * @param groupEntity group data supplying group id and app ids
 * @param response    suspended response handed to the callback
 * @throws Exception propagated from filtering or posting
 */
@POST
@Path("addGroup")
public void addGroup(GroupEntity groupEntity, @Suspended AsyncResponse response) throws Exception {

    String safeGroupId = XSSFilter(groupEntity.getGroupid());
    String safeAppIds = XSSFilter(groupEntity.getAppids());
    String now = df.format(new Date());

    Map<String, Object> record = new HashMap<>();
    record.put("groupid", safeGroupId);
    record.put("appids", safeAppIds);
    record.put("state", 1);
    record.put("createtime", now);
    record.put("operationtime", now);
    Object operator = request.getSession(false).getAttribute("apphub.gui.session.login.user.id");
    record.put("operationuser", operator);

    Map<String, Object> jsonRequest = createHttpMapRequest(bussinessTypeCreate, record);
    Map<String, String> callBackInput = createInput(safeGroupId, safeAppIds, "create");
    GroupHttpCallBack onReply = new GroupHttpCallBack(response, callBackInput);

    manageHttpAsyncClientPost(manageTypeGroup, jsonRequest, onReply);
}
/**
 * Updates the app ids of a group: sends a modify request selecting the group by its
 * XSS-filtered id and setting {@code appids}, the operation time and the user id stored in
 * the HTTP session; a {@code GroupHttpCallBack} handles the reply.
 *
 * @param groupEntity group data supplying group id and app ids
 * @param response    suspended response handed to the callback
 * @throws Exception propagated from filtering or posting
 */
@POST
@Path("updateGroup")
public void updateGroup(GroupEntity groupEntity, @Suspended AsyncResponse response) throws Exception {

    String safeGroupId = XSSFilter(groupEntity.getGroupid());
    String safeAppIds = XSSFilter(groupEntity.getAppids());
    String now = df.format(new Date());

    // selection criteria
    Map<String, Object> selector = new HashMap<>();
    selector.put("groupid", safeGroupId);

    // columns to overwrite
    Map<String, Object> changes = new HashMap<>();
    changes.put("operationtime", now);
    Object operator = request.getSession(false).getAttribute("apphub.gui.session.login.user.id");
    changes.put("operationuser", operator);
    changes.put("appids", safeAppIds);

    Map<String, Object> setClause = new HashMap<>();
    setClause.put("set", changes);

    Map<String, Object> modification = new HashMap<>();
    modification.put("where", selector);
    modification.put("update", setClause);

    Map<String, Object> jsonRequest = createHttpMapRequest(bussinessTypeModify, modification);
    // the callback input carries the group id as received, not the filtered one
    Map<String, String> callBackInput = createInput(groupEntity.getGroupid(), safeAppIds, "modify");

    manageHttpAsyncClientPost(manageTypeGroup, jsonRequest, new GroupHttpCallBack(response, callBackInput));
}
/**
 * Deletes an app by sending a modify request that selects the app by id and sets
 * {@code state} to 0 together with the operation time and the user id stored in the HTTP
 * session; an {@code AppHttpCallBack} handles the reply.
 *
 * @param appEntity app data supplying the app id
 * @param response  suspended response handed to the callback
 * @throws Exception propagated from posting
 */
@POST
@Path("delApp")
public void delApp(AppEntity appEntity, @Suspended AsyncResponse response) throws Exception {

    String appId = appEntity.getAppid();
    String now = df.format(new Date());

    Map<String, Object> updateClause = new HashMap<>();
    Map<String, Object> changes = new HashMap<>();
    Map<String, Object> modification = new HashMap<>();
    Map<String, Object> selector = new HashMap<>();

    // selection criteria
    selector.put("appid", appId);
    modification.put("where", selector);

    // columns to overwrite
    changes.put("state", 0);
    changes.put("operationtime", now);
    Object operator = request.getSession(false).getAttribute("apphub.gui.session.login.user.id");
    changes.put("operationuser", operator);
    updateClause.put("set", changes);
    modification.put("update", updateClause);

    Map<String, Object> jsonRequest = createHttpMapRequest(bussinessTypeModify, modification);
    Map<String, String> callBackInput = createInput(appEntity.getAppid(), null, "delete");
    AppHttpCallBack onReply = new AppHttpCallBack(response, callBackInput);

    manageHttpAsyncClientPost(manageTypeApp, jsonRequest, onReply);
}
/**
 * Deletes a group by sending a modify request that selects the group by id and sets
 * {@code state} to 0 together with the operation time and the user id stored in the HTTP
 * session; a {@code GroupHttpCallBack} handles the reply.
 *
 * @param groupEntity group data supplying the group id
 * @param response    suspended response handed to the callback
 * @throws Exception propagated from posting
 */
@POST
@Path("delGroup")
public void delGroup(GroupEntity groupEntity, @Suspended AsyncResponse response) throws Exception {

    String now = df.format(new Date());

    // selection criteria
    Map<String, Object> selector = new HashMap<>();
    selector.put("groupid", groupEntity.getGroupid());

    // columns to overwrite
    Map<String, Object> changes = new HashMap<>();
    changes.put("state", 0);
    changes.put("operationtime", now);
    Object operator = request.getSession(false).getAttribute("apphub.gui.session.login.user.id");
    changes.put("operationuser", operator);

    Map<String, Object> setClause = new HashMap<>();
    setClause.put("set", changes);

    Map<String, Object> modification = new HashMap<>();
    modification.put("where", selector);
    modification.put("update", setClause);

    Map<String, Object> jsonRequest = createHttpMapRequest(bussinessTypeModify, modification);
    Map<String, String> callBackInput = createInput(groupEntity.getGroupid(), null, "delete");
    GroupHttpCallBack onReply = new GroupHttpCallBack(response, callBackInput);

    manageHttpAsyncClientPost(manageTypeGroup, jsonRequest, onReply);
}
@GET public void getAsync(final @Suspended AsyncResponse res) { res.setTimeoutHandler( (ar) -> { ar.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE) .entity("Operation timed out --- please try again.").build()); } ); res.setTimeout(1000, TimeUnit.MILLISECONDS); executor.submit(() -> { //do long run operations. try { LOG.log(Level.INFO, " execute long run task in AsyncResource"); //Thread.sleep(new Random().nextInt(1005)); Thread.sleep(500); } catch (InterruptedException ex) { LOG.log(Level.SEVERE, "error :{0}", ex.getMessage()); } res.resume(Response.ok("asynchronous resource").build()); }); }
@GET public void getAsync(final @Suspended AsyncResponse res) { res.setTimeoutHandler( (ar) -> { ar.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE) .entity("Operation timed out --- please try again.").build()); } ); res.setTimeout(1000, TimeUnit.MILLISECONDS); executor.submit(() -> { //do long run operations. try { LOG.log(Level.INFO, " execute long run task in AsyncResource"); //Thread.sleep(new Random().nextInt(1005)); Thread.sleep(500); } catch (InterruptedException ex) { LOG.log(Level.SEVERE, "error :" + ex.getMessage()); } res.resume(Response.ok("asynchronous resource").build()); }); }
/**
 * Marks the method as asynchronous REST when it declares a parameter of type
 * {@linkplain AsyncResponse} that is annotated with {@linkplain Suspended}.
 *
 * <p>Only the annotations of {@code AsyncResponse} parameters are inspected. Previously,
 * when no such parameter existed, the annotations of the last parameter were checked
 * instead, which could flag a method that has no {@code AsyncResponse} parameter at all.
 */
private void setAsyncRest() {
    final Class<?>[] paramTypes = m.getParameterTypes();
    final Annotation[][] paramAnnotations = m.getParameterAnnotations();
    for (int i = 0; i < paramTypes.length; i++) {
        if (paramTypes[i] != AsyncResponse.class) {
            continue;
        }
        for (Annotation annotation : paramAnnotations[i]) {
            if (annotation.annotationType() == Suspended.class) {
                setAsyncRest(true);
                return;
            }
        }
    }
}
/**
 * Reports the status of the named daemon.
 *
 * <p>Resumes with 404 when no daemon with that name is known. Otherwise the status is
 * requested from the daemon's host and API port and the request is resumed with the status,
 * or with a 500 response when no status was obtained.
 *
 * @param name     daemon name taken from the path
 * @param response suspended JAX-RS response
 */
@GET
@Path("/{name}/status")
@ManagedAsync
public void getStatus(
    @PathParam("name") final String name,
    @Suspended final AsyncResponse response) {

    final CassandraDaemonTask task = state.getDaemons().get(name);
    if (task == null) {
        response.resume(Response.status(Response.Status.NOT_FOUND).build());
        return;
    }

    client.status(task.getHostname(), task.getExecutor().getApiPort())
        .whenCompleteAsync((status, error) -> {
            if (status != null) {
                response.resume(status);
            } else {
                // build() is required: resuming with the bare ResponseBuilder would hand
                // the builder itself to the container as the entity.
                response.resume(Response.serverError().build());
            }
        });
}
@GET @Path("/asyncError") public void asyncError(@Suspended final AsyncResponse asyncResponse) { assertActiveSpan(); new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { // this exception is not propagated to AsyncListener asyncResponse.resume(new RuntimeException("asyncError")); } } }).start(); throw new IllegalStateException(); }
/**
 * Endpoint declaration: {@code POST} with no method-level path, consuming and producing
 * {@code application/json}; documented as "Creates a new scope".
 *
 * <p>Documented responses: 201 (created), 409 (scope already exists), 500 (server error).
 * The method returns {@code void}; presumably implementations complete the request through
 * {@code asyncResponse} - confirm against the implementing class.
 *
 * @param createScopeRequest the scope configuration (request entity)
 * @param securityContext    security context injected via {@code @Context}
 * @param asyncResponse      suspended response for the request
 */
@POST @Consumes({ "application/json" }) @Produces({ "application/json" }) @ApiOperation( value = "", notes = "Creates a new scope", response = ScopeProperty.class, tags = { }) @ApiResponses(value = { @ApiResponse( code = 201, message = "Successfully created the scope", response = ScopeProperty.class), @ApiResponse( code = 409, message = "Scope already exists", response = ScopeProperty.class), @ApiResponse( code = 500, message = "Server error", response = ScopeProperty.class) }) void createScope( @ApiParam(value = "The scope configuration", required = true) CreateScopeRequest createScopeRequest, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code POST /{scopeName}/streams}, consuming and producing
 * {@code application/json}; documented as "Creates a new stream".
 *
 * <p>Documented responses: 201 (created), 404 (scope not found), 409 (stream already
 * exists), 500 (server error). The method returns {@code void}; presumably implementations
 * complete the request through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName           scope name taken from the path
 * @param createStreamRequest the stream configuration (request entity)
 * @param securityContext     security context injected via {@code @Context}
 * @param asyncResponse       suspended response for the request
 */
@POST @Path("/{scopeName}/streams") @Consumes({ "application/json" }) @Produces({ "application/json" }) @ApiOperation( value = "", notes = "Creates a new stream", response = StreamProperty.class, tags = { }) @ApiResponses(value = { @ApiResponse( code = 201, message = "Successful created the stream", response = StreamProperty.class), @ApiResponse( code = 404, message = "Scope not found", response = StreamProperty.class), @ApiResponse( code = 409, message = "Stream already exists", response = StreamProperty.class), @ApiResponse( code = 500, message = "Server error", response = StreamProperty.class) }) void createStream(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @ApiParam(value = "The stream configuration", required = true) CreateStreamRequest createStreamRequest, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code DELETE /{scopeName}}; documented as "Delete a scope".
 *
 * <p>Documented responses: 204 (deleted), 404 (scope not found), 412 (scope still has a
 * non-empty list of streams), 500 (server error). The method returns {@code void};
 * presumably implementations complete the request through {@code asyncResponse} - confirm
 * against the implementing class.
 *
 * @param scopeName       scope name taken from the path
 * @param securityContext security context injected via {@code @Context}
 * @param asyncResponse   suspended response for the request
 */
@DELETE @Path("/{scopeName}") @ApiOperation(value = "", notes = "Delete a scope", response = void.class, tags = { }) @ApiResponses(value = { @ApiResponse( code = 204, message = "Successfully deleted the scope", response = void.class), @ApiResponse(code = 404, message = "Scope not found", response = void.class), @ApiResponse( code = 412, message = "Cannot delete scope which has non-empty list of streams", response = void.class), @ApiResponse(code = 500, message = "Server error", response = void.class) }) void deleteScope(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code DELETE /{scopeName}/streams/{streamName}}; documented as
 * "Delete a stream".
 *
 * <p>Documented responses: 204 (deleted), 404 (stream not found), 412 (stream is not
 * sealed), 500 (server error). The method returns {@code void}; presumably implementations
 * complete the request through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName       scope name taken from the path
 * @param streamName      stream name taken from the path
 * @param securityContext security context injected via {@code @Context}
 * @param asyncResponse   suspended response for the request
 */
@DELETE @Path("/{scopeName}/streams/{streamName}") @ApiOperation(value = "", notes = "Delete a stream", response = void.class, tags = { }) @ApiResponses(value = { @ApiResponse( code = 204, message = "Successfully deleted the stream", response = void.class), @ApiResponse( code = 404, message = "Stream not found", response = void.class), @ApiResponse( code = 412, message = "Cannot delete stream since it is not sealed", response = void.class), @ApiResponse( code = 500, message = "Server error", response = void.class) }) void deleteStream(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @ApiParam(value = "Stream name", required = true) @PathParam("streamName") String streamName, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code GET /{scopeName}/readergroups/{readerGroupName}}, producing
 * {@code application/json}; documented as "Fetch the properties of an existing reader group".
 *
 * <p>Documented responses: 200 (properties found), 404 (scope or reader group not found),
 * 500 (internal server error). The method returns {@code void}; presumably implementations
 * complete the request through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName       scope name taken from the path
 * @param readerGroupName reader group name taken from the path
 * @param securityContext security context injected via {@code @Context}
 * @param asyncResponse   suspended response for the request
 */
@GET @Path("/{scopeName}/readergroups/{readerGroupName}") @Produces({ "application/json" }) @ApiOperation(value = "", notes = "Fetch the properties of an existing reader group", response = ReaderGroupProperty.class, tags = { "ReaderGroups", }) @ApiResponses(value = { @ApiResponse(code = 200, message = "Found reader group properties", response = ReaderGroupProperty.class), @ApiResponse(code = 404, message = "Scope or reader group with given name not found", response = ReaderGroupProperty.class), @ApiResponse(code = 500, message = "Internal server error while fetching reader group details", response = ReaderGroupProperty.class) }) void getReaderGroup(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @ApiParam(value = "Reader group name", required = true) @PathParam("readerGroupName") String readerGroupName, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code GET /{scopeName}/streams/{streamName}/scaling-events},
 * producing {@code application/json}; documented as "Get scaling events for a given
 * datetime period".
 *
 * <p>Documented responses: 200 (list of scaling events), 404 (scope/stream not found),
 * 500 (server error). The method returns {@code void}; presumably implementations complete
 * the request through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName       scope name taken from the path
 * @param streamName      stream name taken from the path
 * @param from            {@code from} query parameter: start of the period, in milliseconds
 *                        from Jan 1 1970 (documented as required)
 * @param to              {@code to} query parameter: end of the period, in milliseconds
 *                        from Jan 1 1970 (documented as required)
 * @param securityContext security context injected via {@code @Context}
 * @param asyncResponse   suspended response for the request
 */
@GET @Path("/{scopeName}/streams/{streamName}/scaling-events") @Produces({ "application/json" }) @ApiOperation(value = "", notes = "Get scaling events for a given datetime period.", response = ScalingEventList.class, tags = { }) @ApiResponses(value = { @ApiResponse(code = 200, message = "List of scaling events", response = ScalingEventList.class), @ApiResponse(code = 404, message = "Scope/Stream not found", response = ScalingEventList.class), @ApiResponse(code = 500, message = "Server error", response = ScalingEventList.class) }) void getScalingEvents(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @ApiParam(value = "Stream name", required = true) @PathParam("streamName") String streamName, @ApiParam(value = "Parameter to display scaling events from that particular datetime. " + "Input should be milliseconds from Jan 1 1970.", required = true) @QueryParam("from") Long from, @ApiParam(value = "Parameter to display scaling events to that particular datetime. " + "Input should be milliseconds from Jan 1 1970.", required = true) @QueryParam("to") Long to, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code GET /{scopeName}}, producing {@code application/json};
 * documented as "Retrieve scope".
 *
 * <p>Documented responses: 200 (scope retrieved), 404 (scope not found), 500 (server
 * error). The method returns {@code void}; presumably implementations complete the request
 * through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName       scope name taken from the path
 * @param securityContext security context injected via {@code @Context}
 * @param asyncResponse   suspended response for the request
 */
@GET @Path("/{scopeName}") @Produces({ "application/json" }) @ApiOperation( value = "", notes = "Retrieve scope", response = ScopeProperty.class, tags = { }) @ApiResponses(value = { @ApiResponse( code = 200, message = "Successfully retrieved the scope", response = ScopeProperty.class), @ApiResponse( code = 404, message = "Scope not found", response = ScopeProperty.class), @ApiResponse( code = 500, message = "Server error", response = ScopeProperty.class) }) void getScope(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code GET /{scopeName}/streams/{streamName}}, producing
 * {@code application/json}; documented as "Fetch the stream properties".
 *
 * <p>Documented responses: 200 (stream configuration found), 404 (scope or stream not
 * found), 500 (server error). The method returns {@code void}; presumably implementations
 * complete the request through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName       scope name taken from the path
 * @param streamName      stream name taken from the path
 * @param securityContext security context injected via {@code @Context}
 * @param asyncResponse   suspended response for the request
 */
@GET @Path("/{scopeName}/streams/{streamName}") @Produces({ "application/json" }) @ApiOperation( value = "", notes = "Fetch the stream properties", response = StreamProperty.class, tags = { }) @ApiResponses(value = { @ApiResponse( code = 200, message = "Found stream configuration", response = StreamProperty.class), @ApiResponse( code = 404, message = "Scope or stream not found", response = StreamProperty.class), @ApiResponse( code = 500, message = "Server error", response = StreamProperty.class) }) void getStream(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @ApiParam(value = "Stream name", required = true) @PathParam("streamName") String streamName, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code GET /{scopeName}/readergroups}, producing
 * {@code application/json}; documented as "List reader groups within the given scope".
 *
 * <p>Documented responses: 200 (list of reader groups for the scope), 404 (scope not
 * found), 500 (internal server error). The method returns {@code void}; presumably
 * implementations complete the request through {@code asyncResponse} - confirm against the
 * implementing class.
 *
 * @param scopeName       scope name taken from the path
 * @param securityContext security context injected via {@code @Context}
 * @param asyncResponse   suspended response for the request
 */
@GET @Path("/{scopeName}/readergroups") @Produces({ "application/json" }) @ApiOperation(value = "", notes = "List reader groups within the given scope", response = ReaderGroupsList.class, tags = { "ReaderGroups", }) @ApiResponses(value = { @ApiResponse(code = 200, message = "List of all reader groups configured for the given scope", response = ReaderGroupsList.class), @ApiResponse(code = 404, message = "Scope not found", response = ReaderGroupsList.class), @ApiResponse(code = 500, message = "Internal server error while fetching the list of reader groups for the given scope", response = ReaderGroupsList.class) }) void listReaderGroups(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code GET /{scopeName}/streams}, producing
 * {@code application/json}; documented as "List streams within the given scope".
 *
 * <p>Documented responses: 200 (list of stream objects), 404 (scope not found), 500 (server
 * error). The method returns {@code void}; presumably implementations complete the request
 * through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName           scope name taken from the path
 * @param showInternalStreams {@code showInternalStreams} query parameter: flag whether to
 *                            display only system created streams (passed as a string)
 * @param securityContext     security context injected via {@code @Context}
 * @param asyncResponse       suspended response for the request
 */
@GET @Path("/{scopeName}/streams") @Produces({ "application/json" }) @ApiOperation( value = "", notes = "List streams within the given scope", response = StreamsList.class, tags = { }) @ApiResponses(value = { @ApiResponse( code = 200, message = "List of stream objects", response = StreamsList.class), @ApiResponse( code = 404, message = "Scope not found", response = StreamsList.class), @ApiResponse( code = 500, message = "Server error", response = StreamsList.class) }) void listStreams(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @ApiParam(value = "Flag whether to display only system created streams") @QueryParam("showInternalStreams") String showInternalStreams, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code PUT /{scopeName}/streams/{streamName}}, consuming and
 * producing {@code application/json}. The operation notes are empty; the documented 200
 * response reads "Successfully updated the stream configuration".
 *
 * <p>Documented responses: 200 (configuration updated), 404 (scope or stream not found),
 * 500 (server error). The method returns {@code void}; presumably implementations complete
 * the request through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName           scope name taken from the path
 * @param streamName          stream name taken from the path
 * @param updateStreamRequest the new stream configuration (request entity)
 * @param securityContext     security context injected via {@code @Context}
 * @param asyncResponse       suspended response for the request
 */
@PUT @Path("/{scopeName}/streams/{streamName}") @Consumes({ "application/json" }) @Produces({ "application/json" }) @ApiOperation(value = "", notes = "", response = StreamProperty.class, tags = { }) @ApiResponses(value = { @ApiResponse( code = 200, message = "Successfully updated the stream configuration", response = StreamProperty.class), @ApiResponse( code = 404, message = "Scope or stream not found", response = StreamProperty.class), @ApiResponse( code = 500, message = "Server error", response = StreamProperty.class) }) void updateStream(@ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @ApiParam(value = "Stream name", required = true) @PathParam("streamName") String streamName, @ApiParam(value = "The new stream configuration", required = true) UpdateStreamRequest updateStreamRequest, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Endpoint declaration: {@code PUT /{scopeName}/streams/{streamName}/state}, consuming and
 * producing {@code application/json}; documented as "Updates the current state of the stream".
 *
 * <p>Documented responses: 200 (stream state updated), 404 (scope or stream not found),
 * 500 (server error). The method returns {@code void}; presumably implementations complete
 * the request through {@code asyncResponse} - confirm against the implementing class.
 *
 * @param scopeName                scope name taken from the path
 * @param streamName               stream name taken from the path
 * @param updateStreamStateRequest the state info to be updated (request entity)
 * @param securityContext          security context injected via {@code @Context}
 * @param asyncResponse            suspended response for the request
 */
@PUT @Path("/{scopeName}/streams/{streamName}/state") @Consumes({ "application/json" }) @Produces({ "application/json" }) @ApiOperation(value = "", notes = "Updates the current state of the stream", response = StreamState.class, tags = { }) @ApiResponses(value = { @ApiResponse(code = 200, message = "Successfully updated the stream state", response = StreamState.class), @ApiResponse(code = 404, message = "Scope or stream not found", response = StreamState.class), @ApiResponse(code = 500, message = "Server error", response = StreamState.class) }) void updateStreamState( @ApiParam(value = "Scope name", required = true) @PathParam("scopeName") String scopeName, @ApiParam(value = "Stream name", required = true) @PathParam("streamName") String streamName, @ApiParam(value = "The state info to be updated", required = true) StreamState updateStreamStateRequest, @Context SecurityContext securityContext, @Suspended final AsyncResponse asyncResponse);
/**
 * Deletes the entity identified by the supplied id.
 *
 * <p>The request is resumed with an OK response both when the service emits a result and
 * when it completes; an error resumes it with a plain-text 500 response.
 *
 * @param inAsyncResponse Asynchronous response object.
 * @param inEntityId Id of entity to delete.
 */
@DELETE
@Path("{id}")
public void deleteEntityById(
    @Suspended final AsyncResponse inAsyncResponse,
    @PathParam("id") @NotNull final Long inEntityId) {
    mService.delete(inEntityId).subscribe(
        inDeleted -> {
            final Response theOkResponse = Response.ok().build();
            inAsyncResponse.resume(theOkResponse);
        },
        inFailure -> {
            final Response theErrorResponse = Response
                .status(500)
                .entity("An error occurred deleting entity with id " + inEntityId)
                .type(MediaType.TEXT_PLAIN_TYPE)
                .build();
            inAsyncResponse.resume(theErrorResponse);
        },
        () -> {
            final Response theOkResponse = Response.ok().build();
            inAsyncResponse.resume(theOkResponse);
        });
}
/**
 * Overwrites the entity that has the supplied id with the supplied entity data.
 *
 * <p>The id is written onto the entity before it is passed to the service. A result resumes
 * the request with an OK response carrying it; an error resumes it with a plain-text 500
 * response that includes the error message.
 *
 * @param inAsyncResponse Asynchronous response object.
 * @param inEntity Entity data to write.
 * @param inEntityId Id of entity to update.
 */
@PUT
@Path("{id}")
public void updateEntity(@Suspended final AsyncResponse inAsyncResponse,
    final E inEntity,
    @PathParam("id") @NotNull final Long inEntityId) {
    inEntity.setId(inEntityId);

    mService.update(inEntity).subscribe(
        inUpdated -> {
            final Response theOkResponse = Response.ok(inUpdated).build();
            inAsyncResponse.resume(theOkResponse);
        },
        inFailure -> {
            final String theMessage =
                "An error occurred updating entity with id " + inEntityId + ": " + inFailure.getMessage();
            final Response theErrorResponse = Response
                .status(500)
                .entity(theMessage)
                .type(MediaType.TEXT_PLAIN_TYPE)
                .build();
            inAsyncResponse.resume(theErrorResponse);
        });
}
/**
 * Creates a new entity using the supplied entity data.
 *
 * <p>An entity that already carries an id is rejected with a 400 response and is not
 * saved. Otherwise a result from the service resumes the request with an OK response
 * carrying it; an error resumes it with a plain-text 500 response that includes the error
 * message.
 *
 * @param inAsyncResponse Asynchronous response object.
 * @param inEntity Entity data to use when creating new entity.
 */
@POST
public void createEntity(
    @Suspended final AsyncResponse inAsyncResponse, final E inEntity) {
    if (inEntity.getId() != null) {
        final Response response =
            Response.status(400).entity("Id must not be set on new entity").build();
        inAsyncResponse.resume(response);
        // Stop here: without this return the rejected entity was still saved and the
        // response resumed a second time.
        return;
    }

    mService.save(inEntity).subscribe(
        inResult -> inAsyncResponse.resume(Response.ok(inResult).build()),
        inError -> inAsyncResponse.resume(
            Response
                .status(500)
                .entity("An error occurred creating a new entity: " + inError.getMessage())
                .type(MediaType.TEXT_PLAIN_TYPE)
                .build()));
}
@GET public void get(@QueryParam("current") String next, @Suspended AsyncResponse async) { final UriBuilder base = uriInfo.getBaseUriBuilder(); Message message = null; synchronized (messages) { Message current = messages.get(next); if (current == null) message = first; else message = current.next; if (message == null) { queue(async); } } // do this outside of synchronized block to reduce lock hold time if (message != null) send(base, async, message); }
@GET public void receive(@QueryParam("current") String next, @Suspended AsyncResponse async) { final UriBuilder base = uriInfo.getBaseUriBuilder(); Message message = null; synchronized (messages) { Message current = messages.get(next); if (current == null) message = first; else message = current.next; if (message == null) { queue(async); } } // do this outside of synchronized block to reduce lock hold time if (message != null) send(base, async, message); }