/**
 * Connects to ZooKeeper and joins the leader election for the job scheduler.
 *
 * <p>The Curator client is created with an exponential back-off retry policy,
 * started, and the call then waits via {@code blockUntilConnectedOrTimedOut()}
 * (its boolean result is not inspected). A {@link LeaderLatch} is then created
 * on {@code /http-job-scheduler/leader}, using the runtime MXBean name as the
 * participant id, and started.
 *
 * @param strings command-line arguments; not used by this method
 * @throws Exception if the client or the latch fails to start
 */
@Override
public void run(String... strings) throws Exception {
    final ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, Integer.MAX_VALUE);
    client = CuratorFrameworkFactory.newClient(zookeeperConnString, retryPolicy);
    client.start();
    client.getZookeeperClient().blockUntilConnectedOrTimedOut();

    final String participantId = ManagementFactory.getRuntimeMXBean().getName();
    leaderLatch = new LeaderLatch(client, "/http-job-scheduler/leader", participantId);

    // Mirrors leadership changes onto the master flag and the scheduler.
    final LeaderLatchListener masterSwitch = new LeaderLatchListener() {
        @Override
        public void notLeader() {
            setMaster(false);
            masterJobScheduler.pause();
        }

        @Override
        public void isLeader() {
            setMaster(true);
            masterJobScheduler.resume();
        }
    };
    leaderLatch.addListener(masterSwitch);
    leaderLatch.start();
}
/**
 * Starts leader election asynchronously on {@code executor}. Intended to be
 * started only once.
 *
 * <p>Each attempt uses a brand-new {@link LeaderLatch}, because a latch that
 * has already been started cannot be started again. The task finishes as soon
 * as one attempt completes {@code start()} and {@code await()} without an
 * exception. If an attempt fails, the latch of that attempt is closed and a new
 * attempt is made. If the worker thread is interrupted, the interrupt status is
 * restored and the task stops retrying.
 *
 * @param listener leader-election listener registered on every latch created here
 */
public void startLeaderElect(final LeaderLatchListener listener) {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            boolean elected = false;
            // NOTE(review): there is no back-off between attempts; a persistent
            // failure will retry in a tight loop, as the original code did.
            while (!elected) {
                final LeaderLatch candidate =
                        new LeaderLatch((CuratorFramework) registryCenter.getRawClient(), ConsoleNode.LATCH);
                candidate.addListener(listener);
                leaderLatch = candidate;
                try {
                    candidate.start();
                    candidate.await();
                    elected = true;
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the executor and stop retrying.
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while electing a Leader! giving up", e);
                    return;
                } catch (Exception e) {
                    log.error("Failed to elect a Leader! will retry", e);
                } finally {
                    if (!elected) {
                        try {
                            candidate.close();
                        } catch (Exception ignored) {
                            // The latch may never have reached the started state;
                            // there is nothing further to release in that case.
                        }
                    }
                }
            }
        }
    });
}
/**
 * Joins the leader election described by {@code ctx}.
 *
 * <p>Creates the container nodes for the election path, builds a
 * {@link LeaderLatch} with the election id from {@code ctx}, attaches a
 * listener whose callbacks do nothing, starts the latch and logs the context.
 *
 * @throws Exception if the path cannot be created or the latch cannot start
 */
@Override
public void elect() throws Exception {
    zkClient.createContainers(ctx.getLeaderElectionZKPath());

    latch = new LeaderLatch(zkClient, ctx.getLeaderElectionZKPath(), ctx.getLeaderElectionID());

    final LeaderLatchListener noOpListener = new LeaderLatchListener() {
        @Override
        public void isLeader() {
            // intentionally empty
        }

        @Override
        public void notLeader() {
            // intentionally empty
        }
    };
    latch.addListener(noOpListener);
    latch.start();

    logger.info("[elect]{}", ctx);
}
/**
 * Ties the reaping flag to the leadership state of the given latch.
 *
 * <p>When leadership is gained, reaping is switched on and every holder in
 * {@code activePaths} is scheduled with {@code reapingThresholdMs}; when it is
 * lost, reaping is switched off. After the listener is registered the flag is
 * set to the latch's current leadership state.
 *
 * @param leaderLatch the latch to observe
 */
private void addListenerToLeaderLatch(LeaderLatch leaderLatch) {
    leaderLatch.addListener(new LeaderLatchListener() {
        @Override
        public void notLeader() {
            reapingIsActive.set(false);
        }

        @Override
        public void isLeader() {
            reapingIsActive.set(true);
            for (PathHolder pending : activePaths.values()) {
                schedule(pending, reapingThresholdMs);
            }
        }
    });

    final boolean currentlyLeader = leaderLatch.hasLeadership();
    reapingIsActive.set(currentlyLeader);
}
@SuppressWarnings("resource") @Override public void process() { try { CuratorFramework _curator = CuratorContainer.getInstance().getCurator(); HeartBeatVo conf = HeartBeatConfigContainer.getInstance().getHbConf(); CuratorContainer.getInstance().createZkNode(conf.getLeaderPath()); // 获取进程ID和服务器hostName final BrokerInfoVo brokerInfo = new BrokerInfoVo(); String name = ManagementFactory.getRuntimeMXBean().getName(); String[] pidAndHostName = StringUtils.split(name, "@"); brokerInfo.setPid(pidAndHostName[0]); brokerInfo.setHostName(pidAndHostName[1]); brokerInfo.setIp(InetAddress.getLocalHost().getHostAddress()); LeaderLatch ll = new LeaderLatch(_curator, conf.getLeaderPath(), JsonUtil.toJson(brokerInfo)); ll.addListener(new LeaderLatchListener() { @Override public void notLeader() { LoggerFactory.getLogger().error("本机现在切换到非leader状态,准备终止当前进程PID:{}", brokerInfo.getPid()); System.exit(-1); } @Override public void isLeader() { LoggerFactory.getLogger().info("本机现在切换到leader状态."); } }); ll.start(); ll.await(); // 注册控制事件 CuratorContainer.getInstance().createZkNode(conf.getControlPath()); _curator.getData().usingWatcher(WatcherType.CONTROL).forPath(conf.getControlPath()); } catch (Exception e) { throw new RuntimeException("选举leader时发生错误!", e); } }
/**
 * Builds the latch on {@code LEADER_PATH}, using the HTTP host-and-port string
 * as the participant id, and registers every supplied listener.
 *
 * @param httpHostAndPort host and port whose string form becomes the participant id
 * @param curatorFramework Curator client; must not be null
 * @param listeners listeners to register; must not be null
 * @throws Exception declared by the original signature
 */
@Inject
public SingularityLeaderLatch(
        @Named(SingularityMainModule.HTTP_HOST_AND_PORT) final HostAndPort httpHostAndPort,
        final CuratorFramework curatorFramework,
        final Set<LeaderLatchListener> listeners) throws Exception {
    super(checkNotNull(curatorFramework, "curatorFramework is null"), LEADER_PATH, httpHostAndPort.toString());

    // checkNotNull returns its argument, so the validated set is iterated directly.
    for (final LeaderLatchListener each : checkNotNull(listeners, "listeners is null")) {
        addListener(each);
    }
}
/**
 * Creates a new listener that logs leadership changes and calls
 * {@code doJob()} when leadership is gained.
 *
 * @return a fresh listener instance on every call
 */
public LeaderLatchListener getLatchListener() {
    final LeaderLatchListener latchListener = new LeaderLatchListener() {
        @Override
        public void notLeader() {
            log.info(name + " =>relinquishing notLeader");
        }

        @Override
        public void isLeader() {
            log.info(name + " => isLeader() ");
            doJob();
        }
    };
    return latchListener;
}
/**
 * Creates a listener that only logs leadership gain and loss, tagged with
 * {@code serverUrl}.
 *
 * @return a fresh listener instance on every call
 */
private LeaderLatchListener createLeaderLatchListener() {
    final LeaderLatchListener loggingListener = new LeaderLatchListener() {
        @Override
        public void notLeader() {
            LOG.info("This instance with id [{}] lost leadership", serverUrl);
        }

        @Override
        public void isLeader() {
            LOG.info("This instance with id [{}] acquired leadership", serverUrl);
        }
    };
    return loggingListener;
}
/**
 * Initializes the ZooKeeper connection and starts the leader latch. Only the
 * first successful call per container lifecycle has any effect.
 * <p/>
 * One ZkClientProvider is used per instance, so that several synchronization
 * threads inside the same instance share one provider and see a consistent
 * state, avoiding a situation where some threads are leader and others are
 * standby.
 * <p/>
 * The latch is started without waiting for leadership, so this method returns
 * even if ZooKeeper cannot be reached; in that case the caller normally never
 * obtains the leader latch and does not become leader. The
 * <code>apache.zookeeper.ClientCnxn</code> logger then prints something like:
 * <pre>
 * Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket
 * connection and attempting reconnect
 * </pre>
 * NOTE(review): whether <code>zkClientProvider.init()</code> itself blocks is
 * not visible here; confirm against the provider.
 * <p/>
 * The precondition on <code>zkClientProvider</code> is checked before the
 * state is changed, so a misconfigured call does not mark the guard as started
 * and a later call can still initialize it.
 *
 * @param name name of the synchronization thread requesting initialization;
 *             only the first caller actually initializes, later callers return
 * @throws IllegalStateException if no ZkClientProvider has been set
 * @throws RuntimeException if the leader latch fails to start
 */
@Override
public void init(String name) {
    // Validate before touching the state so a failure here stays retryable.
    if (zkClientProvider == null) {
        throw new IllegalStateException("ZkClientProvider should not be null");
    }
    if (!state.compareAndSet(State.LATENT, State.STARTED)) {
        logger.debug("ZkHaGuard can only be initialized once because LeaderLatch should be singleton");
        return;
    }

    logger.info("LeaderLatch will start soon by " + name);
    zkClientProvider.init();
    leaderLatch = new LeaderLatch(zkClientProvider.provideClient(), latchPath, NetUtils.getLocalHostIP());
    leaderLatch.addListener(new LeaderLatchListener() {
        @Override
        public void isLeader() {
            logger.info("This instance is leader");
        }

        @Override
        public void notLeader() {
            logger.warn("This instance is NOT leader");
        }
    });

    try {
        leaderLatch.start();
    } catch (Exception e) {
        throw new RuntimeException("Leader latch init failed", e);
    }
    logger.info("LeaderLatch starts by " + name + " asynchronously");
}
/**
 * Registers this module's bindings on the given binder: leader election and
 * Curator, listener multibinders, singleton services, Dropwizard-provided
 * objects, the server id constant and the named scheduled executors.
 *
 * @param binder the Guice binder to configure
 */
@Override
public void configure(Binder binder) {
    // Leader election and ZooKeeper client.
    binder.bind(HostAndPort.class)
            .annotatedWith(named(HTTP_HOST_AND_PORT))
            .toProvider(SingularityHostAndPortProvider.class)
            .in(Scopes.SINGLETON);
    binder.bind(LeaderLatch.class).to(SingularityLeaderLatch.class).in(Scopes.SINGLETON);
    binder.bind(CuratorFramework.class).toProvider(SingularityCuratorProvider.class).in(Scopes.SINGLETON);

    // Listener sets contributed through multibinders.
    final Multibinder<ConnectionStateListener> connectionListenerBinder =
            Multibinder.newSetBinder(binder, ConnectionStateListener.class);
    connectionListenerBinder.addBinding().to(SingularityAbort.class).in(Scopes.SINGLETON);

    final Multibinder<LeaderLatchListener> latchListenerBinder =
            Multibinder.newSetBinder(binder, LeaderLatchListener.class);
    latchListenerBinder.addBinding().to(SingularityLeaderController.class).in(Scopes.SINGLETON);

    // Singleton services.
    binder.bind(SingularityDriverManager.class).in(Scopes.SINGLETON);
    binder.bind(SingularityLeaderController.class).in(Scopes.SINGLETON);
    binder.bind(SingularityMailer.class).in(Scopes.SINGLETON);
    binder.bind(SingularitySmtpSender.class).in(Scopes.SINGLETON);
    binder.bind(MailTemplateHelpers.class).in(Scopes.SINGLETON);
    binder.bind(SingularityExceptionNotifier.class).in(Scopes.SINGLETON);
    binder.bind(LoadBalancerClient.class).to(LoadBalancerClientImpl.class).in(Scopes.SINGLETON);
    binder.bind(SingularityMailRecordCleaner.class).in(Scopes.SINGLETON);
    binder.bind(SingularityWebhookPoller.class).in(Scopes.SINGLETON);
    binder.bind(MesosClient.class).in(Scopes.SINGLETON);
    binder.bind(SingularityAbort.class).in(Scopes.SINGLETON);
    binder.bind(SingularityExceptionNotifierManaged.class).in(Scopes.SINGLETON);
    binder.bind(SingularityWebhookSender.class).in(Scopes.SINGLETON);
    binder.bind(NotifyingExceptionMapper.class).in(Scopes.SINGLETON);

    // Objects supplied by Dropwizard-backed providers.
    binder.bind(ObjectMapper.class).toProvider(DropwizardObjectMapperProvider.class).in(Scopes.SINGLETON);
    binder.bind(MetricRegistry.class).toProvider(DropwizardMetricRegistryProvider.class).in(Scopes.SINGLETON);

    binder.bind(AsyncHttpClient.class).to(SingularityHttpClient.class).in(Scopes.SINGLETON);
    binder.bind(ServerProvider.class).in(Scopes.SINGLETON);
    binder.bind(SingularityDropwizardHealthcheck.class).in(Scopes.SINGLETON);

    // A random server id is generated each time this module is configured.
    binder.bindConstant().annotatedWith(Names.named(SERVER_ID_PROPERTY)).to(UUID.randomUUID().toString());

    // Named scheduled executors.
    binder.bind(SingularityManagedScheduledExecutorServiceFactory.class).in(Scopes.SINGLETON);

    final SingularityManagedScheduledExecutorServiceProvider healthcheckExecutorProvider =
            new SingularityManagedScheduledExecutorServiceProvider(
                    configuration.getHealthcheckStartThreads(),
                    configuration.getThreadpoolShutdownDelayInSeconds(),
                    "healthcheck");
    binder.bind(ScheduledExecutorService.class)
            .annotatedWith(HEALTHCHECK_THREADPOOL_NAMED)
            .toProvider(healthcheckExecutorProvider)
            .in(Scopes.SINGLETON);

    final SingularityManagedScheduledExecutorServiceProvider newTaskExecutorProvider =
            new SingularityManagedScheduledExecutorServiceProvider(
                    configuration.getCheckNewTasksScheduledThreads(),
                    configuration.getThreadpoolShutdownDelayInSeconds(),
                    "check-new-task");
    binder.bind(ScheduledExecutorService.class)
            .annotatedWith(NEW_TASK_THREADPOOL_NAMED)
            .toProvider(newTaskExecutorProvider)
            .in(Scopes.SINGLETON);

    binder.bind(SingularityGraphiteReporterManaged.class).in(Scopes.SINGLETON);
}
/**
 * Bundles the objects involved in a leadership switch.
 *
 * @param latch the leader latch
 * @param cb the control-change callback
 * @param listener the latch listener
 */
public SwitchLeadershipData(LeaderLatch latch, ControlChangeCallback cb, LeaderLatchListener listener) {
    this.listener = listener;
    this.cb = cb;
    this.latch = latch;
}
/**
 * Returns the latch listener held by this object.
 *
 * @return the stored listener
 */
public LeaderLatchListener getListener() {
    return this.listener;
}
/**
 * Registers this module's bindings on the given binder: leader election and
 * Curator, listener multibinders, singleton services, Dropwizard-provided
 * objects, the server id constant, the named scheduled executors, and the
 * configuration-dependent mailer and offer-cache implementations.
 *
 * @param binder the Guice binder to configure
 */
@Override
public void configure(Binder binder) {
    // Leader election and ZooKeeper client.
    binder.bind(HostAndPort.class)
            .annotatedWith(named(HTTP_HOST_AND_PORT))
            .toProvider(SingularityHostAndPortProvider.class)
            .in(Scopes.SINGLETON);
    binder.bind(LeaderLatch.class).to(SingularityLeaderLatch.class).in(Scopes.SINGLETON);
    binder.bind(CuratorFramework.class).toProvider(SingularityCuratorProvider.class).in(Scopes.SINGLETON);

    // Listener sets contributed through multibinders.
    final Multibinder<ConnectionStateListener> connectionListenerBinder =
            Multibinder.newSetBinder(binder, ConnectionStateListener.class);
    connectionListenerBinder.addBinding().to(SingularityAbort.class).in(Scopes.SINGLETON);

    final Multibinder<LeaderLatchListener> latchListenerBinder =
            Multibinder.newSetBinder(binder, LeaderLatchListener.class);
    latchListenerBinder.addBinding().to(SingularityLeaderController.class).in(Scopes.SINGLETON);

    binder.bind(SingularityLeaderController.class).in(Scopes.SINGLETON);

    // Mailer: a no-op instance unless SMTP configuration is present.
    final boolean smtpConfigured = configuration.getSmtpConfigurationOptional().isPresent();
    if (!smtpConfigured) {
        binder.bind(SingularityMailer.class).toInstance(NoopMailer.getInstance());
    } else {
        binder.bind(SingularityMailer.class).to(SmtpMailer.class).in(Scopes.SINGLETON);
    }

    // Singleton services.
    binder.bind(SingularitySmtpSender.class).in(Scopes.SINGLETON);
    binder.bind(MailTemplateHelpers.class).in(Scopes.SINGLETON);
    binder.bind(SingularityExceptionNotifier.class).in(Scopes.SINGLETON);
    binder.bind(LoadBalancerClient.class).to(LoadBalancerClientImpl.class).in(Scopes.SINGLETON);
    binder.bind(SingularityMailRecordCleaner.class).in(Scopes.SINGLETON);
    binder.bind(SingularityWebhookPoller.class).in(Scopes.SINGLETON);
    binder.bind(SingularityAbort.class).in(Scopes.SINGLETON);
    binder.bind(SingularityExceptionNotifierManaged.class).in(Scopes.SINGLETON);
    binder.bind(SingularityWebhookSender.class).in(Scopes.SINGLETON);
    binder.bind(SingularityUsageHelper.class).in(Scopes.SINGLETON);
    binder.bind(NotifyingExceptionMapper.class).in(Scopes.SINGLETON);

    // Objects supplied by Dropwizard-backed providers.
    binder.bind(ObjectMapper.class).toProvider(DropwizardObjectMapperProvider.class).in(Scopes.SINGLETON);
    binder.bind(MetricRegistry.class).toProvider(DropwizardMetricRegistryProvider.class).in(Scopes.SINGLETON);

    binder.bind(AsyncHttpClient.class).to(SingularityAsyncHttpClient.class).in(Scopes.SINGLETON);
    binder.bind(ServerProvider.class).in(Scopes.SINGLETON);
    binder.bind(SingularityDropwizardHealthcheck.class).in(Scopes.SINGLETON);

    // A random server id is generated each time this module is configured.
    binder.bindConstant().annotatedWith(Names.named(SERVER_ID_PROPERTY)).to(UUID.randomUUID().toString());

    // Named scheduled executors.
    binder.bind(SingularityManagedScheduledExecutorServiceFactory.class).in(Scopes.SINGLETON);

    final SingularityManagedScheduledExecutorServiceProvider healthcheckExecutorProvider =
            new SingularityManagedScheduledExecutorServiceProvider(
                    configuration.getHealthcheckStartThreads(),
                    configuration.getThreadpoolShutdownDelayInSeconds(),
                    "healthcheck");
    binder.bind(ScheduledExecutorService.class)
            .annotatedWith(HEALTHCHECK_THREADPOOL_NAMED)
            .toProvider(healthcheckExecutorProvider)
            .in(Scopes.SINGLETON);

    final SingularityManagedScheduledExecutorServiceProvider newTaskExecutorProvider =
            new SingularityManagedScheduledExecutorServiceProvider(
                    configuration.getCheckNewTasksScheduledThreads(),
                    configuration.getThreadpoolShutdownDelayInSeconds(),
                    "check-new-task");
    binder.bind(ScheduledExecutorService.class)
            .annotatedWith(NEW_TASK_THREADPOOL_NAMED)
            .toProvider(newTaskExecutorProvider)
            .in(Scopes.SINGLETON);

    binder.bind(SingularityGraphiteReporterManaged.class).in(Scopes.SINGLETON);
    binder.bind(SingularityMesosStatusUpdateHandler.class).in(Scopes.SINGLETON);

    // Offer cache implementation is selected by configuration.
    final Class<? extends OfferCache> offerCacheImpl;
    if (configuration.isCacheOffers()) {
        offerCacheImpl = SingularityOfferCache.class;
    } else {
        offerCacheImpl = SingularityNoOfferCache.class;
    }
    binder.bind(OfferCache.class).to(offerCacheImpl).in(Scopes.SINGLETON);
}
/**
 * Stores the listener together with the path it belongs to.
 *
 * @param listener the leader latch listener
 * @param path the path associated with the listener
 */
public AttachLeaderSelectorListener(final LeaderLatchListener listener, final String path) {
    this.path = path;
    this.listener = listener;
}