/**
 * Handles a {@link ProducerRemoved} message by sending a {@link NotifyProducerRemoved}
 * to every sharding-service peer address returned by the resolver and replying to the
 * original sender once all of those requests have completed.
 *
 * <p>The sender receives {@code Status.Success(null)} when every peer request completes
 * normally, or a {@code Status.Failure} carrying the failure cause otherwise.
 *
 * @param message the notification naming the subtrees whose producer was removed
 */
private void onProducerRemoved(final ProducerRemoved message) {
    LOG.debug("Received ProducerRemoved: {}", message);

    final List<CompletableFuture<Object>> futures = new ArrayList<>();
    for (final String address : resolver.getShardingServicePeerActorAddresses()) {
        final ActorSelection selection = actorSystem.actorSelection(address);
        futures.add(FutureConverters.toJava(
                actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
                .toCompletableFuture());
    }

    final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

    // Capture the sender before registering the asynchronous callbacks.
    final ActorRef respondTo = getSender();

    combinedFuture
            .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
            .exceptionally(e -> {
                // Propagate the actual cause instead of a null one so the caller can diagnose the failure.
                respondTo.tell(new Status.Failure(e), self());
                return null;
            });
}
/**
 * Forwards pending user assignments (if the request carries any) to the parent actor,
 * blocks until that request completes, and then triggers the internal, external and
 * process send steps for the given subject state.
 *
 * <p>Any exception is logged and reported to {@code sender} as a {@code Status.Failure}
 * wrapping an {@link IllegalStateException} that keeps the original exception as its cause.
 *
 * @param request      the state-object change request being processed
 * @param subjectState the subject state the request applies to
 */
private void assignUsers(final StateObjectChangeMessage.Request request, final SubjectState subjectState) {
    try {
        if (request.getStateObjectChangeDTO().getUserAssignments() == null
                || request.getStateObjectChangeDTO().getUserAssignments().isEmpty()) {
            LOG.debug("All user assignements are done at the moment for P_ID [{}]", request.getPiId());
        } else {
            PatternsCS
                    .ask(getContext().parent(),
                            new AssignUsersMessage.Request(request.getPiId(),
                                    request.getStateObjectChangeDTO().getUserAssignments()),
                            Global.TIMEOUT)
                    .toCompletableFuture().get();
        }

        triggerSendInternal(subjectState, request);
        triggerSendExternal(subjectState, request);
        triggerSendProcess(subjectState, request);
    } catch (final Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag swallowed by the blocking get().
            Thread.currentThread().interrupt();
        }
        LOG.error(e.getMessage());
        // Keep the original exception as the cause so the receiver sees the full chain.
        sender.tell(new Status.Failure(new IllegalStateException("Error: " + e.getMessage(), e)), getSelf());
    }
}
/**
 * Sends an output message to the external communicator for every message flow of the
 * current state whose receiver is of type {@code EXTERNAL}. If at least one such flow
 * exists, the method then either advances to the next state or reports a failure to
 * {@code sender}, depending on the result of {@code waitForECResponse}.
 *
 * @param subjectState the subject state whose current state supplies the message flows
 * @param request      the request that triggered the send
 */
private void triggerSendExternal(final SubjectState subjectState,
        final StateObjectChangeMessage.Request request) {
    final List<MessageFlow> externalFlows = subjectState.getCurrentState().getMessageFlow().stream()
            .filter(flow -> SubjectModelType.EXTERNAL.equals(flow.getReceiver().getSubjectModelType()))
            .collect(Collectors.toList());

    externalFlows.stream()
            .map(flow -> getExternalOutputMessage(request.getPiId(), flow, subjectState.getSubject()))
            .forEachOrdered(outputMessage -> {
                LOG.debug("Send message to external-communicator [{}]", outputMessage);
                externalCommunicatorClient.sendExternalOutputMessage(outputMessage);
                subjectState.setToNotifiedEC();
            });

    // Nothing was sent, so there is no response to wait for.
    if (externalFlows.isEmpty()) {
        return;
    }

    if (!waitForECResponse(subjectState)) {
        sender.tell(new Status.Failure(new IllegalStateException(
                "Could not send message to all external users in PI_ID [" + request.getPiId() + "]")),
                getSelf());
        return;
    }

    changeToNextState(subjectState, request);
}
/**
 * Requests a leadership transfer to follower 2 while follower 2 is lagging and verifies
 * that all three followers remain in the {@code Follower} state and that the result
 * collector receives a {@code Status.Failure} caused by a
 * {@link LeadershipTransferFailedException}.
 */
@Test
public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
    final String testName = "testRequestLeadershipTransferToFollower2WithFollower2Lagging";
    testLog.info(testName + " starting");

    // Set up the cluster and put follower 2 behind before asking for the transfer.
    createRaftActors();
    createRequestLeadershipResultCollectorActor();
    sendPayloadWithFollower2Lagging();
    sendFollower2RequestLeadershipTransferToLeader();

    // None of the followers may have changed role.
    verifyRaftState(follower1Actor, RaftState.Follower);
    verifyRaftState(follower2Actor, RaftState.Follower);
    verifyRaftState(follower3Actor, RaftState.Follower);

    Status.Failure transferFailure =
            expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
    assertTrue(transferFailure.cause() instanceof LeadershipTransferFailedException);

    testLog.info(testName + " ending");
}
private void onMakeLeaderLocal() { LOG.debug("{}: onMakeLeaderLocal received", persistenceId()); if (isLeader()) { getSender().tell(new Status.Success(null), getSelf()); return; } final ActorSelection leader = getLeader(); if (leader == null) { // Leader is not present. The cluster is most likely trying to // elect a leader and we should let that run its normal course // TODO we can wait for the election to complete and retry the // request. We can also let the caller retry by sending a flag // in the response indicating the request is "reTryable". getSender().tell(new Failure( new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. " + "Currently there is no leader for " + persistenceId())), getSelf()); return; } leader.tell(new RequestLeadership(getId(), getSender()), getSelf()); }
/**
 * Registers a commit cohort under the tree node matching the cohort's path and records
 * the cohort-to-node association. The work is done between {@code takeLock()} and
 * {@code releaseLock()}.
 *
 * <p>Replies {@code Status.Success(null)} to {@code sender} on success, or a
 * {@code Status.Failure} carrying the exception if registration throws.
 *
 * @param sender the actor to notify of the outcome
 * @param cohort the registration request
 */
@SuppressWarnings("checkstyle:IllegalCatch")
void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
    takeLock();
    try {
        final ActorRef registeredCohort = cohort.getCohort();
        final RegistrationTreeNode<ActorRef> targetNode =
                findNodeFor(cohort.getPath().getRootIdentifier().getPathArguments());

        addRegistration(targetNode, cohort.getCohort());
        cohortToNode.put(registeredCohort, targetNode);
    } catch (final Exception registrationError) {
        sender.tell(new Status.Failure(registrationError), ActorRef.noSender());
        return;
    } finally {
        releaseLock();
    }

    // Only reached when no exception was thrown; the lock has already been released.
    sender.tell(new Status.Success(null), ActorRef.noSender());
}
/**
 * Replies to the sender with the role of the named local shard.
 *
 * <p>If no local shard with that name is known, the reply is a {@code Status.Failure}
 * wrapping an {@link IllegalArgumentException}; otherwise it is a {@link GetShardRoleReply}.
 *
 * @param message the request naming the shard
 */
private void onGetShardRole(final GetShardRole message) {
    LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());

    final String shardName = message.getName();
    final ShardInformation info = localShards.get(shardName);

    if (info != null) {
        getSender().tell(new GetShardRoleReply(info.getRole()), ActorRef.noSender());
    } else {
        LOG.info("{}: no shard information for {} found", persistenceId(), shardName);
        getSender().tell(new Status.Failure(
                new IllegalArgumentException("Shard with name " + shardName + " not present.")),
                ActorRef.noSender());
    }
}
/**
 * Processes the leader's reply to a {@code RemoveServer} request: clears the shard's
 * "operation in progress" marker and reports the outcome to the original requester.
 *
 * @param originalSender the actor that asked for the replica removal
 * @param shardId        identifier of the shard replica being removed
 * @param replyMsg       the leader's reply
 * @param leaderPath     path of the leader, used when building the failure exception
 */
private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
        final RemoveServerReply replyMsg, final String leaderPath) {
    shardReplicaOperationsInProgress.remove(shardId.getShardName());

    LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());

    if (replyMsg.getStatus() != ServerChangeStatus.OK) {
        LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
                persistenceId(), shardId, replyMsg.getStatus());

        Exception removalError = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
                leaderPath, shardId);
        originalSender.tell(new Status.Failure(removalError), getSelf());
        return;
    }

    LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
            shardId.getShardName());
    originalSender.tell(new Status.Success(null), getSelf());
}
/**
 * Creates the shard described by the message unless a local shard with the same name
 * already exists, and replies with {@code Status.Success} or {@code Status.Failure}.
 *
 * <p>The reply is skipped when there is no sender or the sender is the dead-letters actor.
 *
 * @param createShard the shard creation request
 */
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(final CreateShard createShard) {
    LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);

    Object outcome;
    try {
        String requestedName = createShard.getModuleShardConfig().getShardName();
        if (!localShards.containsKey(requestedName)) {
            doCreateShard(createShard);
            outcome = new Status.Success(null);
        } else {
            LOG.debug("{}: Shard {} already exists", persistenceId(), requestedName);
            outcome = new Status.Success(String.format("Shard with name %s already exists", requestedName));
        }
    } catch (Exception e) {
        LOG.error("{}: onCreateShard failed", persistenceId(), e);
        outcome = new Status.Failure(e);
    }

    // No one to answer: either no sender at all or the sender is dead letters.
    if (getSender() == null || getContext().system().deadLetters().equals(getSender())) {
        return;
    }

    getSender().tell(outcome, getSelf());
}
/**
 * Removes and closes the producer registration keyed by the first subtree in the
 * notification, then reports the outcome to the sender.
 *
 * <p>A notification for which no registration exists is logged and still answered with
 * {@code Status.Success(null)}. A {@link DOMDataTreeProducerException} raised while
 * closing is answered with {@code Status.Failure}.
 *
 * @param message the notification listing the affected subtrees
 */
private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
    LOG.debug("Received NotifyProducerRemoved: {}", message);

    final ActorProducerRegistration producerReg =
            idToProducer.remove(message.getSubtrees().iterator().next());

    if (producerReg != null) {
        try {
            producerReg.close();
            getSender().tell(new Status.Success(null), noSender());
        } catch (final DOMDataTreeProducerException closeError) {
            LOG.error("Unable to close producer", closeError);
            getSender().tell(new Status.Failure(closeError), noSender());
        }
    } else {
        LOG.warn("The notification contained a path on which no producer is registered, throwing away");
        getSender().tell(new Status.Success(null), noSender());
    }
}
@Override public void run() { final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); ask.onComplete(new OnComplete<Object>() { @Override public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { if (throwable != null) { tryReschedule(throwable); } else { final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; final java.util.Optional<String> leaderActor = findLeader.getLeaderActor(); if (leaderActor.isPresent()) { // leader is found, backend seems ready, check if the frontend is ready LOG.debug("{} - Leader for config shard is ready. Ending lookup.", clusterWrapper.getCurrentMemberName()); replyTo.tell(new Status.Success(null), noSender()); } else { tryReschedule(null); } } } }, system.dispatcher()); }
/**
 * Dispatches an incoming message to the matching handler based on its type; anything
 * not recognized here is passed to the superclass.
 *
 * @param message the received message
 * @throws InterpreterException if a handler raises it
 */
@Override
public void onReceive(Object message) throws InterpreterException {
    if (message instanceof InterpreterInterface.SubmoduleOutPortHasSignal) {
        InterpreterInterface.SubmoduleOutPortHasSignal signal =
                (InterpreterInterface.SubmoduleOutPortHasSignal) message;
        moduleOutPortHasSignal(signal.getOutPortId());
        return;
    }
    if (message instanceof InstanceProvider) {
        setInstanceProvider((InstanceProvider) message);
        return;
    }
    if (message instanceof RuntimeContext) {
        setRuntimeContext((RuntimeContext) message);
        return;
    }
    if (message == TopLevelInterpreterActorInterface.Start.INSTANCE) {
        startRunning();
        return;
    }
    if (message instanceof Props) {
        start((Props) message);
        return;
    }
    if (message instanceof Terminated) {
        childActorTerminated(((Terminated) message).actor());
        return;
    }
    if (message instanceof Status.Failure) {
        failure(((Status.Failure) message).cause());
        return;
    }

    super.onReceive(message);
}
void cancel(long executionId, Throwable throwable) { ActorRef child = getContext().getChild(String.valueOf(executionId)); if (child != null) { if (!scheduledTerminations.containsKey(child)) { getContext().watch(child); child.tell(new Status.Failure(throwable), getSelf()); // Give the top-level interpreter some time to finish. Otherwise, we will terminate it after a timeout. Cancellable scheduledTermination = getContext().system().scheduler().scheduleOnce( Duration.create(1, TimeUnit.MINUTES), child, PoisonPill.getInstance(), getContext().dispatcher(), getSelf() ); scheduledTerminations.put(child, scheduledTermination); } } else { log.warning("Request to cancel unknown execution {} because of: {}", executionId, throwable); } }
@Override public void handleMessage(Object message) throws Exception { if (message instanceof LookupKvStateLocation) { // Add to received lookups queue receivedLookups.add((LookupKvStateLocation) message); Object msg = lookupResponses.poll(); if (msg != null) { if (msg instanceof Throwable) { sender().tell(new Status.Failure((Throwable) msg), self()); } else { sender().tell(new Status.Success(msg), self()); } } } else if (message instanceof UUID) { this.leaderSessionId = (UUID) message; } else { LOG.debug("Received unhandled message: {}", message); } }
/**
 * Handles a {@link SubmitTask} by remembering its task descriptor in {@code lastTDD} and
 * loading its big data from the blob cache; every other message goes to the superclass.
 *
 * @param message the received message
 * @return an acknowledgement on success, a {@code Status.Failure} if loading throws, or
 *         the superclass result for other messages
 */
@Override
public Object handleMessage(Object message) {
    if (!(message instanceof SubmitTask)) {
        return super.handleMessage(message);
    }

    SubmitTask submission = (SubmitTask) message;
    lastTDD = submission.tasks();

    try {
        lastTDD.loadBigData(blobCache);
        return Acknowledge.get();
    } catch (Exception loadError) {
        loadError.printStackTrace();
        return new Status.Failure(loadError);
    }
}
/**
 * Answers a small fixed set of job-manager requests: job submissions are acknowledged
 * with {@code JobSubmitSuccess}, leader-session-id requests get the stored id, blob
 * manager port requests get {@code 1337}, and everything else gets a
 * {@code Status.Failure}.
 *
 * @param message the received message
 */
@Override
public void handleMessage(Object message) {
    final Object reply;

    if (message instanceof JobManagerMessages.SubmitJob) {
        JobID submittedJobId = ((JobManagerMessages.SubmitJob) message).jobGraph().getJobID();
        reply = decorateMessage(new JobManagerMessages.JobSubmitSuccess(submittedJobId));
    } else if (message.getClass() == JobManagerMessages.getRequestLeaderSessionID().getClass()) {
        reply = decorateMessage(new JobManagerMessages.ResponseLeaderSessionID(leaderSessionID));
    } else if (message instanceof JobManagerMessages.RequestBlobManagerPort$) {
        // The port is sent undecorated.
        reply = 1337;
    } else {
        reply = decorateMessage(new Status.Failure(new Exception("Unknown message " + message)));
    }

    getSender().tell(reply, getSelf());
}
/**
 * Installs the receive behaviour: for each {@link PingMessage} the dotted-decimal IPv4
 * address is parsed and probed with {@code InetAddress.isReachable(PING_TIMEOUT)}, and
 * the sender is answered with {@code PingResult.OK} or {@code PingResult.UNREACHABLE}.
 *
 * <p>An address that does not consist of exactly four numeric octets in the range
 * 0..255 is answered with a {@code Status.Failure} wrapping an
 * {@link IllegalArgumentException}.
 */
public ICMPPing() {
    receive(ReceiveBuilder.match(PingMessage.class, s -> {
        // Limit -1 keeps trailing empty fields, so "1.2.3.4." is rejected rather than accepted.
        String[] splitted = s.getIp().split("\\.", -1);
        if (splitted.length != 4) {
            sender().tell(new Status.Failure(new IllegalArgumentException("IP address is in wrong format.")), self());
            return;
        }

        byte[] nums = new byte[4];
        for (int i = 0; i < 4; i++) {
            // Byte.parseByte only accepts -128..127 and therefore rejects valid octets
            // such as 192 or 255; parse as int and narrow after a range check instead.
            int octet;
            try {
                octet = Integer.parseInt(splitted[i]);
            } catch (NumberFormatException e) {
                octet = -1;
            }
            if (octet < 0 || octet > 255) {
                sender().tell(new Status.Failure(new IllegalArgumentException("IP address is in wrong format.")), self());
                return;
            }
            nums[i] = (byte) octet;
        }

        PingResult result = InetAddress.getByAddress(nums).isReachable(PING_TIMEOUT)
                ? PingResult.OK
                : PingResult.UNREACHABLE;
        sender().tell(result, self());
    }).build());
}
/**
 * Sends a list of two {@link SetRequest}s to the actor and verifies that both key/value
 * pairs end up in the actor's map and that the probe receives one
 * {@code Status.Success} per key, in order.
 */
@Test
public void itShouldPlaceKeyValuesFromListOfSetMessageIntoMap() {
    TestProbe testProbe = TestProbe.apply(system);
    TestActorRef<AkkademyDb> actorRef = TestActorRef.create(system, Props.create(AkkademyDb.class));
    AkkademyDb akkademyDb = actorRef.underlyingActor();

    // Typed list instead of a raw List.
    List<SetRequest> list = Arrays.asList(
            new SetRequest("key2", "value2", testProbe.ref()),
            new SetRequest("key3", "value3", testProbe.ref()));
    actorRef.tell(list, ActorRef.noSender());

    // assertEquals takes (expected, actual); the correct order keeps failure messages accurate.
    assertEquals("value2", akkademyDb.map.get("key2"));
    assertEquals("value3", akkademyDb.map.get("key3"));

    testProbe.expectMsg(new Status.Success("key2"));
    testProbe.expectMsg(new Status.Success("key3"));
}
public Flow<JsonNode, JsonNode, NotUsed> createWebSocketFlow(Publisher<JsonNode> webSocketIn, ActorRef userActor) { // http://doc.akka.io/docs/akka/current/scala/stream/stream-flows-and-basics.html#stream-materialization // http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#integrating-with-actors // source is what comes in: browser ws events -> play -> publisher -> userActor // sink is what comes out: userActor -> websocketOut -> play -> browser ws events final Sink<JsonNode, NotUsed> sink = Sink.actorRef(userActor, new Status.Success("success")); final Source<JsonNode, NotUsed> source = Source.fromPublisher(webSocketIn); final Flow<JsonNode, JsonNode, NotUsed> flow = Flow.fromSinkAndSource(sink, source); // Unhook the user actor when the websocket flow terminates // http://doc.akka.io/docs/akka/current/scala/stream/stages-overview.html#watchTermination return flow.watchTermination((ignore, termination) -> { termination.whenComplete((done, throwable) -> { logger.info("Terminating actor {}", userActor); stocksActor.tell(new Stock.Unwatch(null), userActor); actorSystem.stop(userActor); }); return NotUsed.getInstance(); }); }
/**
 * Answers a {@link QueryState} with {@code StateFound}, {@code StateNotFound}, or a
 * {@code Status.Failure} when the lookup throws {@link WrongKeyPartitionException}.
 * Messages of any other type are ignored.
 *
 * @param message the received message
 */
@Override
public void onReceive(Object message) throws Exception {
    if (!(message instanceof QueryState)) {
        return;
    }

    @SuppressWarnings("unchecked")
    QueryState<K> query = (QueryState<K>) message;

    LOG.debug("Received QueryState for key " + query.getKey() + ".");

    try {
        V found = keyValueState.getValue(query.getTimestamp(), query.getKey());

        if (found != null) {
            sender().tell(new StateFound<>(query.getKey(), found), getSelf());
        } else {
            sender().tell(new StateNotFound(query.getKey()), getSelf());
        }
    } catch (WrongKeyPartitionException wrongPartition) {
        sender().tell(new Status.Failure(wrongPartition), getSelf());
    }

    LOG.debug("Handled QueryState for key " + query.getKey() + ".");
}
/**
 * Returns a chunked response backed by an actor-ref source. When the source is
 * materialized, the chunks "kiki", "foo" and "bar" are sent to it (with a 100 ms pause
 * after the first), followed by a {@code Status.Success}, and a trace entry marker is
 * recorded.
 *
 * @return the chunked result
 */
public Result stream() {
    Source<ByteString, ?> chunks = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
            .mapMaterializedValue(target -> {
                target.tell(ByteString.fromString("kiki"), null);

                // Pause between the first chunk and the rest.
                try {
                    Thread.sleep(100);
                } catch (InterruptedException interrupted) {
                    throw new RuntimeException(interrupted);
                }

                target.tell(ByteString.fromString("foo"), null);
                target.tell(ByteString.fromString("bar"), null);
                target.tell(new Status.Success(NotUsed.getInstance()), null);

                new CreateTraceEntry().traceEntryMarker();
                return null;
            });

    return ok().chunked(chunks);
}
private void handleStateObjectChangeMessage(final StateObjectChangeMessage.Request request) throws Exception { final SubjectState subjectState = Optional .ofNullable( subjectStateRepository.getSubjectStateOfUser(request.getPiId(), request.getUserId())) .get(); sender = getSender(); final ActorRef bussinessObjectCheckActor = getContext().actorOf( springExtension.props("BusinessObjectCheckActor", subjectState.getCurrentState().getSId()), UUID.randomUUID().toString()); // must block thread since transaction is lost when using completable future final Future<Object> future = Patterns.ask(bussinessObjectCheckActor, request, Global.TIMEOUT); final boolean correct = ((Boolean) Await.result(future, Global.TIMEOUT.duration())).booleanValue(); if (!correct) { sender.tell(new Status.Failure( new IllegalArgumentException("Check of business objects returned false")), getSelf()); } else { initBusinessObjectInstances(subjectState, request); setValuesOfBusinessObjectFieldInstances(subjectState.getCurrentState(), request); sendMessages(subjectState, request); TransactionSynchronizationManager .registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { sender.tell(new EmptyMessage(), getSelf()); handleAdditionalActions(subjectState); } }); } }
/**
 * For every message flow of the current state whose receiver is of type {@code PROCESS},
 * asks the parent actor to send a process message, blocks until that request completes,
 * then marks the subject state as sent and advances to the next state.
 *
 * <p>A failure for one flow is logged and reported to {@code sender} as a
 * {@code Status.Failure} wrapping an {@link IllegalStateException} that keeps the
 * original exception as its cause; the remaining flows are still processed.
 *
 * @param subjectState the subject state whose current state supplies the message flows
 * @param request      the request that triggered the send
 */
private void triggerSendProcess(final SubjectState subjectState, final Request request) {
    subjectState.getCurrentState().getMessageFlow().stream()
            .filter(mf -> SubjectModelType.PROCESS.equals(mf.getReceiver().getSubjectModelType()))
            .forEachOrdered(mf -> {
                final BusinessObjectInstance boInstance =
                        businessObjectInstanceRepository.getBusinessObjectInstanceOfModelInProcess(
                                request.getPiId(), mf.getBusinessObjectModels().get(0).getBomId());
                try {
                    PatternsCS
                            .ask(getContext().parent(),
                                    new SendProcessMessage.Request(request.getPiId(),
                                            subjectState.getSubject().getSId(),
                                            // The first assigned user, if any, is passed along.
                                            request.getStateObjectChangeDTO().getUserAssignments() == null
                                                    || request.getStateObjectChangeDTO().getUserAssignments().isEmpty()
                                                            ? null
                                                            : request.getStateObjectChangeDTO().getUserAssignments()
                                                                    .get(0).getUserId(),
                                            mf.getMfId(), boInstance != null ? boInstance.getBoiId() : null),
                                    Global.TIMEOUT)
                            .toCompletableFuture().get();

                    subjectState.setToSent();
                    changeToNextState(subjectState, request);
                } catch (final Exception e) {
                    if (e instanceof InterruptedException) {
                        // Restore the interrupt flag swallowed by the blocking get().
                        Thread.currentThread().interrupt();
                    }
                    LOG.error(e.getMessage());
                    // Keep the original exception as the cause so the receiver sees the full chain.
                    sender.tell(new Status.Failure(new IllegalStateException("Error: " + e.getMessage(), e)),
                            getSelf());
                }
            });
}
/**
 * Sends the message to every user/message-flow pair in the request, waits for all of
 * them to complete, marks the sending subject state as sent and saves it, and registers
 * an after-commit hook that replies with a {@code MessagesSendMessage.Response}.
 *
 * <p>If any step fails, the sender receives a {@code Status.Failure} wrapping an
 * {@link IllegalStateException} (with the original exception as cause) and no success
 * response is registered.
 *
 * @param request the send request
 */
@Override
public void execute(final MessagesSendMessage.Request request) throws Exception {
    final List<CompletableFuture<Object>> futures = request.getUserMessageFlowIds()
            .stream().map(userMessageFlow -> convertToFuture(request.getPiId(),
                    userMessageFlow.getLeft(), userMessageFlow.getRight()))
            .collect(Collectors.toList());

    final ActorRef sender = getSender();

    try {
        CompletableFuture.allOf(Iterables.toArray(futures, CompletableFuture.class)).get();
        LOG.info("All users received the message in PI_ID [{}]", request.getPiId());

        final SubjectState sendState = subjectStateRepository.findOne(request.getSendSubjectState());
        sendState.setToSent();
        subjectStateRepository.save((SubjectStateImpl) sendState);
        LOG.debug("{} is set to 'SENT'", sendState);
    } catch (final Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag swallowed by the blocking get().
            Thread.currentThread().interrupt();
        }
        LOG.error("At least one user did not receive the message in PI_ID [{}]", request.getPiId());
        sender.tell(
                new Status.Failure(new IllegalStateException(
                        "Could not send message to all users in PI_ID [" + request.getPiId() + "]", e)),
                getSelf());
        // The failure has been reported; do not also schedule the success response.
        return;
    }

    // Reply only after the surrounding transaction has committed.
    TransactionSynchronizationManager
            .registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    sender.tell(new MessagesSendMessage.Response(), getSelf());
                }
            });
}
/**
 * Runs the cohort's can-commit step for the given transaction and waits for its result.
 *
 * @param message the can-commit request
 * @return a {@code PostCanCommit} behaviour holding the next step on success; this
 *         behaviour unchanged if the step failed (the sender is told
 *         {@code Status.Failure} in that case)
 */
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
CohortBehaviour<?> process(CanCommit message) {
    final PostCanCommitStep postCanCommit;
    try {
        postCanCommit = cohort
                .canCommit(message.getTxId(), message.getCandidates(), message.getSchema())
                .get();
    } catch (final Exception canCommitError) {
        getSender().tell(new Status.Failure(canCommitError), getSelf());
        return this;
    }

    getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
    return new PostCanCommit(message.getTxId(), postCanCommit);
}
/**
 * Aborts the current step, waits for the abort to finish and tells the sender whether it
 * succeeded. A failed abort is logged as a warning.
 *
 * @return the idle behaviour, regardless of the outcome
 */
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
final CohortBehaviour<?> abort() {
    Exception abortError = null;
    try {
        getStep().abort().get();
    } catch (final Exception e) {
        LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
        abortError = e;
    }

    if (abortError != null) {
        getSender().tell(new Status.Failure(abortError), getSelf());
    } else {
        getSender().tell(new Success(getSelf(), txId), getSelf());
    }
    return idleState;
}
/**
 * Runs the pre-commit step of the current transaction and waits for its result.
 *
 * @param message the pre-commit request
 * @return a {@code PostPreCommit} behaviour holding the next step on success; the idle
 *         behaviour if the step failed (the sender is told {@code Status.Failure} in
 *         that case)
 */
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
CohortBehaviour<?> process(PreCommit message) {
    final PostPreCommitStep postPreCommit;
    try {
        postPreCommit = getStep().preCommit().get();
    } catch (final Exception preCommitError) {
        getSender().tell(new Status.Failure(preCommitError), getSelf());
        return idleState;
    }

    getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
    return new PostPreCommit(getTxId(), postPreCommit);
}
/**
 * Runs the commit step of the current transaction, waits for it to finish and tells the
 * sender whether it succeeded.
 *
 * @param message the commit request
 * @return the idle behaviour, regardless of the outcome
 */
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
CohortBehaviour<?> process(Commit message) {
    try {
        getStep().commit().get();
    } catch (final Exception e) {
        // Reply as this actor (getSelf()), matching the success path below; the original
        // passed getSender() as the sender reference.
        getSender().tell(new Status.Failure(e), getSelf());
        return idleState;
    }
    getSender().tell(new Success(getSelf(), getTxId()), getSelf());
    return idleState;
}
/**
 * Removes a commit cohort's registration (if one is recorded), acknowledges the request
 * with {@code Status.Success(null)} and sends the cohort actor a {@code PoisonPill}.
 *
 * @param sender  the actor to acknowledge
 * @param message the removal request naming the cohort
 */
void removeCommitCohort(final ActorRef sender, final RemoveCohort message) {
    final ActorRef cohortActor = message.getCohort();
    final RegistrationTreeNode<ActorRef> registeredNode = cohortToNode.get(cohortActor);

    // An unknown cohort is not an error; there is simply nothing to unregister.
    if (registeredNode != null) {
        removeRegistration(registeredNode, cohortActor);
        cohortToNode.remove(cohortActor);
    }

    sender.tell(new Status.Success(null), ActorRef.noSender());
    cohortActor.tell(PoisonPill.getInstance(), cohortActor);
}
private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; } shardReplicaOperationsInProgress.add(shardName); final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), primaryPath, shardId); Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); Future<Object> futureObj = ask(getContext().actorSelection(primaryPath), new RemoveServer(shardId.toString()), removeServerTimeout); futureObj.onComplete(new OnComplete<Object>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { shardReplicaOperationsInProgress.remove(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName); LOG.debug("{}: {}", persistenceId(), msg, failure); // FAILURE sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); } else { // SUCCESS self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); }
private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; } shardReplicaOperationsInProgress.add(shardName); final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), primaryPath, shardId); Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); Future<Object> futureObj = ask(getContext().actorSelection(primaryPath), new RemoveServer(shardId.toString()), removeServerTimeout); futureObj.onComplete(new OnComplete<Object>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { shardReplicaOperationsInProgress.remove(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName); LOG.debug("{}: {}", persistenceId(), msg, failure); // FAILURE sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); } else { // SUCCESS self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); }
/**
 * Replies to the sender with a {@code Status.Success} whose payload is the list of
 * string forms of all local shard identifiers.
 */
private void onGetLocalShardIds() {
    final List<String> shardIds = new ArrayList<>(localShards.size());
    localShards.values().forEach(shard -> shardIds.add(shard.getShardId().toString()));

    getSender().tell(new Status.Success(shardIds), getSelf());
}
/**
 * Checks whether a replica operation is already running for the named shard and, if so,
 * tells {@code sender} with a {@code Status.Failure} wrapping an
 * {@link IllegalStateException}.
 *
 * @param shardName name of the shard to check
 * @param sender    the actor to notify when an operation is already in progress
 * @return {@code true} if an operation is in progress (and the sender was notified),
 *         {@code false} otherwise
 */
private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
    if (!shardReplicaOperationsInProgress.contains(shardName)) {
        return false;
    }

    String busyMessage = String.format("A shard replica operation for %s is already in progress", shardName);
    LOG.debug("{}: {}", persistenceId(), busyMessage);
    sender.tell(new Status.Failure(new IllegalStateException(busyMessage)), getSelf());
    return true;
}
/**
 * Cleans up after a failed add-server operation and reports the failure to the requester.
 *
 * <p>The shard's in-progress marker is cleared. If {@code removeShardOnFailure} is set,
 * the shard is removed from the local shards and its actor, if any, is sent a
 * {@code PoisonPill}.
 *
 * @param shardName            name of the shard the operation was for
 * @param message              optional description; when non-null the failure is wrapped
 *                             in a {@link RuntimeException} carrying it
 * @param failure              the underlying failure
 * @param sender               the actor to notify
 * @param removeShardOnFailure whether to drop the local shard entry and stop its actor
 */
private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
        final ActorRef sender, final boolean removeShardOnFailure) {
    shardReplicaOperationsInProgress.remove(shardName);

    if (removeShardOnFailure) {
        ShardInformation shardInfo = localShards.remove(shardName);
        // Guard against a missing entry so the failure reply below is always sent.
        if (shardInfo != null && shardInfo.getActor() != null) {
            shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
        }
    }

    sender.tell(new Status.Failure(message == null ? failure :
        new RuntimeException(message, failure)), getSelf());
}
private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg, final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) { String shardName = shardInfo.getShardName(); shardReplicaOperationsInProgress.remove(shardName); LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath); if (replyMsg.getStatus() == ServerChangeStatus.OK) { LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName); // Make the local shard voting capable shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf()); shardInfo.setActiveMember(true); persistShardList(); sender.tell(new Status.Success(null), getSelf()); } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) { sendLocalReplicaAlreadyExistsReply(shardName, sender); } else { LOG.warn("{}: Leader failed to add shard replica {} with status {}", persistenceId(), shardName, replyMsg.getStatus()); Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId()); onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure); } }
/**
 * Inverts the voting status of every member of a shard: fetches the on-demand raft state
 * from the local shard, builds a map with each peer's and this member's voting flag
 * negated, and hands it to {@code changeShardMembersVotingStatus}.
 *
 * <p>If the raft state cannot be obtained, the sender receives a {@code Status.Failure}.
 *
 * @param flipMembersVotingStatus the request naming the shard
 */
private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
    LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);

    ActorRef sender = getSender();
    final String shardName = flipMembersVotingStatus.getShardName();

    findLocalShard(shardName, sender, localShardFound -> {
        Future<Object> raftStateFuture = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
                Timeout.apply(30, TimeUnit.SECONDS));

        raftStateFuture.onComplete(new OnComplete<Object>() {
            @Override
            public void onComplete(final Throwable failure, final Object response) {
                if (failure != null) {
                    sender.tell(new Status.Failure(new RuntimeException(
                            String.format("Failed to access local shard %s", shardName), failure)), self());
                    return;
                }

                OnDemandRaftState raftState = (OnDemandRaftState) response;

                // Peers first, then this member's own entry, each with the flag negated.
                Map<String, Boolean> flippedVoting = new HashMap<>();
                raftState.getPeerVotingStates().forEach(
                    (peerId, voting) -> flippedVoting.put(peerId, !voting));
                flippedVoting.put(
                        getShardIdentifier(cluster.getCurrentMemberName(), shardName).toString(),
                        !raftState.isVoting());

                changeShardMembersVotingStatus(new ChangeServersVotingStatus(flippedVoting),
                        shardName, localShardFound.getPath(), sender);
            }
        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
    });
}
/**
 * Reports an unexpected find-leader response to the target actor as a
 * {@code Status.Failure}. A {@link Throwable} response is used as the failure cause
 * directly; anything else is described in a new {@link RuntimeException}.
 *
 * @param response the unexpected response
 */
@Override
public void onUnknownResponse(final Object response) {
    String msg = String.format("Failed to find leader for shard %s: received response: %s",
            shardName, response);
    LOG.debug("{}: {}", persistenceId, msg);

    final Throwable cause;
    if (response instanceof Throwable) {
        cause = (Throwable) response;
    } else {
        cause = new RuntimeException(msg);
    }

    targetActor.tell(new Status.Failure(cause), shardManagerActor);
}
/**
 * Handles a command sent to this behaviour. A {@link GetClientRequest} is answered with
 * a {@code Status.Success} carrying this behaviour; any other command is logged and
 * ignored.
 *
 * @param command the received command
 * @return this behaviour
 */
@Override
protected final AbstractDataStoreClientBehavior onCommand(final Object command) {
    if (!(command instanceof GetClientRequest)) {
        LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
        return this;
    }

    final GetClientRequest clientRequest = (GetClientRequest) command;
    clientRequest.getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
    return this;
}