@Override public void run(String... strings) throws Exception { client = CuratorFrameworkFactory.newClient(zookeeperConnString, new ExponentialBackoffRetry(1000, Integer.MAX_VALUE)); client.start(); client.getZookeeperClient().blockUntilConnectedOrTimedOut(); leaderLatch = new LeaderLatch(client, "/http-job-scheduler/leader", ManagementFactory.getRuntimeMXBean().getName()); leaderLatch.addListener(new LeaderLatchListener() { @Override public void isLeader() { setMaster(true); masterJobScheduler.resume(); } @Override public void notLeader() { setMaster(false); masterJobScheduler.pause(); } }); leaderLatch.start(); }
/** * Participates for leader lock with the given configuration. * * @throws Exception if any errors encountered. */ @Override public void participateForLeadership() throws Exception { // if the existing leader latch is closed, recreate and connect again if (LeaderLatch.State.CLOSED.equals(leaderLatchRef.get().getState())) { // remove listener from earlier closed leader latch leaderLatchRef.get().removeListener(leaderLatchListener); leaderLatchRef.set(createLeaderLatch()); leaderLatchRef.get().addListener(leaderLatchListener); LOG.info("Existing leader latch is in CLOSED state, it is recreated."); } // if the existing leader latch is not yet started, start now!! if (LeaderLatch.State.LATENT.equals(leaderLatchRef.get().getState())) { leaderLatchRef.get().start(); LOG.info("Existing leader latch is in LATENT state, it is started. leader latch: [{}]", leaderLatchRef.get()); } }
/** * 异步选主,只能启动一次 * * @param listener 选主监听器 */ public void startLeaderElect(final LeaderLatchListener listener) { executor.submit(new Runnable() { @Override public void run() { boolean errFlag = true; leaderLatch = new LeaderLatch((CuratorFramework) registryCenter.getRawClient(), ConsoleNode.LATCH); leaderLatch.addListener(listener); do { try { leaderLatch.start(); leaderLatch.await(); } catch (Exception e) { log.error("Failed to elect a Leader! will retry", e); errFlag = false; } } while (!errFlag); } }); }
@Override public void elect() throws Exception { zkClient.createContainers(ctx.getLeaderElectionZKPath()); latch = new LeaderLatch(zkClient, ctx.getLeaderElectionZKPath(), ctx.getLeaderElectionID()); latch.addListener(new LeaderLatchListener() { @Override public void notLeader() { } @Override public void isLeader() { } }); latch.start(); logger.info("[elect]{}", ctx); }
@Test public void testLeaderElectionIsJoinedOnStart() throws Exception { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); verify(leaderLatch).start(); }
@Test public void testListenerIsAddedForActiveInstanceCallbacks() throws Exception { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); verify(leaderLatch).addListener(activeInstanceElectorService); }
@Test public void testLeaderElectionIsLeftOnStop() throws IOException, AtlasException { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); activeInstanceElectorService.stop(); verify(leaderLatch).close(); }
@Test public void testCuratorFactoryIsClosedOnStop() throws AtlasException { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); activeInstanceElectorService.stop(); verify(curatorFactory).close(); }
@Test public void testSharedStateIsUpdatedWhenInstanceIsActive() throws Exception { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); verify(activeInstanceState).update("id1"); }
/** * @param client the client * @param path path to reap children from * @param executor executor to use for background tasks * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted * @param mode reaping mode * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster * @param lockSchema a set of the possible subnodes of the children of path that must be reaped in addition to the child nodes */ public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, Set<String> lockSchema) { this.client = client; this.mode = mode; this.executor = new CloseableScheduledExecutorService(executor); this.reapingThresholdMs = reapingThresholdMs; if (leaderPath != null) { leaderLatch = new LeaderLatch(client, leaderPath); } else { leaderLatch = null; } this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch); this.lockSchema = lockSchema; addPath(path); }
private void addListenerToLeaderLatch(LeaderLatch leaderLatch) { LeaderLatchListener listener = new LeaderLatchListener() { @Override public void isLeader() { reapingIsActive.set(true); for ( PathHolder holder : activePaths.values() ) { schedule(holder, reapingThresholdMs); } } @Override public void notLeader() { reapingIsActive.set(false); } }; leaderLatch.addListener(listener); reapingIsActive.set(leaderLatch.hasLeadership()); }
@Override public void releaseControl(long dpid) { log.info("Releasing control for {}", HexString.toHexString(dpid)); String dpidStr = HexString.toHexString(dpid); SwitchLeadershipData swData = switches.remove(dpidStr); if (swData == null) { log.debug("Trying to release control of a switch we are not contesting"); return; } LeaderLatch latch = swData.getLatch(); latch.removeListener(swData.getListener()); try { latch.close(); } catch (IOException e) { // I think it's OK not to do anything here. Either the node got // deleted correctly, or the connection went down and the node got deleted. log.debug("releaseControl: caught IOException {}", dpidStr); } }
@Inject public TaskResource(TaskRequestManager taskRequestManager, TaskManager taskManager, SlaveManager slaveManager, MesosClient mesosClient, SingularityTaskMetadataConfiguration taskMetadataConfiguration, SingularityAuthorizationHelper authorizationHelper, RequestManager requestManager, SingularityValidator validator, DisasterManager disasterManager, AsyncHttpClient httpClient, LeaderLatch leaderLatch, ObjectMapper objectMapper, RequestHelper requestHelper) { super(httpClient, leaderLatch, objectMapper); this.taskManager = taskManager; this.taskRequestManager = taskRequestManager; this.taskMetadataConfiguration = taskMetadataConfiguration; this.slaveManager = slaveManager; this.mesosClient = mesosClient; this.requestManager = requestManager; this.authorizationHelper = authorizationHelper; this.validator = validator; this.disasterManager = disasterManager; this.requestHelper = requestHelper; }
@Inject void injectPollerDependencies(SingularityManagedScheduledExecutorServiceFactory executorServiceFactory, LeaderLatch leaderLatch, SingularityExceptionNotifier exceptionNotifier, SingularityAbort abort, SingularityMesosScheduler mesosScheduler, SingularityConfiguration configuration, @Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDelta30sAverage) { this.executorService = executorServiceFactory.get(getClass().getSimpleName()); this.leaderLatch = checkNotNull(leaderLatch, "leaderLatch is null"); this.exceptionNotifier = checkNotNull(exceptionNotifier, "exceptionNotifier is null"); this.abort = checkNotNull(abort, "abort is null"); this.mesosScheduler = checkNotNull(mesosScheduler, "mesosScheduler is null"); this.delayPollersWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs(); this.statusUpdateDelta30sAverage = checkNotNull(statusUpdateDelta30sAverage, "statusUpdateDeltaAverage is null"); }
@Inject public StatusResource(LocalLbAdapter adapter, LoadBalancerConfiguration loadBalancerConfiguration, BaragonAgentMetadata agentMetadata, AtomicReference<BaragonAgentState> agentState, @Named(BaragonAgentServiceModule.AGENT_LEADER_LATCH) LeaderLatch leaderLatch, @Named(BaragonAgentServiceModule.AGENT_MOST_RECENT_REQUEST_ID) AtomicReference<String> mostRecentRequestId, @Named(BaragonDataModule.BARAGON_ZK_CONNECTION_STATE) AtomicReference<ConnectionState> connectionState, @Named(BaragonAgentServiceModule.CONFIG_ERROR_MESSAGE) AtomicReference<Optional<String>> errorMessage) { this.adapter = adapter; this.loadBalancerConfiguration = loadBalancerConfiguration; this.leaderLatch = leaderLatch; this.mostRecentRequestId = mostRecentRequestId; this.connectionState = connectionState; this.agentMetadata = agentMetadata; this.errorMessage = errorMessage; this.agentState = agentState; }
@Inject public BootstrapManaged(BaragonKnownAgentsDatastore knownAgentsDatastore, BaragonLoadBalancerDatastore loadBalancerDatastore, BaragonAgentConfiguration configuration, AgentHeartbeatWorker agentHeartbeatWorker, BaragonAgentMetadata baragonAgentMetadata, LifecycleHelper lifecycleHelper, CuratorFramework curatorFramework, ResyncListener resyncListener, ConfigChecker configChecker, AtomicReference<BaragonAgentState> agentState, @Named(BaragonAgentServiceModule.AGENT_SCHEDULED_EXECUTOR) ScheduledExecutorService executorService, @Named(BaragonAgentServiceModule.AGENT_LEADER_LATCH) LeaderLatch leaderLatch) { this.configuration = configuration; this.leaderLatch = leaderLatch; this.curatorFramework = curatorFramework; this.resyncListener = resyncListener; this.knownAgentsDatastore = knownAgentsDatastore; this.loadBalancerDatastore = loadBalancerDatastore; this.baragonAgentMetadata = baragonAgentMetadata; this.executorService = executorService; this.agentHeartbeatWorker = agentHeartbeatWorker; this.lifecycleHelper = lifecycleHelper; this.configChecker = configChecker; this.agentState = agentState; }
@Inject public StatusManager(BaragonRequestDatastore requestDatastore, ObjectMapper objectMapper, @Named(BaragonDataModule.BARAGON_SERVICE_LEADER_LATCH) LeaderLatch leaderLatch, @Named(BaragonDataModule.BARAGON_SERVICE_WORKER_LAST_START) AtomicLong workerLastStart, @Named(BaragonDataModule.BARAGON_ELB_WORKER_LAST_START) AtomicLong elbWorkerLastStart, @Named(BaragonDataModule.BARAGON_ZK_CONNECTION_STATE) AtomicReference<ConnectionState> connectionState, @Named(BaragonServiceModule.BARAGON_SERVICE_HTTP_CLIENT)AsyncHttpClient httpClient) { this.requestDatastore = requestDatastore; this.leaderLatch = leaderLatch; this.workerLastStart = workerLastStart; this.elbWorkerLastStart = elbWorkerLastStart; this.connectionState = connectionState; this.httpClient = httpClient; this.objectMapper = objectMapper; }
private void init(){ CuratorFramework client = CuratorFrameworkFactory.newClient(zooKeeperUrl, new RetryForever(1000)); leaderLatch = new LeaderLatch(client, "/EasyTransMasterSelector"+"/" + applicationName); try { client.start(); leaderLatch.start(); } catch (Exception e) { throw new RuntimeException(e); } }
@SuppressWarnings("resource") @Override public void process() { try { CuratorFramework _curator = CuratorContainer.getInstance().getCurator(); HeartBeatVo conf = HeartBeatConfigContainer.getInstance().getHbConf(); CuratorContainer.getInstance().createZkNode(conf.getLeaderPath()); // 获取进程ID和服务器hostName final BrokerInfoVo brokerInfo = new BrokerInfoVo(); String name = ManagementFactory.getRuntimeMXBean().getName(); String[] pidAndHostName = StringUtils.split(name, "@"); brokerInfo.setPid(pidAndHostName[0]); brokerInfo.setHostName(pidAndHostName[1]); brokerInfo.setIp(InetAddress.getLocalHost().getHostAddress()); LeaderLatch ll = new LeaderLatch(_curator, conf.getLeaderPath(), JsonUtil.toJson(brokerInfo)); ll.addListener(new LeaderLatchListener() { @Override public void notLeader() { LoggerFactory.getLogger().error("本机现在切换到非leader状态,准备终止当前进程PID:{}", brokerInfo.getPid()); System.exit(-1); } @Override public void isLeader() { LoggerFactory.getLogger().info("本机现在切换到leader状态."); } }); ll.start(); ll.await(); // 注册控制事件 CuratorContainer.getInstance().createZkNode(conf.getControlPath()); _curator.getData().usingWatcher(WatcherType.CONTROL).forPath(conf.getControlPath()); } catch (Exception e) { throw new RuntimeException("选举leader时发生错误!", e); } }
public static ZkScheduledJobOwnershipDecider buildDecider(String identifier) throws Exception { ZkScheduledJobOwnershipDecider decider = new ZkScheduledJobOwnershipDecider( String.format("%s/job/%s/owner", StringUtils.stripEnd(Configuration.getProperties().getString("zk_path"), "/"), identifier)); decider.latch = new LeaderLatch(ZkClient.getClient(), decider.zkPath, nodeIdentifier, LeaderLatch.CloseMode.NOTIFY_LEADER); decider.latch.start(); return decider; }
@Inject void injectPollerDependencies(SingularityManagedScheduledExecutorServiceFactory executorServiceFactory, LeaderLatch leaderLatch, SingularityExceptionNotifier exceptionNotifier, SingularityAbort abort, SingularityMesosSchedulerDelegator mesosScheduler) { this.executorService = executorServiceFactory.get(getClass().getSimpleName()); this.leaderLatch = checkNotNull(leaderLatch, "leaderLatch is null"); this.exceptionNotifier = checkNotNull(exceptionNotifier, "exceptionNotifier is null"); this.abort = checkNotNull(abort, "abort is null"); this.mesosScheduler = checkNotNull(mesosScheduler, "mesosScheduler is null"); }
/** * Creates a {@code SingletonService} backed by Curator. * * @param client A client to interact with a ZooKeeper ensemble. * @param groupPath The root ZooKeeper path service members advertise their presence under. * @param memberToken A token used to form service member node names. * @param codec A codec that can be used to deserialize group member {@link ServiceInstance} data. */ CuratorSingletonService( CuratorFramework client, String groupPath, String memberToken, Codec<ServiceInstance> codec) { leaderLatch = new LeaderLatch(client, groupPath); advertiser = new Advertiser(client, groupPath, memberToken, codec); this.groupPath = PathUtils.validatePath(groupPath); }
public ClientLeaderLatch(String name, CuratorFramework client, String path) throws Exception { this.name=name; //leaderSelector 多个客户端监听同一个节点 this.leaderLatch=new LeaderLatch(client, path, name); //启动 leaderLatch.start(); // 可以添加多个Listener,告知外界 leaderLatch.addListener(getLatchListener(), executor); }
/** * 在主节点执行操作. * * @param latchNode 分布式锁使用的作业节点名称 * @param callback 执行操作的回调 */ public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) { try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) { latch.start(); latch.await(); callback.execute(); //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON handleException(ex); } }
/** * 阻塞式的初始化和Zookeeper的连接,这个方法在一个容器生命周期中只允许允许一次。 * <p/> * 和zookeeper之间的连接遵循每实例一个ZkClientProvider的方式,这样当实例内部有多个同步线程的时候, * 可以共享一个ZkClientProvider,状态都是一致的,避免有些线程是leader,有些是standby的情况。 * <p/> * 改方法是全异步的,如果zookeeper连接不上,也会返回。但是一般上层应用拿不到leader latch,不会成为leader。 * 而且<code>apache.zookeeper.ClientCnxn</code>包的日志会打印如下: * <pre> * Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket * connection and attempting reconnect * </pre> * 参数<code>name</code>标示同步线程的名称,默认的话只有一个可以初始化好和zk的连接,其他的都不再重复初始化了。 */ @Override public void init(String name) { if (!state.compareAndSet(State.LATENT, State.STARTED)) { logger.debug("ZkHaGuard can only be initialized once because LeaderLatch should be singleton"); return; } if (zkClientProvider == null) { throw new IllegalStateException("ZkClientProvider should not be null"); } logger.info("LeaderLatch will start soon by " + name); zkClientProvider.init(); leaderLatch = new LeaderLatch(zkClientProvider.provideClient(), latchPath, NetUtils.getLocalHostIP()); leaderLatch.addListener(new LeaderLatchListener() { @Override public void isLeader() { logger.info("This instance is leader"); } @Override public void notLeader() { logger.warn("This instance is NOT leader"); } }); try { leaderLatch.start(); } catch (Exception e) { throw new RuntimeException("Leader latch init failed", e); } logger.info("LeaderLatch starts by " + name + " asynchronously"); }
/** * Creates a ZooKeeperLeaderElectionService object. * * @param client Client which is connected to the ZooKeeper quorum * @param latchPath ZooKeeper node path for the leader election latch * @param leaderPath ZooKeeper node path for the node which stores the current leader information */ public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) { this.client = Preconditions.checkNotNull(client, "CuratorFramework client"); this.leaderPath = Preconditions.checkNotNull(leaderPath, "leaderPath"); leaderLatch = new LeaderLatch(client, latchPath); cache = new NodeCache(client, leaderPath); issuedLeaderSessionID = null; confirmedLeaderSessionID = null; leaderContender = null; running = false; }
@Test public void testRegisteredHandlersAreNotifiedWhenInstanceIsActive() throws AtlasException { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); Set<ActiveStateChangeHandler> changeHandlers = new HashSet<>(); final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); changeHandlers.add(handler1); changeHandlers.add(handler2); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); verify(handler1).instanceIsActive(); verify(handler2).instanceIsActive(); }
@Test public void testRegisteredHandlersAreNotifiedOfPassiveWhenStateUpdateFails() throws Exception { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); Set<ActiveStateChangeHandler> changeHandlers = new HashSet<>(); final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); changeHandlers.add(handler1); changeHandlers.add(handler2); doThrow(new AtlasBaseException()).when(activeInstanceState).update("id1"); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); verify(handler1).instanceIsPassive(); verify(handler2).instanceIsPassive(); }
@Test public void testElectionIsRejoinedWhenStateUpdateFails() throws Exception { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); doThrow(new AtlasBaseException()).when(activeInstanceState).update("id1"); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); InOrder inOrder = inOrder(leaderLatch, curatorFactory); inOrder.verify(leaderLatch).close(); inOrder.verify(curatorFactory).leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); inOrder.verify(leaderLatch).addListener(activeInstanceElectorService); inOrder.verify(leaderLatch).start(); }
@Test public void testRegisteredHandlersAreNotifiedOfPassiveWhenInstanceIsPassive() throws AtlasException { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); Set<ActiveStateChangeHandler> changeHandlers = new HashSet<>(); final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class); final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class); changeHandlers.add(handler1); changeHandlers.add(handler2); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); activeInstanceElectorService.notLeader(); verify(handler1).instanceIsPassive(); verify(handler2).instanceIsPassive(); }
@Test public void testPassiveStateSetIfActivationFails() throws Exception { when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"}); when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); when(configuration.getString( HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)). thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT); LeaderLatch leaderLatch = mock(LeaderLatch.class); when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch); doThrow(new AtlasBaseException()).when(activeInstanceState).update("id1"); ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, activeInstanceState, serviceState); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); InOrder inOrder = inOrder(serviceState); inOrder.verify(serviceState).becomingActive(); inOrder.verify(serviceState).becomingPassive(); inOrder.verify(serviceState).setPassive(); }
@Inject ScheduledLoggingLeaderService(final LeaderLatch latch, final CountDownLatch firstRunLatch, final AtomicLong counter) { super(latch); this.counter = counter; this.firstRunLatch = firstRunLatch; }
/** * @param client client * @param queueAllocator allocator for new queues * @param queuePath path for the queues * @param leaderPath path for the leader that monitors queue sizes (must be different than queuePath) * @param policies sharding policies */ public QueueSharder(CuratorFramework client, QueueAllocator<U, T> queueAllocator, String queuePath, String leaderPath, QueueSharderPolicies policies) { this.client = client; this.queueAllocator = queueAllocator; this.queuePath = queuePath; this.policies = policies; leaderLatch = new LeaderLatch(client, leaderPath); service = Executors.newSingleThreadExecutor(policies.getThreadFactory()); }
/** * @param client client * @param executor thread pool * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted * @param leaderLatch a pre-created leader latch to ensure only 1 reaper is active in the cluster * @param ownsLeaderLatch indicates whether or not the reaper owns the leader latch (if it exists) and thus should start/stop it * */ private Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch, boolean ownsLeaderLatch) { this.client = client; this.executor = new CloseableScheduledExecutorService(executor); this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD; this.leaderLatch = leaderLatch; if (leaderLatch != null) { addListenerToLeaderLatch(leaderLatch); } this.ownsLeaderLatch = ownsLeaderLatch; }
private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework client, String leaderPath) { if (leaderPath == null) { return null; } else { return new LeaderLatch(client, leaderPath); } }
public void start() throws Exception { client.start(); client.getZookeeperClient().blockUntilConnectedOrTimedOut(); leaderLatch = new LeaderLatch(client, latchpath, id); ClusterSyncManagerLeaderListener listener = new ClusterSyncManagerLeaderListener(failOverTask, getNode()); leaderLatch.addListener(listener); leaderLatch.start(); }
/** * Test if {@link ZookeeperRegistry#requestControl(long, IControllerRegistryService.ControlChangeCallback)} * correctly take control of specific switch. * Because {@link ZookeeperRegistry#requestControl(long, IControllerRegistryService.ControlChangeCallback)} * doesn't return values, inject mock {@link LeaderLatch} object and verify latch is correctly set up. * * @throws Exception */ @Test public void testRequestControl() throws Exception { // Mock LeaderLatch LeaderLatch latch = createMock(LeaderLatch.class); latch.addListener(anyObject(SwitchLeaderListener.class)); expectLastCall().once(); latch.start(); expectLastCall().once(); replay(latch); PowerMock.expectNew(LeaderLatch.class, anyObject(CuratorFramework.class), anyObject(String.class), anyObject(String.class)) .andReturn(latch).once(); PowerMock.replay(LeaderLatch.class); String controllerId = "controller2013"; registry.registerController(controllerId); LoggingCallback callback = new LoggingCallback(1); long dpidToRequest = 2000L; try { registry.requestControl(dpidToRequest, callback); } catch (RegistryException e) { e.printStackTrace(); fail(e.getMessage()); } verify(latch); }
/** * Test if {@link ZookeeperRegistry#releaseControl(long)} correctly release control of specific switch. * Because {@link ZookeeperRegistry#releaseControl(long)} doesn't return values, inject mock * {@link LeaderLatch} object and verify latch is correctly set up. * * @throws Exception */ @Test public void testReleaseControl() throws Exception { // Mock of LeaderLatch LeaderLatch latch = createMock(LeaderLatch.class); latch.addListener(anyObject(SwitchLeaderListener.class)); expectLastCall().once(); latch.start(); expectLastCall().once(); latch.removeListener(anyObject(SwitchLeaderListener.class)); expectLastCall().once(); latch.close(); expectLastCall().once(); replay(latch); PowerMock.expectNew(LeaderLatch.class, anyObject(CuratorFramework.class), anyObject(String.class), anyObject(String.class)) .andReturn(latch).once(); PowerMock.replay(LeaderLatch.class); String controllerId = "controller2013"; registry.registerController(controllerId); long dpidToRequest = 2000L; LoggingCallback callback = new LoggingCallback(1); registry.requestControl(dpidToRequest, callback); registry.releaseControl(dpidToRequest); verify(latch); }
@Override protected void run() throws InterruptedException { // Beware race conditions: closeLeaderLatch() may be called by another thread at any time. while (isRunning()) { try { // Start attempting to acquire leadership via the Curator leadership latch. LOG.debug("Attempting to acquire leadership: {}", getId()); LeaderLatch latch = startLeaderLatch(); // Wait until (a) leadership is acquired or (b) the latch is closed by service shutdown or ZK cxn loss. if (isRunning()) { try { latch.await(); } catch (EOFException e) { // Latch was closed while we were waiting. checkState(!latch.hasLeadership()); } } // If we succeeded in acquiring leadership, start/run the leadership-managed delegate service. if (isRunning() && latch.hasLeadership()) { LOG.debug("Leadership acquired: {}", getId()); runAsLeader(); LOG.debug("Leadership released: {}", getId()); } } finally { closeLeaderLatch(); } if (isRunning()) { // If we lost or relinquished leadership, wait a while for things to settle before trying to // re-acquire leadership (eg. wait for a network hiccup to the ZooKeeper server to resolve). sleep(_reacquireDelayNanos); } } }