/**
 * Creates and starts an ephemeral node holding the given data.
 *
 * <p>The node path is obtained from {@code getNodePath(type)}. The node is returned right after
 * {@code start()} has been called; this method does not itself wait for the initial create to
 * complete.
 *
 * @param type node type, passed to {@code getNodePath} to resolve the node path
 * @param data node payload; stored as its UTF-8 encoded bytes
 * @return the started node
 */
@Override
public PersistentEphemeralNode createNode(String type, String data) {
  // Encode explicitly as UTF-8: the no-arg getBytes() uses the JVM's platform default charset,
  // which would make the stored bytes depend on the machine that wrote them.
  byte[] payload = data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
  PersistentEphemeralNode node =
      new PersistentEphemeralNode(getCurator(), Mode.EPHEMERAL, getNodePath(type), payload);
  node.start();
  return node;
}
/** * Creates a transactor node using given transactor id * * @param env Environment * @param tid Transactor ID used to create node */ public TransactorNode(Environment env, TransactorID tid) { this.env = env; this.tid = tid; node = new PersistentEphemeralNode(env.getSharedResources().getCurator(), Mode.EPHEMERAL, getNodePath(), tid.toString().getBytes()); CuratorUtil.startAndWait(node, 10); status = TrStatus.OPEN; }
/**
 * Sets up the partition manager.
 *
 * <p>In order, this constructor:
 * <ol>
 * <li>reads the worker partition group size from the configuration,</li>
 * <li>starts an ephemeral sequential node under {@code ZookeeperPath.FINDERS} whose data is the
 * group size, and waits up to one minute for its initial creation,</li>
 * <li>starts a {@code PathChildrenCache} on {@code ZookeeperPath.FINDERS} with a
 * {@code FindersListener} attached,</li>
 * <li>starts a single-threaded scheduled executor that runs {@code CheckTabletsTask} with a fixed
 * delay of {@code maxSleepTime} milliseconds, and</li>
 * <li>calls {@code scheduleUpdate()}.</li>
 * </ol>
 *
 * @param env environment supplying the Curator client and configuration
 * @param minSleepTime minimum sleep time; also used as the initial retry sleep time
 * @param maxSleepTime maximum sleep time; also the delay in milliseconds between
 *        {@code CheckTabletsTask} runs
 * @throws RuntimeException wrapping any exception raised during setup
 */
PartitionManager(Environment env, long minSleepTime, long maxSleepTime) {
  try {
    this.curator = env.getSharedResources().getCurator();
    this.env = env;

    this.minSleepTime = minSleepTime;
    this.maxSleepTime = maxSleepTime;
    this.retrySleepTime = minSleepTime;

    groupSize = env.getConfiguration().getInt(FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE,
        FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE_DEFAULT);

    myESNode = new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL,
        ZookeeperPath.FINDERS + "/" + ZK_FINDER_PREFIX, ("" + groupSize).getBytes(UTF_8));
    myESNode.start();
    // NOTE(review): the boolean result (whether the node was created within the timeout) is
    // ignored, as in the original; confirm that later code tolerates a not-yet-created node.
    myESNode.waitForInitialCreate(1, TimeUnit.MINUTES);

    childrenCache = new PathChildrenCache(curator, ZookeeperPath.FINDERS, true);
    childrenCache.getListenable().addListener(new FindersListener());
    childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

    schedExecutor = Executors.newScheduledThreadPool(1,
        new FluoThreadFactory("Fluo worker partition manager"));
    schedExecutor.scheduleWithFixedDelay(new CheckTabletsTask(), 0, maxSleepTime,
        TimeUnit.MILLISECONDS);

    scheduleUpdate();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Restore the interrupt flag so callers can still observe the interruption after the
      // exception has been wrapped.
      Thread.currentThread().interrupt();
    }
    throw new RuntimeException(e);
  }
}