@Override public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> asyncResultHandler) { if (!keyIsNull(k, asyncResultHandler)) { vertx.runOnContext(event -> { Map<String, ChildData> maps = curatorCache.getCurrentChildren(keyPath(k)); ChoosableSet<V> choosableSet = new ChoosableSet<>(0); if (maps != null) { for (ChildData childData : maps.values()) { try { if (childData != null && childData.getData() != null && childData.getData().length > 0) choosableSet.add(asObject(childData.getData())); } catch (Exception ex) { asyncResultHandler.handle(Future.failedFuture(ex)); } } } asyncResultHandler.handle(Future.succeededFuture(choosableSet)); }); } }
@Override public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> resultHandler) { ContextImpl context = vertx.getOrCreateContext(); @SuppressWarnings("unchecked") Queue<GetRequest<K, V>> getRequests = (Queue<GetRequest<K, V>>) context.contextData().computeIfAbsent(this, ctx -> new ArrayDeque<>()); synchronized (getRequests) { ChoosableSet<V> entries = nearCache.get(k); if (entries != null && entries.isInitialised() && getRequests.isEmpty()) { context.runOnContext(v -> { resultHandler.handle(Future.succeededFuture(entries)); }); } else { getRequests.add(new GetRequest<>(k, resultHandler)); if (getRequests.size() == 1) { dequeueGet(context, getRequests); } } } }
@Override public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> resultHandler) { ContextImpl context = vertx.getOrCreateContext(); @SuppressWarnings("unchecked") Queue<GetRequest<K, V>> getRequests = (Queue<GetRequest<K, V>>) context.contextData().computeIfAbsent(this, ctx -> new ArrayDeque<>()); synchronized (getRequests) { ChoosableSet<V> entries = cache.get(k); if (entries != null && entries.isInitialised() && getRequests.isEmpty()) { context.runOnContext(v -> { resultHandler.handle(Future.succeededFuture(entries)); }); } else { getRequests.add(new GetRequest<>(k, resultHandler)); if (getRequests.size() == 1) { dequeueGet(context, getRequests); } } } }
@Override public void get(final K k, final Handler<AsyncResult<ChoosableIterable<V>>> asyncResultHandler) { final ContextInternal ctx = FakeClusterManager.this.vertx.getOrCreateContext(); ctx.executeBlocking(fut -> { ChoosableIterable<V> it = this.map.get(k); if (it == null) { it = new ChoosableSet<>(0); } fut.complete(it); }, this.taskQueue, asyncResultHandler); }
@Override public void get(K key, Handler<AsyncResult<ChoosableIterable<V>>> handler) { execute( cache -> cache.getAsync(marshal(key)), (Set<V> items) -> { Set<V> unmarshalledItems = null; if (items != null) { unmarshalledItems = items.stream().map(ClusterSerializationUtils::unmarshal).collect(toSet()); } Set<V> items0 = unmarshalledItems; ChoosableIterableImpl<V> it = subs.compute(key, (k, oldValue) -> { if (items0 == null || items0.isEmpty()) { return null; } if (oldValue == null) { return new ChoosableIterableImpl<>(new ArrayList<>(items0)); } else { oldValue.update(new ArrayList<>(items0)); return oldValue; } }); return it == null ? ChoosableIterableImpl.empty() : it; }, handler ); }
@Override @SuppressWarnings("unchecked") public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> handler) { map.get(k).whenComplete(VertxFutures.convertHandler( handler, collection -> collection != null ? new AtomixChoosableIterable(collection.value()) : new AtomixChoosableIterable<V>(Collections.emptyList()), vertx.getOrCreateContext())); }
@Test public void shouldNotAddToMapCacheIfKeyDoesntAlreadyExist() throws Exception { String nonexistentKey = "non-existent-key." + UUID.randomUUID(); map.get(nonexistentKey, ar -> { if (ar.succeeded()) { try { ChoosableIterable<ServerID> s = ar.result(); Map<String, ChoosableIterable<ServerID>> cache = getCacheFromMap(); // System.err.println("CACHE CONTENTS: " + cache); // check result assertNotNull(s); assertTrue(s.isEmpty()); // check cache assertNotNull(cache); assertFalse( "Map cache should not contain key " + nonexistentKey, cache.containsKey(nonexistentKey)); } catch (Exception e) { fail(e.toString()); } finally { testComplete(); } } else { fail(ar.cause().toString()); } }); await(); }
public static <T> ChoosableIterable<T> empty() { return (ChoosableIterable<T>)EMPTY; }
GetRequest(K key, Handler<AsyncResult<ChoosableIterable<V>>> handler) { this.key = key; this.handler = handler; }
public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> handler) { logTrace(() -> "get k = [" + k + "], handler = [" + handler + "]"); executorService.runAsync(() -> map.get(k), handler); }
@SuppressWarnings("unchecked") private Map<String, ChoosableIterable<ServerID>> getCacheFromMap() throws Exception { Field field = map.getClass().getDeclaredField("cache"); field.setAccessible(true); return (Map<String, ChoosableIterable<ServerID>>) field.get(map); }
ChoosableIterable<V> get(K k);