@Override public void updateMaxTimestamp(long previousMaxTimestamp, long newMaxTimestamp) throws IOException { if (newMaxTimestamp < 0) { LOG.error("Negative value received for maxTimestamp: {}", newMaxTimestamp); throw new IllegalArgumentException(); } if (newMaxTimestamp <= previousMaxTimestamp) { LOG.error("maxTimestamp {} <= previousMaxTimesamp: {}", newMaxTimestamp, previousMaxTimestamp); throw new IllegalArgumentException(); } AtomicValue<Long> compareAndSet; try { compareAndSet = timestamp.compareAndSet(previousMaxTimestamp, newMaxTimestamp); } catch (Exception e) { throw new IOException("Problem setting timestamp in ZK", e); } if (!compareAndSet.succeeded()) { // We have to explicitly check for success (See Curator doc) throw new IOException("GetAndSet operation for storing timestamp in ZK did not succeed " + compareAndSet.preValue() + " " + compareAndSet.postValue()); } }
/**
 * Reads the current value of the shared counter.
 *
 * @return the counter's post-value when the read reports success, otherwise its pre-value
 * @throws Exception if reading the counter fails
 */
private long getRequests() throws Exception {
    final AtomicValue<Long> current = this.zkw.getCounter().get();
    // Fall back to the pre-value when the read is reported as unsuccessful.
    return current.succeeded() ? current.postValue() : current.preValue();
}
public static void main(String[] args) throws IOException, Exception { try (TestingServer server = new TestingServer()) { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); client.start(); List<DistributedAtomicLong> examples = Lists.newArrayList(); ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10)); examples.add(count); Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { try { //Thread.sleep(rand.nextInt(1000)); AtomicValue<Long> value = count.increment(); //AtomicValue<Long> value = count.decrement(); //AtomicValue<Long> value = count.add((long)rand.nextInt(20)); System.out.println("succeed: " + value.succeeded()); if (value.succeeded()) System.out.println("Increment: from " + value.preValue() + " to " + value.postValue()); } catch (Exception e) { e.printStackTrace(); } return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); } }
/**
 * {@inheritDoc}
 */
@Override
public long nextId(final String namespace) {
    final String[] idAndLockPaths = calcPathIdAndPathLock(namespace);
    final String counterPath = idAndLockPaths[0];
    final String lockPath = idAndLockPaths[1];

    // Optimistic increments are tried first; on contention the counter is promoted to a lock.
    final RetryPolicy lockRetry = new BoundedExponentialBackoffRetry(10, 1000, 5);
    final PromotedToLock lockPromotion = PromotedToLock.builder()
            .retryPolicy(lockRetry)
            .lockPath(lockPath)
            .build();
    final RetryPolicy optimisticRetry = new RetryNTimes(3, 100);
    final DistributedAtomicLong counter =
            new DistributedAtomicLong(curatorFramework, counterPath, optimisticRetry, lockPromotion);

    semaphore.acquireUninterruptibly();
    try {
        final AtomicValue<Long> outcome = counter.increment();
        if (outcome == null || !outcome.succeeded()) {
            // An unsuccessful increment is reported as -1 rather than as an exception.
            return -1;
        }
        return outcome.postValue();
    } catch (Exception e) {
        if (e instanceof IdException) {
            throw (IdException) e;
        }
        throw new IdException(e);
    } finally {
        semaphore.release();
    }
}
/**
 * {@inheritDoc}
 */
@Override
public long currentId(final String namespace) {
    final String[] idAndLockPaths = calcPathIdAndPathLock(namespace);
    final String counterPath = idAndLockPaths[0];
    final String lockPath = idAndLockPaths[1];

    final RetryPolicy lockRetry = new BoundedExponentialBackoffRetry(10, 1000, 5);
    final PromotedToLock lockPromotion = PromotedToLock.builder()
            .retryPolicy(lockRetry)
            .lockPath(lockPath)
            .build();
    final RetryPolicy optimisticRetry = new RetryNTimes(3, 100);
    final DistributedAtomicLong counter =
            new DistributedAtomicLong(curatorFramework, counterPath, optimisticRetry, lockPromotion);

    try {
        final AtomicValue<Long> outcome = counter.get();
        if (outcome == null || !outcome.succeeded()) {
            // Thrown inside the try on purpose: the catch below rethrows IdException unchanged.
            throw new IdException("Operation was not successful!");
        }
        return outcome.postValue();
    } catch (Exception e) {
        if (e instanceof IdException) {
            throw (IdException) e;
        }
        throw new IdException(e);
    }
}
/**
 * Reads the current value stored in ZK at the given path; throws if the path does not exist.
 *
 * @param path the counter path to read
 * @return the value currently stored at {@code path}
 * @throws RuntimeException      if {@code path} does not exist
 * @throws IllegalStateException if the read reports that it did not succeed
 * @throws Exception             if the existence check or the read itself fails
 */
private long currentValueFromZk (String path) throws Exception {
    if (!isExists(path)) {
        throw new RuntimeException("Path is not existed! Call nextValue firstly!");
    }
    DistributedAtomicLong count = new DistributedAtomicLong(client, path, new RetryNTimes(10, 1000));
    AtomicValue<Long> val = count.get();
    // The post-value is only meaningful when the operation reports success.
    if (!val.succeeded()) {
        throw new IllegalStateException("Failed to read current value from ZK path: " + path);
    }
    return val.postValue();
}
/**
 * Reads the distributed auto-increment value stored at the given path.
 * The outcome is logged (debug on success, warn on failure) and the post-value is
 * returned in both cases.
 *
 * @param path the counter path to read
 * @return the post-value reported by the read
 * @throws Exception if the read fails
 */
public Long getIncrementValue(String path) throws Exception {
    final DistributedAtomicLong counter =
            new DistributedAtomicLong(client, path, new RetryNTimes(32, 1000));
    final AtomicValue<Long> outcome = counter.get();
    final Long current = outcome.postValue();

    if (!outcome.succeeded()) {
        logger.warn("getIncrementValue({}) failed! get: {}.", path, current);
    } else {
        logger.debug("getIncrementValue({}) success! get: {}.", path, current);
    }
    return current;
}
/**
 * Increments the distributed value stored at the given path and returns the post-value.
 * The outcome is logged (info on success, warn on failure); the post-value is returned
 * even when the increment is reported as failed, so callers cannot tell the two apart
 * from the return value alone.
 *
 * @param path the counter path to increment
 * @return the post-value reported by the increment
 * @throws Exception if the increment fails
 */
public Long incrementAndGetValue(String path) throws Exception {
    final DistributedAtomicLong counter =
            new DistributedAtomicLong(client, path, new RetryNTimes(32, 1000));
    final AtomicValue<Long> outcome = counter.increment();
    final Long before = outcome.preValue();
    final Long after = outcome.postValue();

    if (!outcome.succeeded()) {
        logger.warn("incrementAndGetValue({}) failed! before: {}, after: {}.", path, before, after);
    } else {
        logger.info("incrementAndGetValue({}) success! before: {}, after: {}.", path, before, after);
    }
    return after;
}
/**
 * Adds {@code delta} to the counter at {@code path}, repeating the add until it reports
 * success. Any exception is logged and not rethrown.
 *
 * @param path  the counter path; created first if missing
 * @param delta the amount to add
 */
private static void addInSync(String path, long delta) {
    try {
        ZKPaths.mkdirs(CURATOR_FRAMEWORK.getZookeeperClient().getZooKeeper(), path);
        // Register a counter for this path unless one is already cached.
        COUNTERS.putIfAbsent(path, new DistributedAtomicLong(CURATOR_FRAMEWORK, path, RETRY_N_TIMES));
        final DistributedAtomicLong shared = COUNTERS.get(path);

        AtomicValue<Long> attempt;
        do {
            attempt = shared.add(delta);
        } while (!attempt.succeeded());
    } catch (Exception e) {
        LOGGER.error("addInSync " + delta + " failed for " + path, e);
    }
}
/**
 * Subtracts {@code delta} from the counter at {@code path}, repeating the subtraction
 * until it reports success. Any exception is logged and not rethrown.
 *
 * @param path  the counter path; created first if missing
 * @param delta the amount to subtract
 */
private static void subtract(String path, long delta) {
    try {
        ZKPaths.mkdirs(CURATOR_FRAMEWORK.getZookeeperClient().getZooKeeper(), path);
        // Register a counter for this path unless one is already cached.
        COUNTERS.putIfAbsent(path, new DistributedAtomicLong(CURATOR_FRAMEWORK, path, RETRY_N_TIMES));
        final DistributedAtomicLong shared = COUNTERS.get(path);

        AtomicValue<Long> attempt;
        do {
            attempt = shared.subtract(delta);
        } while (!attempt.succeeded());
    } catch (Exception e) {
        LOGGER.error("subtract " + delta + " failed for " + path, e);
    }
}
/**
 * Increments the distributed generator and converts the resulting value to an
 * executor id string via {@code convertUsingAlphabet}.
 *
 * @return the id derived from the incremented value
 * @throws IllegalStateException if the generator is {@code null} ("never started!") or
 *                               the increment reports that it did not succeed
 */
public String getNextExecutorId() {
    checkState(distributedGenerator != null, "never started!");

    try {
        final AtomicValue<Integer> incremented = distributedGenerator.increment();
        final boolean ok = incremented.succeeded();
        Preconditions.checkState(ok, "Atomic increment did not succeed");
        final Integer sequence = incremented.postValue();
        return convertUsingAlphabet(sequence);
    } catch (Throwable t) {
        // Everything thrown above is handed to Throwables.propagate before leaving the method.
        throw Throwables.propagate(t);
    }
}
@Override public long getMaxTimestamp() throws IOException { AtomicValue<Long> atomicValue; try { atomicValue = timestamp.get(); } catch (Exception e) { throw new IOException("Problem getting data from ZK", e); } if (!atomicValue.succeeded()) { // We have to explicitly check for success (See Curator doc) throw new IOException("Get operation to obtain timestamp from ZK did not succeed"); } return atomicValue.postValue(); }
/**
 * Atomically increment and return resulting value.
 *
 * @return the resulting value
 * @throws IllegalStateException if the increment throws (the cause is attached) or
 *                               reports that it did not succeed
 */
public synchronized long next() {
    final AtomicValue<Long> value;
    try {
        value = counter.increment();
    } catch (Exception e) {
        throw new IllegalStateException("Unable to get next value", e);
    }
    // Checked outside the try so this exception is not caught and re-wrapped by the handler above.
    if (!value.succeeded()) {
        throw new IllegalStateException("Increment did not succeed");
    }
    return value.postValue();
}
/**
 * Atomically decrement and return the resulting value.
 *
 * @return the resulting value
 * @throws IllegalStateException if the decrement throws (the cause is attached) or
 *                               reports that it did not succeed
 */
public synchronized long previous() {
    final AtomicValue<Long> value;
    try {
        value = counter.subtract(1L);
    } catch (Exception e) {
        throw new IllegalStateException("Unable to get previous value", e);
    }
    // Checked outside the try so this exception is not caught and re-wrapped by the handler above.
    if (!value.succeeded()) {
        throw new IllegalStateException("Decrement did not succeed");
    }
    return value.postValue();
}
/**
 * Reads the current value of the counter.
 *
 * @return the current value
 * @throws RuntimeException if the read throws (the cause is attached) or reports that
 *                          it did not succeed
 */
public long get() {
    final AtomicValue<Long> value;
    try {
        value = counter.get();
    } catch (Exception e) {
        throw new RuntimeException("Unable to get value", e);
    }
    // Checked outside the try so this exception is not caught and re-wrapped by the handler above.
    if (!value.succeeded()) {
        throw new RuntimeException("Get did not succeed");
    }
    return value.postValue();
}
/**
 * Test entry point: adds a random amount in [0, 100) to the distributed integer at
 * {@code path} and prints the pre-value, the delta, the post-value and whether the
 * operation succeeded.
 *
 * @param args ignored
 * @throws Exception if the client cannot be started or the add fails
 */
public static void main(String[] args) throws Exception {
    client.start();
    try {
        DistributedAtomicInteger atomicInteger =
                new DistributedAtomicInteger(client, path, new RetryNTimes(3, 1000));
        int delta = new Random().nextInt(100);
        AtomicValue<Integer> rc = atomicInteger.add(delta);
        System.out.println("pre: " + rc.preValue());
        System.out.println("add: " + delta);
        System.out.println("post: " + rc.postValue());
        System.out.println("Result: " + rc.succeeded());
    } finally {
        // Release the client's connection even when the add throws.
        client.close();
    }
}
public long uniqueNumber() { // XXX Blocks of numbers. try { AtomicValue<Long> along = globalCounter.increment() ; if ( ! along.succeeded() ) { log.error("Failed: uniqueNumber") ; throw new LizardException("Failed to allocate a unique number") ; } //FmtLog.info(log, "Unique: %d -> %d", along.preValue(), along.postValue()) ; return along.postValue() ; } catch (Exception e) { throw new LizardException("Exception allocating a unique number", e) ; } }
/**
 * Reserves a block of {@code range} ids by adding {@code range} to the distributed id
 * counter, repeating the add until it reports success.
 *
 * @param range the number of ids to reserve
 * @return a block built from the counter's pre-value and {@code range}, or {@code null}
 *         if the add threw an exception (which is logged)
 */
@Override
public IdBlock allocateUniqueIdBlock(long range) {
    try {
        AtomicValue<Long> result;
        do {
            result = distributedIdCounter.add(range);
        } while (result == null || !result.succeeded());
        // The pre-value is passed on together with the requested range.
        return new IdBlock(result.preValue(), range);
    } catch (Exception e) {
        // Pass the exception to the logger so the cause is not lost.
        log.error("Error allocating ID block", e);
    }
    return null;
}
public void updateCounter(UUID datasetKey, String path, long value) { DistributedAtomicLong dal = getCounter(datasetKey, path); try { AtomicValue<Long> atom = dal.trySet(value); // we must check if the operation actually succeeded // see https://github.com/Netflix/curator/wiki/Distributed-Atomic-Long if (!atom.succeeded()) { LOG.error("Failed to update counter {} for dataset {}", path, datasetKey); } } catch (Exception e) { LOG.error("Failed to update counter {} for dataset {}", path, datasetKey, e); } }
/**
 * Creates a new id by incrementing the counter at {@code ZookeeperPath.TRANSACTOR_COUNT},
 * repeating the increment until it reports success.
 *
 * @param curator the client used to reach the counter
 * @return the counter value after the successful increment
 * @throws IllegalStateException wrapping any exception thrown while incrementing
 */
private static Long createID(CuratorFramework curator) {
    try {
        final DistributedAtomicLong counter = new DistributedAtomicLong(
                curator, ZookeeperPath.TRANSACTOR_COUNT, new ExponentialBackoffRetry(1000, 10));

        AtomicValue<Long> attempt;
        do {
            attempt = counter.increment();
        } while (!attempt.succeeded());
        return attempt.postValue();
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
}
@Override public void run() { while (true) { List<String> copy; synchronized (paths) { copy = Lists.newArrayList(paths); paths.clear(); } // merge additions Map<String, AtomicLong> mutations = Maps.newHashMap(); for (String path : copy) { if (mutations.containsKey(path)) { mutations.get(path).incrementAndGet(); } else { mutations.put(path, new AtomicLong(1)); } } for (Map.Entry<String, AtomicLong> entry : mutations.entrySet()) { try { DistributedAtomicLong dal = new DistributedAtomicLong(client, entry.getKey(), new RetryUntilElapsed((int) TimeUnit.MINUTES.toMillis(5), (int) TimeUnit.MILLISECONDS.toMillis(25))); AtomicValue<Long> result = dal.add(entry.getValue().get()); if (!result.succeeded()) { LOG.warn("Counter updates are failing and we've exhausted retry - counts will be wrong"); } } catch (Exception e) { LOG.warn("Failed to update DALs during flush - counts will be wrong", e); } } try { Thread.sleep(flushFrequencyMsecs); // not a true time of course } catch (InterruptedException e1) { break; // really? } } }
/**
 * Returns the stored value, or a fresh {@code MockLongValue(0)} when nothing has been
 * stored yet. The fresh value is not retained.
 *
 * @return the stored value, or a zero-valued placeholder
 */
@Override
public AtomicValue<Long> get() {
    return (value != null) ? value : new MockLongValue(0);
}
/**
 * Adds {@code delta} to the stored value and stores the result via {@code trySet}.
 * When no value has been stored yet the current value is treated as 0 instead of
 * failing with a {@code NullPointerException}.
 *
 * @param delta the amount to add
 * @return the value returned by {@code trySet}
 * @throws Exception declared for signature compatibility
 */
public AtomicValue<Long> add(Long delta) throws Exception {
    long current = (value == null) ? 0L : value.postValue();
    return trySet(current + delta);
}
/**
 * Subtracts {@code delta} from the stored value and stores the result via {@code trySet}.
 * When no value has been stored yet the current value is treated as 0 instead of
 * failing with a {@code NullPointerException}.
 *
 * @param delta the amount to subtract
 * @return the value returned by {@code trySet}
 * @throws Exception declared for signature compatibility
 */
public AtomicValue<Long> subtract(Long delta) throws Exception {
    long current = (value == null) ? 0L : value.postValue();
    return trySet(current - delta);
}
/**
 * Adds one to the stored value and stores the result via {@code trySet}.
 * When no value has been stored yet the current value is treated as 0 instead of
 * failing with a {@code NullPointerException}.
 *
 * @return the value returned by {@code trySet}
 */
@Override
public AtomicValue<Long> increment() {
    long current = (value == null) ? 0L : value.postValue();
    return trySet(current + 1);
}
/**
 * Subtracts one from the stored value and stores the result via {@code trySet}.
 * When no value has been stored yet the current value is treated as 0 instead of
 * failing with a {@code NullPointerException}.
 *
 * @return the value returned by {@code trySet}
 * @throws Exception declared for signature compatibility
 */
public AtomicValue<Long> decrement() throws Exception {
    long current = (value == null) ? 0L : value.postValue();
    return trySet(current - 1);
}
/**
 * Replaces the stored value with a new {@code MockLongValue} wrapping {@code longval}.
 *
 * @param longval the value to store
 * @return the newly stored value
 */
@Override
public AtomicValue<Long> trySet(Long longval) {
    // Assign and return in one step; the stored field is what callers get back.
    return value = new MockLongValue(longval);
}
/**
 * Not supported by this mock; every call throws.
 *
 * @param expectedValue ignored
 * @param newValue      ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) throws Exception {
    final String reason = "Not implemented in MockCurator";
    throw new UnsupportedOperationException(reason);
}