public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration, DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) { this.actorSystem = actorSystem; this.shardManager = shardManager; this.clusterWrapper = clusterWrapper; this.configuration = configuration; this.datastoreContext = datastoreContext; this.dispatchers = new Dispatchers(actorSystem.dispatchers()); this.primaryShardInfoCache = primaryShardInfoCache; final LogicalDatastoreType convertedType = LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name()); this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType); setCachedProperties(); Address selfAddress = clusterWrapper.getSelfAddress(); if (selfAddress != null && !selfAddress.host().isEmpty()) { selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get(); } else { selfAddressHostPort = null; } }
/**
 * Verifies that a peer shard id resolves to null before the member's address is
 * known, and to the full shard actor path once it has been registered.
 */
@Test
public void testResolve() {
    final String type = "config";
    final ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, MEMBER_1);
    final MemberName memberName = MEMBER_2;
    final String peerId = ShardIdentifier.create("default", memberName, type).toString();

    // Unknown member: nothing to resolve yet.
    assertEquals("resolve", null, resolver.resolve(peerId));

    final Address address = new Address("tcp", "system");
    resolver.addPeerAddress(memberName, address);

    final String shardAddress = resolver.getShardActorAddress("default", memberName);
    assertEquals("getShardActorAddress", address.toString() + "/user/shardmanager-" + type
            + "/" + memberName.getName() + "-shard-default-" + type, shardAddress);

    // The resolver must yield the same path as the direct shard-address lookup.
    assertEquals("resolve", shardAddress, resolver.resolve(peerId));
}
/**
 * Verifies that getShardManagerPeerActorAddresses returns the shard manager paths
 * of all peers, excluding the local member (MEMBER_1).
 */
@Test
public void testGetShardManagerPeerActorAddresses() {
    final ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);

    // The local member's own address must not appear in the result.
    resolver.addPeerAddress(MEMBER_1, new Address("tcp", "system1"));

    final Address address2 = new Address("tcp", "system2");
    resolver.addPeerAddress(MEMBER_2, address2);

    final Address address3 = new Address("tcp", "system3");
    resolver.addPeerAddress(MEMBER_3, address3);

    final Collection<String> peerAddresses = resolver.getShardManagerPeerActorAddresses();
    assertEquals("getShardManagerPeerActorAddresses", Sets.newHashSet(
            address2.toString() + "/user/shardmanager-config",
            address3.toString() + "/user/shardmanager-config"),
            Sets.newHashSet(peerAddresses));
}
@Override public Map<String, String> findRpcByName(final String name) { RoutingTable localTable = rpcRegistry.getLocalData(); // Get all RPCs from local bucket Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT)); // Get all RPCs from remote bucket Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets(); for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) { RoutingTable table = entry.getValue().getData(); rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString())); } log.debug("list of RPCs {} searched by name {}", rpcMap, name); return rpcMap; }
/** * Sends Gossip status to other members in the cluster. * <br> * 1. If there are no member, ignore the tick. <br> * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br> * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it. */ @VisibleForTesting void receiveGossipTick() { final Address address; switch (clusterMembers.size()) { case 0: //no members to send gossip status to return; case 1: address = clusterMembers.get(0); break; default: final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); address = clusterMembers.get(randomIndex); break; } LOG.trace("Gossiping to [{}]", address); getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address))); }
/** * Helper to collect buckets for requested members. * * @param members requested members */ private void getBucketsByMembers(final Collection<Address> members) { Map<Address, Bucket<T>> buckets = new HashMap<>(); //first add the local bucket if asked if (members.contains(selfAddress)) { buckets.put(selfAddress, getLocalBucket().snapshot()); } //then get buckets for requested remote nodes for (Address address : members) { if (remoteBuckets.containsKey(address)) { buckets.put(address, remoteBuckets.get(address)); } } getSender().tell(buckets, getSelf()); }
/**
 * Reacts to an updated set of remote buckets by computing, per member address, the
 * RPC endpoint (or empty when the member advertises no routes) and notifying the
 * registrar when anything changed.
 */
@Override
protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
    final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());

    for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
        final RoutingTable table = entry.getValue().getData();
        final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();

        // A member with no advertised routes maps to an empty Optional.
        final Optional<RemoteRpcEndpoint> endpoint = rpcs.isEmpty() ? Optional.empty()
                : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs));
        endpoints.put(entry.getKey(), endpoint);
    }

    if (!endpoints.isEmpty()) {
        rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
    }
}
/**
 * Polls the given registry until its bucket for {@code address} is empty, retrying
 * up to 50 times with a 200 ms pause, rethrowing the last assertion failure.
 *
 * @param testKit  test kit used to receive the bucket reply
 * @param registry registry actor to query
 * @param address  member address whose bucket must become empty
 */
private void verifyEmptyBucket(final JavaTestKit testKit, final ActorRef registry, final Address address)
        throws AssertionError {
    Map<Address, Bucket<RoutingTable>> buckets;
    int numTries = 0;
    while (true) {
        // Fixed: query the registry passed in, not the registry1 field — the
        // parameter was previously unused, so the helper always checked registry1.
        buckets = retrieveBuckets(registry, testKit, address);

        try {
            verifyBucket(buckets.get(address), Collections.emptyList());
            break;
        } catch (AssertionError e) {
            if (++numTries >= 50) {
                throw e;
            }
        }

        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
    }
}
/**
 * Asserts that {@code msg} carries exactly one endpoint, registered under
 * {@code address}, and that a message sent through its router actually reaches
 * the invoker's test kit.
 */
private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address,
        final JavaTestKit invoker) {
    final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getEndpoints();
    Assert.assertEquals(1, endpoints.size());

    final Optional<RemoteRpcEndpoint> maybeEndpoint = endpoints.get(address);
    Assert.assertNotNull(maybeEndpoint);
    Assert.assertTrue(maybeEndpoint.isPresent());

    final ActorRef router = maybeEndpoint.get().getRouter();
    Assert.assertNotNull(router);

    // Round-trip a message through the router to prove the endpoint is live.
    router.tell("hello", ActorRef.noSender());
    final String reply = invoker.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), String.class);
    Assert.assertEquals("hello", reply);
}
/**
 * Demo: builds a random router over two remote deployment addresses, sends ten
 * messages through it, then shuts the actor system down.
 *
 * @param args unused
 * @throws InterruptedException declared for the demo signature
 */
public static void main(String[] args) throws InterruptedException {
    ActorSystem _system = ActorSystem.create("RemoteRouteeRouterExample");

    // NOTE(review): addr1 and addr2 are identical (same host and port 2552), so the
    // router targets the same remote system twice. Presumably one was meant to use
    // a different port — confirm against the intended deployment.
    Address addr1 = new Address("akka", "remotesys", "127.0.0.1", 2552);
    Address addr2 = new Address("akka", "remotesys", "127.0.0.1", 2552);
    Address[] addresses = new Address[] { addr1, addr2 };

    // Random router with 5 routees, deployed across the remote addresses.
    ActorRef randomRouter = _system.actorOf(new Props(MsgEchoActor.class)
            .withRouter(new RemoteRouterConfig(new RandomRouter(5), addresses)));

    for (int i = 1; i <= 10; i++) {
        // sends randomly to actors
        randomRouter.tell(i);
    }

    // NOTE(review): shutdown() immediately after tell() — messages may still be in
    // flight; this demo relies on best-effort delivery before termination.
    _system.shutdown();
}
/**
 * Demonstrates remote actor creation: deploys a ServerActor on a remote node via
 * RemoteScope, then starts a local ClientActor holding a reference to the remote
 * actor and kicks it off with a start message.
 *
 * <p>NOTE(review): uses pre-Akka-2.1 APIs (UntypedActorFactory, single-argument
 * tell()); presumably this example targets an old Akka release — confirm before
 * upgrading the dependency.
 */
@SuppressWarnings("serial")
public void remoteActorCreationDemo1() {
    log.info("Creating a actor using remote deployment mechanism");

    // create the address object that points to the remote server
    Address addr = new Address("akka", "ServerSys", "127.0.0.1", 2552);

    // creating the ServerActor on the specified remote server
    final ActorRef serverActor = system.actorOf(new Props(ServerActor.class)
            .withDeploy(new Deploy(new RemoteScope(addr))));

    // create a local actor and pass the reference of the remote actor
    actor = system.actorOf(new Props(new UntypedActorFactory() {
        public UntypedActor create() {
            return new ClientActor(serverActor);
        }
    }));

    // send a message to the local client actor
    actor.tell("Start-RemoteActorCreationDemo1");
}
/** * Add the new worker to the router mechanism * * @param address */ private void addWorkerRoute(String address) { // send the stop message to all the worker actors if (workerRouterActor != null) { for (int i = 0; i < workerAddressMap.size(); i++) workerRouterActor.tell("STOP"); } // add the address to the Map workerAddressMap.put(address, AddressFromURIString.parse(address)); Address[] addressNodes = new Address[workerAddressMap.size()]; Address[] workerAddress = workerAddressMap.values().toArray( addressNodes); // update the workerRouter actor with the information on all workers workerRouterActor = getContext().system().actorOf( new Props(WorkerActor.class).withRouter(new RemoteRouterConfig( new RoundRobinRouter(workerAddress.length), workerAddress))); }
public MyUnboundedPriorityMailbox(ActorSystem.Settings settings, Config config) { // Creating a new PriorityGenerator, super(new PriorityGenerator() { @Override public int gen(Object message) { if (message instanceof Address) return 0; // Worker Registration messages should be // treated // with highest priority else if (message.equals(PoisonPill.getInstance())) return 3; // PoisonPill when no other left else return 1; // By default they go with medium priority } }); }
/**
 * Builds the root "user" guardian path of the remote actor system at the given
 * endpoint.
 *
 * @param host endpoint in {@code hostname:port} form
 * @return the /user actor path on the remote "concierge" system
 * @throws IllegalArgumentException if {@code host} is not in hostname:port form
 *         or the port is not a valid integer
 */
private ActorPath path(String host) {
    final String[] split = host.split(":");
    // Fail with a clear message instead of an ArrayIndexOutOfBoundsException or
    // a bare NumberFormatException on malformed input.
    if (split.length != 2) {
        throw new IllegalArgumentException("Expected hostname:port but got: " + host);
    }
    final String hostname = split[0];
    final int port;
    try {
        port = Integer.parseInt(split[1]);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("Invalid port in: " + host, e);
    }
    final Address actorSystemAddress = new Address("akka.tcp", "concierge", hostname, port);
    return RootActorPath.apply(actorSystemAddress, "/").child("user");
}
/**
 * Entry point: starts the actor system, attaches a cluster listener, and joins the
 * seed nodes listed in the {@code SEED_NODES} environment variable (comma-separated
 * hosts, each expected to listen on port 2551).
 *
 * @param args unused
 * @throws IOException declared for the original signature
 */
public static void main(String[] args) throws IOException {
    ActorSystem actorSystem = ActorSystem.create(CLUSTER_NAME);
    actorSystem.actorOf(SimpleClusterListener.props());
    final ActorMaterializer materializer = ActorMaterializer.create(actorSystem);

    Cluster cluster = Cluster.get(actorSystem);

    // Fail fast with a clear message when the environment is not configured,
    // instead of an anonymous NPE from split().
    final String seedNodes = System.getenv().get("SEED_NODES");
    if (seedNodes == null) {
        throw new IllegalStateException("SEED_NODES environment variable is not set");
    }

    // Arrays.stream avoids the redundant asList(...).stream() round trip.
    List<Address> addresses = Arrays.stream(seedNodes.split(","))
            .map(ip -> new Address("akka.tcp", CLUSTER_NAME, ip, 2551))
            .collect(Collectors.toList());
    cluster.joinSeedNodes(addresses);
}
/**
 * Returns the shard manager actor addresses of all known members, excluding the
 * local member.
 */
Collection<String> getShardManagerPeerActorAddresses() {
    final Collection<String> peerAddresses = new ArrayList<>();
    memberNameToAddress.forEach((member, address) -> {
        // Skip ourselves — only true peers are returned.
        if (!localMemberName.equals(member)) {
            peerAddresses.add(getShardManagerActorPathBuilder(address).toString());
        }
    });
    return peerAddresses;
}
/**
 * Returns the full actor address of the given member's shard, or null when the
 * member's address is not (yet) known.
 */
String getShardActorAddress(String shardName, MemberName memberName) {
    final Address memberAddress = memberNameToAddress.get(memberName);
    if (memberAddress == null) {
        // Member not registered — caller treats this as "address unknown".
        return null;
    }
    return getShardManagerActorPathBuilder(memberAddress)
            .append("/")
            .append(getShardIdentifier(memberName, shardName))
            .toString();
}
/**
 * Records a peer member's address and pushes the refreshed peer address to every
 * local shard, marking the peer as up.
 */
private void addPeerAddress(final MemberName memberName, final Address address) {
    // Register first so the resolver can compute shard addresses below.
    peerAddressResolver.addPeerAddress(memberName, address);

    for (ShardInformation info : localShards.values()) {
        final String shardName = info.getShardName();
        final String peerId = getShardIdentifier(memberName, shardName).toString();
        final String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);

        info.updatePeerAddress(peerId, peerAddress, getSelf());
        info.peerUp(memberName, peerId, getSelf());
    }
}
/**
 * Resolves a member name to its actor path prefix.
 *
 * @throws NullPointerException if {@code memberName} is null or no address is
 *         currently registered for it
 */
public String resolve(final MemberName memberName) {
    Preconditions.checkNotNull(memberName);

    final Address memberAddress = memberNameToAddress.get(memberName);
    Preconditions.checkNotNull(memberAddress, "Requested member[%s] is not present in the resolver ",
            memberName.toString());

    return getActorPathBuilder(memberAddress).toString();
}
/**
 * Exercises getShardActorAddress/getPeerAddress through the full lifecycle:
 * unknown member, two registered members, multiple shard names, and removal of
 * one member's address.
 */
@Test
public void testGetShardActorAddress() {
    ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);

    // No address registered for MEMBER_2 yet — lookup yields null.
    assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", MEMBER_2));

    Address address2 = new Address("tcp", "system2");
    resolver.addPeerAddress(MEMBER_2, address2);
    assertEquals("getPeerAddress", address2, resolver.getPeerAddress(MEMBER_2));

    Address address3 = new Address("tcp", "system3");
    resolver.addPeerAddress(MEMBER_3, address3);
    assertEquals("getPeerAddress", address3, resolver.getPeerAddress(MEMBER_3));

    // Shard actor addresses compose member address + shard manager path + shard id.
    assertEquals("getShardActorAddress",
            address2.toString() + "/user/shardmanager-config/member-2-shard-default-config",
            resolver.getShardActorAddress("default", MEMBER_2));

    assertEquals("getShardActorAddress",
            address3.toString() + "/user/shardmanager-config/member-3-shard-default-config",
            resolver.getShardActorAddress("default", MEMBER_3));

    assertEquals("getShardActorAddress",
            address2.toString() + "/user/shardmanager-config/member-2-shard-topology-config",
            resolver.getShardActorAddress("topology", MEMBER_2));

    // Removing MEMBER_2 makes all of its shard addresses unresolvable again...
    resolver.removePeerAddress(MEMBER_2);
    assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", MEMBER_2));
    assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("topology", MEMBER_2));

    // ...while MEMBER_3 is unaffected.
    assertEquals("getShardActorAddress",
            address3.toString() + "/user/shardmanager-config/member-3-shard-default-config",
            resolver.getShardActorAddress("default", MEMBER_3));
}
/**
 * An AssociationErrorEvent caused by a generic runtime exception (not the watched
 * condition) must not trigger the callback.
 */
@Test
public void testOnReceiveAnother() throws Exception {
    final Address localAddress = Address.apply("http", "local");
    final Address remoteAddress = Address.apply("http", "remote");

    final Throwable failure = new RuntimeException("Another exception");
    final InvalidAssociation association =
            InvalidAssociation.apply(localAddress, remoteAddress, failure, Option.apply(null));

    final AssociationErrorEvent event = new AssociationErrorEvent(association, localAddress,
            remoteAddress, true, Logging.ErrorLevel());
    actor.tell(event, ActorRef.noSender());

    verify(callback, never()).apply();
}
/**
 * Applies a new endpoint view: registers RPC implementations for endpoints that
 * are present and removes registrations for endpoints that have disappeared.
 *
 * @param endpoints per-address endpoint; an empty Optional means the member's
 *                  endpoint is gone and its registration should be retired
 */
private void updateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
    /*
     * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
     * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
     * unavailability which would occur if we were to do it the other way around.
     *
     * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
     * hence we register all new implementations before closing all registrations.
     */
    final Collection<DOMRpcImplementationRegistration<?>> prevRegs = new ArrayList<>(endpoints.size());
    for (Entry<Address, Optional<RemoteRpcEndpoint>> e : endpoints.entrySet()) {
        LOG.debug("Updating RPC registrations for {}", e.getKey());

        final DOMRpcImplementationRegistration<?> prevReg;
        final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
        if (maybeEndpoint.isPresent()) {
            // Phase 1: register the new implementation; remember whatever
            // registration it displaces so it can be closed in phase 2.
            final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
            final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
            prevReg = regs.put(e.getKey(),
                    rpcProviderService.registerRpcImplementation(impl, endpoint.getRpcs()));
        } else {
            // Endpoint vanished: retire the registration we held for it (if any).
            prevReg = regs.remove(e.getKey());
        }

        if (prevReg != null) {
            prevRegs.add(prevReg);
        }
    }

    // Phase 2: close displaced registrations only after all new ones are in place.
    for (DOMRpcImplementationRegistration<?> r : prevRegs) {
        r.close();
    }
}
/**
 * Finds all RPC registrations matching {@code routeId}, aggregating the local
 * routing table with every remote member's bucket.
 *
 * @param routeId the route identifier to search for
 * @return map of matching RPC info keyed to the member hosting it
 */
@Override
public Map<String, String> findRpcByRoute(final String routeId) {
    // Seed the result with matches from our own routing table.
    final Map<String, String> rpcMap =
            new HashMap<>(getRpcMemberMapByRoute(rpcRegistry.getLocalData(), routeId, LOCAL_CONSTANT));

    // Merge in matches from each remote member's bucket.
    for (Entry<Address, Bucket<RoutingTable>> entry : rpcRegistry.getRemoteBuckets().entrySet()) {
        rpcMap.putAll(getRpcMemberMapByRoute(entry.getValue().getData(), routeId,
                entry.getKey().toString()));
    }

    log.debug("list of RPCs {} searched by route {}", rpcMap, routeId);
    return rpcMap;
}
/**
 * Asynchronously asks the parent store actor for the buckets of the given members
 * and hands the result to {@code callback} on this actor's dispatcher.
 *
 * <p>NOTE(review): a failed ask (timeout or error) is silently dropped — the
 * callback is simply never invoked. Confirm callers tolerate that.
 *
 * @param members  members whose buckets are requested
 * @param callback consumer of the address-to-bucket map on success
 */
<T extends BucketData<T>> void getBucketsByMembers(final Collection<Address> members,
        final Consumer<Map<Address, Bucket<T>>> callback) {
    Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout)
        .onComplete(new OnComplete<Object>() {
            @SuppressWarnings("unchecked")
            @Override
            public void onComplete(final Throwable failure, final Object success) {
                // Only successful replies reach the callback.
                if (failure == null) {
                    callback.accept((Map<Address, Bucket<T>>) success);
                }
            }
        }, context.dispatcher());
}
/**
 * Asynchronously asks the parent store actor for the known bucket versions and
 * hands the result to {@code callback} on this actor's dispatcher.
 *
 * <p>NOTE(review): a failed ask (timeout or error) is silently dropped — the
 * callback is simply never invoked. Confirm callers tolerate that.
 *
 * @param callback consumer of the address-to-version map on success
 */
void getBucketVersions(final Consumer<Map<Address, Long>> callback) {
    Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete<Object>() {
        @SuppressWarnings("unchecked")
        @Override
        public void onComplete(final Throwable failure, final Object success) {
            // Only successful replies reach the callback.
            if (failure == null) {
                callback.accept((Map<Address, Long>) success);
            }
        }
    }, context.dispatcher());
}
/**
 * Registers a cluster member: tracks its address (once) and caches an actor
 * selection pointing at our own actor path on that remote member.
 */
private void addPeer(final Address address) {
    if (!clusterMembers.contains(address)) {
        clusterMembers.add(address);
    }

    // Lazily build and cache the selection of our peer actor on that member.
    peers.computeIfAbsent(address, addr -> getContext().system()
            .actorSelection(addr.toString() + getSelf().path().toStringWithoutAddress()));
}
/**
 * Test helper: replaces the known cluster membership with exactly the given
 * addresses.
 */
@VisibleForTesting
void setClusterMembers(final Address... members) {
    clusterMembers.clear();
    peers.clear();

    for (Address member : members) {
        addPeer(member);
    }
}
/** * Helper to collect all known buckets. * * @return self owned + remote buckets */ private Map<Address, Bucket<T>> getAllBuckets() { Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket all.put(selfAddress, getLocalBucket().snapshot()); //then get all remote buckets all.putAll(remoteBuckets); return all; }
/**
 * Drops the bucket cached for the given address, unhooking its watch actor and
 * notifying subclasses, and forgets the address's version.
 */
private void removeBucket(final Address addr) {
    final Bucket<T> removed = remoteBuckets.remove(addr);
    if (removed != null) {
        removed.getWatchActor().ifPresent(watchRef -> removeWatch(addr, watchRef));
        onBucketRemoved(addr, removed);
    }
    // Version is dropped regardless of whether a bucket was present.
    versions.remove(addr);
}
/**
 * Associates the given watch actor with an address, starting a deathwatch the
 * first time the actor is seen.
 */
private void addWatch(final Address addr, final ActorRef ref) {
    final boolean alreadyWatched = watchedActors.containsKey(ref);
    if (!alreadyWatched) {
        getContext().watch(ref);
        LOG.debug("Watching {}", ref);
    }
    watchedActors.put(ref, addr);
}
/**
 * Removes one address association for the given watch actor, ending the
 * deathwatch once no associations remain.
 */
private void removeWatch(final Address addr, final ActorRef ref) {
    watchedActors.remove(ref, addr);

    final boolean stillWatched = watchedActors.containsKey(ref);
    if (!stillWatched) {
        getContext().unwatch(ref);
        LOG.debug("No longer watching {}", ref);
    }
}
/**
 * Handles deathwatch termination of a remote watch actor: purges the versions and
 * buckets of every address that actor was fronting.
 */
private void actorTerminated(final Terminated message) {
    LOG.info("Actor termination {} received", message);

    for (Address addr : watchedActors.removeAll(message.getActor())) {
        versions.remove(addr);
        final Bucket<T> bucket = remoteBuckets.remove(addr);
        if (bucket != null) {
            // Fixed: the format string was missing the second {} placeholder, so the
            // address argument was silently dropped from the log output.
            LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr);
            onBucketRemoved(addr, bucket);
        }
    }
}
/**
 * Sets up an RpcRegistrar TestActorRef backed by a mocked RPC provider service:
 * two endpoints with distinct RPC identifiers whose registrations resolve to the
 * oldReg/newReg mocks respectively.
 */
@Before
public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    system = ActorSystem.create("test");

    final JavaTestKit testKit = new JavaTestKit(system);
    final RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("system").build();
    final Props props = RpcRegistrar.props(config, service);
    testActorRef = new TestActorRef<>(system, props, testKit.getRef(), "actorRef");
    endpointAddress = new Address("http", "local");

    // Two distinct RPC identifiers so the two endpoints register different routes.
    final DOMRpcIdentifier firstEndpointId = DOMRpcIdentifier.create(
            SchemaPath.create(true, QName.create("first:identifier", "foo")));
    final DOMRpcIdentifier secondEndpointId = DOMRpcIdentifier.create(
            SchemaPath.create(true, QName.create("second:identifier", "bar")));

    final JavaTestKit senderKit = new JavaTestKit(system);
    firstEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(firstEndpointId));
    secondEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(secondEndpointId));

    // Stub the provider service: registering either endpoint's RPCs returns the
    // corresponding mock registration.
    Mockito.doReturn(oldReg).when(service).registerRpcImplementation(
            Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
    Mockito.doReturn(newReg).when(service).registerRpcImplementation(
            Mockito.any(RemoteRpcImplementation.class), Mockito.eq(secondEndpoint.getRpcs()));

    rpcRegistrar = testActorRef.underlyingActor();
}
/**
 * Adding an endpoint must register its RPCs with the provider service exactly
 * once, with no other interactions on the service or registrations.
 */
@Test
public void testHandleReceiveAddEndpoint() throws Exception {
    final Map<Address, Optional<RemoteRpcEndpoint>> endpoints =
            ImmutableMap.of(endpointAddress, Optional.of(firstEndpoint));
    testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());

    Mockito.verify(service).registerRpcImplementation(
            Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
    Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
}
/**
 * Removing an endpoint that was never registered must not touch the provider
 * service or any registration at all.
 */
@Test
public void testHandleReceiveRemoveEndpoint() throws Exception {
    final Map<Address, Optional<RemoteRpcEndpoint>> endpoints =
            ImmutableMap.of(endpointAddress, Optional.empty());
    testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());

    Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
}
/**
 * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
 */
@Test
public void testRpcAddedOnMultiNodes() throws Exception {
    final JavaTestKit testKit = new JavaTestKit(node3);

    // Add rpc on node 1
    List<DOMRpcIdentifier> addedRouteIds1 = createRouteIds();
    registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender());

    // Node 3's registrar should learn about node 1's endpoint via gossip.
    final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
            UpdateRemoteEndpoints.class);

    // Add rpc on node 2
    List<DOMRpcIdentifier> addedRouteIds2 = createRouteIds();
    registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender());

    final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
            UpdateRemoteEndpoints.class);

    Address node2Address = node2.provider().getDefaultAddress();
    Address node1Address = node1.provider().getDefaultAddress();

    // Node 3's bucket store must eventually hold both remote buckets with the routes.
    Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, testKit, node1Address,
            node2Address);
    verifyBucket(buckets.get(node1Address), addedRouteIds1);
    verifyBucket(buckets.get(node2Address), addedRouteIds2);

    // Reported versions must match the buckets actually retrieved.
    Map<Address, Long> versions = retrieveVersions(registry3, testKit);
    Assert.assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(),
            versions.get(node1Address));
    Assert.assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(),
            versions.get(node2Address));

    // The endpoints announced to the registrar must be live and routable.
    assertEndpoints(req1, node1Address, invoker1);
    assertEndpoints(req2, node2Address, invoker2);
}
/**
 * Polls a bucket store until it reports a bucket for every given address, failing
 * after 50 attempts spaced 200 ms apart.
 */
private static Map<Address, Bucket<RoutingTable>> retrieveBuckets(final ActorRef bucketStore,
        final JavaTestKit testKit, final Address... addresses) {
    int attempt = 0;
    while (true) {
        bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef());

        @SuppressWarnings("unchecked")
        final Map<Address, Bucket<RoutingTable>> buckets =
                testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), Map.class);

        // Done once every requested address has a bucket.
        final boolean haveAll = Arrays.stream(addresses).allMatch(addr -> buckets.get(addr) != null);
        if (haveAll) {
            return buckets;
        }

        if (++attempt >= 50) {
            Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
                    + ", Actual: " + buckets);
        }

        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
    }
}
@Test public void testAddRoutesConcurrency() throws Exception { final JavaTestKit testKit = new JavaTestKit(node1); final int nRoutes = 500; final Collection<DOMRpcIdentifier> added = new ArrayList<>(nRoutes); for (int i = 0; i < nRoutes; i++) { final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true, new QName(new URI("/mockrpc"), "type" + i))); added.add(routeId); //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)), ActorRef.noSender()); } FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS); int numTries = 0; while (true) { registry1.tell(GET_ALL_BUCKETS, testKit.getRef()); @SuppressWarnings("unchecked") Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(duration, Map.class); Bucket<RoutingTable> localBucket = buckets.values().iterator().next(); RoutingTable table = localBucket.getData(); if (table != null && table.size() == nRoutes) { for (DOMRpcIdentifier r : added) { Assert.assertTrue("RoutingTable contains " + r, table.contains(r)); } break; } if (++numTries >= 50) { Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size()); } Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } }