@Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), new Function<Throwable, SupervisorStrategy.Directive>() { @Override public SupervisorStrategy.Directive apply(Throwable param) throws Exception { if (param instanceof IllegalArgumentException) return SupervisorStrategy.restart(); if (param instanceof ArithmeticException) return SupervisorStrategy.resume(); if (param instanceof NullPointerException) return SupervisorStrategy.stop(); else return SupervisorStrategy.escalate(); } } ); }
@Override public SupervisorStrategy create() { return new OneForOneStrategy( false, new PFBuilder<Throwable, SupervisorStrategy.Directive>() .match( Exception.class, (Exception e) -> { if (e instanceof ActorKilledException) { LOG.debug("Actor was killed. Stopping it now.", e); } else { LOG.error("Actor failed with exception. Stopping it now.", e); } return SupervisorStrategy.Stop$.MODULE$; }) .build()); }
protected void initSupervisor(final String supervising, int nRetries, String maxDuration) { strategy = new OneForOneStrategy(nRetries == -1 ? Integer.MAX_VALUE : nRetries, ("-1".equals(maxDuration) ? Duration.Inf() : Duration.create(maxDuration)), new Function<Throwable, Directive>() { @Override public Directive apply(Throwable t) { switch (supervising) { case ESCALATE: return escalate(); case RESTART: return restart(); case RESUME: return resume(); case STOP: return stop(); default: return escalate(); } } }); }
@Override public SupervisorStrategy supervisorStrategy() { SupervisorStrategyInfo info = javactorInfoByJavactorType .get(javactor.getClass()).getSupervisorStrategyInfo().getInfo(); Duration withinDuration = toDuration(info.getTimeRange(), info.getTimeUnit()); final int maxNumRetries = info.getMaxNumRetries(); final boolean loggingEnabled = info.isLoggingEnabled(); return info.getType().equals(SupervisorStrategyType.ONE_FOR_ONE) ? new OneForOneStrategy(maxNumRetries, withinDuration, myDecider(), loggingEnabled ) : new AllForOneStrategy(maxNumRetries, withinDuration, myDecider(), loggingEnabled ); }
private static SupervisorStrategy buildResumeOnRuntimeErrorStrategy() { return new OneForOneStrategy(-1, Duration.Inf(), new Function<Throwable, SupervisorStrategy.Directive>() { @Override public Directive apply(Throwable throwable) throws Exception { logException(throwable); if (throwable instanceof Error) { return OneForOneStrategy.escalate(); } else if (throwable instanceof RuntimeException) { return OneForOneStrategy.resume(); } else { return OneForOneStrategy.restart(); } } }); }
@Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(10, Duration.create("1 minute"), (Function<Throwable, Directive>) t -> { LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); return SupervisorStrategy.resume(); }); }
@Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(10, Duration.create("1 minute"), t -> { LOG.error("An exception happened actor will be resumed", t); return SupervisorStrategy.resume(); }); }
@Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(-1, Duration.Inf(), throwable -> { logger.error(throwable, "Unknown session error"); if (throwable instanceof Error) { return OneForOneStrategy.escalate(); } else { return OneForOneStrategy.resume(); } }); }
@Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(-1, Duration.Inf(), t -> { log.info("Throwable, Work is failed for1 "+ t); if (t instanceof ActorInitializationException) return stop(); else if (t instanceof DeathPactException) return stop(); else if (t instanceof RuntimeException) { if (currentJobId!=null) { log.info("RuntimeException, Work is failed for "+ currentJobId); sendToMaster(new MasterWorkerProtocol.WorkFailed(workerId, jobId(),new Result(-1,"","","",null))); } getContext().become(receiveBuilder() .matchAny(p->idle.apply(p)) .build()); return restart(); } else if (t instanceof Exception) { if (currentJobId!=null) { log.info("Exception, Work is failed for "+ currentJobId); sendToMaster(new MasterWorkerProtocol.WorkFailed(workerId, jobId(),new Result(-1,"","","",null))); } getContext().become(receiveBuilder() .matchAny(p->idle.apply(p)) .build()); return restart(); } else { log.info("Throwable, Work is failed for "+ t); return escalate(); } } ); }
@Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy(-1, Duration.create("1 minute"), t -> { log.error(t, "DroneActor failure caught by supervisor."); System.err.println(t.getMessage()); return SupervisorStrategy.resume(); // Continue on all exceptions! }); }
/** * The supervisor strategy. * * @param notificationRetryNumber * Number of retry when a delivery failed. * @param notificationRetryDuration * How long to wait before attempting to distribute the message * again. */ private static SupervisorStrategy getSupervisorStrategy(int notificationRetryNumber, String notificationRetryDuration) { return new OneForOneStrategy(notificationRetryNumber, Duration.create(notificationRetryDuration), new Function<Throwable, Directive>() { @Override public Directive apply(Throwable t) { log.error("An notification processor reported an exception, retry", t); return resume(); } }); }
/** * Creates a {@link OneForOneStrategy} using the specified parameters. * * @param numberOfRetry * a number of retry * @param withinTimeRange * the time range * @param pluginConfigurationId * the unique id of the plugin configuration */ private static SupervisorStrategy getSupervisorStrategy(int numberOfRetry, Duration withinTimeRange, Long pluginConfigurationId) { final String errorMessage = String.format("An provisioning processor of the plugin %d reported an exception, retry", pluginConfigurationId); return new OneForOneStrategy(numberOfRetry, withinTimeRange, new Function<Throwable, Directive>() { @Override public Directive apply(Throwable t) { log.error(errorMessage, t); return resume(); } }); }
private static SupervisorStrategy buildResumeOrEscalateStrategy() { return new OneForOneStrategy(-1, Duration.Inf(), new Function<Throwable, SupervisorStrategy.Directive>() { @Override public Directive apply(Throwable throwable) throws Exception { logException(throwable); if (throwable instanceof Error) { return OneForOneStrategy.escalate(); } else { return OneForOneStrategy.resume(); } } }); }
private static SupervisorStrategy buildRestartOrEscalateStrategy() { return new OneForOneStrategy(-1, Duration.Inf(), new Function<Throwable, SupervisorStrategy.Directive>() { @Override public Directive apply(Throwable throwable) throws Exception { logException(throwable); if (throwable instanceof Error) { return OneForOneStrategy.escalate(); } else { return OneForOneStrategy.restart(); } } }); }
@Override public Directive apply(Throwable t) throws Exception { if(t instanceof ActorInitializationException) { return OneForOneStrategy.stop(); } else if(t instanceof Exception) { return OneForOneStrategy.restart(); } return OneForOneStrategy.escalate(); }
@Nonnull public OneForOneStrategy getRestartChildStrategy() { return restartChildStrategy; }
@Nonnull public OneForOneStrategy getStopChildStrategy() { return stopChildStrategy; }
public static final SupervisorStrategy defaultStrategy() { final ExceptionElement exceptionElement=matchExceptionToErrorHadling(); Function<Throwable, Directive> behavior=new Function<Throwable, Directive>(){ @Override public Directive apply(Throwable t) throws Exception { ProcessorException e=(ProcessorException)t; if ( exceptionElement.getStategy()==ErrorStrategy.ONE && exceptionElement.getAction()==Action.SKIP) { stepexcmanager.tell(new MasterWorkerProtocol.WorkFailed(e.getWorkerId(), e.getWorkId()), ActorRef.noSender()); return SupervisorStrategy.resume(); } else if( exceptionElement.getStategy()==ErrorStrategy.ONE && exceptionElement.getAction()==Action.RETRY){ if(currentrestrart < exceptionElement.getTrynumber()-1){ executor.tell(new StepExecutionManager.Work(UUID.randomUUID().toString(),3), stepexcmanager); return SupervisorStrategy.restart(); }else{ stepexcmanager.tell(new MasterWorkerProtocol.WorkFailed(e.getWorkerId(), e.getWorkId()), ActorRef.noSender()); return SupervisorStrategy.resume(); } } else if(exceptionElement.getStategy()==ErrorStrategy.ALL && exceptionElement.getAction()==Action.SKIP){ stepexcmanager.tell(new OrchestratorMasterProtocol.BatchFail(Action.SKIP), ActorRef.noSender()); return SupervisorStrategy.resume(); } else if(exceptionElement.getStategy()==ErrorStrategy.ALL && exceptionElement.getAction()==Action.RETRY){ stepexcmanager.tell(new OrchestratorMasterProtocol.BatchFail(Action.RETRY), ActorRef.noSender()); } return SupervisorStrategy.escalate(); } }; if(exceptionElement!=null){ //AllForOneStrategy: The strategy is applied to all the children if(exceptionElement.getStategy()==ErrorStrategy.ALL){ return new AllForOneStrategy(exceptionElement.getTrynumber(),Duration.create(5,TimeUnit.SECONDS),behavior); } //OneForOneStrategy: The strategy is applied to only the children that fail else if(exceptionElement.getStategy()==ErrorStrategy.ONE){ return new OneForOneStrategy(exceptionElement.getTrynumber(), Duration.create(5,TimeUnit.SECONDS),behavior); } } // The Manager does not know how to handle this error return SupervisorStrategy.defaultStrategy(); }
@Override public Directive apply(Throwable t) { return OneForOneStrategy.stop(); }