/**
 * Creates a test actor registered under the supplied name.
 *
 * <p>Creation is attempted up to 10 times. Each time it fails with an
 * {@link InvalidActorNameException} the method sleeps 100 ms (uninterruptibly) and tries
 * again. If all attempts fail, the exception from the last attempt is rethrown.
 *
 * @param props the actor Props
 * @param actorId name of actor
 * @param <T> the actor type
 * @return the ActorRef
 */
@SuppressWarnings("unchecked")
public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId) {
    InvalidActorNameException failure = null;
    int attempt = 0;
    while (attempt < 10) {
        try {
            TestActorRef<T> created = TestActorRef.create(system, props, actorId);
            return (TestActorRef<T>) addActor(created, true);
        } catch (InvalidActorNameException nameRejected) {
            // Remember the failure and pause before the next attempt.
            failure = nameRejected;
            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
        }
        attempt++;
    }
    throw failure;
}
public FhirProxyTestKit(ActorSystem actorSystem, Class<? extends Actor> context, Class<? extends MockHTTPConnector> upstreamMock) { super(actorSystem); List<MockLauncher.ActorToLaunch> testActors = new ArrayList<>(); testActors.add(new MockLauncher.ActorToLaunch("fhir-context", context)); testActors.add(new MockLauncher.ActorToLaunch("http-connector", upstreamMock)); TestingUtils.launchActors(system, testConfig.getName(), testActors); expectNoMsg((FiniteDuration) dilated(Duration.create(20, TimeUnit.MILLISECONDS))); //delay a bit - the actors sometimes need a moment fhirProxyHandler = system.actorOf(Props.create(FhirProxyHandler.class, testConfig)); }
/**
 * Subscribes the given handler to the event stream for the specified channel.
 *
 * <p>A new actor is created that forwards a message to {@code handler} only when the
 * message's runtime class is exactly {@code channel}; every other message (including
 * {@code null}) is reported as unhandled. That actor is then subscribed to the event stream.
 *
 * @param handler callback invoked with each matching message
 * @param channel the exact message class to listen for
 * @param <T> the message type
 * @return the temporary actor that delegates the event to given handler
 */
public <T> ActorRef subscribeEvent(final Handler<T> handler, final Class<T> channel) {
    final Creator<Actor> delegateFactory = new Creator<Actor>() {
        @Override
        public Actor create() throws Exception {
            return new UntypedActor() {
                @Override
                public void onReceive(Object message) throws Exception {
                    final boolean matchesChannel = message != null && message.getClass().equals(channel);
                    if (!matchesChannel) {
                        unhandled(message);
                        return;
                    }
                    handler.handle(channel.cast(message));
                }
            };
        }
    };

    final ActorRef subscriber = system.actorOf(new Props().withCreator(delegateFactory));
    eventStream().subscribe(subscriber, channel);
    return subscriber;
}
/**
 * Checks that detaching an {@code ActorModel} leaves it with a null model object and a
 * terminated actor reference. The backing actor treats every message as unhandled.
 */
@Test
public void detach() throws Exception {
    ActorModel<Integer> model = new ActorModel<Integer>() {
        @Override
        protected ActorRef newActor() {
            Creator<Actor> ignoringActorCreator = new Creator<Actor>() {
                @Override
                public Actor create() throws Exception {
                    return new UntypedActor() {
                        @Override
                        public void onReceive(Object message) throws Exception {
                            unhandled(message);
                        }
                    };
                }
            };
            return system().actorOf(Props.apply(ignoringActorCreator));
        }
    };

    model.detach();
    // Give the actor system a moment before inspecting the outcome.
    Thread.sleep(100);

    assertThat(model.getObject(), is(nullValue()));
    assertThat(model.getActorRef().isTerminated(), is(true));
}
/**
 * Scans for subclasses of {@code UntypedActor} and {@code AbstractActor} and binds an
 * {@code ActorRef} provider for each one, annotated with the name it was registered under.
 *
 * <p>Holders flagged as singleton are bound in {@code Singleton} scope. All others are bound
 * without a scope and additionally stored in {@code PropsContext} under the same name.
 *
 * @param binder the binder that receives the {@code ActorRef} bindings
 */
private static void RegisterActors(Binder binder) {
    Logger.debug("Actor Scanner Started...");

    final Map<String, ActorHolder> map = new HashMap<>();
    final ConfigurationBuilder configBuilder = build();
    final Reflections reflections = new Reflections(configBuilder.setScanners(new SubTypesScanner()));
    final Set<Class<? extends UntypedActor>> actors = reflections.getSubTypesOf(UntypedActor.class);
    final Set<Class<? extends AbstractActor>> abstractActors = reflections.getSubTypesOf(AbstractActor.class);
    loopOnActors(map, actors);
    loopOnAbstractActors(map, abstractActors);

    if (!map.isEmpty()) {
        Logger.debug("Registering actors: ");
    }

    for (final Map.Entry<String, ActorHolder> registration : map.entrySet()) {
        final String key = registration.getKey();
        final ActorHolder actorHolder = registration.getValue();
        final Class<? extends Actor> actor = actorHolder.getActor();
        final String bindingPrefix = "Binding class " + actor.getSimpleName() + " to name: " + key;

        if (!actorHolder.isSingleton()) {
            Logger.debug(bindingPrefix + " Request Scoped.");
            binder.bind(ActorRef.class)
                    .annotatedWith(Names.named(key))
                    .toProvider(new ActorRefProvider(actor, key, false));
            PropsContext.put(key, actorHolder);
        } else {
            Logger.debug(bindingPrefix + " Singleton Scoped.");
            binder.bind(ActorRef.class)
                    .annotatedWith(Names.named(key))
                    .toProvider(new ActorRefProvider(actor, key, true))
                    .in(Singleton.class);
        }
    }
}
/**
 * Adds an {@code ActorHolder} to {@code map} for every class in {@code actors} whose simple
 * name is not in the ignore list.
 *
 * <p>Key selection: an explicit name from {@code getNamed} wins. Otherwise the simple class
 * name is used, unless that key is already present, in which case both the new class and the
 * previously registered one are re-keyed by their fully qualified class names.
 *
 * <p>NOTE(review): after such a collision the simple-name key is removed, so a third class
 * with the same simple name would again be stored under the simple name.
 *
 * @param map registry being filled, keyed by actor name
 * @param actors the actor classes to register
 */
private static void loopOnAbstractActors(Map<String, ActorHolder> map, Set<Class<? extends AbstractActor>> actors) {
    for (final Class<? extends Actor> candidate : actors) {
        final String simpleName = candidate.getSimpleName();
        if (ignore.contains(simpleName)) {
            continue;
        }

        final String explicitName = getNamed(candidate);
        final ActorHolder holder = new ActorHolder(candidate, isSingleton(candidate));

        if (explicitName != null) {
            map.put(explicitName, holder);
            continue;
        }
        if (!map.containsKey(simpleName)) {
            map.put(simpleName, holder);
            continue;
        }

        // Simple-name clash: register both classes under their fully qualified names.
        map.put(candidate.getName(), holder);
        final ActorHolder displaced = map.remove(simpleName);
        map.put(displaced.getActor().getName(), displaced);
    }
}
/**
 * Adds an {@code ActorHolder} to {@code map} for every class in {@code actors} whose simple
 * name is not in the ignore list.
 *
 * <p>Key selection: an explicit name from {@code getNamed} wins. Otherwise the simple class
 * name is used, unless that key is already present, in which case both the new class and the
 * previously registered one are re-keyed by their fully qualified class names.
 *
 * <p>NOTE(review): after such a collision the simple-name key is removed, so a third class
 * with the same simple name would again be stored under the simple name.
 *
 * @param map registry being filled, keyed by actor name
 * @param actors the actor classes to register
 */
private static void loopOnActors(Map<String, ActorHolder> map, Set<Class<? extends UntypedActor>> actors) {
    for (final Class<? extends Actor> actorType : actors) {
        final String shortName = actorType.getSimpleName();
        if (ignore.contains(shortName)) {
            continue;
        }

        final String declaredName = getNamed(actorType);
        final ActorHolder entry = new ActorHolder(actorType, isSingleton(actorType));

        if (declaredName != null) {
            map.put(declaredName, entry);
            continue;
        }
        if (!map.containsKey(shortName)) {
            map.put(shortName, entry);
            continue;
        }

        // Simple-name clash: register both classes under their fully qualified names.
        map.put(actorType.getName(), entry);
        final ActorHolder previous = map.remove(shortName);
        map.put(previous.getActor().getName(), previous);
    }
}
/**
 * Obtains the actor from the application context by bean name, passing the stored arguments
 * to the lookup when they are present.
 *
 * @return the bean looked up under {@code actorBeanName}, cast to {@code Actor}
 */
@Override
public Actor produce() {
    final Object bean = (args != null)
            ? applicationContext.getBean(actorBeanName, args)
            : applicationContext.getBean(actorBeanName);
    return (Actor) bean;
}
/**
 * Looks the actor bean up in the application context. The stored arguments are forwarded to
 * the lookup only when they are non-null.
 *
 * @return the bean looked up under {@code actorBeanName}, cast to {@code Actor}
 */
@Override
public Actor produce() {
    if (args == null) {
        return (Actor) applicationContext.getBean(actorBeanName);
    }
    return (Actor) applicationContext.getBean(actorBeanName, args);
}
/**
 * Instantiates the actor reflectively through {@code type.newInstance()} and autowires it
 * using the bean factory of the application context supplied by
 * {@code ApplicationContextProvider}.
 *
 * @return the autowired actor, or {@code null} when instantiation fails with an
 *         {@link InstantiationException} or {@link IllegalAccessException} (the failure is
 *         logged)
 */
@Override
public Actor produce() {
    Actor result = null;
    try {
        final Actor created = type.newInstance();
        ApplicationContextProvider.getApplicationContext()
                .getAutowireCapableBeanFactory()
                .autowireBean(created);
        result = created;
    } catch (InstantiationException | IllegalAccessException e) {
        LOG.error("Unable to create actor of type:{}", type, e);
    }
    return result;
}
/**
 * Reports the actor class by asking the application context for the type of the bean
 * registered under {@code actorBeanName}.
 *
 * @return the bean's type as reported by the application context, cast to an actor class
 */
@Override
@SuppressWarnings("unchecked") // the bean type comes back as Class<?>; the cast cannot be checked
public Class<? extends Actor> actorClass() {
    return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
/**
 * Builds {@code Props} that use {@code GuiceFactoryActorProducer}, handing it the injector,
 * the actor class, the factory class and the factory function as creation arguments.
 *
 * @param actorClass the class of actor to create
 * @param factoryClass the class of the factory used to create the actor
 * @param factoryFunction function applied to the factory to obtain the actor
 * @param <A> the actor type
 * @param <F> the factory type
 * @return the Props for the described actor
 */
public <A extends Actor, F> Props props(
        Class<A> actorClass, Class<F> factoryClass, Function<F, A> factoryFunction) {
    final Class<?> producerType = GuiceFactoryActorProducer.class;
    return Props.create(producerType, injector, actorClass, factoryClass, factoryFunction);
}
/**
 * Asks the model's actor to parse "100" and expects the answer 100. The backing actor
 * replies to String messages with their integer value and leaves anything else unhandled.
 */
@Test
public void ask() throws Exception {
    BaseAskActorModel<Integer, String> model = new BaseAskActorModel<Integer, String>() {
        @Override
        protected ActorRef newActor() {
            Creator<Actor> parsingActorCreator = new Creator<Actor>() {
                @Override
                public Actor create() throws Exception {
                    return new UntypedActor() {
                        @Override
                        public void onReceive(Object message) throws Exception {
                            if (!(message instanceof String)) {
                                unhandled(message);
                                return;
                            }
                            getSender().tell(Integer.parseInt((String) message), getSelf());
                        }
                    };
                }
            };
            return system().actorOf(Props.apply(parsingActorCreator));
        }
    };

    assertThat(model.ask("100").get(), is(100));
    model.detach();
}
/**
 * Asks the model's actor to parse "100" and expects the answer 100. The backing actor
 * replies to String messages with their integer value and leaves anything else unhandled.
 */
@Test
public void askByConstructor() throws Exception {
    BaseAskActorModel<Integer, String> model = new BaseAskActorModel<Integer, String>() {
        @Override
        protected ActorRef newActor() {
            Creator<Actor> parsingActorCreator = new Creator<Actor>() {
                @Override
                public Actor create() throws Exception {
                    return new UntypedActor() {
                        @Override
                        public void onReceive(Object message) throws Exception {
                            if (!(message instanceof String)) {
                                unhandled(message);
                                return;
                            }
                            getSender().tell(Integer.parseInt((String) message), getSelf());
                        }
                    };
                }
            };
            return system().actorOf(Props.apply(parsingActorCreator));
        }
    };

    assertThat(model.ask("100").get(), is(100));
    model.detach();
}
/**
 * Asks through the Scala-future API, awaits the reply for up to "3 sec" and checks that
 * both the reply and the model object are 100. The backing actor replies to String messages
 * with their integer value and leaves anything else unhandled.
 */
@Test
public void askWithScalaFuture() throws Exception {
    BaseAskActorModel<Integer, String> model = new BaseAskActorModel<Integer, String>() {
        @Override
        protected ActorRef newActor() {
            Creator<Actor> parsingActorCreator = new Creator<Actor>() {
                @Override
                public Actor create() throws Exception {
                    return new UntypedActor() {
                        @Override
                        public void onReceive(Object message) throws Exception {
                            if (!(message instanceof String)) {
                                unhandled(message);
                                return;
                            }
                            getSender().tell(Integer.parseInt((String) message), getSelf());
                        }
                    };
                }
            };
            return system().actorOf(Props.apply(parsingActorCreator));
        }
    };

    Future<Integer> pendingReply = model.askWithScalaFuture("100");
    assertThat(Await.result(pendingReply, Duration.apply("3 sec")), is(100));
    assertThat(model.getObject(), is(100));
    model.detach();
}
/**
 * Checks that once an ask has completed, the model object holds the reply (100). The
 * backing actor replies to String messages with their integer value and leaves anything
 * else unhandled.
 */
@Test
public void askUpdatesObject() throws Exception {
    BaseAskActorModel<Integer, String> model = new BaseAskActorModel<Integer, String>() {
        @Override
        protected ActorRef newActor() {
            Creator<Actor> parsingActorCreator = new Creator<Actor>() {
                @Override
                public Actor create() throws Exception {
                    return new UntypedActor() {
                        @Override
                        public void onReceive(Object message) throws Exception {
                            if (!(message instanceof String)) {
                                unhandled(message);
                                return;
                            }
                            getSender().tell(Integer.parseInt((String) message), getSelf());
                        }
                    };
                }
            };
            return system().actorOf(Props.apply(parsingActorCreator));
        }
    };

    // Block until the ask has completed before inspecting the model.
    model.ask("100").get();
    assertThat(model.getObject(), is(100));
    model.detach();
}
/**
 * Sends a String through {@code tell} and checks, after a 100 ms pause, that the backing
 * actor received it. The actor sets a flag on String messages and leaves anything else
 * unhandled.
 */
@Test
public void tell() throws Exception {
    final AtomicBoolean b = new AtomicBoolean(false);

    TellActorModel<Integer, String> model = new TellActorModel<Integer, String>() {
        @Override
        protected ActorRef newActor() {
            Creator<Actor> flaggingActorCreator = new Creator<Actor>() {
                @Override
                public Actor create() throws Exception {
                    return new UntypedActor() {
                        @Override
                        public void onReceive(Object message) throws Exception {
                            if (!(message instanceof String)) {
                                unhandled(message);
                                return;
                            }
                            b.set(true);
                        }
                    };
                }
            };
            return system().actorOf(Props.apply(flaggingActorCreator));
        }
    };

    model.tell("test");
    // tell is fire-and-forget, so wait briefly before checking the flag.
    Thread.sleep(100);

    assertThat(b.get(), is(true));
    model.detach();
}
/**
 * Copies runtime figures from an actor cell into the given metrics object.
 *
 * <p>Processed-message count and message time are copied only when the cell's actor is a
 * {@code StatisticActor}. Queue size (the mailbox's message count) and the active flag
 * (whether the cell has a current message) are always set.
 *
 * @param statistic the metrics object to fill
 * @param ac the actor cell to read from
 */
private static void populateStatistic(ActorMetrics statistic, ActorCell ac) {
    final Actor cellActor = ac.actor();
    if (cellActor instanceof StatisticActor) {
        final StatisticActor source = (StatisticActor) cellActor;
        statistic.setProcessedMessages(source.getProcessedMessages());
        statistic.setProcessedMessageTime(source.getMessageTime());
    }

    statistic.setQueueSize(ac.mailbox().numberOfMessages());

    final boolean processingMessage = ac.currentMessage() != null;
    statistic.setActive(processingMessage);
}
/**
 * Finds every class annotated with {@code ScheduleOnce} and schedules a single "tick"
 * message to a newly created actor of that class.
 *
 * <p>For each class, configuration is consulted using the fully qualified class name:
 * <ul>
 *   <li>{@code <class>.enabled} - when present and false, the class is skipped;</li>
 *   <li>{@code <class>.initialDelay} - when present, overrides the delay and time unit given
 *       on the annotation.</li>
 * </ul>
 *
 * <p>The actor is created only after the enabled check and delay resolution, so a disabled
 * class does not leave behind an actor that never receives its tick.
 */
@SuppressWarnings("unchecked")
static void ScheduleOnceActors() {
    final ConfigurationBuilder configBuilder = build();
    final Reflections reflections = new Reflections(configBuilder.setScanners(new SubTypesScanner(), new TypeAnnotationsScanner()));
    final Set<Class<?>> schedules = reflections.getTypesAnnotatedWith(ScheduleOnce.class);
    if (!schedules.isEmpty()) {
        Logger.debug("Scheduling actors once:");
    }

    for (final Class<?> scheduleOnce : schedules) {
        final String configEnabled = scheduleOnce.getName() + ".enabled";
        if (config.getString(configEnabled) != null && !config.getBoolean(configEnabled)) {
            // Explicitly disabled: skip before any actor is created.
            continue;
        }

        final String configName = scheduleOnce.getName() + ".initialDelay";
        final String configuredDelay = config.getString(configName);
        final long initialDelay;
        final TimeUnit timeUnit;
        if (configuredDelay != null) {
            initialDelay = getTime(configuredDelay);
            timeUnit = getTimeUnit(configuredDelay);
        } else {
            final ScheduleOnce annotation = scheduleOnce.getAnnotation(ScheduleOnce.class);
            initialDelay = annotation.initialDelay();
            timeUnit = annotation.timeUnit();
        }

        final ActorRef actor = Akka.system().actorOf(
                GuiceProvider.get(Akka.system()).props((Class<? extends Actor>) scheduleOnce));
        Akka.system().scheduler().scheduleOnce(
                Duration.apply(initialDelay, timeUnit),
                actor,
                "tick",
                Akka.system().dispatcher(),
                null);
        Logger.debug(scheduleOnce + " on delay: " + initialDelay + " " + timeUnit);
    }
}
/**
 * Looks up the actor class registered for a path.
 * <br/><br/>
 * Routes are examined in the iteration order of the routing table (described by the original
 * author as FIFO) and the first match wins. A regex route matches when its pattern matches
 * the entire path; any other route matches only on exact string equality.
 *
 * @param path the path to resolve
 * @return the actor class of the first matching route, or {@code null} when none matches
 * @see #addRoute(String, Class)
 * @see #addRegexRoute(String, Class)
 */
public Class<? extends Actor> getActorClassForPath(String path) {
    for (Route candidate : routes.keySet()) {
        final boolean matched = candidate.isRegex
                ? Pattern.compile(candidate.path).matcher(path).matches()
                : candidate.path.equals(path);
        if (matched) {
            return routes.get(candidate);
        }
    }
    return null;
}
/**
 * Constructs the actor by invoking a constructor of {@code clazz} with {@code args}, then
 * autowires the new instance through the context's bean factory.
 *
 * @return the constructed actor (autowired when non-null)
 * @throws Exception if constructor invocation or autowiring fails
 */
@Override
public Actor create() throws Exception {
    final Actor created = (Actor) ConstructorUtils.invokeConstructor(clazz, args);
    if (created == null) {
        return null;
    }
    ctx.getAutowireCapableBeanFactory().autowireBean(created);
    return created;
}
/**
 * Creates the download actor that corresponds to the configured download format.
 *
 * @return a {@code SimpleCsvDownloadActor} for {@code SIMPLE_CSV} or a
 *         {@code DownloadDwcaActor} for {@code DWCA}
 * @throws IllegalStateException if the format is neither of the two supported values
 */
@Override
public Actor create() throws Exception {
    if (downloadFormat == DownloadFormat.SIMPLE_CSV) {
        return new SimpleCsvDownloadActor();
    }
    if (downloadFormat == DownloadFormat.DWCA) {
        return new DownloadDwcaActor();
    }
    throw new IllegalStateException("Unsupported download format");
}
/**
 * Reports the actor class by asking the application context for the type of the bean
 * registered under {@code actorBeanName}.
 *
 * @return the bean's type as reported by the application context, cast to an actor class
 */
@SuppressWarnings("unchecked")
@Override
public Class<? extends Actor> actorClass() {
    final Class<?> beanType = applicationContext.getType(actorBeanName);
    return (Class<? extends Actor>) beanType;
}
/**
 * Obtains the actor instance from the application context.
 *
 * @return the bean registered under {@code actorBeanName}, cast to {@code Actor}
 */
@Override
public Actor produce() {
    final Object bean = applicationContext.getBean(actorBeanName);
    return (Actor) bean;
}
/**
 * Obtains the actor instance from the application context.
 *
 * @return the bean registered under {@code beanActorName}, cast to {@code Actor}
 */
@Override
public Actor produce() {
    final Object bean = applicationContext.getBean(beanActorName);
    return (Actor) bean;
}
/**
 * Reports the actor class by asking the application context for the type of the bean
 * registered under {@code beanActorName}.
 *
 * @return the bean's type as reported by the application context, cast to an actor class
 */
@Override
@SuppressWarnings("unchecked") // the bean type comes back as Class<?>; the cast cannot be checked
public Class<? extends Actor> actorClass() {
    return (Class<? extends Actor>) applicationContext.getType(beanActorName);
}
/**
 * Builds the {@code Props} for an actor of the given type, produced through
 * {@code SpringTypeActorProducer} with this application context.
 *
 * @param type the actor class to create
 * @return Props that create the actor via {@code SpringTypeActorProducer}
 */
public Props props(Class<? extends Actor> type) {
    final Props springBackedProps = Props.create(SpringTypeActorProducer.class, applicationContext, type);
    return springBackedProps;
}
/**
 * Creates a producer for actors of the given type.
 *
 * @param applicationContext the application context stored for later use by this producer
 * @param type the actor class this producer reports and creates
 */
public SpringTypeActorProducer(ApplicationContext applicationContext, Class<? extends Actor> type) {
    this.type = type;
    this.applicationContext = applicationContext;
}
/**
 * Reports the class of actor this producer creates.
 *
 * @return the actor class held in the {@code type} field
 */
@Override
public Class<? extends Actor> actorClass() {
    return this.type;
}
/**
 * Obtains the actor instance from the application context, forwarding the stored arguments
 * to the bean lookup.
 *
 * @return the bean registered under {@code actorBeanName}, cast to {@code Actor}
 */
@Override
public Actor produce() {
    final Object bean = applicationContext.getBean(actorBeanName, args);
    return (Actor) bean;
}
/**
 * Reports the actor class by asking the application context for the type of the bean
 * registered under {@code actorBeanName}.
 *
 * @return the bean's type as reported by the application context, cast to an actor class
 */
@Override
@SuppressWarnings("unchecked")
public Class<? extends Actor> actorClass() {
    final Class<?> beanType = applicationContext.getType(actorBeanName);
    return (Class<? extends Actor>) beanType;
}