@Override public void start() { final DatagramSocket socket = vertx.createDatagramSocket(dsOptions); socket.listen(udpPort, host, datagramSocketAsyncResult -> { if (datagramSocketAsyncResult.succeeded()){ log.info("Listening on UDP port " + udpPort); async.countDown(); socket.handler(packet -> { String decoded = packet.data().getString(0, packet.data().length()); log.debug("=============== Received content on UDP " + udpPort + " ================= \n" + decoded); async.countDown(); }); } else { log.warn("Listen failed on port " + udpPort, datagramSocketAsyncResult.cause()); } }); }
@Test public void testMetricsCleanupedOnVertxClose() throws Exception { CountDownLatch latch1 = new CountDownLatch(1); HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080)); server.requestHandler(req -> {}); server.listen(onSuccess(res -> { latch1.countDown(); })); awaitLatch(latch1); HttpClient client = vertx.createHttpClient(new HttpClientOptions()); CountDownLatch latch2 = new CountDownLatch(1); NetServer nServer = vertx.createNetServer(new NetServerOptions().setPort(1234)); nServer.connectHandler(conn -> {}); nServer.listen(res -> { latch2.countDown(); }); awaitLatch(latch2); NetClient nClient = vertx.createNetClient(new NetClientOptions()); DatagramSocket sock = vertx.createDatagramSocket(new DatagramSocketOptions()); EventBus eb = vertx.eventBus(); assertFalse(metricsService.getMetricsSnapshot(vertx).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(server).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(client).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(nServer).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(nClient).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(sock).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(eb).isEmpty()); vertx.close(res -> { assertTrue(metricsService.getMetricsSnapshot(vertx).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(server).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(client).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(nServer).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(nClient).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(sock).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(eb).isEmpty()); testComplete(); }); await(); vertx = null; }
@Override public void start(Future<Void> startFuture) throws Exception { DatagramSocket socket = vertx.createDatagramSocket(); socket.handler(this::handlePacket); socket.listen(port, "0.0.0.0", result -> { if (result.succeeded()) { startFuture.complete(); } else { startFuture.fail(result.cause()); } }); }
@Override default DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) { return DummyVertxMetrics.DummyDatagramMetrics.INSTANCE; }
@Override public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) { return createSubMetrics(v -> v.createMetrics(socket, options), d -> new DispatchingDatagramSocketMetrics(d)); }
@Override public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) { return new DatagramSocketMetricsImpl(counterService, gaugeService, properties.getDatagramSocket()); }
@Override public @NotNull DatagramSocketMetrics createMetrics(@NotNull DatagramSocket socket, @NotNull DatagramSocketOptions datagramSocketOptions) { return options.isEnabled(DatagramSocket) ? new DatagramSocketPrometheusMetrics(options.getRegistry()) : super.createMetrics(socket, datagramSocketOptions); }
@Override public DatagramSocket createDatagramSocket(DatagramSocketOptions options) { return vertx.createDatagramSocket(options); }
@Override public DatagramSocket createDatagramSocket() { return vertx.createDatagramSocket(); }
@Override public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) { return new DatagramSocketMetricsImpl(defaultLabels, !this.options.isMetricsTypeDisabled(MetricsType.DATAGRAM_SOCKET)); }
@Override public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) { DatagramSocketMetricsSupplier supplier = (DatagramSocketMetricsSupplier) metricSuppliers.get(DATAGRAM_SOCKET); return supplier != null ? new DatagramSocketMetricsImpl(supplier) : super.createMetrics(socket, options); }
@Override public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) { return new DatagramSocketMetricsImpl(this, nameOf("datagram")); }
@Test public void testDatagramMetrics() throws Exception { Buffer clientMax = randomBuffer(1823); Buffer clientMin = randomBuffer(123); AtomicBoolean complete = new AtomicBoolean(false); DatagramSocket datagramSocket = vertx.createDatagramSocket(new DatagramSocketOptions()).listen(1236, "localhost", ar -> { assertTrue(ar.succeeded()); DatagramSocket socket = ar.result(); socket.handler(packet -> { if (complete.getAndSet(true)) { testComplete(); } }); socket.send(clientMin, 1236, "localhost", ds -> { assertTrue(ar.succeeded()); }); socket.send(clientMax, 1236, "localhost", ds -> { assertTrue(ar.succeeded()); }); }); await(); // Test sender/client (bytes-written) JsonObject metrics = metricsService.getMetricsSnapshot(datagramSocket); assertCount(metrics.getJsonObject("bytes-written"), 2L); assertMinMax(metrics.getJsonObject("bytes-written"), (long) clientMin.length(), (long) clientMax.length()); // Test server (bytes-read) assertCount(metrics.getJsonObject("localhost:1236.bytes-read"), 2L); assertMinMax(metrics.getJsonObject("localhost:1236.bytes-read"), (long) clientMin.length(), (long) clientMax.length()); CountDownLatch latch = new CountDownLatch(1); datagramSocket.close(ar -> { assertTrue(ar.succeeded()); latch.countDown(); }); awaitLatch(latch); assertWaitUntil(() -> metricsService.getMetricsSnapshot(datagramSocket).isEmpty()); }
@Test public void testNothingToSend(TestContext testContext) { this.setup(false, Optional.empty(), Optional.empty()); Async async = testContext.async(); Vertx vertx = mock(Vertx.class); when(vertx.setTimer(anyLong(), Matchers.any())).thenReturn(1L); Context context = mock(Context.class); Mockito.doNothing().when(context).runOnContext(Matchers.any()); DatagramSocket datagramSocket = mock(DatagramSocket.class); when(vertx.createDatagramSocket()).thenReturn(datagramSocket); StatfulMetricsOptions options = mock(StatfulMetricsOptions.class); when(options.getFlushInterval()).thenReturn(10L); when(options.getFlushSize()).thenReturn(10); when(options.getMaxBufferSize()).thenReturn(5000); UDPSender sender = new UDPSender(vertx, context, options); sender.send(Collections.emptyList()); verify(datagramSocket, times(0)).send(anyString(), anyInt(), anyString(), Matchers.any()); this.teardown(async); }