@SuppressWarnings("checkstyle:IllegalCatch") private void verifyActorReady(ActorRef actorRef) { // Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite // in a state yet to receive messages or isn't actually created yet. This seems to happen with // actorSelection so, to alleviate it, we use an actorSelection and send an Identify message with // retries to ensure it's ready. Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS); Throwable lastError = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { try { ActorSelection actorSelection = system.actorSelection(actorRef.path().toString()); Future<Object> future = Patterns.ask(actorSelection, new Identify(""), timeout); ActorIdentity reply = (ActorIdentity)Await.result(future, timeout.duration()); Assert.assertNotNull("Identify returned null", reply.getRef()); return; } catch (Exception | AssertionError e) { Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); lastError = e; } } throw new RuntimeException(lastError); }
/**
 * Builds the behavior used while the remote actor has not been identified yet.
 *
 * <p>An {@link ActorIdentity} reply stores the returned reference in {@code calculator}; when a
 * reference is present it is watched and the actor switches to the {@code active} behavior,
 * otherwise a notice is printed. A {@link ReceiveTimeout} triggers another identify request.
 *
 * @return the receive definition handling {@link ActorIdentity} and {@link ReceiveTimeout}
 */
@Override
public Receive createReceive() {
    return receiveBuilder()
        .match(ActorIdentity.class, reply -> {
            calculator = reply.getRef();
            if (calculator != null) {
                getContext().watch(calculator);
                getContext().become(active, true);
            } else {
                System.out.println("Remote actor not available: " + path);
            }
        })
        .match(ReceiveTimeout.class, timeout -> sendIdentifyRequest())
        .build();
}
@Override public void onReceive(Object message) throws Exception { if (message instanceof Terminated) { // Handle remote actor termination this.onTerminated((Terminated) message); } else if (message instanceof ActorIdentity) { // Handle remote actor reconnection this.onIdentity((ActorIdentity) message); } // Handle all other events. Transition<?> transition = this.getTransition(this.state, message); if (transition != null) { transition.apply(this, message); } else { unhandled(message); } }
public Drone(DroneConfig config) { this.config = config; this.queen = registerRemoteActor(config.queenPath); this.monitor = registerRemoteActor(config.monitorPath); this.state = State.DISCONNECTED; // Create supervised actors. this.dataFetcher = this.getContext().actorOf(DataFetcher.makeProps(config.trainingSet)); config.trainingSet.reset(); // Define the state machine this.addTransition(State.DISCONNECTED, MsgConnectAndStart.class, new Transition<>(State.CONNECTING, CONNECT)); this.addTransition(State.CONNECTING, ActorIdentity.class, new Transition<>(State.STARTING, GET_INITIAL_MODEL, IS_QUEEN_IDENTITY)); this.addTransition(State.STARTING, MsgGetInitialModel.class, new Transition<>(State.STARTING, GET_INITIAL_MODEL)); this.addTransition(State.STARTING, MsgModel.class, new Transition<>(State.ACTIVE, START_TRAINING)); this.addTransition(State.STARTING, MsgStop.class, new Transition<>(State.STOPPED)); this.addTransition(State.ACTIVE, MsgUpdateDone.class, new Transition<>(State.ACTIVE, NEXT_UPDATE)); this.addTransition(State.ACTIVE, MsgStop.class, new Transition<>(State.STOPPED)); this.addTransition(State.STOPPED, MsgConnectAndStart.class, new Transition<>(State.STARTING, GET_INITIAL_MODEL)); this.addTransition(State.STOPPED, MsgReset.class, new Stay<>(RESET_DATASET)); this.addTransition(Terminated.class, new Transition<>(State.CONNECTING, IS_QUEEN_TERMINATED)); }
/**
 * Verifies that an {@link Identify} sent through an actor selection is answered with an
 * {@link ActorIdentity} carrying the same correlation id, and that the returned reference
 * reaches the actor created under the name {@code foo}.
 */
@Test
public void testIdentifying() {
    getSystem().actorOf(ConstantEcho.props("foo"), "foo");

    // Ask whoever lives at /user/foo to identify itself to the test actor.
    getSystem().actorSelection("/user/foo").tell(new Identify("identifyFoo"), testActor());

    final ActorIdentity reply = (ActorIdentity) receiveN(1)[0];
    assertEquals("identifyFoo", reply.correlationId());

    // The identified reference must answer with the constant "foo".
    reply.getRef().tell("baz", testActor());
    expectMsgEquals("foo");
}
/**
 * Drives the lookup of the actor at {@code path}.
 *
 * <ul>
 *   <li>An {@link ActorIdentity} for the target is forwarded to the owner and marks the lookup
 *       as found.</li>
 *   <li>{@code START} clears the found flag and sends {@code LOOKUP} to this actor.</li>
 *   <li>{@code LOOKUP}, while the target has not been found, sends an {@link Identify} to the
 *       path and schedules another {@code LOOKUP} after {@code RETRY_PERIOD} seconds.</li>
 * </ul>
 *
 * @param msg the received message
 * @throws Exception declared by the overridden method
 */
@Override
public void onReceive(Object msg) throws Exception {
    if (msg instanceof ActorIdentity && isTarget((ActorIdentity) msg)) {
        owner.tell(msg, getSender());
        found = true;
    }

    if (msg.equals(START)) {
        found = false;
        getSelf().tell(LOOKUP, getSelf());
        return;
    }

    // Only an outstanding lookup needs further work.
    if (found || !msg.equals(LOOKUP)) {
        return;
    }

    getContext().actorSelection(path).tell(new Identify(path), getSelf());

    // Retry later; the follow-up LOOKUP is ignored once the target has been found.
    getContext().system().scheduler().scheduleOnce(
        Duration.create(RETRY_PERIOD, TimeUnit.SECONDS),
        getSelf(),
        LOOKUP,
        getContext().dispatcher(),
        getSelf());
}
/**
 * When an {@link ActorIdentity} message is received, initialize the associated remote actor.
 *
 * <p>The identity's reference is looked up in {@code remoteActors} by its path string. If a
 * registered remote actor matches, it receives the reference and the reference is watched.
 * An identity without a reference (no actor answered the identify request) is ignored instead
 * of failing with a {@link NullPointerException}.
 *
 * @param identity the identify reply; its reference may be {@code null}
 */
private void onIdentity(ActorIdentity identity) {
    // A reply for a path where no actor exists carries no reference - nothing to bind.
    if (identity.getRef() == null) {
        return;
    }

    RemoteActor remoteActor = remoteActors.get(identity.getRef().path().toString());
    if (remoteActor != null) {
        remoteActor.setRef(identity.getRef());
        this.getContext().watch(identity.getRef());
    }
}
private <C extends RpcGateway> CompletableFuture<C> connectInternal( final String address, final Class<C> clazz, Function<ActorRef, InvocationHandler> invocationHandlerFactory) { checkState(!stopped, "RpcService is stopped"); LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", address, clazz.getName()); final ActorSelection actorSel = actorSystem.actorSelection(address); final Future<Object> identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds()); final Future<C> resultFuture = identify.map(new Mapper<Object, C>(){ @Override public C checkedApply(Object obj) throws Exception { ActorIdentity actorIdentity = (ActorIdentity) obj; if (actorIdentity.getRef() == null) { throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'); } else { ActorRef actorRef = actorIdentity.getRef(); InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader(); @SuppressWarnings("unchecked") C proxy = (C) Proxy.newProxyInstance( classLoader, new Class<?>[]{clazz}, invocationHandler); return proxy; } } }, actorSystem.dispatcher()); return FutureUtils.toJava(resultFuture); }
/**
 * Tells whether an identify reply refers to the actor this lookup is searching for.
 *
 * @param identity the identify reply to inspect; its reference may be {@code null}
 * @return {@code true} when the reply carries a reference whose path string equals this
 *     lookup's {@code path}, {@code false} otherwise
 */
public boolean isTarget(ActorIdentity identity) {
    if (identity.getRef() == null) {
        return false;
    }
    final String identifiedPath = identity.getRef().path().toString();
    return identifiedPath.equals(this.path);
}