@Test
public void testInsert() throws SQLException {
  AtomicInteger counter = new AtomicInteger(0);
  Pump.pump(client, vertx.eventBus().publisher("binlog")).start();
  vertx.eventBus().<JsonObject>consumer("binlog", msg -> {
    JsonObject body = msg.body();
    if (!"write".equals(body.getString("type"))) {
      return;
    }
    assertEquals(config().getString("schema"), body.getString("schema"));
    assertEquals("binlog_client_test", body.getString("table"));
    JsonObject row = body.getJsonObject("row");
    Integer id = row.getInteger("id");
    String name = row.getString("name");
    Map.Entry<Integer, String> expectedRow = rows().get(counter.getAndIncrement());
    assertEquals(expectedRow.getKey(), id);
    assertEquals(expectedRow.getValue(), name);
    if (id.equals(lastId())) {
      testComplete();
    }
  });
  insert();
  await();
}
@Test
public void testDelete() throws SQLException {
  AtomicInteger counter = new AtomicInteger(0);
  Pump.pump(client, vertx.eventBus().publisher("binlog")).start();
  vertx.eventBus().<JsonObject>consumer("binlog", msg -> {
    JsonObject body = msg.body();
    if (!"delete".equals(body.getString("type"))) {
      return;
    }
    assertEquals(config().getString("schema"), body.getString("schema"));
    assertEquals("binlog_client_test", body.getString("table"));
    JsonObject row = body.getJsonObject("row");
    Integer id = row.getInteger("id");
    String name = row.getString("name");
    Map.Entry<Integer, String> expectedRow = rows().get(counter.getAndIncrement());
    assertEquals(expectedRow.getKey(), id);
    assertEquals(expectedRow.getValue(), name);
    if (id.equals(lastId())) {
      testComplete();
    }
  });
  insert();
  delete();
  await();
}
@Test
public void testUpdate() throws SQLException {
  AtomicInteger counter = new AtomicInteger(0);
  Pump.pump(client, vertx.eventBus().publisher("binlog")).start();
  vertx.eventBus().<JsonObject>consumer("binlog", msg -> {
    JsonObject body = msg.body();
    if (!"update".equals(body.getString("type"))) {
      return;
    }
    assertEquals(config().getString("schema"), body.getString("schema"));
    assertEquals("binlog_client_test", body.getString("table"));
    JsonObject row = body.getJsonObject("row");
    Integer id = row.getInteger("id");
    String name = row.getString("name");
    Map.Entry<Integer, String> expectedRow = rows().get(counter.getAndIncrement());
    assertEquals(expectedRow.getKey(), id);
    assertEquals(expectedRow.getValue() + "_updated", name);
    if (id.equals(lastId())) {
      testComplete();
    }
  });
  insert();
  update();
  await();
}
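// Hypothetical fixtures for the three binlog tests above. The real helpers
// (rows(), lastId(), insert(), update(), delete()) are not shown in the source;
// this sketch assumes plain JDBC (java.sql.*) against the schema being replicated.
private List<Map.Entry<Integer, String>> rows() {
  return Arrays.asList(
      new AbstractMap.SimpleEntry<>(1, "alice"),
      new AbstractMap.SimpleEntry<>(2, "bob"));
}

private Integer lastId() {
  return rows().get(rows().size() - 1).getKey();
}

private void insert() throws SQLException {
  // jdbcUrl() is an assumed helper pointing at the MySQL master whose binlog the client tails
  try (Connection conn = DriverManager.getConnection(jdbcUrl());
       PreparedStatement stmt = conn.prepareStatement(
           "INSERT INTO binlog_client_test (id, name) VALUES (?, ?)")) {
    for (Map.Entry<Integer, String> row : rows()) {
      stmt.setInt(1, row.getKey());
      stmt.setString(2, row.getValue());
      stmt.executeUpdate();
    }
  }
}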
public MultipartHelper putBinaryBody(String name, ReadStream<Buffer> stream, String contentType,
    String fileName, Handler<AsyncResult> handler) {
  // Multipart delimiters must be CRLF (RFC 2046); the platform line separator is not reliable here.
  request
      .write("--")
      .write(boundary)
      .write("\r\n")
      .write(String.format("Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"", name, fileName))
      .write("\r\n")
      .write(String.format("Content-Type: %s", contentType))
      .write("\r\n")
      .write("Content-Transfer-Encoding: binary")
      .write("\r\n")
      .write("\r\n");
  Pump.pump(stream
      .endHandler(event -> {
        request.write("\r\n");
        handler.handle(createResult(true, null));
      })
      .exceptionHandler(e -> handler.handle(createResult(false, e))), request)
      .start();
  return this;
}
public MultipartHelper putBinaryBody(String name, String path, String contentType, String fileName,
    Handler<AsyncResult> handler) {
  // Multipart delimiters must be CRLF (RFC 2046); the platform line separator is not reliable here.
  request
      .write("--")
      .write(boundary)
      .write("\r\n")
      .write(String.format("Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"", name, fileName))
      .write("\r\n")
      .write(String.format("Content-Type: %s", contentType))
      .write("\r\n")
      .write("Content-Transfer-Encoding: binary")
      .write("\r\n")
      .write("\r\n");
  vertx.fileSystem().open(path, new OpenOptions().setRead(true), ar -> {
    if (ar.succeeded()) {
      Pump.pump(ar.result()
          .endHandler(event -> {
            request.write("\r\n");
            handler.handle(createResult(true, null));
          })
          .exceptionHandler(e -> handler.handle(createResult(false, e))), request)
          .start();
    } else {
      // report the open failure instead of leaving the handler uncalled
      handler.handle(createResult(false, ar.cause()));
    }
  });
  return this;
}
/**
 * Asynchronously stores content from source to filePath,
 * and calls onEnd when finished.
 *
 * @param vertx    the Vert.x instance
 * @param source   the stream to copy from
 * @param filePath the destination file
 * @param onEnd    called once the source has ended and the file is closed
 */
public static void asyncStore(Vertx vertx, ReadStream<Buffer> source, String filePath, Handler<Void> onEnd) {
  checkDir(filePath);
  source.pause();
  vertx.fileSystem().open(filePath, new OpenOptions().setWrite(true).setCreate(true), fres -> {
    if (fres.failed()) {
      // guard against open failures before using the result
      fres.cause().printStackTrace();
      return;
    }
    AsyncFile afile = fres.result();
    Pump pump = Pump.pump(source, afile);
    // close the file before signalling completion
    source.endHandler(v -> afile.close(closed -> onEnd.handle(null)));
    pump.start();
    source.resume();
  });
}
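// A possible call site for asyncStore above: piping an incoming HTTP request body
// straight to disk. The enclosing StreamUtils class name is an assumption; asyncStore
// pauses and resumes the source itself, so the request can be handed over as-is.
vertx.createHttpServer()
    .requestHandler(req -> StreamUtils.asyncStore(vertx, req, "/tmp/upload.bin",
        v -> req.response().end("stored")))
    .listen(8080);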
private void attachFile(String boundary, Iterator<Entry<String, Part>> uploadsIterator) {
  if (!uploadsIterator.hasNext()) {
    request.write(boundaryEndInfo(boundary));
    request.end();
    return;
  }

  Entry<String, Part> entry = uploadsIterator.next();
  // Do not use part.getName() to get the parameter name,
  // because a POJO consumer cannot easily set the name on the part.
  String name = entry.getKey();
  Part part = entry.getValue();
  String filename = part.getSubmittedFileName();

  InputStreamToReadStream fileStream = null;
  try {
    fileStream = new InputStreamToReadStream(vertx, part.getInputStream());
  } catch (IOException e) {
    asyncResp.consumerFail(e);
    return;
  }
  InputStreamToReadStream finalFileStream = fileStream;
  fileStream.exceptionHandler(e -> {
    LOGGER.debug("Failed to send file [{}:{}].", name, filename, e);
    IOUtils.closeQuietly(finalFileStream.getInputStream());
    asyncResp.consumerFail(e);
  });
  fileStream.endHandler(v -> {
    LOGGER.debug("Finished sending file [{}:{}].", name, filename);
    IOUtils.closeQuietly(finalFileStream.getInputStream());
    // recurse to stream the next upload, or write the closing boundary
    attachFile(boundary, uploadsIterator);
  });

  Buffer fileHeader = fileBoundaryInfo(boundary, name, part);
  request.write(fileHeader);
  Pump.pump(fileStream, request).start();
}
@Test
public void testPump() throws SQLException {
  TestWriteStream target = new TestWriteStream();
  Pump.pump(client, target).start();
  insert();
  await();
}
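// TestWriteStream is not shown in the source. A minimal sink with the same role could
// look like this (a hypothetical sketch against the Vert.x 3.x WriteStream interface,
// io.vertx.core.streams.WriteStream -- not the project's actual implementation):
public class CollectingWriteStream implements WriteStream<Buffer> {
  private final Buffer received = Buffer.buffer();

  @Override
  public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
    return this; // this sink never fails
  }

  @Override
  public WriteStream<Buffer> write(Buffer data) {
    received.appendBuffer(data); // collect everything the pump writes
    return this;
  }

  @Override
  public void end() {
    // nothing to flush
  }

  @Override
  public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
    return this;
  }

  @Override
  public boolean writeQueueFull() {
    return false; // unbounded: the pump never pauses its source
  }

  @Override
  public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
    return this;
  }

  public Buffer received() {
    return received;
  }
}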
private void initSendingPump() {
  _wsWriteStream = new WebsocketWriteStream(_socket, _maxFrameSize);
  _wsSendQueueStream = new WebsocketSendQueueStream();
  _wsSendQueueStream.streamEndedHandler(v -> _wsWriteStream.finishCurrentMessage());
  _wsSendQueueStream.streamStartedHandler(_wsWriteStream::setDataFormat);
  _wsSendingPump = Pump.pump(_wsSendQueueStream, _wsWriteStream);
  _wsSendingPump.start();
}
public MuxRegistrationImpl(MessageStream messageStream, boolean bidirectional) {
  this.messageStream = messageStream;
  outPipeline = messageStream.createOutPipelineWrappedWithErrorStrategy();
  demuxPump = Pump.pump(this, outPipeline).start();
  if (bidirectional) {
    muxPump = Pump.pump(messageStream.getInPipeline(), this).start();
  }
}
public FullDuplexMuxChannel(MessageStream messageStream, StreamMux outStreamMux) {
  this.messageStream = messageStream;
  this.outStreamMux = outStreamMux;
  outStreamMux.writeCompleteHandler(messageStream.getWriteCompleteHandler());
  final InPipeline inPipeline = messageStream.getInPipeline();
  final OutPipeline outPipeline = messageStream.getOutPipeline();
  inToOutPump = Pump.pump(inPipeline, outStreamMux);
  outToInPump = Pump.pump(outStreamMux, outPipeline);
  inPipeline.endHandler(v -> {
    if (endHandler != null) {
      endHandler.handle(null);
    }
    doStop();
  });
  inPipeline.exceptionHandler(t -> {
    logger.error("Error processing the InPipeline. Usually an error splitting or decoding.", t);
    if (t instanceof PacketParsingException) {
      PacketParsingException exception = (PacketParsingException) t;
      PipelinePack data = exception.getPipelinePack();
      outPipeline.write(data);
    }
    doStop();
    inPipeline.close();
  });
  outStreamMux.exceptionHandler(t -> {
    logger.error("Error in mux. Shutting down channel and closing the Input pipeline.", t);
    doStop();
    inPipeline.close();
  });
}
@Test
public void testPumpWithBoundedWriteStream() throws Exception {
  final CountDownLatch latch = new CountDownLatch(1);
  vertx = Vertx.vertx();
  final AsyncInputStream in = new AsyncInputStream(
      vertx, executor, new ByteArrayInputStream(content), 512);
  final BoundedWriteStream buffer = new BoundedWriteStream(1024);
  vertx.runOnContext(event -> Pump.pump(in, buffer).start());
  // the bounded sink fills up, so the pump must pause the source
  while (AsyncInputStream.STATUS_PAUSED != in.getState()) {
    sleep(1);
  }
  byte[] data = buffer.drain();
  assertData(data, 0);
  // draining frees the sink; the source resumes and fills it again
  while (AsyncInputStream.STATUS_PAUSED != in.getState()) {
    sleep(1);
  }
  data = buffer.drain();
  assertData(data, 1024);
  assertEquals(1024, data.length);
  latch.countDown();
  latch.await(30, TimeUnit.SECONDS);
}
@Override
public FdfsStorage append(ReadStream<Buffer> stream, long size, FdfsFileId fileId,
    Handler<AsyncResult<Void>> handler) {
  stream.pause();
  Future<FdfsConnection> futureConn = getConnection();
  futureConn.compose(connection -> {
    Future<FdfsPacket> futureResponse = FdfsProtocol.recvPacket(vertx, options.getNetworkTimeout(),
        connection, FdfsProtocol.STORAGE_PROTO_CMD_RESP, 0, null);

    Buffer nameBuffer = Buffer.buffer(fileId.name(), options.getCharset());
    long bodyLength = 2 * FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE + nameBuffer.length() + size;
    Buffer headerBuffer = FdfsProtocol.packHeader(FdfsProtocol.STORAGE_PROTO_CMD_APPEND_FILE, (byte) 0, bodyLength);
    connection.write(headerBuffer);
    if (connection.writeQueueFull()) {
      connection.pause();
      connection.drainHandler(v -> connection.resume());
    }

    Buffer bodyBuffer = FdfsUtils.newZero(bodyLength - size);
    int offset = 0;
    bodyBuffer.setLong(offset, nameBuffer.length());
    offset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setLong(offset, size);
    offset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setBuffer(offset, nameBuffer);

    connection.write(bodyBuffer);
    if (connection.writeQueueFull()) {
      connection.pause();
      connection.drainHandler(v -> connection.resume());
    }

    Pump.pump(stream, connection).start();
    stream.resume();

    return futureResponse;
  }).setHandler(ar -> {
    if (futureConn.succeeded()) {
      futureConn.result().release();
    }
    if (ar.succeeded()) {
      handler.handle(Future.succeededFuture());
    } else {
      handler.handle(Future.failedFuture(ar.cause()));
    }
  });
  return this;
}
@Override
public FdfsStorage modify(ReadStream<Buffer> stream, long size, FdfsFileId fileId, long offset,
    Handler<AsyncResult<Void>> handler) {
  stream.pause();
  Future<FdfsConnection> futureConn = getConnection();
  futureConn.compose(connection -> {
    Future<FdfsPacket> futureResponse = FdfsProtocol.recvPacket(vertx, options.getNetworkTimeout(),
        connection, FdfsProtocol.STORAGE_PROTO_CMD_RESP, 0, null);

    Buffer nameBuffer = Buffer.buffer(fileId.name(), options.getCharset());
    long bodyLength = 3 * FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE + nameBuffer.length() + size;
    Buffer headerBuffer = FdfsProtocol.packHeader(FdfsProtocol.STORAGE_PROTO_CMD_MODIFY_FILE, (byte) 0, bodyLength);
    connection.write(headerBuffer);
    if (connection.writeQueueFull()) {
      connection.pause();
      connection.drainHandler(v -> connection.resume());
    }

    Buffer bodyBuffer = FdfsUtils.newZero(bodyLength - size);
    int bufferOffset = 0;
    bodyBuffer.setLong(bufferOffset, nameBuffer.length());
    bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setLong(bufferOffset, offset);
    bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setLong(bufferOffset, size);
    bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setBuffer(bufferOffset, nameBuffer);

    connection.write(bodyBuffer);
    if (connection.writeQueueFull()) {
      connection.pause();
      connection.drainHandler(v -> connection.resume());
    }

    Pump.pump(stream, connection).start();
    stream.resume();

    return futureResponse;
  }).setHandler(ar -> {
    if (futureConn.succeeded()) {
      futureConn.result().release();
    }
    if (ar.succeeded()) {
      handler.handle(Future.succeededFuture());
    } else {
      handler.handle(Future.failedFuture(ar.cause()));
    }
  });
  return this;
}
@Override
public void handle(final RoutingContext context) {
  // if there is no path context, let the next handler deal with the request
  final PathContext pathContext = PathContext.get(context);
  if (pathContext == null) {
    context.next();
    return;
  }
  LOG.debug("Handling {} with from={} to={} protected={} ended={}", context, pathContext.getFrom(),
      pathContext.getTo(), pathContext.isProtected(), context.request().isEnded());

  final HttpServerRequest contextRequest = context.request();
  contextRequest.setExpectMultipart(context.parsedHeaders().contentType().isPermitted()
      && "multipart".equals(context.parsedHeaders().contentType().component()));

  final RequestOptions clientRequestOptions = Conversions.toRequestOptions(pathContext.getTo(),
      contextRequest.uri().substring(pathContext.getFrom().length()));
  final HttpClientRequest clientRequest = httpClient
      .request(contextRequest.method(), clientRequestOptions, clientResponse -> {
        contextRequest.response()
            .setChunked(clientResponse.getHeader(HttpHeaders.CONTENT_LENGTH) == null)
            .setStatusCode(clientResponse.statusCode());
        clientResponse.headers().forEach(e -> contextRequest.response().putHeader(e.getKey(), e.getValue()));
        clientResponse.endHandler(v -> contextRequest.response().end());
        Pump.pump(clientResponse, contextRequest.response()).start();
      })
      .exceptionHandler(context::fail)
      .setChunked(true);

  StreamSupport.stream(contextRequest.headers().spliterator(), false)
      .filter(Predicates.HEADER_FORWARDABLE)
      .forEach(e -> clientRequest.putHeader(e.getKey(), e.getValue()));
  clientRequest.putHeader(REQUEST_ID, (String) context.get(REQUEST_ID));
  clientRequest.putHeader(DATE, RFC_1123_DATE_TIME.format(now(UTC)));

  final Map<String, String> additionalHeaders = context.get("additional_headers");
  if (additionalHeaders != null) {
    additionalHeaders.forEach(clientRequest::putHeader);
  }

  contextRequest.endHandler(v -> clientRequest.end());
  Pump.pump(contextRequest, clientRequest).start();
  contextRequest.resume();
}
public Future<Void> download(String target) {
  Future<Void> future = Future.future();
  HttpClientOptions options = new HttpClientOptions();
  options.setKeepAlive(false);
  options.setLogActivity(true);
  HttpClient httpClient = vertx.createHttpClient(options);
  httpClient.get(WORDNET_PORT, WORDNET_HOST, WORDNET_FILE, httpEvent -> {
    // pause the HTTP response until our async file handler is set up
    httpEvent.pause();
    // set up the file open handler
    OpenOptions openOptions = new OpenOptions();
    vertx.fileSystem().open(target, openOptions, fileEvent -> {
      if (fileEvent.failed()) {
        fileEvent.cause().printStackTrace();
        return;
      }
      final AsyncFile asyncFile = fileEvent.result();
      final Pump downloadPump = Pump.pump(httpEvent, asyncFile);
      downloadPump.start();
      // resume the receive operation
      httpEvent.resume();
      httpEvent.endHandler(event -> {
        // close the file
        asyncFile.flush().close(closeEvent -> {
        });
        logger.info("Downloaded size = {}", downloadPump.numberPumped());
        future.complete();
      });
    });
  }).exceptionHandler(e -> {
    logger.error("Error while downloading the dictionary!", e);
    future.fail(e);
  }).end();
  return future;
}
/**
 * Upload a file to GeoRocket
 * @param path path to the file to import
 * @param client the GeoRocket client
 * @param vertx the Vert.x instance
 * @return an observable that will emit when the file has been uploaded
 */
protected Observable<Void> importFile(String path, GeoRocketClient client, Vertx vertx) {
  // open file
  FileSystem fs = vertx.fileSystem();
  OpenOptions openOptions = new OpenOptions().setCreate(false).setWrite(false);
  return fs.rxOpen(path, openOptions)
      // get file size
      .flatMap(f -> fs.rxProps(path).map(props -> Pair.of(f, props.size())))
      // import file
      .flatMapObservable(f -> {
        ObservableFuture<Void> o = RxHelper.observableFuture();
        Handler<AsyncResult<Void>> handler = o.toHandler();
        AsyncFile file = f.getLeft().getDelegate();

        WriteStream<Buffer> out = client.getStore()
            .startImport(layer, tags, properties, Optional.of(f.getRight()), fallbackCRS, handler);

        AtomicBoolean fileClosed = new AtomicBoolean();

        Pump pump = Pump.pump(file, out);
        file.endHandler(v -> {
          file.close();
          out.end();
          fileClosed.set(true);
        });

        Handler<Throwable> exceptionHandler = t -> {
          if (!fileClosed.get()) {
            file.endHandler(null);
            file.close();
          }
          handler.handle(Future.failedFuture(t));
        };
        file.exceptionHandler(exceptionHandler);
        out.exceptionHandler(exceptionHandler);

        pump.start();
        return o;
      });
}
public void toReadStream(io.vertx.rxjava.core.Vertx vertx, HttpServerResponse response) {
  Observable<Buffer> observable = getObservable();
  ReadStream<Buffer> readStream = RxHelper.toReadStream(observable);
  Pump pump = Pump.pump(readStream, response);
  pump.start();
}
public void toReadStream(HttpServerResponse response) {
  Flowable<Buffer> flowable = getFlowable();
  ReadStream<Buffer> readStream = FlowableHelper.toReadStream(flowable);
  Pump pump = Pump.pump(readStream, response);
  pump.start();
}
public void start(final Future<Void> startedResult) {
  final NetServer netServer = vertx.createNetServer(new NetServerOptions().setAcceptBacklog(10000));
  logger.info("Echo is Hello world!");
  netServer
      .connectHandler(socket -> {
        connectionCount.incrementAndGet();
        socket.exceptionHandler(event -> {
          logger.error("Socket error on echo service socket", event);
        });
        socket.closeHandler(v -> {
          connectionCount.decrementAndGet();
        });
        // echo: pump the socket back into itself
        Pump.pump(socket, socket).start();
      })
      .listen(ECHO_SERVICE_PORT, ECHO_SERVICE_HOST, event -> {
        if (event.failed()) {
          final Throwable cause = event.cause();
          logger.error(cause.getMessage(), cause);
          startedResult.fail(cause);
          return;
        }
        logger.info(String.format("Started echo server - %s", ECHO_SERVICE_PORT));
        startedResult.complete();
      });
}
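// A hypothetical client to exercise the echo server above, assuming the same
// host/port constants (not part of the source):
NetClient client = vertx.createNetClient();
client.connect(ECHO_SERVICE_PORT, ECHO_SERVICE_HOST, res -> {
  if (res.failed()) {
    res.cause().printStackTrace();
    return;
  }
  NetSocket socket = res.result();
  socket.handler(buf -> System.out.println("echoed: " + buf.toString()));
  socket.write("hello");
});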
public void example1(HttpServerResponse response, Publisher<Buffer> otherPublisher) {
  ReactiveReadStream<Buffer> rrs = ReactiveReadStream.readStream();

  // Subscribe the read stream to the publisher
  otherPublisher.subscribe(rrs);

  // Pump from the read stream to the http response
  Pump pump = Pump.pump(rrs, response);

  pump.start();
}
public void example2(Vertx vertx, HttpServerRequest request, Subscriber<Buffer> otherSubscriber) {
  ReactiveWriteStream<Buffer> rws = ReactiveWriteStream.writeStream(vertx);

  // Subscribe the other subscriber to the write stream
  rws.subscribe(otherSubscriber);

  // Pump the http request to the write stream
  Pump pump = Pump.pump(request, rws);

  pump.start();
}
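// Every example above delegates flow control to Pump. Roughly, the pump writes each
// item, pauses the source when the sink's write queue fills, and resumes on drain.
// A simplified hand-rolled equivalent (not Vert.x's actual implementation):
static <T> void pump(ReadStream<T> source, WriteStream<T> sink) {
  source.handler(item -> {
    sink.write(item);
    if (sink.writeQueueFull()) {
      source.pause();                            // stop reading while the sink is saturated
      sink.drainHandler(v -> source.resume());   // resume once the queue drains
    }
  });
}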