/**
 * Builds the context from the supplied collaborators and initial indexes.
 *
 * <p>Arguments annotated {@code @Nonnull} are null-checked on assignment. Every entry of
 * {@code peerAddresses} is registered in {@code peerInfoMap} as a {@link VotingState#VOTING} peer.
 *
 * @param actor the actor reference stored in this context
 * @param context the Akka actor context stored in this context
 * @param id identifier stored in this context
 * @param termInformation election term holder, must not be null
 * @param commitIndex initial commit index
 * @param lastApplied initial last-applied index
 * @param peerAddresses peer id to peer address mapping, must not be null
 * @param configParams configuration, must not be null
 * @param persistenceProvider persistence provider, must not be null
 * @param applyStateConsumer consumer of {@code ApplyState} messages, must not be null
 * @param logger logger used by this context, must not be null
 */
public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
        @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
        @Nonnull Map<String, String> peerAddresses, @Nonnull ConfigParams configParams,
        @Nonnull DataPersistenceProvider persistenceProvider,
        @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
    this.actor = actor;
    this.context = context;
    this.id = id;
    this.termInformation = Preconditions.checkNotNull(termInformation);
    this.commitIndex = commitIndex;
    this.lastApplied = lastApplied;
    this.configParams = Preconditions.checkNotNull(configParams);
    this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
    this.log = Preconditions.checkNotNull(logger);
    this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);

    this.fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
            configParams.getFileBackedStreamingThreshold(),
            configParams.getTempFileDirectory());

    // Every initially known peer starts out as a voting member.
    Preconditions.checkNotNull(peerAddresses).forEach((peerId, peerAddress) ->
            peerInfoMap.put(peerId, new PeerInfo(peerId, peerAddress, VotingState.VOTING)));
}
/**
 * Reloads the plugin metadata and restarts the plugin when its class or its
 * configuration differs from the previously loaded metadata.
 *
 * @param context actor context (not used directly here)
 * @throws Exception declared by the lifecycle contract
 */
@Override
public void onUpdate(ActorContext context) throws Exception {
    final PluginMetaData previousMd = pluginMd;
    pluginMd = systemContext.getPluginService().findPluginById(entityId);
    logger.info("[{}] Plugin configuration was updated from {} to {}.", entityId, previousMd, pluginMd);

    final boolean restartNeeded;
    if (!previousMd.getClazz().equals(pluginMd.getClazz())) {
        logger.info("[{}] Plugin requires restart due to clazz change from {} to {}.",
                entityId, previousMd.getClazz(), pluginMd.getClazz());
        restartNeeded = true;
    } else if (!previousMd.getConfiguration().equals(pluginMd.getConfiguration())) {
        logger.info("[{}] Plugin requires restart due to configuration change from {} to {}.",
                entityId, previousMd.getConfiguration(), pluginMd.getConfiguration());
        restartNeeded = true;
    } else {
        restartNeeded = false;
    }

    if (!restartNeeded) {
        return;
    }

    // Suspend, stop the running implementation (if any) and start again.
    this.state = ComponentLifecycleState.SUSPENDED;
    if (pluginImpl != null) {
        pluginImpl.stop(trustedCtx);
    }
    start();
}
/**
 * Pushes pending RPC requests to a newly subscribed session.
 *
 * <p>For an {@code ASYNC} session every pending request is sent; otherwise only the first one.
 * A {@code SYNC} session's RPC subscription is removed when there is something pending.
 * One-way requests that were sent are removed from {@code rpcPendingMap} afterwards.
 *
 * @param context actor context
 * @param sessionId the session that subscribed
 * @param type the session type
 * @param server optional server address passed on when sending to the session actor
 */
private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type,
        Optional<ServerAddress> server) {
    if (!rpcPendingMap.isEmpty()) {
        logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId,
                rpcPendingMap.size(), sessionId);
        if (type == SessionType.SYNC) {
            logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
            rpcSubscriptions.remove(sessionId);
        }
    } else {
        logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
    }

    // Collect the map KEYS of one-way requests. The pending map is keyed by the Integer
    // request id, so collecting the requests' UUIDs (as before) made the removal below a no-op.
    Set<Integer> sentOneWayIds = new HashSet<>();
    if (type == SessionType.ASYNC) {
        rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
    } else {
        rpcPendingMap.entrySet().stream().findFirst()
                .ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
    }

    // Removal happens after iteration to avoid modifying the map while traversing it.
    sentOneWayIds.forEach(rpcPendingMap::remove);
}

/**
 * Creates the handler applied to a pending RPC entry: it sends the request to the session
 * actor and, for one-way requests, records the entry key and notifies the parent actor.
 *
 * @param context actor context
 * @param sessionId target session
 * @param server optional server address passed on when sending to the session actor
 * @param sentOneWayIds receives the pending-map key of every one-way request handled
 * @return a consumer of pending-map entries
 */
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context,
        SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
    return entry -> {
        ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
        ToDeviceRpcRequestBody body = request.getBody();
        if (request.isOneway()) {
            sentOneWayIds.add(entry.getKey());
            ToPluginRpcResponseDeviceMsg responsePluginMsg =
                    toPluginRpcResponseMsg(entry.getValue().getMsg(), (String) null);
            context.parent().tell(responsePluginMsg, ActorRef.noSender());
        }
        ToDeviceRpcRequestMsg rpcRequest =
                new ToDeviceRpcRequestMsg(entry.getKey(), body.getMethod(), body.getParams());
        ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId);
        sendMsgToSessionActor(response, server);
    };
}
/**
 * Handles attribute and RPC (un)subscription requests coming from a device session.
 * Message types other than the four subscription commands are ignored.
 *
 * @param context actor context
 * @param msg message carrying the session id, session type and payload
 */
private void processSubscriptionCommands(ActorContext context, ToDeviceActorMsg msg) {
    final SessionId sessionId = msg.getSessionId();
    final SessionType sessionType = msg.getSessionType();
    final FromDeviceMsg payload = msg.getPayload();

    if (payload.getMsgType() == MsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
        logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
        attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
        return;
    }
    if (payload.getMsgType() == MsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
        logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
        attributeSubscriptions.remove(sessionId);
        return;
    }
    if (payload.getMsgType() == MsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
        logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
        rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
        // A fresh RPC subscriber immediately receives whatever is pending.
        sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
        return;
    }
    if (payload.getMsgType() == MsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
        logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
        rpcSubscriptions.remove(sessionId);
    }
}
/**
 * Applies a lifecycle event for the given rule and returns the actor responsible for it.
 *
 * <p>For {@code DELETED} the first rule in {@code ruleMap} with a matching id is marked
 * {@code SUSPENDED} and removed from {@code ruleMap} and {@code ruleActors}; for any other
 * event the rule is loaded through the rule service. When a rule was found, an actor entry
 * is created if {@code ruleMap} has none for it, and the rule chain is refreshed.
 *
 * @param context actor context used to obtain or create the rule actor
 * @param ruleId id of the affected rule
 * @param event lifecycle event
 * @return the rule actor reference, or empty when the rule is unknown
 */
public Optional<ActorRef> update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) {
    RuleMetaData rule = null;
    if (event == ComponentLifecycleEvent.DELETED) {
        for (RuleMetaData candidate : ruleMap.keySet()) {
            if (candidate.getId().equals(ruleId)) {
                candidate.setState(ComponentLifecycleState.SUSPENDED);
                rule = candidate;
                break;
            }
        }
        if (rule != null) {
            ruleMap.remove(rule);
            ruleActors.remove(ruleId);
        }
    } else {
        rule = systemContext.getRuleService().findRuleById(ruleId);
    }

    if (rule == null) {
        log.warn("[{}] Can't process unknown rule!", ruleId);
        return Optional.empty();
    }

    RuleActorMetaData actorMd = ruleMap.get(rule);
    if (actorMd == null) {
        ActorRef ruleActor = getOrCreateRuleActor(context, rule.getId());
        actorMd = RuleActorMetaData.systemRule(rule.getId(), rule.getWeight(), ruleActor);
        ruleMap.put(rule, actorMd);
    }
    refreshRuleChain();
    return Optional.of(actorMd.getActorRef());
}
/**
 * Forwards a to-device message to the session context.
 *
 * <p>Status-code and get-attributes responses with a non-negative request id are removed
 * from {@code pendingMap} first. A {@code SESSION_CLOSE} message is translated into a
 * credentials-revoked close message instead of being forwarded.
 *
 * @param context actor context (not used directly here)
 * @param msg the message to deliver
 */
@Override
public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
    try {
        if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
            switch (msg.getMsgType()) {
                case STATUS_CODE_RESPONSE:
                case GET_ATTRIBUTES_RESPONSE:
                    ResponseMsg responseMsg = (ResponseMsg) msg;
                    if (responseMsg.getRequestId() >= 0) {
                        logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
                        pendingMap.remove(responseMsg.getRequestId());
                    }
                    break;
                default:
                    // Other message types need no pending-request bookkeeping.
                    break;
            }
            sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
        } else {
            sessionCtx.onMsg(org.iotp.analytics.ruleengine.common.msg.session.ctrl.SessionCloseMsg
                    .onCredentialsRevoked(sessionCtx.getSessionId()));
        }
    } catch (SessionException e) {
        // NOTE(review): assumes logger is a template-based adapter (e.g. Akka LoggingAdapter),
        // where an argument without a "{}" placeholder is not rendered - confirm.
        logger.warning("Failed to push session response msg: {}", e);
    }
}
/**
 * Forwards a to-asset message to the session context.
 *
 * <p>Status-code and get-attributes responses with a non-negative request id are removed
 * from {@code pendingMap4Asset} first. A {@code SESSION_CLOSE} message is translated into a
 * credentials-revoked close message instead of being forwarded.
 *
 * @param context actor context (not used directly here)
 * @param msg the message to deliver
 */
@Override
public void processToAssetMsg(ActorContext context, ToDeviceMsg msg) {
    try {
        if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
            switch (msg.getMsgType()) {
                case STATUS_CODE_RESPONSE:
                case GET_ATTRIBUTES_RESPONSE:
                    ResponseMsg responseMsg = (ResponseMsg) msg;
                    if (responseMsg.getRequestId() >= 0) {
                        logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
                        pendingMap4Asset.remove(responseMsg.getRequestId());
                    }
                    break;
                default:
                    // Other message types need no pending-request bookkeeping.
                    break;
            }
            sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
        } else {
            sessionCtx.onMsg(org.iotp.analytics.ruleengine.common.msg.session.ctrl.SessionCloseMsg
                    .onCredentialsRevoked(sessionCtx.getSessionId()));
        }
    } catch (SessionException e) {
        // NOTE(review): assumes logger is a template-based adapter (e.g. Akka LoggingAdapter),
        // where an argument without a "{}" placeholder is not rendered - confirm.
        logger.warning("Failed to push session response msg: {}", e);
    }
}
private void pushToNextRule(ActorContext context, ChainProcessingContext ctx, RuleEngineError error) { if (error != null) { ctx = ctx.withError(error); } if (ctx.isFailure()) { logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); } else if (!ctx.hasNext()) { logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender()); } else { logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(), ctx.getInMsg().getDeviceId()); ChainProcessingContext nextTask = ctx.getNext(); nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self()); } }
/**
 * Activates the rule: resumes the existing filters, processor and action when filters
 * were already created, otherwise performs a full start.
 *
 * @param context actor context (not used directly here)
 * @throws Exception declared by the lifecycle contract
 */
@Override
public void onActivate(ActorContext context) throws Exception {
    logger.info("[{}] Going to process onActivate rule.", entityId);
    this.state = ComponentLifecycleState.ACTIVE;

    if (filters == null) {
        // Nothing was initialised yet, so a plain start is required.
        start();
        return;
    }

    filters.forEach(RuleLifecycleComponent::resume);
    if (processor == null) {
        initProcessor();
    } else {
        processor.resume();
    }
    if (action != null) {
        action.resume();
    }
    logger.info("[{}] Rule resumed.", entityId);
}
/**
 * Pushes pending RPC requests to a newly subscribed session: all of them for an
 * {@code ASYNC} session, only the first one otherwise. Keys of one-way requests that were
 * handled are removed from {@code rpcPendingMap} once sending is done.
 *
 * @param context actor context
 * @param sessionId the session that subscribed
 * @param type the session type
 * @param server optional server address passed on when sending to the session actor
 */
private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type,
        Optional<ServerAddress> server) {
    if (rpcPendingMap.isEmpty()) {
        logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
    } else {
        logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId,
                rpcPendingMap.size(), sessionId);
        if (type == SessionType.SYNC) {
            logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
            rpcSubscriptions.remove(sessionId);
        }
    }

    final Set<Integer> handledOneWayKeys = new HashSet<>();
    if (type != SessionType.ASYNC) {
        rpcPendingMap.entrySet().stream().findFirst()
                .ifPresent(processPendingRpc(context, sessionId, server, handledOneWayKeys));
    } else {
        rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, handledOneWayKeys));
    }

    // Removal is deferred until after the traversal of the pending map.
    for (Integer key : handledOneWayKeys) {
        rpcPendingMap.remove(key);
    }
}
/**
 * Creates the handler applied to a pending RPC entry. The handler sends the request to the
 * session actor; for a one-way request it first records the entry key in
 * {@code sentOneWayIds} and tells the parent actor a plugin RPC response.
 *
 * @param context actor context
 * @param sessionId target session
 * @param server optional server address passed on when sending to the session actor
 * @param sentOneWayIds receives the pending-map key of every one-way request handled
 * @return a consumer of pending-map entries
 */
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context,
        SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
    return pending -> {
        final ToDeviceRpcRequest rpc = pending.getValue().getMsg().getMsg();
        final ToDeviceRpcRequestBody rpcBody = rpc.getBody();

        if (rpc.isOneway()) {
            sentOneWayIds.add(pending.getKey());
            ToPluginRpcResponseDeviceMsg pluginReply =
                    toPluginRpcResponseMsg(pending.getValue().getMsg(), (String) null);
            context.parent().tell(pluginReply, ActorRef.noSender());
        }

        ToDeviceRpcRequestMsg deviceRequest =
                new ToDeviceRpcRequestMsg(pending.getKey(), rpcBody.getMethod(), rpcBody.getParams());
        ToDeviceSessionActorMsg sessionMsg = new BasicToDeviceSessionActorMsg(deviceRequest, sessionId);
        sendMsgToSessionActor(sessionMsg, server);
    };
}
/**
 * Forwards a to-device message to the session context.
 *
 * <p>Status-code and get-attributes responses with a non-negative request id are removed
 * from {@code pendingMap} first. A {@code SESSION_CLOSE} message is translated into a
 * credentials-revoked close message instead of being forwarded.
 *
 * @param context actor context (not used directly here)
 * @param msg the message to deliver
 */
@Override
public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
    try {
        if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
            switch (msg.getMsgType()) {
                case STATUS_CODE_RESPONSE:
                case GET_ATTRIBUTES_RESPONSE:
                    ResponseMsg responseMsg = (ResponseMsg) msg;
                    if (responseMsg.getRequestId() >= 0) {
                        logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
                        pendingMap.remove(responseMsg.getRequestId());
                    }
                    break;
                default:
                    break;
            }
            sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
        } else {
            sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg
                    .onCredentialsRevoked(sessionCtx.getSessionId()));
        }
    } catch (SessionException e) {
        // NOTE(review): assumes logger is a template-based adapter (e.g. Akka LoggingAdapter),
        // where an argument without a "{}" placeholder is not rendered - confirm.
        logger.warning("Failed to push session response msg: {}", e);
    }
}
/**
 * Reacts to a cluster topology change. When there are pending messages or active
 * subscriptions and the target server resolved for the device differs from the current one,
 * all pending messages and the active subscriptions are forwarded to the new target.
 *
 * @param context actor context used for forwarding
 * @param msg the cluster event (its content is not inspected here)
 */
@Override
public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
    if (pendingMap.isEmpty() && !subscribedToAttributeUpdates && !subscribedToRpcCommands) {
        return;
    }

    Optional<ServerAddress> resolvedServer = systemContext.getRoutingService().resolveById(getDeviceId());
    if (resolvedServer.equals(currentTargetServer)) {
        return;
    }

    firstMsg = true;
    currentTargetServer = resolvedServer;

    pendingMap.values().forEach(pending -> {
        forwardToAppActor(context, pending, currentTargetServer);
        if (!currentTargetServer.isPresent()) {
            logger.debug("[{}] Forwarded msg to local server.", sessionId);
        } else {
            logger.debug("[{}] Forwarded msg to new server: {}", sessionId, currentTargetServer.get());
        }
    });

    if (subscribedToAttributeUpdates) {
        toDeviceMsg(new AttributesSubscribeMsg())
                .ifPresent(subscribe -> forwardToAppActor(context, subscribe, currentTargetServer));
        logger.debug("[{}] Forwarded attributes subscription.", sessionId);
    }
    if (subscribedToRpcCommands) {
        toDeviceMsg(new RpcSubscribeMsg())
                .ifPresent(subscribe -> forwardToAppActor(context, subscribe, currentTargetServer));
        logger.debug("[{}] Forwarded rpc commands subscription.", sessionId);
    }
}
/**
 * Reacts to a cluster topology change while a response is pending: the pending message is
 * handed to {@code forwardToAppActorIfAdressChanged} and the returned target server becomes
 * the current one. Nothing happens when no response is pending.
 *
 * @param context actor context used for forwarding
 * @param msg the cluster event (its content is not inspected here)
 */
@Override
public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
    if (!pendingResponse) {
        return;
    }

    Optional<ServerAddress> resolvedServer =
            forwardToAppActorIfAdressChanged(context, pendingMsg, currentTargetServer);

    if (logger.isDebugEnabled() && !resolvedServer.equals(currentTargetServer)) {
        if (!resolvedServer.isPresent()) {
            logger.debug("[{}] Forwarded msg to local server.", sessionId);
        } else {
            logger.debug("[{}] Forwarded msg to new server: {}", sessionId, resolvedServer.get());
        }
    }

    currentTargetServer = resolvedServer;
}
/**
 * Records the delivery outcome of every event in the message in the event delivery table:
 * success when the status is {@code SUCCESS}, failure otherwise.
 *
 * @param context actor context (not used directly here)
 * @param message delivery report for a batch of endpoint events
 */
void processEndpointEventDeliveryMessage(ActorContext context, EndpointEventDeliveryMessage message) {
    LOG.debug("[{}] processing event delivery message for [{}] with status {}",
            userId, message.getMessage().getAddress(), message.getStatus());

    final boolean delivered = message.getStatus() == EventDeliveryStatus.SUCCESS;
    final RouteTableAddress target = message.getMessage().getAddress();

    if (delivered) {
        for (EndpointEvent event : message.getMessage().getEndpointEvents()) {
            LOG.debug("[{}] registering successful delivery of event [{}] to address {}",
                    userId, event.getId(), target);
            eventDeliveryTable.registerDeliverySuccess(event, target);
        }
    } else {
        for (EndpointEvent event : message.getMessage().getEndpointEvents()) {
            LOG.debug("[{}] registering failure to delivery of event [{}] to address {}",
                    userId, event.getId(), target);
            eventDeliveryTable.registerDeliveryFailure(event, target);
        }
    }
}
/**
 * Applies a route info message to the route table: a {@code DELETE} removes all routes for
 * the address, anything else updates the table for every event class family version in the
 * message. Afterwards all local routes are reported to the server the address belongs to.
 *
 * @param context actor context passed on to the route table update
 * @param message message carrying the route info
 */
void processRouteInfoMessage(ActorContext context, RouteInfoMessage message) {
    final RouteInfo info = message.getRouteInfo();

    if (!RouteOperation.DELETE.equals(info.getRouteOperation())) {
        for (EventClassFamilyVersion version : info.getEcfVersions()) {
            RouteTableKey tableKey = new RouteTableKey(info.getAddress().getApplicationToken(), version);
            LOG.debug("[{}] Updating route table with key {} and address {}",
                    userId, tableKey, info.getAddress());
            updateRouteTable(context, tableKey, info.getAddress());
        }
    } else {
        LOG.debug("[{}] Removing all routes from route table by address {}", userId, info.getAddress());
        routeTable.removeByAddress(info.getAddress());
    }

    reportAllLocalRoutes(info.getAddress().getServerId());
}
private void registerEndpointForEvents(ActorContext context, EndpointUserConnectMessage message, RouteTableAddress address) { List<EventClassFamilyVersion> ecfVersions = message.getEcfVersions(); if (!ecfVersions.isEmpty()) { for (EventClassFamilyVersion ecfVersion : ecfVersions) { RouteTableKey key = new RouteTableKey(address.getApplicationToken(), ecfVersion); updateRouteTable(context, key, address); } if (firstConnectRequestToActor) { firstConnectRequestToActor = false; // report existence of this actor to other operation servers eventService.sendUserRouteInfo(new UserRouteInfo(tenantId, userId)); } for (String serverId : routeTable.getRemoteServers()) { if (routeTable.isDeliveryRequired(serverId, address)) { LOG.debug("[{}] Sending route info about address {} to server {}", userId, address, serverId); eventService.sendRouteInfo(new RouteInfo( tenantId, userId, address, ecfVersions), serverId); } } versionMap.put(address.getEndpointKey(), message.getEcfVersions()); } }
protected void removeEndpoint(ActorContext context, EndpointObjectHash endpoint) { LOG.debug("[{}] removing endpoint [{}] from route tables", userId, endpoint); RouteTableAddress address = routeTable.removeLocal(endpoint); versionMap.remove(endpoint); for (String serverId : routeTable.getRemoteServers()) { LOG.debug("[{}] removing endpoint [{}] from remote route table on server {}", userId, endpoint, serverId); eventService.sendRouteInfo( RouteInfo.deleteRouteFromAddress(tenantId, userId, address), serverId); } // cleanup and notify global route actor GlobalRouteInfo route = GlobalRouteInfo.delete(tenantId, userId, address); if (eventService.isMainUserNode(userId)) { context.parent().tell(new EndpointRouteUpdateMessage(route), context.self()); } else { LOG.debug("[{}] Sending disconnect message to global actor", userId); eventService.sendEndpointRouteInfo(route); } }
/**
 * Processes a global route update.
 *
 * <p>An {@code ADD} stores the route under its configuration key and triggers the hash
 * check/notification; a {@code DELETE} removes the route; any other operation is logged
 * as unsupported.
 *
 * @param context the actor context
 * @param route the global route info
 */
public void process(ActorContext context, GlobalRouteInfo route) {
    if (route.getRouteOperation() == RouteOperation.ADD) {
        LOG.debug("[{}][{}] Adding route {} for cf version {}",
                tenantId, userId, route, route.getCfVersion());
        ConfigurationKey configKey = ConfigurationKey.fromRouteInfo(route);
        map.add(configKey, route);
        checkHashAndSendNotification(context, configKey, route);
        return;
    }

    if (route.getRouteOperation() == RouteOperation.DELETE) {
        LOG.debug("[{}][{}] Remove route {} for cf version {}",
                tenantId, userId, route, route.getCfVersion());
        map.remove(route);
        return;
    }

    LOG.warn("[{}][{}] unsupported route operations {}", tenantId, userId, route.getRouteOperation());
}
/**
 * Sends a configuration update for the route when {@code newHash} differs from the hash the
 * route currently reports. Local routes are notified through the parent actor, others
 * through the event service addressed to the route's server.
 *
 * @param context actor context; its parent receives updates for local routes
 * @param newHash the new hash to compare against the route's hash
 * @param route the route to notify
 */
private void checkHashAndSendNotification(ActorContext context, byte[] newHash, GlobalRouteInfo route) {
    if (Arrays.equals(newHash, route.getUcfHash())) {
        LOG.trace("Ignoring notification to route {} due to matching hashes", route);
        return;
    }

    LOG.trace("Sending notification to route {}", route);
    if (!route.isLocal()) {
        eventService.sendEndpointStateInfo(route.getAddress().getServerId(), toUpdate(newHash, route));
        return;
    }
    context.parent().tell(
            new EndpointUserConfigurationUpdateMessage(toUpdate(newHash, route)), context.self());
}
/**
 * Process an endpoint event receive message.
 *
 * <p>When the endpoint state is valid for events and at least one EVENT channel is
 * registered, the events are added to every such channel and a {@code SUCCESS} delivery
 * response is produced. Otherwise a {@code FAILURE} response is produced; in the
 * "no channel" case the user-registration-pending flag is also cleared. The response is
 * always sent to the parent.
 *
 * @param context actor context
 * @param message endpoint event receive message
 */
public void processEndpointEventReceiveMessage(ActorContext context,
        EndpointEventReceiveMessage message) {
    EndpointEventDeliveryMessage response;
    if (state.isValidForEvents()) {
        Set<ChannelMetaData> eventChannels = state.getChannelsByType(TransportType.EVENT);
        if (!eventChannels.isEmpty()) {
            for (ChannelMetaData eventChannel : eventChannels) {
                addEventsAndReply(context, eventChannel, message);
            }
            response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.SUCCESS);
        } else {
            // The template previously had one placeholder for two arguments, so the
            // ignored message was never rendered in the log.
            LOG.debug("[{}] Message {} ignored due to no channel contexts registered for events",
                    actorKey, message);
            response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.FAILURE);
            state.setUserRegistrationPending(false);
        }
    } else {
        LOG.debug(
                "[{}][{}] Endpoint profile is not valid for receiving events. "
                        + "Either no assigned user or no event families in sdk",
                endpointKey, actorKey);
        response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.FAILURE);
    }
    tellParent(context, response);
}
/**
 * Verifies that processing an actor timeout message results in exactly one
 * {@code tellParent} call on the processor.
 */
@Test
public void actorTimeoutTest() {
    ActorRef parentRef = Mockito.mock(ActorRef.class);
    ActorContext actorContext = Mockito.mock(ActorContext.class);
    Mockito.when(actorContext.parent()).thenReturn(parentRef);

    OperationsService operationsService = Mockito.mock(OperationsService.class);
    AkkaContext akkaContext = Mockito.mock(AkkaContext.class);
    Mockito.when(akkaContext.getOperationsService()).thenReturn(operationsService);

    LocalEndpointActorMessageProcessor processor = Mockito.spy(
            new LocalEndpointActorMessageProcessor(
                    akkaContext, "APP_TOKEN", EndpointObjectHash.fromSha1("key"), "actorKey"));
    ActorTimeoutMessage timeout = new ActorTimeoutMessage(System.currentTimeMillis());

    // Stub out the actual send so only the interaction is observed.
    Mockito.doNothing().when(processor)
            .tellParent(Mockito.any(ActorContext.class), Mockito.any(Object.class));

    processor.processActorTimeoutMessage(actorContext, timeout);

    Mockito.verify(processor)
            .tellParent(Mockito.any(ActorContext.class), Mockito.any(Object.class));
}
/**
 * Re-runs a sync for every given channel using a copy of the channel's original request.
 * Configuration and/or notification sync sections are copied and flagged as forced only
 * when the matching update flag is set and the original request contains that section.
 *
 * @param context actor context passed on to {@code sync}
 * @param channels channels to re-sync
 * @param cfUpdate whether a configuration update should be forced
 * @param nfUpdate whether a notification update should be forced
 */
private void syncChannels(ActorContext context, Set<ChannelMetaData> channels,
        boolean cfUpdate, boolean nfUpdate) {
    for (ChannelMetaData channel : channels) {
        final SyncRequestMessage pending = channel.getRequestMessage();
        final ClientSync source = pending.getRequest();

        final ClientSync replay = new ClientSync();
        replay.setRequestId(source.getRequestId());
        replay.setClientSyncMetaData(source.getClientSyncMetaData());
        replay.setUseConfigurationRawSchema(source.isUseConfigurationRawSchema());

        if (cfUpdate && source.getConfigurationSync() != null) {
            replay.setForceConfigurationSync(true);
            replay.setConfigurationSync(source.getConfigurationSync());
        }
        if (nfUpdate && source.getNotificationSync() != null) {
            replay.setForceNotificationSync(true);
            replay.setNotificationSync(source.getNotificationSync());
        }

        LOG.debug("[{}][{}] Processing request {}", endpointKey, actorKey, replay);
        sync(context, new SyncRequestMessage(
                pending.getSession(), replay, pending.getCommand(), pending.getOriginator()));
    }
}
/**
 * Forwards a user attach request (if the sync request carries one) to the parent actor as a
 * user verification request. If a verification response is already stored in
 * {@code userAttachResponseMap}, one stored entry is applied to the response and removed.
 *
 * @param context actor context; its parent receives the verification request
 * @param syncRequest the client sync request
 * @param responseHolder holder of the response being built
 */
private void processUserAttachRequest(ActorContext context, ClientSync syncRequest,
        SyncContext responseHolder) {
    final UserClientSync userSync = syncRequest.getUserSync();
    if (userSync == null || userSync.getUserAttachRequest() == null) {
        return;
    }

    final UserAttachRequest attach = userSync.getUserAttachRequest();
    UserVerificationRequestMessage verification = new UserVerificationRequestMessage(
            context.self(),
            attach.getUserVerifierId(),
            attach.getUserExternalId(),
            attach.getUserAccessToken());
    context.parent().tell(verification, context.self());
    LOG.debug("[{}][{}] received and forwarded user attach request {}",
            endpointKey, actorKey, userSync.getUserAttachRequest());

    if (!userAttachResponseMap.isEmpty()) {
        Entry<UUID, UserVerificationResponseMessage> stored =
                userAttachResponseMap.entrySet().iterator().next();
        updateResponseWithUserAttachResults(responseHolder.getResponse(), stored.getValue());
        userAttachResponseMap.remove(stored.getKey());
    }
}
/**
 * Handles the event section of a client sync request, if present: processes the sequence
 * number and, when the endpoint state is valid for events, sends the contained events.
 *
 * @param context actor context passed on when sending events
 * @param request the client sync request
 * @param responseHolder holder of the response being built
 */
private void processEvents(ActorContext context, ClientSync request, SyncContext responseHolder) {
    if (request.getEventSync() == null) {
        return;
    }

    final EventClientSync eventSync = request.getEventSync();
    processSeqNumber(eventSync, responseHolder);

    if (!state.isValidForEvents()) {
        LOG.debug(
                "[{}][{}] Endpoint profile is not valid for send/receive events. "
                        + "Either no assigned user or no event families in sdk",
                endpointKey, actorKey);
        return;
    }
    sendEventsIfPresent(context, eventSync);
}
/**
 * Keeps the endpoint's user connection in line with its profile. On a user id mismatch the
 * endpoint is disconnected from the old user; unless a registration is already pending, the
 * profile's user id is adopted and, if non-null, a connect is sent to the new user.
 *
 * @param context actor context passed on to the connect/disconnect helpers
 */
private void updateUserConnection(ActorContext context) {
    if (!state.isValidForUser()) {
        return;
    }

    if (state.userIdMismatch()) {
        sendDisconnectFromOldUser(context, state.getProfile());
        state.setUserRegistrationPending(false);
    }

    if (state.isUserRegistrationPending()) {
        LOG.trace("[{}][{}] User registration request is already sent.", endpointKey, actorKey);
        return;
    }

    state.setUserId(state.getProfileUserId());
    if (state.getUserId() == null) {
        return;
    }
    sendConnectToNewUser(context, state.getProfile());
    state.setUserRegistrationPending(true);
}
/**
 * Puts the received events into the channel's pending response (creating the event sync
 * section when absent), sends the reply, and removes the channel from the state when its
 * type is not asynchronous.
 *
 * @param context actor context passed on to {@code sendReply}
 * @param channel channel whose pending request is answered
 * @param message message carrying the events
 */
private void addEventsAndReply(ActorContext context, ChannelMetaData channel,
        EndpointEventReceiveMessage message) {
    final SyncRequestMessage request = channel.getRequestMessage();
    final SyncContext holder = channel.getResponseHolder();

    EventServerSync eventSync = holder.getResponse().getEventSync();
    if (eventSync == null) {
        eventSync = new EventServerSync();
        holder.getResponse().setEventSync(eventSync);
    }
    eventSync.setEvents(message.getEvents());

    sendReply(context, request, holder.getResponse());

    final boolean keepChannel = channel.getType().isAsync();
    if (!keepChannel) {
        state.removeChannel(channel);
    }
}
/**
 * Process a ping message.
 *
 * <p>When the channel is known, its last activity time is set to the current time and a
 * {@code PingResponse} is written to the channel.
 *
 * @param context actor context
 * @param message channel aware message
 * @return true if channel is found otherwise false
 */
public boolean processPingMessage(ActorContext context, ChannelAware message) {
    LOG.debug("[{}][{}] Received ping message for channel [{}]",
            endpointKey, actorKey, message.getChannelUuid());
    ChannelMetaData channel = state.getChannelById(message.getChannelUuid());
    if (channel == null) {
        LOG.debug("[{}][{}] Can't find channel by uuid [{}]",
                endpointKey, actorKey, message.getChannelUuid());
        return false;
    }

    long lastActivityTime = System.currentTimeMillis();
    // The template previously ended with "to " and no placeholder, so the new
    // timestamp was never rendered.
    LOG.debug("[{}][{}] Updating last activity time for channel [{}] to {}",
            endpointKey, actorKey, message.getChannelUuid(), lastActivityTime);
    channel.setLastActivityTime(lastActivityTime);
    channel.getContext().writeAndFlush(new PingResponse());
    return true;
}
/**
 * Processes a log delivery message.
 *
 * <p>The message is stored in {@code logUploadResponseMap}; every LOGGING channel then gets
 * the converted map as its log sync and is answered, and non-async channels are removed
 * from the state. The map is cleared at the end.
 *
 * @param context actor context
 * @param message log delivery message
 */
public void processLogDeliveryMessage(ActorContext context, LogDeliveryMessage message) {
    LOG.debug("[{}][{}] Received log delivery message for request [{}] with status {}",
            endpointKey, actorKey, message.getRequestId(), message.isSuccess());
    logUploadResponseMap.put(message.getRequestId(), message);

    for (ChannelMetaData loggingChannel : state.getChannelsByType(TransportType.LOGGING)) {
        final SyncRequestMessage request = loggingChannel.getRequestMessage();
        final ServerSync reply = loggingChannel.getResponseHolder().getResponse();
        reply.setLogSync(EntityConvertUtils.convert(logUploadResponseMap));

        LOG.debug("[{}][{}] sending reply to [{}] channel",
                endpointKey, actorKey, loggingChannel.getId());
        sendReply(context, request, reply);

        if (loggingChannel.getType().isAsync()) {
            continue;
        }
        state.removeChannel(loggingChannel);
    }

    logUploadResponseMap.clear();
}
/**
 * Verifies that a disconnect message for a mocked (unregistered) channel makes
 * {@code processDisconnectMessage} return {@code false}.
 */
@Test
public void processDisconnectMessageTest() {
    ActorRef parentRef = Mockito.mock(ActorRef.class);
    ActorContext actorContext = Mockito.mock(ActorContext.class);
    Mockito.when(actorContext.parent()).thenReturn(parentRef);

    OperationsService operationsService = Mockito.mock(OperationsService.class);
    AkkaContext akkaContext = Mockito.mock(AkkaContext.class);
    Mockito.when(akkaContext.getOperationsService()).thenReturn(operationsService);

    LocalEndpointActorMessageProcessor processor = Mockito.spy(
            new LocalEndpointActorMessageProcessor(
                    akkaContext, "APP_TOKEN", EndpointObjectHash.fromSha1("key"), "actorKey"));

    ChannelAware disconnect = Mockito.mock(ChannelAware.class);
    boolean handled = processor.processDisconnectMessage(actorContext, disconnect);
    Assert.assertFalse(handled);
}
/**
 * Handles a session init message: decodes the request, validates the SDK token, verifies
 * the endpoint, creates the session info, notifies the message of the created session and
 * forwards the request to the operations actor.
 *
 * @param context actor context used for forwarding
 * @param message the session init message
 * @throws GeneralSecurityException declared by the decoding/crypto helpers
 * @throws PlatformEncDecException declared by the decoding helpers
 * @throws InvalidSdkTokenException when the SDK token is not valid
 * @throws EndpointVerificationException declared by endpoint verification
 */
private void processSessionInitRequest(ActorContext context, SessionInitMessage message)
        throws GeneralSecurityException, PlatformEncDecException,
        InvalidSdkTokenException, EndpointVerificationException {
    final ClientSync request = decodeRequest(message);
    final EndpointObjectHash endpointHash = getEndpointObjectHash(request);
    final String sdkToken = getSdkToken(request);

    if (!isSdkTokenValid(sdkToken)) {
        LOG.info("Invalid sdk token received: {}", sdkToken);
        throw new InvalidSdkTokenException();
    }

    final String appToken = getAppToken(sdkToken);
    verifyEndpoint(endpointHash, appToken);

    final SessionInfo session = new SessionInfo(
            message.getChannelUuid(),
            message.getPlatformId(),
            message.getChannelContext(),
            message.getChannelType(),
            crypt.getSessionCipherPair(),
            endpointHash,
            appToken,
            sdkToken,
            message.getKeepAlive(),
            message.isEncrypted());
    message.onSessionCreated(session);
    forwardToOpsActor(context, session, request, message);
}
/**
 * Verifies that an event receive message handled by a freshly created processor results in
 * a {@code FAILURE} delivery message being received by the parent test actor.
 */
@Test
public void testEndpointNotAttachedEvent() throws Exception {
    AkkaContext akkaContext = mock(AkkaContext.class);
    ActorContext actorContext = mock(ActorContext.class);
    ActorRef selfRef = spy(ActorRef.class);
    ActorSystem actorSystem = ActorSystem.create();
    try {
        final Props testActorProps = Props.create(TestActor.class);
        final TestActorRef<TestActor> parentRef =
                TestActorRef.create(actorSystem, testActorProps, "testA");
        when(actorContext.self()).thenReturn(selfRef);
        when(actorContext.parent()).thenReturn(parentRef);

        EndpointEventReceiveMessage received = mock(EndpointEventReceiveMessage.class);
        LocalEndpointActorMessageProcessor processor = spy(new LocalEndpointActorMessageProcessor(
                akkaContext,
                APP_TOKEN,
                EndpointObjectHash.fromSha1(clientPublicKeyHash.array()),
                "ACTOR_TOKEN"));

        processor.processEndpointEventReceiveMessage(actorContext, received);

        EndpointEventDeliveryMessage delivered =
                (EndpointEventDeliveryMessage) parentRef.underlyingActor().getMsg();
        Assert.assertEquals(
                EndpointEventDeliveryMessage.EventDeliveryStatus.FAILURE,
                delivered.getStatus());
    } finally {
        JavaTestKit.shutdownActorSystem(actorSystem);
    }
}
/**
 * Creates the mocks, the spied message processor (with path-name lookup, timeout
 * scheduling and local event sending stubbed out) and the shared fixtures.
 */
@Before
public void before() {
    // Collaborator mocks.
    cacheServiceMock = mock(CacheService.class);
    eventServiceMock = mock(EventService.class);
    originatorRefMock = mock(ActorRef.class);
    actorContextMock = mock(ActorContext.class);
    akkaContextMock = mock(AkkaContext.class);
    when(akkaContextMock.getCacheService()).thenReturn(cacheServiceMock);
    when(akkaContextMock.getEventService()).thenReturn(eventServiceMock);
    when(akkaContextMock.getEventTimeout()).thenReturn(60 * 1000L);

    // Processor under test, with actor-facing operations stubbed.
    messageProcessor = spy(new LocalUserActorMessageProcessor(akkaContextMock, USER_ID, TENANT_ID));
    doReturn("dummyPathName").when(messageProcessor).getActorPathName(any(ActorRef.class));
    Mockito.doNothing().when(messageProcessor)
            .scheduleTimeoutMessage(Mockito.any(ActorContext.class), Mockito.any(EndpointEvent.class));
    Mockito.doNothing().when(messageProcessor)
            .sendEventToLocal(Mockito.any(ActorContext.class), Mockito.any(EndpointEventReceiveMessage.class));

    // Event class family versions shared by the tests.
    ecfVersion1 = new EventClassFamilyVersion(ECF_ID1, ECF_ID1_VERSION);
    ecfVersion2 = new EventClassFamilyVersion(ECF_ID2, ECF_ID2_VERSION);
    ecfVersions = new ArrayList<>();
    ecfVersions.add(ecfVersion1);
    ecfVersions.add(ecfVersion2);

    // Route table addresses; only the second one names a server.
    address1 = new RouteTableAddress(endpoint1Key, APP_TOKEN);
    address2 = new RouteTableAddress(endpoint2Key, APP_TOKEN, SERVER2);
    address3 = new RouteTableAddress(endpoint3Key, APP_TOKEN);
}
/**
 * Connects two endpoints, then sends an event from the second one and verifies it is
 * handed to {@code sendEventToLocal}. Also checks that the user route info is sent once and
 * that no route info is sent to other servers.
 */
@Test
public void testEndpointLocalEvent() {
    EndpointUserConnectMessage firstConnect = new EndpointUserConnectMessage(
            USER_ID, endpoint1Key, ecfVersions, 1, null, APP_TOKEN, originatorRefMock);
    messageProcessor.processEndpointConnectMessage(actorContextMock, firstConnect);

    EndpointUserConnectMessage secondConnect = new EndpointUserConnectMessage(
            USER_ID, endpoint2Key, ecfVersions, 1, null, APP_TOKEN, originatorRefMock);
    messageProcessor.processEndpointConnectMessage(actorContextMock, secondConnect);

    verify(eventServiceMock).sendUserRouteInfo(new UserRouteInfo(TENANT_ID, USER_ID));
    verify(eventServiceMock, Mockito.times(0)).sendRouteInfo(any(RouteInfo.class), any(String.class));

    when(cacheServiceMock.getEventClassFamilyIdByEventClassFqn(
            new EventClassFqnKey(TENANT_ID, "testClassFqn"))).thenReturn(ECF_ID1);
    RouteTableKey expectedRoute = new RouteTableKey(APP_TOKEN, ecfVersion1);
    when(cacheServiceMock.getRouteKeys(
            new EventClassFqnVersion(TENANT_ID, "testClassFqn", ECF_ID1_VERSION)))
            .thenReturn(Collections.singleton(expectedRoute));

    Event outgoing = new Event(0, "testClassFqn", ByteBuffer.wrap(new byte[0]), null,
            Base64Util.encode(endpoint1Key.getData()));
    EndpointEventSendMessage sendMessage = new EndpointEventSendMessage(
            USER_ID, Collections.singletonList(outgoing), endpoint2Key, APP_TOKEN, originatorRefMock);
    messageProcessor.processEndpointEventSendMessage(actorContextMock, sendMessage);

    verify(messageProcessor).sendEventToLocal(
            Mockito.any(ActorContext.class), Mockito.any(EndpointEventReceiveMessage.class));
}
/**
 * Connects one endpoint, then processes a remote endpoint event and verifies that it is
 * handed to {@code sendEventToLocal} without the event class family id being looked up in
 * the cache service.
 */
@Test
public void testEndpointRemoteReceiveEvent() {
    EndpointUserConnectMessage connect = new EndpointUserConnectMessage(
            USER_ID, endpoint1Key, ecfVersions, 1, null, APP_TOKEN, originatorRefMock);
    messageProcessor.processEndpointConnectMessage(actorContextMock, connect);

    RouteTableKey expectedRoute = new RouteTableKey(APP_TOKEN, ecfVersion1);
    when(cacheServiceMock.getRouteKeys(
            new EventClassFqnVersion(TENANT_ID, "testClassFqn", ECF_ID1_VERSION)))
            .thenReturn(Collections.singleton(expectedRoute));

    Event incoming = new Event(0, "testClassFqn", ByteBuffer.wrap(new byte[0]), null,
            Base64Util.encode(endpoint1Key.getData()));
    EndpointEvent wrapped = new EndpointEvent(
            endpoint2Key, incoming, UUID.randomUUID(), System.currentTimeMillis(), ECF_ID1_VERSION);
    RemoteEndpointEvent remote = new RemoteEndpointEvent(
            TENANT_ID, USER_ID, wrapped, new RouteTableAddress(endpoint1Key, APP_TOKEN, "SERVER1"));
    RemoteEndpointEventMessage remoteMessage = new RemoteEndpointEventMessage(remote);

    messageProcessor.processRemoteEndpointEventMessage(actorContextMock, remoteMessage);

    verify(cacheServiceMock, Mockito.never())
            .getEventClassFamilyIdByEventClassFqn(new EventClassFqnKey(TENANT_ID, "testClassFqn"));
    verify(messageProcessor).sendEventToLocal(
            Mockito.any(ActorContext.class), Mockito.any(EndpointEventReceiveMessage.class));
}