private void init() throws Exception { CuratorFramework curatorFramework = CuratorFrameworkFactory .builder() .connectString("localhost:2181") .connectionTimeoutMs(3000) .sessionTimeoutMs(5000) .retryPolicy(new RetryNTimes(3, 2000)) .namespace("distBarrier") .build(); curatorFramework.start(); distributedBarrier = new DistributedBarrier(curatorFramework, "/barrier"); // try { // Stat stat = curatorFramework.checkExists().forPath("/double"); // if (stat != null) // curatorFramework.delete().deletingChildrenIfNeeded().forPath("/double"); // else // curatorFramework.create().creatingParentsIfNeeded() // .withMode(CreateMode.PERSISTENT).forPath("/double"); // } catch (Exception e) { // throw new RuntimeException("Cannot create path '/double' !!", e); // } distributedDoubleBarrier = new DistributedDoubleBarrier(curatorFramework, "/double", 3); }
/** * Send TerminateFlakes requests to all containers and waits for their * response. * @param updatedMapping the resource mapping to use. * @param barrier The barrier used for synchronization. * @throws Exception if an error occurs. */ private void terminateFlakes(final ResourceMapping updatedMapping, final DistributedDoubleBarrier barrier) throws Exception { String appName = updatedMapping.getAppName(); SignalHandler.getInstance().signal(appName, "ALL-CONTAINERS", ContainerSignal.ContainerSignalType.TERMINATE_FLAKES, Utils.serialize("dummy")); ZKUtils.setAppStatus(appName, AppStatus.UPDATING_FLAKES); barrier.enter(); //wait for all containers to receive the new // resource mapping and start processing. LOGGER.info("Waiting for containers to finish flake deployment."); barrier.leave(); //wait for all containers to deploy (or update) // flakes and complete their execution. ZKUtils.setAppStatus(appName, AppStatus.UPDATING_FLAKES_COMPLETED); }
/**
 * Broadcasts a START_PELLETS signal to every container of the application
 * and rendezvouses with them on the double barrier. Once everyone has left
 * the barrier, the application is marked as running.
 *
 * @param updatedMapping the resource mapping to use.
 * @param barrier The barrier used for synchronization.
 * @throws Exception if an error occurs.
 */
private void startPellets(final ResourceMapping updatedMapping,
                          final DistributedDoubleBarrier barrier)
        throws Exception {
    final String app = updatedMapping.getAppName();
    final ContainerSignal.ContainerSignalType signalType =
            ContainerSignal.ContainerSignalType.START_PELLETS;

    SignalHandler.getInstance().signal(app, "ALL-CONTAINERS", signalType,
            Utils.serialize("dummy"));
    ZKUtils.setAppStatus(app, AppStatus.STARTING_PELLETS);

    // First rendezvous: all members have arrived and begin work.
    barrier.enter();
    LOGGER.info("Waiting for containers to Start all pellets.");
    // Second rendezvous: all members are done.
    barrier.leave();

    ZKUtils.setAppStatus(app, AppStatus.RUNNING);
}
/**
 * Broadcasts a CONNECT_OR_DISCONNECT_FLAKES signal to every container and
 * rendezvouses with them on the double barrier. Afterwards the application
 * status is set to {@code UPDATING_FLAKES_COMPLETED}.
 *
 * @param updatedMapping the resource mapping to use.
 * @param barrier The barrier used for synchronization.
 * @throws Exception if an error occurs.
 */
private void connectFlakes(final ResourceMapping updatedMapping,
                           final DistributedDoubleBarrier barrier)
        throws Exception {
    final String app = updatedMapping.getAppName();
    final ContainerSignal.ContainerSignalType signalType =
            ContainerSignal.ContainerSignalType.CONNECT_OR_DISCONNECT_FLAKES;

    SignalHandler.getInstance().signal(app, "ALL-CONTAINERS", signalType,
            Utils.serialize("dummy"));

    // First rendezvous: all members have arrived and begin work.
    barrier.enter();
    LOGGER.info("Waiting for containers to finish flake channel creation.");
    // Second rendezvous: all members are done.
    barrier.leave();

    ZKUtils.setAppStatus(app, AppStatus.UPDATING_FLAKES_COMPLETED);
}
/**
 * Broadcasts an INCREASE_OR_DECREASE_PELLETS signal to every container and
 * rendezvouses with them on the double barrier. The application status
 * moves from {@code UPDATING_PELLETS} to {@code UPDATING_PELLETS_COMPLETED}
 * around the barrier.
 *
 * @param updatedMapping the resource mapping to use.
 * @param barrier The barrier used for synchronization.
 * @throws Exception if an error occurs.
 */
private void increaseOrDecreasePellets(
        final ResourceMapping updatedMapping,
        final DistributedDoubleBarrier barrier) throws Exception {
    final String app = updatedMapping.getAppName();
    final ContainerSignal.ContainerSignalType signalType =
            ContainerSignal.ContainerSignalType.INCREASE_OR_DECREASE_PELLETS;

    SignalHandler.getInstance().signal(app, "ALL-CONTAINERS", signalType,
            Utils.serialize("dummy"));
    ZKUtils.setAppStatus(app, AppStatus.UPDATING_PELLETS);

    // First rendezvous: all members have arrived and begin work.
    barrier.enter();
    LOGGER.info("Waiting for containers to launch pellets in the flakes");
    // Second rendezvous: all members are done.
    barrier.leave();

    ZKUtils.setAppStatus(app, AppStatus.UPDATING_PELLETS_COMPLETED);
}
/** * Send INITIALIZE requests to all containers and waits for their response. * @param updatedMapping the resource mapping to use. * @param barrier The barrier used for synchronization. * @throws Exception if an error occurs. */ private void initializeFlakes(final ResourceMapping updatedMapping, final DistributedDoubleBarrier barrier) throws Exception { String appName = updatedMapping.getAppName(); SignalHandler.getInstance().signal(appName, "ALL-CONTAINERS", ContainerSignal.ContainerSignalType.INITIALIZE_FLAKES, Utils.serialize("dummy")); ZKUtils.setAppStatus(appName, AppStatus.UPDATING_FLAKES); barrier.enter(); //wait for all containers to receive the new // resource mapping and start processing. LOGGER.info("Waiting for containers to finish flake deployment."); barrier.leave(); //wait for all containers to deploy (or update) // flakes and complete their execution. ZKUtils.setAppStatus(appName, AppStatus.UPDATING_FLAKES_COMPLETED); }
/** * Send CreateFlake requests to all containers and waits for their response. * @param updatedMapping the resource mapping to use. * @param barrier The barrier used for synchronization. * @throws Exception if an error occurs. */ private void createFlakes(final ResourceMapping updatedMapping, final DistributedDoubleBarrier barrier) throws Exception { String appName = updatedMapping.getAppName(); SignalHandler.getInstance().signal(appName, "ALL-CONTAINERS", ContainerSignal.ContainerSignalType.CREATE_FLAKES, Utils.serialize("dummy")); ZKUtils.setAppStatus(appName, AppStatus.UPDATING_FLAKES); barrier.enter(); //wait for all containers to receive the new // resource mapping and start processing. LOGGER.info("Waiting for containers to finish flake deployment."); barrier.leave(); //wait for all containers to deploy (or update) // flakes and complete their execution. ZKUtils.setAppStatus(appName, AppStatus.UPDATING_FLAKES_COMPLETED); }
/**
 * manageAckBarrier reports success when the underlying barrier's timed
 * enter returns true.
 */
@Test
public void testManageAckBarrierShouldBeOk() throws Exception {
    // Arrange
    final String barrierPath = "/stratio/decision/barrier";
    final Integer nodesExpected = 2;
    final Integer ackTimeout = 500;

    final ClusterSyncManager syncManager = mock(ClusterSyncManager.class);
    when(syncManager.getClient()).thenReturn(curatorFramework);

    final ClusterBarrierManager manager =
            PowerMockito.spy(new ClusterBarrierManager(syncManager, ackTimeout));

    barrier = mock(DistributedDoubleBarrier.class);
    when(barrier.enter(anyLong(), any(TimeUnit.class))).thenReturn(true);
    // Make the spy hand back the stubbed barrier instead of a real one.
    PowerMockito.doReturn(barrier).when(manager)
            .getDistributedDoubleBarrier(anyString(), anyInt());

    // Act
    final Boolean acknowledged = manager.manageAckBarrier(barrierPath, nodesExpected);

    // Assert
    assertEquals(true, acknowledged);
}
/**
 * manageAckBarrier reports failure when the underlying barrier's timed
 * enter returns false.
 */
@Test
public void testManageAckBarrierShouldBeKO() throws Exception {
    // Arrange
    final String barrierPath = "/stratio/decision/barrier";
    final Integer nodesExpected = 2;
    final Integer ackTimeout = 500;

    final ClusterSyncManager syncManager = mock(ClusterSyncManager.class);
    when(syncManager.getClient()).thenReturn(curatorFramework);

    final ClusterBarrierManager manager =
            PowerMockito.spy(new ClusterBarrierManager(syncManager, ackTimeout));

    barrier = mock(DistributedDoubleBarrier.class);
    when(barrier.enter(anyLong(), any(TimeUnit.class))).thenReturn(false);
    // Make the spy hand back the stubbed barrier instead of a real one.
    PowerMockito.doReturn(barrier).when(manager)
            .getDistributedDoubleBarrier(anyString(), anyInt());

    // Act
    final Boolean acknowledged = manager.manageAckBarrier(barrierPath, nodesExpected);

    // Assert
    assertEquals(false, acknowledged);
}
/**
 * Calls {@code barrier.leave()} until it completes without being
 * interrupted. If any attempt is interrupted, the thread's interrupt status
 * is re-asserted before returning (or before propagating another
 * exception).
 *
 * @param barrier the barrier to leave
 * @throws Exception any non-interrupt failure raised by {@code leave()}
 */
public static void leave(final DistributedDoubleBarrier barrier) throws Exception {
    boolean wasInterrupted = false;
    try {
        for (;;) {
            try {
                barrier.leave();
                break;
            } catch (InterruptedException ignored) {
                // Remember the interrupt and try again; restored in finally.
                wasInterrupted = true;
            }
        }
    } finally {
        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Calls {@code barrier.enter()} until it completes without being
 * interrupted. If any attempt is interrupted, the thread's interrupt status
 * is re-asserted before returning (or before propagating another
 * exception).
 *
 * @param barrier the barrier to enter
 * @throws Exception any non-interrupt failure raised by {@code enter()}
 */
public static void enter(final DistributedDoubleBarrier barrier) throws Exception {
    boolean wasInterrupted = false;
    try {
        for (;;) {
            try {
                // NOTE(review): Curator's enter() creates this instance's
                // ephemeral member node before it starts waiting. If the
                // interrupt arrives after that point, a retried enter() may
                // fail (e.g. NodeExistsException) instead of resuming the
                // wait. Verify against the Curator version in use.
                barrier.enter();
                break;
            } catch (InterruptedException ignored) {
                // Remember the interrupt and try again; restored in finally.
                wasInterrupted = true;
            }
        }
    } finally {
        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Creates or terminates pellet instances within this container's flakes,
 * synchronizing with the coordinator on the application's update barrier.
 * Containers the mapping does not mark as updated neither act nor join the
 * barrier. Failures in the action are logged, and the barrier is always
 * left.
 *
 * @param signal The signal (with associated data) received.
 * @throws Exception If an error occurs while performing the action.
 */
private void increaseOrDecreasePellets(final ContainerSignal signal)
        throws Exception {
    final String app = signal.getDestApp();
    final ResourceMapping mapping = ZKUtils.getResourceMapping(app);
    final String barrierPath = ZKUtils.getApplicationBarrierPath(app);
    // Containers being updated plus the coordinator.
    final int members = mapping.getContainersToUpdate() + 1;
    final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(), barrierPath, members);

    if (!ContainerActions.isContainerUpdated(mapping)) {
        return;
    }

    barrier.enter();
    try {
        ContainerActions.increaseOrDecreasePellets(mapping);
    } catch (Exception e) {
        LOGGER.error("Could not launch pellets. Exception {}", e);
    } finally {
        barrier.leave();
    }
}
/**
 * Updates the channel connections of this container's flakes,
 * synchronizing with the coordinator on the application's update barrier.
 * Containers the mapping does not mark as updated neither act nor join the
 * barrier. Failures in the action are logged, and the barrier is always
 * left so the other members are not left waiting.
 *
 * @param signal The signal (with associated data) received.
 * @throws Exception If an error occurs while performing the action.
 */
private void connectFlakes(final ContainerSignal signal) throws Exception {
    String appName = signal.getDestApp();
    ResourceMapping resourceMapping = ZKUtils.getResourceMapping(appName);
    String appUpdateBarrierPath = ZKUtils
            .getApplicationBarrierPath(appName);
    int numContainersToUpdate = resourceMapping.getContainersToUpdate();
    // Containers being updated plus the coordinator.
    DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(),
            appUpdateBarrierPath,
            numContainersToUpdate + 1
    );

    if (ContainerActions.isContainerUpdated(resourceMapping)) {
        barrier.enter();
        try {
            ContainerActions.updateFlakeConnections(resourceMapping);
        } catch (Exception e) {
            // This phase connects flakes; the old text said "create".
            LOGGER.error("Could not connect flakes. Exception {}", e);
        } finally {
            barrier.leave();
        }
    }
}
/**
 * Terminates the required flakes in this container (assuming all the
 * pellets are already terminated), synchronizing with the coordinator on
 * the application's update barrier. Containers the mapping does not mark
 * as updated neither act nor join the barrier. Failures in the action are
 * logged, and the barrier is always left.
 *
 * @param signal The signal (with associated data) received.
 * @throws Exception If an error occurs while performing the action.
 */
private void terminateFlakes(final ContainerSignal signal) throws Exception {
    final String app = signal.getDestApp();
    final ResourceMapping mapping = ZKUtils.getResourceMapping(app);
    final String barrierPath = ZKUtils.getApplicationBarrierPath(app);
    // Containers being updated plus the coordinator.
    final int members = mapping.getContainersToUpdate() + 1;
    final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(), barrierPath, members);

    if (!ContainerActions.isContainerUpdated(mapping)) {
        return;
    }

    barrier.enter();
    try {
        ContainerActions.terminateFlakes(mapping);
    } catch (Exception e) {
        LOGGER.error("Could not terminate flakes. Exception {}", e);
    } finally {
        barrier.leave();
    }
}
/**
 * Initializes the newly created flakes on this container, synchronizing
 * with the coordinator on the application's update barrier. Containers the
 * mapping does not mark as updated neither act nor join the barrier.
 * Failures in the action are logged, and the barrier is always left so the
 * other members are not left waiting.
 *
 * @param signal The signal (with associated data) received.
 * @throws Exception If an error occurs while performing the action.
 */
private void initializeFlakes(final ContainerSignal signal)
        throws Exception {
    String appName = signal.getDestApp();
    ResourceMapping resourceMapping = ZKUtils.getResourceMapping(appName);
    String appUpdateBarrierPath = ZKUtils
            .getApplicationBarrierPath(appName);
    int numContainersToUpdate = resourceMapping.getContainersToUpdate();
    // Containers being updated plus the coordinator.
    DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(),
            appUpdateBarrierPath,
            numContainersToUpdate + 1
    );

    if (ContainerActions.isContainerUpdated(resourceMapping)) {
        barrier.enter();
        try {
            ContainerActions.initializeFlakes(resourceMapping);
        } catch (Exception e) {
            // This phase initializes flakes; the old text said "create".
            LOGGER.error("Could not initialize flakes. Exception {}", e);
        } finally {
            barrier.leave();
        }
    }
}
/**
 * Creates new flakes on this container, synchronizing with the coordinator
 * on the application's update barrier. Containers the mapping does not
 * mark as updated neither act nor join the barrier. Failures in the action
 * are logged, and the barrier is always left.
 *
 * @param signal The signal (with associated data) received.
 * @throws Exception If an error occurs while performing the action.
 */
private void createFlakes(final ContainerSignal signal) throws Exception {
    final String app = signal.getDestApp();
    final ResourceMapping mapping = ZKUtils.getResourceMapping(app);
    final String barrierPath = ZKUtils.getApplicationBarrierPath(app);
    // Containers being updated plus the coordinator.
    final int members = mapping.getContainersToUpdate() + 1;
    final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(
            ZKClient.getInstance().getCuratorClient(), barrierPath, members);

    if (!ContainerActions.isContainerUpdated(mapping)) {
        return;
    }

    barrier.enter();
    try {
        ContainerActions.createFlakes(mapping);
    } catch (Exception e) {
        LOGGER.error("Could not create flakes. Exception {}", e);
    } finally {
        barrier.leave();
    }
}
/**
 * Demo: starts an in-process ZooKeeper test server and runs {@code QTY}
 * tasks that each enter, work on, and leave a shared
 * {@link DistributedDoubleBarrier} at {@code PATH}.
 *
 * @param args unused
 * @throws Exception if the test server or client cannot be started
 */
public static void main(String[] args) throws Exception {
    // The client is now a resource of the try block, so it is closed
    // (before the server) even if something below throws. Previously it
    // was never closed.
    try (TestingServer server = new TestingServer();
         CuratorFramework client = CuratorFrameworkFactory.newClient(
                 server.getConnectString(), new ExponentialBackoffRetry(1000, 3))) {
        client.start();

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        for (int i = 0; i < QTY; ++i) {
            // Each participant needs its own barrier instance on the same path.
            final DistributedDoubleBarrier barrier =
                    new DistributedDoubleBarrier(client, PATH, QTY);
            final int index = i;
            Callable<Void> task = new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                }
            };
            service.submit(task);
        }

        service.shutdown();
        service.awaitTermination(10, TimeUnit.MINUTES);
    }
}
/** * * @throws Exception */ private void init(GlobalConfig conf) throws Exception{ this.conf = conf; zkClient = ZKUtils.newClient(); loopBarrier = new DistributedDoubleBarrier(zkClient, Constants.Algorithm.ZK_ALGO_CHROOT + "/" + conf.getAlgorithmName() + Constants.Algorithm.ZK_WAITING_PATH, conf.getWorkers().size()); startBarrier = new DistributedBarrier(zkClient, Constants.Algorithm.ZK_ALGO_CHROOT + "/" + conf.getAlgorithmName() + "/start"); finishBarrier = new DistributedBarrier(zkClient, Constants.Algorithm.ZK_ALGO_CHROOT + "/" + conf.getAlgorithmName() + "/finish"); local = new MModelLocal(); shardId = conf.getShardId(); loops = conf.getAlgorithmConf().getInteger(Constants.Algorithm.LOOPS); useSyncModel = JSONUtil.getConf(conf.getAlgorithmConf(), Constants.Algorithm.OPEN_MODEL_SERVER, false); AlgoDeployConf deployConf = conf.getDeployConf(); //data server and client can be separated from a worker-node. // if (deployConf.isReduceServer()){ reduceServer = new ReduceServer(conf.getWorkerName(), conf.getWorkers().size(), conf.getAlgorithmName()); } if (deployConf.isStartingGun()){ startingGun = new StartingGun2(conf.getAlgorithmName(), conf.getReduceServers().size(), conf.getWorkers().size()); } modelServer = new MModelServer(conf.getWorkerName(), conf.getAlgorithmName(), local); modelClient = new MModelClient(conf.getWorkers(), shardId, local); reducerClient = new FloatReducerClient(conf.getReduceServers(), shardId); }
/** * Start the execution for the given transition. Returns immediately * after starting? * * @param args transaction specific arguments * @throws Exception if there is an unrecoverable error while * processing the transition. */ @Override protected final void execute(final Map<String, Object> args) throws Exception { LOGGER.info("Executing base app transition."); String appName = (String) args.get("appName"); TFloeApp app = (TFloeApp) args.get("app"); ResourceMapping currentMapping = ZKUtils.getResourceMapping(appName); if (!preTransition(appName, currentMapping)) { LOGGER.error("Pre-Transition failed."); throw new Exception("Transition cannot be executed. " + "Pre-transition failed."); } //STEP 1: Schedule app, Get resource mapping and update it in ZK ResourceMapping updatedMapping = schedule(appName, app, currentMapping, args); String appUpdateBarrierPath = ZKUtils .getApplicationBarrierPath(appName); int numContainersToUpdate = updatedMapping.getContainersToUpdate(); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier( ZKClient.getInstance().getCuratorClient(), appUpdateBarrierPath, numContainersToUpdate + 1 ); //Step 2. Send the command to *ALL* containers to check the // resource mapping and CREATE the required flakes. createFlakes(updatedMapping, barrier); LOGGER.info("All containers finished launching flakes."); initializeFlakes(updatedMapping, barrier); LOGGER.info("All containers finished initializing flakes."); // if (1 == 1) { // return; // } //Step 3. Send connect signals to flakes so that they establish all // the appropriate channels. connectFlakes(updatedMapping, barrier); LOGGER.info("All containers finished CONNECTING flakes."); //Step 4. wait for all containers to finish launching or removing // pellets. increaseOrDecreasePellets(updatedMapping, barrier); LOGGER.info("All pellets successfully created."); //Step 5. wait for container to terminate flakes, if required. 
terminateFlakes(updatedMapping, barrier); LOGGER.info("All required flakes terminated."); //Step 6. Send signal Start pellets. startPellets(updatedMapping, barrier); LOGGER.info("All pellets Started. The application is now " + "running"); if (!postTransition(currentMapping)) { LOGGER.error("Post-Transition failed."); throw new Exception("Transition may have been executed partially. " + "Post-transition failed."); } }
/** * Sends signal to flakes to start all pellets. * mapping. * @param signal The signal (with associated data) received. * @exception Exception if an error occurs while launching flakes. */ private void startPellets(final ContainerSignal signal) throws Exception { String appName = signal.getDestApp(); String containerName = signal.getDestContainer(); //ignore. byte[] data = signal.getSignalData(); LOGGER.info("Container Id: " + containerName); String resourceMappingPath = ZKUtils .getApplicationResourceMapPath(appName); byte[] serializedRM = null; try { serializedRM = ZKClient.getInstance().getCuratorClient().getData() .forPath(resourceMappingPath); } catch (Exception e) { LOGGER.error("Could not receive resource mapping. Aborting."); return; } ResourceMapping resourceMapping = (ResourceMapping) Utils.deserialize(serializedRM); String containerId = ContainerInfo.getInstance().getContainerId(); ResourceMapping.ContainerInstance container = resourceMapping.getContainer(containerId); if (container == null) { LOGGER.info("No resource mapping for this container."); return; } String appUpdateBarrierPath = ZKUtils .getApplicationBarrierPath(appName); int numContainersToUpdate = resourceMapping.getContainersToUpdate(); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier( ZKClient.getInstance().getCuratorClient(), appUpdateBarrierPath, numContainersToUpdate + 1 ); LOGGER.info("Entering barrier: " + appUpdateBarrierPath); barrier.enter(); //start processing. LOGGER.info("Starting pellets."); Map<String, ResourceMapping.FlakeInstance> flakes = container.getFlakes(); Map<String, FlakeInfo> pidToFidMap = FlakeMonitor.getInstance().getFlakes(); for (Map.Entry<String, ResourceMapping.FlakeInstance> flakeEntry : flakes.entrySet()) { String pid = flakeEntry.getKey(); ContainerUtils.sendStartPelletsCommand( pidToFidMap.get(pid).getFlakeId()); } barrier.leave(); //finish launching pellets. }
/**
 * Creates a double barrier on {@code barrierPath} using this factory's
 * Curator client.
 *
 * @param barrierPath ZooKeeper path of the barrier
 * @param memberQty number of members the barrier waits for; must be
 *                  strictly positive, even though the annotation only says
 *                  non-negative
 * @return a new, not-yet-entered barrier
 * @throws IllegalArgumentException if {@code memberQty} is not positive
 */
public DistributedDoubleBarrier create(final String barrierPath,
                                       @Nonnegative final int memberQty) {
    if (memberQty <= 0) {
        throw new IllegalArgumentException(
                "Member quantity must be greater than zero.");
    }
    return new DistributedDoubleBarrier(framework, barrierPath, memberQty);
}
/**
 * Creates a double barrier on {@code barrierPath} using this factory's
 * Curator client.
 *
 * @param barrierPath ZooKeeper path of the barrier; must not be null
 * @param memberQty number of members the barrier waits for; must be
 *                  strictly positive, even though the annotation only says
 *                  non-negative
 * @return a new, not-yet-entered barrier
 * @throws IllegalArgumentException if {@code memberQty} is not positive
 * @throws NullPointerException if {@code barrierPath} is null
 */
public DistributedDoubleBarrier create(final String barrierPath,
                                       @Nonnegative final int memberQty) {
    if (memberQty <= 0) {
        throw new IllegalArgumentException(
                "Member quantity must be greater than zero.");
    }
    final String path = checkNotNull(barrierPath, "Path cannot be null.");
    return new DistributedDoubleBarrier(framework, path, memberQty);
}
/**
 * Enters the acknowledgement barrier at {@code barrierPath} and reports
 * whether all {@code nodesExpected} members arrived within
 * {@code ackTimeout} milliseconds.
 * <p>
 * NOTE(review): the barrier is entered but never left here. Confirm that
 * callers or ZooKeeper session expiry clean up the member node.
 *
 * @param barrierPath ZooKeeper path of the barrier
 * @param nodesExpected number of members the barrier waits for
 * @return the result of the timed {@code enter}
 * @throws Exception if the barrier operation fails
 */
public Boolean manageAckBarrier(String barrierPath, Integer nodesExpected)
        throws Exception {
    final DistributedDoubleBarrier ackBarrier =
            getDistributedDoubleBarrier(barrierPath, nodesExpected);
    final boolean allArrived =
            ackBarrier.enter(ackTimeout, TimeUnit.MILLISECONDS);
    return allArrived;
}
/**
 * Builds a new double barrier on {@code barrierPath} using the cluster sync
 * manager's client. This is a separate method so it can be stubbed in tests.
 *
 * @param barrierPath ZooKeeper path of the barrier
 * @param nodesExpected number of members the barrier waits for; unboxed,
 *                      so it must not be null
 * @return a new, not-yet-entered barrier
 */
public DistributedDoubleBarrier getDistributedDoubleBarrier(String barrierPath,
                                                            Integer nodesExpected) {
    return new DistributedDoubleBarrier(
            clusterSyncManagerInstance.getClient(),
            barrierPath,
            nodesExpected.intValue());
}