Java 类org.apache.curator.framework.recipes.cache.PathChildrenCache 实例源码

项目:centraldogma    文件:ZooKeeperCommandExecutor.java   
/**
 * Wires up the executor: stores the collaborators, creates the worker pool, and prepares
 * (but does not start) the log watcher and the leader selector.
 *
 * @param replicaId            identifier passed to the superclass
 * @param delegate             the executor commands are delegated to
 * @param curator              Curator client used for the log watcher and leader selection
 * @param zkPath               ZooKeeper path stored for later use
 * @param createPathIfNotExist flag stored for later use
 * @param revisionFile         file stored for later use
 * @param numWorkers           core and maximum size of the worker pool
 * @param maxLogCount          value stored for later use
 * @param minLogAgeMillis      value stored for later use
 */
private ZooKeeperCommandExecutor(String replicaId, CommandExecutor delegate, CuratorFramework curator,
                                 String zkPath, boolean createPathIfNotExist, File revisionFile,
                                 int numWorkers, int maxLogCount, long minLogAgeMillis) {
    super(replicaId);

    this.delegate = delegate;
    this.curator = curator;
    this.zkPath = zkPath;
    this.createPathIfNotExist = createPathIfNotExist;
    this.revisionFile = revisionFile;
    this.maxLogCount = maxLogCount;
    this.minLogAgeMillis = minLogAgeMillis;

    // Fixed-size pool whose (daemon) core threads are allowed to die after 60 idle seconds.
    final ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
            numWorkers, numWorkers,
            60, TimeUnit.SECONDS,
            new LinkedTransferQueue<>(),
            new DefaultThreadFactory("zookeeper-command-executor", true));
    workerPool.allowCoreThreadTimeOut(true);
    executor = workerPool;

    // Child events of the log path are delivered to this object on the notifying thread.
    final String logPath = absolutePath(LOG_PATH);
    logWatcher = new PathChildrenCache(curator, logPath, true);
    logWatcher.getListenable().addListener(this, MoreExecutors.directExecutor());

    oldLogRemover = new OldLogRemover();
    leaderSelector = new LeaderSelector(curator, absolutePath(LEADER_PATH), oldLogRemover);
    leaderSelector.autoRequeue();
}
项目:iotplatform    文件:ZkDiscoveryService.java   
/**
 * Validates the ZooKeeper settings, connects the Curator client and starts a
 * {@link PathChildrenCache} on the nodes directory with this object as its listener.
 *
 * @throws RuntimeException wrapping any failure raised while connecting or starting the cache
 */
@PostConstruct
public void init() {
  log.info("Initializing...");
  Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
  Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
  Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
  Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));

  log.info("Initializing discovery service using ZK connect string: {}", zkUrl);

  zkNodesDir = zkDir + "/nodes";
  try {
    client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout,
        new RetryForever(zkRetryInterval));
    client.start();
    client.blockUntilConnected();
    cache = new PathChildrenCache(client, zkNodesDir, true);
    cache.getListenable().addListener(this);
    cache.start();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // blockUntilConnected() was interrupted: keep the interrupt visible to the caller.
      Thread.currentThread().interrupt();
    }
    log.error("Failed to connect to ZK: {}", e.getMessage(), e);
    // Release the cache as well as the client so a half-started cache does not linger.
    CloseableUtils.closeQuietly(cache);
    CloseableUtils.closeQuietly(client);
    throw new RuntimeException(e);
  }
}
项目:redant    文件:DefaultServiceDiscovery.java   
/**
 * Initialises the slave node and starts watching the children of the root node.
 * Any failure while doing so is logged and not propagated.
 *
 * @throws IllegalArgumentException if no client has been set
 */
@Override
public void watchSlave() {
    if (null == client) {
        throw new IllegalArgumentException("param illegal with client={null}");
    }

    try {
        initSlaveNode();

        final PathChildrenCache slaveCache = new PathChildrenCache(client, ZkNode.ROOT_NODE_PATH, true);
        slaveCache.getListenable().addListener(new SlaveNodeWatcher());
        slaveCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    } catch (Exception ex) {
        LOGGER.error("watchSlave error cause:", ex);
    }
}
项目:jigsaw-payment    文件:RefreshableTransportPool.java   
/**
 * Lazily creates the path cache (using the custom executor service when one is set),
 * registers this object as its listener, starts it and then starts the underlying pool.
 *
 * @throws Exception if the cache or the superclass fails to start
 */
public void start() throws Exception {
    if (this.cache == null) {
        this.cache = (this.executorService == null)
                ? new PathChildrenCache(client, path, cacheData)
                : new PathChildrenCache(client, path, cacheData,
                        dataIsCompressed, this.executorService);
    }

    this.cache.getListenable().addListener(this);
    this.cache.start(StartMode.POST_INITIALIZED_EVENT);

    // The superclass performs the actual pool initialisation.
    super.start();
    LOG.info("transport pooling factory started. ");
}
项目:mycat-src-1.6.1-RELEASE    文件:ZKUtils.java   
/**
 * Starts watching the children of {@code path} and delivers change events to {@code listener}.
 * <p>
 * Events are dispatched on the server's business executor; when that executor is not
 * available a new fixed pool of 5 threads is created for this call.
 *
 * @param path     ZooKeeper path whose children are watched
 * @param listener callback receiving the child events
 * @throws RuntimeException wrapping any failure raised while creating or starting the cache
 */
public  static void addChildPathCache(  String path ,PathChildrenCacheListener listener )
{
    NameableExecutor businessExecutor = MycatServer.getInstance().getBusinessExecutor();
    // NOTE(review): the fallback pool is never shut down here — confirm this path is only hit rarely.
    ExecutorService executor = businessExecutor ==null?Executors.newFixedThreadPool(5):
            businessExecutor;

    try {
        // Watch for changes of the child nodes.
        final PathChildrenCache childrenCache = new PathChildrenCache(getConnection(), path, true);
        // Register the listener BEFORE starting: the cache begins publishing events from a
        // background thread as soon as it is started, and anything published before the
        // listener is attached would never reach it.
        childrenCache.getListenable().addListener(listener,executor);
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    } catch (Exception e) {
       throw new RuntimeException(e);
    }
}
项目:GoPush    文件:ZkUtils.java   
/**
     * Registers a listener for changes of the child nodes of {@code path}.
     * <p>
     * Nothing is registered when the client, the path or the callback is {@code null},
     * or when the path does not exist. Failures are logged and reported as {@code false}.
     *
     * @param path       ZooKeeper path whose children are watched
     * @param biConsumer callback invoked with the client and the event for every child event
     * @return {@code true} if a watcher was created and started, {@code false} otherwise
     */
    public boolean listenerPathChildrenCache(String path, BiConsumer<CuratorFramework, PathChildrenCacheEvent> biConsumer) {

        if (!ObjectUtils.allNotNull(zkClient, path, biConsumer)) {
            return Boolean.FALSE;
        }
        try {
            Stat stat = exists(path);
            if (stat != null) {
                PathChildrenCache watcher = new PathChildrenCache(zkClient, path, true);
                // Attach the listener BEFORE starting: events published by the cache's
                // background thread before registration would otherwise be lost.
                watcher.getListenable().addListener(biConsumer::accept, pool);
                // Original author's note: in this start mode the watcher rebuilds itself
                // on reconnect; otherwise a manual rebuild would be needed.
                watcher.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                if (!pathChildrenCaches.contains(watcher)) {
                    pathChildrenCaches.add(watcher);
                }
                return Boolean.TRUE;
            }
        } catch (Exception e) {
            log.error("listen path children cache fail! path:{} , error:{}", path, e);
        }
        return Boolean.FALSE;
    }
项目:Equella    文件:ZookeeperServiceImpl.java   
/**
 * Creates and starts a path cache on the parent path of the given type.
 *
 * @param type      type whose parent path is watched
 * @param cacheData whether node data is cached in addition to the stat
 * @param listener  optional listener; skipped when {@code null}
 * @param startMode mode the cache is started with
 * @return the started cache
 */
@Override
public PathChildrenCache createPathCache(String type, boolean cacheData, PathChildrenCacheListener listener,
    StartMode startMode)
{
    final PathChildrenCache pathCache;
    try
    {
        pathCache = new PathChildrenCache(client, getParentPath(type), cacheData);
        if( null != listener )
        {
            pathCache.getListenable().addListener(listener);
        }
        pathCache.start(startMode);
    }
    catch( Exception ex )
    {
        throw Throwables.propagate(ex);
    }
    return pathCache;
}
项目:QDrill    文件:ZkAbstractStore.java   
public ZkAbstractStore(CuratorFramework framework, PStoreConfig<V> config)
    throws IOException {
  this.parent = "/" + config.getName();
  this.prefix = parent + "/";
  this.framework = framework;
  this.config = config;

  // make sure the parent node exists.
  try {
    if (framework.checkExists().forPath(parent) == null) {
      framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
    }

    this.childrenCache = new PathChildrenCache(framework, parent, true);
    this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

  } catch (Exception e) {
    throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e);
  }

}
项目:storm-dynamic-spout    文件:ZookeeperWatchTrigger.java   
/**
 * Close the trigger.
 * <p>
 * Closes every cache that was opened, then the Curator client (if any), and finally marks
 * the trigger as no longer open. A cache that fails to close is logged and skipped so the
 * remaining resources are still released.
 */
@Override
public void close() {
    // Close each of the caches that we originally opened
    for (PathChildrenCache cache : caches) {
        try {
            cache.close();
        } catch (IOException ex) {
            // No placeholder here: the exception is passed as the throwable argument so its
            // stack trace is logged instead of leaving a dangling "{}" in the message.
            logger.error("Unable to close cache", ex);
        }
    }

    if (curator != null) {
        curator.close();
        curator = null;
        curatorHelper = null;
    }

    isOpen = false;
}
项目:redirector    文件:ServiceCacheImplProxy.java   
/**
 * Wraps a {@code ServiceCacheImpl}, reusing its private listener container (obtained via
 * reflection) and maintaining its own {@link PathChildrenCache} for the named service.
 *
 * @param discovery     service discovery the cache belongs to; must not be {@code null}
 * @param name          service name; must not be {@code null}
 * @param threadFactory factory for the cache's thread; must not be {@code null}
 * @throws NullPointerException if an argument is {@code null} or the listener container
 *                              could not be obtained
 */
public ServiceCacheImplProxy(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory) {
    // Validate the arguments before they are used for the first time.
    Preconditions.checkNotNull(discovery, "discovery cannot be null");
    Preconditions.checkNotNull(name, "name cannot be null");
    Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");

    this.serviceCacheImpl = new ServiceCacheImpl<T>(discovery, name, threadFactory);

    try {
        Field privateListenerContainerField = ServiceCacheImpl.class.getDeclaredField("listenerContainer");
        privateListenerContainerField.setAccessible(true);
        this.listenerContainer = (ListenerContainer)privateListenerContainerField.get(serviceCacheImpl);
    } catch (NoSuchFieldException | IllegalAccessException e) {
        // Keep the cause in the log; the null check below turns this into a hard failure.
        log.error("Failed to construct Service Cache. Container listeners is null.", e);
    }

    Preconditions.checkNotNull(this.listenerContainer, "container of listeners can not be null");

    this.discovery = discovery;
    this.cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, threadFactory);
    this.cache.getListenable().addListener(this);
}
项目:redirector    文件:ZkPathChildrenCacheWrapper.java   
/**
 * Starts the wrapped cache when caching is enabled.
 * <p>
 * A failure of the cache itself is logged and not propagated; only a missing connection
 * is reported to the caller.
 *
 * @param startMode mode the wrapped cache is started with
 * @throws DataSourceConnectorException if the connector is not connected
 */
@Override
public void start(final PathChildrenCache.StartMode startMode) throws DataSourceConnectorException {
    if (!useCache && !useCacheWhenNotConnectedToDataSource) {
        return;
    }

    if (!connector.isConnected()) {
        throw new DataSourceConnectorException("Failed to start cache for path=" + path + " due to no connection");
    }

    try {
        cache.start(startMode);
        allowUseCache();

        log.debug("Successfully started cache for path={}", path);
    } catch (Exception ex) {
        log.error("Failed to start cache for path={}", path, ex);
    }
}
项目:redirector    文件:ServiceDiscovery.java   
/**
 * Synchronises {@code hostCaches} with the given set of stack paths: caches whose path is
 * no longer present are closed and removed, and a new cache is created, started and
 * registered for every path that has no cache yet.
 * <p>
 * A cache that fails to start is logged but still stored in {@code hostCaches}.
 *
 * @param allStackPaths the complete set of stack paths that should currently be cached
 */
void applyChangesToHostCaches(Set<XreStackPath> allStackPaths) {
    List<XreStackPath> keysToRemove = hostCaches.keySet().stream()
        .filter(xreStackPath -> !allStackPaths.contains(xreStackPath))
        .collect(Collectors.toList());
    keysToRemove.forEach(key -> {
        closeCache(key);
        hostCaches.remove(key);
    });

    allStackPaths.stream()
        .filter(path -> !hostCaches.containsKey(path))
        .forEach(xreStackPath -> {
            PathChildrenCache cache = new PathChildrenCache(curator, getAbsolutePath(xreStackPath.getPath()), true);
            cache.getListenable().addListener((client, event) -> listenersNotifier.notifyListeners());
            try {
                cache.start();
            } catch (Exception e) {
                // Pass the exception itself so the cause and stack trace are logged; the
                // message has no placeholder, so e.getMessage() as an argument was dropped.
                log.error("Failed to start cache for discovered xreStackPath=" + xreStackPath.getPath(), e);
            }

            hostCaches.put(xreStackPath, cache);
        });
    log.info("DiscoveredStacksCount=" + allStackPaths.size() + " and AppliedStacksCount=" + hostCaches.size());
}
项目:albedo-thrift    文件:ZookeeperServiceDiscover.java   
/**
 * Starts watching the service path: added children are passed to {@code dealAdd} and
 * removed children to {@code dealRemove}; all other event types are ignored.
 * A failure to start the cache is logged and not propagated.
 */
@Override
    public void watchService() {
        logger.info("watchService {}", path);
        cache = new PathChildrenCache(curatorFramework, path, true);
        cache.getListenable().addListener((client, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    dealAdd(event);
                    break;
                case CHILD_REMOVED:
                    dealRemove(event);
                    break;
                default:
                    break;
            }
        });
        try {
            cache.start();
        } catch (Exception e) {
            // Report through the logger (with the stack trace) instead of printing to stderr.
            logger.error("watchService failed to start cache for {}", path, e);
        }
    }
项目:niubi-job    文件:StandbyJobSummaryListener.java   
/**
 * Starts listening on the standby job path. For every added or updated child whose job
 * data is marked as operated, the job summary and the job log are updated.
 *
 * @throws Exception if the cache cannot be started
 */
@PostConstruct
public void listen() throws Exception {
    final StandbyApiFactory apiFactory = new StandbyApiFactoryImpl(client);
    final PathChildrenCache jobCache = new PathChildrenCache(client, apiFactory.pathApi().getJobPath(), true);

    final PathChildrenCacheListener summaryListener = new PathChildrenCacheListener() {
        @Override
        public synchronized void childEvent(CuratorFramework clientInner, PathChildrenCacheEvent event) throws Exception {
            // Only child updates and child additions are of interest.
            final boolean relevant = EventHelper.isChildUpdateEvent(event) || EventHelper.isChildAddEvent(event);
            if (!relevant) {
                return;
            }

            final StandbyJobData jobData = new StandbyJobData(event.getData());
            if (jobData.getData().isOperated()) {
                LoggerHelper.info("begin update standby job summary " + jobData.getData());
                standbyJobSummaryService.updateJobSummary(jobData.getData());
                standbyJobLogService.updateJobLog(jobData.getData());
                LoggerHelper.info("update standby job summary successfully " + jobData.getData());
            }
        }
    };

    jobCache.getListenable().addListener(summaryListener);
    jobCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
项目:niubi-job    文件:MasterSlaveJobSummaryListener.java   
/**
 * Starts listening on the master-slave job path. For every added or updated child whose
 * job data is marked as operated, the job summary and the job log are updated.
 *
 * @throws Exception if the cache cannot be started
 */
@PostConstruct
public void listen() throws Exception {
    final MasterSlaveApiFactory apiFactory = new MasterSlaveApiFactoryImpl(client);
    final PathChildrenCache jobCache = new PathChildrenCache(client, apiFactory.pathApi().getJobPath(), true);

    final PathChildrenCacheListener summaryListener = new PathChildrenCacheListener() {
        @Override
        public synchronized void childEvent(CuratorFramework clientInner, PathChildrenCacheEvent event) throws Exception {
            // Only child updates and child additions are of interest.
            final boolean relevant = EventHelper.isChildUpdateEvent(event) || EventHelper.isChildAddEvent(event);
            if (!relevant) {
                return;
            }

            final MasterSlaveJobData jobData = new MasterSlaveJobData(event.getData());
            if (jobData.getData().isOperated()) {
                LoggerHelper.info("begin update master-slave job summary " + jobData.getData());
                masterSlaveJobSummaryService.updateJobSummary(jobData.getData());
                masterSlaveJobLogService.updateJobLog(jobData.getData());
                LoggerHelper.info("update master-slave job summary successfully " + jobData.getData());
            }
        }
    };

    jobCache.getListenable().addListener(summaryListener);
    jobCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
项目:zipkin    文件:SampleRateUpdater.java   
/**
 * Creates the updater and starts watching the children of {@code storeRatePath}, with this
 * object registered as the listener.
 *
 * @param client          Curator client used for the watcher
 * @param storeRateMember group member stored for later use
 * @param storeRatePath   path whose children are watched
 * @param sampleRatePath  path stored for later use
 * @param calculator      function stored for later use
 * @param guard           supplier stored for later use
 * @throws IllegalStateException wrapping any failure raised while starting the watcher
 */
SampleRateUpdater(CuratorFramework client,
    GroupMember storeRateMember,
    String storeRatePath,
    String sampleRatePath,
    Function<Map<String, Integer>, Optional<Float>> calculator,
    Supplier<Boolean> guard
) {
  this.storeRateMember = storeRateMember;
  this.sampleRatePath = sampleRatePath;
  this.calculator = calculator;
  this.guard = guard;
  // We don't need to cache the data as we can already access it from storeRateMember
  this.dataWatcher = new PathChildrenCache(client, storeRatePath, false);
  // Register before starting: once started, the cache publishes events from a background
  // thread, and events published before the listener is attached would be missed.
  dataWatcher.getListenable().addListener(this);
  try {
    this.dataWatcher.start();
  } catch (Exception e) {
    throw new IllegalStateException(e);
  }
}
项目:coco    文件:SimpleModeCenter.java   
/**
 * Obtains the ZooKeeper client, starts a children cache on {@code "/" + nameSpace} with
 * this object as listener, and loads the initial data from ZooKeeper.
 * <p>
 * Start-up failures are logged and not propagated.
 *
 * @throws NullPointerException if {@code nameSpace} or {@code zkurl} is not set
 */
public void start() {
    Preconditions.checkNotNull(nameSpace, "nameSpace can not be null");
    Preconditions.checkNotNull(zkurl, "zkurl can not be null");
    zkClient = RegisterHolder.getClient(zkurl);
    pathChildrenCache = new PathChildrenCache(zkClient, "/" + nameSpace, true);
    pathChildrenCache.getListenable().addListener(this);
    try {
        pathChildrenCache.start();
        initDataFromZk();
    } catch (Exception e) {
        if (e instanceof ConnectionLossException) {
            // Original author's note: when the ZooKeeper cluster is unavailable the
            // configuration is loaded from the local file instead.
            logger.error(" ConnectionLossException has happen ,start to laod the local confFile to mem", e);
        } else {
            // Previously swallowed without a trace; still best-effort, but now visible.
            logger.error("Failed to start path children cache for nameSpace " + nameSpace, e);
        }
    }

}
项目:thingsboard    文件:ZkDiscoveryService.java   
/**
 * Validates the ZooKeeper settings, connects the Curator client and starts a
 * {@link PathChildrenCache} on the nodes directory with this object as its listener.
 *
 * @throws RuntimeException wrapping any failure raised while connecting or starting the cache
 */
@PostConstruct
public void init() {
    log.info("Initializing...");
    Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
    Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
    Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
    Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));

    log.info("Initializing discovery service using ZK connect string: {}", zkUrl);

    zkNodesDir = zkDir + "/nodes";
    try {
        client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout, new RetryForever(zkRetryInterval));
        client.start();
        client.blockUntilConnected();
        cache = new PathChildrenCache(client, zkNodesDir, true);
        cache.getListenable().addListener(this);
        cache.start();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // blockUntilConnected() was interrupted: keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
        log.error("Failed to connect to ZK: {}", e.getMessage(), e);
        // Release the cache as well as the client so a half-started cache does not linger.
        CloseableUtils.closeQuietly(cache);
        CloseableUtils.closeQuietly(client);
        throw new RuntimeException(e);
    }
}
项目:drill    文件:TestEphemeralStore.java   
/**
 * Verifies that starting the store registers its dispatcher with the cache's listener
 * container and starts the underlying client. Together with the dispatcher tests (which
 * check that listeners fire on incoming events) this covers the observer pattern in
 * {@code TransientStore}.
 */
@Test
public void testStoreRegistersDispatcherAndStartsItsClient() throws Exception {
  final StoreWithMockClient<String> store = new StoreWithMockClient<>(config, curator);
  final ZookeeperClient client = store.getClient();

  // The client hands out a mocked cache ...
  final PathChildrenCache cache = Mockito.mock(PathChildrenCache.class);
  Mockito.when(client.getCache()).thenReturn(cache);

  // ... whose listener container is a mock as well.
  final ListenerContainer<PathChildrenCacheListener> container = Mockito.mock(ListenerContainer.class);
  Mockito.when(cache.getListenable()).thenReturn(container);

  store.start();

  Mockito.verify(container).addListener(store.dispatcher);
  Mockito.verify(client).start();
}
项目:x-pipe    文件:AbstractClusterServers.java   
/**
 * Starts a children cache on the meta-server register path and schedules a periodic
 * refresh that calls {@code childrenChanged()} after an initial delay of 1000 ms.
 *
 * @throws Exception if the cache cannot be started
 */
@Override
protected void doStart() throws Exception {

    final CuratorFramework curatorClient = zkClient.get();

    serversCache = new PathChildrenCache(
            curatorClient,
            MetaZkConfig.getMetaServerRegisterPath(),
            true,
            XpipeThreadFactory.create(String.format("PathChildrenCache(%d)", currentServer.getServerId())));
    serversCache.getListenable().addListener(new ChildrenChanged());
    serversCache.start();

    // Periodic refresh; any failure is logged and does not stop the schedule from here.
    final AbstractExceptionLogTask refreshTask = new AbstractExceptionLogTask() {
        @Override
        public void doRun() {
            try {
                childrenChanged();
            } catch (Throwable th) {
                logger.error("[doStart]", th);
            }
        }
    };
    future = scheduled.scheduleWithFixedDelay(
            refreshTask, 1000, metaServerConfig.getClusterServersRefreshMilli(), TimeUnit.MILLISECONDS);

}
项目:nakadi    文件:EventTypeCache.java   
/**
 * Test-visible constructor that builds the cache from already constructed collaborators.
 * <p>
 * Stores the collaborators, sets up the in-memory event type cache, registers a ZooKeeper
 * listener on {@code cache} (when one is supplied) and finally preloads the event types.
 *
 * @param eventTypeRepository repository passed to the in-memory cache setup and the preload
 * @param timelineRepository  repository passed to the in-memory cache setup and the preload
 * @param zkClient            ZooKeeper holder stored for later use
 * @param cache               children cache used for synchronisation; may be {@code null},
 *                            in which case no listener is registered
 * @param timelineSync        timeline synchronisation component stored for later use
 */
@VisibleForTesting
EventTypeCache(final EventTypeRepository eventTypeRepository,
               final TimelineDbRepository timelineRepository,
               final ZooKeeperHolder zkClient,
               final PathChildrenCache cache,
               final TimelineSync timelineSync) {
    this.zkClient = zkClient;
    this.eventTypeCache = setupInMemoryEventTypeCache(eventTypeRepository, timelineRepository);
    this.cacheSync = cache;
    this.timelineSync = timelineSync;
    this.timelineRegistrations = new ConcurrentHashMap<>();
    if (null != cacheSync) {
        // Forward every child event of the sync cache to onZkEvent; the Curator client
        // argument is not needed.
        this.cacheSync.getListenable().addListener((curator, event) -> this.onZkEvent(event));
    }
    // Runs after the listener has been registered.
    preloadEventTypes(eventTypeRepository, timelineRepository);
}
项目:nakadi    文件:EventTypeCache.java   
/**
 * Ensures the cache node exists and returns a started children cache for it.
 *
 * @param zkClient Curator client used to create the node and back the cache
 * @return a cache on {@code ZKNODE_PATH} whose initial content has already been built
 * @throws Exception if the node cannot be created or the cache cannot be started
 */
private static PathChildrenCache setupCacheSync(final CuratorFramework zkClient) throws Exception {
    try {
        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(ZKNODE_PATH);
    } catch (final KeeperException.NodeExistsException alreadyThere) {
        // Nothing to do: the node has already been created.
    }

    final PathChildrenCache syncCache = new PathChildrenCache(zkClient, ZKNODE_PATH, false);

    // All data has to be preloaded before an update callback is attached; otherwise the
    // preload has no effect because all changes will be removed.
    syncCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

    return syncCache;
}
项目:gondola    文件:ZookeeperShardManagerClientTest.java   
/**
 * Creates one shard manager server per configured host plus a client, then blocks until
 * the client's {@code stats} cache holds one entry per configured member.
 *
 * @throws Exception if any component fails to start or the wait is interrupted
 */
@BeforeMethod
public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    servers = new HashMap<>();
    shardManagers = new HashMap<>();
    for (String hostId : config.getHostIds()) {
        Gondola gondola = mock(Gondola.class);
        when(gondola.getHostId()).thenReturn(hostId);
        when(gondola.getConfig()).thenReturn(config);
        ShardManager shardManager = mock(ShardManager.class);
        ZookeeperShardManagerServer server =
            new ZookeeperShardManagerServer("foo", zookeeperServer.getConnectString(), gondola, shardManager);
        shardManagers.put(hostId, shardManager);
        servers.put(hostId, server);
    }

    client = new ZookeeperShardManagerClient("foo", "fooClientName", zookeeperServer.getConnectString(), config);
    stats = (PathChildrenCache) Whitebox.getInternalState(client, "stats");
    CountDownLatch latch = new CountDownLatch(1);
    this.stats.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> {
        if (this.stats.getCurrentData().size() == config.getMembers().size()) {
            latch.countDown();
        }
    });
    // The cache may already be complete before the listener was attached, in which case
    // no further event arrives and the latch would never open; check once by hand.
    if (this.stats.getCurrentData().size() == config.getMembers().size()) {
        latch.countDown();
    }
    latch.await();
}
项目:RecordServiceClient    文件:ZooKeeperTest.java   
/**
 * One-time fixture: configures log4j, starts the mini cluster, and creates a started
 * Curator client together with a started children cache on the planners znode.
 *
 * @throws Exception if the cluster, the client or the cache fails to start
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  org.apache.log4j.BasicConfigurator.configure();
  MiniClusterController.Start(0);
  controller_ = MiniClusterController.instance();
  rand_ = new Random();

  cf_ = CuratorFrameworkFactory.builder()
      .connectString(testConnectionStr)
      .connectionTimeoutMs(30 * 1000)
      .aclProvider(new TestACLProvider())
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build();
  cf_.start();

  final String plannersPath = RecordServiceConfig.ZOOKEEPER_ZNODE_DEFAULT + "/planners";
  cache_ = new PathChildrenCache(cf_, plannersPath, true);
  cache_.start();
}
项目:workflow    文件:Scheduler.java   
/**
 * Tells whether the given task counts as complete for the given run.
 * <p>
 * A {@code null} or non-executable task is always complete. Otherwise the task is complete
 * when the cache holds a completion entry for it and, if that entry refers to a sub-task
 * run, the sub-task run is known and has a completion time.
 *
 * @param completedTasksCache cache holding the completed-task entries
 * @param runId               run the task belongs to
 * @param task                task to check; may be {@code null}
 * @return {@code true} if the task is considered complete
 */
private boolean taskIsComplete(PathChildrenCache completedTasksCache, RunId runId, ExecutableTask task)
{
    if ( task == null || !task.isExecutable() )
    {
        return true;
    }

    ChildData completedEntry = completedTasksCache.getCurrentData(
        ZooKeeperConstants.getCompletedTaskPath(runId, task.getTaskId()));
    if ( completedEntry == null )
    {
        return false;
    }

    TaskExecutionResult result = workflowManager.getSerializer().deserialize(completedEntry.getData(), TaskExecutionResult.class);
    if ( !result.getSubTaskRunId().isPresent() )
    {
        return true;
    }

    // The task spawned a sub-task run: it is complete only once that run has finished.
    RunnableTask subTaskRun = getRunnableTask(result.getSubTaskRunId().get());
    return (subTaskRun != null) && subTaskRun.getCompletionTimeUtc().isPresent();
}
项目:spring-open    文件:ZookeeperRegistryTest.java   
/**
 * Builds a replayed mock {@link PathChildrenCache} for the given controller ID and DPIDs.
 * <p>
 * The mock accepts any number of calls to {@code getListenable()}, {@code start(StartMode)},
 * {@code getCurrentData()} and {@code rebuild()}.
 *
 * @param controllerId Controller ID to represent current data.
 * @param paths        List of HexString indicating switch's DPID.
 * @param listener     Callback object to be set as Listenable.
 * @return Mock PathChildrenCache object
 * @throws Exception
 */
private PathChildrenCache createPathChildrenCacheMock(
            final String controllerId,
            final String[] paths,
            ListenerContainer<PathChildrenCacheListener> listener)
                throws Exception {
    final PathChildrenCache cacheMock = createMock(PathChildrenCache.class);

    expect(cacheMock.getListenable()).andReturn(listener).anyTimes();

    cacheMock.start(anyObject(StartMode.class));
    expectLastCall().anyTimes();

    // One mocked child entry per DPID, all owned by the given controller.
    final List<ChildData> currentData = new ArrayList<ChildData>();
    for (int i = 0; i < paths.length; i++) {
        currentData.add(createChildDataMockForCurrentData(controllerId, paths[i]));
    }
    expect(cacheMock.getCurrentData()).andReturn(currentData).anyTimes();

    cacheMock.rebuild();
    expectLastCall().anyTimes();

    replay(cacheMock);
    return cacheMock;
}
项目:curator-extensions    文件:NodeDiscovery.java   
/**
 * Creates an instance of {@code NodeDiscovery}.
 * <p>
 * The path cache is created here but not started; it runs on a dedicated single-threaded
 * daemon executor named after this class and the watched path.
 *
 * @param curator    Curator framework reference; must already be started.
 * @param nodePath   The path in ZooKeeper to watch; must not be empty.
 * @param parser     The strategy to convert from ZooKeeper {@code byte[]} to {@code T}.
 */
public NodeDiscovery(CuratorFramework curator, String nodePath, NodeDataParser<T> parser) {
    checkNotNull(curator);
    checkNotNull(nodePath);
    checkNotNull(parser);
    checkArgument(curator.getState() == CuratorFrameworkState.STARTED);
    checkArgument(!nodePath.isEmpty());

    final String threadNameFormat = getClass().getSimpleName() + "(" + nodePath + ")-%d";
    final ThreadFactory daemonThreads = new ThreadFactoryBuilder()
            .setNameFormat(threadNameFormat)
            .setDaemon(true)
            .build();

    _nodes = Maps.newConcurrentMap();
    _listeners = Sets.newSetFromMap(Maps.<NodeListener<T>, Boolean>newConcurrentMap());
    _curator = curator;
    _executor = Executors.newSingleThreadScheduledExecutor(daemonThreads);
    _pathCache = new PathChildrenCache(curator, nodePath, true, false, _executor);
    _nodeDataParser = parser;
    _closed = false;
}
项目:fluo    文件:TransactorCache.java   
/**
 * Creates the transactor cache: builds the bounded, access-expiring timeout cache from the
 * environment's configuration and starts a children cache on the transactor nodes path.
 *
 * @param env environment providing the configuration and the shared Curator client
 * @throws RuntimeException wrapping any failure raised while starting the children cache
 */
public TransactorCache(Environment env) {
  final FluoConfiguration conf = env.getConfiguration();

  final long maxEntries = FluoConfigurationImpl.getTransactorMaxCacheSize(conf);
  final long timeoutMillis =
      FluoConfigurationImpl.getTransactorCacheTimeout(conf, TimeUnit.MILLISECONDS);
  timeoutCache = CacheBuilder.newBuilder()
      .maximumSize(maxEntries)
      .expireAfterAccess(timeoutMillis, TimeUnit.MILLISECONDS)
      .concurrencyLevel(10)
      .build();

  this.env = env;
  cache = new PathChildrenCache(env.getSharedResources().getCurator(),
      ZookeeperPath.TRANSACTOR_NODES, true);
  try {
    cache.start(StartMode.BUILD_INITIAL_CACHE);
    status = TcStatus.OPEN;
  } catch (Exception ex) {
    throw new RuntimeException(ex);
  }
}
项目:PetiteRPC    文件:ZookeeperRegistry.java   
/**
 * Closes every registered children cache and then the client. A cache that fails to
 * close is skipped so that the remaining resources are still released.
 */
@Override
public void shutdownGracefully() {
    for (PathChildrenCache serviceCache : services.values()) {
        try {
            serviceCache.close();
        } catch (IOException ignored) {
            // Best effort: keep closing the remaining caches.
        }
    }

    client.close();
}
项目:consistent_config    文件:ZkConfig.java   
/**
 * Starts a children cache on {@code STORE_PATH} whose listener mirrors added and updated
 * children into {@code storeMap} (key = child path without the store prefix, value = node
 * data decoded as UTF-8). Every event is also printed to standard output.
 *
 * @throws Exception if the cache cannot be started
 */
private void _initWatch() throws Exception {
    PathChildrenCache watcher = new PathChildrenCache(
            client,
            STORE_PATH,
            true    // if cache data
    );
    watcher.getListenable().addListener((client1, event) -> {
        // Acquire the lock BEFORE entering try: if lock() were to fail inside the try block,
        // the finally clause would try to unlock a lock this thread does not hold.
        rwlock.writeLock().lock();
        try {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
                return;
            }

            // Decode once, with an explicit charset, so the printed and the stored value
            // cannot differ on platforms whose default charset is not UTF-8.
            String dataStr = new String(data.getData(), "utf-8");
            System.out.println("Receive event: "
                    + "type=[" + event.getType() + "]"
                    + ", path=[" + data.getPath() + "]"
                    + ", data=[" + dataStr + "]"
                    + ", stat=[" + data.getStat() + "]");

            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED
                    || event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
                String path = data.getPath();
                if (path.startsWith(STORE_PATH)) {
                    String key = path.replace(STORE_PATH + "/", "");
                    storeMap.put(key, dataStr);
                }
            }
        } finally {
            rwlock.writeLock().unlock();
        }
    });
    watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

}
项目:coon    文件:CuratorZkTransporter.java   
/**
 * Registers {@code listener} for child changes under {@code path}.
 * <p>
 * A listener that is already registered is ignored. One {@link PathChildrenCache} is kept
 * per path; it is created and started on first use.
 *
 * @param path     ZooKeeper path whose children are watched
 * @param listener external listener to register
 * @throws IllegalStateException wrapping any failure raised while creating or starting the cache
 */
@Override
public void addDataListener(String path, DataListener listener) {
    try {
        // Step 1: look up, validate and create the Curator-level listener.
        PathChildrenCacheListener pathChildrenCacheListener = dataListenerMap.get(listener);
        if(pathChildrenCacheListener != null){// already listening
            return;
        } else {
            // Record the external listener for this path.
            Set<DataListener> dataListenerSet = dataListenersMap.get(path);
            if(dataListenerSet == null){
                dataListenersMap.put(path, dataListenerSet = new ConcurrentHashSet<DataListener>());
            }
            dataListenerSet.add(listener);
            dataListenerMap.put(listener, pathChildrenCacheListener = new PathChildrenCacheListenerImpl(path));
        }

        // Step 2: look up the children cache for this path, creating it when missing.
        PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(path);
        if(pathChildrenCache == null){
            pathChildrenCache = new PathChildrenCache(client, path, true);
            pathChildrenCacheMap.put(path, pathChildrenCache);
            // Step 3: attach the listener BEFORE starting a new cache, so that events
            // published by the cache's background thread right after start are not missed.
            pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
            // Step 4: start watching.
            pathChildrenCache.start(StartMode.POST_INITIALIZED_EVENT);
        } else {
            // The cache is already running; simply attach the additional listener.
            pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        }
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
项目:coon    文件:CuratorZkTransporter.java   
/**
 * Unregisters {@code listener} from {@code path}.
 * <p>
 * A listener that is not registered is ignored. When the last listener of a path is
 * removed, the path's bookkeeping entries are dropped and its cache is closed.
 *
 * @param path     ZooKeeper path the listener was registered for
 * @param listener external listener to remove
 * @throws RuntimeException wrapping any failure raised while closing the cache
 */
@Override
public void removeDataListener(String path, DataListener listener) {
    try {
        // Step 1: remove the entry from dataListenerMap.
        PathChildrenCacheListener pathChildrenCacheListener = dataListenerMap.get(listener);
        if(pathChildrenCacheListener == null){
            return;
        } else {
            dataListenerMap.remove(listener);

            // Step 2: remove the listener from the path's Set<DataListener>.
            Set<DataListener> dataListenerSet = dataListenersMap.get(path);
            if(dataListenerSet != null && dataListenerSet.contains(listener)){
                dataListenerSet.remove(listener);
            }

            // Step 3: drop the path from dataListenersMap and childDataMap once it has no listeners.
            if(dataListenerSet == null || dataListenerSet.isEmpty()){
                dataListenersMap.remove(path);
                childDataMap.remove(path);
            }
        }

        // Step 4: stop listening and drop the cache from pathChildrenCacheMap when unused.
        PathChildrenCache pathChildrenCache = pathChildrenCacheMap.get(path);
        if(pathChildrenCache != null){
            pathChildrenCache.getListenable().removeListener(pathChildrenCacheListener);
            // The object to unwatch is the Curator-level listener stored for this
            // DataListener, not the DataListener argument itself (casting the argument
            // fails with ClassCastException for ordinary DataListener implementations).
            if(pathChildrenCacheListener instanceof PathChildrenCacheListenerImpl){
                ((PathChildrenCacheListenerImpl)pathChildrenCacheListener).unwatch();
            }
            if(pathChildrenCache.getListenable().size() == 0){
                pathChildrenCacheMap.remove(path);
                pathChildrenCache.close();
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}
项目:redirector    文件:ServiceCacheImplProxy.java   
/**
 * Starts the cache exactly once, adds an instance for every child currently held by the
 * cache, and reports the opened cache to the discovery.
 *
 * @throws IllegalStateException if the cache has already been started
 * @throws Exception             if the underlying cache fails to start
 */
@Override
public void start() throws Exception {
    final boolean firstStart = state.compareAndSet(State.LATENT, State.STARTED);
    Preconditions.checkState(firstStart, "Cannot be started more than once");

    cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

    for (final ChildData child : cache.getCurrentData()) {
        addInstance(child, true);
    }

    discovery.cacheOpened(this);
}
项目:redirector    文件:ZookeeperConnector.java   
/**
 * Starts the given cache wrapper with a pre-built initial cache; a connector failure is
 * logged and not propagated.
 *
 * @param cache wrapper to start
 * @param path  path of the cache, used for the log message only
 */
private static void startCache(IPathChildrenCacheWrapper cache, String path) {
    final PathChildrenCache.StartMode mode = PathChildrenCache.StartMode.BUILD_INITIAL_CACHE;
    try {
        cache.start(mode);
    } catch (DataSourceConnectorException ex) {
        log.error("Failed to start path children cache for path=" + path, ex);
    }
}
项目:redirector    文件:ZkPathChildrenCacheWrapper.java   
/**
 * Creates a wrapper around an optional Curator children cache.
 *
 * @param connector        data source connector passed to the superclass
 * @param path             path the cache belongs to
 * @param dataIsCompressed flag stored for later use
 * @param cache            underlying cache; caching is enabled only when it is non-null
 */
public ZkPathChildrenCacheWrapper(IDataSourceConnector connector,
                                  String path,
                                  boolean dataIsCompressed,
                                  PathChildrenCache cache) {
    super(connector);
    this.path = path;
    this.dataIsCompressed = dataIsCompressed;
    this.cache = cache;
    this.useCache = (cache != null);
}
项目:redirector    文件:ZkPathChildrenCacheWrapperTest.java   
/** Starting without preloading must fail when the connector reports no connection. */
@Test(expected = DataSourceConnectorException.class)
public void startCache_WithoutPreloading_Fails_WhenNoConnected() throws DataSourceConnectorException {
    when(client.isConnected()).thenReturn(false);
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);

    testee.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
项目:redirector    文件:ZkPathChildrenCacheWrapperTest.java   
/** Starting with preloading must fail when the connector reports no connection. */
@Test(expected = DataSourceConnectorException.class)
public void startCache_WithPreloading_Fails_WhenNoConnected() throws DataSourceConnectorException {
    when(client.isConnected()).thenReturn(false);
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);

    testee.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
}
项目:redirector    文件:ZkPathChildrenCacheWrapperTest.java   
/** Starting the wrapper must start the underlying cache exactly once with the same mode. */
@Test
public void testStartWithUseCache() throws Exception {
    final PathChildrenCache.StartMode expectedMode = PathChildrenCache.StartMode.POST_INITIALIZED_EVENT;
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);

    testee.start(expectedMode);

    verify(cache, times(1)).start(expectedMode);
}
项目:redirector    文件:ZkPathChildrenCacheWrapperTest.java   
/** A generic exception thrown by the underlying cache on start must not reach the caller. */
@Test
public void testStartWithUseCacheExceptionHappens() throws Exception {
    final PathChildrenCache.StartMode mode = PathChildrenCache.StartMode.POST_INITIALIZED_EVENT;
    setupCacheThrowsExceptionOnStart(new Exception());
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);

    testee.start(mode);
}
项目:redirector    文件:ZkPathChildrenCacheWrapperTest.java   
/** A connection-loss exception thrown by the underlying cache on start must not reach the caller. */
@Test
public void testStartWithUseCacheNoConnection() throws Exception {
    final PathChildrenCache.StartMode mode = PathChildrenCache.StartMode.POST_INITIALIZED_EVENT;
    setupCacheThrowsExceptionOnStart(new KeeperException.ConnectionLossException());
    testee = new ZkPathChildrenCacheWrapper(client, "/test", false, cache);

    testee.start(mode);
}