@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (!serverNode.isLocalJobPausedPath(path)) { return; } JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); if (null == jobScheduleController) { return; } if (Type.NODE_ADDED == event.getType()) { jobScheduleController.pauseJob(); } if (Type.NODE_REMOVED == event.getType()) { jobScheduleController.resumeJob(); serverService.clearJobPausedStatus(); } }
@Test public void testStartup() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/1", "one".getBytes()); client.create().forPath("/test/2", "two".getBytes()); client.create().forPath("/test/3", "three".getBytes()); client.create().forPath("/test/2/sub", "two-sub".getBytes()); cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/1", "one".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2", "two".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/3", "three".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2/sub", "two-sub".getBytes()); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3")); Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of()); Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of("sub")); Assert.assertNull(cache.getCurrentChildren("/test/non_exist")); }
@Test public void testCreateParents() throws Exception { cache = newTreeCacheWithListeners(client, "/one/two/three"); cache.start(); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertNull(client.checkExists().forPath("/one/two/three")); cache.close(); cache = buildWithListeners(TreeCache.newBuilder(client, "/one/two/three").setCreateParentNodes(true)); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/one/two/three"); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertNotNull(client.checkExists().forPath("/one/two/three")); }
@Test public void testDepth0() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/1", "one".getBytes()); client.create().forPath("/test/2", "two".getBytes()); client.create().forPath("/test/3", "three".getBytes()); client.create().forPath("/test/2/sub", "two-sub".getBytes()); cache = buildWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(0)); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); Assert.assertNull(cache.getCurrentData("/test/1")); Assert.assertNull(cache.getCurrentChildren("/test/1")); Assert.assertNull(cache.getCurrentData("/test/non_exist")); }
@Test public void testDepth1() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/1", "one".getBytes()); client.create().forPath("/test/2", "two".getBytes()); client.create().forPath("/test/3", "three".getBytes()); client.create().forPath("/test/2/sub", "two-sub".getBytes()); cache = buildWithListeners(TreeCache.newBuilder(client, "/test").setMaxDepth(1)); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/1", "one".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2", "two".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/3", "three".getBytes()); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("1", "2", "3")); Assert.assertEquals(cache.getCurrentChildren("/test/1").keySet(), ImmutableSet.of()); Assert.assertEquals(cache.getCurrentChildren("/test/2").keySet(), ImmutableSet.of()); Assert.assertNull(cache.getCurrentData("/test/2/sub")); Assert.assertNull(cache.getCurrentChildren("/test/2/sub")); Assert.assertNull(cache.getCurrentChildren("/test/non_exist")); }
@Test public void testDepth1Deeper() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/foo"); client.create().forPath("/test/foo/bar"); client.create().forPath("/test/foo/bar/1", "one".getBytes()); client.create().forPath("/test/foo/bar/2", "two".getBytes()); client.create().forPath("/test/foo/bar/3", "three".getBytes()); client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes()); cache = buildWithListeners(TreeCache.newBuilder(client, "/test/foo/bar").setMaxDepth(1)); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo/bar"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo/bar/1", "one".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo/bar/2", "two".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo/bar/3", "three".getBytes()); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); }
@Test public void testFromRoot() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/one", "hey there".getBytes()); cache = newTreeCacheWithListeners(client, "/"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test")); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of()); Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); }
@Test public void testFromRootWithDepth() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/one", "hey there".getBytes()); cache = buildWithListeners(TreeCache.newBuilder(client, "/").setMaxDepth(1)); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertTrue(cache.getCurrentChildren("/").keySet().contains("test")); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); Assert.assertNull(cache.getCurrentData("/test/one")); Assert.assertNull(cache.getCurrentChildren("/test/one")); }
@Test public void testWithNamespace() throws Exception { client.create().forPath("/outer"); client.create().forPath("/outer/foo"); client.create().forPath("/outer/test"); client.create().forPath("/outer/test/one", "hey there".getBytes()); cache = newTreeCacheWithListeners(client.usingNamespace("outer"), "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of()); Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); }
@Test public void testWithNamespaceAtRoot() throws Exception { client.create().forPath("/outer"); client.create().forPath("/outer/foo"); client.create().forPath("/outer/test"); client.create().forPath("/outer/test/one", "hey there".getBytes()); cache = newTreeCacheWithListeners(client.usingNamespace("outer"), "/"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/foo"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); Assert.assertEquals(cache.getCurrentChildren("/").keySet(), ImmutableSet.of("foo", "test")); Assert.assertEquals(cache.getCurrentChildren("/foo").keySet(), ImmutableSet.of()); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of()); Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); }
@Test public void testChildrenInitialized() throws Exception { client.create().forPath("/test", "".getBytes()); client.create().forPath("/test/1", "1".getBytes()); client.create().forPath("/test/2", "2".getBytes()); client.create().forPath("/test/3", "3".getBytes()); cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/1"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/2"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/3"); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); }
@Test public void testUpdateWhenNotCachingData() throws Exception { client.create().forPath("/test"); cache = buildWithListeners(TreeCache.newBuilder(client, "/test").setCacheData(false)); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.create().forPath("/test/foo", "first".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); client.setData().forPath("/test/foo", "something new".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/foo"); assertNoMoreEvents(); Assert.assertNotNull(cache.getCurrentData("/test/foo")); // No byte data querying the tree because we're not caching data. Assert.assertNull(cache.getCurrentData("/test/foo").getData()); }
@Test public void testDeleteThenCreate() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/foo", "one".getBytes()); cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.delete().forPath("/test/foo"); assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/foo", "one".getBytes()); client.create().forPath("/test/foo", "two".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); client.delete().forPath("/test/foo"); assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/foo", "two".getBytes()); client.create().forPath("/test/foo", "two".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); assertNoMoreEvents(); }
@Test public void testDeleteThenCreateRoot() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/foo", "one".getBytes()); cache = newTreeCacheWithListeners(client, "/test/foo"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.delete().forPath("/test/foo"); assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/foo"); client.create().forPath("/test/foo", "two".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); client.delete().forPath("/test/foo"); assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/foo"); client.create().forPath("/test/foo", "two".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); assertNoMoreEvents(); }
@Test public void testKilledSession() throws Exception { client.create().forPath("/test"); cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.create().forPath("/test/foo", "foo".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo"); client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/me"); KillSession2.kill(client.getZookeeperClient().getZooKeeper()); assertEvent(TreeCacheEvent.Type.CONNECTION_LOST); assertEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/me", "data".getBytes()); assertNoMoreEvents(); }
@Test public void testDeleteNodeAfterCloseDoesntCallExecutor() throws Exception { client.create().forPath("/test"); cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.create().forPath("/test/one", "hey there".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); cache.close(); assertNoMoreEvents(); client.delete().forPath("/test/one"); assertNoMoreEvents(); }
/** * Make sure TreeCache gets to a sane state when we can't initially connect to server. */ @Test public void testServerNotStartedYet() throws Exception { // Stop the existing server. server.stop(); // Shutdown the existing client and re-create it started. client.close(); initCuratorFramework(); // Start the client disconnected. cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertNoMoreEvents(); // Now restart the server. server.restart(); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.create().forPath("/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertNoMoreEvents(); }
@Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData eventData = event.getData(); if (event.getType() == Type.CONNECTION_RECONNECTED) { LOGGER.traceMarker("ZK-Event", "========= Reconnect ========="); registerZkNodeAndWatch(); return; } if (event.getType() == Type.CONNECTION_LOST) { LOGGER.traceMarker("ZK-Event", "========= Lost ========="); return; } if (event.getType() == Type.INITIALIZED) { LOGGER.traceMarker("ZK-Event", "========= Initialized ========="); return; } if (event.getType() == Type.NODE_ADDED) { LOGGER.traceMarker("ZK-Event", "========= Node Added: %s =========", eventData.getPath()); return; } if (event.getType() == Type.NODE_UPDATED) { LOGGER.traceMarker("ZK-Event", "========= Node Updated: %s =========", eventData.getPath()); onDataChanged(eventData.getPath()); return; } if (event.getType() == Type.NODE_REMOVED) { LOGGER.traceMarker("ZK-Event", "========= Node Removed: %s =========", eventData.getPath()); close(); return; } }
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (Type.NODE_REMOVED == event.getType() && guaranteeNode.isStartedRootNode(path)) { for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(); } } } }
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (Type.NODE_REMOVED == event.getType() && guaranteeNode.isCompletedRootNode(path)) { for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete(); } } } }
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (configNode.isMonitorExecutionPath(path) && Type.NODE_UPDATED == event.getType()) { if (!Boolean.valueOf(new String(event.getData().getData()))) { executionService.removeExecutionInfo(); } } }
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (Type.NODE_ADDED != event.getType() || !serverNode.isLocalJobTriggerPath(path)) { return; } serverService.clearJobTriggerStatus(); JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); if (null == jobScheduleController) { return; } if (serverService.isLocalhostServerReady()) { jobScheduleController.triggerJob(); } }
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (!serverNode.isLocalJobShutdownPath(path)) { return; } JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); if (null != jobScheduleController && Type.NODE_ADDED == event.getType()) { jobScheduleController.shutdown(); serverService.processServerShutdown(); } }
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (configNode.isFailoverPath(path) && Type.NODE_UPDATED == event.getType()) { if (!Boolean.valueOf(new String(event.getData().getData()))) { failoverService.removeFailoverInfo(); } } }
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (configNode.isCronPath(path) && Type.NODE_UPDATED == event.getType()) { String cronExpression = new String(event.getData().getData()); JobScheduleController jobScheduler = JobRegistry.getInstance().getJobScheduleController(jobName); if (null != jobScheduler) { jobScheduler.rescheduleJob(cronExpression); } } }
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { // 全局分片策略监听 if (GlobalNodePath.isShardingStrategyPath(path)) { if (Type.NODE_UPDATED == event.getType()) { JobShardingStrategyCache.reLoadGlobalStrategy(globalConfigurationService, getStrategyPath(path)); } if (Type.NODE_REMOVED == event.getType()) { JobShardingStrategyCache.removeStrategy(getStrategyPath(path)); } } }
@Test public void testStartEmpty() throws Exception { cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.create().forPath("/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertNoMoreEvents(); }
@Test public void testStartEmptyDeeper() throws Exception { cache = newTreeCacheWithListeners(client, "/test/foo/bar"); cache.start(); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.create().creatingParentsIfNeeded().forPath("/test/foo"); assertNoMoreEvents(); client.create().forPath("/test/foo/bar"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/foo/bar"); assertNoMoreEvents(); }
@Test public void testAsyncInitialPopulation() throws Exception { client.create().forPath("/test"); client.create().forPath("/test/one", "hey there".getBytes()); cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); assertEvent(TreeCacheEvent.Type.INITIALIZED); assertNoMoreEvents(); }
@Test public void testSyncInitialPopulation() throws Exception { cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.INITIALIZED); client.create().forPath("/test"); client.create().forPath("/test/one", "hey there".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); assertNoMoreEvents(); }
@Test public void testBasics() throws Exception { client.create().forPath("/test"); cache = newTreeCacheWithListeners(client, "/test"); cache.start(); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test"); assertEvent(TreeCacheEvent.Type.INITIALIZED); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); Assert.assertNull(cache.getCurrentChildren("/t")); Assert.assertNull(cache.getCurrentChildren("/testing")); client.create().forPath("/test/one", "hey there".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one"); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there"); Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of()); Assert.assertNull(cache.getCurrentChildren("/test/o")); Assert.assertNull(cache.getCurrentChildren("/test/onely")); client.setData().forPath("/test/one", "sup!".getBytes()); assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one"); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one")); Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!"); client.delete().forPath("/test/one"); assertEvent(TreeCacheEvent.Type.NODE_REMOVED, "/test/one", "sup!".getBytes()); Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of()); assertNoMoreEvents(); }
private boolean isJobConfigNode(final TreeCacheEvent event, final String path, final Type type) { return type == event.getType() && path.startsWith(CloudJobConfigurationNode.ROOT) && path.length() > CloudJobConfigurationNode.ROOT.length(); }
private boolean isLeaderCrashed() { return electionNode.isLeaderHostPath(path) && Type.NODE_REMOVED == event.getType(); }