@Override public void stopServer(RpcServer selfGateway) { if (selfGateway instanceof AkkaBasedEndpoint) { final AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway; final RpcEndpoint rpcEndpoint; synchronized (lock) { if (stopped) { return; } else { rpcEndpoint = actors.remove(akkaClient.getActorRef()); } } if (rpcEndpoint != null) { akkaClient.getActorRef().tell(Kill.getInstance(), ActorRef.noSender()); } else { LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfGateway.getAddress()); } } }
@Test public void testTerminationOnFatalError() { highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, new TestingLeaderRetrievalService()); new JavaTestKit(system){{ final ActorGateway taskManager = TestingUtils.createTaskManager( system, highAvailabilityServices, // no jobmanager new Configuration(), true, false); try { watch(taskManager.actor()); taskManager.tell(new FatalError("test fatal error", new Exception("something super bad"))); expectTerminated(d, taskManager.actor()); } finally { taskManager.tell(Kill.getInstance()); } }}; }
@Override public void onReceive(Object msg) throws Exception { if(msg instanceof Restart) { log.info("killing application"); app.tell(Kill.getInstance(), getSelf()); } else if(msg instanceof GetActorTree) { log.info("actor tree requested"); // we have to manually forward the message // as the monitor actor isn't allowed to talk // to the publisher service directly. ActorRef self = getSelf(), sender = getSender(); f.ask(monitor, new GetTree(), Tree.class).thenAccept(tree -> { sender.tell(tree, self); }); } else { unhandled(msg); } }
/** * Shuts down this registry and the associated {@link MetricReporter}. */ public void shutdown() { synchronized (lock) { Future<Boolean> stopFuture = null; FiniteDuration stopTimeout = null; if (queryService != null) { stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); try { stopFuture = Patterns.gracefulStop(queryService, stopTimeout); } catch (IllegalStateException ignored) { // this can happen if the underlying actor system has been stopped before shutting // the metric registry down // TODO: Pull the MetricQueryService actor out of the MetricRegistry LOG.debug("The metric query service actor has already been stopped because the " + "underlying ActorSystem has already been shut down."); } } if (reporters != null) { for (MetricReporter reporter : reporters) { try { reporter.close(); } catch (Throwable t) { LOG.warn("Metrics reporter did not shut down cleanly", t); } } reporters = null; } shutdownExecutor(); if (stopFuture != null) { boolean stopped = false; try { stopped = Await.result(stopFuture, stopTimeout); } catch (Exception e) { LOG.warn("Query actor did not properly stop.", e); } if (!stopped) { // the query actor did not stop in time, let's kill him queryService.tell(Kill.getInstance(), ActorRef.noSender()); } } } }
private static void stopActor(ActorRef actor) { if (actor != null) { actor.tell(Kill.getInstance(), ActorRef.noSender()); } }