@Override public void run() { final Future<ActorRef> localShardFuture = context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier())); localShardFuture.onComplete(new OnComplete<ActorRef>() { @Override public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable { if (throwable != null) { tryReschedule(throwable); } else { LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup); system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef, shardingService, toLookup, lookupMaxRetries), system.dispatcher()); } } }, system.dispatcher()); }
private void createRoom(ApiRequestForward forward) { CREATE_ROOM_REQUEST request; try { request = CREATE_ROOM_REQUEST.parseFrom(forward.getArgs(0)); } catch (InvalidProtocolBufferException e) { getLogger().error("Input args error {}", e); return; } // fixme 加载用户信息 // 默认用户为在线 final ActorRef roomContext = getContext().actorOf(RoomContext.props(request.getRoomId(), null), "roomContext"); roomIdToRoomContextActor.put(request.getRoomId(), roomContext); // 把当前用户加入 room Id 列表上 request.getUserIdsList().forEach(u -> usersToRoomId.put(u, request.getRoomId())); }
@Test() public void getRecommendedContents() { TestKit probe = new TestKit(system); ActorRef subject = system.actorOf(props); Request reqObj = new Request(); reqObj.setRequestId("1"); reqObj.setOperation(ActorOperations.GET_RECOMMENDED_COURSES.getValue()); HashMap<String, Object> innerMap = new HashMap<>(); innerMap.put(JsonKey.REQUESTED_BY, "USR"); reqObj.setRequest(innerMap); subject.tell(reqObj, probe.getRef()); probe.expectMsgClass(duration("100 second"),ProjectCommonException.class); }
@Test public void shouldSendCoffeePreparedWithRandomCoffeeForInaccurateResponse() { new JavaTestKit(system) {{ Integer accuracy = 50; Long runs = 1000L; ActorRef guest = system.deadLetters(); ActorRef barista = system.actorOf(Barista.props(duration("0 milliseconds"), accuracy)); List<Coffee> coffees = new ArrayList<>(); for (int i = 0; i < runs; i++) { barista.tell(new Barista.PrepareCoffee(new Coffee.Akkaccino(), guest), getRef()); Barista.CoffeePrepared cp = expectMsgClass(duration("50 milliseconds"), Barista.CoffeePrepared.class); coffees.add(cp.coffee); } Long expectedCount = runs * accuracy / 100; Long variation = expectedCount / 10; Long numberOfCorrectCoffee = coffees.stream().filter(c -> c.equals(new Coffee.Akkaccino())).count(); assertThat(numberOfCorrectCoffee).isBetween(expectedCount - variation, expectedCount + variation); }}; }
@Test public void testCourseProgressWithInvalidBatchIdNull(){ TestKit probe = new TestKit(system); ActorRef subject = system.actorOf(props); Request actorMessage = new Request(); actorMessage.put(JsonKey.REQUESTED_BY , userId); actorMessage.put(JsonKey.BATCH_ID , null); actorMessage.put(JsonKey.PERIOD , "fromBegining"); actorMessage.setOperation(ActorOperations.COURSE_PROGRESS_METRICS.getValue()); subject.tell(actorMessage, probe.getRef()); ProjectCommonException res= probe.expectMsgClass(duration("100 second"),ProjectCommonException.class); }
@Test public void shouldRestartWaiterAndResendPrepareCoffeeToBaristaOnFailure() { new JavaTestKit(system) {{ createActor(CoffeeHouse.class, "resend-prepare-coffee", () -> new CoffeeHouse(Integer.MAX_VALUE) { @Override protected ActorRef createBarista() { return getRef(); } @Override protected ActorRef createWaiter() { //stubbing out the waiter actor to always throw exception return context().actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() { @Override public Receive createReceive() { return receiveBuilder().matchAny(o -> { throw new Waiter.FrustratedException(new Coffee.Akkaccino(), system.deadLetters()); }).build(); } }), "waiter"); } }); ActorRef waiter = expectActor(this, "/user/resend-prepare-coffee/waiter"); waiter.tell("Blow up", ActorRef.noSender()); expectMsgEquals(new Barista.PrepareCoffee(new Coffee.Akkaccino(), system.deadLetters())); }}; }
@Test public void shouldLogResponseFromCoffeeHouse() { new JavaTestKit(system) {{ interceptInfoLogMessage(this, "stub response", 1, () -> { new CoffeeHouseApp(system) { @Override protected ActorRef createCoffeeHouse() { return createStubActor("stub-coffee-house", () -> new AbstractLoggingActor() { @Override public Receive createReceive() { return receiveBuilder().matchAny(o -> getSender().tell("stub response", getSelf())).build(); } }); } }; }); }}; }
public static void tell(Request request) { if (null != BackgroundRequestRouterActor.routerMap && null != BackgroundRequestRouterActor.routerMap.get(request.getOperation())) { BackgroundRequestRouterActor.routerMap.get(request.getOperation()).tell(request, ActorRef.noSender()); } else if (null != RequestRouterActor.routerMap && null != RequestRouterActor.routerMap.get(request.getOperation())) { RequestRouterActor.routerMap.get(request.getOperation()).tell(request, ActorRef.noSender()); } else { Object obj = ActorSystemFactory.getActorSystem().initializeActorSystem(request.getOperation()); if (obj instanceof ActorRef) { ProjectLogger .log("In ActorUtil(org.sunbird.learner.util) Actor ref is running " + ((ActorRef) obj)); ((ActorRef) obj).tell(request, ActorRef.noSender()); } else { ProjectLogger.log("In ActorUtil(org.sunbird.learner.util) Actor selection is running " + ((ActorSelection) obj)); ((ActorSelection) obj).tell(request, ActorRef.noSender()); } } }
@Test public void testUpdateTanentPreference(){ TestKit probe = new TestKit(system); ActorRef subject = system.actorOf(props); Request actorMessage = new Request(); List<Map<String , Object>> reqList = new ArrayList<>(); Map<String , Object> map = new HashMap<>(); map.put(JsonKey.ROLE , "admin"); reqList.add(map); actorMessage.getRequest().put(JsonKey.TENANT_PREFERENCE , reqList); actorMessage.getRequest().put(JsonKey.ROOT_ORG_ID , orgId); actorMessage.getRequest().put(JsonKey.REQUESTED_BY , USER_ID); actorMessage.setOperation(ActorOperations.UPDATE_TENANT_PREFERENCE.getValue()); subject.tell(actorMessage, probe.getRef()); Response res= probe.expectMsgClass(duration("100 second"),Response.class); }
/** * This method handles the canCommit phase for a transaction. * * @param transactionID the ID of the transaction to canCommit * @param sender the actor to which to send the response * @param shard the transaction's shard actor */ void handleCanCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { // Lookup the cohort entry that was cached previously (or should have been) by // transactionReady (via the ForwardedReadyTransaction message). final CohortEntry cohortEntry = cohortCache.get(transactionID); if (cohortEntry == null) { // Either canCommit was invoked before ready (shouldn't happen) or a long time passed // between canCommit and ready and the entry was expired from the cache or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID)); log.error(ex.getMessage()); sender.tell(new Failure(ex), shard.self()); return; } cohortEntry.setReplySender(sender); cohortEntry.setShard(shard); handleCanCommit(cohortEntry); }
@Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) { { final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new FindLocalShard("non-existent", false), getRef()); LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); assertEquals("getShardName", "non-existent", notFound.getShardName()); } }; }
@Test public void test11joinUserOrganisationInvalidOrgId(){ TestKit probe = new TestKit(system); ActorRef subject = system.actorOf(props); Request reqObj = new Request(); reqObj.setOperation(ActorOperations.JOIN_USER_ORGANISATION.getValue()); HashMap<String, Object> innerMap = new HashMap<>(); reqObj.getRequest().put(JsonKey.USER_ORG , innerMap); innerMap.put(JsonKey.ORGANISATION_ID , orgId+"bjic3r9"); innerMap.put(JsonKey.USER_ID , USER_ID); List<String> roles = new ArrayList<>(); roles.add("ADMIN"); innerMap.put(JsonKey.ROLES, roles); subject.tell(reqObj, probe.getRef()); ProjectCommonException resp = probe.expectMsgClass(duration("200 second"),ProjectCommonException.class); }
private synchronized void performRegistration(ActorRef shard) { if (isClosed()) { return; } cohortRegistry = shard; Future<Object> future = Patterns.ask(shard, new DataTreeCohortActorRegistry.RegisterCohort(subtree, actor), TIMEOUT); future.onComplete(new OnComplete<Object>() { @Override public void onComplete(Throwable failure, Object val) { if (failure != null) { LOG.error("Unable to register {} as commit cohort", getInstance(), failure); } if (isClosed()) { removeRegistration(); } } }, actorContext.getClientDispatcher()); }
@Test @SuppressWarnings("checkstyle:IllegalCatch") public void testExecuteRemoteOperationAsync() { new JavaTestKit(getSystem()) { { ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef)); ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class), mock(Configuration.class)); ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); Future<Object> future = actorContext.executeOperationAsync(actor, "hello"); try { Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); assertEquals("Result", "hello", result); } catch (Exception e) { throw new AssertionError(e); } } }; }
@Test public void testUpdateUserTcStatus(){ TestKit probe = new TestKit(system); ActorRef subject = system.actorOf(props); Request actorMessage = new Request(); Map<String , Object> map = new HashMap<>(); map.put(JsonKey.TERM_AND_CONDITION_STATUS , "ACCEPTED"); actorMessage.getRequest().put(JsonKey.TENANT_PREFERENCE , map); actorMessage.getRequest().put(JsonKey.ROOT_ORG_ID , orgId); actorMessage.getRequest().put(JsonKey.REQUESTED_BY , USER_ID); actorMessage.setOperation(ActorOperations.UPDATE_TC_STATUS_OF_USER.getValue()); subject.tell(actorMessage, probe.getRef()); Response res= probe.expectMsgClass(duration("100 second"),Response.class); }
public void onReceiveTestWithInvalidOperation() throws Throwable { TestKit probe = new TestKit(system); ActorRef subject = system.actorOf(props); Request reqObj = new Request(); reqObj.setRequestId("1211"); reqObj.setOperation("INVALID_OPERATION"); HashMap<String, Object> innerMap = new HashMap<>(); innerMap.put(JsonKey.COURSE, reqObj.getRequest()); innerMap.put(JsonKey.USER_ID, "USR1"); innerMap.put(JsonKey.ID, ""); reqObj.setRequest(innerMap); subject.tell(reqObj, probe.getRef()); probe.expectMsgClass(duration("100 second"), ProjectCommonException.class); }
@Test public void testdGetSkillWithInvalidUserId(){ TestKit probe = new TestKit(system); ActorRef subject = system.actorOf(props); Request actorMessage = new Request(); actorMessage.put(JsonKey.REQUESTED_BY , USER_ID); actorMessage.put(JsonKey.ENDORSED_USER_ID , ENDORSED_USER_ID+1123); actorMessage.setOperation(ActorOperations.GET_SKILL.getValue()); subject.tell(actorMessage, probe.getRef()); ProjectCommonException res= probe.expectMsgClass(duration("10 second"),ProjectCommonException.class); }
@Test public void serveCoffeeShouldBeSentAfterFinishCoffeeDuration() { new JavaTestKit(system) {{ ActorRef guest = createGuest(this, getRef()); new Within(duration("50 milliseconds"), duration("200 milliseconds")) { @Override protected void run() { guest.tell(new Waiter.CoffeeServed(new Coffee.Akkaccino()), ActorRef.noSender()); expectMsgEquals(new Waiter.ServeCoffee(new Coffee.Akkaccino())); } }; }}; }
private static Props printerProps(ActorRef coffeeHouse) { return Props.create(AbstractLoggingActor.class, () -> new AbstractLoggingActor() { { coffeeHouse.tell("Brew Coffee", self()); receive(ReceiveBuilder. matchAny(o -> log().info(o.toString())).build() ); } }); }
@Test public void sendingServeCoffeeShouldResultInApproveCoffeeToCoffeeHouse() { new JavaTestKit(system) {{ ActorRef coffeeHouse = getRef(); TestProbe guest = new TestProbe(system); ActorRef waiter = system.actorOf(Waiter.props(coffeeHouse)); waiter.tell(new Waiter.ServeCoffee(new Coffee.Akkaccino()), guest.ref()); expectMsgEquals(new CoffeeHouse.ApproveCoffee(new Coffee.Akkaccino(), guest.ref())); }}; }
private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot) { // We found that the log was out of sync so just send a negative reply. final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(), context.getPayloadVersion(), forceInstallSnapshot); log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply); sender.tell(reply, actor()); }
@Test public void checkActorsCreation() { new TestKit(system) {{ for(ActorDefinitor definitor : App.actorDefinitions) { ActorRef ref = Utilities.getActor(definitor.getActorName(), system); assertTrue(ref != null); } }}; }
private ActorRef getOrCreateTenantActor(TenantId tenantId) { ActorRef tenantActor = tenantActors.get(tenantId); if (tenantActor == null) { tenantActor = context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId)) .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()); tenantActors.put(tenantId, tenantActor); } return tenantActor; }
@BeforeClass public static void startApp() { app = Helpers.fakeApplication(); Helpers.start(app); headerMap = new HashMap<String, String[]>(); headerMap.put(HeaderParam.X_Consumer_ID.getName(), new String[]{"Service test consumer"}); headerMap.put(HeaderParam.X_Device_ID.getName(), new String[]{"Some Device Id"}); headerMap.put(HeaderParam.X_Authenticated_Userid.getName(), new String[]{"Authenticated user id"}); headerMap.put(JsonKey.MESSAGE_ID, new String[]{"Unique Message id"}); system = ActorSystem.create("system"); ActorRef subject = system.actorOf(props); BaseController.setActorRef(subject); }
private ActorRef switchMatchModeActor(ApiRequest request) { ByteString matchMode = request.getArgs(0); if (matchMode == null || matchMode.isEmpty()) { throw new IllegalArgumentException("Arg : matchMode must be input."); } return selectMatchModeActorRef(MATCH_MODE.forNumber(Integer.valueOf(matchMode.toStringUtf8()))); }
@Test public void sendingApproveCoffeeShouldForwardPrepareCoffeeIfCaffeineLimitNotReached() { new JavaTestKit(system) {{ ActorRef coffeeHouse = createActor(CoffeeHouse.class, "prepare-coffee", () -> new CoffeeHouse(Integer.MAX_VALUE) { @Override protected ActorRef createBarista() { return getRef(); } }); coffeeHouse.tell(new CoffeeHouse.CreateGuest(new Coffee.Akkaccino(), Integer.MAX_VALUE), ActorRef.noSender()); ActorRef guest = expectActor(this, "/user/prepare-coffee/$*"); coffeeHouse.tell(new CoffeeHouse.ApproveCoffee(new Coffee.Akkaccino(), guest), getRef()); expectMsgEquals(new Barista.PrepareCoffee(new Coffee.Akkaccino(), guest)); }}; }
@Test public void sendingCoffeeFinishedShouldResultInServeCoffeeMessageToWaiter() { new JavaTestKit(system) {{ ActorRef guest = createGuest(this, getRef()); guest.tell(Guest.CoffeeFinished.Instance, ActorRef.noSender()); expectMsgEquals(new Waiter.ServeCoffee(new Coffee.Akkaccino())); }}; }
@SuppressWarnings("checkstyle:IllegalCatch") private static void testFindPrimaryExceptions(final Object expectedException) throws Exception { ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props()); DatastoreContext dataStoreContext = DatastoreContext.newBuilder() .logicalStoreType(LogicalDatastoreType.CONFIGURATION) .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { @Override protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) { return Futures.successful(expectedException); } }; Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar"); try { Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); fail("Expected" + expectedException.getClass().toString()); } catch (Exception e) { if (!expectedException.getClass().isInstance(e)) { fail("Expected Exception of type " + expectedException.getClass().toString()); } } Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); }
@Test public void shouldSendComplaintWhenWrongDrinkIsSent() { new JavaTestKit(system) {{ ActorRef guest = createGuest(this, getRef()); guest.tell(new Waiter.CoffeeServed(new Coffee.MochaPlay()), ActorRef.noSender()); expectMsgEquals(new Waiter.Complaint((new Coffee.Akkaccino()))); }}; }
@Test public void sendingPrepareCoffeeShouldResultInCoffeePreparedResponse() { new JavaTestKit(system) {{ ActorRef barista = system.actorOf(Barista.props(duration("100 milliseconds"), 100)); new Within(duration("50 milliseconds"), duration("1000 milliseconds")) { @Override protected void run() { barista.tell(new Barista.PrepareCoffee(new Coffee.Akkaccino(), system.deadLetters()), getRef()); expectMsgEquals(new Barista.CoffeePrepared(new Coffee.Akkaccino(), system.deadLetters())); } }; }}; }
@Test public void sendingApproveCoffeeShouldForwardPrepareCoffeeIfCaffeineLimitNotReached() { new JavaTestKit(system) {{ ActorRef coffeeHouse = createActor(CoffeeHouse.class, "prepare-coffee", () -> new CoffeeHouse(Integer.MAX_VALUE) { @Override protected ActorRef createBarista() { return getRef(); } }); coffeeHouse.tell(new CoffeeHouse.CreateGuest(new Coffee.Akkaccino()), ActorRef.noSender()); ActorRef guest = expectActor(this, "/user/prepare-coffee/$*"); coffeeHouse.tell(new CoffeeHouse.ApproveCoffee(new Coffee.Akkaccino(), guest), getRef()); expectMsgEquals(new Barista.PrepareCoffee(new Coffee.Akkaccino(), guest)); }}; }
@Test public void sendingCoffeeServedShouldIncreaseCoffeeCount() { new JavaTestKit(system) {{ ActorRef guest = system.actorOf(Guest.props(system.deadLetters(), new Coffee.Akkaccino(), duration("100 milliseconds"), Integer.MAX_VALUE)); interceptInfoLogMessage(this, ".*[Ee]njoy.*1\\.*", 1, () -> { guest.tell(new Waiter.CoffeeServed(new Coffee.Akkaccino()), ActorRef.noSender()); }); }}; }
@Test public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception { LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting"); new JavaTestKit(getSystem()) { { String memberId = "member-1-shard-default-" + shardMrgIDSuffix; final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); DataTree mockDataTree = mock(DataTree.class); shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mockDataTree, DataStoreVersions.CURRENT_VERSION), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); shardManager.tell( new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); } }; LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending"); }