/** * Starts the ephemeral node and waits for it to be created * * @param node Node to start * @param maxWaitSec Maximum time in seconds to wait */ public static void startAndWait(PersistentEphemeralNode node, int maxWaitSec) { node.start(); int waitTime = 0; try { while (node.waitForInitialCreate(1, TimeUnit.SECONDS) == false) { waitTime += 1; log.info("Waited " + waitTime + " sec for ephemeral node to be created"); if (waitTime > maxWaitSec) { throw new IllegalStateException("Failed to create ephemeral node"); } } } catch (InterruptedException e) { throw new IllegalStateException(e); } }
@Override public PersistentEphemeralNode createNode(String type, String data) { PersistentEphemeralNode node = new PersistentEphemeralNode(getCurator(), Mode.EPHEMERAL, getNodePath(type), data.getBytes()); node.start(); return node; }
private Observable<NodeCache> buildNodeCache(final ConfigDescriptor desc) { return Observable.fromCallable(() -> { final String configPath = makePath(ROOT_ZK_PATH, desc.getConfigName()); final NodeCache nc = new NodeCache(curator, configPath); nc.getListenable().addListener(() -> onNodeChanged(nc, desc)); try { nc.start(true); // Note that we have to force calling onNodeChanged() here since `nc.start(true)` will not emit an initial event. onNodeChanged(nc, desc); // Create the ephemeral node last, just in case something goes wrong with setting up the node cache // NOTE: This process is what actually creates the configuration node if it was missing. PersistentEphemeralNode en = new PersistentEphemeralNode(curator, EPHEMERAL, makePath(configPath, localNodeName), new byte[0]); en.start(); if (!en.waitForInitialCreate(getDefaultNodeCreationTimeout().toMillis(), TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timeout on creation of ephemeral node for " + makePath(configPath, localNodeName)); } ephemeralNodes.put(desc, en); return nc; } catch (Exception ex) { log.warn("Failed to initialize for configPath {}", configPath, ex); throw ex; } }); }
/** * Creates a transactor node using given transactor id * * @param env Environment * @param tid Transactor ID used to create node */ public TransactorNode(Environment env, TransactorID tid) { this.env = env; this.tid = tid; node = new PersistentEphemeralNode(env.getSharedResources().getCurator(), Mode.EPHEMERAL, getNodePath(), tid.toString().getBytes()); CuratorUtil.startAndWait(node, 10); status = TrStatus.OPEN; }
PartitionManager(Environment env, long minSleepTime, long maxSleepTime) { try { this.curator = env.getSharedResources().getCurator(); this.env = env; this.minSleepTime = minSleepTime; this.maxSleepTime = maxSleepTime; this.retrySleepTime = minSleepTime; groupSize = env.getConfiguration().getInt(FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE, FluoConfigurationImpl.WORKER_PARTITION_GROUP_SIZE_DEFAULT); myESNode = new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL, ZookeeperPath.FINDERS + "/" + ZK_FINDER_PREFIX, ("" + groupSize).getBytes(UTF_8)); myESNode.start(); myESNode.waitForInitialCreate(1, TimeUnit.MINUTES); childrenCache = new PathChildrenCache(curator, ZookeeperPath.FINDERS, true); childrenCache.getListenable().addListener(new FindersListener()); childrenCache.start(StartMode.BUILD_INITIAL_CACHE); schedExecutor = Executors.newScheduledThreadPool(1, new FluoThreadFactory("Fluo worker partition manager")); schedExecutor.scheduleWithFixedDelay(new CheckTabletsTask(), 0, maxSleepTime, TimeUnit.MILLISECONDS); scheduleUpdate(); } catch (Exception e) { throw new RuntimeException(e); } }
PersistentEphemeralNode createNode(String type, String data);