【AKKA干货】AKKA-HTTP(JAVA版)踩坑记


因为不会scala,所以只能使用java版本。

国内AKKA的中文资料实在太少,想要找解决方案真心头大。 特别是对我这种英文差的小白来说实在痛苦。

===================================================================================

先公布一下我的AKKA-HTTP的性能测试数据吧。

测试环境:华为云 2核4G 云耀云服务器

单机简单GET返回数据(hello world) 并发可达 30000+

请求转到AKKA集群环境输出 并发可达 23000-28000 (因为我在转发之前使用了AES解密,可能有影响)

AKKA-HTTP的几个难点主要集中在以下几点:

1. 支持websocket

2. 支持https(TSL)

3. RESTful实现

4. 跨域支持

5. 支持静态资源(屏蔽各种配置文件)

6. 一个进程同时绑定HTTP和HTTPS

7. 将http请求异步接管到akka集群

===================================================================================

好了,直接上干货吧。

支持websocket

private Flow<Message, Message, NotUsed> websocketFlow() {
    ActorRef actor =     
        system.actorOf(GatewayActor.props(Globals.getGameRouter())); //gameRouter是集群路由Group
    return socketFlow(actor);
}


 1 private Flow<Message, Message, NotUsed> socketFlow(ActorRef actor) {
 2         // 背压支撑
 3     Source<Message, NotUsed> source = Source.<String>actorRefWithBackpressure("ack", o -> {
 4         if (o == "complete")
 5         return Optional.of(CompletionStrategy.draining());
 6             else
 7         return Optional.empty();
 8         }, o -> Optional.empty()).map(message -> (Message) TextMessage.create(message))
 9                 .mapMaterializedValue(textMessage -> {
10                     actor.tell(textMessage, ActorRef.noSender());
11                     return NotUsed.getInstance();
12                 })
13         // .keepAlive(Duration.ofSeconds(10), () -> TextMessage.create("ping")) //这段代码可让服务端自动向客户端发送ping
14         ;
15 
16         Sink<Message, NotUsed> sink = Flow.<Message>create().to(Sink.actorRef(actor, PoisonPill.getInstance()));
17 
18         return Flow.fromSinkAndSource(sink, source);
19     }


@Override
protected Route routes() {
    // TODO Auto-generated method stub
    return path("gateway", () -> get(() -> handleWebSocketMessages(websocketFlow))));

支持HTTPS

第一步:在HttpApp中实现useHttps

1 public HttpsConnectionContext useHttps(ActorSystem system) {
 2         HttpsConnectionContext https = null;
 3         try {
 4           // initialise the keystore
 5           // !!! never put passwords into code !!!
 6           final char[] password = "123456789".toCharArray();
 7 
 8           final KeyStore ks = KeyStore.getInstance("PKCS12");
 9           final InputStream keystore = KutaHttpApp.class.getClassLoader().getResourceAsStream("xxx-xxx-com-akka-0709085814.pfx");
10           if (keystore == null) {
11             throw new RuntimeException("Keystore required!");
12           }
13           ks.load(keystore, password);
14           Enumeration<String> aliases = ks.aliases();
15             while(aliases.hasMoreElements()) {
16                 String next = aliases.nextElement();
17                 logger.info(next);
18                 java.security.Key key = ks.getKey(next, password);
19                 logger.info("alg:{},format:{}",key.getAlgorithm(),key.getFormat());
20             }
21           
22           final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
23           keyManagerFactory.init(ks, password);
24 
25           final TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
26           tmf.init(ks);
27 
28           final SSLContext sslContext = SSLContext.getInstance("TLS");
29           sslContext.init(keyManagerFactory.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
30 
31           https = ConnectionContext.https(sslContext);
32 
33         } catch (NoSuchAlgorithmException | KeyManagementException e) {
34           system.log().error("Exception while configuring HTTPS.", e);
35         } catch (CertificateException | KeyStoreException | UnrecoverableKeyException | IOException e) {
36           system.log().error("Exception while ", e);
37         }
38 
39         return https;
40     }

第二步:在main函数中注册

1 final Http http = Http.get(system);
2 HttpsConnectionContext https = app.useHttps(system);
3 http.setDefaultServerHttpContext(https);
4 Integer sslPort = PropertyUtils.getInteger("app", "gateway.ssl.port");
5 http.bindAndHandle(flow, ConnectHttp.toHost(host, sslPort), materializer);

RESTful实现

public Route RESTfulAsyncRouter() {
    return path(PathMatchers.segment("RESTful"),()->{
          complete("hello");
    });
}

跨域支持

1 public Route RESTfulRouter() {
 2         return path(PathMatchers.segment("RESTful"), 
 3                 ()-> concat(
 4                         post(()->{
 5                             return entity(Jackson.unmarshaller(JSONObject.class), json -> {
 6                                  final JSONObject data = json;
 7                                  final MessageDispatcher dispatcher = system.dispatchers().lookup(KSFConstants.BLOCKING_DISPATCHER);
 8                                  try {
 9                                     return 
10                                              CompletableFuture.<Route>supplyAsync(()->{
11                                                     final ActorRef RESTfull = system.actorOf(RESTfulActor.props(Globals.getHallRouter())
12                                                             .withDispatcher(KSFConstants.BLOCKING_DISPATCHER));
13 //                                                    logger.info("当前运行线程:{}",Thread.currentThread());
14                                                     CompletionStage<Optional<KutaJsonResponse>> rsp = Patterns
15                                                             .ask(RESTfull, data, timeout)
16                                                             .thenApply(a -> {
17                                                                 return Optional.of((KutaJsonResponse) a);
18                                                             });
19                                                     return onSuccess(() -> rsp, performed -> {
20                                                         RESTfull.tell(PoisonPill.getInstance(), ActorRef.noSender());
21                                                         List<HttpHeader> list = new ArrayList<>();
22                                                         list.add(HttpHeader.parse("Access-Control-Allow-Origin", "*"));
23                                                         list.add(HttpHeader.parse("Access-Control-Allow-Credentials", "true"));
24                                                         list.add(HttpHeader.parse("Access-Control-Allow-Methods", "POST,OPTIONS"));
25                                                         list.add(HttpHeader.parse("Access-Control-Expose-Headers", "Content-Type, Access-Control-Allow-Origin, Access-Control-Allow-Credentials"));
26                                                         list.add(HttpHeader.parse("Access-Control-Allow-Headers", "Content-Type,Access-Token,Authorization"));
27                                                         if (performed.isPresent()) {
28                                                             if(performed.get().getSize() > (1024 * 50)) {
29                                                                 return encodeResponseWith(
30                                                                         Collections.singletonList(Coder.Gzip),
31                                                                         ()->complete(StatusCodes.OK, list, performed.get(), Jackson.marshaller())
32                                                                         );
33                                                             }
34                                                             else {
35                                                                 return complete(StatusCodes.OK, list, performed.get(), Jackson.marshaller());
36                                                             }
37                                                         }
38                                                         else {
39                                                             return complete(StatusCodes.NOT_FOUND);
40                                                         }
41                                                 }).orElse(complete(StatusCodes.INTERNAL_SERVER_ERROR));
42                                                 }, dispatcher).get();
43                                 } catch (InterruptedException | ExecutionException e) {
44                                     // TODO Auto-generated catch block
45                                     e.printStackTrace();
46                                 }
47                                 return null;
48                             });
49                         })
50                         , options(()->{
51                             List<HttpHeader> list = new ArrayList<>();
52                             list.add(HttpHeader.parse("Access-Control-Allow-Origin", "*"));
53                             list.add(HttpHeader.parse("Access-Control-Allow-Credentials", "true"));
54                             list.add(HttpHeader.parse("Access-Control-Allow-Methods", "POST,OPTIONS"));
55                             list.add(HttpHeader.parse("Access-Control-Expose-Headers", "Content-Type, Access-Control-Allow-Origin, Access-Control-Allow-Credentials,Vary"));
56                             list.add(HttpHeader.parse("Access-Control-Allow-Headers", "Content-Type,Access-Token,Authorization"));
57                             return respondWithHeaders(list,()-> complete(""));
58                         })
59                         )
60                 );
61     }

支持静态资源(屏蔽各种配置文件)

public Route staticResourceRouter() {
        return path(PathMatchers.remaining(), remain -> get(()-> {
            if(remain.endsWith(".properties") || remain.endsWith(".xml") || remain.endsWith(".conf")) {
                return complete(StatusCodes.UNAUTHORIZED);
            }
            return getFromResource(remain);
            }));
    }

将资源文件放到你的classPath目录下即可

一个进程同时绑定HTTP和HTTPS

1 //我自己实现的HttpApp
 2 KutaHttpApp app = new KutaHttpApp(system);
 3 final Http http = Http.get(system);
 4 final ActorMaterializer materializer = ActorMaterializer.create(system);
 5 final Flow<HttpRequest, HttpResponse, NotUsed> flow = app.routes().flow(system, materializer);
 6 //先绑定http
 7 http.bindAndHandle(flow, ConnectHttp.toHost(host, port), materializer);
 8 boolean useHttps = Boolean.parseBoolean(PropertyUtils.getProperty("app", "gateway.usessl")); 
 9 if (useHttps) {
10     HttpsConnectionContext https = app.useHttps(system);
11     http.setDefaultServerHttpContext(https);
12     Integer sslPort = PropertyUtils.getInteger("app", "gateway.ssl.port");
13     http.bindAndHandle(flow, ConnectHttp.toHost(host, sslPort), materializer);
14     logger.info("启动ssl服务.host:{},port:{}",host,sslPort);
15 }

将http请求异步接管到akka集群

public Route RESTfulAsyncRouter() {
        return path(PathMatchers.segment("RESTful"),()->{
            return entity(Jackson.unmarshaller(JSONObject.class), json -> {
                final Set<HttpHeader> headers = new HashSet<>();
                return completeWith(Marshaller.entityToOKResponse(headers, Jackson.<KutaJsonResponse>marshaller()), f->{
                    system.actorOf(RESTfulAsyncActor.props(json, f));
                });
            });
        });
    }

研究了好久才研究出来。 希望能帮助到正在使用akka-http的同学们吧。


原文链接:https://www.cnblogs.com/kutasms/p/13447408.html