Java 类org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier 实例源码

项目:yuzhouwan    文件:CuratorDistributedBarrier.java   
/**
 * Starts a Curator client for the ZooKeeper server at localhost:2181 (under
 * the "distBarrier" namespace) and builds the two barrier recipes on top of
 * it: a plain barrier at "/barrier" and a three-member double barrier at
 * "/double".
 *
 * @throws Exception declared for failures raised while setting up the client
 */
private void init() throws Exception {
        final RetryNTimes retryPolicy = new RetryNTimes(3, 2000);
        final CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .connectionTimeoutMs(3000)
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("distBarrier")
                .build();
        client.start();

        distributedBarrier = new DistributedBarrier(client, "/barrier");
        distributedDoubleBarrier = new DistributedDoubleBarrier(client, "/double", 3);
    }
项目:floe2    文件:BaseAppTransition.java   
/**
 * Broadcasts a TERMINATE_FLAKES signal to all containers, then passes
 * through both phases of the shared double barrier before marking the
 * flake update as completed.
 * @param updatedMapping mapping that supplies the application name.
 * @param barrier double barrier shared with the signalled containers.
 * @throws Exception if signalling, a status update or the barrier fails.
 */
private void terminateFlakes(final ResourceMapping updatedMapping,
                          final DistributedDoubleBarrier barrier)
        throws Exception {
    final String targetApp = updatedMapping.getAppName();

    SignalHandler.getInstance().signal(
            targetApp,
            "ALL-CONTAINERS",
            ContainerSignal.ContainerSignalType.TERMINATE_FLAKES,
            Utils.serialize("dummy"));
    ZKUtils.setAppStatus(targetApp, AppStatus.UPDATING_FLAKES);

    // First phase: rendezvous with the containers before they start working.
    barrier.enter();
    LOGGER.info("Waiting for containers to finish flake deployment.");
    // Second phase: rendezvous again once the containers are done.
    barrier.leave();

    ZKUtils.setAppStatus(targetApp, AppStatus.UPDATING_FLAKES_COMPLETED);
}
项目:floe2    文件:BaseAppTransition.java   
/**
 * Broadcasts a START_PELLETS signal to all containers and passes through
 * the shared double barrier; the application is marked RUNNING afterwards.
 * @param updatedMapping mapping that supplies the application name.
 * @param barrier double barrier shared with the signalled containers.
 * @throws Exception if signalling, a status update or the barrier fails.
 */
private void startPellets(final ResourceMapping updatedMapping,
                          final DistributedDoubleBarrier barrier)
        throws Exception {
    final String targetApp = updatedMapping.getAppName();

    SignalHandler.getInstance().signal(
            targetApp,
            "ALL-CONTAINERS",
            ContainerSignal.ContainerSignalType.START_PELLETS,
            Utils.serialize("dummy"));
    ZKUtils.setAppStatus(targetApp, AppStatus.STARTING_PELLETS);

    barrier.enter();
    LOGGER.info("Waiting for containers to Start all pellets.");
    barrier.leave();

    ZKUtils.setAppStatus(targetApp, AppStatus.RUNNING);
}
项目:floe2    文件:BaseAppTransition.java   
/**
 * Broadcasts a CONNECT_OR_DISCONNECT_FLAKES signal to all containers and
 * passes through the shared double barrier. No intermediate status is
 * written; only the completed status is set once the barrier has been left.
 * @param updatedMapping mapping that supplies the application name.
 * @param barrier double barrier shared with the signalled containers.
 * @throws Exception if signalling, the status update or the barrier fails.
 */
private void connectFlakes(final ResourceMapping updatedMapping,
                           final DistributedDoubleBarrier barrier)
        throws Exception {
    final String application = updatedMapping.getAppName();

    SignalHandler.getInstance().signal(
            application,
            "ALL-CONTAINERS",
            ContainerSignal.ContainerSignalType.CONNECT_OR_DISCONNECT_FLAKES,
            Utils.serialize("dummy"));

    barrier.enter();
    LOGGER.info("Waiting for containers to finish flake channel creation.");
    barrier.leave();

    ZKUtils.setAppStatus(application, AppStatus.UPDATING_FLAKES_COMPLETED);
}
项目:floe2    文件:BaseAppTransition.java   
/**
 * Broadcasts an INCREASE_OR_DECREASE_PELLETS signal to all containers and
 * passes through the shared double barrier, recording the pellet-update
 * status before and after.
 * @param updatedMapping mapping that supplies the application name.
 * @param barrier double barrier shared with the signalled containers.
 * @throws Exception if signalling, a status update or the barrier fails.
 */
private void increaseOrDecreasePellets(
        final ResourceMapping updatedMapping,
        final DistributedDoubleBarrier barrier)
        throws Exception {
    final String application = updatedMapping.getAppName();

    SignalHandler.getInstance().signal(
            application,
            "ALL-CONTAINERS",
            ContainerSignal.ContainerSignalType.INCREASE_OR_DECREASE_PELLETS,
            Utils.serialize("dummy"));
    ZKUtils.setAppStatus(application, AppStatus.UPDATING_PELLETS);

    barrier.enter();
    LOGGER.info("Waiting for containers to launch pellets in the flakes");
    barrier.leave();

    ZKUtils.setAppStatus(application, AppStatus.UPDATING_PELLETS_COMPLETED);
}
项目:floe2    文件:BaseAppTransition.java   
/**
 * Broadcasts an INITIALIZE_FLAKES signal to all containers, then passes
 * through both phases of the shared double barrier before marking the
 * flake update as completed.
 * @param updatedMapping mapping that supplies the application name.
 * @param barrier double barrier shared with the signalled containers.
 * @throws Exception if signalling, a status update or the barrier fails.
 */
private void initializeFlakes(final ResourceMapping updatedMapping,
                          final DistributedDoubleBarrier barrier)
        throws Exception {
    final String app = updatedMapping.getAppName();

    SignalHandler.getInstance().signal(
            app,
            "ALL-CONTAINERS",
            ContainerSignal.ContainerSignalType.INITIALIZE_FLAKES,
            Utils.serialize("dummy"));
    ZKUtils.setAppStatus(app, AppStatus.UPDATING_FLAKES);

    // First phase: rendezvous with the containers before they start working.
    barrier.enter();
    LOGGER.info("Waiting for containers to finish flake deployment.");
    // Second phase: rendezvous again once the containers are done.
    barrier.leave();

    ZKUtils.setAppStatus(app, AppStatus.UPDATING_FLAKES_COMPLETED);
}
项目:floe2    文件:BaseAppTransition.java   
/**
 * Broadcasts a CREATE_FLAKES signal to all containers, then passes through
 * both phases of the shared double barrier before marking the flake update
 * as completed.
 * @param updatedMapping mapping that supplies the application name.
 * @param barrier double barrier shared with the signalled containers.
 * @throws Exception if signalling, a status update or the barrier fails.
 */
private void createFlakes(final ResourceMapping updatedMapping,
                          final DistributedDoubleBarrier barrier)
        throws Exception {
    final String app = updatedMapping.getAppName();

    SignalHandler.getInstance().signal(
            app,
            "ALL-CONTAINERS",
            ContainerSignal.ContainerSignalType.CREATE_FLAKES,
            Utils.serialize("dummy"));
    ZKUtils.setAppStatus(app, AppStatus.UPDATING_FLAKES);

    // First phase: rendezvous with the containers before they start working.
    barrier.enter();
    LOGGER.info("Waiting for containers to finish flake deployment.");
    // Second phase: rendezvous again once the containers are done.
    barrier.leave();

    ZKUtils.setAppStatus(app, AppStatus.UPDATING_FLAKES_COMPLETED);
}
项目:Decision    文件:ClusterBarrierManagerTest.java   
/**
 * manageAckBarrier must report true when the mocked barrier's timed
 * enter call succeeds.
 */
@Test
public void testManageAckBarrierShouldBeOk() throws Exception {
    final String path = "/stratio/decision/barrier";
    final Integer expectedNodes = 2;
    final Integer timeout = 500;

    final ClusterSyncManager syncManager = mock(ClusterSyncManager.class);
    when(syncManager.getClient()).thenReturn(curatorFramework);

    final ClusterBarrierManager managerSpy =
            PowerMockito.spy(new ClusterBarrierManager(syncManager, timeout));

    // The spy hands out a mocked barrier whose timed enter succeeds.
    barrier = mock(DistributedDoubleBarrier.class);
    when(barrier.enter(anyLong(), any(TimeUnit.class))).thenReturn(true);
    PowerMockito.doReturn(barrier)
            .when(managerSpy)
            .getDistributedDoubleBarrier(anyString(), anyInt());

    final Boolean acknowledged = managerSpy.manageAckBarrier(path, expectedNodes);
    assertEquals(true, acknowledged);
}
项目:Decision    文件:ClusterBarrierManagerTest.java   
/**
 * manageAckBarrier must report false when the mocked barrier's timed
 * enter call does not succeed.
 */
@Test
public void testManageAckBarrierShouldBeKO() throws Exception {
    final String path = "/stratio/decision/barrier";
    final Integer expectedNodes = 2;
    final Integer timeout = 500;

    final ClusterSyncManager syncManager = mock(ClusterSyncManager.class);
    when(syncManager.getClient()).thenReturn(curatorFramework);

    final ClusterBarrierManager managerSpy =
            PowerMockito.spy(new ClusterBarrierManager(syncManager, timeout));

    // The spy hands out a mocked barrier whose timed enter fails.
    barrier = mock(DistributedDoubleBarrier.class);
    when(barrier.enter(anyLong(), any(TimeUnit.class))).thenReturn(false);
    PowerMockito.doReturn(barrier)
            .when(managerSpy)
            .getDistributedDoubleBarrier(anyString(), anyInt());

    final Boolean acknowledged = managerSpy.manageAckBarrier(path, expectedNodes);
    assertEquals(false, acknowledged);
}
项目:cultivar    文件:CuratorUninterruptibles.java   
/**
 * Calls {@code barrier.leave()} and keeps retrying for as long as the call
 * is aborted by an {@link InterruptedException}. If any interruption was
 * seen, the thread's interrupt flag is set again before this method exits.
 *
 * @param barrier the double barrier to leave
 * @throws Exception any non-interrupt failure raised by {@code leave()}
 */
public static void leave(final DistributedDoubleBarrier barrier) throws Exception {
    boolean sawInterrupt = false;
    try {
        for (;;) {
            try {
                barrier.leave();
                break;
            } catch (InterruptedException ignored) {
                // Remember the interruption and try again.
                sawInterrupt = true;
            }
        }
    } finally {
        if (sawInterrupt) {
            Thread.currentThread().interrupt();
        }
    }
}
项目:cultivar    文件:CuratorUninterruptibles.java   
/**
 * Calls {@code barrier.enter()} and keeps retrying for as long as the call
 * is aborted by an {@link InterruptedException}. If any interruption was
 * seen, the thread's interrupt flag is set again before this method exits.
 *
 * @param barrier the double barrier to enter
 * @throws Exception any non-interrupt failure raised by {@code enter()}
 */
public static void enter(final DistributedDoubleBarrier barrier) throws Exception {
    boolean sawInterrupt = false;
    try {
        for (;;) {
            try {
                barrier.enter();
                break;
            } catch (InterruptedException ignored) {
                // Remember the interruption and try again.
                sawInterrupt = true;
            }
        }
    } finally {
        if (sawInterrupt) {
            Thread.currentThread().interrupt();
        }
    }
}
项目:floe2    文件:Container.java   
/**
 * Creates or terminates pellet instances according to the application's
 * resource mapping, bracketed by the application's double barrier.
 * @param signal The received signal; its destination app selects the mapping.
 * @throws Exception If reading the mapping or using the barrier fails.
 */
private void increaseOrDecreasePellets(final ContainerSignal signal)
        throws Exception {
    final String app = signal.getDestApp();
    final ResourceMapping mapping = ZKUtils.getResourceMapping(app);
    final String barrierPath = ZKUtils.getApplicationBarrierPath(app);
    // One member more than the number of containers being updated.
    final int memberCount = mapping.getContainersToUpdate() + 1;

    final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(),
            barrierPath,
            memberCount);

    if (!ContainerActions.isContainerUpdated(mapping)) {
        return; // this container takes no part in the update
    }

    barrier.enter();
    try {
        ContainerActions.increaseOrDecreasePellets(mapping);
    } catch (Exception e) {
        // Logged only, so that the barrier is still left below.
        LOGGER.error("Could not launch pellets. Exception {}", e);
    } finally {
        barrier.leave();
    }
}
项目:floe2    文件:Container.java   
/**
 * Updates the flake connections on this container according to the
 * application's resource mapping, bracketed by the application's double
 * barrier.
 * @param signal The signal (with associated data) received.
 * @throws Exception If an error occurs while performing the action.
 */
private void connectFlakes(final ContainerSignal signal) throws Exception {

    String appName = signal.getDestApp();

    ResourceMapping resourceMapping
            = ZKUtils.getResourceMapping(appName);

    String appUpdateBarrierPath = ZKUtils
            .getApplicationBarrierPath(appName);

    int numContainersToUpdate = resourceMapping.getContainersToUpdate();

    // One member more than the number of containers being updated.
    DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(),
            appUpdateBarrierPath,
            numContainersToUpdate + 1
    );

    if (ContainerActions.isContainerUpdated(resourceMapping)) {
        barrier.enter();
        try {
            ContainerActions.updateFlakeConnections(resourceMapping);
        } catch (Exception e) {
            // Logged only, so that the barrier is still left below. The
            // message names the operation that actually failed.
            LOGGER.error("Could not update flake connections. Exception {}",
                    e);
        } finally {
            barrier.leave();
        }
    }
}
项目:floe2    文件:Container.java   
/**
 * Terminates the flakes on this container that the resource mapping asks
 * to remove (the original note assumes their pellets are already
 * terminated), bracketed by the application's double barrier.
 * @param signal The received signal; its destination app selects the mapping.
 * @throws Exception If reading the mapping or using the barrier fails.
 */
private void terminateFlakes(final ContainerSignal signal)
        throws Exception {
    final String app = signal.getDestApp();
    final ResourceMapping mapping = ZKUtils.getResourceMapping(app);
    final String barrierPath = ZKUtils.getApplicationBarrierPath(app);
    // One member more than the number of containers being updated.
    final int memberCount = mapping.getContainersToUpdate() + 1;

    final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(),
            barrierPath,
            memberCount);

    if (!ContainerActions.isContainerUpdated(mapping)) {
        return; // this container takes no part in the update
    }

    barrier.enter();
    try {
        ContainerActions.terminateFlakes(mapping);
    } catch (Exception e) {
        // Logged only, so that the barrier is still left below.
        LOGGER.error("Could not terminate flakes. Exception {}", e);
    } finally {
        barrier.leave();
    }
}
项目:floe2    文件:Container.java   
/**
 * Initializes newly created flakes on the container, bracketed by the
 * application's double barrier.
 * @param signal The signal (with associated data) received.
 * @throws Exception If an error occurs while performing the action.
 */
private void initializeFlakes(final ContainerSignal signal)
        throws Exception {

    String appName = signal.getDestApp();

    ResourceMapping resourceMapping
            = ZKUtils.getResourceMapping(appName);

    String appUpdateBarrierPath = ZKUtils
            .getApplicationBarrierPath(appName);

    int numContainersToUpdate = resourceMapping.getContainersToUpdate();

    // One member more than the number of containers being updated.
    DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(),
            appUpdateBarrierPath,
            numContainersToUpdate + 1
    );

    if (ContainerActions.isContainerUpdated(resourceMapping)) {
        barrier.enter();
        try {
            ContainerActions.initializeFlakes(resourceMapping);
        } catch (Exception e) {
            // Logged only, so that the barrier is still left below. The
            // message names the operation that actually failed.
            LOGGER.error("Could not initialize flakes. Exception {}", e);
        } finally {
            barrier.leave();
        }
    }
}
项目:floe2    文件:Container.java   
/**
 * Creates the new flakes on this container that the resource mapping asks
 * for, bracketed by the application's double barrier.
 * @param signal The received signal; its destination app selects the mapping.
 * @throws Exception If reading the mapping or using the barrier fails.
 */
private void createFlakes(final ContainerSignal signal) throws Exception {
    final String app = signal.getDestApp();
    final ResourceMapping mapping = ZKUtils.getResourceMapping(app);
    final String barrierPath = ZKUtils.getApplicationBarrierPath(app);
    // One member more than the number of containers being updated.
    final int memberCount = mapping.getContainersToUpdate() + 1;

    final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(),
            barrierPath,
            memberCount);

    if (!ContainerActions.isContainerUpdated(mapping)) {
        return; // this container takes no part in the update
    }

    barrier.enter();
    try {
        ContainerActions.createFlakes(mapping);
    } catch (Exception e) {
        // Logged only, so that the barrier is still left below.
        LOGGER.error("Could not create flakes. Exception {}", e);
    } finally {
        barrier.leave();
    }
}
项目:ZKRecipesByExample    文件:DistributedBarrierExample.java   
/**
 * Demo entry point: starts an in-process test server, then runs QTY tasks
 * that each enter and leave the same double barrier at PATH, printing their
 * progress to standard output.
 *
 * @param args unused
 * @throws Exception if the server, the client or the executor wait fails
 */
public static void main(String[] args) throws Exception {
    // The client is now closed together with the server; resources are
    // released in reverse order, so the client goes first.
    try (TestingServer server = new TestingServer();
         CuratorFramework client = CuratorFrameworkFactory.newClient(
                 server.getConnectString(), new ExponentialBackoffRetry(1000, 3))) {
        client.start();

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        for (int i = 0; i < QTY; ++i) {
            final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
            final int index = i;
            Callable<Void> task = new Callable<Void>() {
                @Override
                public Void call() throws Exception {

                    // NOTE(review): this sleeps for at most 3 ms, while the
                    // pause below uses 3000 - confirm which was intended.
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                }
            };
            service.submit(task);
        }


        // Wait for the tasks to finish before the client and server close.
        service.shutdown();
        service.awaitTermination(10, TimeUnit.MINUTES);

    }

}
项目:feluca    文件:LoopingBase.java   
/**
 * Builds everything this object needs from the given configuration: the
 * ZooKeeper client and its barriers, the local model, loop settings, the
 * optional reduce server and starting gun, and the model/reducer endpoints.
 *
 * @param conf global configuration the components are built from
 * @throws Exception declared for failures while creating the components
 */
private void init(GlobalConfig conf) throws Exception{
    this.conf = conf;
    zkClient = ZKUtils.newClient();

    // All barrier nodes share the prefix <chroot>/<algorithm name>.
    final String algoZkRoot = Constants.Algorithm.ZK_ALGO_CHROOT + "/" + conf.getAlgorithmName();
    loopBarrier = new DistributedDoubleBarrier(
            zkClient,
            algoZkRoot + Constants.Algorithm.ZK_WAITING_PATH,
            conf.getWorkers().size());
    startBarrier = new DistributedBarrier(zkClient, algoZkRoot + "/start");
    finishBarrier = new DistributedBarrier(zkClient, algoZkRoot + "/finish");

    local = new MModelLocal();
    shardId = conf.getShardId();
    loops = conf.getAlgorithmConf().getInteger(Constants.Algorithm.LOOPS);
    useSyncModel = JSONUtil.getConf(
            conf.getAlgorithmConf(), Constants.Algorithm.OPEN_MODEL_SERVER, false);

    // The data server and client may live apart from a worker node, so the
    // reduce server and starting gun are only created when configured.
    final AlgoDeployConf deployment = conf.getDeployConf();
    if (deployment.isReduceServer()){
        reduceServer = new ReduceServer(
                conf.getWorkerName(), conf.getWorkers().size(), conf.getAlgorithmName());
    }
    if (deployment.isStartingGun()){
        startingGun = new StartingGun2(
                conf.getAlgorithmName(), conf.getReduceServers().size(), conf.getWorkers().size());
    }

    modelServer = new MModelServer(conf.getWorkerName(), conf.getAlgorithmName(), local);
    modelClient = new MModelClient(conf.getWorkers(), shardId, local);
    reducerClient = new FloatReducerClient(conf.getReduceServers(), shardId);
}
项目:floe2    文件:BaseAppTransition.java   
/**
     * Runs the whole application transition: pre-check, scheduling, the
     * barrier-synchronised container steps (create, initialize, connect,
     * resize pellets, terminate, start), and finally the post-check.
     *
     * @param args transition specific arguments; the "appName" and "app"
     *             entries are read here.
     * @throws Exception if the pre- or post-transition check fails, or if
     *                   any step throws.
     */
    @Override
    protected final void execute(final Map<String, Object> args)
            throws Exception {
        LOGGER.info("Executing base app transition.");

        final String application = (String) args.get("appName");
        final TFloeApp floeApp = (TFloeApp) args.get("app");
        final ResourceMapping mappingBefore
                = ZKUtils.getResourceMapping(application);

        if (!preTransition(application, mappingBefore)) {
            LOGGER.error("Pre-Transition failed.");
            throw new Exception(
                    "Transition cannot be executed. Pre-transition failed.");
        }

        // Step 1: schedule the app to obtain the updated resource mapping.
        final ResourceMapping mappingAfter = schedule(application, floeApp,
                mappingBefore, args);

        final String barrierPath
                = ZKUtils.getApplicationBarrierPath(application);
        // One member more than the number of containers being updated.
        final int memberCount = mappingAfter.getContainersToUpdate() + 1;
        final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
                ZKClient.getInstance().getCuratorClient(),
                barrierPath,
                memberCount);

        // Step 2: have *ALL* containers check the mapping and CREATE the
        // required flakes, then initialize them.
        createFlakes(mappingAfter, barrier);
        LOGGER.info("All containers finished launching flakes.");

        initializeFlakes(mappingAfter, barrier);
        LOGGER.info("All containers finished initializing flakes.");

        // Step 3: have the flakes establish the appropriate channels.
        connectFlakes(mappingAfter, barrier);
        LOGGER.info("All containers finished CONNECTING flakes.");

        // Step 4: wait for containers to launch or remove pellets.
        increaseOrDecreasePellets(mappingAfter, barrier);
        LOGGER.info("All pellets successfully created.");

        // Step 5: wait for containers to terminate flakes, if required.
        terminateFlakes(mappingAfter, barrier);
        LOGGER.info("All required flakes terminated.");

        // Step 6: signal the containers to start the pellets.
        startPellets(mappingAfter, barrier);
        LOGGER.info("All pellets Started. The application is now running");

        if (!postTransition(mappingBefore)) {
            LOGGER.error("Post-Transition failed.");
            throw new Exception(
                    "Transition may have been executed partially. Post-transition failed.");
        }
    }
项目:floe2    文件:Container.java   
/**
 * Sends the start-pellets command to every flake that the application's
 * resource mapping assigns to this container, bracketed by the
 * application's double barrier.
 *
 * Returns early, without touching the barrier, when the resource mapping
 * cannot be read or contains no entry for this container.
 *
 * @param signal The signal (with associated data) received.
 * @exception Exception if an error occurs while starting pellets or while
 * entering/leaving the barrier.
 */
private void startPellets(final ContainerSignal signal) throws Exception {
    String appName = signal.getDestApp();
    String containerName = signal.getDestContainer(); //ignore.

    LOGGER.info("Container Id: " + containerName);

    String resourceMappingPath = ZKUtils
            .getApplicationResourceMapPath(appName);

    byte[] serializedRM;

    try {
        serializedRM = ZKClient.getInstance().getCuratorClient().getData()
                .forPath(resourceMappingPath);
    } catch (Exception e) {
        // Keep the cause in the log instead of dropping it.
        LOGGER.error("Could not receive resource mapping. Aborting.", e);
        return;
    }

    ResourceMapping resourceMapping =
            (ResourceMapping) Utils.deserialize(serializedRM);

    String containerId = ContainerInfo.getInstance().getContainerId();

    ResourceMapping.ContainerInstance container
            = resourceMapping.getContainer(containerId);

    if (container == null) {
        LOGGER.info("No resource mapping for this container.");
        return;
    }

    String appUpdateBarrierPath = ZKUtils
            .getApplicationBarrierPath(appName);

    int numContainersToUpdate = resourceMapping.getContainersToUpdate();

    // One member more than the number of containers being updated.
    DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(),
            appUpdateBarrierPath,
            numContainersToUpdate + 1
    );

    LOGGER.info("Entering barrier: " + appUpdateBarrierPath);
    barrier.enter(); //start processing.

    // Once entered, the barrier must always be left, even when starting a
    // pellet fails; otherwise the other barrier members stay blocked. The
    // failure itself still propagates to the caller.
    try {
        LOGGER.info("Starting pellets.");
        Map<String, ResourceMapping.FlakeInstance> flakes
                = container.getFlakes();

        Map<String, FlakeInfo> pidToFidMap
                = FlakeMonitor.getInstance().getFlakes();

        for (String pid : flakes.keySet()) {
            ContainerUtils.sendStartPelletsCommand(
                    pidToFidMap.get(pid).getFlakeId());
        }
    } finally {
        barrier.leave(); //finish launching pellets.
    }
}
项目:cultivar_old    文件:DistributedDoubleBarrierFactory.java   
/**
 * Creates a double barrier on this factory's framework.
 *
 * @param barrierPath path handed to the barrier
 * @param memberQty number of barrier members; must be greater than zero
 * @return a new {@link DistributedDoubleBarrier}
 */
public DistributedDoubleBarrier create(final String barrierPath, @Nonnegative final int memberQty) {
    final boolean hasMembers = memberQty > 0;
    checkArgument(hasMembers, "Member quantity must be greater than zero.");

    final DistributedDoubleBarrier created =
            new DistributedDoubleBarrier(framework, barrierPath, memberQty);
    return created;
}
项目:cultivar    文件:DistributedDoubleBarrierFactory.java   
/**
 * Creates a double barrier on this factory's framework.
 *
 * @param barrierPath path handed to the barrier; must not be null
 * @param memberQty number of barrier members; must be greater than zero
 * @return a new {@link DistributedDoubleBarrier}
 */
public DistributedDoubleBarrier create(final String barrierPath, @Nonnegative final int memberQty) {
    final boolean hasMembers = memberQty > 0;
    checkArgument(hasMembers, "Member quantity must be greater than zero.");

    // The quantity is validated first, then the path.
    final String checkedPath = checkNotNull(barrierPath, "Path cannot be null.");
    return new DistributedDoubleBarrier(framework, checkedPath, memberQty);
}
项目:Decision    文件:ClusterBarrierManager.java   
/**
 * Obtains a double barrier for the given path and enters it, waiting at
 * most {@code ackTimeout} milliseconds.
 *
 * @param barrierPath path of the barrier
 * @param nodesExpected number of members the barrier is created for
 * @return the result of the barrier's timed {@code enter} call
 * @throws Exception if entering the barrier fails
 */
public Boolean manageAckBarrier(String barrierPath, Integer nodesExpected) throws Exception {
        return getDistributedDoubleBarrier(barrierPath, nodesExpected)
                .enter(ackTimeout, TimeUnit.MILLISECONDS);
    }
项目:Decision    文件:ClusterBarrierManager.java   
/**
 * Builds a double barrier on the client of the cluster sync manager.
 *
 * @param barrierPath path of the barrier
 * @param nodesExpected number of barrier members; unboxed here, so it
 *                      must not be null
 * @return a new {@link DistributedDoubleBarrier}
 */
public DistributedDoubleBarrier getDistributedDoubleBarrier(String barrierPath, Integer nodesExpected) {
        return new DistributedDoubleBarrier(
                clusterSyncManagerInstance.getClient(),
                barrierPath,
                nodesExpected.intValue());
    }