private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { getSender().tell(new Status.Success(null), getSelf()); return; } final ActorSelection leader = getLeader(); if (leader == null) { // Leader is not present. The cluster is most likely trying to // elect a leader and we should let that run its normal course // TODO we can wait for the election to complete and retry the // request. We can also let the caller retry by sending a flag // in the response indicating the request is "reTryable". getSender().tell(new Failure( new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. " + "Currently there is no leader for " + persistenceId())), getSelf()); return; } leader.tell(new RequestLeadership(getId(), getSender()), getSelf()); }
/**
 * Processes a {@code ReadyLocalTransaction} message.
 *
 * <p>An active leader hands the message to the commit coordinator, replying with a {@code Failure}
 * if that throws. Otherwise the message is forwarded to the known leader, or queued for retry when
 * the leader is not active or not known.
 *
 * @param message the ready-local-transaction request
 */
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
    LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionId());

    final boolean leaderActive = isLeaderActive();
    if (isLeader() && leaderActive) {
        try {
            commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
        } catch (Exception ex) {
            LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                    message.getTransactionId(), ex);
            getSender().tell(new Failure(ex), getSelf());
        }
        return;
    }

    final ActorSelection remoteLeader = getLeader();
    if (leaderActive && remoteLeader != null) {
        LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), remoteLeader);
        message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
        remoteLeader.forward(message, getContext());
        return;
    }

    // No usable leader right now - park the message so it can be retried.
    messageRetrySupport.addMessageToRetry(message, getSender(),
            "Could not process ready local transaction " + message.getTransactionId());
}
/**
 * Runs the canCommit phase for the given cohort entry.
 *
 * <p>On success either proceeds straight to commit (immediate-commit entries) or sends a "yes"
 * {@code CanCommitTransactionReply} to the entry's reply sender. On failure the entry is removed
 * from the cohort cache and a {@code Failure} is sent to the reply sender.
 *
 * @param cohortEntry the entry to canCommit
 */
private void handleCanCommit(final CohortEntry cohortEntry) {
    final FutureCallback<Void> canCommitCallback = new FutureCallback<Void>() {
        @Override
        public void onSuccess(final Void result) {
            log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId());

            if (!cohortEntry.isDoImmediateCommit()) {
                cohortEntry.getReplySender().tell(
                        CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable(),
                        cohortEntry.getShard().self());
                return;
            }

            doCommit(cohortEntry);
        }

        @Override
        public void onFailure(final Throwable failure) {
            log.debug("{}: An exception occurred during canCommit for {}: {}", name,
                    cohortEntry.getTransactionId(), failure);

            // The transaction cannot proceed, so drop it before notifying the requester.
            cohortCache.remove(cohortEntry.getTransactionId());
            cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
        }
    };

    cohortEntry.canCommit(canCommitCallback);
}
/** * This method handles the canCommit phase for a transaction. * * @param transactionID the ID of the transaction to canCommit * @param sender the actor to which to send the response * @param shard the transaction's shard actor */ void handleCanCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { // Lookup the cohort entry that was cached previously (or should have been) by // transactionReady (via the ForwardedReadyTransaction message). final CohortEntry cohortEntry = cohortCache.get(transactionID); if (cohortEntry == null) { // Either canCommit was invoked before ready (shouldn't happen) or a long time passed // between canCommit and ready and the entry was expired from the cache or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID)); log.error(ex.getMessage()); sender.tell(new Failure(ex), shard.self()); return; } cohortEntry.setReplySender(sender); cohortEntry.setShard(shard); handleCanCommit(cohortEntry); }
void doCommit(final CohortEntry cohortEntry) { log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId()); // We perform the preCommit phase here atomically with the commit phase. This is an // optimization to eliminate the overhead of an extra preCommit message. We lose front-end // coordination of preCommit across shards in case of failure but preCommit should not // normally fail since we ensure only one concurrent 3-phase commit. cohortEntry.preCommit(new FutureCallback<DataTreeCandidate>() { @Override public void onSuccess(final DataTreeCandidate candidate) { finishCommit(cohortEntry.getReplySender(), cohortEntry); } @Override public void onFailure(final Throwable failure) { log.error("{} An exception occurred while preCommitting transaction {}", name, cohortEntry.getTransactionId(), failure); cohortCache.remove(cohortEntry.getTransactionId()); cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); } }); }
/** * This method handles the preCommit and commit phases for a transaction. * * @param transactionID the ID of the transaction to commit * @param sender the actor to which to send the response * @param shard the transaction's shard actor */ void handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) { final CohortEntry cohortEntry = cohortCache.get(transactionID); if (cohortEntry == null) { // Either a long time passed between canCommit and commit and the entry was expired from the cache // or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID)); log.error(ex.getMessage()); sender.tell(new Failure(ex), shard.self()); return; } cohortEntry.setReplySender(sender); doCommit(cohortEntry); }
/**
 * Clears all pending transactions from the data tree and the cohort cache.
 *
 * <p>Each cleared cohort that has a cached entry with a reply sender gets a {@code Failure}
 * carrying a {@code RuntimeException} built from {@code reason}.
 *
 * @param reason the message for the exception sent to waiting senders
 * @param shard the shard whose actor reference is used as the sender of the failures
 */
void abortPendingTransactions(final String reason, final Shard shard) {
    final Collection<ShardDataTreeCohort> cleared = dataTree.getAndClearPendingTransactions();
    log.debug("{}: Aborting {} pending queued transactions", name, cleared.size());

    // A single failure instance is shared by all notified senders.
    final Failure abortFailure = new Failure(new RuntimeException(reason));

    for (final ShardDataTreeCohort cohort : cleared) {
        final CohortEntry entry = cohortCache.remove(cohort.getIdentifier());
        if (entry != null && entry.getReplySender() != null) {
            entry.getReplySender().tell(abortFailure, shard.self());
        }
    }

    cohortCache.clear();
}
boolean handleMessage(Object message, EntityOwnershipShard shard) { boolean handled = true; if (CommitTransactionReply.isSerializedType(message)) { // Successful reply from a local commit. inflightCommitSucceeded(shard); } else if (message instanceof akka.actor.Status.Failure) { // Failure reply from a local commit. inflightCommitFailure(((Failure) message).cause(), shard); } else if (COMMIT_RETRY_MESSAGE.equals(message)) { retryInflightCommit(shard); } else { handled = false; } return handled; }
/**
 * Handles snapshot replies, failures and the receive timeout.
 *
 * <p>A {@code Failure} is passed on to the reply-to actor unchanged; a {@code ReceiveTimeout}
 * is reported to it as a {@code Failure} wrapping a {@code TimeoutException}. In both cases this
 * actor then sends itself a {@code PoisonPill}. Other message types are ignored.
 */
@Override
public void onReceive(Object message) {
    if (message instanceof GetSnapshotReply) {
        onGetSnapshotReply((GetSnapshotReply)message);
        return;
    }

    if (message instanceof Failure) {
        LOG.debug("{}: Received {}", params.id, message);

        params.replyToActor.tell(message, getSelf());
        getSelf().tell(PoisonPill.getInstance(), getSelf());
        return;
    }

    if (message instanceof ReceiveTimeout) {
        final String timeoutText = String.format(
                "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s "
                + "did not respond.", params.receiveTimeout.toMillis(), params.shardNames.size(),
                remainingShardNames.size(), remainingShardNames);
        LOG.warn("{}: {}", params.id, timeoutText);

        params.replyToActor.tell(new Failure(new TimeoutException(timeoutText)), getSelf());
        getSelf().tell(PoisonPill.getInstance(), getSelf());
    }
}
/**
 * Verifies that after one snapshot reply and one {@code Failure} are sent to the reply actor,
 * the reply-to kit receives a {@code Failure} and the reply actor terminates.
 */
@Test
public void testGetSnapshotFailureReply() {
    final JavaTestKit probe = new JavaTestKit(getSystem());

    final ActorRef snapshotReplyActor = getSystem().actorOf(ShardManagerGetSnapshotReplyActor.props(
            Arrays.asList("shard1", "shard2"), "config", null, probe.getRef(), "shard-manager",
            Duration.create(100, TimeUnit.SECONDS)), "testGetSnapshotFailureReply");
    probe.watch(snapshotReplyActor);

    // One successful reply for shard1 ...
    final GetSnapshotReply shard1Reply = new GetSnapshotReply(
            ShardIdentifier.create("shard1", MEMBER_1, "config").toString(),
            Snapshot.create(ByteState.of(new byte[]{1,2,3}), Collections.<ReplicatedLogEntry>emptyList(),
                    2, 1, 2, 1, 1, "member-1", null));
    snapshotReplyActor.tell(shard1Reply, ActorRef.noSender());

    // ... followed by a failure.
    snapshotReplyActor.tell(new Failure(new RuntimeException()), ActorRef.noSender());

    probe.expectMsgClass(Failure.class);
    probe.expectTerminated(snapshotReplyActor);
}
/**
 * Verifies that when the mocked RPC service returns a failed future, the invoker replies with a
 * {@code Failure} whose cause is a {@code DOMRpcException}.
 */
@Test
public void testExecuteRpcFailureWithException() {
    new JavaTestKit(node1) {
        {
            // Make the RPC service fail every invocation of the test RPC.
            when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.<NormalizedNode<?, ?>>any()))
                    .thenReturn(Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(
                            new DOMRpcImplementationNotAvailableException("NOT FOUND")));

            final ExecuteRpc request = ExecuteRpc.from(TEST_RPC_ID, null);
            rpcInvoker1.tell(request, getRef());

            final Failure reply = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
            final Throwable cause = reply.cause();
            Assert.assertTrue(cause instanceof DOMRpcException);
        }
    };
}
/** * The journal doesn't have any events yet for this persistenceId, which means the actor could either become a slave or a master, * depending on the first command. */ protected Receive justCreated() { Receive receive = createReceive(); return ReceiveBuilder.create() .match(commandType, c -> isReadOnly(c), c -> { log.debug("not accepting {}", c); sender().tell(new Failure(new UnknownActorException("Actor " + persistenceId() + " does not know yet whether it's slave or not. Try again later. Was handling:" + c)), self()); }) .match(commandType, c -> { log.debug("Received write command as first, becoming master: {}", c); getContext().become(master()); if (receive.onMessage().isDefinedAt(c)) { receive.onMessage().apply(c); } }) .match(Query.EventEnvelope.class, e -> { log.debug("Received event envelope as first, becoming slave."); getContext().become(slave()); receiveEnvelope(e); }) .build() .orElse(receive); // allow any non-command custom messages to just pass through to the actual actor implementation. }
private Receive startBackup(long offset) { query .eventsByTag(tag, NoOffset.getInstance()) // create backups of max [N] elements, or at least every [T] on activity // FIXME write a stage that, instead of buffering each chunk into memory, creates sub-streams instead. .groupedWithin(eventChunkSize, eventChunkDuration) .filter(list -> list.size() > 0) .mapAsync(4, list -> s3.store(tag, Vector.ofAll(list)).thenApply(done -> list.get(list.size() - 1).offset())) .runWith(Sink.actorRefWithAck(self(), "init", "ack", "done", Failure::new), materializer); return ReceiveBuilder.create() .matchEquals("init", msg -> sender().tell("ack", self())) .match(Long.class, l -> pipe(s3.saveOffset(l).thenApply(done -> "ack"), context().dispatcher()).to(sender())) .match(Failure.class, msg -> { log.error("Stream failed, rethrowing", msg.cause()); throw new RuntimeException(msg.cause()); }) .matchEquals("done", msg -> { throw new IllegalStateException("eventsByTag completed, this should not happen. Killing actor, hoping for restart"); }) .build(); }
/**
 * Builds the receive for the acknowledged stream messages.
 *
 * <p>"init" is acknowledged immediately. A {@code Long} is persisted; once persisted it becomes
 * the current offset, older journal messages are deleted, and an "ack" to the sender is scheduled
 * after {@code updateAccuracy}. A {@code Failure} is rethrown, and "done" stops this actor.
 */
@Override
public Receive createReceive() {
    return ReceiveBuilder.create()
        .matchEquals("init", init -> {
            sender().tell("ack", self());
        })
        .match(Long.class, (Long newOffset) -> {
            log.debug("Persisting {}", newOffset);
            persist(newOffset, persisted -> {
                offset = newOffset;
                if (lastSequenceNr() > 1) {
                    // Only the latest persisted offset needs to be kept.
                    deleteMessages(lastSequenceNr() - 1);
                }
                context().system().scheduler().scheduleOnce(
                    updateAccuracy, sender(), "ack", context().dispatcher(), self());
            });
        })
        .match(Failure.class, streamFailure -> {
            log.error("Stream failed, rethrowing", streamFailure.cause());
            throw new RuntimeException(streamFailure.cause());
        })
        .matchEquals("done", completed -> {
            log.debug("Completed, with offset now {}", offset);
            context().stop(self());
        })
        .build();
}
private void startRestore() { s3 .list(tag) // skip over entries until the one BEFORE entry where startTime >= offset (since the one before may have been only partially restored) .via(dropUntilNext(l -> S3.getStartInstant(l).toEpochMilli() >= offset, true)) .flatMapConcat(entry -> s3.loadEvents(entry.key().substring(entry.key().lastIndexOf("/") + 1))) .mapAsync(maxInFlight, e -> { log.debug("Replaying {}:{}", e.getPersistenceId(), e.getSequenceNr()); return ask(shardRegion, e, timeout); }) .map(resp -> { log.debug("Responded {}", resp); return (Long) resp; }) // only save one lastOffset update per minute, and only the lowest one .conflate((Long l1, Long l2) -> l1 < l2 ? l1 : l2) .runWith(Sink.actorRefWithAck(self(), "init", "ack", "done", Failure::new), materializer); }
/**
 * Handles a client connect request.
 *
 * <p>When this shard is an active leader, the frontend state for the client is looked up or
 * created, reconnected, and a {@code ConnectClientSuccess} is sent to the request's reply-to
 * actor. Any {@code RequestException} or {@code RuntimeException} raised on the way, including
 * the {@code NotLeaderException} thrown when this shard is not an active leader, is sent back as
 * a {@code Failure}.
 *
 * @param message the connect request
 */
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleConnectClient(final ConnectClientRequest message) {
    try {
        final ClientIdentifier clientId = message.getTarget();
        final LeaderFrontendState knownState = findFrontend(clientId);
        if (knownState != null) {
            knownState.touch();
        }

        if (!(isLeader() && isLeaderActive())) {
            LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
                            + "isLeadershipTransferInProgress: {}.",
                    persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
            throw new NotLeaderException(getSelf());
        }

        final ABIVersion selectedVersion = selectVersion(message);

        final LeaderFrontendState frontend;
        if (knownState != null) {
            frontend = knownState;
        } else {
            frontend = new LeaderFrontendState(persistenceId(), clientId, store);
            knownFrontends.put(clientId.getFrontendId(), frontend);
            LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
        }
        frontend.reconnect();

        message.getReplyTo().tell(
                new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
                        ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES)
                        .toVersion(selectedVersion),
                ActorRef.noSender());
    } catch (RequestException | RuntimeException ex) {
        message.getReplyTo().tell(new Failure(ex), ActorRef.noSender());
    }
}
/**
 * Passes batched modifications to the commit coordinator; if that throws, the error is logged and
 * a {@code Failure} carrying the exception is sent to the sender.
 *
 * @param batched the modifications to apply
 * @param sender the actor that receives a failure reply
 */
@SuppressWarnings("checkstyle:IllegalCatch")
protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
    try {
        commitCoordinator.handleBatchedModifications(batched, sender, this);
    } catch (Exception ex) {
        LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                batched.getTransactionId(), ex);

        final Failure reply = new Failure(ex);
        sender.tell(reply, getSelf());
    }
}
/**
 * Sends a {@code NoShardLeaderException} failure to the sender when this shard is an isolated
 * leader.
 *
 * @param sender the actor to notify
 * @return {@code true} if the failure was sent, {@code false} if this shard is not an isolated leader
 */
private boolean failIfIsolatedLeader(final ActorRef sender) {
    if (!isIsolatedLeader()) {
        return false;
    }

    final NoShardLeaderException isolated = new NoShardLeaderException(String.format(
            "Shard %s was the leader but has lost contact with all of its followers. Either all"
            + " other follower nodes are down or this node is isolated by a network partition.",
            persistenceId()));
    sender.tell(new Failure(isolated), getSelf());
    return true;
}
/**
 * Handles a create-transaction message: creates the transaction locally when this shard is the
 * leader, forwards the message to the leader when one is known, and otherwise replies with a
 * {@code NoShardLeaderException} failure.
 *
 * @param message the serialized create-transaction message
 */
private void handleCreateTransaction(final Object message) {
    if (isLeader()) {
        createTransaction(CreateTransaction.fromSerializable(message));
        return;
    }

    if (getLeader() != null) {
        getLeader().forward(message, getContext());
        return;
    }

    final NoShardLeaderException noLeader = new NoShardLeaderException(
            "Could not create a shard transaction", persistenceId());
    getSender().tell(new Failure(noLeader), getSelf());
}
/**
 * Aborts the transaction with the given ID if it has a cached cohort entry; otherwise does nothing.
 *
 * <p>Whether the abort succeeds or fails, the transaction is purged from the shard's data store.
 * A non-null sender receives an {@code AbortTransactionReply} on success or a {@code Failure}
 * on error.
 *
 * @param transactionID the ID of the transaction to abort
 * @param sender the actor to which to send the response; may be null
 * @param shard the transaction's shard actor
 */
@SuppressWarnings("checkstyle:IllegalCatch")
void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
    final CohortEntry entry = cohortCache.remove(transactionID);
    if (entry == null) {
        return;
    }

    log.debug("{}: Aborting transaction {}", name, transactionID);

    final ActorRef shardRef = shard.getSelf();
    final FutureCallback<Void> abortCallback = new FutureCallback<Void>() {
        @Override
        public void onSuccess(final Void result) {
            shard.getDataStore().purgeTransaction(entry.getTransactionId(), null);

            if (sender == null) {
                return;
            }
            sender.tell(AbortTransactionReply.instance(entry.getClientVersion()).toSerializable(), shardRef);
        }

        @Override
        public void onFailure(final Throwable failure) {
            log.error("{}: An exception happened during abort", name, failure);
            shard.getDataStore().purgeTransaction(entry.getTransactionId(), null);

            if (sender == null) {
                return;
            }
            sender.tell(new Failure(failure), shardRef);
        }
    };
    entry.abort(abortCallback);

    shard.getShardMBean().incrementAbortTransactionsCount();
}
/**
 * Sends a single ready {@code BatchedModifications} that claims two messages were sent. Expects
 * a {@code Failure} reply and termination of the transaction actor, then rethrows the failure
 * cause so the expected {@code IllegalStateException} surfaces.
 */
@Test(expected = IllegalStateException.class)
public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
    new JavaTestKit(getSystem()) {
        {
            final ActorRef txActor = newTransactionActor(WO, readWriteTransaction(),
                    "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");

            final JavaTestKit terminationProbe = new JavaTestKit(getSystem());
            terminationProbe.watch(txActor);

            final BatchedModifications readyBatch = new BatchedModifications(nextTransactionId(),
                    DataStoreVersions.CURRENT_VERSION);
            readyBatch.setReady(true);
            readyBatch.setTotalMessagesSent(2);

            txActor.tell(readyBatch, getRef());

            final Failure reply = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
            terminationProbe.expectMsgClass(duration("5 seconds"), Terminated.class);

            // The guard keeps this initializer able to complete normally, as the compiler requires.
            if (reply != null) {
                final Throwable cause = reply.cause();
                Throwables.throwIfInstanceOf(cause, Exception.class);
                Throwables.throwIfUnchecked(cause);
                throw new RuntimeException(cause);
            }
        }
    };
}
/**
 * Verifies that when no snapshot reply arrives, the reply-to kit receives a {@code Failure}
 * caused by a {@code TimeoutException} and the reply actor terminates.
 */
@Test
public void testGetSnapshotTimeout() {
    final JavaTestKit probe = new JavaTestKit(getSystem());

    final ActorRef snapshotReplyActor = getSystem().actorOf(ShardManagerGetSnapshotReplyActor.props(
            Arrays.asList("shard1"), "config", null, probe.getRef(), "shard-manager",
            Duration.create(100, TimeUnit.MILLISECONDS)), "testGetSnapshotTimeout");
    probe.watch(snapshotReplyActor);

    final Failure timeoutReply = probe.expectMsgClass(Failure.class);
    final Class<?> causeType = timeoutReply.cause().getClass();
    assertEquals("Failure cause type", TimeoutException.class, causeType);

    probe.expectTerminated(snapshotReplyActor);
}
/**
 * Waits up to 5 seconds for a message of the given class or a {@code Failure}.
 *
 * @param msgClass the expected message class
 * @param kit the test kit to receive on
 * @param msg label used as the prefix of the assertion error message
 * @return the received message cast to {@code T}
 * @throws AssertionError if a {@code Failure} was received; its cause is attached
 */
@SuppressWarnings("unchecked")
private static <T> T expectMsgClassOrFailure(final Class<T> msgClass, final JavaTestKit kit, final String msg) {
    final Object received = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class);
    if (!(received instanceof Failure)) {
        return (T)received;
    }

    final Failure failed = (Failure)received;
    throw new AssertionError(msg + " failed", failed.cause());
}
/**
 * Verifies that an {@code AddShardReplica} for a shard name, sent to a shard manager built from
 * an empty module shard configuration, is answered with a failure caused by an
 * {@code IllegalArgumentException}.
 */
@Test
public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
    new JavaTestKit(getSystem()) {
        {
            final ActorRef manager = actorFactory.createActor(
                    newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
                            .withDispatcher(Dispatchers.DefaultDispatcherId()));

            manager.tell(new AddShardReplica("model-inventory"), getRef());

            final Status.Failure reply = expectMsgClass(duration("2 seconds"), Status.Failure.class);
            final boolean causedByIllegalArgument = reply.cause() instanceof IllegalArgumentException;
            assertEquals("Failure obtained", true, causedByIllegalArgument);
        }
    };
}
/**
 * Verifies that {@code AddShardReplica} for the default shard fails with an
 * {@code AlreadyExistsException} after the shard manager was told that the local member is that
 * shard's leader, and that the local shard can still be found afterwards.
 */
@Test
public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
    LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting");

    new JavaTestKit(getSystem()) {
        {
            final String localMemberId = "member-1-shard-default-" + shardMrgIDSuffix;
            final ActorRef manager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());

            // Bring the shard manager to a state where the local shard is initialized and leader.
            manager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
            manager.tell(new ActorInitialized(), mockShardActor);
            manager.tell(new ShardLeaderStateChanged(localMemberId, localMemberId, mock(DataTree.class),
                    DataStoreVersions.CURRENT_VERSION), getRef());
            manager.tell(new RoleChangeNotification(localMemberId, RaftState.Candidate.name(),
                    RaftState.Leader.name()), mockShardActor);

            manager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
            final Failure reply = expectMsgClass(duration("5 seconds"), Failure.class);
            assertEquals("Failure cause", AlreadyExistsException.class, reply.cause().getClass());

            manager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
            expectMsgClass(duration("5 seconds"), LocalShardFound.class);
        }
    };

    LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending");
}
/**
 * Verifies that a {@code RemoveShardReplica} sent to a shard manager built from an empty module
 * shard configuration is answered with a failure caused by a {@code PrimaryNotFoundException}.
 */
@Test
public void testRemoveShardReplicaForNonExistentShard() throws Exception {
    new JavaTestKit(getSystem()) {
        {
            final ActorRef manager = actorFactory.createActor(
                    newShardMgrProps(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
                            .withDispatcher(Dispatchers.DefaultDispatcherId()));

            manager.tell(new RemoveShardReplica("model-inventory", MEMBER_1), getRef());

            final Status.Failure reply = expectMsgClass(duration("10 seconds"), Status.Failure.class);
            final boolean causedByPrimaryNotFound = reply.cause() instanceof PrimaryNotFoundException;
            assertEquals("Failure obtained", true, causedByPrimaryNotFound);
        }
    };
}
/**
 * Shared scenario: sends a first server-change request, waits until the mock shard leader has
 * received the forwarded message (without replying to it), then sends a second request and
 * expects it to be answered with a {@code Failure}.
 *
 * @param shardName the shard configured on "member-2"
 * @param firstServerChange the first request sent to the shard manager
 * @param firstForwardedServerChangeClass the message class the mock leader is expected to receive
 * @param secondServerChange the request expected to fail
 */
public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange,
        final Class<?> firstForwardedServerChangeClass, final Object secondServerChange) throws Exception {
    new JavaTestKit(getSystem()) {
        {
            final JavaTestKit leaderProbe = new JavaTestKit(getSystem());
            final JavaTestKit secondRequester = new JavaTestKit(getSystem());

            final MockConfiguration configuration = new MockConfiguration(
                    ImmutableMap.<String, List<String>>builder()
                            .put(shardName, Arrays.asList("member-2")).build());

            final TestActorRef<TestShardManager> manager = TestActorRef.create(getSystem(),
                    newTestShardMgrBuilder().configuration(configuration).shardActor(mockShardActor)
                            .cluster(new MockClusterWrapper()).props()
                            .withDispatcher(Dispatchers.DefaultDispatcherId()),
                    shardMgrID);

            manager.underlyingActor()
                    .setMessageInterceptor(newFindPrimaryInterceptor(leaderProbe.getRef()));

            manager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());

            // First change is forwarded to the mock leader, which never replies to it.
            manager.tell(firstServerChange, getRef());
            leaderProbe.expectMsgClass(firstForwardedServerChangeClass);

            manager.tell(secondServerChange, secondRequester.getRef());
            secondRequester.expectMsgClass(duration("5 seconds"), Failure.class);
        }
    };
}
/**
 * Verifies that when the shard answers a {@code ChangeServersVotingStatus} with a
 * {@code NO_LEADER} status, the shard manager replies to the requester with a failure caused by
 * a {@code NoShardLeaderException}.
 */
@Test
public void testChangeServersVotingStatusWithNoLeader() throws Exception {
    new JavaTestKit(getSystem()) {
        {
            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;

            // Mock shard that answers every ChangeServersVotingStatus with NO_LEADER.
            ActorRef respondActor = actorFactory
                    .createActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
                            new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);

            ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));

            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
            shardManager.tell(new ActorInitialized(), respondActor);
            shardManager.tell(new RoleChangeNotification(memberId, null, RaftState.Follower.name()),
                    respondActor);

            shardManager.tell(
                    new ChangeShardMembersVotingStatus("default", ImmutableMap.of("member-2", Boolean.TRUE)),
                    getRef());

            MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);

            Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
            assertEquals("Failure response", true, resp.cause() instanceof NoShardLeaderException);
        }
    };
}
/**
 * Sends a single ready {@code BatchedModifications} that claims two messages were sent to a
 * leader shard. Expects a {@code Failure} reply and rethrows its cause so the expected
 * {@code IllegalStateException} surfaces.
 */
@Test(expected = IllegalStateException.class)
public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
    new ShardTestKit(getSystem()) {
        {
            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                    "testBatchedModificationsReadyWithIncorrectTotalMessageCount");

            waitUntilLeader(leaderShard);

            final TransactionIdentifier txId = nextTransactionId();
            final BatchedModifications readyBatch = new BatchedModifications(txId,
                    DataStoreVersions.CURRENT_VERSION);
            readyBatch.setReady(true);
            readyBatch.setTotalMessagesSent(2);

            leaderShard.tell(readyBatch, getRef());

            final Failure reply = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);

            // The guard keeps this initializer able to complete normally, as the compiler requires.
            if (reply != null) {
                final Throwable cause = reply.cause();
                Throwables.propagateIfPossible(cause, Exception.class);
                throw new RuntimeException(cause);
            }
        }
    };
}
@Test public void testBatchedModificationsWithOperationFailure() throws Exception { new ShardTestKit(getSystem()) { { final TestActorRef<Shard> shard = actorFactory.createTestActor( newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testBatchedModificationsWithOperationFailure"); waitUntilLeader(shard); // Test merge with invalid data. An exception should occur when // the merge is applied. Note that // write will not validate the children for performance reasons. final TransactionIdentifier transactionID = nextTransactionId(); final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)) .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION); batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData)); shard.tell(batched, getRef()); Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); final Throwable cause = failure.cause(); batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION); batched.setReady(true); batched.setTotalMessagesSent(2); shard.tell(batched, getRef()); failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); assertEquals("Failure cause", cause, failure.cause()); } }; }
/**
 * Verifies that, on a shard without a leader, {@code BatchedModifications}, a forwarded ready
 * transaction and a {@code ReadyLocalTransaction} are each answered with a {@code Failure}
 * caused by a {@code NoShardLeaderException}.
 */
@Test
public void testTransactionMessagesWithNoLeader() {
    new ShardTestKit(getSystem()) {
        {
            dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
                    .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);

            final TestActorRef<Shard> leaderlessShard = actorFactory.createTestActor(
                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                    "testTransactionMessagesWithNoLeader");

            waitUntilNoLeader(leaderlessShard);

            final TransactionIdentifier txId = nextTransactionId();

            // 1. Batched modifications
            leaderlessShard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
            final Failure batchedReply = expectMsgClass(Failure.class);
            assertEquals("Failure cause type", NoShardLeaderException.class, batchedReply.cause().getClass());

            // 2. Forwarded ready transaction
            leaderlessShard.tell(prepareForwardedReadyTransaction(leaderlessShard, txId, TestModel.TEST_PATH,
                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
            final Failure forwardedReply = expectMsgClass(Failure.class);
            assertEquals("Failure cause type", NoShardLeaderException.class, forwardedReply.cause().getClass());

            // 3. Ready local transaction
            leaderlessShard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true),
                    getRef());
            final Failure localReply = expectMsgClass(Failure.class);
            assertEquals("Failure cause type", NoShardLeaderException.class, localReply.cause().getClass());
        }
    };
}
/**
 * Verifies that a {@code CanCommitTransaction} for a transaction that was never readied is
 * answered with a {@code Failure}.
 */
@Test
public void testCanCommitBeforeReadyFailure() throws Exception {
    new ShardTestKit(getSystem()) {
        {
            final TestActorRef<Shard> shardUnderTest = actorFactory.createTestActor(
                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                    "testCanCommitBeforeReadyFailure");

            final Object canCommit = new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION)
                    .toSerializable();
            shardUnderTest.tell(canCommit, getRef());

            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
        }
    };
}
/**
 * Behavior for master mode: an incoming event envelope is logged as an error and answered with an
 * {@code IllegalStateException} failure; every other message goes to the regular receive.
 */
protected Receive master() {
    final Receive rejectEnvelopes = ReceiveBuilder.create()
        .match(Query.EventEnvelope.class, envelope -> {
            log.error("Actor is not in slave mode, but was sent an EventEnvelope: {} from {} \n"
                + "Possibly the same persistenceId was created on several datacenters independently. That will not end well.\n"
                + "The incoming event has been ignored. The proper cause of action is to delete either this or the other aggregate.",
                envelope, sender());

            final IllegalStateException notSlave = new IllegalStateException(
                "Actor is not in slave mode, but was sent an EventEnvelope. "
                + "Possibly the same persistenceId was created on several datacenters independently. That will not end well.");
            sender().tell(new Failure(notSlave), self());
        })
        .build();

    return rejectEnvelopes.orElse(createReceive());
}
/**
 * Behavior for slave mode: event envelopes are processed, commands that are not read-only are
 * rejected with an {@code IllegalStateException} failure, and every other message goes to the
 * regular receive.
 */
protected Receive slave() {
    final Receive slaveOnly = ReceiveBuilder.create()
        .match(Query.EventEnvelope.class, envelope -> receiveEnvelope(envelope))
        .match(commandType, cmd -> !isReadOnly(cmd), cmd -> {
            final IllegalStateException rejected = new IllegalStateException(
                "Actor is in slave mode and does not accept non-readOnly command " + cmd);
            sender().tell(new Failure(rejected), self());
        })
        .build();

    return slaveOnly.orElse(createReceive());
}
/**
 * Builds a sink that keeps only the events whose persistence ID is visible to the data center,
 * sends them through the data center's upload flow and reports each result to this actor as an
 * {@code EventDelivered}.
 *
 * @return the sink; the {@code Failure} given to {@code Sink.actorRef} as its completion message
 *         carries the text "Remote datacenter closed connection"
 */
private Sink<EventEnvelope,NotUsed> filteredDataCenterSink() {
    log.debug("filteredDataCenterSink()");

    return Flow.<EventEnvelope>create()
        .mapAsync(parallelism, envelope ->
            visibilityRepo.isVisibleTo(dataCenter, envelope.persistenceId()).thenApply(visible -> {
                log.debug("Visibility of {}: {}", envelope, visible);
                return Tuple.of(envelope, visible);
            }))
        // Pair is (event, visible): drop the invisible ones, then unwrap the event again.
        .filter(pair -> pair._2)
        .map(pair -> pair._1)
        .via(dataCenter.uploadFlow())
        .map(EventDelivered::new)
        .to(Sink.actorRef(self(),
            new Failure(new IllegalStateException("Remote datacenter closed connection"))));
}
/**
 * Sends a {@code NoShardLeaderException} failure, built from this entry's failure message and the
 * shard's persistence ID, to the waiting {@code replyTo} actor.
 *
 * @param shard the shard whose ID goes into the exception and whose actor is used as the sender
 */
void timedOut(Shard shard) {
    final NoShardLeaderException noLeader = new NoShardLeaderException(failureMessage, shard.persistenceId());
    replyTo.tell(new Failure(noLeader), shard.getSelf());
}
/** * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring * an AddShardReplica request to join it to an existing leader. */ @Test public void testEntityOwnershipShardBootstrapping() throws Exception { String name = "testEntityOwnershipShardBootstrapping"; String moduleShardsConfig = MODULE_SHARDS_MEMBER_1_CONFIG; MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(leaderDatastoreContextBuilder).build(); AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore(); final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore); leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME); MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(followerDatastoreContextBuilder).build(); AbstractDataStore follower1DistributedDataStore = follower1Node.configDataStore(); follower1DistributedDataStore.waitTillReady(); leaderNode.waitForMembersUp("member-2"); follower1Node.waitForMembersUp("member-1"); DOMEntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1DistributedDataStore); leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener); // Register a candidate for follower1 - should get queued since follower1 has no leader final DOMEntityOwnershipCandidateRegistration candidateReg = follower1EntityOwnershipService.registerCandidate(ENTITY1); Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); verify(leaderMockListener, 
never()).ownershipChanged(ownershipChange(ENTITY1)); // Add replica in follower1 AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME); follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica, follower1Node.kit().getRef()); Object reply = follower1Node.kit().expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class); if (reply instanceof Failure) { throw new AssertionError("AddShardReplica failed", ((Failure)reply).cause()); } // The queued candidate registration should proceed verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true)); reset(leaderMockListener); candidateReg.close(); verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false)); reset(leaderMockListener); // Restart follower1 and verify the entity ownership shard is re-instated by registering. Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress()); follower1Node.cleanup(); follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false) .datastoreContextBuilder(followerDatastoreContextBuilder).build(); follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore()); follower1EntityOwnershipService.registerCandidate(ENTITY1); verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true)); verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, raftState -> { assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName()); assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size()); assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()), org.hamcrest.CoreMatchers.containsString("member-1")); }); }
/**
 * Makes the mocked modification throw a {@code TestException} on write, sends a batch with that
 * write followed by a ready batch, expects a {@code Failure} for each and termination of the
 * transaction actor, then rethrows the last failure's cause so the expected exception surfaces.
 */
@Test(expected = TestException.class)
public void testOnReceiveBatchedModificationsFailure() throws Exception {
    new JavaTestKit(getSystem()) {
        {
            final ShardDataTreeTransactionParent txParent = Mockito.mock(ShardDataTreeTransactionParent.class);
            final DataTreeModification failingModification = Mockito.mock(DataTreeModification.class);
            final ReadWriteShardDataTreeTransaction writeTx = new ReadWriteShardDataTreeTransaction(txParent,
                    nextTransactionId(), failingModification);
            final ActorRef txActor = newTransactionActor(RW, writeTx,
                    "testOnReceiveBatchedModificationsFailure");

            final JavaTestKit terminationProbe = new JavaTestKit(getSystem());
            terminationProbe.watch(txActor);

            final YangInstanceIdentifier writePath = TestModel.TEST_PATH;
            final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);

            doThrow(new TestException()).when(failingModification).write(writePath, writeData);

            final TransactionIdentifier txId = nextTransactionId();

            // First batch: the write that blows up.
            final BatchedModifications writeBatch = new BatchedModifications(txId,
                    DataStoreVersions.CURRENT_VERSION);
            writeBatch.addModification(new WriteModification(writePath, writeData));
            txActor.tell(writeBatch, getRef());
            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);

            // Second batch: ready, with the total count covering both messages.
            final BatchedModifications readyBatch = new BatchedModifications(txId,
                    DataStoreVersions.CURRENT_VERSION);
            readyBatch.setReady(true);
            readyBatch.setTotalMessagesSent(2);
            txActor.tell(readyBatch, getRef());

            final Failure reply = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
            terminationProbe.expectMsgClass(duration("5 seconds"), Terminated.class);

            // The guard keeps this initializer able to complete normally, as the compiler requires.
            if (reply != null) {
                final Throwable cause = reply.cause();
                Throwables.propagateIfPossible(cause, Exception.class);
                throw new RuntimeException(cause);
            }
        }
    };
}
@Test public void testGetSnapshot() throws Exception { LOG.info("testGetSnapshot starting"); JavaTestKit kit = new JavaTestKit(getSystem()); MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder() .put("shard1", Arrays.asList("member-1")).put("shard2", Arrays.asList("member-1")) .put("astronauts", Collections.<String>emptyList()).build()); TestActorRef<TestShardManager> shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig) .withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); Failure failure = kit.expectMsgClass(Failure.class); assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); waitForShardInitialized(shardManager, "shard1", kit); waitForShardInitialized(shardManager, "shard2", kit); shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot()); Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName; assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet( Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); // Add a new replica JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); TestShardManager shardManagerInstance = shardManager.underlyingActor(); shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); shardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); mockShardLeaderKit.expectMsgClass(AddServer.class); mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, "")); 
kit.expectMsgClass(Status.Success.class); waitForShardInitialized(shardManager, "astronauts", kit); // Send another GetSnapshot and verify shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet( Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); ShardManagerSnapshot snapshot = datastoreSnapshot.getShardManagerSnapshot(); assertNotNull("Expected ShardManagerSnapshot", snapshot); assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet(snapshot.getShardList())); LOG.info("testGetSnapshot ending"); }