/** * 启动 */ private void start(Config config) { String systemName = config.getString("bakka.system.name"); String path = config.getString("bakka.system.classpath"); List<String> services = config.getStringList("akka.cluster.roles"); this.actorSystem = ActorSystem.create(systemName, config); this.inbox = Inbox.create(actorSystem); Set<Class<?>> classes = ClassUtil.scan(path); for (Class<?> c : classes) { if (c.isAnnotationPresent(Bakka.class)) { Bakka an = c.getAnnotation(Bakka.class); if (services.contains(an.service())) { Props prop = Props.create(c).withDispatcher("dispatcher"); ActorRef ar = actorSystem.actorOf(prop, an.service()); localActors.put((Class<? extends BaseActor>) c, ar); } } } }
private void containRequest(final GrizzlyHTTPRequest request, final ActorRef requestHandler) { ExecutionContext ec = getContext().dispatcher(); Future<Object> f = future(new Callable<Object>() { public Object call() throws IOException { Inbox inbox = Inbox.create(getContext().system()); processGrizzlyRequest(inbox, requestHandler, request); return inbox.receive(getRootTimeout()); } }, ec); f.onComplete(new OnComplete<Object>() { @Override public void onComplete(Throwable throwable, Object result) throws Throwable { try { if (throwable != null) { log.error(throwable, "Request containment exception"); handleResponse(request.getResponseHandle(), 500, "text/plain", throwable.getMessage()); } else if (result == null || !(result instanceof MediatorHTTPResponse)) { String err = "Request handler responded with unexpected result: " + result; log.warning(err); handleResponse(request.getResponseHandle(), 500, "text/plain", err); } else { MediatorHTTPResponse mediatorHTTPResponse = (MediatorHTTPResponse) result; handleResponse(request.getResponseHandle(), mediatorHTTPResponse); } } finally { //trigger response to client request.getResponseHandle().resume(); } } }, ec); }
private void processGrizzlyRequest(final Inbox handlerInbox, final ActorRef requestHandler, final GrizzlyHTTPRequest request) throws IOException { final NIOReader in = request.getRequest().getNIOReader(); final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); for (String hdr : request.getRequest().getHeaderNames()) { headers.put(hdr, request.getRequest().getHeader(hdr)); } final List<Pair<String, String>> params = new ArrayList<>(); for (String param : request.getRequest().getParameterNames()) { for (String value : request.getRequest().getParameterValues(param)) { params.add(Pair.of(param, value)); } } in.notifyAvailable(new ReadHandler() { final StringWriter bodyBuffer = new StringWriter(); char[] readBuffer = new char[1024]; private void read() throws IOException { while (in.isReady()) { int len = in.read(readBuffer); if (len > 0) { bodyBuffer.write(readBuffer, 0, len); } } } @Override public void onDataAvailable() throws Exception { read(); in.notifyAvailable(this); } @Override public void onError(Throwable throwable) { try { log.error(throwable, "Error during reading of request body"); handleResponse(request.getResponseHandle(), 500, "text/plain", throwable.getMessage()); } catch (IOException ex) { log.error(ex, "Error during reading of request body"); } finally { request.getResponseHandle().resume(); } } @Override public void onAllDataRead() throws Exception { try { read(); MediatorHTTPRequest mediatorHTTPRequest = buildMediatorHTTPRequest(requestHandler, request, bodyBuffer.toString(), headers, params); handlerInbox.send(requestHandler, mediatorHTTPRequest); } finally { IOUtils.closeQuietly(in); } } }); }
/** * 获得通用信箱 * * @return */ public static Inbox getInbox() { AkkaBootService instance = ContextResolver.getComponent(AkkaBootService.class); return instance.getInbox(); }
/** * Inbox creator wrapper. * * @return */ public Inbox createInbox() { return Inbox.create(actorSystem); }
/** * Inbox creator wrapper. * * @param actorRef * @return */ public Inbox createInboxAndWatch(ActorRef actorRef) { final Inbox inbox = Inbox.create(actorSystem); inbox.watch(actorRef); return inbox; }