/**
 * Proposes a value while only a minority of the priests has been stopped and expects
 * every remaining priest's kit to observe the decision.
 *
 * <p>{@code PRIESTS_COUNT - MINORITY} priests are kept, {@code MINORITY} priests are sent a
 * {@link PoisonPill}; the cluster handed to the {@link DecreePresident} contains the paths
 * of both groups.
 */
@Test
public void testMajorityPropose() throws Exception {
    final List<TestPriest> survivors = Stream.generate(this::testPriest)
            .limit(PRIESTS_COUNT - MINORITY)
            .collect(toList());
    final List<TestPriest> stopped = Stream.generate(this::testPriest)
            .limit(MINORITY)
            .collect(toList());

    // The cluster is made of every priest, including the ones stopped below.
    final Set<ActorPath> clusterPaths = Stream.concat(survivors.stream(), stopped.stream())
            .map(member -> member.path)
            .collect(toSet());

    for (TestPriest member : stopped) {
        member.priest.tell(PoisonPill.getInstance(), ActorRef.noSender());
    }

    final ActorRef president = system.actorOf(DecreePresident.props(new Cluster(clusterPaths), 1));
    president.tell(new PaxosAPI.Propose("VALUE", 1), ActorRef.noSender());

    for (TestPriest member : survivors) {
        member.kit.expectMsg(new PaxosAPI.Decide("VALUE", 1));
    }
}
/**
 * Proposes a value after a majority of the priests has been stopped and expects that
 * no kit receives any message within one second.
 *
 * <p>{@code PRIESTS_COUNT - MINORITY} priests are sent a {@link PoisonPill}; the cluster
 * handed to the {@link DecreePresident} still contains the paths of all priests.
 *
 * <p>Every kit is inspected, not only the kits of the stopped majority: checking the kits
 * of the stopped priests alone would not show whether the surviving minority decided.
 * NOTE(review): this assumes each {@code TestPriest.kit} receives what its own priest
 * decides - confirm against {@code testPriest()}.
 */
@Test
public void testMinorityPropose() throws Exception {
    final List<TestPriest> majorityTestPriests = Stream.generate(this::testPriest)
            .limit(PRIESTS_COUNT - MINORITY)
            .collect(toList());
    final List<TestPriest> minorityTestPriests = Stream.generate(this::testPriest)
            .limit(MINORITY)
            .collect(toList());

    final Set<ActorPath> priestsPaths = Stream
            .concat(majorityTestPriests.stream(), minorityTestPriests.stream())
            .map(p -> p.path)
            .collect(toSet());

    // Kits of the stopped majority come first, followed by the kits of the survivors.
    final List<TestKit> allKits = Stream
            .concat(majorityTestPriests.stream(), minorityTestPriests.stream())
            .map(p -> p.kit)
            .collect(toList());

    majorityTestPriests.forEach(p -> p.priest.tell(PoisonPill.getInstance(), ActorRef.noSender()));

    final ActorRef leader = system.actorOf(DecreePresident.props(new Cluster(priestsPaths), 1));
    leader.tell(new PaxosAPI.Propose("VALUE", 1), ActorRef.noSender());

    allKits.forEach(kit -> kit.expectNoMsg(Duration.create(1, SECONDS)));
}
/**
 * Broadcasts 1000 random values through the first broadcast actor, one at a time, and
 * after each one expects every kit to receive the matching
 * {@link AtomicBroadcastAPI.Deliver}.
 */
@Test
public void singleLeaderBroadcastTest() {
    final String prefix = "simpleLeader";

    final Set<ActorPath> memberPaths = LongStream.range(0, PRIESTS_COUNT)
            .mapToObj(index -> system.child(prefix + index))
            .collect(Collectors.toSet());
    final List<TestBroadcast> members = LongStream.range(0, PRIESTS_COUNT)
            .boxed()
            .map(index -> testBroadcast(prefix, index, new Cluster(memberPaths)))
            .collect(toList());

    final List<String> values = Stream
            .generate(() -> UUID.randomUUID().toString())
            .limit(1000)
            .collect(toList());

    values.forEach(value -> {
        // Always send through the first member; every member must then deliver the value.
        members.get(0).broadcast.tell(new AtomicBroadcastAPI.Broadcast(value), ActorRef.noSender());
        for (TestBroadcast member : members) {
            member.kit.expectMsg(new AtomicBroadcastAPI.Deliver(value));
        }
    });
}
/**
 * Creates an actor system for a command client and starts its actors.
 *
 * <p>The configuration layers, in order of precedence, the cluster role, the remote port
 * and the remote hostname (from {@code HostUtils.lookupIp()}) over the {@code application}
 * configuration. A cluster client actor named {@code clusterClient} is created with the
 * configured contact points, followed by a {@link CommandClientActor} named after the role.
 *
 * @param clientConfig source of the role, port and contact points
 * @return the newly created actor system
 */
public static ActorSystem startCommandClient(ClientConfig clientConfig) {
    final Config roleLayer =
            ConfigFactory.parseString("akka.cluster.roles=[" + clientConfig.getRole() + "]");
    final Config portLayer =
            ConfigFactory.parseString("akka.remote.netty.tcp.port=" + clientConfig.getPort());
    final Config hostLayer =
            ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + HostUtils.lookupIp());
    final Config conf = roleLayer
            .withFallback(portLayer)
            .withFallback(hostLayer)
            .withFallback(ConfigFactory.load("application"));

    final ActorSystem system = ActorSystem.create(Constants.PerformanceSystem, conf);

    final Set<ActorPath> initialContacts = clientConfig.getContactPoint()
            .map(ActorPaths::fromString)
            .collect(Collectors.toCollection(HashSet::new));
    final ClusterClientSettings settings =
            ClusterClientSettings.create(system).withInitialContacts(initialContacts);

    final ActorRef clusterClient = system.actorOf(ClusterClient.props(settings), "clusterClient");
    system.actorOf(CommandClientActor.props(clusterClient, clientConfig), clientConfig.getRole());
    return system;
}
/**
 * Proposes a value with all {@code PRIESTS_COUNT} priests running and expects every
 * priest's kit to receive the matching {@link PaxosAPI.Decide}.
 */
@Test
public void testSimplePropose() throws Exception {
    final List<TestPriest> members = Stream.generate(this::testPriest)
            .limit(PRIESTS_COUNT)
            .collect(toList());
    final Set<ActorPath> clusterPaths = members.stream()
            .map(member -> member.path)
            .collect(toSet());

    final ActorRef president = system.actorOf(DecreePresident.props(new Cluster(clusterPaths), 1));
    president.tell(new PaxosAPI.Propose("VALUE", 1), ActorRef.noSender());

    for (TestPriest member : members) {
        member.kit.expectMsg(new PaxosAPI.Decide("VALUE", 1));
    }
}
/**
 * Builds the path of the {@code user} guardian of the {@code concierge} actor system
 * reachable over {@code akka.tcp} at the given address.
 *
 * @param host address in {@code hostname:port} form; anything after a second colon is ignored
 * @return the root actor path of that system with the {@code user} child appended
 * @throws IllegalArgumentException if {@code host} has no {@code :port} part
 * @throws NumberFormatException if the port part is not a valid integer
 */
private ActorPath path(String host) {
    final String[] split = host.split(":");
    // Without this check a missing port surfaces as a bare ArrayIndexOutOfBoundsException.
    if (split.length < 2) {
        throw new IllegalArgumentException("Expected address in hostname:port form but got: " + host);
    }
    final String hostname = split[0];
    final int port = Integer.parseInt(split[1]);

    final Address actorSystemAddress = new Address("akka.tcp", "concierge", hostname, port);
    return RootActorPath.apply(actorSystemAddress, "/").child("user");
}
/**
 * Repeatedly broadcasts 1000 random values through the first remaining broadcast actor,
 * expecting every remaining kit to receive each one, then stops that actor with a
 * {@link PoisonPill} and drops it from the list. The loop continues while more than
 * {@code PRIESTS_COUNT / 2} members remain.
 */
@Test
public void killOneByOneTest() {
    final String prefix = "killOneByOneTest";

    final Set<ActorPath> memberPaths = LongStream.range(0, PRIESTS_COUNT)
            .mapToObj(index -> system.child(prefix + index))
            .collect(Collectors.toSet());
    final List<TestBroadcast> members = LongStream.range(0, PRIESTS_COUNT)
            .boxed()
            .map(index -> testBroadcast(prefix, index, new Cluster(memberPaths)))
            .collect(toList());

    while (members.size() > PRIESTS_COUNT / 2) {
        final TestBroadcast leader = members.get(0);
        final List<String> values = Stream
                .generate(() -> UUID.randomUUID().toString())
                .limit(1000)
                .collect(toList());

        values.forEach(value -> {
            leader.broadcast.tell(new AtomicBroadcastAPI.Broadcast(value), ActorRef.noSender());
            for (TestBroadcast member : members) {
                member.kit.expectMsg(new AtomicBroadcastAPI.Deliver(value));
            }
        });

        // Stop the current leader; it sits at index 0, so the next member takes over.
        leader.broadcast.tell(PoisonPill.getInstance(), ActorRef.noSender());
        members.remove(0);
    }
}
/**
 * Starts {@code PRIEST_COUNT} {@link LinearizableStorage} actors named {@code prefix + index}
 * that all know the same set of paths, and returns the first one.
 *
 * @param prefix name prefix of the created actors
 * @return the actor created for index 0
 */
private ActorRef storage(String prefix) {
    final Set<ActorPath> replicaPaths = LongStream.range(0, PRIEST_COUNT)
            .mapToObj(index -> system.child(prefix + index))
            .collect(toSet());

    final List<ActorRef> replicas = LongStream.range(0, PRIEST_COUNT)
            .mapToObj(index -> system.actorOf(
                    LinearizableStorage.props(new Cluster(replicaPaths)),
                    prefix + index))
            .collect(toList());

    return replicas.get(0);
}
/**
 * Looks up the logger for the given origin, creating and caching it on first use.
 *
 * @param origin path to log for; when empty, the path of this actor's parent is used
 * @return the logger stored in {@code loggers} under that path, named after the path's string form
 */
protected Logger findLogger(Optional<ActorPath> origin) {
    final ActorPath parentPath = getContext().parent().path();
    final ActorPath loggerKey = origin.orElse(parentPath);
    return loggers.computeIfAbsent(loggerKey, key -> LoggerFactory.getLogger(key.toString()));
}
/**
 * Bundles a priest actor with its path and the test kit associated with it.
 *
 * @param priest reference to the priest actor
 * @param path   path stored for the priest
 * @param kit    test kit stored alongside the priest
 */
public TestPriest(ActorRef priest, ActorPath path, TestKit kit) {
    this.kit = kit;
    this.path = path;
    this.priest = priest;
}
/**
 * Broadcasts 1000 random values through randomly chosen broadcast actors and checks that
 * every kit received the same sequence and that this sequence holds exactly the
 * broadcast values.
 *
 * <p>Each kit's deliveries are collected into a list; the lists are put into a set, so a
 * set of size one means all kits saw identical content in identical order.
 */
@Test
public void integrityAndTotalOrderTest() {
    final String prefix = "integrityAndTotalOrderTest";
    final Set<ActorPath> broadcastPaths = LongStream.range(0, PRIESTS_COUNT)
            .boxed()
            .map(l -> system.child(prefix + l))
            .collect(Collectors.toSet());
    final List<TestBroadcast> testPriests = LongStream.range(0, PRIESTS_COUNT)
            .boxed()
            .map(l -> testBroadcast(prefix, l, new Cluster(broadcastPaths)))
            .collect(toList());

    final List<String> decrees = Stream
            .generate(UUID::randomUUID)
            .map(UUID::toString)
            .limit(1000)
            .collect(toList());

    final Random rd = new Random();
    for (String v : decrees) {
        testPriests.get(rd.nextInt(testPriests.size())).broadcast
                .tell(new AtomicBroadcastAPI.Broadcast(v), ActorRef.noSender());
    }

    final Set<List<String>> resultSet = new HashSet<>();
    testPriests.stream().map(t -> t.kit).forEach(k -> {
        final List<String> received = new ArrayList<>();
        k.receiveWhile(
                Duration.create(1, MINUTES),
                Duration.create(10, SECONDS),
                1000,
                o -> received.add((String) ((AtomicBroadcastAPI.Deliver) o).value())
        );
        resultSet.add(received);
    });

    // JUnit expects (expected, actual); the reversed order produced a misleading failure message.
    Assert.assertEquals(1, resultSet.size());

    final List<String> representative = resultSet.stream().findAny().orElseThrow(IllegalStateException::new);
    Assert.assertTrue(decrees.containsAll(representative));
    Assert.assertTrue(representative.containsAll(decrees));
}
public Cluster(Set<ActorPath> paths, String suffix) { this.paths = paths.stream() .map(p -> p.child(suffix)) .collect(toSet()); }
public Cluster(Set<ActorPath> paths) { this.paths = new HashSet<>(paths); }
/**
 * Returns the member paths of this cluster.
 *
 * @return an unmodifiable view of the stored paths
 */
public Set<ActorPath> paths() {
    final Set<ActorPath> readOnlyView = unmodifiableSet(paths);
    return readOnlyView;
}
/**
 * Creates an actor selection for the given path using the actor system of the shard's context.
 *
 * @param path path to select
 * @return the selection for {@code path}
 */
protected final ActorSelection selectActor(ActorPath path) {
    final ActorSelection selection = shard.getContext().system().actorSelection(path);
    return selection;
}
/**
 * Creates an actor selection for the given path by delegating to the wrapped actor system.
 *
 * @param actorPath path to select
 * @return the selection for {@code actorPath}
 */
public ActorSelection actorSelection(ActorPath actorPath) {
    final ActorSelection selection = actorSystem.actorSelection(actorPath);
    return selection;
}
/**
 * Returns the path reported by {@code dataChangeListenerActor}.
 *
 * @return the result of {@code dataChangeListenerActor.path()}
 */
@Override
public ActorPath getListenerActorPath() {
    final ActorPath listenerPath = dataChangeListenerActor.path();
    return listenerPath;
}
/**
 * Returns the path reported by {@code dataTreeChangeListenerPath}.
 *
 * @return the result of {@code dataTreeChangeListenerPath.path()}
 */
@Override
public ActorPath getListenerActorPath() {
    final ActorPath listenerPath = dataTreeChangeListenerPath.path();
    return listenerPath;
}
/**
 * Returns the path reported by {@code listenerRegistrationPath}.
 *
 * @return the result of {@code listenerRegistrationPath.path()}
 */
public ActorPath getListenerRegistrationPath() {
    final ActorPath registrationPath = listenerRegistrationPath.path();
    return registrationPath;
}
/**
 * Resolves the actor at the given path to a reference.
 *
 * <p>The selection is resolved with {@code ModuleInterpretation.DEFAULT_DURATION} as the
 * timeout and the outcome is passed through {@code await}.
 *
 * @param actorSystem system used to build the selection
 * @param path        path of the actor to resolve
 * @return the resolved actor reference
 */
private static ActorRef actor(ActorSystem actorSystem, ActorPath path) {
    return await(
            actorSystem
                    .actorSelection(path)
                    .resolveOne(ModuleInterpretation.DEFAULT_DURATION));
}
/**
 * Stub implementation that has no parent to report.
 *
 * @return always {@code null}
 */
@Override
@Nullable
public ActorPath parent() {
    return null;
}
/**
 * Stub implementation that ignores the child name.
 *
 * @param child unused
 * @return always {@code null}
 */
@Override
@Nullable
public ActorPath $div(@Nullable final String child) {
    return null;
}
/**
 * Stub implementation that ignores the child names.
 *
 * @param child unused
 * @return always {@code null}
 */
@Override
@Nullable
public ActorPath $div(@Nullable final scala.collection.Iterable<String> child) {
    return null;
}
/**
 * Stub implementation that ignores the descendant names.
 *
 * @param names unused
 * @return always {@code null}
 */
@Override
@Nullable
public ActorPath descendant(@Nullable final Iterable<String> names) {
    return null;
}
/**
 * Stub implementation that ignores the uid.
 *
 * @param uid unused
 * @return always {@code null}
 */
@Override
@Nullable
public ActorPath withUid(final int uid) {
    return null;
}
/**
 * Stub comparison that treats every path as equal to this one.
 *
 * @param o unused
 * @return always {@code 0}
 */
@Override
public int compareTo(@Nullable final ActorPath o) {
    return 0;
}
/**
 * Creates a mock actor reference that reports the given path.
 *
 * @param actorPath path to store; checked for {@code null} only when assertions are enabled
 */
public MockActorRef(@Nonnull final ActorPath actorPath) {
    // The superclass no-arg constructor is invoked implicitly.
    assert actorPath != null;
    this.actorPath = actorPath;
}
/**
 * Returns the path stored in this reference.
 *
 * @return the stored {@code actorPath}
 */
@Override
@Nonnull
public ActorPath path() {
    return this.actorPath;
}
/**
 * Sets the origin of the message being built.
 *
 * @param origin originating path; {@code null} is stored as an empty {@link Optional}
 * @return this builder
 */
public LogMessageBuilder withOrigin(ActorPath origin) {
    final Optional<ActorPath> wrappedOrigin = Optional.ofNullable(origin);
    instance.origin = wrappedOrigin;
    return this;
}
/**
 * Returns the origin stored in this object.
 *
 * @return the stored {@code origin} value
 */
public Optional<ActorPath> getOrigin() {
    return this.origin;
}
/**
 * Returns an actor path associated with this object's listener.
 *
 * <p>NOTE(review): judging by the name this is the path of the listener actor;
 * the exact meaning is defined by the implementations - confirm there.
 *
 * @return the listener's actor path
 */
ActorPath getListenerActorPath();