Java 类akka.actor.Address 实例源码

项目:hashsdn-controller    文件:ActorContext.java   
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
        ClusterWrapper clusterWrapper, Configuration configuration,
        DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
    this.actorSystem = actorSystem;
    this.shardManager = shardManager;
    this.clusterWrapper = clusterWrapper;
    this.configuration = configuration;
    this.datastoreContext = datastoreContext;
    this.dispatchers = new Dispatchers(actorSystem.dispatchers());
    this.primaryShardInfoCache = primaryShardInfoCache;

    final LogicalDatastoreType convertedType =
            LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
    this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);

    setCachedProperties();

    Address selfAddress = clusterWrapper.getSelfAddress();
    if (selfAddress != null && !selfAddress.host().isEmpty()) {
        selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
    } else {
        selfAddressHostPort = null;
    }

}
项目:hashsdn-controller    文件:ShardPeerAddressResolverTest.java   
@Test
public void testResolve() {
    String type = "config";
    ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, MEMBER_1);

    MemberName memberName = MEMBER_2;
    String peerId = ShardIdentifier.create("default", memberName, type).toString();

    assertEquals("resolve", null, resolver.resolve(peerId));

    Address address = new Address("tcp", "system");
    resolver.addPeerAddress(memberName, address);

    String shardAddress = resolver.getShardActorAddress("default", memberName);
    assertEquals("getShardActorAddress", address.toString() + "/user/shardmanager-" + type + "/"
            + memberName.getName() + "-shard-default-" + type, shardAddress);

    assertEquals("resolve", shardAddress, resolver.resolve(peerId));
}
项目:hashsdn-controller    文件:ShardPeerAddressResolverTest.java   
@Test
public void testGetShardManagerPeerActorAddresses() {
    ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);

    resolver.addPeerAddress(MEMBER_1, new Address("tcp", "system1"));

    Address address2 = new Address("tcp", "system2");
    resolver.addPeerAddress(MEMBER_2, address2);

    Address address3 = new Address("tcp", "system3");
    resolver.addPeerAddress(MEMBER_3, address3);

    Collection<String> peerAddresses = resolver.getShardManagerPeerActorAddresses();
    assertEquals("getShardManagerPeerActorAddresses", Sets.newHashSet(
            address2.toString() + "/user/shardmanager-config",
            address3.toString() + "/user/shardmanager-config"), Sets.newHashSet(peerAddresses));
}
项目:hashsdn-controller    文件:RemoteRpcRegistryMXBeanImpl.java   
@Override
public Map<String, String> findRpcByName(final String name) {
    RoutingTable localTable = rpcRegistry.getLocalData();
    // Get all RPCs from local bucket
    Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));

    // Get all RPCs from remote bucket
    Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
    for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
        RoutingTable table = entry.getValue().getData();
        rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString()));
    }

    log.debug("list of RPCs {} searched by name {}", rpcMap, name);
    return rpcMap;
}
项目:hashsdn-controller    文件:Gossiper.java   
/**
 * Sends Gossip status to other members in the cluster.
 * <br>
 * 1. If there are no member, ignore the tick. <br>
 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
 */
@VisibleForTesting
void receiveGossipTick() {
    final Address address;
    switch (clusterMembers.size()) {
        case 0:
            //no members to send gossip status to
            return;
        case 1:
            address = clusterMembers.get(0);
            break;
        default:
            final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
            address = clusterMembers.get(randomIndex);
            break;
    }

    LOG.trace("Gossiping to [{}]", address);
    getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
}
项目:hashsdn-controller    文件:BucketStoreActor.java   
/**
 * Helper to collect buckets for requested members.
 *
 * @param members requested members
 */
private void getBucketsByMembers(final Collection<Address> members) {
    Map<Address, Bucket<T>> buckets = new HashMap<>();

    //first add the local bucket if asked
    if (members.contains(selfAddress)) {
        buckets.put(selfAddress, getLocalBucket().snapshot());
    }

    //then get buckets for requested remote nodes
    for (Address address : members) {
        if (remoteBuckets.containsKey(address)) {
            buckets.put(address, remoteBuckets.get(address));
        }
    }

    getSender().tell(buckets, getSelf());
}
项目:hashsdn-controller    文件:RpcRegistry.java   
@Override
protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
    final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());

    for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
        final RoutingTable table = e.getValue().getData();

        final Collection<DOMRpcIdentifier> rpcs = table.getRoutes();
        endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
                : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
    }

    if (!endpoints.isEmpty()) {
        rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
    }
}
项目:hashsdn-controller    文件:RpcRegistryTest.java   
private void verifyEmptyBucket(final JavaTestKit testKit, final ActorRef registry, final Address address)
        throws AssertionError {
    Map<Address, Bucket<RoutingTable>> buckets;
    int numTries = 0;
    while (true) {
        buckets = retrieveBuckets(registry1, testKit, address);

        try {
            verifyBucket(buckets.get(address), Collections.emptyList());
            break;
        } catch (AssertionError e) {
            if (++numTries >= 50) {
                throw e;
            }
        }

        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
    }
}
项目:hashsdn-controller    文件:RpcRegistryTest.java   
private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address,
        final JavaTestKit invoker) {
    final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getEndpoints();
    Assert.assertEquals(1, endpoints.size());

    final Optional<RemoteRpcEndpoint> maybeEndpoint = endpoints.get(address);
    Assert.assertNotNull(maybeEndpoint);
    Assert.assertTrue(maybeEndpoint.isPresent());

    final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
    final ActorRef router = endpoint.getRouter();
    Assert.assertNotNull(router);

    router.tell("hello", ActorRef.noSender());
    final String s = invoker.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), String.class);
    Assert.assertEquals("hello", s);
}
项目:trial    文件:Example2.java   
/**
 * @param args
 * @throws InterruptedException
 */
public static void main(String[] args) throws InterruptedException {
    ActorSystem _system = ActorSystem.create("RemoteRouteeRouterExample");

    Address addr1 = new Address("akka", "remotesys", "127.0.0.1", 2552);
    Address addr2 = new Address("akka", "remotesys", "127.0.0.1", 2552);

    Address[] addresses = new Address[] { addr1, addr2 };

    ActorRef randomRouter = _system.actorOf(new Props(MsgEchoActor.class)
            .withRouter(new RemoteRouterConfig(new RandomRouter(5),addresses)));

    for (int i = 1; i <= 10; i++) {
        // sends randomly to actors
        randomRouter.tell(i);
    }
    _system.shutdown();
}
项目:trial    文件:ClientActorSystem.java   
@SuppressWarnings("serial")
public void remoteActorCreationDemo1() {
    log.info("Creating a actor using remote deployment mechanism");

    // create the address object that points to the remote server
    Address addr = new Address("akka", "ServerSys", "127.0.0.1", 2552);

    // creating the ServerActor on the specified remote server
    final ActorRef serverActor = system.actorOf(new Props(ServerActor.class)
            .withDeploy(new Deploy(new RemoteScope(addr))));

    // create a local actor and pass the reference of the remote actor
    actor = system.actorOf(new Props(new UntypedActorFactory() {
        public UntypedActor create() {
            return new ClientActor(serverActor);
        }
    }));
    // send a message to the local client actor
    actor.tell("Start-RemoteActorCreationDemo1");
}
项目:trial    文件:JobControllerActor.java   
/**
 * Add the new worker to the router mechanism
 * 
 * @param address
 */
private void addWorkerRoute(String address) {

    // send the stop message to all the worker actors
    if (workerRouterActor != null) {
        for (int i = 0; i < workerAddressMap.size(); i++)
            workerRouterActor.tell("STOP");
    }

    // add the address to the Map
    workerAddressMap.put(address, AddressFromURIString.parse(address));

    Address[] addressNodes = new Address[workerAddressMap.size()];

    Address[] workerAddress = workerAddressMap.values().toArray(
            addressNodes);

    // update the workerRouter actor with the information on all workers
    workerRouterActor = getContext().system().actorOf(
            new Props(WorkerActor.class).withRouter(new RemoteRouterConfig(
                    new RoundRobinRouter(workerAddress.length),
                    workerAddress)));

}
项目:trial    文件:WorkServerActorSystem.java   
public MyUnboundedPriorityMailbox(ActorSystem.Settings settings,
        Config config) {

    // Creating a new PriorityGenerator,
    super(new PriorityGenerator() {
        @Override
        public int gen(Object message) {
            if (message instanceof Address)
                return 0; // Worker Registration messages should be
                            // treated
                            // with highest priority
            else if (message.equals(PoisonPill.getInstance()))
                return 3; // PoisonPill when no other left
            else
                return 1; // By default they go with medium priority
        }
    });
}
项目:Concierge    文件:ConciergeConfig.java   
private ActorPath path(String host) {
  final String[] split = host.split(":");
  final String hostname = split[0];
  final int port = Integer.parseInt(split[1]);

  final Address actorSystemAddress = new Address("akka.tcp", "concierge", hostname, port);
  return RootActorPath.apply(actorSystemAddress, "/").child("user");
}
项目:akka-kubernetes-example    文件:SimpleClusterMain.java   
public static void main(String[] args) throws IOException {
  ActorSystem actorSystem = ActorSystem.create(CLUSTER_NAME);
  actorSystem.actorOf(SimpleClusterListener.props());
  final ActorMaterializer materializer = ActorMaterializer.create(actorSystem);

  Cluster cluster = Cluster.get(actorSystem);
  List<Address> addresses = Arrays.asList(System.getenv().get("SEED_NODES").split(","))
      .stream()
      .map(ip -> new Address("akka.tcp", CLUSTER_NAME, ip, 2551))
      .collect(Collectors.toList());
  cluster.joinSeedNodes(addresses);
}
项目:hashsdn-controller    文件:ShardPeerAddressResolver.java   
Collection<String> getShardManagerPeerActorAddresses() {
    Collection<String> peerAddresses = new ArrayList<>();
    for (Map.Entry<MemberName, Address> entry: memberNameToAddress.entrySet()) {
        if (!localMemberName.equals(entry.getKey())) {
            peerAddresses.add(getShardManagerActorPathBuilder(entry.getValue()).toString());
        }
    }

    return peerAddresses;
}
项目:hashsdn-controller    文件:ShardPeerAddressResolver.java   
String getShardActorAddress(String shardName, MemberName memberName) {
    Address memberAddress = memberNameToAddress.get(memberName);
    if (memberAddress != null) {
        return getShardManagerActorPathBuilder(memberAddress).append("/").append(
                getShardIdentifier(memberName, shardName)).toString();
    }

    return null;
}
项目:hashsdn-controller    文件:ShardManager.java   
private void addPeerAddress(final MemberName memberName, final Address address) {
    peerAddressResolver.addPeerAddress(memberName, address);

    for (ShardInformation info : localShards.values()) {
        String shardName = info.getShardName();
        String peerId = getShardIdentifier(memberName, shardName).toString();
        info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());

        info.peerUp(memberName, peerId, getSelf());
    }
}
项目:hashsdn-controller    文件:ShardingServiceAddressResolver.java   
public String resolve(final MemberName memberName) {
    Preconditions.checkNotNull(memberName);
    final Address address = memberNameToAddress.get(memberName);
    Preconditions.checkNotNull(address, "Requested member[%s] is not present in the resolver ",
            memberName.toString());

    return getActorPathBuilder(address).toString();
}
项目:hashsdn-controller    文件:ShardPeerAddressResolverTest.java   
@Test
public void testGetShardActorAddress() {
    ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);

    assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", MEMBER_2));

    Address address2 = new Address("tcp", "system2");
    resolver.addPeerAddress(MEMBER_2, address2);
    assertEquals("getPeerAddress", address2, resolver.getPeerAddress(MEMBER_2));

    Address address3 = new Address("tcp", "system3");
    resolver.addPeerAddress(MEMBER_3, address3);
    assertEquals("getPeerAddress", address3, resolver.getPeerAddress(MEMBER_3));

    assertEquals("getShardActorAddress",
            address2.toString() + "/user/shardmanager-config/member-2-shard-default-config",
            resolver.getShardActorAddress("default", MEMBER_2));

    assertEquals("getShardActorAddress",
            address3.toString() + "/user/shardmanager-config/member-3-shard-default-config",
            resolver.getShardActorAddress("default", MEMBER_3));

    assertEquals("getShardActorAddress",
            address2.toString() + "/user/shardmanager-config/member-2-shard-topology-config",
            resolver.getShardActorAddress("topology", MEMBER_2));

    resolver.removePeerAddress(MEMBER_2);
    assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("default", MEMBER_2));
    assertEquals("getShardActorAddress", null, resolver.getShardActorAddress("topology", MEMBER_2));
    assertEquals("getShardActorAddress",
            address3.toString() + "/user/shardmanager-config/member-3-shard-default-config",
            resolver.getShardActorAddress("default", MEMBER_3));
}
项目:hashsdn-controller    文件:QuarantinedMonitorActorTest.java   
@Test
public void testOnReceiveAnother() throws Exception {
    final Address local = Address.apply("http", "local");
    final Address remote = Address.apply("http", "remote");
    final Throwable t = new RuntimeException("Another exception");
    final InvalidAssociation cause = InvalidAssociation.apply(local, remote, t, Option.apply(null));
    final AssociationErrorEvent event = new AssociationErrorEvent(cause, local, remote, true, Logging.ErrorLevel());
    actor.tell(event, ActorRef.noSender());
    verify(callback, never()).apply();
}
项目:hashsdn-controller    文件:RpcRegistrar.java   
private void updateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
    /*
     * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close
     * the old registration. This minimizes churn observed by listeners, as they will not observe RPC
     * unavailability which would occur if we were to do it the other way around.
     *
     * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap,
     * hence we register all new implementations before closing all registrations.
     */
    final Collection<DOMRpcImplementationRegistration<?>> prevRegs = new ArrayList<>(endpoints.size());

    for (Entry<Address, Optional<RemoteRpcEndpoint>> e : endpoints.entrySet()) {
        LOG.debug("Updating RPC registrations for {}", e.getKey());

        final DOMRpcImplementationRegistration<?> prevReg;
        final Optional<RemoteRpcEndpoint> maybeEndpoint = e.getValue();
        if (maybeEndpoint.isPresent()) {
            final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
            final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config);
            prevReg = regs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl,
                endpoint.getRpcs()));
        } else {
            prevReg = regs.remove(e.getKey());
        }

        if (prevReg != null) {
            prevRegs.add(prevReg);
        }
    }

    for (DOMRpcImplementationRegistration<?> r : prevRegs) {
        r.close();
    }
}
项目:hashsdn-controller    文件:RemoteRpcRegistryMXBeanImpl.java   
@Override
public Map<String, String> findRpcByRoute(final String routeId) {
    RoutingTable localTable = rpcRegistry.getLocalData();
    Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));

    Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
    for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
        RoutingTable table = entry.getValue().getData();
        rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, entry.getKey().toString()));
    }

    log.debug("list of RPCs {} searched by route {}", rpcMap, routeId);
    return rpcMap;
}
项目:hashsdn-controller    文件:BucketStoreAccess.java   
<T extends BucketData<T>> void getBucketsByMembers(final Collection<Address> members,
        final Consumer<Map<Address, Bucket<T>>> callback) {
    Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout)
        .onComplete(new OnComplete<Object>() {
            @SuppressWarnings("unchecked")
            @Override
            public void onComplete(final Throwable failure, final Object success) {
                if (failure == null) {
                    callback.accept((Map<Address, Bucket<T>>) success);
                }
            }
        }, context.dispatcher());
}
项目:hashsdn-controller    文件:BucketStoreAccess.java   
void getBucketVersions(final Consumer<Map<Address, Long>> callback) {
    Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete<Object>() {
        @SuppressWarnings("unchecked")
        @Override
        public void onComplete(final Throwable failure, final Object success) {
            if (failure == null) {
                callback.accept((Map<Address, Long>) success);
            }
        }
    }, context.dispatcher());
}
项目:hashsdn-controller    文件:Gossiper.java   
private void addPeer(final Address address) {
    if (!clusterMembers.contains(address)) {
        clusterMembers.add(address);
    }
    peers.computeIfAbsent(address, input -> getContext().system()
        .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
}
项目:hashsdn-controller    文件:Gossiper.java   
@VisibleForTesting
void setClusterMembers(final Address... members) {
    clusterMembers.clear();
    peers.clear();

    for (Address addr : members) {
        addPeer(addr);
    }
}
项目:hashsdn-controller    文件:BucketStoreActor.java   
/**
 * Helper to collect all known buckets.
 *
 * @return self owned + remote buckets
 */
private Map<Address, Bucket<T>> getAllBuckets() {
    Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);

    //first add the local bucket
    all.put(selfAddress, getLocalBucket().snapshot());

    //then get all remote buckets
    all.putAll(remoteBuckets);

    return all;
}
项目:hashsdn-controller    文件:BucketStoreActor.java   
private void removeBucket(final Address addr) {
    final Bucket<T> bucket = remoteBuckets.remove(addr);
    if (bucket != null) {
        bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
        onBucketRemoved(addr, bucket);
    }
    versions.remove(addr);
}
项目:hashsdn-controller    文件:BucketStoreActor.java   
private void addWatch(final Address addr, final ActorRef ref) {
    if (!watchedActors.containsKey(ref)) {
        getContext().watch(ref);
        LOG.debug("Watching {}", ref);
    }
    watchedActors.put(ref, addr);
}
项目:hashsdn-controller    文件:BucketStoreActor.java   
private void removeWatch(final Address addr, final ActorRef ref) {
    watchedActors.remove(ref, addr);
    if (!watchedActors.containsKey(ref)) {
        getContext().unwatch(ref);
        LOG.debug("No longer watching {}", ref);
    }
}
项目:hashsdn-controller    文件:BucketStoreActor.java   
private void actorTerminated(final Terminated message) {
    LOG.info("Actor termination {} received", message);

    for (Address addr : watchedActors.removeAll(message.getActor())) {
        versions.remove(addr);
        final Bucket<T> bucket = remoteBuckets.remove(addr);
        if (bucket != null) {
            LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
            onBucketRemoved(addr, bucket);
        }
    }
}
项目:hashsdn-controller    文件:RpcRegistrarTest.java   
@Before
public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    system = ActorSystem.create("test");

    final JavaTestKit testKit = new JavaTestKit(system);
    final RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("system").build();
    final Props props = RpcRegistrar.props(config, service);
    testActorRef = new TestActorRef<>(system, props, testKit.getRef(), "actorRef");
    endpointAddress = new Address("http", "local");

    final DOMRpcIdentifier firstEndpointId = DOMRpcIdentifier.create(
            SchemaPath.create(true, QName.create("first:identifier", "foo")));
    final DOMRpcIdentifier secondEndpointId = DOMRpcIdentifier.create(
            SchemaPath.create(true, QName.create("second:identifier", "bar")));

    final JavaTestKit senderKit = new JavaTestKit(system);
    firstEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(firstEndpointId));
    secondEndpoint = new RemoteRpcEndpoint(senderKit.getRef(), Collections.singletonList(secondEndpointId));

    Mockito.doReturn(oldReg).when(service).registerRpcImplementation(
            Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));

    Mockito.doReturn(newReg).when(service).registerRpcImplementation(
            Mockito.any(RemoteRpcImplementation.class), Mockito.eq(secondEndpoint.getRpcs()));

    rpcRegistrar = testActorRef.underlyingActor();
}
项目:hashsdn-controller    文件:RpcRegistrarTest.java   
@Test
public void testHandleReceiveAddEndpoint() throws Exception {
    final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
            endpointAddress, Optional.of(firstEndpoint));
    testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());

    Mockito.verify(service).registerRpcImplementation(
            Mockito.any(RemoteRpcImplementation.class), Mockito.eq(firstEndpoint.getRpcs()));
    Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
}
项目:hashsdn-controller    文件:RpcRegistrarTest.java   
@Test
public void testHandleReceiveRemoveEndpoint() throws Exception {
    final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = ImmutableMap.of(
            endpointAddress, Optional.empty());
    testActorRef.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
    Mockito.verifyNoMoreInteractions(service, oldReg, newReg);
}
项目:hashsdn-controller    文件:RpcRegistryTest.java   
/**
 * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
 */
@Test
public void testRpcAddedOnMultiNodes() throws Exception {
    final JavaTestKit testKit = new JavaTestKit(node3);

    // Add rpc on node 1
    List<DOMRpcIdentifier> addedRouteIds1 = createRouteIds();
    registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender());

    final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
        UpdateRemoteEndpoints.class);

    // Add rpc on node 2
    List<DOMRpcIdentifier> addedRouteIds2 = createRouteIds();
    registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender());

    final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
        UpdateRemoteEndpoints.class);
    Address node2Address = node2.provider().getDefaultAddress();
    Address node1Address = node1.provider().getDefaultAddress();

    Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, testKit, node1Address,
            node2Address);

    verifyBucket(buckets.get(node1Address), addedRouteIds1);
    verifyBucket(buckets.get(node2Address), addedRouteIds2);

    Map<Address, Long> versions = retrieveVersions(registry3, testKit);
    Assert.assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(),
            versions.get(node1Address));
    Assert.assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(),
            versions.get(node2Address));

    assertEndpoints(req1, node1Address, invoker1);
    assertEndpoints(req2, node2Address, invoker2);

}
项目:hashsdn-controller    文件:RpcRegistryTest.java   
private static Map<Address, Bucket<RoutingTable>> retrieveBuckets(final ActorRef bucketStore,
        final JavaTestKit testKit, final Address... addresses) {
    int numTries = 0;
    while (true) {
        bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef());
        @SuppressWarnings("unchecked")
        Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
                Map.class);

        boolean foundAll = true;
        for (Address addr : addresses) {
            Bucket<RoutingTable> bucket = buckets.get(addr);
            if (bucket == null) {
                foundAll = false;
                break;
            }
        }

        if (foundAll) {
            return buckets;
        }

        if (++numTries >= 50) {
            Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
                    + ", Actual: " + buckets);
        }

        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
    }
}
项目:hashsdn-controller    文件:RpcRegistryTest.java   
@Test
public void testAddRoutesConcurrency() throws Exception {
    final JavaTestKit testKit = new JavaTestKit(node1);

    final int nRoutes = 500;
    final Collection<DOMRpcIdentifier> added = new ArrayList<>(nRoutes);
    for (int i = 0; i < nRoutes; i++) {
        final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true,
                new QName(new URI("/mockrpc"), "type" + i)));
        added.add(routeId);

        //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
        registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)),
                ActorRef.noSender());
    }

    FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
    int numTries = 0;
    while (true) {
        registry1.tell(GET_ALL_BUCKETS, testKit.getRef());
        @SuppressWarnings("unchecked")
        Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(duration, Map.class);

        Bucket<RoutingTable> localBucket = buckets.values().iterator().next();
        RoutingTable table = localBucket.getData();
        if (table != null && table.size() == nRoutes) {
            for (DOMRpcIdentifier r : added) {
                Assert.assertTrue("RoutingTable contains " + r, table.contains(r));
            }

            break;
        }

        if (++numTries >= 50) {
            Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
        }

        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
    }
}