在我的一生中,我一直无法找到与我想做的事情相匹配的问题,因此在这里我将解释我的用例。如果您知道某个主题已涵盖此问题的答案,请随时将我引向该主题。:)
我有一段代码可以定期(每20秒)将文件上传到Amazon S3。该文件是由另一个进程写入的日志文件,因此此功能实际上是尾随日志的一种方式,以便某人可以半实时读取其内容,而不必直接访问日志所在的计算机。 。
直到最近,我只是一直使用S3 PutObject方法(使用File作为输入)来执行此上传。但是在AWS开发工具包1.9中,这不再起作用,因为如果实际上载的内容大小大于上载开始时承诺的内容长度,则S3客户端会拒绝该请求。此方法在开始流传输数据之前先读取文件的大小,因此,鉴于此应用程序的性质,文件很可能在该点与流的末尾之间已增大。这意味着我现在需要确保无论文件有多大,我都只发送N字节的数据。
我不需要以任何方式解释文件中的字节,因此我不关心编码。我可以逐字节传输它。基本上,我想要的是一种简单的方法,在该方法中,我可以读取文件的第N个字节,然后使文件终止读取,即使该点之后文件中还有更多数据。(换句话说,将EOF插入流中的特定位置。)
例如,如果开始上载时我的文件长为10000字节,但是在上载期间增长到12000字节,则无论大小更改如何,我都希望以10000字节停止上载。(在随后的上传中,我然后将上传12000字节或更多。)
我还没有找到执行此操作的预制方法- 到目前为止,我发现最好的方法似乎是IOUtils.copyLarge(InputStream,OutputStream,offset,length),可以告诉它复制最大的“长度”字节提供给提供的OutputStream。但是,copyLarge和BlockObject一样都是阻塞方法(大概在InputStream上调用read()的形式),因此看来我根本无法使它正常工作。
我还没有找到任何可以执行此操作的方法或预构建的流,因此让我觉得我需要编写自己的实现来直接监视已读取的字节数。然后,这可能像BufferedInputStream一样工作,其中每批读取的字节数是缓冲区大小或要读取的剩余字节中的较小者。(例如,在缓冲区大小为3000字节的情况下,我将进行三批处理,每批处理3000字节,然后再批处理1000字节+ EOF。)
有人知道更好的方法吗?谢谢。
编辑 为了澄清一下,我已经知道了几种替代方法,但都不是理想的:
(1)我可以在上传文件时锁定它。这样做会导致数据丢失或写入文件的过程中的操作问题。
(2)我可以在上传文件之前创建文件的本地副本。这可能是非常低效的,并且会占用大量不必要的磁盘空间(此文件可能会扩展到几GB的范围,并且正在运行的计算机可能缺少磁盘空间)。
编辑2: 根据同事的建议,我的最终解决方案如下所示:
private void uploadLogFile(final File logFile) { if (logFile.exists()) { long byteLength = logFile.length(); try ( FileInputStream fileStream = new FileInputStream(logFile); InputStream limitStream = ByteStreams.limit(fileStream, byteLength); ) { ObjectMetadata md = new ObjectMetadata(); md.setContentLength(byteLength); // Set other metadata as appropriate. PutObjectRequest req = new PutObjectRequest(bucket, key, limitStream, md); s3Client.putObject(req); } // plus exception handling } }
LimitInputStream是我的同事建议的,显然不知道它已被弃用。ByteStreams.limit是当前的番石榴替代品,它可以满足我的要求。感谢大家。
完整的答案撕裂并替换:
包装a InputStream这样相对简单,以限制在发送数据结束之前将要传递的字节数。 FilterInputStream是针对这种一般性工作的,但是由于您必须重写此 特定 工作的几乎所有方法,因此它会妨碍您的工作。
InputStream
FilterInputStream
这是解决方案的粗略做法:
import java.io.IOException; import java.io.InputStream; /** * An {@code InputStream} wrapper that provides up to a maximum number of * bytes from the underlying stream. Does not support mark/reset, even * when the wrapped stream does, and does not perform any buffering. */ public class BoundedInputStream extends InputStream { /** This stream's underlying @{code InputStream} */ private final InputStream data; /** The maximum number of bytes still available from this stream */ private long bytesRemaining; /** * Initializes a new {@code BoundedInputStream} with the specified * underlying stream and byte limit * @param data the @{code InputStream} serving as the source of this * one's data * @param maxBytes the maximum number of bytes this stream will deliver * before signaling end-of-data */ public BoundedInputStream(InputStream data, long maxBytes) { this.data = data; bytesRemaining = Math.max(maxBytes, 0); } @Override public int available() throws IOException { return (int) Math.min(data.available(), bytesRemaining); } @Override public void close() throws IOException { data.close(); } @Override public synchronized void mark(int limit) { // does nothing } @Override public boolean markSupported() { return false; } @Override public int read(byte[] buf, int off, int len) throws IOException { if (bytesRemaining > 0) { int nRead = data.read( buf, off, (int) Math.min(len, bytesRemaining)); bytesRemaining -= nRead; return nRead; } else { return -1; } } @Override public int read(byte[] buf) throws IOException { return this.read(buf, 0, buf.length); } @Override public synchronized void reset() throws IOException { throw new IOException("reset() not supported"); } @Override public long skip(long n) throws IOException { long skipped = data.skip(Math.min(n, bytesRemaining)); bytesRemaining -= skipped; return skipped; } @Override public int read() throws IOException { if (bytesRemaining > 0) { int c = data.read(); if (c >= 0) { bytesRemaining -= 1; } return c; } else { return -1; } } }