private void init() throws Exception { CuratorFramework curatorFramework = CuratorFrameworkFactory .builder() .connectString("localhost:2181") .connectionTimeoutMs(3000) .sessionTimeoutMs(5000) .retryPolicy(new RetryNTimes(3, 2000)) .namespace("distBarrier") .build(); curatorFramework.start(); distributedBarrier = new DistributedBarrier(curatorFramework, "/barrier"); // try { // Stat stat = curatorFramework.checkExists().forPath("/double"); // if (stat != null) // curatorFramework.delete().deletingChildrenIfNeeded().forPath("/double"); // else // curatorFramework.create().creatingParentsIfNeeded() // .withMode(CreateMode.PERSISTENT).forPath("/double"); // } catch (Exception e) { // throw new RuntimeException("Cannot create path '/double' !!", e); // } distributedDoubleBarrier = new DistributedDoubleBarrier(curatorFramework, "/double", 3); }
/**
 * Demonstrates {@link DistributedBarrier} against an in-process test ZooKeeper server.
 *
 * <p>A control instance sets the barrier on {@code PATH}. {@code QTY} worker tasks
 * then block in {@code waitOnBarrier()} on the same path. After a ten second pause
 * the control instance removes the barrier and the workers proceed.
 *
 * <p>The Curator client is managed by the same try-with-resources as the server,
 * so it is closed before the server goes away. The executor is force-stopped on
 * every exit path, so a failure cannot leave worker threads parked on the barrier.
 *
 * @param args ignored
 * @throws Exception if the test server, the client, or a barrier operation fails,
 *                   or if the main thread is interrupted while sleeping or waiting
 */
public static void main(String[] args) throws Exception {
    // Resources close in reverse order: client first, then the server it talks to.
    try (TestingServer server = new TestingServer();
         CuratorFramework client = CuratorFrameworkFactory.newClient(
                 server.getConnectString(), new ExponentialBackoffRetry(1000, 3))) {
        client.start();

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        try {
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        // The cast truncates, so this is a 0-2 ms jitter.
                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " waits on Barrier");
                        barrier.waitOnBarrier();
                        System.out.println("Client #" + index + " begins");
                        return null;
                    }
                };
                service.submit(task);
            }

            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            // No-op after a clean termination; otherwise interrupts workers still
            // blocked on the barrier so they do not outlive the client.
            service.shutdownNow();
        }
    }
}
/** * * @throws Exception */ private void init(GlobalConfig conf) throws Exception{ this.conf = conf; zkClient = ZKUtils.newClient(); loopBarrier = new DistributedDoubleBarrier(zkClient, Constants.Algorithm.ZK_ALGO_CHROOT + "/" + conf.getAlgorithmName() + Constants.Algorithm.ZK_WAITING_PATH, conf.getWorkers().size()); startBarrier = new DistributedBarrier(zkClient, Constants.Algorithm.ZK_ALGO_CHROOT + "/" + conf.getAlgorithmName() + "/start"); finishBarrier = new DistributedBarrier(zkClient, Constants.Algorithm.ZK_ALGO_CHROOT + "/" + conf.getAlgorithmName() + "/finish"); local = new MModelLocal(); shardId = conf.getShardId(); loops = conf.getAlgorithmConf().getInteger(Constants.Algorithm.LOOPS); useSyncModel = JSONUtil.getConf(conf.getAlgorithmConf(), Constants.Algorithm.OPEN_MODEL_SERVER, false); AlgoDeployConf deployConf = conf.getDeployConf(); //data server and client can be separated from a worker-node. // if (deployConf.isReduceServer()){ reduceServer = new ReduceServer(conf.getWorkerName(), conf.getWorkers().size(), conf.getAlgorithmName()); } if (deployConf.isStartingGun()){ startingGun = new StartingGun2(conf.getAlgorithmName(), conf.getReduceServers().size(), conf.getWorkers().size()); } modelServer = new MModelServer(conf.getWorkerName(), conf.getAlgorithmName(), local); modelClient = new MModelClient(conf.getWorkers(), shardId, local); reducerClient = new FloatReducerClient(conf.getReduceServers(), shardId); }
/**
 * Creates a {@link DistributedBarrier} on this factory's {@code framework}.
 *
 * @param barrierPath path for the barrier; passed through {@code checkNotNull} first
 * @return a new barrier instance for {@code barrierPath}
 */
public DistributedBarrier create(final String barrierPath) {
    final String validatedPath = checkNotNull(barrierPath);
    return new DistributedBarrier(framework, validatedPath);
}