@Override public void start() throws Exception { NetServer netServer = vertx.createNetServer();//创建代理服务器 NetClient netClient = vertx.createNetClient();//创建连接mysql客户端 netServer.connectHandler(socket -> netClient.connect(port, mysqlHost, result -> { //响应来自客户端的连接请求,成功之后,在建立一个与目标mysql服务器的连接 if (result.succeeded()) { //与目标mysql服务器成功连接连接之后,创造一个MysqlProxyConnection对象,并执行代理方法 new MysqlProxyConnection(socket, result.result()).proxy(); } else { logger.error(result.cause().getMessage(), result.cause()); socket.close(); } })).listen(port, listenResult -> {//代理服务器的监听端口 if (listenResult.succeeded()) { //成功启动代理服务器 logger.info("Mysql proxy server start up."); } else { //启动代理服务器失败 logger.error("Mysql proxy exit. because: " + listenResult.cause().getMessage(), listenResult.cause()); System.exit(1); } }); }
@SuppressWarnings({"rawtypes", "unchecked"}) @Test public void testTcpServerNonSSL(@Mocked Vertx vertx, @Mocked AsyncResultCallback<InetSocketAddress> callback, @Mocked NetServer netServer) { new Expectations() { { vertx.createNetServer(); result = netServer; netServer.connectHandler((Handler) any); netServer.listen(anyInt, anyString, (Handler) any); } }; URIEndpointObject endpointObject = new URIEndpointObject("highway://127.0.0.1:6663"); TcpServer server = new TcpServerForTest(endpointObject); // assert done in Expectations server.init(vertx, "", callback); }
@SuppressWarnings({"rawtypes", "unchecked"}) @Test public void testTcpServerSSL(@Mocked Vertx vertx, @Mocked AsyncResultCallback<InetSocketAddress> callback, @Mocked NetServer netServer) { new Expectations() { { vertx.createNetServer((NetServerOptions) any); result = netServer; netServer.connectHandler((Handler) any); netServer.listen(anyInt, anyString, (Handler) any); } }; URIEndpointObject endpointObject = new URIEndpointObject("highway://127.0.0.1:6663?sslEnabled=true"); TcpServer server = new TcpServerForTest(endpointObject); // assert done in Expectations server.init(vertx, "", callback); }
@Override public void start() throws Exception { NetServerOptions options = new NetServerOptions().setPort(8380); NetServer server = vertx.createNetServer(options); server.connectHandler(this::handle); server.close(result -> { if(result.succeeded()){ //TCP server fully closed System.out.println("server close succeeded."); }else { System.out.println("server status: " + result.result().toString()); } }); server.listen(); }
@Test public void testBlockingBroker(TestContext ctx) throws Exception { // Use a port different from default 9092, because Broker IS running int port = 9091; Async serverAsync = ctx.async(); NetServer server = vertx.createNetServer().connectHandler(so -> { }).listen(port, ctx.asyncAssertSuccess(v -> serverAsync.complete())); serverAsync.awaitSuccess(10000); Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:"+port); props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(1)); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); producer = producer(Vertx.vertx(), props); producer.write(new ProducerRecord<>("testBlockkingBroker", 0, "key", "value"), ctx.asyncAssertFailure()); }
/**
 * Connects a {@code ProtonClient} to a raw TCP server that pauses each accepted
 * socket and closes it after 50 ms, and asserts that the connect attempt does
 * not succeed.
 *
 * @param context the test context
 */
@Test(timeout = 20000)
public void testConnectionDisconnectedDuringCreation(TestContext context) {
    server.close();

    Async connectFailed = context.async();

    NetServer rawServer = this.vertx.createNetServer();
    rawServer.connectHandler(accepted -> {
        // Pause the socket, then drop it shortly afterwards.
        accepted.pause();
        vertx.setTimer(50, timerId -> accepted.close());
    });

    rawServer.listen(listened -> {
        context.assertTrue(listened.succeeded());

        ProtonClient.create(vertx).connect("localhost", rawServer.actualPort(), connected -> {
            context.assertFalse(connected.succeeded());
            connectFailed.complete();
        });
    });

    connectFailed.awaitSuccess();
}
private void startTcpServer(ConfigParser c) { int port = c.getPort(); String keyPath = c.getTlsKeyPath(); String certPath = c.getTlsCertPath(); boolean tlsEnabled = c.isTlsEnabled(); int idleTimeout = c.getSocketIdleTimeout(); // MQTT over TCP NetServerOptions opt = new NetServerOptions() .setTcpKeepAlive(true) .setIdleTimeout(idleTimeout) // in seconds; 0 means "don't timeout". .setPort(port); if(tlsEnabled) { opt.setSsl(true).setPemKeyCertOptions(new PemKeyCertOptions() .setKeyPath(keyPath) .setCertPath(certPath) ); } NetServer netServer = vertx.createNetServer(opt); Map<String, MQTTSession> sessions = new MonitoredMap<>(); netServer.connectHandler(netSocket -> { MQTTNetSocket mqttNetSocket = new MQTTNetSocket(vertx, c, netSocket, sessions); mqttNetSocket.start(); }).listen(); }
public void init(Vertx vertx, String sslKey, AsyncResultCallback<InetSocketAddress> callback) { NetServer netServer; if (endpointObject.isSslEnabled()) { SSLOptionFactory factory = SSLOptionFactory.createSSLOptionFactory(sslKey, null); SSLOption sslOption; if (factory == null) { sslOption = SSLOption.buildFromYaml(sslKey); } else { sslOption = factory.createSSLOption(); } SSLCustom sslCustom = SSLCustom.createSSLCustom(sslOption.getSslCustomClass()); NetServerOptions serverOptions = new NetServerOptions(); VertxTLSBuilder.buildNetServerOptions(sslOption, sslCustom, serverOptions); netServer = vertx.createNetServer(serverOptions); } else { netServer = vertx.createNetServer(); } netServer.connectHandler(netSocket -> { TcpServerConnection connection = createTcpServerConnection(); connection.init(netSocket); }); InetSocketAddress socketAddress = endpointObject.getSocketAddress(); netServer.listen(socketAddress.getPort(), socketAddress.getHostString(), ar -> { if (ar.succeeded()) { callback.success(socketAddress); return; } // 监听失败 String msg = String.format("listen failed, address=%s", socketAddress.toString()); callback.fail(new Exception(msg, ar.cause())); }); }
/**
 * Creates a new instance of {@link StompServerImpl}.
 *
 * <p>When no net server is supplied, one is created from the Vert.x instance
 * using the given options.
 *
 * @param vertx the vert.x instance, must not be {@code null}
 * @param net the net server to use, may be {@code null}
 * @param options the options, must not be {@code null}
 */
public StompServerImpl(Vertx vertx, NetServer net, StompServerOptions options) {
    Objects.requireNonNull(vertx);
    Objects.requireNonNull(options);
    this.options = options;
    this.vertx = vertx;
    // Use the caller's server if present; otherwise build one from the options.
    this.server = net != null ? net : vertx.createNetServer(options);
}
/**
 * Adapts a handler expecting a {@code ProtonServer} result into one that
 * accepts a {@link NetServer} result.
 *
 * <p>On success the wrapped handler receives this {@code ProtonServerImpl};
 * on failure it receives a failed future carrying the original cause.
 *
 * @param handler the handler to notify
 * @return a handler suitable for {@link NetServer} asynchronous operations
 */
private Handler<AsyncResult<NetServer>> convertHandler(final Handler<AsyncResult<ProtonServer>> handler) {
    return netResult -> {
        if (netResult.failed()) {
            handler.handle(Future.failedFuture(netResult.cause()));
            return;
        }
        handler.handle(Future.succeededFuture(ProtonServerImpl.this));
    };
}
/**
 * Creates a {@link NetServer} with the given options and registers a task in
 * {@code toClose} that closes the server and waits for the close to complete.
 *
 * @param options the options used to create the server
 * @return the newly created server
 */
protected NetServer createNetServer(NetServerOptions options) {
    NetServer created = vertx.createNetServer(options);
    toClose.add(() -> {
        // Wait until the close callback has fired.
        CountDownLatch closed = new CountDownLatch(1);
        created.close(ar -> closed.countDown());
        awaitLatch(closed);
        return null;
    });
    return created;
}
/**
 * Closes the given server and waits until the close callback has been invoked.
 *
 * <p>A {@code null} server is ignored. Previously the method still waited on a
 * latch that nothing would ever count down in that case.
 *
 * @param server the server to close, may be {@code null}
 * @throws Exception if waiting for the close to complete fails
 */
protected void cleanup(NetServer server) throws Exception {
    if (server == null) {
        // Nothing to close, so there is nothing to wait for.
        return;
    }
    CountDownLatch latch = new CountDownLatch(1);
    server.close(ar -> latch.countDown());
    awaitLatch(latch);
}
/**
 * Sends a number of buffers from a net client to a net server, closes both,
 * and asserts that the metrics snapshots of the server and the client are
 * non-null and empty afterwards.
 *
 * @throws Exception if waiting on a latch or the cleanup fails
 */
@Test
public void testNetMetricsOnClose() throws Exception {
    int requests = 8;
    CountDownLatch received = new CountDownLatch(requests);

    NetClient client = vertx.createNetClient(new NetClientOptions());
    NetServerOptions serverOptions = new NetServerOptions()
        .setHost("localhost")
        .setPort(1235)
        .setReceiveBufferSize(50);
    NetServer server = vertx.createNetServer(serverOptions)
        .connectHandler(socket -> socket.handler(buff -> received.countDown()))
        .listen(listened -> {
            assertTrue(listened.succeeded());
            client.connect(1235, "localhost", connected -> {
                assertTrue(connected.succeeded());
                for (int sent = 0; sent < requests; sent++) {
                    connected.result().write(randomBuffer(50));
                }
            });
        });

    // Wait until the server has counted down once per expected request.
    awaitLatch(received);

    client.close();
    server.close(closed -> {
        assertTrue(closed.succeeded());
        vertx.runOnContext(v -> testComplete());
    });

    await();

    JsonObject snapshot = metricsService.getMetricsSnapshot(server);
    assertNotNull(snapshot);
    assertTrue(snapshot.isEmpty());

    snapshot = metricsService.getMetricsSnapshot(client);
    assertNotNull(snapshot);
    assertTrue(snapshot.isEmpty());

    cleanup(client);
    cleanup(server);
}
/**
 * Creates one of each measured object (HTTP server/client, net server/client,
 * datagram socket, event bus), asserts that every metrics snapshot is non-empty,
 * then closes Vert.x and asserts that every snapshot is empty.
 *
 * @throws Exception if waiting on a latch fails
 */
@Test
public void testMetricsCleanupedOnVertxClose() throws Exception {
    // HTTP server and client
    CountDownLatch httpListening = new CountDownLatch(1);
    HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
    server.requestHandler(req -> {});
    server.listen(onSuccess(res -> httpListening.countDown()));
    awaitLatch(httpListening);
    HttpClient client = vertx.createHttpClient(new HttpClientOptions());

    // Net server and client
    CountDownLatch netListening = new CountDownLatch(1);
    NetServer nServer = vertx.createNetServer(new NetServerOptions().setPort(1234));
    nServer.connectHandler(conn -> {});
    nServer.listen(res -> netListening.countDown());
    awaitLatch(netListening);
    NetClient nClient = vertx.createNetClient(new NetClientOptions());

    // Datagram socket and event bus
    DatagramSocket sock = vertx.createDatagramSocket(new DatagramSocketOptions());
    EventBus eb = vertx.eventBus();

    // While Vert.x is running, every snapshot is expected to hold metrics.
    assertFalse(metricsService.getMetricsSnapshot(vertx).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(server).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(client).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(nServer).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(nClient).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(sock).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(eb).isEmpty());

    vertx.close(closed -> {
        // After the close, every snapshot is expected to be empty.
        assertTrue(metricsService.getMetricsSnapshot(vertx).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(server).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(client).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(nServer).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(nClient).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(sock).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(eb).isEmpty());
        testComplete();
    });

    await();

    vertx = null;
}
/**
 * Starts a TCP server on {@code port} whose accepted sockets feed their data
 * into {@code recordParser}, completing or failing the start future according
 * to the listen outcome.
 *
 * @param startFuture completed once the server is listening, failed otherwise
 * @throws Exception declared by the overridden method
 */
@Override
public void start(Future<Void> startFuture) throws Exception {
    NetServer tcpServer = vertx.createNetServer();
    tcpServer.connectHandler(socket -> socket.handler(recordParser));
    tcpServer.listen(port, listened -> {
        if (listened.failed()) {
            startFuture.fail(listened.cause());
            return;
        }
        startFuture.complete();
    });
}
public void start() { //TODO: Fix a better way of configuration other than system properties? Integer port = Integer.getInteger("tcp.port", 5555); ObservableFuture<NetServer> netServerObservable = RxHelper.observableFuture(); NetServer netServer = vertx.createNetServer(new NetServerOptions().setPort(port)); netServerObservable.subscribe(a -> log.info("Starting TCP listener.."), e -> log.error("Could not start TCP listener on port " + port, e), () -> log.info("Started TCP listener on port " + port + ".") ); RxHelper.toObservable(netServer.connectStream()) .flatMap(s -> Riemann.convertBufferStreamToMessages(s, RxHelper.toObservable(s))) .subscribe(s -> { sendResponse(Proto.Msg.newBuilder().setOk(true).build(), s.getLeft()); vertx.eventBus().publish("riemann.stream", s.getRight().toByteArray()); }, e -> { log.error(e); if (e instanceof NetSocketException) { sendResponse(Proto.Msg.newBuilder().setError(e.getMessage()).build(), ((NetSocketException) e).getSocket()); } }); netServer.listen(netServerObservable.asHandler()); }
public void start(final Future<Void> startedResult) { // final Buffer fakeTrackingResponse = Buffer.buffer(); // fakeTrackingResponse.appendByte((byte) 0x11); // fakeTrackingResponse.appendByte((byte) 0x01); final NetServer netServer = vertx.createNetServer(); netServer .connectHandler(socket -> { socket.exceptionHandler(event -> { logger.error("Socket error on fake service socket", event); }); socket.handler(event -> { this.lastBuffer = event; socket.write(response); }); }) .listen(port, FAKE_SERVICE_HOST, event -> { if (event.failed()) { final Throwable cause = event.cause(); logger.error(cause.getMessage()); startedResult.fail(cause); return; } startedResult.complete(); }); }
/**
 * Starts an MQTT TCP broker listener from the given JSON configuration.
 *
 * <p>The idle timeout is read from {@code socket_idle_timeout} and the port
 * from {@code tcp_port}; TCP keep-alive is enabled. Each accepted connection
 * gets its own new session map and an {@code MQTTNetSocket}.
 *
 * @param conf the broker configuration
 * @return the net server on which {@code listen()} has been called
 */
private NetServer startTcpBroker(JsonObject conf) {
    ConfigParser c = new ConfigParser(conf);

    int idleTimeout = conf.getInteger("socket_idle_timeout");
    int tcpPort = conf.getInteger("tcp_port");
    NetServerOptions serverOptions = new NetServerOptions()
        .setTcpKeepAlive(true)
        .setIdleTimeout(idleTimeout)
        .setPort(tcpPort);

    NetServer netServer = vertx.createNetServer(serverOptions);
    netServer.connectHandler(accepted -> {
        // A new session map is created for every accepted connection.
        Map<String, MQTTSession> sessions = new HashMap<>();
        MQTTNetSocket mqttSocket = new MQTTNetSocket(vertx, c, accepted, sessions);
        mqttSocket.start();
    }).listen();

    return netServer;
}
/**
 * Creates a {@link NetServer} by delegating to the wrapped {@code vertx} instance.
 *
 * @param options the server options, passed through unchanged
 * @return the server returned by {@code vertx.createNetServer(options)}
 */
@Override public NetServer createNetServer(NetServerOptions options) { return vertx.createNetServer(options); }
/**
 * Creates a {@link NetServer} by delegating to the wrapped {@code vertx} instance.
 *
 * @return the server returned by {@code vertx.createNetServer()}
 */
@Override public NetServer createNetServer() { return vertx.createNetServer(); }
/**
 * Example: creates a STOMP server on top of an existing {@link NetServer},
 * installs a STOMP server handler and starts listening.
 *
 * @param vertx the vert.x instance
 * @param netServer the net server the STOMP server is built on
 */
public void example5(Vertx vertx, NetServer netServer) {
    StompServer created = StompServer.create(vertx, netServer);
    StompServer server = created
        .handler(StompServerHandler.create(vertx))
        .listen();
}
/**
 * Creates a {@link NetServer} with default options by delegating to
 * {@code createNetServer(NetServerOptions)}.
 *
 * <p>Default {@link NetServerOptions} are used. The previous code passed an
 * {@code HttpServerOptions} instance, which carries HTTP-server defaults into a
 * plain TCP server.
 *
 * @return the newly created server
 */
protected NetServer createNetServer() {
    return createNetServer(new NetServerOptions());
}
/**
 * Starts the echo service: a TCP server (accept backlog 10000) on
 * {@code ECHO_SERVICE_HOST}:{@code ECHO_SERVICE_PORT} that pumps every socket's
 * input back to the same socket and tracks the number of open connections in
 * {@code connectionCount}.
 *
 * @param startedResult completed once the server is listening, failed otherwise
 */
public void start(final Future<Void> startedResult) {
    final NetServerOptions serverOptions = new NetServerOptions().setAcceptBacklog(10000);
    final NetServer echoServer = vertx.createNetServer(serverOptions);
    logger.info("Echo is Hello world!");

    echoServer
        .connectHandler(socket -> {
            connectionCount.incrementAndGet();
            socket.exceptionHandler(error -> logger.error("Socket error on echo service socket", error));
            socket.closeHandler(closed -> connectionCount.decrementAndGet());
            // Echo: whatever is read from the socket is written back to it.
            Pump.pump(socket, socket).start();
        })
        .listen(ECHO_SERVICE_PORT, ECHO_SERVICE_HOST, listened -> {
            if (listened.failed()) {
                final Throwable cause = listened.cause();
                logger.error(cause.getMessage(), cause);
                startedResult.fail(cause);
            } else {
                logger.info(String.format("Started echo server - %s", ECHO_SERVICE_PORT));
                startedResult.complete();
            }
        });
}
/**
 * Creates a {@link StompServer} based on the default Stomp Server implementation,
 * using a freshly constructed {@link StompServerOptions} instance.
 *
 * @param vertx the vert.x instance to use
 * @param netServer the Net server used by the STOMP server; it is passed unchanged
 *     to the {@link StompServerImpl} constructor
 * @return the created {@link StompServer}
 */
static StompServer create(Vertx vertx, NetServer netServer) { return new StompServerImpl(vertx, netServer, new StompServerOptions()); }
/**
 * Creates a {@link StompServer} based on the default Stomp Server implementation.
 * All three arguments are passed unchanged to the {@link StompServerImpl} constructor.
 *
 * @param vertx the vert.x instance to use
 * @param net the Net server used by the STOMP server
 * @param options the server options
 * @return the created {@link StompServer}
 */
static StompServer create(Vertx vertx, NetServer net, StompServerOptions options) { return new StompServerImpl(vertx, net, options); }
/**
 * Gets the current {@link NetServer}, i.e. the value held in the {@code srvVertx} field.
 *
 * @return the server instance
 */
public NetServer getNetServer() { return srvVertx; }