Java 类akka.actor.ActorContext 实例源码

项目:hashsdn-controller    文件:RaftActorContextImpl.java   
public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
        @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
        @Nonnull Map<String, String> peerAddresses,
        @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
        @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
    this.actor = actor;
    this.context = context;
    this.id = id;
    this.termInformation = Preconditions.checkNotNull(termInformation);
    this.commitIndex = commitIndex;
    this.lastApplied = lastApplied;
    this.configParams = Preconditions.checkNotNull(configParams);
    this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
    this.log = Preconditions.checkNotNull(logger);
    this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);

    fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
            configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());

    for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
        peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
    }
}
项目:iotplatform    文件:PluginActorMessageProcessor.java   
@Override
public void onUpdate(ActorContext context) throws Exception {
  PluginMetaData oldPluginMd = pluginMd;
  pluginMd = systemContext.getPluginService().findPluginById(entityId);
  boolean requiresRestart = false;
  logger.info("[{}] Plugin configuration was updated from {} to {}.", entityId, oldPluginMd, pluginMd);
  if (!oldPluginMd.getClazz().equals(pluginMd.getClazz())) {
    logger.info("[{}] Plugin requires restart due to clazz change from {} to {}.", entityId, oldPluginMd.getClazz(),
        pluginMd.getClazz());
    requiresRestart = true;
  } else if (!oldPluginMd.getConfiguration().equals(pluginMd.getConfiguration())) {
    logger.info("[{}] Plugin requires restart due to configuration change from {} to {}.", entityId,
        oldPluginMd.getConfiguration(), pluginMd.getConfiguration());
    requiresRestart = true;
  }
  if (requiresRestart) {
    this.state = ComponentLifecycleState.SUSPENDED;
    if (pluginImpl != null) {
      pluginImpl.stop(trustedCtx);
    }
    start();
  }
}
项目:iotplatform    文件:DeviceActorMessageProcessor.java   
private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type,
    Optional<ServerAddress> server) {
  if (!rpcPendingMap.isEmpty()) {
    logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, rpcPendingMap.size(),
        sessionId);
    if (type == SessionType.SYNC) {
      logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
      rpcSubscriptions.remove(sessionId);
    }
  } else {
    logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
  }
  Set<UUID> sentOneWayIds = new HashSet<>();
  if (type == SessionType.ASYNC) {
    rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
  } else {
    rpcPendingMap.entrySet().stream().findFirst()
        .ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
  }

  sentOneWayIds.forEach(rpcPendingMap::remove);
}
项目:iotplatform    文件:DeviceActorMessageProcessor.java   
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context,
    SessionId sessionId, Optional<ServerAddress> server, Set<UUID> sentOneWayIds) {
  return entry -> {
    ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
    ToDeviceRpcRequestBody body = request.getBody();
    if (request.isOneway()) {
      sentOneWayIds.add(request.getId());
      ToPluginRpcResponseDeviceMsg responsePluginMsg = toPluginRpcResponseMsg(entry.getValue().getMsg(),
          (String) null);
      context.parent().tell(responsePluginMsg, ActorRef.noSender());
    }
    ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(entry.getKey(), body.getMethod(), body.getParams());
    ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId);
    sendMsgToSessionActor(response, server);
  };
}
项目:iotplatform    文件:DeviceActorMessageProcessor.java   
private void processSubscriptionCommands(ActorContext context, ToDeviceActorMsg msg) {
  SessionId sessionId = msg.getSessionId();
  SessionType sessionType = msg.getSessionType();
  FromDeviceMsg inMsg = msg.getPayload();
  if (inMsg.getMsgType() == MsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
    logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
    attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
  } else if (inMsg.getMsgType() == MsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
    logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
    attributeSubscriptions.remove(sessionId);
  } else if (inMsg.getMsgType() == MsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
    logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
    rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
    sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
  } else if (inMsg.getMsgType() == MsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
    logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
    rpcSubscriptions.remove(sessionId);
  }
}
项目:iotplatform    文件:RuleManager.java   
public Optional<ActorRef> update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) {
  RuleMetaData rule;
  if (event != ComponentLifecycleEvent.DELETED) {
    rule = systemContext.getRuleService().findRuleById(ruleId);
  } else {
    rule = ruleMap.keySet().stream().filter(r -> r.getId().equals(ruleId))
        .peek(r -> r.setState(ComponentLifecycleState.SUSPENDED)).findFirst().orElse(null);
    if (rule != null) {
      ruleMap.remove(rule);
      ruleActors.remove(ruleId);
    }
  }
  if (rule != null) {
    RuleActorMetaData actorMd = ruleMap.get(rule);
    if (actorMd == null) {
      ActorRef ref = getOrCreateRuleActor(context, rule.getId());
      actorMd = RuleActorMetaData.systemRule(rule.getId(), rule.getWeight(), ref);
      ruleMap.put(rule, actorMd);
    }
    refreshRuleChain();
    return Optional.of(actorMd.getActorRef());
  } else {
    log.warn("[{}] Can't process unknown rule!", ruleId);
    return Optional.empty();
  }
}
项目:iotplatform    文件:ASyncMsgProcessor.java   
@Override
public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
  try {
    if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
      switch (msg.getMsgType()) {
      case STATUS_CODE_RESPONSE:
      case GET_ATTRIBUTES_RESPONSE:
        ResponseMsg responseMsg = (ResponseMsg) msg;
        if (responseMsg.getRequestId() >= 0) {
          logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
          pendingMap.remove(responseMsg.getRequestId());
        }
        break;
      }
      sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
    } else {
      sessionCtx.onMsg(org.iotp.analytics.ruleengine.common.msg.session.ctrl.SessionCloseMsg
          .onCredentialsRevoked(sessionCtx.getSessionId()));
    }
  } catch (SessionException e) {
    logger.warning("Failed to push session response msg", e);
  }
}
项目:iotplatform    文件:ASyncMsgProcessor.java   
@Override
public void processToAssetMsg(ActorContext context, ToDeviceMsg msg) {
  try {
    if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
      switch (msg.getMsgType()) {
      case STATUS_CODE_RESPONSE:
      case GET_ATTRIBUTES_RESPONSE:
        ResponseMsg responseMsg = (ResponseMsg) msg;
        if (responseMsg.getRequestId() >= 0) {
          logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
          pendingMap4Asset.remove(responseMsg.getRequestId());
        }
        break;
      }
      sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
    } else {
      sessionCtx.onMsg(org.iotp.analytics.ruleengine.common.msg.session.ctrl.SessionCloseMsg
          .onCredentialsRevoked(sessionCtx.getSessionId()));
    }
  } catch (SessionException e) {
    logger.warning("Failed to push session response msg", e);
  }
}
项目:iotplatform    文件:RuleActorMessageProcessor.java   
private void pushToNextRule(ActorContext context, ChainProcessingContext ctx, RuleEngineError error) {
  if (error != null) {
    ctx = ctx.withError(error);
  }
  if (ctx.isFailure()) {
    logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(),
        ctx.getInMsg().getDeviceId());
    ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
  } else if (!ctx.hasNext()) {
    logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(),
        ctx.getInMsg().getDeviceId());
    ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
  } else {
    logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(),
        ctx.getInMsg().getDeviceId());
    ChainProcessingContext nextTask = ctx.getNext();
    nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self());
  }
}
项目:iotplatform    文件:RuleActorMessageProcessor.java   
@Override
public void onActivate(ActorContext context) throws Exception {
  logger.info("[{}] Going to process onActivate rule.", entityId);
  this.state = ComponentLifecycleState.ACTIVE;
  if (filters != null) {
    filters.forEach(RuleLifecycleComponent::resume);
    if (processor != null) {
      processor.resume();
    } else {
      initProcessor();
    }
    if (action != null) {
      action.resume();
    }
    logger.info("[{}] Rule resumed.", entityId);
  } else {
    start();
  }
}
项目:thingsboard    文件:PluginActorMessageProcessor.java   
@Override
public void onUpdate(ActorContext context) throws Exception {
    PluginMetaData oldPluginMd = pluginMd;
    pluginMd = systemContext.getPluginService().findPluginById(entityId);
    boolean requiresRestart = false;
    logger.info("[{}] Plugin configuration was updated from {} to {}.", entityId, oldPluginMd, pluginMd);
    if (!oldPluginMd.getClazz().equals(pluginMd.getClazz())) {
        logger.info("[{}] Plugin requires restart due to clazz change from {} to {}.",
                entityId, oldPluginMd.getClazz(), pluginMd.getClazz());
        requiresRestart = true;
    } else if (!oldPluginMd.getConfiguration().equals(pluginMd.getConfiguration())) {
        logger.info("[{}] Plugin requires restart due to configuration change from {} to {}.",
                entityId, oldPluginMd.getConfiguration(), pluginMd.getConfiguration());
        requiresRestart = true;
    }
    if (requiresRestart) {
        this.state = ComponentLifecycleState.SUSPENDED;
        if (pluginImpl != null) {
            pluginImpl.stop(trustedCtx);
        }
        start();
    }
}
项目:thingsboard    文件:DeviceActorMessageProcessor.java   
private void sendPendingRequests(ActorContext context, SessionId sessionId, SessionType type, Optional<ServerAddress> server) {
    if (!rpcPendingMap.isEmpty()) {
        logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, rpcPendingMap.size(), sessionId);
        if (type == SessionType.SYNC) {
            logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
            rpcSubscriptions.remove(sessionId);
        }
    } else {
        logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
    }
    Set<Integer> sentOneWayIds = new HashSet<>();
    if (type == SessionType.ASYNC) {
        rpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, server, sentOneWayIds));
    } else {
        rpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, server, sentOneWayIds));
    }

    sentOneWayIds.forEach(rpcPendingMap::remove);
}
项目:thingsboard    文件:DeviceActorMessageProcessor.java   
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
    return entry -> {
        ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
        ToDeviceRpcRequestBody body = request.getBody();
        if (request.isOneway()) {
            sentOneWayIds.add(entry.getKey());
            ToPluginRpcResponseDeviceMsg responsePluginMsg = toPluginRpcResponseMsg(entry.getValue().getMsg(), (String) null);
            context.parent().tell(responsePluginMsg, ActorRef.noSender());
        }
        ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
                entry.getKey(),
                body.getMethod(),
                body.getParams()
        );
        ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId);
        sendMsgToSessionActor(response, server);
    };
}
项目:thingsboard    文件:DeviceActorMessageProcessor.java   
private void processSubscriptionCommands(ActorContext context, ToDeviceActorMsg msg) {
    SessionId sessionId = msg.getSessionId();
    SessionType sessionType = msg.getSessionType();
    FromDeviceMsg inMsg = msg.getPayload();
    if (inMsg.getMsgType() == MsgType.SUBSCRIBE_ATTRIBUTES_REQUEST) {
        logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
        attributeSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
    } else if (inMsg.getMsgType() == MsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST) {
        logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
        attributeSubscriptions.remove(sessionId);
    } else if (inMsg.getMsgType() == MsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST) {
        logger.debug("[{}] Registering rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
        rpcSubscriptions.put(sessionId, new SessionInfo(sessionType, msg.getServerAddress()));
        sendPendingRequests(context, sessionId, sessionType, msg.getServerAddress());
    } else if (inMsg.getMsgType() == MsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST) {
        logger.debug("[{}] Canceling rpc subscription for session [{}][{}]", deviceId, sessionId, sessionType);
        rpcSubscriptions.remove(sessionId);
    }
}
项目:thingsboard    文件:ASyncMsgProcessor.java   
@Override
public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
    try {
        if (msg.getMsgType() != MsgType.SESSION_CLOSE) {
            switch (msg.getMsgType()) {
                case STATUS_CODE_RESPONSE:
                case GET_ATTRIBUTES_RESPONSE:
                    ResponseMsg responseMsg = (ResponseMsg) msg;
                    if (responseMsg.getRequestId() >= 0) {
                        logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
                        pendingMap.remove(responseMsg.getRequestId());
                    }
                    break;
                default:
                    break;
            }
            sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
        } else {
            sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg.onCredentialsRevoked(sessionCtx.getSessionId()));
        }
    } catch (SessionException e) {
        logger.warning("Failed to push session response msg", e);
    }
}
项目:thingsboard    文件:ASyncMsgProcessor.java   
@Override
public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
    if (pendingMap.size() > 0 || subscribedToAttributeUpdates || subscribedToRpcCommands) {
        Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolveById(getDeviceId());
        if (!newTargetServer.equals(currentTargetServer)) {
            firstMsg = true;
            currentTargetServer = newTargetServer;
            pendingMap.values().forEach(v -> {
                forwardToAppActor(context, v, currentTargetServer);
                if (currentTargetServer.isPresent()) {
                    logger.debug("[{}] Forwarded msg to new server: {}", sessionId, currentTargetServer.get());
                } else {
                    logger.debug("[{}] Forwarded msg to local server.", sessionId);
                }
            });
            if (subscribedToAttributeUpdates) {
                toDeviceMsg(new AttributesSubscribeMsg()).ifPresent(m -> forwardToAppActor(context, m, currentTargetServer));
                logger.debug("[{}] Forwarded attributes subscription.", sessionId);
            }
            if (subscribedToRpcCommands) {
                toDeviceMsg(new RpcSubscribeMsg()).ifPresent(m -> forwardToAppActor(context, m, currentTargetServer));
                logger.debug("[{}] Forwarded rpc commands subscription.", sessionId);
            }
        }
    }
}
项目:thingsboard    文件:SyncMsgProcessor.java   
@Override
public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
    if (pendingResponse) {
        Optional<ServerAddress> newTargetServer = forwardToAppActorIfAdressChanged(context, pendingMsg, currentTargetServer);
        if (logger.isDebugEnabled()) {
            if (!newTargetServer.equals(currentTargetServer)) {
                if (newTargetServer.isPresent()) {
                    logger.debug("[{}] Forwarded msg to new server: {}", sessionId, newTargetServer.get());
                } else {
                    logger.debug("[{}] Forwarded msg to local server.", sessionId);
                }
            }
        }
        currentTargetServer = newTargetServer;
    }
}
项目:thingsboard    文件:RuleActorMessageProcessor.java   
private void pushToNextRule(ActorContext context, ChainProcessingContext ctx, RuleEngineError error) {
    if (error != null) {
        ctx = ctx.withError(error);
    }
    if (ctx.isFailure()) {
        logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
        ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
    } else if (!ctx.hasNext()) {
        logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
        ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
    } else {
        logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
        ChainProcessingContext nextTask = ctx.getNext();
        nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self());
    }
}
项目:thingsboard    文件:RuleActorMessageProcessor.java   
@Override
public void onActivate(ActorContext context) throws Exception {
    logger.info("[{}] Going to process onActivate rule.", entityId);
    this.state = ComponentLifecycleState.ACTIVE;
    if (filters != null) {
        filters.forEach(RuleLifecycleComponent::resume);
        if (processor != null) {
            processor.resume();
        } else {
            initProcessor();
        }
        if (action != null) {
            action.resume();
        }
        logger.info("[{}] Rule resumed.", entityId);
    } else {
        start();
    }
}
项目:kaa    文件:LocalUserActorMessageProcessor.java   
void processEndpointEventDeliveryMessage(ActorContext context,
                                         EndpointEventDeliveryMessage message) {
  LOG.debug("[{}] processing event delivery message for [{}] with status {}",
      userId, message.getMessage().getAddress(),
      message.getStatus());
  boolean success = message.getStatus() == EventDeliveryStatus.SUCCESS;
  RouteTableAddress address = message.getMessage().getAddress();
  for (EndpointEvent event : message.getMessage().getEndpointEvents()) {
    if (success) {
      LOG.debug("[{}] registering successful delivery of event [{}] to address {}",
          userId, event.getId(), address);
      eventDeliveryTable.registerDeliverySuccess(event, address);
    } else {
      LOG.debug("[{}] registering failure to delivery of event [{}] to address {}",
          userId, event.getId(), address);
      eventDeliveryTable.registerDeliveryFailure(event, address);
    }
  }
}
项目:kaa    文件:LocalUserActorMessageProcessor.java   
void processRouteInfoMessage(ActorContext context, RouteInfoMessage message) {
  RouteInfo routeInfo = message.getRouteInfo();
  if (RouteOperation.DELETE.equals(routeInfo.getRouteOperation())) {
    LOG.debug("[{}] Removing all routes from route table by address {}",
        userId, routeInfo.getAddress());
    routeTable.removeByAddress(routeInfo.getAddress());
  } else {
    for (EventClassFamilyVersion ecfVersion : routeInfo.getEcfVersions()) {
      RouteTableKey key = new RouteTableKey(
          routeInfo.getAddress().getApplicationToken(), ecfVersion);
      LOG.debug("[{}] Updating route table with key {} and address {}",
          userId, key, routeInfo.getAddress());
      updateRouteTable(context, key, routeInfo.getAddress());
    }
  }
  reportAllLocalRoutes(routeInfo.getAddress().getServerId());
}
项目:kaa    文件:LocalUserActorMessageProcessor.java   
private void registerEndpointForEvents(ActorContext context,
                                       EndpointUserConnectMessage message,
                                       RouteTableAddress address) {
  List<EventClassFamilyVersion> ecfVersions = message.getEcfVersions();
  if (!ecfVersions.isEmpty()) {
    for (EventClassFamilyVersion ecfVersion : ecfVersions) {
      RouteTableKey key = new RouteTableKey(address.getApplicationToken(), ecfVersion);
      updateRouteTable(context, key, address);
    }
    if (firstConnectRequestToActor) {
      firstConnectRequestToActor = false;
      // report existence of this actor to other operation servers
      eventService.sendUserRouteInfo(new UserRouteInfo(tenantId, userId));
    }
    for (String serverId : routeTable.getRemoteServers()) {
      if (routeTable.isDeliveryRequired(serverId, address)) {
        LOG.debug("[{}] Sending route info about address {} to server {}",
            userId, address, serverId);
        eventService.sendRouteInfo(new RouteInfo(
            tenantId, userId, address, ecfVersions), serverId);
      }
    }
    versionMap.put(address.getEndpointKey(), message.getEcfVersions());
  }
}
项目:kaa    文件:LocalUserActorMessageProcessor.java   
protected void removeEndpoint(ActorContext context, EndpointObjectHash endpoint) {
  LOG.debug("[{}] removing endpoint [{}] from route tables", userId, endpoint);
  RouteTableAddress address = routeTable.removeLocal(endpoint);
  versionMap.remove(endpoint);
  for (String serverId : routeTable.getRemoteServers()) {
    LOG.debug("[{}] removing endpoint [{}] from remote route table on server {}",
        userId, endpoint, serverId);
    eventService.sendRouteInfo(
        RouteInfo.deleteRouteFromAddress(tenantId, userId, address), serverId);
  }
  // cleanup and notify global route actor
  GlobalRouteInfo route = GlobalRouteInfo.delete(tenantId, userId, address);
  if (eventService.isMainUserNode(userId)) {
    context.parent().tell(new EndpointRouteUpdateMessage(route), context.self());
  } else {
    LOG.debug("[{}] Sending disconnect message to global actor", userId);
    eventService.sendEndpointRouteInfo(route);
  }
}
项目:kaa    文件:GlobalUserActorMessageProcessor.java   
/**
 * Process message.
 *
 * @param context the actor context
 * @param route the global route info
 */
public void process(ActorContext context, GlobalRouteInfo route) {
  if (route.getRouteOperation() == RouteOperation.ADD) {
    LOG.debug("[{}][{}] Adding route {} for cf version {}",
        tenantId, userId, route, route.getCfVersion());
    ConfigurationKey key = ConfigurationKey.fromRouteInfo(route);
    map.add(key, route);
    checkHashAndSendNotification(context, key, route);
  } else if (route.getRouteOperation() == RouteOperation.DELETE) {
    LOG.debug("[{}][{}] Remove route {} for cf version {}",
        tenantId, userId, route, route.getCfVersion());
    map.remove(route);
  } else {
    LOG.warn("[{}][{}] unsupported route operations {}",
        tenantId, userId, route.getRouteOperation());
  }
}
项目:kaa    文件:GlobalUserActorMessageProcessor.java   
private void checkHashAndSendNotification(ActorContext context, byte[] newHash,
                                          GlobalRouteInfo route) {
  if (!Arrays.equals(newHash, route.getUcfHash())) {
    LOG.trace("Sending notification to route {}", route);
    if (route.isLocal()) {
      context.parent()
          .tell(
              new EndpointUserConfigurationUpdateMessage(toUpdate(newHash, route)),
              context.self());
    } else {
      eventService
          .sendEndpointStateInfo(route.getAddress().getServerId(), toUpdate(newHash, route));
    }
  } else {
    LOG.trace("Ignoring notification to route {} due to matching hashes", route);
  }
}
项目:kaa    文件:LocalEndpointActorMessageProcessor.java   
/**
 * Process an endpoint event receive message.
 *
 * @param context actor context
 * @param message endpoint event receive message
 */
public void processEndpointEventReceiveMessage(ActorContext context,
                                               EndpointEventReceiveMessage message) {
  EndpointEventDeliveryMessage response;
  if (state.isValidForEvents()) {
    Set<ChannelMetaData> eventChannels = state.getChannelsByType(TransportType.EVENT);
    if (!eventChannels.isEmpty()) {
      for (ChannelMetaData eventChannel : eventChannels) {
        addEventsAndReply(context, eventChannel, message);
      }
      response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.SUCCESS);
    } else {
      LOG.debug("[{}] Message ignored due to no channel contexts registered for events",
          actorKey, message);
      response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.FAILURE);
      state.setUserRegistrationPending(false);
    }
  } else {
    LOG.debug(
        "[{}][{}] Endpoint profile is not valid for receiving events. "
            + "Either no assigned user or no event families in sdk",
        endpointKey, actorKey);
    response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.FAILURE);
  }
  tellParent(context, response);
}
项目:kaa    文件:EndpointActorMessageProcessorTest.java   
@Test
public void actorTimeoutTest() {
  OperationsService osMock = Mockito.mock(OperationsService.class);
  ActorContext ctxMock = Mockito.mock(ActorContext.class);
  ActorRef appActorMock = Mockito.mock(ActorRef.class);
  Mockito.when(ctxMock.parent()).thenReturn(appActorMock);

  AkkaContext context = Mockito.mock(AkkaContext.class);
  Mockito.when(context.getOperationsService()).thenReturn(osMock);

  LocalEndpointActorMessageProcessor processor = Mockito.spy(new LocalEndpointActorMessageProcessor(context, "APP_TOKEN", EndpointObjectHash
      .fromSha1("key"), "actorKey"));
  ActorTimeoutMessage msg = new ActorTimeoutMessage(System.currentTimeMillis());

  Mockito.doNothing().when(processor).tellParent(Mockito.any(ActorContext.class), Mockito.any(Object.class));
  processor.processActorTimeoutMessage(ctxMock, msg);
  Mockito.verify(processor).tellParent(Mockito.any(ActorContext.class), Mockito.any(Object.class));
}
项目:kaa    文件:LocalEndpointActorMessageProcessor.java   
private void syncChannels(ActorContext context,
                          Set<ChannelMetaData> channels,
                          boolean cfUpdate, boolean nfUpdate) {
  for (ChannelMetaData channel : channels) {
    ClientSync originalRequest = channel.getRequestMessage().getRequest();
    ClientSync newRequest = new ClientSync();
    newRequest.setRequestId(originalRequest.getRequestId());
    newRequest.setClientSyncMetaData(originalRequest.getClientSyncMetaData());
    newRequest.setUseConfigurationRawSchema(originalRequest.isUseConfigurationRawSchema());
    if (cfUpdate && originalRequest.getConfigurationSync() != null) {
      newRequest.setForceConfigurationSync(true);
      newRequest.setConfigurationSync(originalRequest.getConfigurationSync());
    }
    if (nfUpdate && originalRequest.getNotificationSync() != null) {
      newRequest.setForceNotificationSync(true);
      newRequest.setNotificationSync(originalRequest.getNotificationSync());
    }
    LOG.debug("[{}][{}] Processing request {}", endpointKey, actorKey, newRequest);
    sync(context, new SyncRequestMessage(channel.getRequestMessage().getSession(), newRequest,
        channel.getRequestMessage().getCommand(), channel.getRequestMessage().getOriginator()));
  }
}
项目:kaa    文件:LocalEndpointActorMessageProcessor.java   
private void processUserAttachRequest(ActorContext context,
                                      ClientSync syncRequest,
                                      SyncContext responseHolder) {
  UserClientSync request = syncRequest.getUserSync();
  if (request != null && request.getUserAttachRequest() != null) {
    UserAttachRequest attachRequest = request.getUserAttachRequest();
    context.parent().tell(new UserVerificationRequestMessage(
        context.self(), attachRequest.getUserVerifierId(),
        attachRequest.getUserExternalId(), attachRequest.getUserAccessToken()), context.self());
    LOG.debug("[{}][{}] received and forwarded user attach request {}",
        endpointKey, actorKey, request.getUserAttachRequest());

    if (userAttachResponseMap.size() > 0) {
      Entry<UUID, UserVerificationResponseMessage> entryToSend = userAttachResponseMap.entrySet()
          .iterator()
          .next();
      updateResponseWithUserAttachResults(responseHolder.getResponse(), entryToSend.getValue());
      userAttachResponseMap.remove(entryToSend.getKey());
    }
  }
}
项目:kaa    文件:LocalEndpointActorMessageProcessor.java   
private void processEvents(ActorContext context,
                           ClientSync request,
                           SyncContext responseHolder) {
  if (request.getEventSync() != null) {
    EventClientSync eventRequest = request.getEventSync();
    processSeqNumber(eventRequest, responseHolder);
    if (state.isValidForEvents()) {
      sendEventsIfPresent(context, eventRequest);
    } else {
      LOG.debug(
          "[{}][{}] Endpoint profile is not valid for send/receive events. "
              + "Either no assigned user or no event families in sdk",
          endpointKey, actorKey);
    }
  }
}
项目:kaa    文件:LocalEndpointActorMessageProcessor.java   
private void updateUserConnection(ActorContext context) {
  if (!state.isValidForUser()) {
    return;
  }
  if (state.userIdMismatch()) {
    sendDisconnectFromOldUser(context, state.getProfile());
    state.setUserRegistrationPending(false);
  }
  if (!state.isUserRegistrationPending()) {
    state.setUserId(state.getProfileUserId());
    if (state.getUserId() != null) {
      sendConnectToNewUser(context, state.getProfile());
      state.setUserRegistrationPending(true);
    }
  } else {
    LOG.trace("[{}][{}] User registration request is already sent.", endpointKey, actorKey);
  }
}
项目:kaa    文件:LocalEndpointActorMessageProcessor.java   
private void addEventsAndReply(ActorContext context,
                               ChannelMetaData channel,
                               EndpointEventReceiveMessage message) {
  SyncRequestMessage pendingRequest = channel.getRequestMessage();
  SyncContext pendingResponse = channel.getResponseHolder();

  EventServerSync eventResponse = pendingResponse.getResponse().getEventSync();
  if (eventResponse == null) {
    eventResponse = new EventServerSync();
    pendingResponse.getResponse().setEventSync(eventResponse);
  }

  eventResponse.setEvents(message.getEvents());
  sendReply(context, pendingRequest, pendingResponse.getResponse());
  if (!channel.getType().isAsync()) {
    state.removeChannel(channel);
  }
}
项目:kaa    文件:LocalEndpointActorMessageProcessor.java   
/**
 * Process a ping message.
 *
 * @param context actor context
 * @param message channel aware message
 * @return        true if channel is found otherwise false
 */
public boolean processPingMessage(ActorContext context, ChannelAware message) {
  LOG.debug("[{}][{}] Received ping message for channel [{}]",
      endpointKey, actorKey, message.getChannelUuid());
  ChannelMetaData channel = state.getChannelById(message.getChannelUuid());
  if (channel != null) {
    long lastActivityTime = System.currentTimeMillis();
    LOG.debug("[{}][{}] Updating last activity time for channel [{}] to ",
        endpointKey, actorKey, message.getChannelUuid(),
        lastActivityTime);
    channel.setLastActivityTime(lastActivityTime);
    channel.getContext().writeAndFlush(new PingResponse());
    return true;
  } else {
    LOG.debug("[{}][{}] Can't find channel by uuid [{}]",
        endpointKey, actorKey, message.getChannelUuid());
    return false;
  }
}
项目:kaa    文件:LocalEndpointActorMessageProcessor.java   
/**
 * Process a log delivery message.
 *
 * @param context actor context
 * @param message log delivery message
 */
public void processLogDeliveryMessage(ActorContext context, LogDeliveryMessage message) {
  LOG.debug("[{}][{}] Received log delivery message for request [{}] with status {}",
      endpointKey, actorKey, message.getRequestId(),
      message.isSuccess());
  logUploadResponseMap.put(message.getRequestId(), message);
  Set<ChannelMetaData> channels = state.getChannelsByType(TransportType.LOGGING);
  for (ChannelMetaData channel : channels) {
    SyncRequestMessage pendingRequest = channel.getRequestMessage();
    ServerSync pendingResponse = channel.getResponseHolder().getResponse();

    pendingResponse.setLogSync(EntityConvertUtils.convert(logUploadResponseMap));

    LOG.debug("[{}][{}] sending reply to [{}] channel", endpointKey, actorKey, channel.getId());
    sendReply(context, pendingRequest, pendingResponse);
    if (!channel.getType().isAsync()) {
      state.removeChannel(channel);
    }
  }
  logUploadResponseMap.clear();
}
项目:kaa    文件:EndpointActorMessageProcessorTest.java   
@Test
public void processDisconnectMessageTest() {
  OperationsService osMock = Mockito.mock(OperationsService.class);
  ActorContext ctxMock = Mockito.mock(ActorContext.class);
  ActorRef appActorMock = Mockito.mock(ActorRef.class);
  Mockito.when(ctxMock.parent()).thenReturn(appActorMock);

  AkkaContext context = Mockito.mock(AkkaContext.class);
  Mockito.when(context.getOperationsService()).thenReturn(osMock);

  LocalEndpointActorMessageProcessor processor = Mockito.spy(new LocalEndpointActorMessageProcessor(context, "APP_TOKEN", EndpointObjectHash
      .fromSha1("key"), "actorKey"));
  ChannelAware msg = Mockito.mock(ChannelAware.class);

  Assert.assertFalse(processor.processDisconnectMessage(ctxMock, msg));
}
项目:kaa    文件:EncDecActorMessageProcessor.java   
private void processSessionInitRequest(ActorContext context, SessionInitMessage message)
    throws GeneralSecurityException, PlatformEncDecException,
    InvalidSdkTokenException, EndpointVerificationException {
  ClientSync request = decodeRequest(message);
  EndpointObjectHash key = getEndpointObjectHash(request);
  String sdkToken = getSdkToken(request);
  if (isSdkTokenValid(sdkToken)) {
    String appToken = getAppToken(sdkToken);
    verifyEndpoint(key, appToken);
    SessionInfo session = new SessionInfo(
        message.getChannelUuid(), message.getPlatformId(), message.getChannelContext(),
        message.getChannelType(), crypt.getSessionCipherPair(), key,
        appToken, sdkToken, message.getKeepAlive(),
        message.isEncrypted());
    message.onSessionCreated(session);
    forwardToOpsActor(context, session, request, message);
  } else {
    LOG.info("Invalid sdk token received: {}", sdkToken);
    throw new InvalidSdkTokenException();
  }
}
项目:kaa    文件:DefaultAkkaServiceTest.java   
@Test
public void testEndpointNotAttachedEvent() throws Exception {
  AkkaContext context = mock(AkkaContext.class);
  ActorContext actorCtxMock = mock(ActorContext.class);
  ActorRef actorMock = spy(ActorRef.class);

  ActorSystem system = ActorSystem.create();
  try {
    final Props props = Props.create(TestActor.class);
    final TestActorRef<TestActor> parentMock = TestActorRef.create(system, props, "testA");

    when(actorCtxMock.self()).thenReturn(actorMock);
    when(actorCtxMock.parent()).thenReturn(parentMock);
    EndpointEventReceiveMessage msg = mock(EndpointEventReceiveMessage.class);
    LocalEndpointActorMessageProcessor processor = spy(new LocalEndpointActorMessageProcessor(
        context, APP_TOKEN, EndpointObjectHash.fromSha1(clientPublicKeyHash.array()), "ACTOR_TOKEN"
    ));
    processor.processEndpointEventReceiveMessage(actorCtxMock, msg);
    Assert.assertEquals(
        EndpointEventDeliveryMessage.EventDeliveryStatus.FAILURE,
        ((EndpointEventDeliveryMessage) parentMock.underlyingActor().getMsg()).getStatus());

  } finally {
    JavaTestKit.shutdownActorSystem(system);
  }
}
项目:kaa    文件:UserActorMessageProcessorTest.java   
@Before
public void before() {
  cacheServiceMock = mock(CacheService.class);
  eventServiceMock = mock(EventService.class);
  originatorRefMock = mock(ActorRef.class);
  actorContextMock = mock(ActorContext.class);

  akkaContextMock = mock(AkkaContext.class);
  when(akkaContextMock.getCacheService()).thenReturn(cacheServiceMock);
  when(akkaContextMock.getEventService()).thenReturn(eventServiceMock);
  when(akkaContextMock.getEventTimeout()).thenReturn(60 * 1000L);

  messageProcessor = spy(new LocalUserActorMessageProcessor(akkaContextMock, USER_ID, TENANT_ID));
  doReturn("dummyPathName").when(messageProcessor).getActorPathName(any(ActorRef.class));
  Mockito.doNothing().when(messageProcessor).scheduleTimeoutMessage(Mockito.any(ActorContext.class), Mockito.any(EndpointEvent.class));
  Mockito.doNothing().when(messageProcessor).sendEventToLocal(Mockito.any(ActorContext.class), Mockito.any(EndpointEventReceiveMessage.class));
  ecfVersions = new ArrayList<>();
  ecfVersion1 = new EventClassFamilyVersion(ECF_ID1, ECF_ID1_VERSION);
  ecfVersion2 = new EventClassFamilyVersion(ECF_ID2, ECF_ID2_VERSION);
  ecfVersions.add(ecfVersion1);
  ecfVersions.add(ecfVersion2);

  address1 = new RouteTableAddress(endpoint1Key, APP_TOKEN);
  address2 = new RouteTableAddress(endpoint2Key, APP_TOKEN, SERVER2);
  address3 = new RouteTableAddress(endpoint3Key, APP_TOKEN);
}
项目:kaa    文件:UserActorMessageProcessorTest.java   
@Test
public void testEndpointLocalEvent() {
  EndpointUserConnectMessage message1 = new EndpointUserConnectMessage(USER_ID, endpoint1Key, ecfVersions, 1, null, APP_TOKEN, originatorRefMock);
  messageProcessor.processEndpointConnectMessage(actorContextMock, message1);

  EndpointUserConnectMessage message2 = new EndpointUserConnectMessage(USER_ID, endpoint2Key, ecfVersions, 1, null, APP_TOKEN, originatorRefMock);
  messageProcessor.processEndpointConnectMessage(actorContextMock, message2);

  verify(eventServiceMock).sendUserRouteInfo(new UserRouteInfo(TENANT_ID, USER_ID));
  verify(eventServiceMock, Mockito.times(0)).sendRouteInfo(any(RouteInfo.class), any(String.class));

  when(cacheServiceMock.getEventClassFamilyIdByEventClassFqn(new EventClassFqnKey(TENANT_ID, "testClassFqn"))).thenReturn(ECF_ID1);
  RouteTableKey routeKey = new RouteTableKey(APP_TOKEN, ecfVersion1);
  when(cacheServiceMock.getRouteKeys(new EventClassFqnVersion(TENANT_ID, "testClassFqn", ECF_ID1_VERSION))).thenReturn(Collections.singleton(routeKey));

  Event event = new Event(0, "testClassFqn", ByteBuffer.wrap(new byte[0]), null, Base64Util.encode(endpoint1Key.getData()));
  EndpointEventSendMessage eventMessage = new EndpointEventSendMessage(USER_ID, Collections.singletonList(event), endpoint2Key, APP_TOKEN, originatorRefMock);
  messageProcessor.processEndpointEventSendMessage(actorContextMock, eventMessage);

  verify(messageProcessor).sendEventToLocal(Mockito.any(ActorContext.class), Mockito.any(EndpointEventReceiveMessage.class));
}
项目:kaa    文件:UserActorMessageProcessorTest.java   
@Test
public void testEndpointRemoteReceiveEvent() {
  EndpointUserConnectMessage message1 = new EndpointUserConnectMessage(USER_ID, endpoint1Key, ecfVersions, 1, null, APP_TOKEN, originatorRefMock);
  messageProcessor.processEndpointConnectMessage(actorContextMock, message1);

  RouteTableKey routeKey = new RouteTableKey(APP_TOKEN, ecfVersion1);
  when(cacheServiceMock.getRouteKeys(new EventClassFqnVersion(TENANT_ID, "testClassFqn", ECF_ID1_VERSION))).thenReturn(Collections.singleton(routeKey));

  Event event = new Event(0, "testClassFqn", ByteBuffer.wrap(new byte[0]), null, Base64Util.encode(endpoint1Key.getData()));
  EndpointEvent endpointEvent = new EndpointEvent(endpoint2Key, event, UUID.randomUUID(), System.currentTimeMillis(), ECF_ID1_VERSION);
  RemoteEndpointEvent remoteEvent = new RemoteEndpointEvent(TENANT_ID, USER_ID, endpointEvent, new RouteTableAddress(endpoint1Key, APP_TOKEN, "SERVER1"));
  RemoteEndpointEventMessage message2 = new RemoteEndpointEventMessage(remoteEvent);
  messageProcessor.processRemoteEndpointEventMessage(actorContextMock, message2);

  verify(cacheServiceMock, Mockito.never()).getEventClassFamilyIdByEventClassFqn(new EventClassFqnKey(TENANT_ID, "testClassFqn"));

  verify(messageProcessor).sendEventToLocal(Mockito.any(ActorContext.class), Mockito.any(EndpointEventReceiveMessage.class));
}