/**
 * Flags that a snapshot is needed when a node event concerns a host path of this application.
 *
 * <p>Events without data are ignored, as are all event types other than {@code NODE_ADDED},
 * {@code NODE_REMOVED} and {@code NODE_UPDATED}. A path qualifies when it is not blank, splits
 * into exactly 7 parts on {@code RedirectorConstants.DELIMETER}, and contains the application
 * name surrounded by that delimiter.
 *
 * @param event the tree cache event to inspect
 */
@Override
public void onChanged(TreeCacheEvent event) {
    ChildData nodeData = event.getData();
    if (nodeData == null) {
        return;
    }
    String nodePath = nodeData.getPath();

    switch (event.getType()) {
        case NODE_ADDED:
        case NODE_REMOVED:
        case NODE_UPDATED:
            break;
        default:
            // Only node-level changes can require a new snapshot.
            return;
    }

    if (!StringUtils.isNotBlank(nodePath)) {
        return;
    }
    if (nodePath.split(RedirectorConstants.DELIMETER).length != 7) {
        return;
    }
    String delimitedAppName = RedirectorConstants.DELIMETER + applicationName + RedirectorConstants.DELIMETER;
    if (nodePath.contains(delimitedAppName)) {
        snapshotNeeded = true;
    }
}
/**
 * Tracks initialization of the stacks cache and forwards later events to the listener.
 *
 * <p>While the cache is not yet available, every incoming event type is logged. The
 * {@code INITIALIZED} event marks the cache as available and signals all waiters on
 * {@code stacksCacheAvailability}. Once available, events other than {@code CONNECTION_LOST}
 * and {@code CONNECTION_SUSPENDED} are passed to the listener, if one is present.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @throws Exception if the listener callback fails
 */
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
    // Acquire the lock BEFORE entering try: if lock() itself fails, the finally block
    // must not call unlock() on a lock this thread does not hold.
    lock.lock();
    try {
        if (!stacksCacheAvailable) {
            log.info("Stack cache is NOT initialized, event intercepted: " + event.getType().name());
        }

        if (event.getType() == TreeCacheEvent.Type.INITIALIZED) {
            stacksCacheAvailable = true;
            stacksCacheAvailability.signalAll();
            log.info("stacks cache is initialized");
        } else if (listener.isPresent()
                && stacksCacheAvailable
                && event.getType() != TreeCacheEvent.Type.CONNECTION_LOST
                && event.getType() != TreeCacheEvent.Type.CONNECTION_SUSPENDED) {
            listener.get().onChanged(event);
        }
    } finally {
        lock.unlock();
    }
}
/**
 * Translates tree cache node events under {@code watchPath} into service listener callbacks.
 *
 * <p>Events without data, with an empty path, or whose path does not start with
 * {@code watchPath} are ignored. For {@code NODE_ADDED}, {@code NODE_REMOVED} and
 * {@code NODE_UPDATED} the node payload is parsed as a {@code CommonServiceNode} and handed to
 * the matching listener method. Every event under {@code watchPath} is then logged.
 *
 * @param curator the Curator client that produced the event
 * @param event   the tree cache event
 * @throws Exception if parsing the payload or the listener callback fails
 */
@Override
public void childEvent(CuratorFramework curator, TreeCacheEvent event) throws Exception {
    ChildData data = event.getData();
    if (data == null) {
        return;
    }
    String dataPath = data.getPath();
    if (Strings.isNullOrEmpty(dataPath)) {
        return;
    }
    if (!dataPath.startsWith(watchPath)) {
        return;
    }

    switch (event.getType()) {
        case NODE_ADDED:
            listener.onServiceAdded(dataPath, Jsons.fromJson(data.getData(), CommonServiceNode.class));
            break;
        case NODE_REMOVED:
            listener.onServiceRemoved(dataPath, Jsons.fromJson(data.getData(), CommonServiceNode.class));
            break;
        case NODE_UPDATED:
            listener.onServiceUpdated(dataPath, Jsons.fromJson(data.getData(), CommonServiceNode.class));
            break;
        default:
            break;
    }

    // The previous call supplied no arguments, so the placeholders were never filled in.
    // The trailing "ns={}" placeholder is dropped because no namespace value is available here.
    Logs.RSD.info("ZK node data change={}, nodePath={}, watchPath={}", event.getType(), dataPath, watchPath);
}
/**
 * Pauses or resumes the local job when its paused-marker node is added or removed.
 *
 * <p>Nothing happens unless {@code path} is the local job paused path and a schedule controller
 * is registered for {@code jobName}. {@code NODE_ADDED} pauses the job; {@code NODE_REMOVED}
 * resumes it and clears the paused status.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @param path   the path of the changed node
 */
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
    if (!serverNode.isLocalJobPausedPath(path)) {
        return;
    }
    final JobScheduleController controller = JobRegistry.getInstance().getJobScheduleController(jobName);
    if (null == controller) {
        return;
    }

    final Type eventType = event.getType();
    if (Type.NODE_ADDED == eventType) {
        controller.pauseJob();
    }
    if (Type.NODE_REMOVED == eventType) {
        controller.resumeJob();
        serverService.clearJobPausedStatus();
    }
}
private void treeCacheChangeHandler(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) { switch (treeCacheEvent.getType()) { case INITIALIZED: { //cache is ready so release a permit allowing the application to continue initialising initialisedSemaphore.release(); break; } case NODE_ADDED: { if (!treeCacheEvent.getData().getPath().equals(pathToCache)) { LOGGER.info("Curator TreeCache property added: " + childDataToLogString(treeCacheEvent.getData())); } else { LOGGER.info("Curator TreeCache path added: " + treeCacheEvent.getData().getPath()); } break; } case NODE_REMOVED: { LOGGER.info("Curator TreeCache property removed: " + childDataToLogString(treeCacheEvent.getData())); break; } case NODE_UPDATED: { LOGGER.info("Curator TreeCache property updated: " + childDataToLogString(treeCacheEvent.getData())); break; } case CONNECTION_LOST: { LOGGER.debug("Connection to Zookeeper lost"); break; } case CONNECTION_RECONNECTED: { LOGGER.debug("Connection to Zookeeper re-established"); break; } case CONNECTION_SUSPENDED: { LOGGER.debug("Connection to Zookeeper suspended"); break; } } }
/**
 * Builds a tree cache for {@code path}, waits briefly for it to initialise, and logs the
 * current children of {@code /propertyService}.
 *
 * <p>The cache is closed before this method returns. If the cache or the children map is
 * unavailable, an error is logged and the method returns instead of dereferencing {@code null}.
 *
 * <p>NOTE(review): the children lookup uses the literal {@code "/propertyService"} rather than
 * {@code path} — confirm this is intended.
 *
 * @param curatorFramework the Curator client used to build the cache
 * @param path             the root path to cache
 * @throws Exception if starting the cache fails or the wait is interrupted
 */
private static void checkTreeCache(CuratorFramework curatorFramework, String path) throws Exception {
    final Semaphore initialised = new Semaphore(0);
    TreeCache treeCache = TreeCache.newBuilder(curatorFramework, path)
            .setCacheData(true)
            .setMaxDepth(3)
            .build();
    if (treeCache == null) {
        LOGGER.error("treeCache is null");
        return;
    }

    try {
        treeCache.getListenable().addListener((client, event) -> {
            if (event.getType().equals(TreeCacheEvent.Type.INITIALIZED)) {
                initialised.release();
            }
        });
        treeCache.start();

        // Best effort: continue even on timeout, but make the timeout visible in the log.
        if (!initialised.tryAcquire(2, TimeUnit.SECONDS)) {
            LOGGER.error("treeCache was not initialised within 2 seconds");
        }

        Map<String, ChildData> map = treeCache.getCurrentChildren("/propertyService");
        if (map == null) {
            LOGGER.error("map is null");
            return;
        }
        map.forEach((name, child) -> LOGGER.info("{} - {}", name, Bytes.toString(child.getData())));
    } finally {
        // Release the watchers and resources held by the started cache.
        treeCache.close();
    }
}
private CloudJobConfiguration getJobConfig(final TreeCacheEvent event) { try { return CloudJobConfigurationGsonFactory.fromJson(new String(event.getData().getData())); // CHECKSTYLE:OFF } catch (final Exception ex) { log.warn("Wrong Cloud Job Configuration with:", ex.getMessage()); // CHECKSTYLE:ON return null; } }
/** Verifies that a {@code NODE_ADDED} event without data triggers no producer manager call. */
@Test
public void assertChildEventWhenDataIsNull() throws Exception {
    TreeCacheEvent eventWithoutData = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null);

    cloudJobConfigurationListener.childEvent(null, eventWithoutData);

    verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
    verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
    verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}
/** Verifies that a {@code NODE_UPDATED} event on {@code /other/test_job} triggers no producer manager call. */
@Test
public void assertChildEventWhenIsNotConfigPath() throws Exception {
    ChildData otherNode = new ChildData("/other/test_job", null, "".getBytes());
    TreeCacheEvent updateEvent = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, otherNode);

    cloudJobConfigurationListener.childEvent(null, updateEvent);

    verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
    verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
    verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}
/** Verifies that a {@code NODE_REMOVED} event on {@code /config/job} itself triggers no producer manager call. */
@Test
public void assertChildEventWhenIsRootConfigPath() throws Exception {
    ChildData rootConfigNode = new ChildData("/config/job", null, "".getBytes());
    TreeCacheEvent removeEvent = new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, rootConfigNode);

    cloudJobConfigurationListener.childEvent(null, removeEvent);

    verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
    verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
    verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}
/**
 * Verifies that a {@code NODE_ADDED} event on {@code /config/job/test_job} with empty data
 * triggers no producer manager call.
 */
@Test
public void assertChildEventWhenStateIsAddAndIsConfigPathAndInvalidData() throws Exception {
    ChildData emptyJobNode = new ChildData("/config/job/test_job", null, "".getBytes());
    TreeCacheEvent addEvent = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, emptyJobNode);

    cloudJobConfigurationListener.childEvent(null, addEvent);

    verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
    verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
    verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}
/**
 * Verifies that updating {@code /config/job/test_job} with a daemon job configuration removes
 * the job from the ready service and reschedules it.
 */
@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndDaemonJob() throws Exception {
    byte[] daemonJobJson = CloudJsonConstants.getJobJson(CloudJobExecutionType.DAEMON).getBytes();
    ChildData jobNode = new ChildData("/config/job/test_job", null, daemonJobJson);
    TreeCacheEvent updateEvent = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, jobNode);

    cloudJobConfigurationListener.childEvent(null, updateEvent);

    verify(readyService).remove(Collections.singletonList("test_job"));
    verify(producerManager).reschedule(ArgumentMatchers.<String>any());
}
/**
 * Verifies that updating {@code /config/job/test_job} with the JSON from
 * {@code CloudJsonConstants.getJobJson(false)} marks misfire as disabled and reschedules the job.
 */
@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndMisfireDisabled() throws Exception {
    byte[] jobJson = CloudJsonConstants.getJobJson(false).getBytes();
    ChildData jobNode = new ChildData("/config/job/test_job", null, jobJson);
    TreeCacheEvent updateEvent = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, jobNode);

    cloudJobConfigurationListener.childEvent(null, updateEvent);

    verify(readyService).setMisfireDisabled("test_job");
    verify(producerManager).reschedule(ArgumentMatchers.<String>any());
}
/**
 * Traces every tree cache event and reacts to reconnects, node updates and node removals.
 *
 * <p>On reconnect the ZK node is registered and watched again; on node update
 * {@code onDataChanged} is invoked with the node path; on node removal {@code close()} is
 * called. Lost, initialized and node-added events are only traced.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @throws Exception if one of the invoked handlers fails
 */
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
    final ChildData eventData = event.getData();
    final Type eventType = event.getType();

    if (eventType == Type.CONNECTION_RECONNECTED) {
        LOGGER.traceMarker("ZK-Event", "========= Reconnect =========");
        registerZkNodeAndWatch();
    } else if (eventType == Type.CONNECTION_LOST) {
        LOGGER.traceMarker("ZK-Event", "========= Lost =========");
    } else if (eventType == Type.INITIALIZED) {
        LOGGER.traceMarker("ZK-Event", "========= Initialized =========");
    } else if (eventType == Type.NODE_ADDED) {
        LOGGER.traceMarker("ZK-Event", "========= Node Added: %s =========", eventData.getPath());
    } else if (eventType == Type.NODE_UPDATED) {
        LOGGER.traceMarker("ZK-Event", "========= Node Updated: %s =========", eventData.getPath());
        onDataChanged(eventData.getPath());
    } else if (eventType == Type.NODE_REMOVED) {
        LOGGER.traceMarker("ZK-Event", "========= Node Removed: %s =========", eventData.getPath());
        close();
    }
}
/**
 * Triggers a refresh when a node is added, removed or updated on a path that needs an update.
 *
 * <p>Other event types are ignored. When {@code needUpdate} accepts the node path, the no-arg
 * {@code onChanged()} is invoked and the change is logged.
 *
 * @param event the tree cache event to inspect
 */
@Override
public void onChanged(TreeCacheEvent event) {
    switch (event.getType()) {
        case NODE_ADDED:
        case NODE_REMOVED:
        case NODE_UPDATED:
            break;
        default:
            return;
    }

    if (!needUpdate(event.getData().getPath())) {
        return;
    }
    onChanged();
    log.info("Service discovery changes: type: {} path: {}", event.getType(), event.getData().getPath());
}
/**
 * Re-reads the individual services on any tree cache event, but only once {@code started} is set.
 *
 * @param curatorFramework the Curator client that produced the event (not used here)
 * @param treeCacheEvent   the tree cache event (its content is not inspected)
 * @throws Exception declared by the listener contract
 */
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
    if (!started) {
        return;
    }
    retrieveIndividualServices(Future.future());
}
/**
 * Extracts the node path from the event and delegates to {@code dataChanged}.
 *
 * <p>Events without data, or whose node path is empty, are dropped.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @throws Exception declared by the listener contract
 */
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
    if (null == event.getData()) {
        return;
    }
    final String nodePath = event.getData().getPath();
    if (nodePath.isEmpty()) {
        return;
    }
    dataChanged(client, event, nodePath);
}
/**
 * Verifies that no election is started when the leader host node is removed while
 * {@code hasLeader()} still reports a leader.
 */
@Test
public void assertLeaderElectionJobListenerWhenIsLeaderHostPathAndIsRemoveAndIsLeader() {
    when(leaderElectionService.hasLeader()).thenReturn(true);
    String leaderHostPath = "/testJob/leader/election/host";
    TreeCacheEvent removeEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_REMOVED, new ChildData(leaderHostPath, null, "localhost".getBytes()));

    electionListenerManager.new LeaderElectionJobListener().dataChanged(null, removeEvent, leaderHostPath);

    verify(leaderElectionService).hasLeader();
    verify(leaderElectionService, times(0)).leaderElection();
}
/**
 * Logs the incoming tree cache event and dispatches it to the handler for its type.
 *
 * <p>Every handled type is first passed to {@code logNodeChanged} and then to its dedicated
 * {@code do...} handler.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 */
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) {
    if (logger.isDebugEnabled()) {
        logger.debug("childEvent info={}", event.toString());
    }

    switch (event.getType()) {
        // Cache lifecycle
        case INITIALIZED:
            logNodeChanged(client, event);
            doInitialized(client, event);
            break;

        // Node changes
        case NODE_ADDED:
            logNodeChanged(client, event);
            doNodeAdded(client, event);
            break;
        case NODE_UPDATED:
            logNodeChanged(client, event);
            doNodeUpdated(client, event);
            break;
        case NODE_REMOVED:
            logNodeChanged(client, event);
            doNodeRemoved(client, event);
            break;

        // Connection state changes
        case CONNECTION_SUSPENDED:
            logNodeChanged(client, event);
            doCntSuspended(client, event);
            break;
        case CONNECTION_RECONNECTED:
            logNodeChanged(client, event);
            doCntReconnected(client, event);
            break;
        case CONNECTION_LOST:
            logNodeChanged(client, event);
            doCntLost(client, event);
            break;

        default:
            break;
    }
}
/**
 * Removes execution info when the monitor-execution configuration node is updated to a value
 * that does not parse as {@code true}.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @param path   the path of the changed node
 */
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
    if (!configNode.isMonitorExecutionPath(path)) {
        return;
    }
    if (Type.NODE_UPDATED != event.getType()) {
        return;
    }
    final boolean monitorExecution = Boolean.parseBoolean(new String(event.getData().getData()));
    if (!monitorExecution) {
        executionService.removeExecutionInfo();
    }
}
/**
 * Verifies that an event on the sharding total count path sets the resharding flag and the
 * need-fix-execution-info flag.
 */
@Test
public void assertShardingTotalCountChangedJobListenerWhenIsShardingTotalCountPath() {
    String totalCountPath = "/testJob/config/shardingTotalCount";
    TreeCacheEvent addEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_ADDED, new ChildData(totalCountPath, null, "3".getBytes()));

    shardingListenerManager.new ShardingTotalCountChangedJobListener().dataChanged(null, addEvent, totalCountPath);

    verify(shardingService).setReshardingFlag();
    verify(executionService).setNeedFixExecutionInfoFlag();
}
/**
 * Verifies that the leader is removed when this node is the leader and a {@code NODE_ADDED}
 * event arrives on the stubbed shutdown path.
 */
@Test
public void assertLeaderElectionJobListenerWhenJobShutdownAndIsLeader() {
    String shutdownPath = "/testJob/server/mockedIP/shutdown";
    when(leaderElectionService.isLeader()).thenReturn(true);
    when(serverNode.isLocalJobPausedPath(shutdownPath)).thenReturn(true);
    TreeCacheEvent addEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_ADDED, new ChildData(shutdownPath, null, "localhost".getBytes()));

    electionListenerManager.new LeaderElectionJobListener().dataChanged(null, addEvent, shutdownPath);

    verify(leaderElectionService).removeLeader();
}
/**
 * Verifies that neither pause nor resume is invoked for an event on the paused path when no
 * schedule controller has been registered by this test.
 */
@Test
public void assertJobPausedStatusJobListenerWhenIsJobPausedPathButJobIsNotExisted() {
    String pausedPath = "/testJob/servers/" + ip + "/paused";
    TreeCacheEvent addEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_ADDED, new ChildData(pausedPath, null, "".getBytes()));

    jobOperationListenerManager.new JobPausedStatusJobListener().dataChanged(null, addEvent, pausedPath);

    verify(jobScheduleController, times(0)).pauseJob();
    verify(jobScheduleController, times(0)).resumeJob();
}
/**
 * Loads the key and pushes the event, provided the client is not stopped and the event is one
 * that should be notified.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @param path   the path of the changed node
 */
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
    if (isStopped(client)) {
        return;
    }
    if (!isNotified(client, event, path)) {
        return;
    }
    loadKeyAndPushEvent(event, path);
}
/**
 * Verifies that adding the shutdown node for this server shuts down the registered schedule
 * controller and processes the server shutdown.
 */
@Test
public void assertJobShutdownStatusJobListenerWhenIsJobShutdownPathAndAdd() {
    JobRegistry.getInstance().addJobScheduleController("testJob", jobScheduleController);
    String shutdownPath = "/testJob/servers/" + ip + "/shutdown";
    TreeCacheEvent addEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_ADDED, new ChildData(shutdownPath, null, "".getBytes()));

    jobOperationListenerManager.new JobShutdownStatusJobListener().dataChanged(null, addEvent, shutdownPath);

    verify(jobScheduleController).shutdown();
    verify(serverService).processServerShutdown();
}
/**
 * Notifies every distribute-once listener that waiting tasks are complete when the completed
 * root node is removed.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @param path   the path of the changed node
 */
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
    if (Type.NODE_REMOVED != event.getType()) {
        return;
    }
    if (!guaranteeNode.isCompletedRootNode(path)) {
        return;
    }
    for (ElasticJobListener each : elasticJobListeners) {
        if (!(each instanceof AbstractDistributeOnceElasticJobListener)) {
            continue;
        }
        ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete();
    }
}
/**
 * Verifies that removing the running node of an uncompleted item with failover enabled sets the
 * crashed failover flag but does not fail over while local items are still running.
 */
@Test
public void assertJobCrashedJobListenerWhenIsRunningItemPathAndRemoveAndItemNotCompletedAndEnableFailoverButHasRunningItems() {
    when(executionService.isCompleted(0)).thenReturn(false);
    when(configService.isFailover()).thenReturn(true);
    when(shardingService.getLocalHostShardingItems()).thenReturn(Arrays.asList(1, 2));
    when(executionService.hasRunningItems(Arrays.asList(1, 2))).thenReturn(true);
    String runningPath = "/testJob/execution/0/running";
    TreeCacheEvent removeEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_REMOVED, new ChildData(runningPath, null, "".getBytes()));

    failoverListenerManager.new JobCrashedJobListener().dataChanged(null, removeEvent, runningPath);

    verify(executionService).isCompleted(0);
    verify(configService).isFailover();
    verify(failoverService).setCrashedFailoverFlag(0);
    verify(shardingService).getLocalHostShardingItems();
    verify(executionService).hasRunningItems(Arrays.asList(1, 2));
    verify(failoverService, times(0)).failoverIfNecessary();
}
/**
 * Verifies that removing the failover node of an uncompleted item with failover enabled sets the
 * crashed failover flag and fails over when no local items are running.
 */
@Test
public void assertFailoverJobCrashedJobListenerWhenIsRunningItemPathAndRemoveAndItemNotCompletedAndEnableFailoverAndHasNotRunningItems() {
    when(executionService.isCompleted(0)).thenReturn(false);
    when(configService.isFailover()).thenReturn(true);
    when(shardingService.getLocalHostShardingItems()).thenReturn(Arrays.asList(1, 2));
    when(executionService.hasRunningItems(Arrays.asList(1, 2))).thenReturn(false);
    String failoverPath = "/testJob/execution/0/failover";
    TreeCacheEvent removeEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_REMOVED, new ChildData(failoverPath, null, "".getBytes()));

    failoverListenerManager.new FailoverJobCrashedJobListener().dataChanged(null, removeEvent, failoverPath);

    verify(executionService).isCompleted(0);
    verify(configService).isFailover();
    verify(failoverService).setCrashedFailoverFlag(0);
    verify(shardingService).getLocalHostShardingItems();
    verify(executionService).hasRunningItems(Arrays.asList(1, 2));
    verify(failoverService).failoverIfNecessary();
}
/**
 * Elects a new leader when the leader crashed or a server came on, or removes leadership when
 * this leader's server goes off.
 *
 * <p>An election is started only if there is currently no leader and at least one server is
 * available; in that case the server-off check is skipped.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @param path   the path of the changed node
 */
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
    final EventHelper helper = new EventHelper(path, event);

    // Short-circuit order matters: the services are only queried when the previous check passes.
    final boolean electionRequired = helper.isLeaderCrashedOrServerOn()
            && !leaderElectionService.hasLeader()
            && !serverService.getAvailableServers().isEmpty();
    if (electionRequired) {
        log.debug("Elastic job: leader crashed, elect a new leader now.");
        leaderElectionService.leaderElection();
        log.debug("Elastic job: leader election completed.");
        return;
    }

    if (!helper.isServerOff()) {
        return;
    }
    if (leaderElectionService.isLeader()) {
        leaderElectionService.removeLeader();
    }
}
/**
 * Removes failover info when the failover configuration node is updated to a value that does
 * not parse as {@code true}.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @param path   the path of the changed node
 */
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
    if (!configNode.isFailoverPath(path)) {
        return;
    }
    if (Type.NODE_UPDATED != event.getType()) {
        return;
    }
    final boolean failoverEnabled = Boolean.parseBoolean(new String(event.getData().getData()));
    if (!failoverEnabled) {
        failoverService.removeFailoverInfo();
    }
}
/**
 * Reschedules the job with the new cron expression when the cron configuration node is updated.
 *
 * <p>Nothing is rescheduled if no schedule controller is registered for {@code jobName}.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event whose node data holds the cron expression
 * @param path   the path of the changed node
 */
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
    if (!configNode.isCronPath(path) || Type.NODE_UPDATED != event.getType()) {
        return;
    }
    final String updatedCron = new String(event.getData().getData());
    final JobScheduleController controller = JobRegistry.getInstance().getJobScheduleController(jobName);
    if (null == controller) {
        return;
    }
    controller.rescheduleJob(updatedCron);
}
@Override protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { if (configurationNode.isShardingTotalCountPath(path)) { shardingService.setReshardingFlag(); executionService.setNeedFixExecutionInfoFlag(); } // 动态变更分片策略 if (configurationNode.isShardingStrategyClassPath(path)) { shardingService.setReshardingFlag(); } }
/**
 * Verifies that removing the running node of an already completed item does not set the crashed
 * failover flag.
 */
@Test
public void assertJobCrashedJobListenerWhenIsRunningItemPathAndRemoveButItemCompleted() {
    when(executionService.isCompleted(0)).thenReturn(true);
    String runningPath = "/testJob/execution/0/running";
    TreeCacheEvent removeEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_REMOVED, new ChildData(runningPath, null, "".getBytes()));

    failoverListenerManager.new JobCrashedJobListener().dataChanged(null, removeEvent, runningPath);

    verify(executionService).isCompleted(0);
    verify(failoverService, times(0)).setCrashedFailoverFlag(0);
}
/**
 * Requests resharding and an execution-info fix when the sharding total count node changes.
 *
 * @param client the Curator client that produced the event
 * @param event  the tree cache event
 * @param path   the path of the changed node
 */
@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
    if (!configurationNode.isShardingTotalCountPath(path)) {
        return;
    }
    shardingService.setReshardingFlag();
    executionService.setNeedFixExecutionInfoFlag();
}
/**
 * Verifies that no election is started when the leader host node is removed, there is no
 * leader, and no servers are available.
 */
@Test
public void assertLeaderElectionJobListenerWhenIsLeaderHostPathAndIsRemoveAndIsNotLeaderWithoutAvailableServers() {
    when(leaderElectionService.hasLeader()).thenReturn(false);
    when(serverService.getAvailableServers()).thenReturn(Collections.<String>emptyList());
    String leaderHostPath = "/testJob/leader/election/host";
    TreeCacheEvent removeEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_REMOVED, new ChildData(leaderHostPath, null, "localhost".getBytes()));

    electionListenerManager.new LeaderElectionJobListener().dataChanged(null, removeEvent, leaderHostPath);

    verify(leaderElectionService).hasLeader();
    verify(serverService).getAvailableServers();
    verify(leaderElectionService, times(0)).leaderElection();
}
/**
 * Verifies that the leader is not removed when this node is not the leader and a
 * {@code NODE_ADDED} event arrives on the stubbed disabled path.
 */
@Test
public void assertLeaderElectionJobListenerWhenJobDisabledAndIsNotLeader() {
    String disabledPath = "/testJob/server/mockedIP/disabled";
    when(leaderElectionService.isLeader()).thenReturn(false);
    when(serverNode.isLocalJobPausedPath(disabledPath)).thenReturn(true);
    TreeCacheEvent addEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_ADDED, new ChildData(disabledPath, null, "localhost".getBytes()));

    electionListenerManager.new LeaderElectionJobListener().dataChanged(null, addEvent, disabledPath);

    verify(leaderElectionService, times(0)).removeLeader();
}
/**
 * Verifies that removing the failover node of an uncompleted item with failover enabled sets the
 * crashed failover flag but does not fail over while local items are still running.
 */
@Test
public void assertFailoverJobCrashedJobListenerWhenIsRunningItemPathAndRemoveAndItemNotCompletedAndEnableFailoverButHasRunningItems() {
    when(executionService.isCompleted(0)).thenReturn(false);
    when(configService.isFailover()).thenReturn(true);
    when(shardingService.getLocalHostShardingItems()).thenReturn(Arrays.asList(1, 2));
    when(executionService.hasRunningItems(Arrays.asList(1, 2))).thenReturn(true);
    String failoverPath = "/testJob/execution/0/failover";
    TreeCacheEvent removeEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_REMOVED, new ChildData(failoverPath, null, "".getBytes()));

    failoverListenerManager.new FailoverJobCrashedJobListener().dataChanged(null, removeEvent, failoverPath);

    verify(executionService).isCompleted(0);
    verify(configService).isFailover();
    verify(failoverService).setCrashedFailoverFlag(0);
    verify(shardingService).getLocalHostShardingItems();
    verify(executionService).hasRunningItems(Arrays.asList(1, 2));
    verify(failoverService, times(0)).failoverIfNecessary();
}
/** Verifies that an update on a path other than the paused path neither pauses nor resumes the job. */
@Test
public void assertJobPausedStatusJobListenerWhenIsNotJobPausedPath() {
    String otherPath = "/testJob/servers/" + ip + "/other";
    TreeCacheEvent updateEvent = new TreeCacheEvent(
            TreeCacheEvent.Type.NODE_UPDATED, new ChildData(otherPath, null, "".getBytes()));

    jobOperationListenerManager.new JobPausedStatusJobListener().dataChanged(null, updateEvent, otherPath);

    verify(jobScheduleController, times(0)).pauseJob();
    verify(jobScheduleController, times(0)).resumeJob();
}