public void ensureSyncComplete(int blockIndex) {
    final AtomicLong noSyncLength = noSyncLengthMap.get(blockIndex);
    if (noSyncLength != null && noSyncLength.get() > 0) {
        // sync to store
        if (syncRunning) {
            // wait for sync
            parkThreadList.add(Thread.currentThread());
            while (true) {
                LockSupport.parkNanos(WAIT_SYNC_NANO);
                if (!syncRunning) break;
            }
        }
        // sync once, make sure data has been synced.
        syncRunning = true;
        syncRunnable.run();
    }
}
@Test
public void testCircuitBreaker() {
    CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
            .failureRateThreshold(25)
            .waitDurationInOpenState(Duration.ofMillis(1000))
            .ringBufferSizeInHalfOpenState(1)
            .ringBufferSizeInClosedState(2)
            .build();
    // use the custom config; CircuitBreaker.ofDefaults("test") would silently ignore it
    CircuitBreaker circuitBreaker = CircuitBreaker.of("test", circuitBreakerConfig);
    Observable.interval(500, TimeUnit.MILLISECONDS)
            .map(i -> {
                if (1 == 1) throw new RuntimeException("BAM");
                return "result" + i;
            })
            .lift(CircuitBreakerOperator.of(circuitBreaker))
            .map(result -> result)
            .subscribe(System.out::println);
    LockSupport.park();
}
public void notifyClusterDDL(String schema, String table, String sql, DDLInfo.DDLStatus ddlStatus, boolean needNotifyOther)
        throws Exception {
    CuratorFramework zkConn = ZKUtils.getConnection();
    DDLInfo ddlInfo = new DDLInfo(schema, sql, ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_MYID), ddlStatus);
    String nodeName = StringUtil.getFullName(schema, table);
    String nodePath = ZKPaths.makePath(KVPathUtil.getDDLPath(), nodeName);
    if (zkConn.checkExists().forPath(nodePath) == null) {
        zkConn.create().forPath(nodePath, ddlInfo.toString().getBytes(StandardCharsets.UTF_8));
    } else {
        String instancePath = ZKPaths.makePath(nodePath, KVPathUtil.DDL_INSTANCE);
        String thisNode = ZkConfig.getInstance().getValue(ZkParamCfg.ZK_CFG_MYID);
        ZKUtils.createTempNode(instancePath, thisNode);
        if (needNotifyOther) {
            // this node is the ddl sender
            zkConn.setData().forPath(nodePath, ddlInfo.toString().getBytes(StandardCharsets.UTF_8));
            while (true) {
                List<String> preparedList = zkConn.getChildren().forPath(instancePath);
                List<String> onlineList = zkConn.getChildren().forPath(KVPathUtil.getOnlinePath());
                if (preparedList.size() >= onlineList.size()) {
                    zkConn.delete().deletingChildrenIfNeeded().forPath(nodePath);
                    break;
                }
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            }
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    new Thread() {
        @Override
        public void run() {
            long writeLong = lock.writeLock();
            LockSupport.parkNanos(600000000000L);
            lock.unlockWrite(writeLong);
        }
    }.start();
    Thread.sleep(100);
    for (int i = 0; i < 3; ++i) {
        holdCpuThreads[i] = new Thread(new HoldCPUReadThread());
        holdCpuThreads[i].start();
    }
    Thread.sleep(10000);
    // after being interrupted, these threads keep spinning and hog the CPU
    for (int i = 0; i < 3; ++i) {
        holdCpuThreads[i].interrupt();
    }
    int aa = 0;
}
/**
 * Possibly blocks awaiting root lock.
 */
private final void contendedLock() {
    boolean waiting = false;
    for (int s;;) {
        if (((s = lockState) & ~WAITER) == 0) {
            if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) {
                if (waiting)
                    waiter = null;
                return;
            }
        }
        else if ((s & WAITER) == 0) {
            if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) {
                waiting = true;
                waiter = Thread.currentThread();
            }
        }
        else if (waiting)
            LockSupport.park(this);
    }
}
public Reply handle(Command command) {
    switch (command.code()) {
        case Command.START:
            return handleStart((StartCommand) command);
        case Command.STOP:
            return handleStop((StopCommand) command);
    }
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(responseLag));
    switch (command.code()) {
        case Command.ECHO:
            return handleEcho((EchoCommand) command);
        case Command.COUNT:
            return handleCount((CountCommand) command);
        case Command.REVERSE:
            return handleReverse((ReverseCommand) command);
        case Command.LOWER_CAST:
            return handleLowerCast((LowerCastCommand) command);
        case Command.UPPER_CAST:
            return handleUpperCast((UpperCastCommand) command);
        default:
            throw new IllegalStateException();
    }
}
/**
 * Removes and signals all waiting threads, invokes done(), and
 * nulls out callable.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null; // to reduce footprint
}
private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {
    // check whether the barrier has been alerted and waiting must stop
    barrier.checkAlert();

    // counter in (100, 200]: just busy-spin and retry
    if (counter > 100) {
        --counter;
    }
    // counter in (0, 100]: call Thread.yield() to give up the CPU
    else if (counter > 0) {
        --counter;
        Thread.yield();
    }
    // counter exhausted: sleep for the minimum time via LockSupport.parkNanos(1L)
    else {
        LockSupport.parkNanos(1L);
    }

    return counter;
}
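// A minimal sketch (not from the source) of how a Disruptor-style SleepingWaitStrategy might
// drive applyWaitMethod above: keep spinning/yielding/parking until the requested sequence is
// available. The initial counter of 200 and the parameter names are assumptions; the Sequence
// and SequenceBarrier types are the usual Disruptor ones referenced by the method above.
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException {
    long availableSequence;
    int counter = 200; // assumed budget: ~100 spins, then ~100 yields, then parkNanos(1L)
    while ((availableSequence = dependentSequence.get()) < sequence) {
        counter = applyWaitMethod(barrier, counter);
    }
    return availableSequence;
}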
public void testRemoveWaiter_interruption() throws Exception {
    final AbstractFuture<String> future = new AbstractFuture<String>() {};
    WaiterThread waiter1 = new WaiterThread(future);
    waiter1.start();
    waiter1.awaitWaiting();

    WaiterThread waiter2 = new WaiterThread(future);
    waiter2.start();
    waiter2.awaitWaiting();
    // The waiter queue should be waiter2->waiter1

    // This should wake up waiter1 and cause the waiter1 node to be removed.
    waiter1.interrupt();

    waiter1.join();
    waiter2.awaitWaiting(); // should still be blocked

    LockSupport.unpark(waiter2); // spurious wakeup
    waiter2.awaitWaiting(); // should eventually re-park

    future.set(null);
    waiter2.join();
}
public static void testCpu() {
    long startTime = System.currentTimeMillis();
    System.out.println(System.currentTimeMillis());
    int number = 0;
    while (true) {
        LockSupport.unpark(Thread.currentThread());
        // LockSupport.park();
        ++number;
        long currTime = System.currentTimeMillis();
        if (number % 10000 == 0) {
            LockSupport.park();
            System.out.println("iterations: " + number + ", elapsed ms: " + (currTime - startTime));
        }
    }
}
public void testParkAfterUnpark(final ParkMethod parkMethod) {
    final CountDownLatch pleaseUnpark = new CountDownLatch(1);
    final AtomicBoolean pleasePark = new AtomicBoolean(false);
    Thread t = newStartedThread(new CheckedRunnable() {
        public void realRun() {
            pleaseUnpark.countDown();
            while (!pleasePark.get())
                Thread.yield();
            parkMethod.park();
        }});
    await(pleaseUnpark);
    LockSupport.unpark(t);
    pleasePark.set(true);
    awaitTermination(t);
}
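// A minimal standalone sketch (not from the source) of the permit semantics the test above
// exercises: an unpark issued before park makes the subsequent park return immediately, and
// at most one permit is ever stored. The class name is illustrative only.
import java.util.concurrent.locks.LockSupport;

public class ParkAfterUnparkDemo {
    public static void main(String[] args) {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self); // store the single permit
        LockSupport.unpark(self); // permits do not accumulate beyond one
        LockSupport.park();       // consumes the permit, returns immediately
        System.out.println("first park returned without blocking");
        // a second park here would block until another unpark or an interrupt
    }
}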
/**
 * Enqueue a batch of message headers
 *
 * @param p_headers
 *         the message headers
 * @param p_messages
 *         the number of used entries in array
 */
void newHeaders(final MessageHeader[] p_headers, final int p_messages) {
    // #ifdef STATISTICS
    SOP_PUSH.enter();
    // #endif /* STATISTICS */

    if (!m_defaultMessageHeaders.pushMessageHeaders(p_headers, p_messages)) {
        for (int i = 0; i < p_messages; i++) {
            while (!m_defaultMessageHeaders.pushMessageHeader(p_headers[i])) {
                // #ifdef STATISTICS
                SOP_WAIT.enter();
                // #endif /* STATISTICS */

                LockSupport.parkNanos(100);

                // #ifdef STATISTICS
                SOP_WAIT.leave();
                // #endif /* STATISTICS */
            }
        }
    }

    // #ifdef STATISTICS
    SOP_PUSH.leave();
    // #endif /* STATISTICS */
}
@Override
public void receivedBuffer(final short p_sourceNodeId, final long p_bufferHandle, final long p_addr, final int p_length) {
    // #if LOGGER >= TRACE
    LOGGER.trace("Received buffer (0x%X, %d) from 0x%X", p_addr, p_length, p_sourceNodeId);
    // #endif /* LOGGER >= TRACE */

    IBConnection connection;
    try {
        connection = (IBConnection) getConnection(p_sourceNodeId);
    } catch (final NetworkException e) {
        // #if LOGGER >= ERROR
        LOGGER.error("Getting connection for recv buffer of node 0x%X failed", p_sourceNodeId, e);
        // #endif /* LOGGER >= ERROR */
        return;
    }

    // Avoid congestion by not allowing more than m_numberOfBuffers buffers to be cached for reading
    while (!m_incomingBufferQueue.pushBuffer(connection, null, p_bufferHandle, p_addr, p_length)) {
        // #if LOGGER == TRACE
        LOGGER.trace("Message creator: IncomingBuffer queue is full!");
        // #endif /* LOGGER == TRACE */

        // Thread.yield();
        LockSupport.parkNanos(100);
    }
}
private static boolean waitAllSession(ManagerConnection c, long timeout, long beginTime) {
    logger.info("waiting for all unfinished sessions of distributed transactions.");
    List<NonBlockingSession> fcList = getNeedWaitSession();
    while (!fcList.isEmpty()) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        Iterator<NonBlockingSession> sListIterator = fcList.iterator();
        while (sListIterator.hasNext()) {
            NonBlockingSession session = sListIterator.next();
            if (!session.isNeedWaitFinished()) {
                sListIterator.remove();
            }
        }
        if (c.isClosed()) {
            errMsg = "client closed while waiting for unfinished distributed transactions.";
            logger.info(errMsg);
            return false;
        }
        if (TimeUtil.currentTimeMillis() > beginTime + timeout) {
            errMsg = "timeout while waiting for unfinished distributed transactions.";
            logger.info(errMsg);
            return false;
        }
    }
    logger.info("all sessions of distributed transactions are paused.");
    return true;
}
/**
 * Possibly blocks awaiting root lock.
 */
private final void contendedLock() {
    boolean waiting = false;
    for (int s;;) {
        if (((s = lockState) & ~WAITER) == 0) {
            if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
                if (waiting)
                    waiter = null;
                return;
            }
        }
        else if ((s & WAITER) == 0) {
            if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
                waiting = true;
                waiter = Thread.currentThread();
            }
        }
        else if (waiting)
            LockSupport.park(this);
    }
}
/**
 * Tries to decrement readerOverflow.
 *
 * @param s a reader overflow stamp: (s & ABITS) >= RFULL
 * @return new stamp on success, else zero
 */
private long tryDecReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    if ((s & ABITS) == RFULL) {
        if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
            int r; long next;
            if ((r = readerOverflow) > 0) {
                readerOverflow = r - 1;
                next = s;
            }
            else
                next = s - RUNIT;
            state = next;
            return next;
        }
    }
    else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    return 0L;
}
/**
 * Blocks until closed, space available or timeout.
 * For ManagedBlocker.
 */
public final boolean block() {
    long nanos = timeout;
    boolean timed = (nanos < Long.MAX_VALUE);
    long deadline = timed ? System.nanoTime() + nanos : 0L;
    while (!isReleasable()) {
        if (Thread.interrupted()) {
            timeout = INTERRUPTED;
            if (timed)
                break;
        }
        else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
            break;
        else if (waiter == null)
            waiter = Thread.currentThread();
        else if (waiting == 0)
            waiting = 1;
        else if (timed)
            LockSupport.parkNanos(this, nanos);
        else
            LockSupport.park(this);
    }
    waiter = null;
    waiting = 0;
    return true;
}
public void listen() {
    while (true) {
        String id = peekId();
        if (id == null) {
            continue;
        }
        String json = jedisCluster.hget(messageStoreKey, id);
        try {
            Message message = om.readValue(json, Message.class);
            if (message == null) {
                continue;
            }
            long delay = message.getCreateTime() + message.getTimeout() - System.currentTimeMillis();
            System.out.println(delay);
            if (delay <= 0) {
                delayQueueProcessListener.peekCallback(message);
            } else {
                LockSupport.parkNanos(this, TimeUnit.NANOSECONDS.convert(delay, TimeUnit.MILLISECONDS));
                delayQueueProcessListener.peekCallback(message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Removes and signals all waiting threads, invokes done(), and
 * nulls out callable.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null; // to reduce footprint
}
@Override
public void run() {
    synchronized (u) {
        System.out.printf("%s in %s\n", new Date(), getName());
        LockSupport.park(u);
        if (isInterrupted()) {
            System.out.println(getName() + " is interrupted.");
        }
    }
    // block for at most 5 s
    // if the interrupt flag is still set, the call below returns immediately; compare with
    // using Thread.interrupted() above, which would have cleared the flag
    LockSupport.parkNanos(this, TimeUnit.SECONDS.toNanos(5));
    System.out.printf("%s %s ends\n", new Date(), getName());
}
TransactionId suspend(TimeUnit expiryTimeUnit) {
    TXStateProxy result = getTXState();
    if (result != null) {
        TransactionId txId = result.getTransactionId();
        internalSuspend();
        this.suspendedTXs.put(txId, result);
        // wake up waiting threads
        Queue<Thread> waitingThreads = this.waitMap.get(txId);
        if (waitingThreads != null) {
            Thread waitingThread = null;
            while (true) {
                waitingThread = waitingThreads.poll();
                if (waitingThread == null || !Thread.currentThread().equals(waitingThread)) {
                    break;
                }
            }
            if (waitingThread != null) {
                LockSupport.unpark(waitingThread);
            }
        }
        scheduleExpiry(txId, expiryTimeUnit);
        return txId;
    }
    return null;
}
void awaitWaiting() {
    while (LockSupport.getBlocker(this) != future) {
        if (getState() == State.TERMINATED) {
            throw new RuntimeException("Thread exited");
        }
        Thread.yield();
    }
}
public boolean awaitUntil(long until) throws InterruptedException {
    long now;
    while (until > (now = System.nanoTime()) && !isSignalled()) {
        checkInterrupted();
        long delta = until - now;
        LockSupport.parkNanos(delta);
    }
    return checkAndClear();
}
private Thread signal() {
    if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, SIGNALLED)) {
        Thread thread = this.thread;
        LockSupport.unpark(thread);
        this.thread = null;
        return thread;
    }
    return null;
}
boolean dispatchDelayedMessage(int what) {
    if (what == WHAT_CLEAN_PARK) {
        if (parkThread != null) {
            LockSupport.unpark(parkThread);
            parkThread = null;
        }
        return true;
    }
    return false;
}
public boolean block() {
    if (isReleasable())
        return true;
    else if (!timed)
        LockSupport.park(this);
    else if (nanos > 0L)
        LockSupport.parkNanos(this, nanos);
    return isReleasable();
}
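// A minimal sketch (not from the source) of how a ForkJoinPool.ManagedBlocker such as the
// block()/isReleasable() pair above is typically driven. The latch-based blocker is an
// illustrative assumption, not the original class.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public boolean block() throws InterruptedException {
        latch.await(); // park this worker; the pool may compensate with a spare thread
        return true;
    }

    @Override
    public boolean isReleasable() {
        return latch.getCount() == 0;
    }
}

// Inside a ForkJoinPool task, blocking cooperatively preserves the pool's target parallelism:
// ForkJoinPool.managedBlock(new LatchBlocker(latch));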
/**
 * Tries to match node s to this node, if so, waking up thread.
 * Fulfillers call tryMatch to identify their waiters.
 * Waiters block until they have been matched.
 *
 * @param s the node to match
 * @return true if successfully matched to s
 */
boolean tryMatch(SNode s) {
    if (match == null && SMATCH.compareAndSet(this, null, s)) {
        Thread w = waiter;
        if (w != null) { // waiters need at most one unpark
            waiter = null;
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;
}
/**
 * Returns matching node or null if none. Tries to search
 * using tree comparisons from root, but continues linear
 * search when lock not available.
 */
final Node<K,V> find(int h, Object k) {
    if (k != null) {
        for (Node<K,V> e = first; e != null; ) {
            int s; K ek;
            if (((s = lockState) & (WAITER|WRITER)) != 0) {
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                e = e.next;
            }
            else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) {
                TreeNode<K,V> r, p;
                try {
                    p = ((r = root) == null ? null :
                         r.findTreeNode(h, k, null));
                } finally {
                    Thread w;
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                        (READER|WAITER) && (w = waiter) != null)
                        LockSupport.unpark(w);
                }
                return p;
            }
        }
    }
    return null;
}
public static void main(String[] args) throws InterruptedException {
    t1.start();
    Thread.sleep(100);
    t2.start();
    LockSupport.unpark(t1);
    LockSupport.unpark(t2);
    t1.join();
    t2.join();
}
/**
 * Variant of releaseWaiters that additionally tries to remove any
 * nodes no longer waiting for advance due to timeout or
 * interrupt. Currently, nodes are removed only if they are at
 * head of queue, which suffices to reduce memory footprint in
 * most usages.
 *
 * @return current phase on exit
 */
private int abortWait(int phase) {
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    for (;;) {
        Thread t;
        QNode q = head.get();
        int p = (int) (root.state >>> PHASE_SHIFT);
        if (q == null || ((t = q.thread) != null && q.phase == p))
            return p;
        if (head.compareAndSet(q, q.next) && t != null) {
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}
@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;

    // next() works like the earlier hasAvailableCapacity(), except that here we effectively block
    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = this.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;
        // while wrapPoint is still ahead of the minimum gating sequence, keep waking the
        // consumers and yield the CPU via LockSupport until it no longer is
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            waitStrategy.signalAllWhenBlocking();
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        // as before, cache the minimum gating sequence
        this.cachedValue = minSequence;
    }

    this.nextValue = nextSequence;

    return nextSequence;
}
void unpark() {
    // This is racy with removeWaiter. The consequence of the race is that we may spuriously call
    // unpark even though the thread has already removed itself from the list. But even if we did
    // use a CAS, that race would still exist (it would just be ever so slightly smaller).
    Thread w = thread;
    if (w != null) {
        thread = null;
        LockSupport.unpark(w);
    }
}
/**
 * {@inheritDoc}
 *
 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
 * current thread is interrupted before or during the call, even if the value is already
 * available.
 *
 * @throws InterruptedException if the current thread was interrupted before or during the call
 *     (optional but recommended).
 * @throws CancellationException {@inheritDoc}
 */
@CanIgnoreReturnValue
@Override
public V get() throws InterruptedException, ExecutionException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    Object localValue = value;
    if (localValue != null & !(localValue instanceof SetFuture)) {
        return getDoneValue(localValue);
    }
    Waiter oldHead = waiters;
    if (oldHead != Waiter.TOMBSTONE) {
        Waiter node = new Waiter();
        do {
            node.setNext(oldHead);
            if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
                // we are on the stack, now wait for completion.
                while (true) {
                    LockSupport.park(this);
                    // Check interruption first, if we woke up due to interruption we need to honor that.
                    if (Thread.interrupted()) {
                        removeWaiter(node);
                        throw new InterruptedException();
                    }
                    // Otherwise re-read and check doneness. If we loop then it must have been a
                    // spurious wakeup
                    localValue = value;
                    if (localValue != null & !(localValue instanceof SetFuture)) {
                        return getDoneValue(localValue);
                    }
                }
            }
            oldHead = waiters; // re-read and loop.
        } while (oldHead != Waiter.TOMBSTONE);
    }
    // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
    // waiter.
    return getDoneValue(value);
}
/**
 * Tries to increment readerOverflow by first setting state
 * access bits value to RBITS, indicating hold of spinlock,
 * then updating, then releasing.
 *
 * @param s a reader overflow stamp: (s & ABITS) >= RFULL
 * @return new stamp on success, else zero
 */
private long tryIncReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    if ((s & ABITS) == RFULL) {
        if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
            ++readerOverflow;
            state = s;
            return s;
        }
    }
    else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    return 0L;
}
/** Tries to CAS-match this node; if successful, wakes waiter. */
final boolean tryMatch(Object cmp, Object val) {
    if (casItem(cmp, val)) {
        LockSupport.unpark(waiter);
        return true;
    }
    return false;
}
@Override
public void run() {
    IncomingBufferQueue.IncomingBuffer incomingBuffer;
    int counter = 0;
    long lastSuccessfulPop = 0;

    while (!m_shutdown) {
        // pop an incomingBuffer
        incomingBuffer = m_bufferQueue.popBuffer();
        if (incomingBuffer == null) {
            // Ring-buffer is empty.
            if (++counter >= THRESHOLD_TIME_CHECK) {
                if (System.currentTimeMillis() - lastSuccessfulPop > 1000) {
                    // No message header for over a second -> sleep
                    LockSupport.parkNanos(100);
                }
            }

            if (m_overprovisioning) {
                Thread.yield();
            }

            continue;
        }
        lastSuccessfulPop = System.currentTimeMillis();
        counter = 0;

        try {
            incomingBuffer.getPipeIn().processBuffer(incomingBuffer);
        } catch (final NetworkException e) {
            incomingBuffer.getPipeIn().returnProcessedBuffer(incomingBuffer.getDirectBuffer(),
                    incomingBuffer.getBufferHandle());
            // #if LOGGER == ERROR
            LOGGER.error("Processing incoming buffer failed", e);
            // #endif /* LOGGER == ERROR */
        }
    }
}