/** * Returns existing row lock if found, otherwise obtains a new row lock and returns it. * @param lockid requested by the user, or null if the user didn't already hold lock * @param row the row to lock * @param waitForLock if true, will block until the lock is available, otherwise will simply * return null if it could not acquire the lock. * @return lockid or null if waitForLock is false and the lock was unavailable. */ protected Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { Integer lid; if (lockid == null) { lid = internalObtainRowLock(row, waitForLock); } else { HashedBytes rowFromLock = lockIds.get(lockid); if (!row.equals(rowFromLock)) { throw new IOException("Invalid row lock: LockId: " + lockid + " holds the lock for row: " + rowFromLock + " but wanted lock for row: " + row); } lid = lockid; } return lid; }
/** * Release the row lock! * @param lockId The lock ID to release. */ public void releaseRowLock(final Integer lockId) { if (lockId == null) return; // null lock id, do nothing HashedBytes rowKey = lockIds.remove(lockId); if (rowKey == null) { LOG.warn("Release unknown lockId: " + lockId); return; } CountDownLatch rowLatch = lockedRows.remove(rowKey); if (rowLatch == null) { LOG.error("Releases row not locked, lockId: " + lockId + " row: " + rowKey); return; } rowLatch.countDown(); }
@Test public void testGettingTheLockMatchesMyRow() throws Exception { MockHRegion region = getMockHRegion(); HashedBytes rowKey = new HashedBytes(Bytes.toBytes(1)); assertEquals(Integer.valueOf(2), region.getLock(null, rowKey, false)); assertEquals(Integer.valueOf(2), region.getLock(2, rowKey, false)); }
@Override public Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown(); } return super.getLock(lockid, row, waitForLock); }
/** * A version of getRowLock(byte[], boolean) to use when a region operation has already been * started (the calling thread has already acquired the region-close-guard lock). */ protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException { checkRow(row, "row lock"); HashedBytes rowKey = new HashedBytes(row); RowLockContext rowLockContext = new RowLockContext(rowKey); // loop until we acquire the row lock (unless !waitForLock) while (true) { RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); if (existingContext == null) { // Row is not already locked by any thread, use newly created context. break; } else if (existingContext.ownedByCurrentThread()) { // Row is already locked by current thread, reuse existing context instead. rowLockContext = existingContext; break; } else { if (!waitForLock) { return null; } try { // Row is already locked by some other thread, give up or wait for it if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { throw new IOException("Timed out waiting for lock for row: " + rowKey); } } catch (InterruptedException ie) { LOG.warn("Thread interrupted waiting for lock on row: " + rowKey); InterruptedIOException iie = new InterruptedIOException(); iie.initCause(ie); throw iie; } } } // allocate new lock for this thread return rowLockContext.newLock(); }
/** * Returns existing row lock if found, otherwise * obtains a new row lock and returns it. * @param lockid requested by the user, or null if the user didn't already hold lock * @param row the row to lock * @param waitForLock if true, will block until the lock is available, otherwise will * simply return null if it could not acquire the lock. * @return lockid or null if waitForLock is false and the lock was unavailable. */ protected Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { Integer lid; if (lockid == null) { lid = internalObtainRowLock(row, waitForLock); } else { HashedBytes rowFromLock = lockIds.get(lockid); if (!row.equals(rowFromLock)) { throw new IOException("Invalid row lock: LockId: " + lockid + " holds the lock for row: " + rowFromLock + " but wanted lock for row: " + row); } lid = lockid; } return lid; }
/** * Release the row lock! * @param lockId The lock ID to release. */ public void releaseRowLock(final Integer lockId) { HashedBytes rowKey = lockIds.remove(lockId); if (rowKey == null) { LOG.warn("Release unknown lockId: " + lockId); return; } CountDownLatch rowLatch = lockedRows.remove(rowKey); if (rowLatch == null) { LOG.error("Releases row not locked, lockId: " + lockId + " row: " + rowKey); return; } rowLatch.countDown(); }
/** * Get a row lock for the specified row. All locks are reentrant. Before calling this function * make sure that a region operation has already been started (the calling thread has already * acquired the region-close-guard lock). * * @param row The row actions will be performed against * @param readLock is the lock reader or writer. True indicates that a non-exlcusive lock is * requested */ public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { // Make sure the row is inside of this region before getting the lock for // it. checkRow(row, "row lock"); // create an object to use a a key in the row lock map HashedBytes rowKey = new HashedBytes(row); RowLockContext rowLockContext = null; RowLockImpl result = null; TraceScope traceScope = null; // If we're tracing start a span to show how long this took. if (Trace.isTracing()) { traceScope = Trace.startSpan("HRegion.getRowLock"); traceScope.getSpan() .addTimelineAnnotation("Getting a " + (readLock ? "readLock" : "writeLock")); } try { // Keep trying until we have a lock or error out. // TODO: do we need to add a time component here? while (result == null) { // Try adding a RowLockContext to the lockedRows. // If we can add it then there's no other transactions currently // running. rowLockContext = new RowLockContext(rowKey); RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); // if there was a running transaction then there's already a context. if (existingContext != null) { rowLockContext = existingContext; } // Now try an get the lock. // // This can fail as if (readLock) { result = rowLockContext.newReadLock(); } else { result = rowLockContext.newWriteLock(); } } if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { if (traceScope != null) { traceScope.getSpan().addTimelineAnnotation("Failed to get row lock"); } result = null; // Clean up the counts just in case this was the thing keeping the // context alive. rowLockContext.cleanUp(); throw new IOException("Timed out waiting for lock for row: " + rowKey); } return result; } catch (InterruptedException ie) { LOG.warn("Thread interrupted waiting for lock on row: " + rowKey); InterruptedIOException iie = new InterruptedIOException(); iie.initCause(ie); if (traceScope != null) { traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock"); } Thread.currentThread().interrupt(); throw iie; } finally { if (traceScope != null) { traceScope.close(); } } }
RowLockContext(HashedBytes row) { this.row = row; }
/** * Obtains or tries to obtain the given row lock. * @param waitForLock if true, will block until the lock is available. Otherwise, just tries to * obtain the lock and returns null if unavailable. */ private Integer internalObtainRowLock(final HashedBytes rowKey, boolean waitForLock) throws IOException { checkRow(rowKey.getBytes(), "row lock"); startRegionOperation(); try { CountDownLatch rowLatch = new CountDownLatch(1); // loop until we acquire the row lock (unless !waitForLock) while (true) { CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); if (existingLatch == null) { break; } else { // row already locked if (!waitForLock) { return null; } try { if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { throw new IOException("Timed out on getting lock for row=" + rowKey); } } catch (InterruptedException ie) { // Empty } } } // loop until we generate an unused lock id while (true) { Integer lockId = lockIdGenerator.incrementAndGet(); HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey); if (existingRowKey == null) { return lockId; } else { // lockId already in use, jump generator to a new spot lockIdGenerator.set(rand.nextInt()); } } } finally { closeRegionOperation(); } }
@Override public Integer getLock(Integer lockid, HashedBytes row, boolean waitForLock) throws IOException { acqioredLockCount++; return super.getLock(lockid, row, waitForLock); }
RowLockContext(HashedBytes row) { this.row = row; this.thread = Thread.currentThread(); }
/** * Tries to acquire a lock on the given row. * @param waitForLock if true, will block until the lock is available. * Otherwise, just tries to obtain the lock and returns * false if unavailable. * @return the row lock if acquired, * null if waitForLock was false and the lock was not acquired * @throws IOException if waitForLock was true and the lock could not be acquired after waiting */ public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException { checkRow(row, "row lock"); startRegionOperation(); try { HashedBytes rowKey = new HashedBytes(row); RowLockContext rowLockContext = new RowLockContext(rowKey); // loop until we acquire the row lock (unless !waitForLock) while (true) { RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); if (existingContext == null) { // Row is not already locked by any thread, use newly created context. break; } else if (existingContext.ownedByCurrentThread()) { // Row is already locked by current thread, reuse existing context instead. rowLockContext = existingContext; break; } else { // Row is already locked by some other thread, give up or wait for it if (!waitForLock) { return null; } try { if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { throw new IOException("Timed out waiting for lock for row: " + rowKey); } } catch (InterruptedException ie) { LOG.warn("Thread interrupted waiting for lock on row: " + rowKey); InterruptedIOException iie = new InterruptedIOException(); iie.initCause(ie); throw iie; } } } // allocate new lock for this thread return rowLockContext.newLock(); } finally { closeRegionOperation(); } }
/** * Obtains or tries to obtain the given row lock. * @param waitForLock if true, will block until the lock is available. * Otherwise, just tries to obtain the lock and returns * null if unavailable. */ private Integer internalObtainRowLock(final HashedBytes rowKey, boolean waitForLock) throws IOException { checkRow(rowKey.getBytes(), "row lock"); startRegionOperation(); try { CountDownLatch rowLatch = new CountDownLatch(1); // loop until we acquire the row lock (unless !waitForLock) while (true) { CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); if (existingLatch == null) { break; } else { // row already locked if (!waitForLock) { return null; } try { if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { throw new IOException("Timed out on getting lock for row=" + rowKey); } } catch (InterruptedException ie) { // Empty } } } // loop until we generate an unused lock id while (true) { Integer lockId = lockIdGenerator.incrementAndGet(); HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey); if (existingRowKey == null) { return lockId; } else { // lockId already in use, jump generator to a new spot lockIdGenerator.set(rand.nextInt()); } } } finally { closeRegionOperation(); } }
public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() { return lockedRows; }
/** * Obtains or tries to obtain the given row lock. * @param waitForLock if true, will block until the lock is available. * Otherwise, just tries to obtain the lock and returns * null if unavailable. */ private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) throws IOException { checkRow(row, "row lock"); startRegionOperation(); try { HashedBytes rowKey = new HashedBytes(row); CountDownLatch rowLatch = new CountDownLatch(1); // loop until we acquire the row lock (unless !waitForLock) while (true) { CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); if (existingLatch == null) { break; } else { // row already locked if (!waitForLock) { return null; } try { if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { return null; } } catch (InterruptedException ie) { // Empty } } } // loop until we generate an unused lock id while (true) { Integer lockId = lockIdGenerator.incrementAndGet(); HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey); if (existingRowKey == null) { return lockId; } else { // lockId already in use, jump generator to a new spot lockIdGenerator.set(rand.nextInt()); } } } finally { closeRegionOperation(); } }
/** * Obtains or tries to obtain the given row lock. * @param waitForLock if true, will block until the lock is available. * Otherwise, just tries to obtain the lock and returns * null if unavailable. */ private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) throws IOException { checkRow(row, "row lock"); startRegionOperation(); try { HashedBytes rowKey = new HashedBytes(row); CountDownLatch rowLatch = new CountDownLatch(1); // loop until we acquire the row lock (unless !waitForLock) while (true) { CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch); if (existingLatch == null) { break; } else { // row already locked if (!waitForLock) { return null; } try { if (!existingLatch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { throw new IOException("Timed out on getting lock for row=" + Bytes.toStringBinary(row)); } } catch (InterruptedException ie) { // Empty } } } // loop until we generate an unused lock id while (true) { Integer lockId = lockIdGenerator.incrementAndGet(); HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey); if (existingRowKey == null) { return lockId; } else { // lockId already in use, jump generator to a new spot lockIdGenerator.set(rand.nextInt()); } } } finally { closeRegionOperation(); } }
/** * Obtain a lock on the given row. Blocks until success. I know it's strange to have two mappings: * * <pre> * ROWS ==> LOCKS * </pre> * * as well as * * <pre> * LOCKS ==> ROWS * </pre> * * But it acts as a guard on the client; a miswritten client just can't submit the name of a row * and start writing to it; it must know the correct lockid, which matches the lock list in * memory. * <p> * It would be more memory-efficient to assume a correctly-written client, which maybe we'll do in * the future. * @param row Name of row to lock. * @throws IOException * @return The id of the held lock. */ public Integer obtainRowLock(final byte[] row) throws IOException { startRegionOperation(); this.writeRequestsCount.increment(); this.opMetrics.setWriteRequestCountMetrics(this.writeRequestsCount.get()); try { return internalObtainRowLock(new HashedBytes(row), true); } finally { closeRegionOperation(); } }
/** * Obtain a lock on the given row. Blocks until success. * * I know it's strange to have two mappings: * <pre> * ROWS ==> LOCKS * </pre> * as well as * <pre> * LOCKS ==> ROWS * </pre> * * But it acts as a guard on the client; a miswritten client just can't * submit the name of a row and start writing to it; it must know the correct * lockid, which matches the lock list in memory. * * <p>It would be more memory-efficient to assume a correctly-written client, * which maybe we'll do in the future. * * @param row Name of row to lock. * @throws IOException * @return The id of the held lock. */ public Integer obtainRowLock(final byte [] row) throws IOException { startRegionOperation(); this.writeRequestsCount.increment(); this.opMetrics.setWriteRequestCountMetrics( this.writeRequestsCount.get()); try { return internalObtainRowLock(new HashedBytes(row), true); } finally { closeRegionOperation(); } }
/** * Used by unit tests. * @param lockid * @return Row that goes with <code>lockid</code> */ byte[] getRowFromLock(final Integer lockid) { HashedBytes rowKey = lockIds.get(lockid); return rowKey == null ? null : rowKey.getBytes(); }
/** * Returns existing row lock if found, otherwise obtains a new row lock and returns it. * @param lockid requested by the user, or null if the user didn't already hold lock * @param row the row to lock * @param waitForLock if true, will block until the lock is available, otherwise will simply * return null if it could not acquire the lock. * @return lockid or null if waitForLock is false and the lock was unavailable. */ public Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException { return getLock(lockid, new HashedBytes(row), waitForLock); }