private DbleServer() {
    this.config = new ServerConfig();
    scheduler = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("TimerScheduler-%d").build());
    /*
     * | offline | Change Server status to OFF |
     * | online  | Change Server status to ON  |
     */
    this.isOnline = new AtomicBoolean(true);
    // load data node active index from properties
    dnIndexProperties = DnPropertyUtil.loadDnIndexProps();
    this.startupTime = TimeUtil.currentTimeMillis();
    if (isUseZkSwitch()) {
        dnIndexLock = new InterProcessMutex(ZKUtils.getConnection(), KVPathUtil.getDnIndexLockPath());
    }
    xaSessionCheck = new XASessionCheck();
}
@Override
public boolean tryLock(String key) {
    try {
        InterProcessMutex mutex = locks.computeIfAbsent(key,
                __ -> new InterProcessMutex(curatorFramework, String.join("/", basePath, key)));
        boolean owned = mutex.isAcquiredInThisProcess();
        if (owned) {
            return true;
        } else {
            mutex.acquire(timeout, TimeUnit.MILLISECONDS);
        }
        return mutex.isAcquiredInThisProcess();
    } catch (Exception e) {
        return false;
    }
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
    switch (newState) {
        case LOST:
        case SUSPENDED:
            Collection<InterProcessMutex> oldLocks = new ArrayList<>(locks.values());
            locks.clear();
            oldLocks.stream().parallel().forEach(lock -> {
                try {
                    lock.release();
                } catch (Exception e) {
                    logger.trace("Can't release lock on " + newState);
                }
            });
            break;
        default:
    }
}
private static void inMutex(CuratorFramework curator, String mutexPath, Runnable work) {
    final InterProcessMutex mutex = new InterProcessMutex(curator, mutexPath);
    try {
        // try to acquire mutex for index within flush period
        if (mutex.acquire(LOCK_ACQUIRE_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
            try {
                work.run();
            } finally {
                mutex.release();
            }
        } else {
            _log.warn("could not acquire index lock after {} millis!!", LOCK_ACQUIRE_TIMEOUT.getMillis());
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    }
}
/**
 * Call each registered {@link SetupStep} one after the other.
 * @throws SetupException Thrown with any error during running setup, including Zookeeper interactions, and
 *                        individual failures in the {@link SetupStep}.
 */
@PostConstruct
public void runSetup() throws SetupException {
    HAConfiguration.ZookeeperProperties zookeeperProperties = HAConfiguration.getZookeeperProperties(configuration);
    InterProcessMutex lock = curatorFactory.lockInstance(zookeeperProperties.getZkRoot());
    try {
        LOG.info("Trying to acquire lock for running setup.");
        lock.acquire();
        LOG.info("Acquired lock for running setup.");
        handleSetupInProgress(configuration, zookeeperProperties);
        for (SetupStep step : setupSteps) {
            LOG.info("Running setup step: {}", step);
            step.run();
        }
        clearSetupInProgress(zookeeperProperties);
    } catch (SetupException se) {
        LOG.error("Got setup exception while trying to setup", se);
        throw se;
    } catch (Throwable e) {
        LOG.error("Error running setup steps", e);
        throw new SetupException("Error running setup steps", e);
    } finally {
        releaseLock(lock);
        curatorFactory.close();
    }
}
@Test
public void shouldRunRegisteredSetupSteps() throws Exception {
    Set<SetupStep> steps = new LinkedHashSet<>();
    SetupStep setupStep1 = mock(SetupStep.class);
    SetupStep setupStep2 = mock(SetupStep.class);
    steps.add(setupStep1);
    steps.add(setupStep2);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY,
            HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    setupServerIdSelectionMocks();
    setupSetupInProgressPathMocks(ZooDefs.Ids.OPEN_ACL_UNSAFE);

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(lock);

    SetupSteps setupSteps = new SetupSteps(steps, curatorFactory, configuration);
    setupSteps.runSetup();

    verify(setupStep1).run();
    verify(setupStep2).run();
}
@Test
public void shouldCreateSetupInProgressNode() throws Exception {
    Set<SetupStep> steps = new LinkedHashSet<>();
    SetupStep setupStep1 = mock(SetupStep.class);
    steps.add(setupStep1);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY,
            HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("digest:user:pwd");

    List<ACL> aclList = Arrays.asList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "user:pwd")));
    setupServerIdSelectionMocks();
    CreateBuilder createBuilder = setupSetupInProgressPathMocks(aclList).getLeft();

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(lock);

    SetupSteps setupSteps = new SetupSteps(steps, curatorFactory, configuration);
    setupSteps.runSetup();

    verify(createBuilder).withACL(aclList);
    verify(createBuilder).forPath(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT + SetupSteps.SETUP_IN_PROGRESS_NODE,
            "id2".getBytes(Charsets.UTF_8));
}
@Test
public void shouldDeleteSetupInProgressNodeAfterCompletion() throws Exception {
    Set<SetupStep> steps = new LinkedHashSet<>();
    SetupStep setupStep1 = mock(SetupStep.class);
    steps.add(setupStep1);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY,
            HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    when(configuration.getString(HAConfiguration.HA_ZOOKEEPER_ACL)).thenReturn("digest:user:pwd");

    List<ACL> aclList = Arrays.asList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "user:pwd")));
    setupServerIdSelectionMocks();
    DeleteBuilder deleteBuilder = setupSetupInProgressPathMocks(aclList).getRight();

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(lock);

    SetupSteps setupSteps = new SetupSteps(steps, curatorFactory, configuration);
    setupSteps.runSetup();

    verify(deleteBuilder).forPath(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT + SetupSteps.SETUP_IN_PROGRESS_NODE);
}
@Test
public void shouldThrowSetupExceptionAndNotDoSetupIfSetupInProgressNodeExists() throws Exception {
    Set<SetupStep> steps = new LinkedHashSet<>();
    SetupStep setupStep1 = mock(SetupStep.class);
    steps.add(setupStep1);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY,
            HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    setupServerIdSelectionMocks();
    setupSetupInProgressPathMocks(ZooDefs.Ids.OPEN_ACL_UNSAFE, mock(Stat.class));

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(lock);

    SetupSteps setupSteps = new SetupSteps(steps, curatorFactory, configuration);
    try {
        setupSteps.runSetup();
    } catch (Exception e) {
        assertTrue(e instanceof SetupException);
    }
    verifyZeroInteractions(setupStep1);
}
/**
 * Execute a task while holding a distributed lock.
 *
 * @param path           lock path in ZooKeeper
 * @param getLockTimeout timeout for acquiring the lock, in milliseconds
 * @param task           task to run once the lock is held
 * @author anduo 2015-05-08
 */
public void distributeLock(String path, int getLockTimeout, Runnable task) {
    InterProcessMutex lock = new InterProcessMutex(client, path);
    try {
        LOGGER.debug("Trying to acquire lock...");
        if (lock.acquire(getLockTimeout, TimeUnit.MILLISECONDS)) {
            try {
                LOGGER.debug("Lock acquired, running task...");
                task.run();
            } finally {
                lock.release();
                LOGGER.debug("Lock released, path: " + path);
            }
        } else {
            LOGGER.info("Task not executed: could not acquire the distributed lock within " + getLockTimeout + " ms!");
        }
    } catch (Exception e) {
        LOGGER.error("Exception while running task under distributed lock.", e);
    }
}
public void supervene() {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    final InterProcessLock interProcessLock = new InterProcessMutex(curatorFramework, "/lock");
    int count = 10;
    while (count > 0) {
        new Thread() {
            @Override
            public void run() {
                try {
                    countDownLatch.await();
                    interProcessLock.acquire();
                    String now = simpleDateFormat.format(new Date());
                    LOG.info("Now time: ".concat(now));
                    interProcessLock.release();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }.start();
        count--;
    }
    countDownLatch.countDown();
}
@Test
public void testContention() throws Exception {
    try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
        client.start();
        InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two");
        InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two");

        CountDownLatch latch = new CountDownLatch(1);
        AsyncWrappers.lockAsync(lock1).thenAccept(__ -> {
            latch.countDown();
            // don't release the lock
        });
        Assert.assertTrue(timing.awaitLatch(latch));

        CountDownLatch latch2 = new CountDownLatch(1);
        AsyncWrappers.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).exceptionally(e -> {
            if (e instanceof AsyncWrappers.TimeoutException) {
                latch2.countDown(); // lock should still be held
            }
            return null;
        });
        Assert.assertTrue(timing.awaitLatch(latch2));
    }
}
/**
 * @param client          the client
 * @param leaderPath      the path for this leadership group
 * @param executorService thread pool to use
 * @param listener        listener
 */
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) {
    Preconditions.checkNotNull(client, "client cannot be null");
    PathUtils.validatePath(leaderPath);
    Preconditions.checkNotNull(listener, "listener cannot be null");

    this.client = client;
    this.listener = new WrappedListener(this, listener);
    hasLeadership = false;
    this.executorService = executorService;

    mutex = new InterProcessMutex(client, leaderPath) {
        @Override
        protected byte[] getLockNodeBytes() {
            return (id.length() > 0) ? getIdBytes(id) : null;
        }
    };
}
private CuratorFrameworkWithLock createCuratorFramework(String key) {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
            .connectString(key)
            .retryPolicy(retryPolicy)
            .connectionTimeoutMs(1000)
            .sessionTimeoutMs(15000)
            .build();
    curatorFramework.start();

    // create a lock on this client
    InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockDir);
    CuratorFrameworkWithLock result = new CuratorFrameworkWithLock(curatorFramework, lock);
    return result;
}
public ZkRangeStore(String clientName, CuratorFramework client, String lockPath, String sequencePath,
                    long time, TimeUnit unit, long initialValue, int rangeSize) {
    this.clientName = clientName;
    this.client = client;
    this.initialValue = initialValue;
    this.rangeSize = rangeSize;
    lockPath = ZKPaths.makePath(lockPath, clientName);
    this.sequencePath = ZKPaths.makePath(sequencePath, clientName);
    this.time = time;
    this.unit = unit;
    this.lock = new InterProcessMutex(client, lockPath);
}
private SafeLock safeLock(String executionPath) {
    InterProcessMutex mtx = mutexMap.computeIfAbsent(
            executionPath,
            k -> new InterProcessMutex(curator, absolutePath(LOCK_PATH, executionPath)));
    try {
        mtx.acquire();
    } catch (Exception e) {
        throw new ReplicationException(e);
    }
    return mtx::release;
}
public ZookeeperSharedInterProcessLock(CuratorFramework client, String path) {
    this.client = client;
    this.path = path;
    mutex = new InterProcessMutex(client, path);
    mutex.makeRevocable(new RevocationListener<InterProcessMutex>() {
        @Override
        public void revocationRequested(InterProcessMutex forLock) {
            try {
                forLock.release();
            } catch (Exception e) {
                log.error("Error while trying to revoke lock: ", e);
            }
        }
    });
}
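The revocation listener above only fires when some other process asks for the lock back. A minimal sketch of that requesting side, using Curator's Revoker utility together with the mutex's participant nodes; the helper name requestRevocation is illustrative, not part of the class above:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.Revoker;

// Ask every current participant of the revocable lock at lockPath to release it.
// Revocation is cooperative: only holders that registered makeRevocable() will react.
public static void requestRevocation(CuratorFramework client, String lockPath) throws Exception {
    InterProcessMutex sameLock = new InterProcessMutex(client, lockPath);
    for (String participantNode : sameLock.getParticipantNodes()) {
        Revoker.attemptRevoke(client, participantNode);
    }
}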
private static void initZKIfNot(CuratorFramework zkConn) throws Exception {
    String confInited = KVPathUtil.getConfInitedPath();
    // init conf if not
    if (zkConn.checkExists().forPath(confInited) == null) {
        InterProcessMutex confLock = new InterProcessMutex(zkConn, KVPathUtil.getConfInitLockPath());
        // someone acquired the lock
        if (!confLock.acquire(100, TimeUnit.MILLISECONDS)) {
            // loop wait for initialized
            while (true) {
                if (!confLock.acquire(100, TimeUnit.MILLISECONDS)) {
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1000));
                } else {
                    try {
                        if (zkConn.checkExists().forPath(confInited) == null) {
                            XmltoZkMain.initFileToZK();
                        }
                        break;
                    } finally {
                        confLock.release();
                    }
                }
            }
        } else {
            try {
                XmltoZkMain.initFileToZK();
            } finally {
                confLock.release();
            }
        }
    }
}
@Override
public void runWithLock(Runnable runnable, Duration acquireTimeout) {
    InterProcessMutex mutex = acquire(acquireTimeout);
    try {
        runnable.run();
    } finally {
        release(mutex);
    }
}
private InterProcessMutex acquire(Duration acquireTimeout) {
    InterProcessMutex mutex = new InterProcessMutex(_curatorFramework, _path);
    try {
        if (!mutex.acquire(acquireTimeout.getMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException();
        }
    } catch (Exception e) {
        throw Throwables.propagate(e);
    }
    return mutex;
}
private void release(InterProcessMutex mutex) {
    try {
        mutex.release();
    } catch (Exception e) {
        throw Throwables.propagate(e);
    }
}
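A hedged usage sketch of the runWithLock helper above; the zkLock instance name and the task body are placeholders, and the Duration is Joda-Time, matching acquireTimeout.getMillis() in the acquire method:

// Run a short critical section, giving up after 10 seconds of waiting for the lock.
zkLock.runWithLock(
        () -> System.out.println("doing work while holding the lock"),
        Duration.standardSeconds(10));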
@Override
public AtomicLong call() throws Exception {
    while (true) {
        InterProcessMutex lock = new InterProcessMutex(client, "/lock/" + name);
        try {
            if (null != client.checkExists().forPath("/seq/" + name)) { // node already exists
                lock.acquire();
                return getCurrentMaxIndex();
            } else { // node does not exist yet
                lock.acquire();
                if (null == client.checkExists().forPath("/seq/" + name)) {
                    String data = String.valueOf(incrStep);
                    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                            .forPath("/seq/" + name, data.getBytes(Charset.forName("UTF-8")));
                    boundaryMaxValue = incrStep;
                    return new AtomicLong(0);
                } else {
                    return getCurrentMaxIndex();
                }
            }
        } catch (Exception e) {
            if (retry-- == 0) {
                throw e;
            }
        } finally {
            // only release if the lock was actually acquired; otherwise release() would throw
            if (lock.isAcquiredInThisProcess()) {
                lock.release();
            }
        }
    }
}
private static boolean acquire(InterProcessMutex lock) {
    try {
        return lock.acquire(2, TimeUnit.SECONDS);
    } catch (Exception e) {
        throw new RuntimeException("Exception acquiring Zookeeper lock", e);
    }
}
private void releaseLock(InterProcessMutex lock) {
    try {
        lock.release();
        LOG.info("Released lock after running setup.");
    } catch (Exception e) {
        LOG.error("Error releasing acquired lock.", e);
    }
}
@Test
public void shouldRunSetupStepsUnderLock() throws Exception {
    Set<SetupStep> steps = new LinkedHashSet<>();
    SetupStep setupStep1 = mock(SetupStep.class);
    SetupStep setupStep2 = mock(SetupStep.class);
    steps.add(setupStep1);
    steps.add(setupStep2);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY,
            HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    setupServerIdSelectionMocks();
    setupSetupInProgressPathMocks(ZooDefs.Ids.OPEN_ACL_UNSAFE);

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(lock);

    InOrder inOrder = inOrder(lock, setupStep1, setupStep2);
    SetupSteps setupSteps = new SetupSteps(steps, curatorFactory, configuration);
    setupSteps.runSetup();

    inOrder.verify(lock).acquire();
    inOrder.verify(setupStep1).run();
    inOrder.verify(setupStep2).run();
    inOrder.verify(lock).release();
}
@Test
public void shouldReleaseLockOnException() throws Exception {
    Set<SetupStep> steps = new LinkedHashSet<>();
    SetupStep setupStep1 = mock(SetupStep.class);
    steps.add(setupStep1);

    when(configuration.getString(HAConfiguration.ATLAS_SERVER_HA_ZK_ROOT_KEY,
            HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT);
    setupServerIdSelectionMocks();
    setupSetupInProgressPathMocks(ZooDefs.Ids.OPEN_ACL_UNSAFE);
    doThrow(new RuntimeException("Simulating setup failure.")).when(setupStep1).run();

    InterProcessMutex lock = mock(InterProcessMutex.class);
    when(curatorFactory.lockInstance(HAConfiguration.ATLAS_SERVER_ZK_ROOT_DEFAULT)).thenReturn(lock);

    InOrder inOrder = inOrder(lock, setupStep1);
    SetupSteps setupSteps = new SetupSteps(steps, curatorFactory, configuration);
    try {
        setupSteps.runSetup();
    } catch (Exception e) {
        assertTrue(e instanceof SetupException);
    }

    inOrder.verify(lock).acquire();
    inOrder.verify(setupStep1).run();
    inOrder.verify(lock).release();
}
@Test
public void testBasic() {
    try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) {
        client.start();
        InterProcessMutex lock = new InterProcessMutex(client, "/one/two");
        complete(AsyncWrappers.lockAsync(lock), (__, e) -> {
            Assert.assertNull(e);
            AsyncWrappers.release(lock);
        });
    }
}
public static void main(String[] args) throws Exception {
    FakeLimitedResource resource = new FakeLimitedResource();
    try (TestingServer server = new TestingServer()) {
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);
        InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

        if (!lock.acquire(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("could not acquire the lock");
        }
        System.out.println("has the lock");
        System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
        System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());

        try {
            resource.use(); // access resource exclusively
        } finally {
            System.out.println("releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
        System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
        System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());
    }
}
@Test
public void testSetShardAddition() throws Exception {
    Assert.assertArrayEquals(manageShards.toArray(), shardStateManager.getManagedShards().toArray());

    manageShards.add(2);
    context.addShard(2);
    Assert.assertArrayEquals(manageShards.toArray(), shardStateManager.getManagedShards().toArray());

    final ZKShardLockManager lockManager = (ZKShardLockManager) Whitebox.getInternalState(context, "lockManager");
    Map<Integer, InterProcessMutex> lockObjects =
            (Map<Integer, InterProcessMutex>) Whitebox.getInternalState(lockManager, "locks");
    Assert.assertTrue(lockObjects.get(2) != null); // assert that we have a lock object for shard "2"
}
@Test
public void testSetShardDeletion() {
    Assert.assertArrayEquals(manageShards.toArray(), shardStateManager.getManagedShards().toArray());

    manageShards.remove(1);
    context.removeShard(1);
    Assert.assertArrayEquals(manageShards.toArray(), shardStateManager.getManagedShards().toArray());

    final ZKShardLockManager lockManager = (ZKShardLockManager) Whitebox.getInternalState(context, "lockManager");
    Map<Integer, InterProcessMutex> lockObjects =
            (Map<Integer, InterProcessMutex>) Whitebox.getInternalState(lockManager, "locks");
    Assert.assertTrue(lockObjects.get(1) == null); // assert that we don't have a lock object for shard "1"
}
public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
    this.resource = resource;
    this.clientName = clientName;
    lock = new InterProcessMutex(client, lockPath);
}
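The constructor above only wires up the mutex; a sketch of the kind of worker method such a client typically pairs with it, using the lock, resource, and clientName fields set above (the method name and timeout handling are assumptions):

public void doWork(long time, TimeUnit unit) throws Exception {
    // wait up to the given time for the lock; fail if another client holds it too long
    if (!lock.acquire(time, unit)) {
        throw new IllegalStateException(clientName + " could not acquire the lock");
    }
    try {
        System.out.println(clientName + " has the lock");
        resource.use(); // access the shared resource exclusively
    } finally {
        System.out.println(clientName + " releasing the lock");
        lock.release(); // always release in a finally block
    }
}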
private MycatServer() {
    // load configuration from file
    this.config = new MycatConfig();

    // scheduled thread pool (single thread)
    scheduler = Executors.newSingleThreadScheduledExecutor();

    // SQL recorder
    this.sqlRecorder = new SQLRecorder(config.getSystem().getSqlRecordCount());

    /**
     * Online flag, toggled via MyCat manager commands:
     * | offline | Change MyCat status to OFF |
     * | online  | Change MyCat status to ON  |
     */
    this.isOnline = new AtomicBoolean(true);

    // initialize cache service
    cacheService = new CacheService();

    // initialize route service
    routerService = new RouteService(cacheService);

    // load datanode active index from properties
    dnIndexProperties = loadDnIndexProps();

    try {
        // SQL interceptor
        sqlInterceptor = (SQLInterceptor) Class.forName(config.getSystem().getSqlInterceptor()).newInstance();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // catlet class loader
    catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath() + File.separator + "catlet",
            config.getSystem().getCatletClassCheckSeconds());

    // record startup time
    this.startupTime = TimeUtil.currentTimeMillis();

    if (isUseZkSwitch()) {
        String path = ZKUtils.getZKBasePath() + "lock/dnindex.lock";
        dnindexLock = new InterProcessMutex(ZKUtils.getConnection(), path);
    }
}
public void init(CuratorFramework client, String lockId) {
    this.client = client;
    this.path = ROOT_PATH + lockId;
    interProcessMutex = new InterProcessMutex(client, this.path);
}
public DistributedLock(String path) {
    interProcessLock = new InterProcessMutex(client, path);
}
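A minimal sketch of the lock/unlock wrappers such a class would expose around the interProcessLock field created above; the method names are assumptions, the underlying calls are the standard Curator acquire/release:

// Block until the lock is held or the timeout elapses; returns true if acquired.
public boolean lock(long timeout, TimeUnit unit) throws Exception {
    return interProcessLock.acquire(timeout, unit);
}

// Release the lock; Curator throws if this process does not actually hold it.
public void unlock() throws Exception {
    interProcessLock.release();
}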
protected void doStart() throws Exception {
    int i = startingId;
    while (this.id == -1) {
        String lockPath = ZKPaths.makePath(locksBasePath, String.valueOf(i));
        String memberPath = ZKPaths.makePath(membersBasePath, String.valueOf(i));

        log.trace("Acquiring mutex for member {} via lock path {}", i, lockPath);
        InterProcessMutex mutex = new InterProcessMutex(this.client, lockPath);
        mutex.acquire();
        log.debug("Acquired mutex for member {} via lock path {}", i, lockPath);
        try {
            Stat stat = client.checkExists().creatingParentContainersIfNeeded().forPath(memberPath);
            if (stat == null) {
                log.debug("Claiming container id {} via member path {}", i, memberPath);
                try {
                    // no peer has this node yet, grab it:
                    pen = new PersistentNode(client, CreateMode.EPHEMERAL, false, memberPath, payload);
                    pen.start();
                    pen.waitForInitialCreate(30000, TimeUnit.MILLISECONDS); // wait up to 30 seconds for the ephemeral node
                    this.id = i;
                    log.info("Claimed container id {} via member path {}", i, memberPath);
                    return;
                } catch (InterruptedException e) {
                    CloseableUtils.closeQuietly(pen);
                    ThreadUtils.checkInterrupted(e);
                    Throwables.propagate(e);
                }
            }
        } finally {
            mutex.release();
            log.debug("Released mutex for member {} via lock path {}", i, lockPath);
        }
        i++;
    }
}
public void adoptRemotePartition(Partition partitionToTakeIn) throws Exception {
    String adZNode = ADOPTION_ADS_ROOT_ZNODE + "/" + partitionToTakeIn.getPartitionFullNameWithBrokerId();
    logger.debug("Will try to adopt Partition {}", adZNode);

    // try to lock the Ad first and then initiate the adoption process
    String adZNodeLockPath = ADOPTION_ADS_LOCK_ROOT_ZNODE + "/" + partitionToTakeIn.getPartitionFullNameWithBrokerId();
    InterProcessMutex lock = new InterProcessMutex(client, adZNodeLockPath);
    if (lock.acquire(30000, TimeUnit.MILLISECONDS)) {
        logger.debug("Successfully acquired lock on Ad: {}", adZNodeLockPath);
        try {
            // check again if the Ad is still valid
            if (new String(client.getData().forPath(adZNode)).equals(AD_POSTING_TEXT)) {
                boolean reassignmentSucceeded = reassignPartitionToLocalBroker(
                        partitionToTakeIn.brokerId,
                        partitionToTakeIn.topicName,
                        partitionToTakeIn.partitionId,
                        Utils.getLocalBrokerId(),
                        adZNode);
                logger.debug("Reassignment succeeded: {}", reassignmentSucceeded);
            } else {
                logger.debug("Ad {} seems stale. Skipping adoption process", adZNode);
            }
        } finally {
            lock.release();
            while (true) {
                try {
                    client.delete().forPath(adZNodeLockPath);
                    break;
                } catch (KeeperException.NoNodeException noNodeEx) {
                    // ignore and break
                    break;
                } catch (Exception ex) {
                    logger.debug("Trying to delete {} after releasing lock but got exception {}. Will retry", adZNodeLockPath, ex);
                }
            }
        }
    } else {
        logger.debug("Failed to acquire lock on Ad: {}", adZNodeLockPath);
    }
}
private boolean reassignPartitionToLocalBroker(int remoteBrokerId, String topicName, int partitionId, int localBrokerId, String adZNode) throws Exception {
    boolean succeeded = false;
    String reassignmentConfigFileName = "partitions-to-move.json." + System.currentTimeMillis();
    String reassignmentProcessLockPath = ADOPTION_ADS_LOCK_ROOT_ZNODE;
    InterProcessMutex lock = new InterProcessMutex(client, reassignmentProcessLockPath);

    while (!succeeded) {
        if (lock.acquire(30000, TimeUnit.MILLISECONDS)) {
            logger.debug("Locking {} succeeded", reassignmentProcessLockPath);
            try {
                String currentReplicas = Utils.getReplicasOfTopicPartition(client, topicName, partitionId);
                String desiredReplicas = currentReplicas.replace("" + remoteBrokerId, "" + localBrokerId);
                String reassignmentJson = String.format(
                        "{\"partitions\":[{\"topic\":\"%s\",\"partition\":%d,\"replicas\":[%s]}],\"version\":1}",
                        topicName, partitionId, desiredReplicas);

                // do kafka reassignment
                PrintWriter out = new PrintWriter(reassignmentConfigFileName);
                out.println(reassignmentJson);
                out.close();
                logger.debug("Reassignment will be kicked for {}", reassignmentJson);

                String[] reassignCmdArgs = {
                        "--reassignment-json-file=" + reassignmentConfigFileName,
                        "--zookeeper=" + client.getZookeeperClient().getCurrentConnectionString(),
                        "--execute"
                };
                ReassignPartitionsCommand.main(reassignCmdArgs);

                // Hacky: Restart kafka controller. Controller seems buggy sometimes
                Utils.restartKafkaController(client);

                // Hacky: Sleep for 10 minutes to let the reassignment process complete
                logger.debug("Reassignment command has been initiated. Will sleep for {} ms", 10 * 60000);
                Thread.sleep(10 * 60000);

                Files.deleteIfExists(Paths.get(reassignmentConfigFileName));

                logger.debug("Setting data for Ad {} as {}", adZNode, AD_ADOPTION_COMPLETE_TEXT + "-" + localBrokerId);
                // mark the ad as done
                client.setData().forPath(adZNode, (AD_ADOPTION_COMPLETE_TEXT + "-" + localBrokerId).getBytes());
                succeeded = true;
            } finally {
                lock.release();
            }
        } else {
            logger.debug("Locking {} failed. Will probably retry", reassignmentProcessLockPath);
            // check if ad is still valid, otherwise break retry loop
            if (!new String(client.getData().forPath(adZNode)).equals(AD_POSTING_TEXT)) {
                logger.debug("Ad {} has expired. Quit trying to reassign", adZNode);
                break;
            } else {
                logger.debug("Ad {} is still valid. Will retry", adZNode);
            }
        }
    }
    return succeeded;
}
/**
 * Dequeues the next job from the job queue and runs it.
 * @return True if a job was dequeued and executed, false if the queue was empty.
 */
@VisibleForTesting
boolean runNextJob() {
    try {
        Queue<Message> messages = _messageSupplier.get();
        Message message;
        while ((message = messages.poll()) != null) {
            String jobIdString = (String) message.getPayload();

            // If this job has recently reported that it cannot run on this server then skip it.
            DateTime now = new DateTime();
            DateTime delayUntilTime = _recentNotOwnerDelays.get(jobIdString, EPOCH);
            if (now.isBefore(delayUntilTime)) {
                _log.debug("Waiting {} for next attempt to run job locally: {}",
                        PeriodFormat.getDefault().print(new Interval(now, delayUntilTime).toPeriod()), jobIdString);
                continue;
            }

            InterProcessMutex mutex = getMutex(jobIdString);
            if (!acquireMutex(mutex)) {
                _log.debug("Failed to get mutex for job {}", jobIdString);
                continue;
            }
            try {
                String jobTypeName = getJobTypeNameFromId(jobIdString);
                RegistryEntry<?, ?> entry = _jobHandlerRegistry.getRegistryEntry(jobTypeName);
                _log.info("Executing job {}... ", jobIdString);
                boolean ranLocally = run(jobIdString, entry);
                if (ranLocally) {
                    acknowledgeQueueMessage(message.getId());
                    _log.info("Executing job {}... DONE", jobIdString);
                } else {
                    // The job self-reported it could not be run locally.  Cache that knowledge and wait before
                    // attempting this job again.
                    _recentNotOwnerDelays.put(jobIdString, new DateTime().plus(_notOwnerRetryDelay));
                    _recentNotOwnerDelays.cleanUp();
                    _log.info("Executing job {}... not local", jobIdString);
                }
            } finally {
                mutex.release();
            }
            return true;
        }
        _log.debug("Job queue was empty or contained only non-local jobs");
    } catch (Throwable t) {
        _log.warn("runNextJob failed unexpectedly", t);
    }
    return false;
}