/**
 * Increments the distributed counter stored at {@code counterPath} by exactly one.
 *
 * <p>A fresh {@link DistributedAtomicLong} is created on every call, using a
 * {@link RetryUntilElapsed} policy (at most 1000 ms in total, 100 ms between attempts),
 * and stored in {@code this.counter}.
 *
 * <p>Failures are best-effort: any exception is logged and swallowed, and an increment that
 * does not succeed within the retry policy is logged as an error.
 *
 * @throws Exception declared for interface compatibility; exceptions raised inside the body
 *                   are caught and logged
 */
public void increment() throws Exception {
    int tempoMaximoDeTentativasMilissegundos = 1000;
    int intervaloEntreTentativasMilissegundos = 100;
    try {
        RetryPolicy rp = new RetryUntilElapsed(tempoMaximoDeTentativasMilissegundos,
                intervaloEntreTentativasMilissegundos);
        this.counter = new DistributedAtomicLong(this.client, this.counterPath, rp);

        logger.debug("## INCREMENT WILL BEGIN");
        if (this.counter.get().succeeded()) {
            logger.debug("## INCREMENT GET COUNTER (BEFORE): " + this.counter.get().postValue());
        }

        // Increment exactly once. The previous version incremented inside the logging branch
        // and then again unconditionally, bumping the counter by two per call.
        if (this.counter.increment().succeeded()) {
            logger.debug("## INCREMENT COUNTER AFTER: " + this.counter.get().postValue());
        } else {
            logger.error("********* INCREMENT COUNTER ERROR: increment did not succeed within the retry policy");
        }
    } catch (Exception ex) {
        // Pass the exception itself so the stack trace is not lost.
        logger.error("********* INCREMENT COUNTER ERROR: " + ex.getMessage(), ex);
    }
}
public static void main(String[] args) throws IOException, Exception { try (TestingServer server = new TestingServer()) { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); client.start(); List<DistributedAtomicLong> examples = Lists.newArrayList(); ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10)); examples.add(count); Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { try { //Thread.sleep(rand.nextInt(1000)); AtomicValue<Long> value = count.increment(); //AtomicValue<Long> value = count.decrement(); //AtomicValue<Long> value = count.add((long)rand.nextInt(20)); System.out.println("succeed: " + value.succeeded()); if (value.succeeded()) System.out.println("Increment: from " + value.preValue() + " to " + value.postValue()); } catch (Exception e) { e.printStackTrace(); } return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); } }
/**
 * Builds and starts a Curator client for {@code connectString}, blocks until it is connected,
 * ensures the {@code ClusterCtl.namespace} and {@code ClusterCtl.members} paths, marks the
 * cluster active and creates the global counter ({@code /COUNTER}) and the global write lock
 * ({@code /WriteLock}).
 *
 * <p>NOTE(review): if the client cannot be created or connected, the failure is logged,
 * {@code client} is set to {@code null} and construction continues with that null client.
 * Confirm that the later steps tolerate this.
 *
 * @param connectString ZooKeeper connection string
 */
private Cluster$(String connectString) {
    RetryPolicy policy = new ExponentialBackoffRetry(10000, 5) ;
    try {
        client = CuratorFrameworkFactory.builder()
            /*.namespace(namespace)*/
            .connectString(connectString)
            .retryPolicy(policy)
            .build() ;
        client.start() ;
        client.blockUntilConnected() ;
    }
    catch (Exception e) {
        log.error("Failed: "+connectString, e) ;
        client = null ;
    }
    ensure(ClusterCtl.namespace) ;
    ensure(ClusterCtl.members) ;
    active.set(true) ;
    globalCounter = new DistributedAtomicLong(client,"/COUNTER", policy) ;
    try {
        log.info("/COUNTER = "+globalCounter.get().postValue());
    }
    catch (Exception ex) {
        // Reading the counter here is informational only, so the failure is not fatal,
        // but it is reported instead of being silently discarded.
        log.warn("Failed to read /COUNTER", ex) ;
    }
    globalWriteLock = new InterProcessSemaphoreMutex(client, "/WriteLock") ;
}
/**
 * {@inheritDoc}
 */
@Override
public long nextId(final String namespace) {
    final String[] idAndLockPaths = calcPathIdAndPathLock(namespace);
    final String idPath = idAndLockPaths[0];
    final String lockPath = idAndLockPaths[1];

    // Lock used when the optimistic attempts are exhausted.
    final RetryPolicy lockRetry = new BoundedExponentialBackoffRetry(10, 1000, 5);
    final PromotedToLock lockFallback = PromotedToLock.builder()
            .retryPolicy(lockRetry)
            .lockPath(lockPath)
            .build();

    final RetryPolicy optimisticRetry = new RetryNTimes(3, 100);
    final DistributedAtomicLong idCounter =
            new DistributedAtomicLong(curatorFramework, idPath, optimisticRetry, lockFallback);

    semaphore.acquireUninterruptibly();
    try {
        final AtomicValue<Long> outcome = idCounter.increment();
        if (outcome == null || !outcome.succeeded()) {
            // Signals "no id generated" to the caller.
            return -1;
        }
        return outcome.postValue();
    } catch (Exception e) {
        if (e instanceof IdException) {
            throw (IdException) e;
        }
        throw new IdException(e);
    } finally {
        semaphore.release();
    }
}
/**
 * {@inheritDoc}
 */
@Override
public long currentId(final String namespace) {
    final String[] idAndLockPaths = calcPathIdAndPathLock(namespace);
    final String idPath = idAndLockPaths[0];
    final String lockPath = idAndLockPaths[1];

    final RetryPolicy lockRetry = new BoundedExponentialBackoffRetry(10, 1000, 5);
    final PromotedToLock lockFallback = PromotedToLock.builder()
            .retryPolicy(lockRetry)
            .lockPath(lockPath)
            .build();

    final RetryPolicy optimisticRetry = new RetryNTimes(3, 100);
    final DistributedAtomicLong idCounter =
            new DistributedAtomicLong(curatorFramework, idPath, optimisticRetry, lockFallback);

    try {
        final AtomicValue<Long> current = idCounter.get();
        if (current == null || !current.succeeded()) {
            // Caught by the handler below and rethrown unchanged.
            throw new IdException("Operation was not successful!");
        }
        return current.postValue();
    } catch (Exception e) {
        if (e instanceof IdException) {
            throw (IdException) e;
        }
        throw new IdException(e);
    }
}
/**
 * Reads the counter stored for the given dataset and counter name.
 *
 * <p>The ZooKeeper path is {@code CRAWL_PREFIX + datasetUuid + counterName.getPath()}; it is
 * ensured to exist before the value is read. Any exception raised while reading is logged as a
 * warning and results in {@code null}.
 *
 * @param datasetUuid the dataset key, must not be null
 * @param counterName the counter name representing a processing phase, must not be null
 *
 * @return the count, or null if it could not be read
 */
@Nullable
public Long readCounter(UUID datasetUuid, CounterName counterName) {
    checkNotNull(datasetUuid);
    checkNotNull(counterName);

    try {
        final String dalPath = CRAWL_PREFIX + datasetUuid.toString() + counterName.getPath();
        LOG.debug("Reading DAL at path [{}]", dalPath);
        curator.newNamespaceAwareEnsurePath(dalPath).ensure(curator.getZookeeperClient());

        final DistributedAtomicLong counter =
                new DistributedAtomicLong(curator, dalPath, new RetryNTimes(1, 1000));
        return counter.get().preValue();
    } catch (Exception e) {
        LOG.warn("Error reading from zookeeper", e);
        return null;
    }
}
/**
 * Returns the current value stored in ZooKeeper at {@code path}.
 *
 * @param path ZooKeeper path of the counter
 * @return the value currently stored at the path
 * @throws RuntimeException if the path does not exist
 * @throws Exception if reading the value from ZooKeeper fails
 */
private long currentValueFromZk (String path) throws Exception {
    if (!isExists(path)) {
        throw new RuntimeException("Path is not existed! Call nextValue firstly!");
    }
    final DistributedAtomicLong counter =
            new DistributedAtomicLong(client, path, new RetryNTimes(10, 1000));
    return counter.get().postValue();
}
/**
 * Reads the distributed auto-increment value stored at {@code path}.
 *
 * <p>The outcome is logged (debug on success, warn on failure) and the read value is returned
 * in both cases.
 *
 * @param path ZooKeeper path of the counter
 * @return the value reported by the read
 * @throws Exception if the read fails with an exception
 */
public Long getIncrementValue(String path) throws Exception {
    final RetryNTimes retry = new RetryNTimes(32, 1000);
    final AtomicValue<Long> readResult = new DistributedAtomicLong(client, path, retry).get();

    final String logTemplate = readResult.succeeded()
            ? "getIncrementValue({}) success! get: {}."
            : "getIncrementValue({}) failed! get: {}.";
    if (readResult.succeeded()) {
        logger.debug(logTemplate, path, readResult.postValue());
    } else {
        logger.warn(logTemplate, path, readResult.postValue());
    }
    return readResult.postValue();
}
/**
 * Increments the distributed counter at {@code path} and returns the value after the
 * increment.
 *
 * <p>The increment is retried up to 32 times with a 1000 ms pause. If it still has not been
 * applied, the result's post-value was never committed, so it is not returned; an
 * {@link IllegalStateException} is thrown instead.
 *
 * @param path ZooKeeper path of the counter
 * @return the counter value after a successful increment
 * @throws IllegalStateException if the increment was not applied within the retry policy
 * @throws Exception if the increment fails with an exception
 */
public Long incrementAndGetValue(String path) throws Exception {
    DistributedAtomicLong atomicId = new DistributedAtomicLong(client, path, new RetryNTimes(32, 1000));
    AtomicValue<Long> rc = atomicId.increment();
    if (!rc.succeeded()) {
        logger.warn("incrementAndGetValue({}) failed! before: {}, after: {}.", path, rc.preValue(), rc.postValue());
        // Pre/post values of an unsuccessful operation are not valid; returning postValue()
        // here would hand out a number that was never stored.
        throw new IllegalStateException("incrementAndGetValue(" + path + ") failed: increment was not applied");
    }
    logger.info("incrementAndGetValue({}) success! before: {}, after: {}.", path, rc.preValue(), rc.postValue());
    return rc.postValue();
}
/**
 * Adds {@code delta} to the counter at {@code path}, repeating the add until it reports
 * success.
 *
 * <p>The path is created if necessary and the counter object is cached in {@code COUNTERS}.
 * Any exception is logged and swallowed.
 *
 * @param path  ZooKeeper path of the counter
 * @param delta amount to add
 */
private static void addInSync(String path, long delta){
    try {
        ZKPaths.mkdirs(CURATOR_FRAMEWORK.getZookeeperClient().getZooKeeper(), path);
        COUNTERS.putIfAbsent(path, new DistributedAtomicLong(CURATOR_FRAMEWORK, path, RETRY_N_TIMES));
        final DistributedAtomicLong shared = COUNTERS.get(path);

        // Keep trying until one attempt is reported as applied.
        boolean applied;
        do {
            applied = shared.add(delta).succeeded();
        } while (!applied);
    } catch (Exception e) {
        LOGGER.error("addInSync "+delta+" failed for "+path, e);
    }
}
/**
 * Subtracts {@code delta} from the counter at {@code path}, repeating the subtraction until it
 * reports success.
 *
 * <p>The path is created if necessary and the counter object is cached in {@code COUNTERS}.
 * Any exception is logged and swallowed.
 *
 * @param path  ZooKeeper path of the counter
 * @param delta amount to subtract
 */
private static void subtract(String path, long delta){
    try {
        ZKPaths.mkdirs(CURATOR_FRAMEWORK.getZookeeperClient().getZooKeeper(), path);
        COUNTERS.putIfAbsent(path, new DistributedAtomicLong(CURATOR_FRAMEWORK, path, RETRY_N_TIMES));
        final DistributedAtomicLong shared = COUNTERS.get(path);

        // Keep trying until one attempt is reported as applied.
        boolean applied;
        do {
            applied = shared.subtract(delta).succeeded();
        } while (!applied);
    } catch (Exception e) {
        LOGGER.error("subtract "+delta+" failed for "+path, e);
    }
}
/**
 * Reads the counter stored at {@code path}.
 *
 * @param path ZooKeeper path of the counter
 * @return the value read, or {@code -1} if reading raised an exception (which is logged)
 */
public static long getValue(String path) {
    long value;
    try {
        final DistributedAtomicLong reader =
                new DistributedAtomicLong(CURATOR_FRAMEWORK, path, RETRY_N_TIMES);
        value = reader.get().postValue();
    } catch (Exception e) {
        LOGGER.error("get counter exception: "+path, e);
        value = -1;
    }
    return value;
}
@Inject public ZKTimestampStorage(CuratorFramework zkClient) throws Exception { LOG.info("ZK Client state {}", zkClient.getState()); timestamp = new DistributedAtomicLong(zkClient, TIMESTAMP_ZNODE, new RetryNTimes(3, 1000)); // TODO Configure // this? if (timestamp.initialize(INITIAL_MAX_TS_VALUE)) { LOG.info("Timestamp value in ZNode initialized to {}", INITIAL_MAX_TS_VALUE); } }
/**
 * Returns the mock counter registered for {@code path}, creating and registering a new
 * {@code MockAtomicCounter} when none is present.
 *
 * @param path path identifying the counter
 * @return the existing or newly created counter
 */
@Override
public DistributedAtomicLong createAtomicCounter(String path) {
    final MockAtomicCounter existing = atomicCounters.get(path);
    if (existing != null) {
        return existing;
    }
    final MockAtomicCounter created = new MockAtomicCounter(path);
    atomicCounters.put(path, created);
    return created;
}
/** Verifies that a mock counter initialized to 4 reads 4 and reports 5 after one increment. */
@Test
public void testCounter() throws Exception {
    MockCurator curator = new MockCurator();
    DistributedAtomicLong counter = curator.createAtomicCounter("/mycounter");
    counter.initialize(4L);

    long initial = counter.get().postValue().longValue();
    assertEquals(4L, initial);

    long afterIncrement = counter.increment().postValue().longValue();
    assertEquals(5L, afterIncrement);
}
/**
 * Remembers {@code path} as the counter path, creates a {@link DistributedAtomicLong} for it
 * with a {@link RetryUntilElapsed} policy (at most 1000 ms in total, 100 ms between attempts)
 * and attempts to initialize it to zero.
 *
 * @param path ZooKeeper path of the counter
 * @throws Exception if creating or initializing the counter fails
 */
public void createCounter(String path) throws Exception {
    final int maxElapsedMillis = 1000;
    final int sleepBetweenRetriesMillis = 100;
    final RetryPolicy retry = new RetryUntilElapsed(maxElapsedMillis, sleepBetweenRetriesMillis);

    this.counterPath = path;
    this.counter = new DistributedAtomicLong(this.curatorFramework, this.counterPath, retry);
    this.counter.initialize(0L);
}
/**
 * {@inheritDoc}
 *
 * @since 0.4.0
 */
@Override
public boolean setValue(String namespace, long value) {
    if (value < 0) {
        throw new IdException("Id value must be greater or equal to 0!");
    }

    final String[] idAndLockPaths = calcPathIdAndPathLock(namespace);
    final String idPath = idAndLockPaths[0];
    final String lockPath = idAndLockPaths[1];

    final RetryPolicy lockRetry = new BoundedExponentialBackoffRetry(10, 1000, 5);
    final PromotedToLock lockFallback = PromotedToLock.builder()
            .retryPolicy(lockRetry)
            .lockPath(lockPath)
            .build();

    final RetryPolicy optimisticRetry = new RetryNTimes(3, 100);
    final DistributedAtomicLong idCounter =
            new DistributedAtomicLong(curatorFramework, idPath, optimisticRetry, lockFallback);

    semaphore.acquireUninterruptibly();
    try {
        idCounter.forceSet(value);
        return true;
    } catch (Exception e) {
        if (e instanceof IdException) {
            throw (IdException) e;
        }
        throw new IdException(e);
    } finally {
        semaphore.release();
    }
}
public void updateCounter(UUID datasetKey, String path, long value) { DistributedAtomicLong dal = getCounter(datasetKey, path); try { AtomicValue<Long> atom = dal.trySet(value); // we must check if the operation actually succeeded // see https://github.com/Netflix/curator/wiki/Distributed-Atomic-Long if (!atom.succeeded()) { LOG.error("Failed to update counter {} for dataset {}", path, datasetKey); } } catch (Exception e) { LOG.error("Failed to update counter {} for dataset {}", path, datasetKey, e); } }
/**
 * Obtains a new id by incrementing the counter at {@code ZookeeperPath.TRANSACTOR_COUNT},
 * repeating the increment until it reports success.
 *
 * @param curator Curator client used to access ZooKeeper
 * @return the counter value after the successful increment
 * @throws IllegalStateException wrapping any exception raised while incrementing
 */
private static Long createID(CuratorFramework curator) {
    try {
        final DistributedAtomicLong idCounter = new DistributedAtomicLong(
                curator, ZookeeperPath.TRANSACTOR_COUNT, new ExponentialBackoffRetry(1000, 10));

        AtomicValue<Long> attempt;
        do {
            attempt = idCounter.increment();
        } while (!attempt.succeeded());

        return attempt.postValue();
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
}
@Override public void run() { while (true) { List<String> copy; synchronized (paths) { copy = Lists.newArrayList(paths); paths.clear(); } // merge additions Map<String, AtomicLong> mutations = Maps.newHashMap(); for (String path : copy) { if (mutations.containsKey(path)) { mutations.get(path).incrementAndGet(); } else { mutations.put(path, new AtomicLong(1)); } } for (Map.Entry<String, AtomicLong> entry : mutations.entrySet()) { try { DistributedAtomicLong dal = new DistributedAtomicLong(client, entry.getKey(), new RetryUntilElapsed((int) TimeUnit.MINUTES.toMillis(5), (int) TimeUnit.MILLISECONDS.toMillis(25))); AtomicValue<Long> result = dal.add(entry.getValue().get()); if (!result.succeeded()) { LOG.warn("Counter updates are failing and we've exhausted retry - counts will be wrong"); } } catch (Exception e) { LOG.warn("Failed to update DALs during flush - counts will be wrong", e); } } try { Thread.sleep(flushFrequencyMsecs); // not a true time of course } catch (InterruptedException e1) { break; // really? } } }
public ZKCounter(String nodeName, RetryPolicy retryPolicy) throws Exception { this.counterPath = ZK_PATH_COUNTERS + nodeName; this.atomicLong = new DistributedAtomicLong(curator, counterPath, retryPolicy); }
/**
 * Looks up the atomic counter registered under {@code path}.
 *
 * @param path path identifying the counter
 * @return the counter, or empty if no counter has been created for the path
 */
public Optional<DistributedAtomicLong> counter(String path) {
    final DistributedAtomicLong registered = atomicCounters.get(path);
    return Optional.ofNullable(registered);
}
/**
 * For internal use; prefer creating a {@link CuratorCounter}.
 *
 * <p>Builds a new {@link DistributedAtomicLong} at {@code path} with an exponential backoff
 * retry configured from {@code BASE_SLEEP_TIME} and {@code MAX_RETRIES}.
 *
 * @param path ZooKeeper path of the counter
 * @return a new counter bound to the path
 */
public DistributedAtomicLong createAtomicCounter(String path) {
    final ExponentialBackoffRetry backoff = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
    return new DistributedAtomicLong(curatorFramework, path, backoff);
}
/**
 * Returns the distributed counter currently held by this object.
 *
 * @return the value of the {@code counter} field
 */
public DistributedAtomicLong getCounter() {
    return this.counter;
}
/**
 * Replaces the distributed counter held by this object.
 *
 * @param counter the counter to store in the {@code counter} field
 */
public void setCounter(DistributedAtomicLong counter) { this.counter = counter; }
@Override public void init(FloodlightModuleContext context) throws FloodlightModuleException { // Read the Zookeeper connection string from the config Map<String, String> configParams = context.getConfigParams(this); String connectionStringParam = configParams.get("connectionString"); if (connectionStringParam != null) { connectionString = connectionStringParam; } else { connectionString = System.getProperty( "net.onrc.onos.core.registry.ZookeeperRegistry.connectionString", DEFAULT_CONNECTION_STRING); } // Remove spaces from connection string otherwise Zookeeper complains connectionString = connectionString.replaceAll("\\s", ""); log.info("Setting Zookeeper connection string to {}", this.connectionString); namespace = System.getProperty(ZK_NAMESPACE_KEY, DEFAULT_NAMESPACE).trim(); if (namespace.isEmpty()) { namespace = DEFAULT_NAMESPACE; } log.info("Setting Zookeeper namespace to {}", namespace); restApi = context.getServiceImpl(IRestApiService.class); switches = new ConcurrentHashMap<String, SwitchLeadershipData>(); switchPathCaches = new ConcurrentHashMap<String, PathChildrenCache>(); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); curatorFrameworkClient = CuratorFrameworkFactory.newClient(this.connectionString, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy); curatorFrameworkClient.start(); curatorFrameworkClient = curatorFrameworkClient.usingNamespace(namespace); distributedIdCounter = new DistributedAtomicLong( curatorFrameworkClient, ID_COUNTER_PATH, new RetryOneTime(100)); rootSwitchCache = new PathChildrenCache( curatorFrameworkClient, SWITCH_LATCHES_PATH, true); rootSwitchCache.getListenable().addListener(switchPathCacheListener); // Build the service discovery object serviceDiscovery = ServiceDiscoveryBuilder.builder(ControllerService.class) .client(curatorFrameworkClient).basePath(SERVICES_PATH).build(); // We read the list of services very frequently (GUI periodically // queries them) so we'll cache them to cut down on Zookeeper 
queries. serviceCache = serviceDiscovery.serviceCacheBuilder() .name(CONTROLLER_SERVICE_NAME).build(); try { serviceDiscovery.start(); serviceCache.start(); // Don't prime the cache, we want a notification for each child // node in the path rootSwitchCache.start(StartMode.NORMAL); } catch (Exception e) { throw new FloodlightModuleException( "Error initialising ZookeeperRegistry", e); } ExecutorService eventThreadExecutorService = Executors.newSingleThreadExecutor(); eventThreadExecutorService.execute( new Runnable() { @Override public void run() { dispatchEvents(); } }); }
/**
 * Creates a {@link DistributedAtomicLong} for the crawl-info path derived from
 * {@code datasetKey} and {@code path}, retrying up to 5 times with a 1000 ms pause.
 *
 * @param datasetKey the dataset key
 * @param path       the counter path within the dataset
 * @return a new counter bound to the derived ZooKeeper path
 */
public DistributedAtomicLong getCounter(UUID datasetKey, String path) {
    final String crawlInfoPath = getCrawlInfoPath(datasetKey, path);
    final RetryNTimes retry = new RetryNTimes(5, 1000);
    return new DistributedAtomicLong(curator, crawlInfoPath, retry);
}