@Override
public FdfsStorage download(FdfsFileId fileId, String fileFullPathName, long offset, long bytes,
                            Handler<AsyncResult<Void>> handler) {
  vertx.fileSystem().open(fileFullPathName, new OpenOptions().setCreate(true).setWrite(true), ar -> {
    if (ar.succeeded()) {
      AsyncFile file = ar.result();
      download(fileId, file, offset, bytes, download -> {
        file.close();
        handler.handle(download);
      });
    } else {
      handler.handle(Future.failedFuture(ar.cause()));
    }
  });
  return this;
}
public static Future<LocalFile> readFile(FileSystem fs, String fileFullPathName) {
  LocalFile localFile = new LocalFile();
  return Future.<FileProps>future(future -> fs.props(fileFullPathName, future))
      .compose(props -> {
        localFile.setSize(props.size());
        return Future.<AsyncFile>future(future ->
            fs.open(fileFullPathName, new OpenOptions().setRead(true).setWrite(false).setCreate(false), future));
      })
      .compose(fileStream -> {
        localFile.setFile(fileStream);
        return Future.succeededFuture(localFile);
      });
}
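// A minimal usage sketch for readFile above (not part of the original source);
// it assumes a Vert.x 3.x setup and that LocalFile exposes getSize() and getFile()
// accessors matching the setSize()/setFile() calls used in readFile.
public static void printFileSize(Vertx vertx) {
  readFile(vertx.fileSystem(), "/tmp/example.dat").setHandler(ar -> {
    if (ar.succeeded()) {
      System.out.println("size = " + ar.result().getSize());
      ar.result().getFile().close();
    } else {
      ar.cause().printStackTrace();
    }
  });
}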
@Test
public void copyZeroLength(TestContext context) throws IOException {
  Path tmpFile = Files.createTempFile(tmpDir, "", "");
  AsyncFile asyncFile = vertx.fileSystem().openBlocking(tmpFile.toString(), new OpenOptions());
  final BufferWriteEndableWriteStream bufferWriteStream = new BufferWriteEndableWriteStream();
  Async async = context.async();
  AsyncIO.pump(asyncFile, bufferWriteStream)
      .map(new Func1<Void, Void>() {
        @Override
        public Void call(Void aVoid) {
          VertxAssert.assertEquals(context, 0, bufferWriteStream.toBuffer().length());
          return null;
        }
      })
      .subscribe(new TestSubscriber(context, async));
}
/**
 * Asynchronously stores content from {@code source} to {@code filePath},
 * and calls {@code onEnd} when finished.
 *
 * @param vertx the Vert.x instance
 * @param source the stream to read from
 * @param filePath the destination file path
 * @param onEnd called once the source stream has ended
 */
public static void asyncStore(Vertx vertx, ReadStream<Buffer> source, String filePath, Handler<Void> onEnd) {
  checkDir(filePath);
  source.pause();
  vertx.fileSystem().open(filePath, new OpenOptions().setWrite(true).setCreate(true), fres -> {
    if (fres.failed()) {
      // the original dereferenced fres.result() unconditionally, which throws an
      // NPE when the open fails; there is no error callback, so rethrow instead
      throw new IllegalStateException("Failed to open " + filePath, fres.cause());
    }
    AsyncFile afile = fres.result();
    Pump pump = Pump.pump(source, afile);
    source.endHandler(onEnd);
    pump.start();
    source.resume();
  });
}
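// A minimal usage sketch for asyncStore above (not part of the original source):
// HttpServerRequest implements ReadStream<Buffer>, so an upload body can be
// streamed straight to disk; "/tmp/upload.bin" is an illustrative path.
void handleUpload(Vertx vertx, HttpServerRequest request) {
  asyncStore(vertx, request, "/tmp/upload.bin", v ->
      request.response().setStatusCode(201).end());
}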
/**
 * Uploads the file contents to S3.
 * <p>
 * Note: this implementation buffers the entire file in memory so the content
 * length can be set on the request before it is ended; it is not suitable for
 * very large files.
 *
 * @param aBucket An S3 bucket
 * @param aKey An S3 key
 * @param aFile A file to upload
 * @param aHandler A response handler for the upload
 */
public void put(final String aBucket, final String aKey, final AsyncFile aFile,
    final Handler<HttpClientResponse> aHandler) {
  final S3ClientRequest request = createPutRequest(aBucket, aKey, aHandler);
  final Buffer buffer = Buffer.buffer();

  aFile.endHandler(event -> {
    request.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length()));
    request.end(buffer);
  });

  aFile.handler(data -> buffer.appendBuffer(data));
}
public static Observable<Void> close(AsyncFile asyncFile) {
  return Observable.defer(() -> {
    ObservableFuture<Void> rh = RxHelper.observableFuture();
    asyncFile.close(rh.toHandler());
    return rh;
  });
}
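// A minimal usage sketch for close above (not part of the original source):
// Observable.defer means the close is only issued once something subscribes,
// so the returned Observable composes naturally at the end of an RxJava 1 chain.
void closeQuietly(AsyncFile asyncFile) {
  close(asyncFile).subscribe(
      v -> { /* close completed */ },
      err -> err.printStackTrace());
}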
@Test
public void testMetadataAndReadStream(TestContext context) throws IOException {
  byte[] data = new byte[256 * 1024 * 1024];
  getCurrentInstance().nextBytesBlocking(data);
  Path dataFile = path.resolve(".data");
  write(dataFile, data, CREATE_NEW);
  long size = size(dataFile);
  final byte[] expectedDataSha512 = hash(dataFile.toFile(), sha512()).asBytes();
  final AsyncFile bigFile = sfsVertx.fileSystem().openBlocking(dataFile.toString(), new OpenOptions());
  Path journalPath = path.resolve(".journal");
  JournalFile journalFile = new JournalFile(journalPath);
  Async async = context.async();
  aVoid()
      .flatMap(aVoid -> journalFile.open(sfsVertx))
      .flatMap(aVoid -> journalFile.enableWrites(sfsVertx))
      .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata0", UTF_8.toString()), size, bigFile))
      .doOnNext(aVoid -> bigFile.setReadPos(0))
      .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata1", UTF_8.toString()), size, bigFile))
      .doOnNext(aVoid -> bigFile.setReadPos(0))
      .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata2", UTF_8.toString()), size, bigFile))
      .doOnNext(aVoid -> bigFile.setReadPos(0))
      .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata3", UTF_8.toString()), size, bigFile))
      .doOnNext(aVoid -> bigFile.setReadPos(0))
      .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata4", UTF_8.toString()), size, bigFile))
      // assert entries can be read before closing
      .flatMap(aVoid -> assertScanDataReadStream(context, sfsVertx, journalFile, 5, "metadata", expectedDataSha512))
      .flatMap(aVoid -> journalFile.disableWrites(sfsVertx))
      .flatMap(aVoid -> journalFile.force(sfsVertx, true))
      .flatMap(aVoid -> journalFile.close(sfsVertx))
      // assert entries can still be read after closing and reopening
      .flatMap(aVoid -> journalFile.open(sfsVertx))
      .flatMap(aVoid -> assertScanDataReadStream(context, sfsVertx, journalFile, 5, "metadata", expectedDataSha512))
      .subscribe(new TestSubscriber(context, async));
}
@Test
public void testImmediatePumpFile(TestContext context) throws IOException {
  SfsVertx vertx = new SfsVertxImpl(rule.vertx(), backgroundPool, ioPool);
  byte[] bytes = new byte[256];
  getCurrentInstance().nextBytesBlocking(bytes);
  Path path = createTempFile("", "");
  try (OutputStream outputStream = newOutputStream(path)) {
    for (int i = 0; i < 10000; i++) {
      outputStream.write(bytes);
    }
  }
  final byte[] sha512 = hash(path.toFile(), sha512()).asBytes();
  Async async = context.async();
  aVoid()
      .flatMap(aVoid -> {
        AsyncFile asyncFile = vertx.fileSystem().openBlocking(path.toString(), new OpenOptions());
        PipedReadStream pipedReadStream = new PipedReadStream();
        PipedEndableWriteStream pipedEndableWriteStream = new PipedEndableWriteStream(pipedReadStream);
        Observable<Void> producer = pump(asyncFile, pipedEndableWriteStream);
        DigestEndableWriteStream digestWriteStream = new DigestEndableWriteStream(new NullEndableWriteStream(), SHA512);
        Observable<Void> consumer = pump(pipedReadStream, digestWriteStream);
        return combineSinglesDelayError(producer, consumer, (aVoid1, aVoid2) -> {
          assertArrayEquals(context, sha512, digestWriteStream.getDigest(SHA512).get());
          return (Void) null;
        });
      })
      .doOnTerminate(() -> {
        try {
          deleteIfExists(path);
        } catch (IOException e) {
          e.printStackTrace();
        }
      })
      .subscribe(new TestSubscriber(context, async));
}
@Override
public void getOne(String path, Handler<AsyncResult<ChunkReadStream>> handler) {
  String absolutePath = Paths.get(root, path).toString();

  // check if chunk exists
  FileSystem fs = vertx.fileSystem();
  ObservableFuture<Boolean> observable = RxHelper.observableFuture();
  fs.exists(absolutePath, observable.toHandler());
  observable
      .flatMap(exists -> {
        if (!exists) {
          return Observable.error(new FileNotFoundException("Could not find chunk: " + path));
        }
        return Observable.just(exists);
      })
      .flatMap(exists -> {
        // get chunk's size
        ObservableFuture<FileProps> propsObservable = RxHelper.observableFuture();
        fs.props(absolutePath, propsObservable.toHandler());
        return propsObservable;
      })
      .map(props -> props.size())
      .flatMap(size -> {
        // open chunk
        ObservableFuture<AsyncFile> openObservable = RxHelper.observableFuture();
        OpenOptions openOptions = new OpenOptions().setCreate(false).setWrite(false);
        fs.open(absolutePath, openOptions, openObservable.toHandler());
        return openObservable.map(f -> new FileChunkReadStream(size, f));
      })
      .subscribe(readStream -> {
        // send chunk to peer
        handler.handle(Future.succeededFuture(readStream));
      }, err -> {
        handler.handle(Future.failedFuture(err));
      });
}
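// A hypothetical usage sketch for getOne above (not part of the original source);
// it assumes ChunkReadStream extends ReadStream<Buffer> so it can be pumped into
// an HTTP response. The response is chunked because no content length is set here.
void sendChunk(HttpServerRequest request, String chunkPath) {
  getOne(chunkPath, ar -> {
    if (ar.succeeded()) {
      HttpServerResponse response = request.response().setChunked(true);
      ChunkReadStream chunk = ar.result();
      chunk.endHandler(v -> response.end());
      Pump.pump(chunk, response).start();
    } else {
      request.response().setStatusCode(404).end();
    }
  });
}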
public Stream<RsAsyncFile> open(String path, OpenOptions options) {
  return Stream.asOne(subscription -> {
    fileSystem.open(path, options, new Handler<AsyncResult<AsyncFile>>() {
      @Override
      public void handle(AsyncResult<AsyncFile> event) {
        if (event.succeeded()) {
          subscription.sendNext(new RsAsyncFile(event.result()));
          subscription.sendComplete();
        } else {
          subscription.sendError(event.cause());
        }
      }
    });
  });
}
public void toObservable(Vertx vertx) {
  FileSystem fileSystem = vertx.fileSystem();
  fileSystem.open("/data.txt", new OpenOptions(), result -> {
    AsyncFile file = result.result();
    Observable<Buffer> observable = RxHelper.toObservable(file);
    observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
  });
}
public void unmarshaller(FileSystem fileSystem) {
  fileSystem.open("/data.txt", new OpenOptions(), result -> {
    AsyncFile file = result.result();
    Observable<Buffer> observable = RxHelper.toObservable(file);
    observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe(mypojo -> {
      // Process the object
    });
  });
}
public void toFlowable(Vertx vertx) {
  FileSystem fileSystem = vertx.fileSystem();
  fileSystem.open("/data.txt", new OpenOptions(), result -> {
    AsyncFile file = result.result();
    Flowable<Buffer> flowable = FlowableHelper.toFlowable(file);
    flowable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
  });
}
public void unmarshaller(FileSystem fileSystem) {
  fileSystem.open("/data.txt", new OpenOptions(), result -> {
    AsyncFile file = result.result();
    Flowable<Buffer> flowable = FlowableHelper.toFlowable(file);
    flowable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(mypojo -> {
      // Process the object
    });
  });
}
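// A minimal sketch of the MyPojo type assumed by the unmarshaller examples above
// (not part of the original source): the Vert.x unmarshaller is backed by Jackson
// databind, so a plain bean with a default constructor and getters/setters suffices.
public class MyPojo {
  private String name;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }
}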
private LocalFile setFile(AsyncFile file) {
  this.file = file;
  return this;
}
public Future<Void> download(String target) {
  Future<Void> future = Future.future();
  HttpClientOptions options = new HttpClientOptions();
  options.setKeepAlive(false);
  options.setLogActivity(true);
  HttpClient httpClient = vertx.createHttpClient(options);
  httpClient.get(WORDNET_PORT, WORDNET_HOST, WORDNET_FILE, httpEvent -> {
    // pause the http response till we complete setting up our async file handler
    httpEvent.pause();
    // setup file open handler
    OpenOptions openOptions = new OpenOptions();
    vertx.fileSystem().open(target, openOptions, fileEvent -> {
      if (fileEvent.failed()) {
        // fail the future instead of leaving it forever pending
        fileEvent.cause().printStackTrace();
        future.fail(fileEvent.cause());
        return;
      }
      final AsyncFile asyncFile = fileEvent.result();
      final Pump downloadPump = Pump.pump(httpEvent, asyncFile);
      downloadPump.start();
      // resume the receive operation
      httpEvent.resume();
      httpEvent.endHandler(event -> {
        logger.info("Downloaded size = {}", downloadPump.numberPumped());
        // close the file and complete only once the close has finished
        asyncFile.flush().close(closeEvent -> future.complete());
      });
    });
  }).exceptionHandler(e -> {
    logger.error("Error while downloading the dictionary!", e);
    future.fail(e);
  }).end();
  return future;
}
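// A minimal usage sketch for download above (not part of the original source);
// "/tmp/wordnet.db" is an illustrative target path. The returned future completes
// only after the downloaded file has been flushed and closed.
void fetchDictionary() {
  download("/tmp/wordnet.db").setHandler(ar -> {
    if (ar.succeeded()) {
      logger.info("Dictionary downloaded");
    } else {
      logger.error("Download failed", ar.cause());
    }
  });
}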
/**
 * Asynchronously parse a file and print JSON events to System.out
 * @param filename the name of the JSON file to parse
 * @param handler a handler that will be called when the file has been parsed
 * or when an error has occurred
 */
private void parseFile(String filename, Handler<AsyncResult<Void>> handler) {
  OpenOptions options = new OpenOptions()
      .setRead(true)
      .setWrite(false);

  // asynchronously open the file
  vertx.fileSystem().open(filename, options, ar -> {
    if (ar.failed()) {
      handler.handle(Future.failedFuture(ar.cause()));
      return;
    }

    JsonParser parser = new JsonParser();
    AsyncFile f = ar.result();

    Supplier<Boolean> processEvents = () -> {
      // process events from the parser until it needs more input
      int event;
      do {
        event = parser.nextEvent();

        // print all events to System.out
        if (event != JsonEvent.NEED_MORE_INPUT) {
          System.out.println("JSON event: " + event);
        }

        // handle values, errors, and end of file
        if (event == JsonEvent.VALUE_STRING) {
          System.out.println("VALUE: " + parser.getCurrentString());
        } else if (event == JsonEvent.EOF) {
          handler.handle(Future.succeededFuture());
          return false;
        } else if (event == JsonEvent.ERROR) {
          handler.handle(Future.failedFuture("Syntax error"));
          return false;
        }
      } while (event != JsonEvent.NEED_MORE_INPUT);
      return true;
    };

    f.exceptionHandler(t -> handler.handle(Future.failedFuture(t)));

    f.handler(buf -> {
      // forward bytes read from the file to the parser
      byte[] bytes = buf.getBytes();
      int i = 0;
      while (i < bytes.length) {
        i += parser.getFeeder().feed(bytes, i, bytes.length - i);
        if (!processEvents.get()) {
          f.handler(null);
          f.endHandler(null);
          break;
        }
      }
    });

    f.endHandler(v -> {
      // process events one last time
      parser.getFeeder().done();
      processEvents.get();
    });
  });
}
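// A minimal usage sketch for parseFile above (not part of the original source);
// "/data.json" is an illustrative file name.
void parseExample() {
  parseFile("/data.json", ar -> {
    if (ar.succeeded()) {
      System.out.println("Parsing finished");
    } else {
      System.err.println("Parsing failed: " + ar.cause().getMessage());
    }
  });
}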
public AsyncFileEndableWriteStream(AsyncFile delegate) {
  this.delegate = delegate;
}

public AsyncFile getDelegate() {
  return delegate;
}
@Override
public Observable<HeaderBlob> call(Void aVoid) {
  return aVoid()
      .flatMap(aVoid1 -> {
        ObservableFuture<AsyncFile> handler = RxHelper.observableFuture();
        OpenOptions openOptions = new OpenOptions();
        openOptions.setCreate(true)
            .setRead(true)
            .setWrite(true);
        vertx.vertx().fileSystem().open(data.toString(), openOptions, handler.toHandler());
        return handler;
      })
      .flatMap(asyncFile -> {
        final long size;
        final byte[] md5;
        final byte[] sha512;
        try {
          size = size(data);
          md5 = hash(data.toFile(), md5()).asBytes();
          sha512 = hash(data.toFile(), sha512()).asBytes();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        return just(fromNullable(getFirst(vertx.verticle().nodes().volumeManager().volumes(), null)))
            .map(volumeOptional -> volumeOptional.get())
            .flatMap(new Func1<String, Observable<HeaderBlob>>() {
              @Override
              public Observable<HeaderBlob> call(String volumeId) {
                out.println("PPPP1");
                return remoteNode.createWriteStream(volumeId, size, SHA512, MD5)
                    .flatMap(nodeWriteStream -> {
                      out.println("PPPP2");
                      return nodeWriteStream.consume(asyncFile);
                    })
                    .map(blob -> {
                      out.println("PPPP3");
                      asyncFile.close();
                      return blob;
                    })
                    .map(blob -> {
                      assertArrayEquals(testContext, md5, blob.getDigest(MD5).get());
                      assertArrayEquals(testContext, sha512, blob.getDigest(SHA512).get());
                      return blob;
                    });
              }
            });
      });
}
@Test
public void testLargeUpload(TestContext context) throws IOException {
  byte[] data = new byte[256];
  getCurrentInstance().nextBytesBlocking(data);
  int dataSize = 256 * 1024;

  Path tempFile = createTempFile(tmpDir, "", "");
  int bytesWritten = 0;
  try (OutputStream out = newOutputStream(tempFile)) {
    while (bytesWritten < dataSize) {
      out.write(data);
      bytesWritten += data.length;
    }
  }

  long size = size(tempFile);
  final byte[] md5 = hash(tempFile.toFile(), md5()).asBytes();
  final byte[] sha512 = hash(tempFile.toFile(), sha512()).asBytes();
  final AsyncFile bigFile = vertx.fileSystem().openBlocking(tempFile.toString(), new OpenOptions());

  Async async = context.async();
  prepareContainer(context)
      // put an object then get/head the object
      .flatMap(new PutObjectStream(httpClient, accountName, containerName, objectName, authNonAdmin, bigFile)
          .setHeader(CONTENT_LENGTH, valueOf(size))
          .setHeader(ETAG, base16().lowerCase().encode(md5)))
      .map(new HttpClientResponseHeaderLogger())
      .map(new AssertHttpClientResponseStatusCode(context, HTTP_CREATED))
      .map(new AssertObjectHeaders(context, 0, false, 0, md5, sha512, 0))
      .map(new ToVoid<HttpClientResponse>())
      .flatMap(new GetObject(httpClient, accountName, containerName, objectName, authNonAdmin))
      .map(new HttpClientResponseHeaderLogger())
      .map(new AssertHttpClientResponseStatusCode(context, HTTP_OK))
      .map(new AssertObjectHeaders(context, 0, false, size, md5, sha512, 1))
      .flatMap(new Func1<HttpClientResponse, Observable<HttpClientResponse>>() {
        @Override
        public Observable<HttpClientResponse> call(final HttpClientResponse httpClientResponse) {
          final DigestEndableWriteStream digestWriteStream =
              new DigestEndableWriteStream(new NullEndableWriteStream(), SHA512, MD5);
          return pump(httpClientResponse, digestWriteStream)
              .map(new Func1<Void, HttpClientResponse>() {
                @Override
                public HttpClientResponse call(Void aVoid) {
                  assertArrayEquals(context, md5, digestWriteStream.getDigest(MD5).get());
                  assertArrayEquals(context, sha512, digestWriteStream.getDigest(SHA512).get());
                  return httpClientResponse;
                }
              });
        }
      })
      .map(new ToVoid<HttpClientResponse>())
      .subscribe(new TestSubscriber(context, async));
}
@Test
public void testEncryptedLargeUpload(TestContext context) throws IOException {
  byte[] data = new byte[256];
  getCurrentInstance().nextBytesBlocking(data);
  int dataSize = 256 * 1024;

  Path tempFile = createTempFile(tmpDir, "", "");
  int bytesWritten = 0;
  try (OutputStream out = newOutputStream(tempFile)) {
    while (bytesWritten < dataSize) {
      out.write(data);
      bytesWritten += data.length;
    }
  }

  long size = size(tempFile);
  final byte[] md5 = hash(tempFile.toFile(), md5()).asBytes();
  final byte[] sha512 = hash(tempFile.toFile(), sha512()).asBytes();
  final AsyncFile bigFile = vertx.fileSystem().openBlocking(tempFile.toString(), new OpenOptions());

  Async async = context.async();
  prepareContainer(context)
      // put an object then get/head the object
      .flatMap(new PutObjectStream(httpClient, accountName, containerName, objectName, authNonAdmin, bigFile)
          .setHeader(CONTENT_LENGTH, valueOf(size))
          .setHeader(ETAG, base16().lowerCase().encode(md5))
          .setHeader(X_SERVER_SIDE_ENCRYPTION, "true"))
      .map(new HttpClientResponseHeaderLogger())
      .map(new AssertHttpClientResponseStatusCode(context, HTTP_CREATED))
      .map(new AssertObjectHeaders(context, 0, true, 0, md5, sha512, 0))
      .map(new ToVoid<HttpClientResponse>())
      .flatMap(new GetObject(httpClient, accountName, containerName, objectName, authNonAdmin))
      .map(new HttpClientResponseHeaderLogger())
      .map(new AssertHttpClientResponseStatusCode(context, HTTP_OK))
      .map(new AssertObjectHeaders(context, 0, true, size, md5, sha512, 1))
      .flatMap(new Func1<HttpClientResponse, Observable<HttpClientResponse>>() {
        @Override
        public Observable<HttpClientResponse> call(final HttpClientResponse httpClientResponse) {
          final DigestEndableWriteStream digestWriteStream =
              new DigestEndableWriteStream(new NullEndableWriteStream(), SHA512, MD5);
          return pump(httpClientResponse, digestWriteStream)
              .map(new Func1<Void, HttpClientResponse>() {
                @Override
                public HttpClientResponse call(Void aVoid) {
                  assertArrayEquals(context, md5, digestWriteStream.getDigest(MD5).get());
                  assertArrayEquals(context, sha512, digestWriteStream.getDigest(SHA512).get());
                  return httpClientResponse;
                }
              });
        }
      })
      .map(new ToVoid<HttpClientResponse>())
      .subscribe(new TestSubscriber(context, async));
}
public AFBasicFile(AsyncFile af) {
  this.af = af;
}
/**
 * Constructs a new read stream
 * @param size the chunk's size
 * @param delegate the underlying read stream
 */
public FileChunkReadStream(long size, AsyncFile delegate) {
  super(delegate);
  this.size = size;
  this.file = delegate;
}
/**
 * Upload a file to GeoRocket
 * @param path path to file to import
 * @param client the GeoRocket client
 * @param vertx the Vert.x instance
 * @return an observable that will emit when the file has been uploaded
 */
protected Observable<Void> importFile(String path, GeoRocketClient client, Vertx vertx) {
  // open file
  FileSystem fs = vertx.fileSystem();
  OpenOptions openOptions = new OpenOptions().setCreate(false).setWrite(false);
  return fs.rxOpen(path, openOptions)
      // get file size
      .flatMap(f -> fs.rxProps(path).map(props -> Pair.of(f, props.size())))
      // import file
      .flatMapObservable(f -> {
        ObservableFuture<Void> o = RxHelper.observableFuture();
        Handler<AsyncResult<Void>> handler = o.toHandler();
        AsyncFile file = f.getLeft().getDelegate();

        WriteStream<Buffer> out = client.getStore()
            .startImport(layer, tags, properties, Optional.of(f.getRight()), fallbackCRS, handler);

        AtomicBoolean fileClosed = new AtomicBoolean();

        Pump pump = Pump.pump(file, out);
        file.endHandler(v -> {
          file.close();
          out.end();
          fileClosed.set(true);
        });

        Handler<Throwable> exceptionHandler = t -> {
          if (!fileClosed.get()) {
            file.endHandler(null);
            file.close();
          }
          handler.handle(Future.failedFuture(t));
        };
        file.exceptionHandler(exceptionHandler);
        out.exceptionHandler(exceptionHandler);

        pump.start();
        return o;
      });
}
private PropReadFileStream(FileProps props, AsyncFile file, String path) {
  this.props = props;
  this.file = file;
  this.path = path;
}
public static Observable<Buffer> toBufferObs(AsyncFile file) {
  return toBufferObs(new io.vertx.reactivex.core.file.AsyncFile(file));
}
public static Observable<Buffer> toBufferObs(io.vertx.reactivex.core.file.AsyncFile file) {
  return file.toObservable()
      .map(io.vertx.reactivex.core.buffer.Buffer::getDelegate)
      .doOnTerminate(() -> file.close());
}
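// A minimal usage sketch for toBufferObs above (not part of the original source):
// it counts the total bytes read from a file; the file is closed automatically by
// the doOnTerminate in toBufferObs. "/tmp/example.dat" is an illustrative path.
void countBytes(Vertx vertx) {
  AsyncFile file = vertx.fileSystem().openBlocking("/tmp/example.dat", new OpenOptions());
  toBufferObs(file)
      .map(Buffer::length)
      .reduce(0, Integer::sum)
      .subscribe(total -> System.out.println("read " + total + " bytes"));
}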
/**
 * Processes the upload and sets the binary information (e.g. image dimensions) within the provided field.
 * The binary data will be stored in the {@link BinaryStorage} if desired.
 *
 * @param ac
 *            Action context
 * @param ul
 *            Upload to process
 * @param field
 *            Field which will be updated with the extracted information
 * @param storeBinary
 *            Whether to store the data in the binary store
 */
private void processUpload(ActionContext ac, FileUpload ul, BinaryGraphField field, boolean storeBinary) {
  AsyncFile asyncFile = Mesh.vertx().fileSystem().openBlocking(ul.uploadedFileName(), new OpenOptions());

  Binary binary = field.getBinary();
  String hash = binary.getSHA512Sum();
  String binaryUuid = binary.getUuid();
  String contentType = ul.contentType();
  boolean isImage = contentType.startsWith("image/");

  // Calculate how many streams will connect to the data stream
  int neededDataStreams = 0;
  if (isImage) {
    neededDataStreams++;
  }
  if (storeBinary) {
    neededDataStreams++;
  }

  if (neededDataStreams > 0) {
    Observable<Buffer> stream = RxUtil.toBufferObs(asyncFile).publish().autoConnect(neededDataStreams);

    // Only gather image info for actual images. Otherwise return an empty image info object.
    Single<Optional<ImageInfo>> imageInfo = Single.just(Optional.empty());
    if (isImage) {
      imageInfo = processImageInfo(ac, stream);
    }

    // Store the data
    Single<Long> store = Single.just(ul.size());
    if (storeBinary) {
      store = binaryStorage.store(stream, binaryUuid).andThen(Single.just(ul.size()));
    }

    // Handle the data in parallel
    TransformationResult info = Single.zip(imageInfo, store, (imageinfoOpt, size) -> {
      ImageInfo iinfo = null;
      if (imageinfoOpt.isPresent()) {
        iinfo = imageinfoOpt.get();
      }
      return new TransformationResult(hash, 0, iinfo, null);
    }).blockingGet();

    // Only add image information if image properties were found
    if (info.getImageInfo() != null) {
      binary.setImageHeight(info.getImageInfo().getHeight());
      binary.setImageWidth(info.getImageInfo().getWidth());
      field.setImageDominantColor(info.getImageInfo().getDominantColor());
    }
  }

  field.setFileName(ul.fileName());
  field.getBinary().setSize(ul.size());
  field.setMimeType(contentType);
}
/**
 * Main work done by the FileDataHandler, which reads the file line by line and passes each line
 * to the correct Importer implementation for processing.
 * @param fileSize size of the file to import
 * @param conf the import job configuration
 * @param replyHandler the handler returns a job object with success and error counter parameters
 * to be persisted by the job runner
 */
private void parseFile(long fileSize, Job conf, Handler<AsyncResult<Job>> replyHandler) {
  String file = conf.getParameters().get(0).getValue();
  vertx.fileSystem().open(file, new OpenOptions(), ar -> {
    if (ar.succeeded()) {
      AsyncFile rs = ar.result();
      rs.handler(new FileDataHandler(vertx, conf, fileSize, importerCache.get(conf.getName()), reply -> {
        if (reply.failed()) {
          if (reply.cause().getMessage().contains(RTFConsts.STATUS_ERROR_THRESHOLD)) {
            log.error("Stopping import... Error threshold exceeded for file " + file);
            try {
              // can throw an exception if the error threshold is met at the last bulk,
              // where the endHandler is called before the stop-on-error is called
              rs.pause().close();
            } catch (Exception e) {
              log.error("Error threshold hit on last block of data ", e);
            }
            replyHandler.handle(io.vertx.core.Future.failedFuture(RTFConsts.STATUS_ERROR_THRESHOLD));
          }
        } else {
          replyHandler.handle(io.vertx.core.Future.succeededFuture(reply.result()));
        }
      }));
      rs.exceptionHandler(t -> {
        log.error("Error reading from file " + file, t);
        replyHandler.handle(io.vertx.core.Future.failedFuture(RTFConsts.STATUS_ERROR));
      });
      rs.endHandler(v -> rs.close(ar2 -> {
        if (ar2.failed()) {
          log.error("Error closing file " + file, ar2.cause());
        }
      }));
    } else {
      log.error("Error opening file " + file, ar.cause());
      replyHandler.handle(io.vertx.core.Future.failedFuture(RTFConsts.STATUS_ERROR));
    }
  });
}
public RsAsyncFile(AsyncFile asyncFile) {
  this.asyncFile = asyncFile;
}
private void testRequestWithBody(HttpMethod method, boolean chunked) throws Exception {
  String expected = TestUtils.randomAlphaString(1024 * 1024);
  File f = File.createTempFile("vertx", ".data");
  f.deleteOnExit();
  Files.write(f.toPath(), expected.getBytes());
  waitFor(2);
  server.requestHandler(req -> req.bodyHandler(buff -> {
    assertEquals(method, req.method());
    assertEquals(Buffer.buffer(expected), buff);
    complete();
    req.response().end();
  }));
  startServer();
  vertx.runOnContext(v -> {
    AsyncFile asyncFile = vertx.fileSystem().openBlocking(f.getAbsolutePath(), new OpenOptions());
    HttpRequest<Buffer> builder = null;
    switch (method) {
      case POST:
        builder = client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath");
        break;
      case PUT:
        builder = client.put(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath");
        break;
      case PATCH:
        builder = client.patch(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath");
        break;
      default:
        fail("Invalid HTTP method");
    }
    if (!chunked) {
      builder = builder.putHeader("Content-Length", "" + expected.length());
    }
    builder.sendStream(asyncFile, onSuccess(resp -> {
      assertEquals(200, resp.statusCode());
      complete();
    }));
  });
  await();
}
public AsyncFile getFile() {
  return file;
}
/**
 * Gets the opened file, ready to read.
 *
 * @return the opened {@link AsyncFile}
 */
public AsyncFile getFile() {
  return file;
}