public void followFile(Path file, FileInput.InitialReadPosition customInitialReadPosition) throws IOException {
    synchronized (this) {
        if (isFollowingFile(file)) {
            log.debug("Not following file {} because it's already followed.", file);
            return;
        }
        log.debug("Following file {}", file);
        final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
        final ChunkReader chunkReader = new ChunkReader(input, file, fileChannel, chunkQueue, readerBufferSize,
                customInitialReadPosition, this);
        final ScheduledFuture<?> chunkReaderFuture = scheduler.scheduleAtFixedRate(chunkReader, 0, readerInterval,
                TimeUnit.MILLISECONDS);
        chunkReaderTasks.putIfAbsent(file, new ChunkReaderTask(chunkReaderFuture, fileChannel));
    }
}
/**
 * Primary constructor.
 *
 * @param output output file
 * @param level  document storage level
 * @param filter data filter, or {@code null} for none
 * @throws IOException Thrown if there is an I/O error when creating the file.
 */
public DocumentBlockFileWriter(File output, DocumentStorageLevel level, DataFilter filter) throws IOException {
    this.output = output;
    this.storageLevel = level;
    this.filter = filter;
    this.fileChannel = AsynchronousFileChannel.open(Paths.get(output.toURI()),
            StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    try {
        byte[] magicHeader = MAGIC_V1;
        byte[] filterHeader = filter != null ? filter.id() : DocumentFileWriter.FILTER_NA;
        fileChannel.write(ByteBuffer.wrap(magicHeader), 0).get();
        fileChannel.write(ByteBuffer.wrap(filterHeader), 4).get();
        allocatedSpace.addAndGet(magicHeader.length + filterHeader.length);
    } catch (ExecutionException | InterruptedException e) {
        throw new IOException(e);
    }
}
/**
 * Primary constructor.
 *
 * @param output output file
 * @param level  document storage level
 * @param filter data filter, or {@code null} for none
 * @throws IOException Thrown if there is an I/O error when creating the file.
 */
public DocumentFileWriter(File output, DocumentStorageLevel level, DataFilter filter) throws IOException {
    this.output = output;
    this.storageLevel = level;
    this.filter = filter;
    this.fileChannel = AsynchronousFileChannel.open(Paths.get(output.getAbsoluteFile().toURI()),
            StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    try {
        fileChannel.write(ByteBuffer.wrap(MAGIC_V1), 0).get();
        fileChannel.write(ByteBuffer.wrap(filter == null ? FILTER_NA : filter.id()), 4).get();
        allocatedSpace.addAndGet(MAGIC_V1.length + 2);
    } catch (ExecutionException | InterruptedException e) {
        throw new IOException(e);
    }
}
private void save(boolean saveLaterIfWriting) {
    boolean canSaveNow = currentlyWriting.compareAndSet(false, true); // atomically sets to true if false
    if (canSaveNow) { // no write is in progress: start one immediately
        // Writes the config data to a ByteBuffer
        CharsWrapper.Builder builder = new CharsWrapper.Builder(512);
        writer.write(config, builder);
        CharBuffer chars = CharBuffer.wrap(builder.build());
        ByteBuffer buffer = charset.encode(chars);

        // Writes the ByteBuffer to the file, asynchronously
        synchronized (channelGuard) {
            try {
                channel = AsynchronousFileChannel.open(file.toPath(), openOptions);
                channel.write(buffer, channel.size(), null, writeCompletedHandler);
            } catch (IOException e) {
                writeCompletedHandler.failed(e, null);
            }
        }
    } else if (saveLaterIfWriting) { // a write is already in progress: schedule another one for later
        mustWriteAgain.set(true);
    }
}
public FileLock tryLock(Channel channel, long start, long size, boolean shared) throws IOException {
    if (!channel.isOpen()) {
        throw new ClosedChannelException();
    }
    Iterator<EphemeralFsFileLock> iter = locks.iterator();
    while (iter.hasNext()) {
        EphemeralFsFileLock oldLock = iter.next();
        if (!oldLock.isValid()) {
            iter.remove();
        } else if (oldLock.overlaps(start, size)) {
            throw new OverlappingFileLockException();
        }
    }
    EphemeralFsFileLock newLock = channel instanceof FileChannel
            ? new EphemeralFsFileLock((FileChannel) channel, start, size, shared)
            : new EphemeralFsFileLock((AsynchronousFileChannel) channel, start, size, shared);
    locks.add(newLock);
    return newLock;
}
@Test
public void testFutureAsyncReadWrite() throws Exception {
    Path test = root.resolve("test");
    try (AsynchronousFileChannel file = AsynchronousFileChannel.open(test,
            StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ)) {
        byte[] contents = new byte[512];
        random.nextBytes(contents);

        Future<Integer> wf = file.write(ByteBuffer.wrap(contents), 0);
        assertEquals((Integer) 512, wf.get());

        ByteBuffer read = ByteBuffer.allocate(512);
        Future<Integer> rf = file.read(read, 0);
        assertEquals((Integer) 512, rf.get());
        assertArrayEquals(contents, read.array());
    }
}
@Test
public void testErrorGoesToHandler() throws Exception {
    Path test = root.resolve("test");
    AsynchronousFileChannel file = AsynchronousFileChannel.open(test,
            StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ);
    // Close the channel up front so the asynchronous write below must fail.
    file.close();

    byte[] contents = new byte[512];
    random.nextBytes(contents);
    CountdownCompletionHandler<Integer> writeCompletionHandler = new CountdownCompletionHandler<>();
    file.write(ByteBuffer.wrap(contents), 0, writeCompletionHandler, writeCompletionHandler);
    assertTrue(writeCompletionHandler.getExc() instanceof ClosedChannelException);
}
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(
            Paths.get("/Users/life/lifey40Video.avi"), StandardOpenOption.READ);
    System.out.println(afc.getClass());

    ByteBuffer r = ByteBuffer.allocate(500 * 1024 * 1024);
    CompletableFuture<Integer> cf = new CompletableFuture<>();
    // sleep(20000);
    System.out.println("Now");
    long s = System.currentTimeMillis();
    Future<Integer> c = afc.read(r, 0);
    // Future<Integer> a = afc.read(r, 10 * 1024 * 1024);

    CompletableFuture<Integer> cf2 = cf.thenApplyAsync((res) -> {
        sleep(1000);
        System.out.println("mush");
        return res;
    }, ForkJoinPool.commonPool());
    // Note: cf is never completed, so cf2.get() below will block indefinitely.
    System.out.println("mish" + cf2.get());
    // int i = a.get();
    sleep(1000000);
}
private void handleStoreFile(GetFileReceiver msg) {
    Path file = msg.getFile();
    log.debug("storing data in file: {}", file);

    ActorRef sender = getSender();
    resolveFile(msg, file, false, resolvedFile -> {
        try {
            ActorRef receiver = getContext().actorOf(
                    ChannelReceiver.props(AsynchronousFileChannel.open(
                            resolvedFile,
                            StandardOpenOption.WRITE,
                            StandardOpenOption.TRUNCATE_EXISTING,
                            StandardOpenOption.CREATE)),
                    nameGenerator.getName(ChannelReceiver.class));
            getSender().tell(new FileReceiver(receiver), getSelf());
        } catch (Exception e) {
            sender.tell(new Failure(e), getSelf());
        }
    });
}
private void handleFetchFile(FetchFile msg) throws Exception {
    Path file = msg.getFile();
    log.debug("fetching file: {}", file);

    ActorRef sender = getSender();
    resolveFile(msg, file, resolvedFile -> {
        try {
            ActorRef cursor = getContext().actorOf(
                    ChannelCursor.props(AsynchronousFileChannel.open(
                            resolvedFile, StandardOpenOption.READ)),
                    nameGenerator.getName(ChannelCursor.class));
            cursor.tell(new NextItem(), sender);
        } catch (Exception e) {
            sender.tell(new Failure(e), getSelf());
        }
    });
}
/**
 * Writes the given bytes to the target file asynchronously.
 *
 * @param target      the target path
 * @param bytes       the bytes to write
 * @param openOptions the open options
 * @return the future
 */
public static Future<Void> writeAsync(final Path target, final byte[] bytes, final OpenOption... openOptions) {
    if (isTraceEnabled)
        log.trace("Writing data to the file asynchronously. target=[{}], openOptions=[{}]",
                target, listToString(openOptions));

    return AsyncTool.startNew(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(target, openOptions)) {
                Future<Integer> result = fileChannel.write(ByteBuffer.wrap(bytes), 0);
                while (!result.isDone()) {
                    Thread.sleep(1);
                }
                return null;
            } catch (Exception e) {
                log.error("An exception occurred while writing to the file asynchronously.", e);
                throw new RuntimeException(e);
            }
        }
    });
}
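// Usage sketch for writeAsync above (illustrative, not from the original source). The output path,
// the payload, and the enclosing class name FileTool are assumptions; writeAsync is assumed to be a
// static helper on a utility class in scope. The returned Future completes when the background write is done.
Future<Void> pending = FileTool.writeAsync(
        Paths.get("build/async-out.bin"),
        "hello".getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE);
pending.get(); // blocks until the asynchronous write has completed (or rethrows its failure)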
public ChunkReader(Input fileInput, Path path, AsynchronousFileChannel fileChannel, BlockingQueue<FileChunk> chunks,
                   int initialChunkSize, FileInput.InitialReadPosition initialReadPosition,
                   ChunkReaderScheduler chunkReaderScheduler) {
    this.fileInput = fileInput;
    this.path = path;
    this.fileChannel = fileChannel;
    this.chunks = chunks;
    this.initialChunkSize = initialChunkSize;
    this.chunkReaderScheduler = chunkReaderScheduler;

    Preconditions.checkArgument(initialChunkSize > 0, "Chunk size must be positive");

    if (fileChannel.isOpen()) {
        try {
            final BasicFileAttributes attr = Files.readAttributes(path, BasicFileAttributes.class);
            fileKey = attr.fileKey();
            if (initialReadPosition == FileInput.InitialReadPosition.END) {
                position = attr.size();
            }
        } catch (IOException e) {
            log.error("Cannot access file metadata", e);
        }
    }
}
@Test
public void readPositionEnd() throws IOException, InterruptedException {
    final Utils.LogFile logFile = new Utils.LogFile(100 * 1024, 400, 100);
    logFile.close();

    final ArrayBlockingQueue<FileChunk> chunkQueue = Queues.newArrayBlockingQueue(1);
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(logFile.getPath(), StandardOpenOption.READ);
    final CountingAsyncFileChannel spy = new CountingAsyncFileChannel(channel);
    final ChunkReader chunkReader = new ChunkReader(mock(FileInput.class), logFile.getPath(), spy, chunkQueue,
            10 * 1024, FileInput.InitialReadPosition.END, null);

    final ScheduledExecutorService chunkReaderExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
                    .setDaemon(false)
                    .setNameFormat("file-chunk-reader-%d")
                    .setUncaughtExceptionHandler(this)
                    .build()
    );

    final Thread consumer = new Thread() {
        @Override
        public void run() {
            try {
                final FileChunk chunk = chunkQueue.poll(2, TimeUnit.SECONDS);
                assertNull("Reading from the end of the file must not produce a chunk for a non-changing file.", chunk);
            } catch (InterruptedException ignore) {
            }
        }
    };
    consumer.start();

    chunkReaderExecutor.scheduleAtFixedRate(chunkReader, 0, 250, TimeUnit.MILLISECONDS);
    consumer.join();

    // we can process one chunk at a time, so one read is queued, the second is buffered
    assertEquals("The queue should be empty", 1, chunkQueue.remainingCapacity());
}
public Publisher<Integer> save(Publisher<Customer> customers) throws IOException {
    AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
    AtomicLong offset = new AtomicLong(0);
    AtomicInteger resultCount = new AtomicInteger(0);
    SingleItemPublisher<Integer> resultPublisher = new SingleItemPublisher<>();
    Semaphore writeSemaphore = new Semaphore(1);
    writeSemaphore.acquireUninterruptibly();

    fileChannel.write(ByteBuffer.wrap("[".getBytes()), 0, resultPublisher, andThen((count, s) -> {
        writeSemaphore.release();
        customers.subscribe(pullEach((Customer customer, Subscription subscription) -> {
            String json = String.format("%s{\"firstName\": \"%s\", \"lastName\": \"%s\"}",
                    offset.longValue() == 0 ? "" : ",", customer.getFirstName(), customer.getLastName());
            offset.addAndGet(count);
            writeSemaphore.acquireUninterruptibly();
            fileChannel.write(ByteBuffer.wrap(json.getBytes()), offset.get(), resultPublisher, andThen((size, c) -> {
                writeSemaphore.release();
                offset.addAndGet(size);
                resultCount.incrementAndGet();
                subscription.request(1);
            }));
        }).andThen(() -> {
            writeSemaphore.acquireUninterruptibly();
            fileChannel.write(ByteBuffer.wrap("]".getBytes()), offset.longValue(), resultPublisher, andThen((d, e) -> {
                writeSemaphore.release();
                try {
                    fileChannel.close();
                    resultPublisher.publish(resultCount.intValue());
                } catch (IOException error) {
                    resultPublisher.publish(error);
                }
            }));
        }).exceptionally(error -> resultPublisher.publish(error)));
    }));
    return resultPublisher;
}
private static AsynchronousFileChannel openInputChannel(File file) {
    try {
        final Path path = Paths.get(file.getAbsolutePath());
        if (!Files.exists(path)) {
            Files.createFile(path);
        }
        return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
@Override
public AsynchronousFileChannel newAsynchronousFileChannel(
        Path path, Set<? extends OpenOption> options, ExecutorService exec, FileAttribute<?>... attrs)
        throws IOException {
    throw new UnsupportedOperationException();
}
public Observable<Void> open(SfsVertx vertx, StandardOpenOption openOption, StandardOpenOption... openOptions) {
    executorService = vertx.getIoPool();
    Context context = vertx.getOrCreateContext();
    return aVoid()
            .doOnNext(aVoid -> checkState(status.compareAndSet(STOPPED, STARTING)))
            .flatMap(aVoid -> RxHelper.executeBlocking(context, vertx.getBackgroundPool(), () -> {
                try {
                    createDirectories(file.getParent());
                    Set<StandardOpenOption> options = new HashSet<>();
                    options.add(openOption);
                    addAll(options, openOptions);
                    channel = AsynchronousFileChannel.open(file, options, executorService);
                    return (Void) null;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }))
            .doOnNext(aVoid -> checkState(status.compareAndSet(STARTING, STARTED)));
}
public Observable<Void> open(SfsVertx vertx, StandardOpenOption openOption, StandardOpenOption... openOptions) {
    this.vertx = vertx;
    this.executorService = vertx.getIoPool();
    Context context = vertx.getOrCreateContext();
    return aVoid()
            .doOnNext(aVoid -> checkState(status.compareAndSet(STOPPED, STARTING)))
            .flatMap(aVoid -> RxHelper.executeBlocking(context, vertx.getBackgroundPool(), () -> {
                try {
                    createDirectories(file.getParent());
                    Set<StandardOpenOption> options = new HashSet<>();
                    options.add(openOption);
                    addAll(options, openOptions);
                    channel = AsynchronousFileChannel.open(file, options, executorService);
                    return (Void) null;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }))
            .doOnNext(aVoid -> {
                long id = vertx.setPeriodic(100, event -> cleanupOrphanedWriters());
                periodics.add(id);
            })
            .doOnNext(aVoid -> checkState(status.compareAndSet(STARTING, STARTED)))
            .onErrorResumeNext(throwable -> {
                checkState(status.compareAndSet(STARTING, START_FAILED));
                return error(throwable);
            });
}
public AsyncFileWriterImpl(long startPosition, WriteQueueSupport<AsyncFileWriter> writeQueueSupport,
                           Context context, AsynchronousFileChannel dataFile, Logger log) {
    this.log = log;
    this.startPosition = startPosition;
    this.writePos = startPosition;
    this.ch = dataFile;
    this.context = context;
    this.writeQueueSupport = writeQueueSupport;
    this.lastWriteTime = System.currentTimeMillis();
}
public AsyncFileReaderImpl(Context context, long startPosition, int bufferSize, long length,
                           AsynchronousFileChannel dataFile, Logger log) {
    this.log = log;
    this.bufferSize = bufferSize;
    this.readPos = startPosition;
    this.bytesRemaining = length;
    this.startPosition = startPosition;
    this.ch = dataFile;
    this.context = context;
}
TestCompletionHandler(AsynchronousFileChannel channel, FluxSink<ByteBuf> sink, ByteBufAllocator allocator, int chunk) {
    this.channel = channel;
    this.sink = sink;
    this.allocator = allocator;
    this.chunk = chunk;
    this.position = new AtomicLong(0);
}
public AsyncFile(File file, long filesize) throws IOException {
    super();
    this.file = file;
    this.filesize = filesize;
    _log.debug("opening file " + file.getAbsolutePath());
    this.ch = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
}
public void close() {
    for (AsynchronousFileChannel channel : this.map.keySet()) {
        if (channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
                log.info("close file failed");
            }
        }
    }
}
public boolean load(boolean first) throws IOException {
    Path startingDir = Paths.get(config.basePath + "/" + config.suffix);
    boolean exist = false;
    while (!exist) {
        exist = Files.exists(startingDir, LinkOption.NOFOLLOW_LINKS);
        if (!exist) {
            log.info("dir does not exist (sleeping): " + startingDir.toAbsolutePath());
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                log.info("received interrupt");
                return false;
            }
        }
    }

    this.channel = AsynchronousFileChannel.open(startingDir, StandardOpenOption.READ);
    this.node = new FileNode();
    node.setFileName(startingDir.toAbsolutePath().toString());
    node.setBf(ByteBuffer.allocate(1000000));
    node.getBf().clear();
    node.setCnt(null);
    node.setOffset(0);
    if (first && config.useFileCurrent) {
        long size = this.channel.size();
        if (size > 1000) {
            size = size - 1000;
        }
        node.setOffset(size);
    }
    node.setCurTime(System.currentTimeMillis());
    log.info("load file success: " + startingDir.toAbsolutePath() + " with offset: " + node.getOffset());
    log.info("init success!");
    return true;
}
public AsynchronousFileChannel newAsynchronousByteChannel(
        EphemeralFsPath efsPath, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>[] attrs)
        throws IOException {
    if (executor == null) {
        executor = asyncThreadPoolHolder.getThreadPool();
    }
    EphemeralFsFileChannel channel = newByteChannel(efsPath, options, attrs);
    return new EphemeralFsAsynchronousFileChannel(channel, executor);
}
@Override
public AsynchronousFileChannel newAsynchronousFileChannel(Path path, Set<? extends OpenOption> options,
                                                           ExecutorService executor, FileAttribute<?>... attrs)
        throws IOException {
    return getFs(path).newAsynchronousByteChannel(toEfsPath(path), options, executor, attrs);
}
@Test
public void testAsyncTryLockAcquiredBy() throws Exception {
    Path path = root.resolve("locked");
    try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, WRITE, CREATE_NEW)) {
        FileLock lock = channel.tryLock();
        assertSame(channel, lock.acquiredBy());
        assertNull(lock.channel());
    }
}
@Test
public void testHandlerAsyncReadWrite() throws Exception {
    Path test = root.resolve("test");
    try (AsynchronousFileChannel file = AsynchronousFileChannel.open(test,
            StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.READ)) {
        byte[] contents = new byte[512];
        random.nextBytes(contents);

        CountdownCompletionHandler<Integer> writeCompletionHandler = new CountdownCompletionHandler<>();
        file.write(ByteBuffer.wrap(contents), 0, writeCompletionHandler, writeCompletionHandler);
        assertEquals((Integer) 512, writeCompletionHandler.getResult());

        ByteBuffer read = ByteBuffer.allocate(512);
        CountdownCompletionHandler<Integer> readCompletionHandler = new CountdownCompletionHandler<>();
        file.read(read, 0, readCompletionHandler, readCompletionHandler);
        assertEquals((Integer) 512, readCompletionHandler.getResult());
        assertArrayEquals(contents, read.array());
    }
}
private static void readFile(Path path, ReadHandler handler) throws IOException {
    AsynchronousFileChannel fileChannel = open(path, readOptions);
    long size = fileChannel.size();
    if (size <= Integer.MAX_VALUE) {
        readSmallFile(fileChannel, (int) size, handler);
    } else {
        throw new IllegalArgumentException("File is too big.");
    }
}
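// Hypothetical sketch of the readSmallFile helper referenced above (the original implementation is not
// shown here; the ReadHandler callbacks onRead/onError are assumed names). It reads the whole file into
// a single buffer and hands the result to the handler once the asynchronous read completes.
private static void readSmallFile(AsynchronousFileChannel fileChannel, int size, ReadHandler handler) {
    ByteBuffer buffer = ByteBuffer.allocate(size);
    fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer bytesRead, ByteBuffer attachment) {
            attachment.flip();
            closeChannel();
            handler.onRead(attachment); // assumed callback
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel();
            handler.onError(exc); // assumed callback
        }

        private void closeChannel() {
            try {
                fileChannel.close();
            } catch (IOException ignored) {
                // best-effort close
            }
        }
    });
}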
public FileRequestObject(Path path, String mimeType, Request request) throws IOException {
    this.fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
    this.fileSize = Files.size(path);
    this.position = 0;
    this.mimeType = mimeType;
    this.request = request;
    this.nextSequence = 0;
    this.lastModified = new Date(Files.getLastModifiedTime(path, LinkOption.NOFOLLOW_LINKS).toMillis());
    this.etag = FileUtils.computeEtag(path.getFileName().toString(), this.lastModified);
}
FileTransport(String path) {
    try {
        ch = AsynchronousFileChannel.open(Paths.get(path),
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
@Override
public void init(DataStreamChannel channel, OperationCallback or) {
    this.channel = channel;

    RecordStruct rec = channel.getBinding();
    this.path = new CommonPath(rec.getFieldAsString("FilePath"));
    this.offset = rec.getFieldAsInteger("Offset", 0);
    this.file = FileSystemFile.this.driver.resolveToLocalPath(this.path);

    if (!Files.exists(this.file)) {
        or.error(1, "FS failed to find file: " + this.file);
        or.complete();
        return;
    }

    if (Files.isDirectory(this.file)) {
        or.error(1, "FS found directory: " + this.file);
        or.complete();
        return;
    }

    or.info("Opening " + this.file + " for read");

    try {
        this.sbc = AsynchronousFileChannel.open(this.file);
        // TODO skip to offset
    } catch (IOException x) {
        or.error(1, "FS failed to open file: " + x);
    }

    or.complete();
}
@Override
public AsynchronousFileChannel newAsynchronousFileChannel(
        Path path, Set<? extends OpenOption> options, @Nullable ExecutorService executor, FileAttribute<?>... attrs)
        throws IOException {
    // call newFileChannel and cast so that FileChannel support is checked there
    JimfsFileChannel channel = (JimfsFileChannel) newFileChannel(path, options, attrs);
    if (executor == null) {
        JimfsFileSystem fileSystem = (JimfsFileSystem) path.getFileSystem();
        executor = fileSystem.getDefaultThreadPool();
    }
    return channel.asAsynchronousFileChannel(executor);
}
private static void checkAsyncRead(AsynchronousFileChannel channel) throws Throwable {
    ByteBuffer buf = buffer("1234567890");
    assertEquals(10, (int) channel.read(buf, 0).get());

    buf.flip();
    SettableFuture<Integer> future = SettableFuture.create();
    channel.read(buf, 0, null, setFuture(future));
    assertThat(future.get(10, SECONDS)).isEqualTo(10);
}
private static void checkAsyncWrite(AsynchronousFileChannel asyncChannel) throws Throwable {
    ByteBuffer buf = buffer("1234567890");
    assertEquals(10, (int) asyncChannel.write(buf, 0).get());

    buf.flip();
    SettableFuture<Integer> future = SettableFuture.create();
    asyncChannel.write(buf, 0, null, setFuture(future));
    assertThat(future.get(10, SECONDS)).isEqualTo(10);
}
private static void checkAsyncLock(AsynchronousFileChannel channel) throws Throwable {
    assertNotNull(channel.lock().get());
    assertNotNull(channel.lock(0, 10, true).get());

    SettableFuture<FileLock> future = SettableFuture.create();
    channel.lock(0, 10, true, null, setFuture(future));
    assertNotNull(future.get(10, SECONDS));
}
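// Hypothetical sketch of the setFuture helper used in the checks above (the actual test utility is not
// shown here): it adapts a CompletionHandler into a Guava SettableFuture so the caller can block on the
// asynchronous result or its failure.
private static <T> CompletionHandler<T, Object> setFuture(final SettableFuture<T> future) {
    return new CompletionHandler<T, Object>() {
        @Override
        public void completed(T result, Object attachment) {
            future.set(result);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            future.setException(exc);
        }
    };
}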
public NioBackingFile(File file) throws IOException {
    this.file = file;
    Path path = Paths.get(file.getAbsolutePath());
    log.warn("We want to disable the read cache (probably?)");
    this.fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.READ);
}