/**
 * Looks up the backing map stored under {@code name} in {@code asyncMultiMaps},
 * registering a new empty one when none is present, and hands a
 * {@link FakeAsyncMultiMap} wrapping it to {@code resultHandler} through
 * {@code vertx.runOnContext}.
 *
 * @param name          key of the backing map inside {@code asyncMultiMaps}
 * @param resultHandler receives a succeeded result carrying the multimap
 */
@Override
public <K, V> void getAsyncMultiMap(final String name, final Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler) {
  ConcurrentMap backing = asyncMultiMaps.get(name);
  if (backing == null) {
    final ConcurrentMap fresh = new ConcurrentHashMap<>();
    // putIfAbsent returns a non-null map when an entry was already registered
    // for this name; in that case the registered map is used instead of ours.
    final ConcurrentMap registered = asyncMultiMaps.putIfAbsent(name, fresh);
    backing = (registered != null) ? registered : fresh;
  }
  @SuppressWarnings("unchecked")
  final ConcurrentMap<K, ChoosableSet<V>> typedBacking = backing;
  this.vertx.runOnContext(ignored -> {
    final AsyncMultiMap<K, V> multiMap = new FakeAsyncMultiMap<>(typedBacking);
    resultHandler.handle(Future.succeededFuture(multiMap));
  });
}
/**
 * Inside {@code vertx.executeBlocking}, fetches the cache called {@code name} from
 * the cache manager, wraps it in an {@link InfinispanAsyncMultiMap}, records the
 * wrapper in {@code multimaps} and completes with it.
 *
 * @param name          name of the cache requested from {@code cacheManager}
 * @param resultHandler receives the outcome of the blocking task
 */
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler) {
  vertx.executeBlocking(promise -> {
    Cache<MultiMapKey, Object> backingCache = cacheManager.getCache(name);
    InfinispanAsyncMultiMap<K, V> wrapper = new InfinispanAsyncMultiMap<>(vertx, backingCache);
    // Registration in the multimaps collection is guarded by this instance's monitor.
    synchronized (this) {
      multimaps.add(wrapper);
    }
    promise.complete(wrapper);
  }, false, resultHandler);
}
/**
 * Asynchronously builds an Atomix consistent multimap called {@code name} and,
 * once the build completes, passes the outcome to {@code handler} as an
 * {@link AtomixAsyncMultiMap} via {@code VertxFutures.convertHandler}.
 *
 * <p>The builder is configured with linearizable consistency, persistent storage,
 * synchronous replication, recovery enabled and at most 5 retries.
 *
 * @param name    name given to the multimap builder
 * @param handler receives the converted result of the asynchronous build
 */
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
  atomix.<K, V>consistentMultimapBuilder(name)
      .withSerializer(createSerializer())
      .withConsistency(Consistency.LINEARIZABLE)
      .withPersistence(Persistence.PERSISTENT)
      .withReplication(Replication.SYNCHRONOUS)
      .withRecovery(Recovery.RECOVER)
      .withMaxRetries(5)
      .buildAsync()
      .whenComplete(
          VertxFutures.convertHandler(
              handler,
              multimap -> {
                // Wrap the asynchronous view of the built multimap.
                return new AtomixAsyncMultiMap<>(vertx, multimap.async());
              },
              vertx.getOrCreateContext()));
}
/**
 * Logs the request at trace level, then, inside {@code vertx.executeBlocking},
 * runs {@code checkCluster()} and completes with the multimap that
 * {@code cacheManager} creates for {@code name}.
 *
 * @param name    name passed to {@code cacheManager.createAsyncMultiMap}
 * @param handler receives the outcome of the blocking task
 */
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
  logTrace(() -> String.format("Create new AsyncMultiMap [%s] on address [%s]", name, address));
  vertx.executeBlocking(promise -> {
    // The cluster check runs before the multimap is created.
    checkCluster();
    promise.complete(cacheManager.<K, V>createAsyncMultiMap(name));
  }, handler);
}
/**
 * Every eventbus handler has an ID. The SubsMap (subscriber map) is a MultiMap that
 * maps handler IDs to server IDs, which lets the eventbus determine where to send
 * messages.
 *
 * <p>Inside {@code vertx.executeBlocking}, this method obtains the Hazelcast
 * MultiMap for {@code name}, wraps it in a {@link HazelcastAsyncMultiMap}, records
 * the wrapper in {@code multimaps} and completes with it.
 *
 * @param name          A unique name by which the MultiMap can be identified within the
 *                      cluster. See the cluster config file (e.g. cluster.xml in case of
 *                      HazelcastClusterManager) for additional MultiMap config parameters.
 * @param resultHandler handler receiving the multimap
 */
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler) {
  vertx.executeBlocking(promise -> {
    com.hazelcast.core.MultiMap<K, V> hzMultiMap = hazelcast.getMultiMap(name);
    HazelcastAsyncMultiMap<K, V> wrapper = new HazelcastAsyncMultiMap<>(vertx, hzMultiMap);
    // Registration in the multimaps collection is guarded by this instance's monitor.
    synchronized (this) {
      multimaps.add(wrapper);
    }
    promise.complete(wrapper);
  }, resultHandler);
}
/**
 * Inside {@code vertx.executeBlocking}, completes with an {@link AsyncMultiMapImpl}
 * built over the cache that {@code getCache(name)} returns.
 *
 * @param name    name passed to {@code getCache}
 * @param handler receives the outcome of the blocking task
 */
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
  vertx.executeBlocking(promise -> {
    // The cache is requested with value type Set<V>.
    promise.complete(new AsyncMultiMapImpl<>(this.<K, Set<V>>getCache(name), vertx));
  }, handler);
}
/**
 * Creates the multimap called {@code name} through {@code multiMapService} and
 * returns it wrapped in an {@link AsyncMultiMapWrapper} that is also given
 * {@code executorService}.
 *
 * @param name name passed to {@code multiMapService.multiMapCreate}
 * @return the wrapper around the newly created multimap
 */
public <K, V> AsyncMultiMap<K, V> createAsyncMultiMap(String name) {
  logDebug(() -> String.format("method createAsyncMultiMap address[%s] name[%s]", channel.getAddressAsString(), name));
  MultiMap<K, V> created = multiMapService.<K, V>multiMapCreate(name);
  return new AsyncMultiMapWrapper<>(name, created, executorService);
}
/**
 * Every eventbus handler has an ID. The SubsMap (subscriber map) is a MultiMap that
 * maps handler IDs to server IDs, which lets the eventbus determine where to send
 * messages.
 *
 * <p>The {@link ZKAsyncMultiMap} is constructed inside the {@code vertx.runOnContext}
 * callback and delivered to {@code handler} as a succeeded result.
 *
 * @param name    A unique name by which the MultiMap can be identified within the cluster.
 *                See the cluster config file (e.g.
 *                io.vertx.spi.cluster.impl.zookeeper.zookeeper.properties in case of
 *                ZookeeperClusterManager) for additional MultiMap config parameters.
 * @param handler handler receiving the subscription map
 */
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) {
  vertx.runOnContext(ignored -> {
    handler.handle(Future.succeededFuture(new ZKAsyncMultiMap<>(vertx, curator, name)));
  });
}