/**
 * Prints the entries of the local map named by {@code this.map} as a two-column
 * (key / value) table on standard output.
 *
 * <p>When {@code keys} is set, only those keys are rendered; otherwise every key
 * currently in the map is rendered.
 *
 * @return always {@code null}
 */
@Override
public Object execute() throws Exception {
    LocalMap<Object, Object> localMap = getVertxService().sharedData().getLocalMap(this.map);

    ShellTable table = new ShellTable();
    table.column("Map[" + this.map + "]\nKey");
    table.column("\nValue");

    if (keys == null) {
        // No explicit selection: walk the whole key set. Keys are cast to String for rendering.
        for (Object key : localMap.keySet()) {
            renderRow(localMap, table, (String) key);
        }
    } else {
        keys.forEach(key -> renderRow(localMap, table, key));
    }

    table.print(System.out);
    return null;
}
/** * Create a AsyncMap * * @param <K> Key type * @param <V> Value type * @param vertx * @param mapName name of the map. If null, will always create a local map * @param fut */ public static <K, V> void create(Vertx vertx, String mapName, Handler<ExtendedAsyncResult<AsyncMap<K, V>>> fut) { if (vertx.isClustered() && mapName != null) { SharedData shared = vertx.sharedData(); shared.<K, V>getClusterWideMap(mapName, res -> { if (res.succeeded()) { fut.handle(new Success<>(res.result())); } else { fut.handle(new Failure<>(INTERNAL, res.cause())); } }); } else { // Dirty trickery to make sure we can run two verticles in our tests, // without them sharing the 'shared' memory. Only when running in non- // clustered mode, of course. // Also used in deploy-only nodes, where we want local-only tenant and // module lists with only the hard-coded supertenant and internalModule. Random r = new Random(); String newid = String.format("%09d", r.nextInt(1000000000)); if (mapName != null) { newid = mapName + newid; } AsyncLocalmap<K, V> l = new AsyncLocalmap<>(vertx, newid); fut.handle(new Success<>(l)); } }
/**
 * Initializes a {@code ModuleOne} with an empty configuration and verifies that
 * the shared counter {@code c1} holds the value 2 afterwards.
 *
 * <p>Every asynchronous step is wrapped in {@code ctx.asyncAssertSuccess(...)}
 * so that the test waits for the callbacks to run and fails when any step
 * reports a failure, instead of returning before the assertions execute.
 */
@Test
public void initializationTest(TestContext ctx) {
    ModuleOne m1 = new ModuleOne();
    m1.setConfig(new JsonObject());
    initializer.initialize(m1).setHandler(ctx.asyncAssertSuccess(initialized -> {
        SharedData sd = Environment.my(SharedData.class);
        // Each nested asyncAssertSuccess is created before the enclosing one
        // completes, which keeps the test open until the counter has been read.
        sd.getCounter("c1", ctx.asyncAssertSuccess(counter ->
                counter.get(ctx.asyncAssertSuccess(value -> ctx.assertEquals(2L, value)))));
    }));
}
/**
 * Removes the endpoint belonging to the given socket from the endpoint holder
 * stored in the local {@code WS_REGISTRY} map.
 *
 * <p>The endpoint is matched on both its binary and its text handler id. When
 * no holder exists or no endpoint matches, nothing is changed.
 *
 * @param serverSocket the socket whose endpoint registration is removed
 */
@Override
public void findRouteSocketInRegistryAndRemove(ServerWebSocket serverSocket) {
    final SharedData sharedData = this.vertx.sharedData();
    final String binaryId = serverSocket.binaryHandlerID();
    final String textId = serverSocket.textHandlerID();
    final LocalMap<String, byte[]> registry = sharedData.getLocalMap(WS_REGISTRY);
    final WSEndpointHolder holder = getWSEndpointHolderFromSharedData(registry);
    if (holder == null) {
        return;
    }

    final Optional<WSEndpoint> match = holder.getAll()
            .parallelStream()
            .filter(candidate -> candidate.getBinaryHandlerId().equals(binaryId)
                    && candidate.getTextHandlerId().equals(textId))
            .findFirst();

    if (match.isPresent()) {
        holder.remove(match.get());
        // Write the updated holder back in serialized form.
        registry.replace(WS_ENDPOINT_HOLDER, serialize(holder));
        log("OK REMOVE: " + serverSocket.binaryHandlerID());
    }
}
/**
 * Sends the body of the wrapped message to every registered endpoint whose URL
 * equals the URL of the endpoint carried in the message.
 *
 * <p>The body is passed on both as a string (when it can be serialized to one)
 * and as a byte payload. Nothing is sent when the local registry holds no
 * endpoint holder.
 *
 * @param message event-bus message whose body is a serialized {@code WSMessageWrapper}
 */
@Override
public void replyToAllWS(Message<byte[]> message) {
    try {
        log("Reply to all: " + this);
        final WSMessageWrapper wrapper = (WSMessageWrapper) Serializer.deserialize(message.body());
        final String stringResult = TypeTool.trySerializeToString(wrapper.getBody());

        final byte[] payload;
        if (stringResult == null) {
            payload = Serializer.serialize(wrapper.getBody());
        } else {
            // NOTE(review): getBytes() uses the platform default charset — confirm this is intended.
            payload = stringResult.getBytes();
        }

        final SharedData sharedData = this.vertx.sharedData();
        final LocalMap<String, byte[]> registry = sharedData.getLocalMap(WS_REGISTRY);
        final byte[] holderPayload = registry.get(WS_ENDPOINT_HOLDER);
        if (holderPayload == null) {
            return;
        }

        final WSEndpointHolder holder = (WSEndpointHolder) deserialize(holderPayload);
        holder.getAll()
                .parallelStream()
                .filter(candidate -> candidate.getUrl().equals(wrapper.getEndpoint().getUrl()))
                .forEach(target -> replyToEndpoint(stringResult, payload, target));
    } catch (IOException | ClassNotFoundException e) {
        // NOTE(review): the failure is only printed to stderr; consider routing it through the logger.
        e.printStackTrace();
    }
}
/**
 * Starts an HTTP server on port 8080 whose {@code GET /} route is served by a
 * {@code TradeRecommendationsEngine} backed by this Vert.x instance's shared data.
 */
@Override
public void start() {
    TradeRecommendationsEngine engine = new TradeRecommendationsEngine(vertx.sharedData());

    Router router = Router.router(vertx);
    router.get("/").handler(routingContext -> engine.nextRecommendation(routingContext));

    vertx.createHttpServer()
            .requestHandler(request -> router.accept(request))
            .listen(8080);
}
/**
 * Stores {@code value} under {@code key} in the local map named by {@code map}.
 *
 * @return a fixed confirmation message
 */
@Override
public Object execute() throws Exception {
    LocalMap<Object, Object> target = getVertxService().sharedData().getLocalMap(map);
    target.put(key, value);
    return "key and value added to map";
}
/**
 * Removes every key listed in {@code keys} from the local map named by
 * {@code this.map}. When {@code keys} is {@code null} the map is left untouched.
 *
 * @return a fixed confirmation message, whether or not any key was removed
 */
@Override
public Object execute() throws Exception {
    LocalMap<Object, Object> localMap = getVertxService().sharedData().getLocalMap(this.map);
    if (keys != null) {
        keys.forEach(key -> localMap.remove(key));
    }
    return "removed keys from map";
}
/**
 * Sets up the verticle: creates the shared Mongo client from the
 * {@code "mongodb"} configuration section, reads the simulation timing
 * settings, obtains the local {@code "simStatusMap"} map and registers the
 * event-bus consumers for the simulation addresses.
 *
 * <p>Defaults when the {@code "simulation"} section or a setting is absent:
 * {@code interval_ms} = 1000, {@code msgInterval} = 1.
 */
@Override
public void start() throws Exception {
    mongo = MongoClient.createShared(vertx, config().getJsonObject("mongodb", new JsonObject()));

    JsonObject simulationConfig = config().getJsonObject("simulation", new JsonObject());
    intervalMS = simulationConfig.getInteger("interval_ms", 1000);
    msgInterval = simulationConfig.getInteger("msgInterval", 1);

    simulationStatus = vertx.sharedData().getLocalMap("simStatusMap");

    EventBus bus = vertx.eventBus();
    bus.consumer(Bus.START_SIMULATION.address(), this::startSimulation);
    bus.consumer(Bus.STOP_SIMULATION.address(), this::stopSimulation);
    bus.consumer(Bus.SIMULATION_STATUS.address(), this::getSimulationStatus);
    bus.consumer(Bus.SIMULATION_ENDED.address(), this::handleSimulationEnded);
}
/**
 * Registers the given Vert.x instance, and the objects obtained from it, in the
 * {@code Environment} registry under their interface types.
 *
 * @param vertx the Vert.x instance to publish
 */
public void bindInterfaces(Vertx vertx) {
    // The instance itself, then its context.
    Environment.registry(Bind.bind(Vertx.class), vertx);
    Environment.registry(Bind.bind(Context.class), vertx.getOrCreateContext());

    // Services exposed by the instance.
    Environment.registry(Bind.bind(EventBus.class), vertx.eventBus());
    Environment.registry(Bind.bind(FileSystem.class), vertx.fileSystem());
    Environment.registry(Bind.bind(SharedData.class), vertx.sharedData());
}
/**
 * Logs the simple class name at debug level and increments the shared counter
 * {@code c1} by one.
 *
 * <p>Failures while obtaining or incrementing the counter are logged rather
 * than dereferencing a {@code null} result or being dropped silently.
 */
@Override
protected void load() {
    log.debug(getClass().getSimpleName());
    SharedData sd = Environment.my(SharedData.class);
    sd.getCounter("c1", counterResult -> {
        if (counterResult.failed()) {
            // result() would be null here; bail out instead of throwing inside the handler.
            log.error("Unable to obtain shared counter c1", counterResult.cause());
            return;
        }
        counterResult.result().addAndGet(1, incremented -> {
            if (incremented.failed()) {
                log.error("Unable to increment shared counter c1", incremented.cause());
            }
        });
    });
}
/**
 * Resolves a parameter to the value stored in a Vert.x local map.
 *
 * <p>The map name comes from the annotation. The lookup key is the annotation's
 * {@code key}, or the parameter name when that key is the empty string.
 *
 * @param context     routing context giving access to the Vert.x instance
 * @param annotation  annotation carrying the map name and key
 * @param paramName   name of the parameter being resolved; fallback key
 * @param resultClass not used by this method
 * @return the value found under the key, as returned by the map
 */
@Override
public Object resolve(RoutingContext context, LocalMapValue annotation, String paramName, Class<?> resultClass) {
    SharedData sd = context.vertx().sharedData();
    String mapName = annotation.mapName();
    String configuredKey = annotation.key();
    String key = "".equals(configuredKey) ? paramName : configuredKey;
    return sd.getLocalMap(mapName).get(key);
}
/**
 * Resolves a parameter to a Vert.x local map.
 *
 * <p>The map name is the annotation's {@code value}, or the parameter name when
 * that value is the empty string.
 *
 * @param context     routing context giving access to the Vert.x instance
 * @param annotation  annotation carrying the map name
 * @param paramName   name of the parameter being resolved; fallback map name
 * @param resultClass not used by this method
 * @return the local map with the resolved name
 */
@Override
public Object resolve(RoutingContext context, VertxLocalMap annotation, String paramName, Class<?> resultClass) {
    SharedData sd = context.vertx().sharedData();
    String configuredName = annotation.value();
    String mapName = "".equals(configuredName) ? paramName : configuredName;
    return sd.getLocalMap(mapName);
}
/**
 * Looks up the service holder in the cluster-wide {@code REGISTRY} map and, once
 * it has been fetched, hands it together with the socket to
 * {@code findServiceEntryAndRegisterWS}.
 *
 * @param serverSocket the web socket to register
 */
@Override
public void findRouteToWSServiceAndRegister(ServerWebSocket serverSocket) {
    final SharedData sharedData = this.vertx.sharedData();
    sharedData.<String, ServiceInfoHolder>getClusterWideMap(REGISTRY, onSuccess(registry -> {
        registry.get(GlobalKeyHolder.SERVICE_HOLDER, onSuccess(serviceHolder -> {
            findServiceEntryAndRegisterWS(serverSocket, serviceHolder, sharedData);
        }));
    }));
}
/**
 * Registers the socket as an endpoint when the service holder contains an entry
 * for the socket's path. Does nothing when {@code resultHolder} is {@code null}
 * or no entry is found.
 *
 * @param serverSocket the web socket to register
 * @param resultHolder the service holder to search; may be {@code null}
 * @param sharedData   shared data passed on to the registration step
 */
private void findServiceEntryAndRegisterWS(final ServerWebSocket serverSocket, final ServiceInfoHolder resultHolder, final SharedData sharedData) {
    if (resultHolder == null) {
        return;
    }
    final String path = serverSocket.path();
    log("find entry : " + path);
    if (findServiceInfoEntry(resultHolder, path).isPresent()) {
        createEndpointDefinitionAndRegister(serverSocket, sharedData);
    }
}
/**
 * Fetches the cluster-wide registry map and, on success, logs it at debug level
 * and passes it to {@code getServiceHolderAndPingServices}.
 */
private void getSharedRegistryAndPing() {
    this.vertx.sharedData().<String, ServiceInfoHolder>getClusterWideMap(
            GlobalKeyHolder.REGISTRY_MAP_KEY,
            onSuccess(registry -> {
                logDebug("resultMap " + registry);
                getServiceHolderAndPingServices(registry);
            }));
}
/**
 * Creates an engine that keeps a reference to the given shared data.
 *
 * @param engineData the shared data stored by this engine
 */
public TradeRecommendationsEngine(SharedData engineData) {
    this.engineData = engineData;
}
/**
 * Creates a wrapper around the Vert.x local map with the given name.
 *
 * @param vertx   Vert.x instance whose shared data provides the map
 * @param mapName name of the local map to wrap
 */
public AsyncLocalmap(Vertx vertx, String mapName) {
    this.map = vertx.sharedData().getLocalMap(mapName);
}
/**
 * Returns the shared data of the underlying {@code vertx} instance.
 *
 * @return the result of {@code vertx.sharedData()}
 */
@Override
public SharedData sharedData() {
    return vertx.sharedData();
}
/**
 * Exposes the shared data of the given Vert.x instance as a bean.
 *
 * @param vertx the Vert.x instance to take the shared data from
 * @return the result of {@code vertx.sharedData()}
 */
@Bean
public SharedData sharedData(Vertx vertx) {
    return vertx.sharedData();
}
/**
 * Creates a repository that keeps a reference to the given shared data.
 *
 * @param sharedData the shared data stored by this repository
 */
public AuctionRepository(SharedData sharedData) {
    this.sharedData = sharedData;
}
/**
 * Provides the shared data of the given Vert.x instance for injection.
 *
 * @param vertx the Vert.x instance to take the shared data from
 * @return the result of {@code vertx.sharedData()}
 */
@Provides
SharedData provideSharedData(Vertx vertx) {
    return vertx.sharedData();
}
/**
 * Injection constructor; keeps a reference to the given shared data.
 *
 * @param sharedData the shared data stored by this repository
 */
@Inject
public AuctionRepository(SharedData sharedData) {
    this.sharedData = sharedData;
}
/**
 * Fetches the cluster-wide {@code WS_REGISTRY} map and, on success, passes it
 * with the socket to {@code getEndpointHolderAndAdd}.
 *
 * @param serverSocket the web socket to register
 * @param sharedData   shared data used to obtain the cluster-wide map
 */
private void createEndpointDefinitionAndRegister(ServerWebSocket serverSocket, final SharedData sharedData) {
    sharedData.<String, WSEndpointHolder>getClusterWideMap(WS_REGISTRY, onSuccess(registry -> {
        getEndpointHolderAndAdd(serverSocket, registry);
    }));
}
/**
 * Builds an endpoint from the socket's binary handler id, text handler id and
 * path, stores it through {@code replaceOrAddEndpoint} using the local
 * {@code WS_REGISTRY} map, and then calls {@code sendToWSService}.
 *
 * @param serverSocket the web socket to register
 */
private void createEndpointDefinitionAndRegister(ServerWebSocket serverSocket) {
    final LocalMap<String, byte[]> registry = this.vertx.sharedData().getLocalMap(WS_REGISTRY);
    final WSEndpointHolder currentHolder = getWSEndpointHolderFromSharedData(registry);

    final String socketPath = serverSocket.path();
    final WSEndpoint newEndpoint =
            new WSEndpoint(serverSocket.binaryHandlerID(), serverSocket.textHandlerID(), socketPath);

    replaceOrAddEndpoint(registry, currentHolder, newEndpoint);
    sendToWSService(serverSocket, socketPath, newEndpoint);
}