我必须修改一个dropwizard应用程序以缩短其运行时间。基本上,此应用程序每天接收大约300万个URL,然后下载并解析它们以检测恶意内容。问题在于该应用程序只能处理100万个URL。当我查看该应用程序时,发现它正在进行许多顺序调用。我想对如何通过使其异步或其他技术来改进应用程序提出一些建议。
所需代码如下:-
/* Scheduler */ private long triggerDetection(String startDate, String endDate) { for (UrlRequest request : urlRequests) { if (!validateRequests.isWhitelisted(request)) { ContentDetectionClient.detectContent(request); } } } /* Client */ public void detectContent(UrlRequest urlRequest){ Client client = new Client(); URI uri = buildUrl(); /* It returns the URL of this dropwizard application's resource method provided below */ ClientResponse response = client.resource(uri) .type(MediaType.APPLICATION_JSON_TYPE) .post(ClientResponse.class, urlRequest); Integer status = response.getStatus(); if (status >= 200 && status < 300) { log.info("Completed request for url: {}", urlRequest.getUrl()); }else{ log.error("request failed for url: {}", urlRequest.getUrl()); } } private URI buildUrl() { return UriBuilder .fromPath(uriConfiguration.getUrl()) .build(); } /* Resource Method */ @POST @Path("/pageDetection") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) /** * Receives the url of the publisher, crawls the content of that url, applies a detector to check if the content is malicious. * @returns returns the probability of the page being malicious * @throws throws exception if the crawl call failed **/ public DetectionScore detectContent(UrlRequest urlRequest) throws Exception { return contentAnalysisOrchestrator.detectContentPage(urlRequest); } /* Orchestrator */ public DetectionScore detectContentPage(UrlRequest urlRequest) { try { Pair<Integer, HtmlPage> response = crawler.rawLoad(urlRequest.getUrl()); String content = response.getValue().text(); DetectionScore detectionScore = detector.getProbability(urlRequest.getUrl(), content); contentDetectionResultDao.insert(urlRequest.getAffiliateId(), urlRequest.getUrl(),detectionScore.getProbability()*1000, detectionScore.getRecommendation(), urlRequest.getRequestsPerUrl(), -1, urlRequest.getCreatedAt() ); return detectionScore; } catch (IOException e) { log.info("Error while analyzing the url : {}", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } }
我在考虑以下方法:
我orchestrator.detectContent(urlRequest)直接通过调度程序调用,而不是通过POST调用dropwizard资源方法。
orchestrator.detectContent(urlRequest)
协调器可以返回detectionScore,我将所有的detectScores存储在地图/表中,并执行批处理数据库插入,而不是像当前代码中那样单独插入。
我想对以上方法以及可能改善运行时间的其他技术发表一些意见。另外,我只是阅读了Java异步编程,但是似乎无法理解如何在上面的代码中使用它,因此也希望对此有所帮助。
谢谢。
编辑:我可以想到两个瓶颈:
系统具有8 GB的内存,其中4 GB似乎是可用的
$ free -m total used free shared buffers cached Mem: 7843 4496 3346 0 193 2339 -/+ buffers/cache: 1964 5879 Swap: 1952 489 1463
CPU使用率也最小:
top - 13:31:19 up 19 days, 15:39, 3 users, load average: 0.00, 0.00, 0.00 Tasks: 215 total, 1 running, 214 sleeping, 0 stopped, 0 zombie Cpu(s): 0.5%us, 0.0%sy, 0.0%ni, 99.4%id, 0.1%wa, 0.0%hi, 0.0%si, 0.0%st Mem: 8031412k total, 4605196k used, 3426216k free, 198040k buffers Swap: 1999868k total, 501020k used, 1498848k free, 2395344k cached
受到Davide的(伟大)答案的启发,这是一个示例,一种使用简单反应(我写的我的库)并行化此方法的简便方法。请注意,使用客户端来驱动服务器上的并发性稍有不同。
例
LazyReact streamBuilder = new LazyReact(15,15); streamBuilder.fromIterable(urlRequests) .filter(urlReq->!validateRequests.isWhitelisted(urlReq)) .forEach(request -> { ContentDetectionClient.detectContent(request); });
说明
看起来您可以从客户端驱动并发。这意味着您可以在服务器端的线程之间分配工作,而无需执行其他工作。在此示例中,我们发出了15个并发请求,但您可以将其设置为接近服务器可以处理的最大值。您的应用程序是IO绑定,因此您可以使用很多线程来提高性能。
简单反应可作为期货之源。因此,在这里我们为对ContentDetection客户端的每次调用创建一个异步任务。我们有15个可用线程,因此可以一次对服务器进行15次调用。
Java 7
Java的JDK 8功能有一个反向移植,称为StreamSupport,您也可以通过RetroLambda反向移植Lambda表达式。
为了用CompletableFutures实现相同的解决方案,我们可以为每个合格的URL创建一个Future Task。 更新 我认为我们不需要批处理它们,我们可以使用执行器来限制活跃期货的数量。我们只需要在最后加入所有这些。
Executor exec = Executors.newFixedThreadPool(maxActive);//15 threads List<CompletableFuture<Void>> futures= new ArrayList<>(); for (UrlRequest request : urlRequests) { if (!validateRequests.isWhitelisted(request)) { futures.add(CompletableFuture.runAsync(()->ContentDetectionClient.detectContent(request), exec)); } } CompletableFuture.allOf(futures.toArray()) .join();