/** * Creates and adds a WriteCtx into the pendingWrites map. This is a * synchronized method to handle concurrent writes. * * @return A non-null {@link WriteCtx} instance if the incoming write * request's offset >= nextOffset. Otherwise null. */ private synchronized WriteCtx addWritesToCache(WRITE3Request request, Channel channel, int xid) { long offset = request.getOffset(); int count = request.getCount(); long cachedOffset = nextOffset.get(); int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT; if (LOG.isDebugEnabled()) { LOG.debug( "requesed offset=" + offset + " and current offset=" + cachedOffset); } // Handle a special case first if ((offset < cachedOffset) && (offset + count > cachedOffset)) { // One Linux client behavior: after a file is closed and reopened to // write, the client sometimes combines previous written data(could still // be in kernel buffer) with newly appended data in one write. This is // usually the first write after file reopened. In this // case, we log the event and drop the overlapped section. LOG.warn(String.format( "Got overwrite with appended data (%d-%d)," + " current offset %d," + " drop the overlapped section (%d-%d)" + " and append new data (%d-%d).", offset, (offset + count - 1), cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset + count - 1))); if (!pendingWrites.isEmpty()) { LOG.warn("There are other pending writes, fail this jumbo write"); return null; } LOG.warn("Modify this write to write only the appended data"); alterWriteRequest(request, cachedOffset); // Update local variable originalCount = count; offset = request.getOffset(); count = request.getCount(); } // Fail non-append call if (offset < cachedOffset) { LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + "," + nextOffset + ")"); return null; } else { DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP : WriteCtx.DataState.ALLOW_DUMP; WriteCtx writeCtx = new WriteCtx(request.getHandle(), request.getOffset(), request.getCount(), originalCount, request.getStableHow(), request.getData(), channel, xid, false, dataState); if (LOG.isDebugEnabled()) { LOG.debug("Add new write to the list with nextOffset " + cachedOffset + " and requesed offset=" + offset); } if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { // update the memory size updateNonSequentialWriteInMemory(count); } // check if there is a WriteCtx with the same range in pendingWrites WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid); if (oldWriteCtx == null) { addWrite(writeCtx); } else { LOG.warn("Got a repeated request, same range, with xid:" + writeCtx.getXid()); } return writeCtx; } }
/** * Creates and adds a WriteCtx into the pendingWrites map. This is a * synchronized method to handle concurrent writes. * * @return A non-null {@link WriteCtx} instance if the incoming write * request's offset >= nextOffset. Otherwise null. */ private synchronized WriteCtx addWritesToCache(WRITE3Request request, Channel channel, int xid) { long offset = request.getOffset(); int count = request.getCount(); long cachedOffset = nextOffset.get(); int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT; if (LOG.isDebugEnabled()) { LOG.debug("requesed offset=" + offset + " and current offset=" + cachedOffset); } // Handle a special case first if ((offset < cachedOffset) && (offset + count > cachedOffset)) { // One Linux client behavior: after a file is closed and reopened to // write, the client sometimes combines previous written data(could still // be in kernel buffer) with newly appended data in one write. This is // usually the first write after file reopened. In this // case, we log the event and drop the overlapped section. LOG.warn(String.format("Got overwrite with appended data (%d-%d)," + " current offset %d," + " drop the overlapped section (%d-%d)" + " and append new data (%d-%d).", offset, (offset + count - 1), cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset + count - 1))); if (!pendingWrites.isEmpty()) { LOG.warn("There are other pending writes, fail this jumbo write"); return null; } LOG.warn("Modify this write to write only the appended data"); alterWriteRequest(request, cachedOffset); // Update local variable originalCount = count; offset = request.getOffset(); count = request.getCount(); } // Fail non-append call if (offset < cachedOffset) { LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + "," + nextOffset + ")"); return null; } else { DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP : WriteCtx.DataState.ALLOW_DUMP; WriteCtx writeCtx = new WriteCtx(request.getHandle(), request.getOffset(), request.getCount(), originalCount, request.getStableHow(), request.getData(), channel, xid, false, dataState); if (LOG.isDebugEnabled()) { LOG.debug("Add new write to the list with nextOffset " + cachedOffset + " and requesed offset=" + offset); } if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { // update the memory size updateNonSequentialWriteInMemory(count); } // check if there is a WriteCtx with the same range in pendingWrites WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid); if (oldWriteCtx == null) { addWrite(writeCtx); } else { LOG.warn("Got a repeated request, same range, with xid:" + writeCtx.getXid()); } return writeCtx; } }