@Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) { ContextImpl context = (ContextImpl) vertx.getOrCreateContext(); // Ordered on the internal blocking executor context.executeBlocking(() -> { java.util.concurrent.locks.Lock lock = lockService.getLock(name); try { if (lock.tryLock(timeout, TimeUnit.MILLISECONDS)) { return new JGroupsLock(vertx, lock); } else { throw new VertxException("Timed out waiting to get lock " + name); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new VertxException(e); } }, resultHandler); }
/**
 * Looks up the values stored under {@code k}.
 *
 * <p>If {@code nearCache} already holds an initialised entry for the key and no earlier request is
 * queued for this context, the entry is delivered directly via {@code runOnContext}. Otherwise the
 * request is appended to the per-context queue, and {@code dequeueGet} is invoked when this request
 * is the only one in the queue.
 *
 * @param k             key to look up
 * @param resultHandler handler that receives the result
 */
@Override
public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> resultHandler) {
  ContextImpl ctx = vertx.getOrCreateContext();
  // One pending-request queue per context, stored in the context data and keyed by this map.
  @SuppressWarnings("unchecked")
  Queue<GetRequest<K, V>> pending =
      (Queue<GetRequest<K, V>>) ctx.contextData().computeIfAbsent(this, unused -> new ArrayDeque<>());
  synchronized (pending) {
    ChoosableSet<V> cached = nearCache.get(k);
    boolean canAnswerNow = cached != null && cached.isInitialised() && pending.isEmpty();
    if (canAnswerNow) {
      ctx.runOnContext(ignored -> resultHandler.handle(Future.succeededFuture(cached)));
      return;
    }
    pending.add(new GetRequest<>(k, resultHandler));
    if (pending.size() == 1) {
      // This request is the only queued one, so start draining the queue.
      dequeueGet(ctx, pending);
    }
  }
}
/**
 * Looks up the values stored under {@code k}.
 *
 * <p>If {@code cache} already holds an initialised entry for the key and no earlier request is
 * queued for this context, the entry is delivered directly via {@code runOnContext}. Otherwise the
 * request is appended to the per-context queue, and {@code dequeueGet} is invoked when this request
 * is the only one in the queue.
 *
 * @param k             key to look up
 * @param resultHandler handler that receives the result
 */
@Override
public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> resultHandler) {
  ContextImpl ctx = vertx.getOrCreateContext();
  // One pending-request queue per context, stored in the context data and keyed by this map.
  @SuppressWarnings("unchecked")
  Queue<GetRequest<K, V>> pending =
      (Queue<GetRequest<K, V>>) ctx.contextData().computeIfAbsent(this, unused -> new ArrayDeque<>());
  synchronized (pending) {
    ChoosableSet<V> cached = cache.get(k);
    boolean canAnswerNow = cached != null && cached.isInitialised() && pending.isEmpty();
    if (canAnswerNow) {
      ctx.runOnContext(ignored -> resultHandler.handle(Future.succeededFuture(cached)));
      return;
    }
    pending.add(new GetRequest<>(k, resultHandler));
    if (pending.size() == 1) {
      // This request is the only queued one, so start draining the queue.
      dequeueGet(ctx, pending);
    }
  }
}
@Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) { ContextImpl context = (ContextImpl) vertx.getOrCreateContext(); // Ordered on the internal blocking executor context.executeBlocking(() -> { ISemaphore iSemaphore = hazelcast.getSemaphore(LOCK_SEMAPHORE_PREFIX + name); boolean locked = false; long remaining = timeout; do { long start = System.nanoTime(); try { locked = iSemaphore.tryAcquire(remaining, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // OK continue } remaining = remaining - MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS); } while (!locked && remaining > 0); if (locked) { return new HazelcastLock(iSemaphore); } else { throw new VertxException("Timed out waiting to get lock " + name); } }, resultHandler); }
@Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> handler) { ContextImpl context = (ContextImpl) vertx.getOrCreateContext(); // Ordered on the internal blocking executor context.executeBlocking(() -> { boolean locked; try { IgniteQueue<String> queue = getQueue(name, true); pendingLocks.offer(name); locked = queue.offer(getNodeID(), timeout, TimeUnit.MILLISECONDS); if (!locked) { // EVT_NODE_LEFT/EVT_NODE_FAILED event might be already handled, so trying get lock again if // node left topology. // Use IgniteSempahore when it will be fixed. String ownerId = queue.peek(); ClusterNode ownerNode = ignite.cluster().forNodeId(UUID.fromString(ownerId)).node(); if (ownerNode == null) { queue.remove(ownerId); locked = queue.offer(getNodeID(), timeout, TimeUnit.MILLISECONDS); } } } catch (Exception e) { throw new VertxException("Error during getting lock " + name, e); } finally { pendingLocks.remove(name); } if (locked) { return new LockImpl(name); } else { throw new VertxException("Timed out waiting to get lock " + name); } }, handler); }