Java 类org.apache.curator.framework.recipes.leader.LeaderLatch 实例源码

项目:simpleJobScheduler    文件:Application.java   
@Override
public void run(String... strings) throws Exception {
    client = CuratorFrameworkFactory.newClient(zookeeperConnString,
            new ExponentialBackoffRetry(1000, Integer.MAX_VALUE));
    client.start();
    client.getZookeeperClient().blockUntilConnectedOrTimedOut();

    leaderLatch = new LeaderLatch(client, "/http-job-scheduler/leader",
            ManagementFactory.getRuntimeMXBean().getName());

    leaderLatch.addListener(new LeaderLatchListener() {
        @Override
        public void isLeader() {
            setMaster(true);
            masterJobScheduler.resume();
        }

        @Override
        public void notLeader() {
            setMaster(false);
            masterJobScheduler.pause();
        }
    });
    leaderLatch.start();
}
项目:registry    文件:ZKLeadershipParticipant.java   
/**
 * Participates for leader lock with the given configuration.
 *
 * @throws Exception if any errors encountered.
 */
@Override
public void participateForLeadership() throws Exception {
    // if the existing leader latch is closed, recreate and connect again
    if (LeaderLatch.State.CLOSED.equals(leaderLatchRef.get().getState())) {
        // remove listener from earlier closed leader latch
        leaderLatchRef.get().removeListener(leaderLatchListener);

        leaderLatchRef.set(createLeaderLatch());
        leaderLatchRef.get().addListener(leaderLatchListener);
        LOG.info("Existing leader latch is in CLOSED state, it is recreated.");
    }

    // if the existing leader latch is not yet started, start now!!
    if (LeaderLatch.State.LATENT.equals(leaderLatchRef.get().getState())) {
        leaderLatchRef.get().start();
        LOG.info("Existing leader latch is in LATENT state, it is started. leader latch: [{}]", leaderLatchRef.get());
    }
}
项目:elastic-jobx    文件:ConsoleRegistryCenter.java   
/**
 * 异步选主,只能启动一次
 *
 * @param listener 选主监听器
 */
public void startLeaderElect(final LeaderLatchListener listener) {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            boolean errFlag = true;
            leaderLatch = new LeaderLatch((CuratorFramework) registryCenter.getRawClient(), ConsoleNode.LATCH);
            leaderLatch.addListener(listener);
            do {
                try {
                    leaderLatch.start();
                    leaderLatch.await();
                } catch (Exception e) {
                    log.error("Failed to elect a Leader! will retry", e);
                    errFlag = false;
                }
            } while (!errFlag);
        }
    });
}
项目:x-pipe    文件:DefaultLeaderElector.java   
@Override
public void elect() throws Exception {

    zkClient.createContainers(ctx.getLeaderElectionZKPath());

    latch = new LeaderLatch(zkClient, ctx.getLeaderElectionZKPath(), ctx.getLeaderElectionID());
    latch.addListener(new LeaderLatchListener() {

        @Override
        public void notLeader() {
        }

        @Override
        public void isLeader() {
        }
    });

    latch.start();
    logger.info("[elect]{}", ctx);
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testLeaderElectionIsJoinedOnStart() throws Exception {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
                    activeInstanceState, serviceState);
    activeInstanceElectorService.start();

    verify(leaderLatch).start();
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testListenerIsAddedForActiveInstanceCallbacks() throws Exception {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);

    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
                    activeInstanceState, serviceState);
    activeInstanceElectorService.start();

    verify(leaderLatch).addListener(activeInstanceElectorService);
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testLeaderElectionIsLeftOnStop() throws IOException, AtlasException {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);

    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
                    activeInstanceState, serviceState);
    activeInstanceElectorService.start();
    activeInstanceElectorService.stop();

    verify(leaderLatch).close();
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testCuratorFactoryIsClosedOnStop() throws AtlasException {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);

    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
                    activeInstanceState, serviceState);
    activeInstanceElectorService.start();
    activeInstanceElectorService.stop();

    verify(curatorFactory).close();
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testSharedStateIsUpdatedWhenInstanceIsActive() throws Exception {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);

    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
                    activeInstanceState, serviceState);

    activeInstanceElectorService.start();
    activeInstanceElectorService.isLeader();

    verify(activeInstanceState).update("id1");
}
项目:curator    文件:ChildReaper.java   
/**
 * @param client the client
 * @param path path to reap children from
 * @param executor executor to use for background tasks
 * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
 * @param mode reaping mode
 * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
 * @param lockSchema a set of the possible subnodes of the children of path that must be reaped in addition to the child nodes
 */
public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, Set<String> lockSchema)
{
    this.client = client;
    this.mode = mode;
    this.executor = new CloseableScheduledExecutorService(executor);
    this.reapingThresholdMs = reapingThresholdMs;
    if (leaderPath != null)
    {
        leaderLatch = new LeaderLatch(client, leaderPath);
    }
    else
    {
        leaderLatch = null;
    }
    this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
    this.lockSchema = lockSchema;
    addPath(path);
}
项目:curator    文件:Reaper.java   
private void addListenerToLeaderLatch(LeaderLatch leaderLatch)
{

    LeaderLatchListener listener = new LeaderLatchListener()
    {
        @Override
        public void isLeader()
        {
            reapingIsActive.set(true);
            for ( PathHolder holder : activePaths.values() )
            {
                schedule(holder, reapingThresholdMs);
            }
        }

        @Override
        public void notLeader()
        {
            reapingIsActive.set(false);
        }
    };
    leaderLatch.addListener(listener);

    reapingIsActive.set(leaderLatch.hasLeadership());
}
项目:spring-open    文件:ZookeeperRegistry.java   
@Override
public void releaseControl(long dpid) {
    log.info("Releasing control for {}", HexString.toHexString(dpid));

    String dpidStr = HexString.toHexString(dpid);

    SwitchLeadershipData swData = switches.remove(dpidStr);

    if (swData == null) {
        log.debug("Trying to release control of a switch we are not contesting");
        return;
    }

    LeaderLatch latch = swData.getLatch();

    latch.removeListener(swData.getListener());

    try {
        latch.close();
    } catch (IOException e) {
        // I think it's OK not to do anything here. Either the node got
        // deleted correctly, or the connection went down and the node got deleted.
        log.debug("releaseControl: caught IOException {}", dpidStr);
    }
}
项目:Singularity    文件:TaskResource.java   
@Inject
public TaskResource(TaskRequestManager taskRequestManager, TaskManager taskManager, SlaveManager slaveManager, MesosClient mesosClient, SingularityTaskMetadataConfiguration taskMetadataConfiguration,
                    SingularityAuthorizationHelper authorizationHelper, RequestManager requestManager, SingularityValidator validator, DisasterManager disasterManager,
                    AsyncHttpClient httpClient, LeaderLatch leaderLatch, ObjectMapper objectMapper, RequestHelper requestHelper) {
  super(httpClient, leaderLatch, objectMapper);
  this.taskManager = taskManager;
  this.taskRequestManager = taskRequestManager;
  this.taskMetadataConfiguration = taskMetadataConfiguration;
  this.slaveManager = slaveManager;
  this.mesosClient = mesosClient;
  this.requestManager = requestManager;
  this.authorizationHelper = authorizationHelper;
  this.validator = validator;
  this.disasterManager = disasterManager;
  this.requestHelper = requestHelper;
}
项目:Singularity    文件:SingularityLeaderOnlyPoller.java   
@Inject
void injectPollerDependencies(SingularityManagedScheduledExecutorServiceFactory executorServiceFactory,
                              LeaderLatch leaderLatch,
                              SingularityExceptionNotifier exceptionNotifier,
                              SingularityAbort abort,
                              SingularityMesosScheduler mesosScheduler,
                              SingularityConfiguration configuration,
                              @Named(SingularityMainModule.STATUS_UPDATE_DELTA_30S_AVERAGE) AtomicLong statusUpdateDelta30sAverage) {
  this.executorService = executorServiceFactory.get(getClass().getSimpleName());
  this.leaderLatch = checkNotNull(leaderLatch, "leaderLatch is null");
  this.exceptionNotifier = checkNotNull(exceptionNotifier, "exceptionNotifier is null");
  this.abort = checkNotNull(abort, "abort is null");
  this.mesosScheduler = checkNotNull(mesosScheduler, "mesosScheduler is null");
  this.delayPollersWhenDeltaOverMs = configuration.getDelayPollersWhenDeltaOverMs();
  this.statusUpdateDelta30sAverage = checkNotNull(statusUpdateDelta30sAverage, "statusUpdateDeltaAverage is null");
}
项目:Baragon    文件:StatusResource.java   
@Inject
public StatusResource(LocalLbAdapter adapter,
                      LoadBalancerConfiguration loadBalancerConfiguration,
                      BaragonAgentMetadata agentMetadata,
                      AtomicReference<BaragonAgentState> agentState,
                      @Named(BaragonAgentServiceModule.AGENT_LEADER_LATCH) LeaderLatch leaderLatch,
                      @Named(BaragonAgentServiceModule.AGENT_MOST_RECENT_REQUEST_ID) AtomicReference<String> mostRecentRequestId,
                      @Named(BaragonDataModule.BARAGON_ZK_CONNECTION_STATE) AtomicReference<ConnectionState> connectionState,
                      @Named(BaragonAgentServiceModule.CONFIG_ERROR_MESSAGE) AtomicReference<Optional<String>> errorMessage) {
  this.adapter = adapter;
  this.loadBalancerConfiguration = loadBalancerConfiguration;
  this.leaderLatch = leaderLatch;
  this.mostRecentRequestId = mostRecentRequestId;
  this.connectionState = connectionState;
  this.agentMetadata = agentMetadata;
  this.errorMessage = errorMessage;
  this.agentState = agentState;
}
项目:Baragon    文件:BootstrapManaged.java   
@Inject
public BootstrapManaged(BaragonKnownAgentsDatastore knownAgentsDatastore,
                        BaragonLoadBalancerDatastore loadBalancerDatastore,
                        BaragonAgentConfiguration configuration,
                        AgentHeartbeatWorker agentHeartbeatWorker,
                        BaragonAgentMetadata baragonAgentMetadata,
                        LifecycleHelper lifecycleHelper,
                        CuratorFramework curatorFramework,
                        ResyncListener resyncListener,
                        ConfigChecker configChecker,
                        AtomicReference<BaragonAgentState> agentState,
                        @Named(BaragonAgentServiceModule.AGENT_SCHEDULED_EXECUTOR) ScheduledExecutorService executorService,
                        @Named(BaragonAgentServiceModule.AGENT_LEADER_LATCH) LeaderLatch leaderLatch) {
  this.configuration = configuration;
  this.leaderLatch = leaderLatch;
  this.curatorFramework = curatorFramework;
  this.resyncListener = resyncListener;
  this.knownAgentsDatastore = knownAgentsDatastore;
  this.loadBalancerDatastore = loadBalancerDatastore;
  this.baragonAgentMetadata = baragonAgentMetadata;
  this.executorService = executorService;
  this.agentHeartbeatWorker = agentHeartbeatWorker;
  this.lifecycleHelper = lifecycleHelper;
  this.configChecker = configChecker;
  this.agentState = agentState;
}
项目:Baragon    文件:StatusManager.java   
@Inject
public StatusManager(BaragonRequestDatastore requestDatastore,
                     ObjectMapper objectMapper,
                     @Named(BaragonDataModule.BARAGON_SERVICE_LEADER_LATCH) LeaderLatch leaderLatch,
                     @Named(BaragonDataModule.BARAGON_SERVICE_WORKER_LAST_START) AtomicLong workerLastStart,
                     @Named(BaragonDataModule.BARAGON_ELB_WORKER_LAST_START) AtomicLong elbWorkerLastStart,
                     @Named(BaragonDataModule.BARAGON_ZK_CONNECTION_STATE) AtomicReference<ConnectionState> connectionState,
                     @Named(BaragonServiceModule.BARAGON_SERVICE_HTTP_CLIENT)AsyncHttpClient httpClient) {
  this.requestDatastore = requestDatastore;
  this.leaderLatch = leaderLatch;
  this.workerLastStart = workerLastStart;
  this.elbWorkerLastStart = elbWorkerLastStart;
  this.connectionState = connectionState;
  this.httpClient = httpClient;
  this.objectMapper = objectMapper;
}
项目:EasyTransaction    文件:ZooKeeperMasterSelectorImpl.java   
private void init(){
    CuratorFramework client = CuratorFrameworkFactory.newClient(zooKeeperUrl, new RetryForever(1000));
    leaderLatch = new LeaderLatch(client, "/EasyTransMasterSelector"+"/" + applicationName);
    try {
        client.start();
        leaderLatch.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
项目:DBus    文件:LeaderElectHandler.java   
@SuppressWarnings("resource")
@Override
public void process() {
    try {
        CuratorFramework _curator = CuratorContainer.getInstance().getCurator();
        HeartBeatVo conf = HeartBeatConfigContainer.getInstance().getHbConf();
        CuratorContainer.getInstance().createZkNode(conf.getLeaderPath());

        // 获取进程ID和服务器hostName
        final BrokerInfoVo brokerInfo = new BrokerInfoVo();
        String name = ManagementFactory.getRuntimeMXBean().getName();
        String[] pidAndHostName = StringUtils.split(name, "@");
        brokerInfo.setPid(pidAndHostName[0]);
        brokerInfo.setHostName(pidAndHostName[1]);
        brokerInfo.setIp(InetAddress.getLocalHost().getHostAddress());
        LeaderLatch ll = new LeaderLatch(_curator, conf.getLeaderPath(), JsonUtil.toJson(brokerInfo));

        ll.addListener(new LeaderLatchListener() {
            @Override
            public void notLeader() {
                LoggerFactory.getLogger().error("本机现在切换到非leader状态,准备终止当前进程PID:{}", brokerInfo.getPid());
                System.exit(-1);
            }
            @Override
            public void isLeader() {
                LoggerFactory.getLogger().info("本机现在切换到leader状态.");
            }
        });

        ll.start();
        ll.await();
        // 注册控制事件
        CuratorContainer.getInstance().createZkNode(conf.getControlPath());
        _curator.getData().usingWatcher(WatcherType.CONTROL).forPath(conf.getControlPath());
    } catch (Exception e) {
        throw new RuntimeException("选举leader时发生错误!", e);
    }
}
项目:soundwave    文件:ZkScheduledJobOwnershipDecider.java   
public static ZkScheduledJobOwnershipDecider buildDecider(String identifier) throws Exception {
  ZkScheduledJobOwnershipDecider decider = new ZkScheduledJobOwnershipDecider(
      String.format("%s/job/%s/owner",
          StringUtils.stripEnd(Configuration.getProperties().getString("zk_path"), "/"),
          identifier));
  decider.latch =
      new LeaderLatch(ZkClient.getClient(), decider.zkPath, nodeIdentifier,
          LeaderLatch.CloseMode.NOTIFY_LEADER);
  decider.latch.start();
  return decider;
}
项目:Mastering-Mesos    文件:SingularityLeaderOnlyPoller.java   
@Inject
void injectPollerDependencies(SingularityManagedScheduledExecutorServiceFactory executorServiceFactory,
    LeaderLatch leaderLatch,
    SingularityExceptionNotifier exceptionNotifier,
    SingularityAbort abort,
    SingularityMesosSchedulerDelegator mesosScheduler) {
  this.executorService = executorServiceFactory.get(getClass().getSimpleName());
  this.leaderLatch = checkNotNull(leaderLatch, "leaderLatch is null");
  this.exceptionNotifier = checkNotNull(exceptionNotifier, "exceptionNotifier is null");
  this.abort = checkNotNull(abort, "abort is null");
  this.mesosScheduler = checkNotNull(mesosScheduler, "mesosScheduler is null");
}
项目:Mastering-Mesos    文件:CuratorSingletonService.java   
/**
 * Creates a {@code SingletonService} backed by Curator.
 *
 * @param client A client to interact with a ZooKeeper ensemble.
 * @param groupPath The root ZooKeeper path service members advertise their presence under.
 * @param memberToken A token used to form service member node names.
 * @param codec A codec that can be used to deserialize group member {@link ServiceInstance} data.
 */
CuratorSingletonService(
    CuratorFramework client,
    String groupPath,
    String memberToken,
    Codec<ServiceInstance> codec) {

  leaderLatch = new LeaderLatch(client, groupPath);
  advertiser = new Advertiser(client, groupPath, memberToken, codec);
  this.groupPath = PathUtils.validatePath(groupPath);
}
项目:javabase    文件:ClientLeaderLatch.java   
public ClientLeaderLatch(String name, CuratorFramework client, String path) throws Exception {
    this.name=name;
    //leaderSelector 多个客户端监听同一个节点
    this.leaderLatch=new LeaderLatch(client, path, name);
    //启动
    leaderLatch.start();
    // 可以添加多个Listener,告知外界
    leaderLatch.addListener(getLatchListener(), executor);
}
项目:ElasticJob    文件:JobNodeStorage.java   
/**
 * 在主节点执行操作.
 * 
 * @param latchNode 分布式锁使用的作业节点名称
 * @param callback 执行操作的回调
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}
项目:fountain    文件:ZkHaGuard.java   
/**
 * 阻塞式的初始化和Zookeeper的连接,这个方法在一个容器生命周期中只允许允许一次。
 * <p/>
 * 和zookeeper之间的连接遵循每实例一个ZkClientProvider的方式,这样当实例内部有多个同步线程的时候,
 * 可以共享一个ZkClientProvider,状态都是一致的,避免有些线程是leader,有些是standby的情况。
 * <p/>
 * 改方法是全异步的,如果zookeeper连接不上,也会返回。但是一般上层应用拿不到leader latch,不会成为leader。
 * 而且<code>apache.zookeeper.ClientCnxn</code>包的日志会打印如下:
 * <pre>
 *     Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket
 *     connection and attempting reconnect
 * </pre>
 * 参数<code>name</code>标示同步线程的名称,默认的话只有一个可以初始化好和zk的连接,其他的都不再重复初始化了。
 */
@Override
public void init(String name) {
    if (!state.compareAndSet(State.LATENT, State.STARTED)) {
        logger.debug("ZkHaGuard can only be initialized once because LeaderLatch should be singleton");
        return;
    }

    if (zkClientProvider == null) {
        throw new IllegalStateException("ZkClientProvider should not be null");
    }
    logger.info("LeaderLatch will start soon by " + name);
    zkClientProvider.init();
    leaderLatch = new LeaderLatch(zkClientProvider.provideClient(), latchPath, NetUtils.getLocalHostIP());
    leaderLatch.addListener(new LeaderLatchListener() {
        @Override
        public void isLeader() {
            logger.info("This instance is leader");
        }

        @Override
        public void notLeader() {
            logger.warn("This instance is NOT leader");
        }
    });
    try {
        leaderLatch.start();
    } catch (Exception e) {
        throw new RuntimeException("Leader latch init failed", e);
    }
    logger.info("LeaderLatch starts by " + name + " asynchronously");
}
项目:flink    文件:ZooKeeperLeaderElectionService.java   
/**
 * Creates a ZooKeeperLeaderElectionService object.
 *
 * @param client Client which is connected to the ZooKeeper quorum
 * @param latchPath ZooKeeper node path for the leader election latch
 * @param leaderPath ZooKeeper node path for the node which stores the current leader information
 */
public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
    this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
    this.leaderPath = Preconditions.checkNotNull(leaderPath, "leaderPath");

    leaderLatch = new LeaderLatch(client, latchPath);
    cache = new NodeCache(client, leaderPath);

    issuedLeaderSessionID = null;
    confirmedLeaderSessionID = null;
    leaderContender = null;

    running = false;
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testRegisteredHandlersAreNotifiedWhenInstanceIsActive() throws AtlasException {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);

    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    Set<ActiveStateChangeHandler> changeHandlers = new HashSet<>();
    final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class);
    final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class);

    changeHandlers.add(handler1);
    changeHandlers.add(handler2);

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
                    activeInstanceState, serviceState);
    activeInstanceElectorService.start();
    activeInstanceElectorService.isLeader();

    verify(handler1).instanceIsActive();
    verify(handler2).instanceIsActive();
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testRegisteredHandlersAreNotifiedOfPassiveWhenStateUpdateFails() throws Exception {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);


    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    Set<ActiveStateChangeHandler> changeHandlers = new HashSet<>();
    final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class);
    final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class);

    changeHandlers.add(handler1);
    changeHandlers.add(handler2);

    doThrow(new AtlasBaseException()).when(activeInstanceState).update("id1");

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
                    activeInstanceState, serviceState);
    activeInstanceElectorService.start();
    activeInstanceElectorService.isLeader();

    verify(handler1).instanceIsPassive();
    verify(handler2).instanceIsPassive();
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testElectionIsRejoinedWhenStateUpdateFails() throws Exception {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);


    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    doThrow(new AtlasBaseException()).when(activeInstanceState).update("id1");

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory,
                    activeInstanceState, serviceState);

    activeInstanceElectorService.start();
    activeInstanceElectorService.isLeader();

    InOrder inOrder = inOrder(leaderLatch, curatorFactory);
    inOrder.verify(leaderLatch).close();
    inOrder.verify(curatorFactory).leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    inOrder.verify(leaderLatch).addListener(activeInstanceElectorService);
    inOrder.verify(leaderLatch).start();
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testRegisteredHandlersAreNotifiedOfPassiveWhenInstanceIsPassive() throws AtlasException {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);


    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    Set<ActiveStateChangeHandler> changeHandlers = new HashSet<>();
    final ActiveStateChangeHandler handler1 = mock(ActiveStateChangeHandler.class);
    final ActiveStateChangeHandler handler2 = mock(ActiveStateChangeHandler.class);

    changeHandlers.add(handler1);
    changeHandlers.add(handler2);

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory,
                    activeInstanceState, serviceState);
    activeInstanceElectorService.start();
    activeInstanceElectorService.notLeader();

    verify(handler1).instanceIsPassive();
    verify(handler2).instanceIsPassive();
}
项目:incubator-atlas    文件:ActiveInstanceElectorServiceTest.java   
@Test
public void testPassiveStateSetIfActivationFails() throws Exception {
    when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
    when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1"});
    when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000");
    when(configuration.getString(
            HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY, HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).
            thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);


    LeaderLatch leaderLatch = mock(LeaderLatch.class);
    when(curatorFactory.leaderLatchInstance("id1", HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(leaderLatch);

    doThrow(new AtlasBaseException()).when(activeInstanceState).update("id1");

    ActiveInstanceElectorService activeInstanceElectorService =
            new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(),
                    curatorFactory, activeInstanceState, serviceState);
    activeInstanceElectorService.start();
    activeInstanceElectorService.isLeader();

    InOrder inOrder = inOrder(serviceState);
    inOrder.verify(serviceState).becomingActive();
    inOrder.verify(serviceState).becomingPassive();
    inOrder.verify(serviceState).setPassive();
}
项目:cultivar_old    文件:ScheduledLoggingLeaderService.java   
@Inject
ScheduledLoggingLeaderService(final LeaderLatch latch, final CountDownLatch firstRunLatch, final AtomicLong counter) {
    super(latch);

    this.counter = counter;
    this.firstRunLatch = firstRunLatch;
}
项目:curator    文件:QueueSharder.java   
/**
 * @param client client
 * @param queueAllocator allocator for new queues
 * @param queuePath path for the queues
 * @param leaderPath path for the leader that monitors queue sizes (must be different than queuePath)
 * @param policies sharding policies
 */
public QueueSharder(CuratorFramework client, QueueAllocator<U, T> queueAllocator, String queuePath, String leaderPath, QueueSharderPolicies policies)
{
    this.client = client;
    this.queueAllocator = queueAllocator;
    this.queuePath = queuePath;
    this.policies = policies;
    leaderLatch = new LeaderLatch(client, leaderPath);
    service = Executors.newSingleThreadExecutor(policies.getThreadFactory());
}
项目:curator    文件:Reaper.java   
/**
 * @param client             client
 * @param executor           thread pool
 * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
 * @param leaderLatch        a pre-created leader latch to ensure only 1 reaper is active in the cluster
 * @param ownsLeaderLatch    indicates whether or not the reaper owns the leader latch (if it exists) and thus should start/stop it
 * */
private Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch, boolean ownsLeaderLatch)
{
    this.client = client;
    this.executor = new CloseableScheduledExecutorService(executor);
    this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
    this.leaderLatch = leaderLatch;
    if (leaderLatch != null)
    {
        addListenerToLeaderLatch(leaderLatch);
    }
    this.ownsLeaderLatch = ownsLeaderLatch;
}
项目:curator    文件:Reaper.java   
private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework client, String leaderPath)
{
    if (leaderPath == null)
    {
        return null;
    }
    else
    {
        return new LeaderLatch(client, leaderPath);
    }
}
项目:Decision    文件:ClusterSyncManager.java   
public void start() throws Exception {
    client.start();
    client.getZookeeperClient().blockUntilConnectedOrTimedOut();
    leaderLatch = new LeaderLatch(client, latchpath, id);

    ClusterSyncManagerLeaderListener listener = new ClusterSyncManagerLeaderListener(failOverTask, getNode());
    leaderLatch.addListener(listener);

    leaderLatch.start();
}
项目:spring-open    文件:ZookeeperRegistryTest.java   
/**
 * Test if {@link ZookeeperRegistry#requestControl(long, IControllerRegistryService.ControlChangeCallback)}
 * correctly take control of specific switch.
 * Because {@link ZookeeperRegistry#requestControl(long, IControllerRegistryService.ControlChangeCallback)}
 * doesn't return values, inject mock {@link LeaderLatch} object and verify latch is correctly set up.
 *
 * @throws Exception
 */
@Test
public void testRequestControl() throws Exception {
    // Mock LeaderLatch
    LeaderLatch latch = createMock(LeaderLatch.class);
    latch.addListener(anyObject(SwitchLeaderListener.class));
    expectLastCall().once();
    latch.start();
    expectLastCall().once();
    replay(latch);

    PowerMock.expectNew(LeaderLatch.class,
            anyObject(CuratorFramework.class),
            anyObject(String.class),
            anyObject(String.class))
            .andReturn(latch).once();
    PowerMock.replay(LeaderLatch.class);

    String controllerId = "controller2013";
    registry.registerController(controllerId);

    LoggingCallback callback = new LoggingCallback(1);
    long dpidToRequest = 2000L;

    try {
        registry.requestControl(dpidToRequest, callback);
    } catch (RegistryException e) {
        e.printStackTrace();
        fail(e.getMessage());
    }

    verify(latch);
}
项目:spring-open    文件:ZookeeperRegistryTest.java   
/**
 * Test if {@link ZookeeperRegistry#releaseControl(long)} correctly release control of specific switch.
 * Because {@link ZookeeperRegistry#releaseControl(long)} doesn't return values, inject mock
 * {@link LeaderLatch} object and verify latch is correctly set up.
 *
 * @throws Exception
 */
@Test
public void testReleaseControl() throws Exception {
    // Mock of LeaderLatch
    LeaderLatch latch = createMock(LeaderLatch.class);
    latch.addListener(anyObject(SwitchLeaderListener.class));
    expectLastCall().once();
    latch.start();
    expectLastCall().once();
    latch.removeListener(anyObject(SwitchLeaderListener.class));
    expectLastCall().once();
    latch.close();
    expectLastCall().once();
    replay(latch);

    PowerMock.expectNew(LeaderLatch.class,
            anyObject(CuratorFramework.class),
            anyObject(String.class),
            anyObject(String.class))
            .andReturn(latch).once();
    PowerMock.replay(LeaderLatch.class);

    String controllerId = "controller2013";
    registry.registerController(controllerId);

    long dpidToRequest = 2000L;
    LoggingCallback callback = new LoggingCallback(1);

    registry.requestControl(dpidToRequest, callback);
    registry.releaseControl(dpidToRequest);

    verify(latch);
}
项目:cultivar    文件:ScheduledLoggingLeaderService.java   
@Inject
ScheduledLoggingLeaderService(final LeaderLatch latch, final CountDownLatch firstRunLatch, final AtomicLong counter) {
    super(latch);

    this.counter = counter;
    this.firstRunLatch = firstRunLatch;
}
项目:curator-extensions    文件:LeaderService.java   
@Override
protected void run() throws InterruptedException {
    // Beware race conditions: closeLeaderLatch() may be called by another thread at any time.
    while (isRunning()) {
        try {
            // Start attempting to acquire leadership via the Curator leadership latch.
            LOG.debug("Attempting to acquire leadership: {}", getId());
            LeaderLatch latch = startLeaderLatch();

            // Wait until (a) leadership is acquired or (b) the latch is closed by service shutdown or ZK cxn loss.
            if (isRunning()) {
                try {
                    latch.await();
                } catch (EOFException e) {
                    // Latch was closed while we were waiting.
                    checkState(!latch.hasLeadership());
                }
            }

            // If we succeeded in acquiring leadership, start/run the leadership-managed delegate service.
            if (isRunning() && latch.hasLeadership()) {
                LOG.debug("Leadership acquired: {}", getId());
                runAsLeader();
                LOG.debug("Leadership released: {}", getId());
            }
        } finally {
            closeLeaderLatch();
        }

        if (isRunning()) {
            // If we lost or relinquished leadership, wait a while for things to settle before trying to
            // re-acquire leadership (eg. wait for a network hiccup to the ZooKeeper server to resolve).
            sleep(_reacquireDelayNanos);
        }
    }
}