/**
 * Translates child events of the watched zone path into agent status reports.
 *
 * <p>{@code CHILD_ADDED} and {@code CHILD_UPDATED} report the agent as {@code IDLE};
 * {@code CHILD_REMOVED} reports it as {@code OFFLINE}. Every other event type is
 * only logged.
 *
 * @param client the Curator client that delivered the event (unused)
 * @param event  the cache event; its data is null for connection-state events
 */
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    final Type eventType = event.getType();

    // Connection-state and INITIALIZED events carry no node data, so the path
    // must not be dereferenced unconditionally.
    final String path = event.getData() == null ? null : event.getData().getPath();

    LOGGER.debugMarker("ZoneEventListener", "Receive zookeeper event %s %s", eventType, path);

    if (path == null) {
        return;
    }

    final AgentStatus status;
    if (eventType == Type.CHILD_ADDED || eventType == Type.CHILD_UPDATED) {
        status = AgentStatus.IDLE;
    } else if (eventType == Type.CHILD_REMOVED) {
        status = AgentStatus.OFFLINE;
    } else {
        return;
    }

    final String name = ZKHelper.getNameFromPath(path);
    agentService.report(new AgentPath(zone.getName(), name), status);
}
/**
 * Keeps the receiver set in sync with child nodes being added, updated or removed.
 *
 * <p>Events of any other type are ignored, as are events for which
 * {@code isThisNode(remoteId)} or {@code hasSameInfo(clientInfo)} holds.
 *
 * @param client the Curator client that delivered the event (unused)
 * @param event  the cache event to process
 */
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    final Type type = event.getType();

    final boolean membershipChange = type.equals(Type.CHILD_ADDED)
            || type.equals(Type.CHILD_UPDATED)
            || type.equals(Type.CHILD_REMOVED);
    if (!membershipChange) {
        return;
    }

    final String remoteId = ZKPaths.getNodeFromPath(event.getData().getPath());
    // The node payload is split on ':' into the client info fields.
    // NOTE(review): decoded with the platform default charset - confirm the writer does the same.
    final String[] clientInfo = new String(event.getData().getData()).split(":");

    if (isThisNode(remoteId) || hasSameInfo(clientInfo)) {
        return;
    }

    switch (type) {
        case CHILD_ADDED:
            // Return value is discarded; presumably the lookup itself creates the sender - confirm.
            senders.get(remoteId);
            addReceiver(remoteId, clientInfo);
            break;
        case CHILD_UPDATED:
            senders.get(remoteId);
            // Replace the existing receiver with one built from the new info.
            removeReceiver(remoteId);
            addReceiver(remoteId, clientInfo);
            break;
        default:
            // Only CHILD_REMOVED can reach this point.
            removeReceiver(remoteId);
            break;
    }
}
/** * Returns a future that waits for the particular 'eventType' to happen in the * path tracked by 'cache_'. */ private Future<Void> createEventFuture(final Type eventType) throws InterruptedException { final Status status = new Status(); PathChildrenCacheListener listener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception { if (event.getType() == eventType) { status.done = true; } } }; cache_.getListenable().clear(); cache_.getListenable().addListener(listener); return executor_.submit(new Callable<Void>() { @Override public Void call() throws Exception { // Wait infinitely until the event is triggered while (!status.done) { TimeUnit.SECONDS.sleep(1); } return null; } }); }
/**
 * Waits up to MAX_WAIT_SEC seconds for 'future' to complete and fails the test
 * if it does not.
 *
 * @param future    the future to wait on
 * @param eventType the event type being waited for; used only in the failure message
 */
private void checkEventFuture(Future<Void> future, Type eventType)
    throws ExecutionException, InterruptedException {
  try {
    future.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
  } catch (TimeoutException timedOut) {
    // Interrupt the still-running waiter before reporting the failure.
    future.cancel(true);
    final String failure = "The event " + eventType + " didn't happen.";
    assertFalse(failure, true);
  }
}
/**
 * Tells whether the given notification matches an entry in {@code notificationsToIgnore}.
 *
 * <p>If {@code key} is present in {@code notificationsToIgnore} the entry is removed,
 * whether or not its value matches. For {@code CHILD_REMOVED} events the stored value
 * is compared against {@code null} instead of {@code value}.
 *
 * @param event the cache event that triggered the notification
 * @param key   the key to look up
 * @param value the value carried by the notification
 * @return {@code true} if an entry for {@code key} existed and its value equalled the expected one
 */
private boolean shouldIgnore(PathChildrenCacheEvent event, String key, String value) {
    final String expected = (event.getType() == Type.CHILD_REMOVED) ? null : value;

    if (!notificationsToIgnore.containsKey(key)) {
        return false;
    }
    return Objects.equals(notificationsToIgnore.remove(key), expected);
}
/**
 * Reacts to a removed child by passing the worker's name to
 * {@code getAbsentWorkerTasks}. Other event types are ignored.
 *
 * @param client the Curator client that delivered the event (unused)
 * @param event  the cache event to process
 */
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
    if (event.getType() != PathChildrenCacheEvent.Type.CHILD_REMOVED) {
        return;
    }

    try {
        // Drop the "/workers/" prefix so that only the worker's name remains.
        final String workerName = event.getData().getPath().replaceFirst("/workers/", "");
        getAbsentWorkerTasks(workerName);
    } catch (Exception e) {
        LOG.error("Exception while trying to re-assign tasks", e);
    }
}
/**
 * Reacts to an added child by passing the task's name and node data to
 * {@code assignTask}. Other event types are ignored.
 *
 * @param client the Curator client that delivered the event (unused)
 * @param event  the cache event to process
 */
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
    if (event.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED) {
        return;
    }

    try {
        // Drop the "/tasks/" prefix so that only the task's name remains.
        final String taskName = event.getData().getPath().replaceFirst("/tasks/", "");
        assignTask(taskName, event.getData().getData());
    } catch (Exception e) {
        LOG.error("Exception when assigning task.", e);
    }
}
/**
 * Dispatches cache events for the watched task path.
 *
 * <ul>
 *   <li>{@code CHILD_ADDED} / {@code CHILD_REMOVED}: once {@code startedInitialTasks} is set,
 *       a runner is added for, or removed from, the task named by the node.</li>
 *   <li>{@code CONNECTION_SUSPENDED}: the current session id is recorded.</li>
 *   <li>{@code CONNECTION_RECONNECTED} with a session id different from the recorded one,
 *       or {@code CONNECTION_LOST}: {@code runnerHandler.lostConnection()} is called.</li>
 *   <li>{@code INITIALIZED}: {@code startInitialTasks()} runs under {@code initialTasksLock}.</li>
 * </ul>
 *
 * @param client the Curator client that delivered the event
 * @param event  the cache event to process
 */
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    final Type eventType = event.getType();
    LOGGER.debug("Event = " + eventType);

    final boolean taskChange = eventType == Type.CHILD_ADDED || eventType == Type.CHILD_REMOVED;
    if (!taskChange) {
        // The session id is fetched for every non-child event, as the branches below compare against it.
        final long currentSessionId = client.getZookeeperClient().getZooKeeper().getSessionId();

        if (eventType == Type.CONNECTION_SUSPENDED) {
            suspendedSessionId = currentSessionId;
            return;
        }

        final boolean reconnectedOnOtherSession =
                eventType == Type.CONNECTION_RECONNECTED && suspendedSessionId != currentSessionId;
        if (reconnectedOnOtherSession || eventType == Type.CONNECTION_LOST) {
            runnerHandler.lostConnection();
            return;
        }

        if (eventType == Type.INITIALIZED) {
            synchronized (initialTasksLock) {
                startInitialTasks();
            }
        }
        return;
    }

    // If the flag is not yet set, re-read it under the lock.
    // NOTE(review): presumably this waits out an in-progress startInitialTasks() - confirm.
    final boolean ready;
    if (startedInitialTasks) {
        ready = startedInitialTasks;
    } else {
        synchronized (initialTasksLock) {
            ready = startedInitialTasks;
        }
    }

    if (!ready) {
        LOGGER.debug("Not init yet. " + event.getData().getPath());
        return;
    }

    final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
    if (eventType == Type.CHILD_ADDED) {
        LOGGER.debug("Task added " + taskId);
        requestFullStatus(Collections.singleton(taskId));
        addRunner(taskId);
    } else {
        // Only CHILD_REMOVED can reach this point.
        LOGGER.debug("Task removed " + taskId);
        statusHandler.retireStatus(taskId);
        runnerHandler.deleteRunner(taskId, false);
    }
}
@Test public void should_listen_children_change_event() throws Throwable { // init: create node and watch it String path = ZKPaths.makePath("/", "flow-children-test"); zkClient.create(path, null); Assert.assertEquals(true, zkClient.exist(path)); final CountDownLatch latch = new CountDownLatch(3); // should receive 3 events final AtomicInteger counterForChildAdded = new AtomicInteger(0); final AtomicInteger counterForChildRemoved = new AtomicInteger(0); zkClient.watchChildren(path, (client, event) -> { if (event.getType() == Type.CHILD_ADDED) { counterForChildAdded.getAndAdd(1); } if (event.getType() == Type.CHILD_REMOVED) { counterForChildRemoved.getAndAdd(1); } ChildData childData = event.getData(); System.out.println(Thread.currentThread().getName()); System.out.println(childData.getPath()); latch.countDown(); }); // when: String firstChildPath = ZKPaths.makePath(path, "child-1"); zkClient.create(firstChildPath, "1".getBytes()); Thread.sleep(5); String secondChildPath = ZKPaths.makePath(path, "child-2"); zkClient.create(secondChildPath, "2".getBytes()); Thread.sleep(5); zkClient.delete(secondChildPath, false); Thread.sleep(5); // then: latch.await(10L, TimeUnit.SECONDS); Assert.assertEquals(1, zkClient.getChildren(path).size()); Assert.assertEquals(2, counterForChildAdded.get()); Assert.assertEquals(1, counterForChildRemoved.get()); }
@Test public void should_listen_tree_event() throws Throwable { // init: String path = ZKPaths.makePath("/", "flow-tree-test"); final AtomicBoolean isTriggerNodeAddedEvent = new AtomicBoolean(false); final AtomicBoolean isTriggerNodeRemovedEvent = new AtomicBoolean(false); final AtomicBoolean isTriggerNodeUpdatedEvent = new AtomicBoolean(false); zkClient.watchTree(path, (client, event) -> { if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { isTriggerNodeAddedEvent.set(true); } if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { isTriggerNodeRemovedEvent.set(true); } if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) { isTriggerNodeUpdatedEvent.set(true); } }); // when: create, update and delete node zkClient.create(path, null); Assert.assertEquals(true, zkClient.exist(path)); Thread.sleep(100); zkClient.setData(path, "hello".getBytes()); Assert.assertEquals("hello", new String(zkClient.getData(path))); Thread.sleep(100); zkClient.delete(path, false); Assert.assertEquals(false, zkClient.exist(path)); Thread.sleep(100); // then: check listener been triggered Assert.assertEquals(true, isTriggerNodeAddedEvent.get()); Assert.assertEquals(true, isTriggerNodeRemovedEvent.get()); Assert.assertEquals(true, isTriggerNodeUpdatedEvent.get()); }
/**
 * Callback that receives an event type together with a path and a data string.
 *
 * <p>NOTE(review): presumably invoked when a watched node changes, with {@code path}
 * naming the affected node and {@code data} its content - confirm against the
 * implementers and callers.
 *
 * @param type the event type
 * @param path the path associated with the event
 * @param data the data associated with the event
 */
void refresh(Type type, String path, String data);