Java 类java.util.concurrent.RunnableFuture 实例源码

项目:pgAutomator-agent    文件:ThreadFactory.java   
/**
 * Wraps the given runnable and default result in a {@code RunnableFuture}.
 * <p>
 * For a {@code CancellableRunnable}, every call to {@code cancel} on the
 * returned future first cancels the future itself and then invokes
 * {@code cancelTask()} on the runnable. Any other runnable is delegated
 * to the superclass implementation.
 *
 * @param runnable the runnable task being wrapped
 * @param value    the result the returned future yields on completion
 * @return a {@code RunnableFuture} which, when run, runs the underlying
 * runnable and yields {@code value} as its result
 * @since 1.6
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
{
    if (!(runnable instanceof CancellableRunnable))
    {
        return super.newTaskFor(runnable, value);
    }

    final CancellableRunnable cancellable = (CancellableRunnable) runnable;
    return new FutureTask<T>(runnable, value)
    {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning)
        {
            // Cancel the future first, then notify the task itself.
            final boolean cancelled = super.cancel(mayInterruptIfRunning);
            cancellable.cancelTask();
            return cancelled;
        }
    };
}
项目:pgAutomator-agent    文件:ThreadFactory.java   
/**
 * Wraps the given callable in a {@code RunnableFuture}.
 * <p>
 * For a {@code CancellableCallable}, every call to {@code cancel} on the
 * returned future first invokes {@code cancelTask()} on the callable and
 * then cancels the future itself. Any other callable is delegated to the
 * superclass implementation.
 *
 * @param callable the callable task being wrapped
 * @return a {@code RunnableFuture} which, when run, calls the underlying
 * callable and yields the callable's result as its result
 * @since 1.6
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
{
    if (!(callable instanceof CancellableCallable))
    {
        return super.newTaskFor(callable);
    }

    final CancellableCallable cancellable = (CancellableCallable) callable;
    return new FutureTask<T>(callable)
    {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning)
        {
            // Notify the task first, then cancel the future.
            cancellable.cancelTask();
            return super.cancel(mayInterruptIfRunning);
        }
    };
}
项目:incubator-netbeans    文件:JUnitProjectOpenedHook.java   
/**
 * Performs the configured resolution action, if any, and reports the outcome.
 * <p>
 * When {@code action} is set it is invoked and the result is {@code RESOLVED},
 * carrying the action's {@code ACT_START_MESSAGE} value as message when that
 * value is non-null. Without an action the result is {@code UNRESOLVED}.
 *
 * @return a future that is already completed with the computed result
 */
@Override
public java.util.concurrent.Future<ProjectProblemsProvider.Result> resolve() {
    ProjectProblemsProvider.Result res;
    if (action != null) {
        action.actionPerformed(null);
        String text = (String) action.getValue(ACT_START_MESSAGE);
        if (text != null) {
            res = ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED, text);
        } else {
            res = ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED);
        }
    } else {
        res = ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.UNRESOLVED, "No resolution for the problem");
    }
    // Parameterized FutureTask (was a raw type) so the assignment is checked by the compiler.
    RunnableFuture<ProjectProblemsProvider.Result> f =
            new FutureTask<ProjectProblemsProvider.Result>(new Runnable() {
                @Override
                public void run() {
                    // Nothing to do: the task only exists to carry the precomputed result.
                }
            }, res);
    // Run synchronously so the returned future is already done.
    f.run();
    return f;
}
项目:fort_j    文件:Executor.java   
/**
 * Schedules the given task for submission after a delay.
 * <p>
 * When the timer fires, a new thread is started (its daemon flag taken from
 * the {@code daemon} field) whose only job is to call {@code submit(task)}.
 *
 * @param task  the task to submit once the timer fires
 * @param delay the delay handed to the timer's {@code schedule} call
 */
public synchronized void submit(final RunnableFuture task, long delay)
{
   final TimerTask delayedSubmission = new TimerTask()
   {
      @Override
      public void run()
      {
         final Runnable submitter = new Runnable()
         {
            @Override
            public void run()
            {
               submit(task);
            }
         };
         final Thread worker = new Thread(submitter);
         worker.setDaemon(daemon);
         worker.start();
      }
   };
   getTimer().schedule(delayedSubmission, delay);
}
项目:fort_j    文件:Executor.java   
/**
 * Appends a task to the queue, blocking while the queue already holds
 * {@code queueMax} or more entries.
 * <p>
 * The wait is not abandoned on interruption; instead the interrupt is
 * remembered and the thread's interrupt status is restored before returning,
 * so callers can still observe it.
 *
 * @param task the task to enqueue
 */
void put(RunnableFuture task)
{
   boolean interrupted = false;
   synchronized (queue)
   {
      while (queue.size() >= queueMax)
      {
         try
         {
            queue.wait();
         }
         catch (InterruptedException ex)
         {
            // Keep waiting for space, but do not lose the interrupt.
            interrupted = true;
         }
      }
      queue.add(task);
      // Wake up consumers blocked on an empty queue.
      queue.notifyAll();
   }
   if (interrupted)
   {
      Thread.currentThread().interrupt();
   }
}
项目:fort_j    文件:Executor.java   
/**
 * Removes and returns the first task of the queue, blocking while the queue
 * is empty.
 * <p>
 * The wait is not abandoned on interruption; instead the interrupt is
 * remembered and the thread's interrupt status is restored before returning,
 * so callers can still observe it.
 *
 * @return the task that was at the head of the queue
 */
RunnableFuture take()
{
   boolean interrupted = false;
   RunnableFuture t;
   synchronized (queue)
   {
      while (queue.size() == 0)
      {
         try
         {
            queue.wait();
         }
         catch (InterruptedException ex)
         {
            // Keep waiting for a task, but do not lose the interrupt.
            interrupted = true;
         }
      }

      t = queue.removeFirst();
      // Wake up producers blocked on a full queue.
      queue.notifyAll();
   }
   if (interrupted)
   {
      Thread.currentThread().interrupt();
   }
   return t;
}
项目:RetrofitAppArchitecture    文件:RequestExecutor.java   
/**
 * Submits a task bound to the given request id, so that the task can later
 * be cancelled by that id.
 * <p>
 * The future is only tracked (in {@code mFutures} and {@code mRequests})
 * when {@code requestId} is non-empty; it is executed in either case.
 *
 * @param requestId the id the task is registered under; may be empty
 * @param task      the request task to execute
 */
public synchronized void submitRequest(String requestId, RequestManager.RequestRunnable task) {
    // Synchronous part: wrap the task and register its future.
    final RunnableFuture<String> future = newTaskFor(task, requestId);
    final boolean tracked = !TextUtils.isEmpty(requestId);
    if (tracked) {
        List<Future<String>> pending = mFutures.get(requestId);
        if (pending == null) {
            pending = new ArrayList<>();
            mFutures.put(requestId, pending);
        }
        pending.add(future);
        mRequests.put(future, task.getRequest());
    }

    // Asynchronous part: hand the future over for execution.
    execute(future);
}
项目:MiniDownloader    文件:MiniDownloader.java   
/**
 * Initializes the MiniDownloader: stores the application context and creates
 * the work executor, the command executor, the task manager and the
 * progress updater.
 *
 * @param context the context used to obtain the application context and to
 *                initialize the task manager
 */
public void init(Context context) {
    this.appContext = context.getApplicationContext();

    // Work executor: core and maximum pool size both follow the processor count.
    final Runtime runtime = Runtime.getRuntime();
    this.workExecutor = new ThreadPoolExecutor(
            runtime.availableProcessors(),
            runtime.availableProcessors(),
            0L,
            TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>()) {
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            // Let callables that know how to build their own future do so.
            if (!(callable instanceof CustomFutureCallable)) {
                return super.newTaskFor(callable);
            }
            return ((CustomFutureCallable) callable).newTaskFor();
        }
    };

    // Command executor: a single thread.
    this.commandExecutor = Executors.newSingleThreadExecutor();

    // Create and initialize the task manager.
    taskManager = new TaskManager();
    taskManager.init(context);

    // Create and start the progress updater.
    progressUpdater = new ProgressUpdater();
    progressUpdater.start();
}
项目:PackageTemplates    文件:FileWriter.java   
/**
 * Runs {@code writeDirectoryAction} inside a write action that is posted via
 * {@code invokeLater}, and waits for its result.
 * <p>
 * NOTE(review): {@code get()} blocks the calling thread until the posted task
 * has run; presumably this must not be called from the thread that services
 * {@code invokeLater} -- confirm against callers.
 *
 * @param dir        the parent directory; {@code null} yields {@code null}
 * @param dirWrapper the directory description passed to the write action
 * @param project    the project passed to the write action
 * @return the directory produced by the write action, or {@code null} if
 * {@code dir} is {@code null}, the wait was interrupted or the action failed
 */
public static PsiDirectory writeDirectory(PsiDirectory dir, DirectoryWrapper dirWrapper, Project project) {
    if (dir == null) {
        //todo print error
        return null;
    }

    RunnableFuture<PsiDirectory> runnableFuture = new FutureTask<>(() ->
            ApplicationManager.getApplication().runWriteAction(new Computable<PsiDirectory>() {
                @Override
                public PsiDirectory compute() {
                    return writeDirectoryAction(dir, dirWrapper, project);
                }
            }));

    ApplicationManager.getApplication().invokeLater(runnableFuture);

    try {
        return runnableFuture.get();
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can react to it.
        Thread.currentThread().interrupt();
        Logger.log("runnableFuture  " + e.getMessage());
        Logger.printStack(e);
    } catch (ExecutionException e) {
        Logger.log("runnableFuture  " + e.getMessage());
        Logger.printStack(e);
    }

    return null;
}
项目:RealArchitecture    文件:RequestExecutor.java   
/**
 * Submits a task bound to the given tag, so that the task can later be
 * cancelled by that tag.
 * <p>
 * The future is only tracked in {@code mFutures} when {@code tag} is
 * non-empty; it is executed in either case.
 *
 * @param tag  the tag the task is registered under; may be empty
 * @param task the task to execute
 */
public synchronized void submit(String tag, Runnable task) {
    // Synchronous part: wrap the task and register its future.
    final RunnableFuture<String> future = newTaskFor(task, tag);
    final boolean tracked = !TextUtils.isEmpty(tag);
    if (tracked) {
        List<Future<String>> pending = mFutures.get(tag);
        if (pending == null) {
            pending = new ArrayList<>();
            mFutures.put(tag, pending);
        }
        pending.add(future);
    }

    // Asynchronous part: hand the future over for execution.
    execute(future);
}
项目:flink    文件:RocksDBStateBackendTest.java   
/**
 * Cancels a snapshot future before it is ever run, then runs it on a
 * separate thread anyway and expects {@code get()} to fail.
 */
@Test
public void testDismissingSnapshotNotRunnable() throws Exception {
    setupRocksKeyedStateBackend();
    try {
        RunnableFuture<KeyedStateHandle> snapshot =
            keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
        // Cancel first; the thread below is started on an already cancelled future.
        snapshot.cancel(true);
        Thread asyncSnapshotThread = new Thread(snapshot);
        asyncSnapshotThread.start();
        try {
            snapshot.get();
            // Reaching this line means get() unexpectedly returned a result.
            fail();
        } catch (Exception ignored) {
            // Expected: get() on the cancelled future must throw.

        }
        asyncSnapshotThread.join();
        verifyRocksObjectsReleased();
    } finally {
        // Always dispose the backend, even if an assertion failed.
        this.keyedStateBackend.dispose();
        this.keyedStateBackend = null;
    }
}
项目:flink    文件:RocksDBStateBackendTest.java   
/**
 * Runs a snapshot to completion on a separate thread while state updates
 * happen in between, then checks the resulting state handle and that the
 * created stream was closed.
 */
@Test
public void testCompletingSnapshot() throws Exception {
    setupRocksKeyedStateBackend();
    try {
        RunnableFuture<KeyedStateHandle> snapshot =
            keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
        Thread asyncSnapshotThread = new Thread(snapshot);
        asyncSnapshotThread.start();
        waiter.await(); // wait for snapshot to run
        waiter.reset();
        // Modify state while the snapshot is in progress.
        runStateUpdates();
        blocker.trigger(); // allow checkpointing to start writing
        waiter.await(); // wait for snapshot stream writing to run
        KeyedStateHandle keyedStateHandle = snapshot.get();
        assertNotNull(keyedStateHandle);
        assertTrue(keyedStateHandle.getStateSize() > 0);
        assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
        assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
        asyncSnapshotThread.join();
        verifyRocksObjectsReleased();
    } finally {
        // Always dispose the backend, even if an assertion failed.
        this.keyedStateBackend.dispose();
        this.keyedStateBackend = null;
    }
}
项目:flink    文件:StateUtil.java   
/**
 * Discards the given state future. Cancellation is attempted first; only if
 * that fails is the future's state object obtained (running the future if it
 * is not done yet) and, when non-null, discarded.
 *
 * @param stateFuture the future to discard; {@code null} is a no-op
 * @throws Exception if the discard operation failed
 */
public static void discardStateFuture(RunnableFuture<? extends StateObject> stateFuture) throws Exception {
    // Nothing more to do without a future, or once cancellation succeeded.
    if (null == stateFuture || stateFuture.cancel(true)) {
        return;
    }

    try {
        // The future may have completed before the cancellation attempt, so fetch its result.
        final StateObject completedState = FutureUtil.runIfNotDoneAndGet(stateFuture);
        if (completedState != null) {
            completedState.discardState();
        }
    } catch (CancellationException | ExecutionException ex) {
        LOG.debug("Cancelled execution of snapshot future runnable. Cancellation produced the following " +
            "exception, which is expected an can be ignored.", ex);
    }
}
项目:flink    文件:OperatorStateBackendTest.java   
/**
 * Snapshots list state that uses a {@code VerifyingIntSerializer} and checks
 * that the serializer's copy counter was incremented by the snapshot.
 */
@SuppressWarnings("unchecked")
@Test
public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {

    final AbstractStateBackend backend = new MemoryStateBackend(4096);

    final Environment env = createMockEnvironment();
    final OperatorStateBackend operatorBackend = backend.createOperatorStateBackend(env, "test-op-name");

    final AtomicInteger copyCounter = new AtomicInteger(0);
    final TypeSerializer<Integer> verifyingSerializer =
        new VerifyingIntSerializer(env.getUserClassLoader(), copyCounter);

    // Register list state with the verifying serializer and write one value.
    final ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("test", verifyingSerializer);
    final ListState<Integer> state = operatorBackend.getListState(descriptor);
    state.add(42);

    // Take the snapshot and run it to completion.
    final CheckpointStreamFactory streams = backend.createStreamFactory(new JobID(), "testOperator");
    final RunnableFuture<OperatorStateHandle> pendingSnapshot =
        operatorBackend.snapshot(1, 1, streams, CheckpointOptions.forCheckpoint());
    FutureUtil.runIfNotDoneAndGet(pendingSnapshot);

    // The copy method must have been called at least once.
    assertTrue(copyCounter.get() > 0);
}
项目:flink    文件:OperatorStateBackendTest.java   
/**
 * Snapshots an operator state backend that holds no state and expects the
 * resulting state handle to be {@code null}.
 */
@Test
public void testSnapshotEmpty() throws Exception {
    final AbstractStateBackend backend = new MemoryStateBackend(4096);
    final OperatorStateBackend operatorBackend =
            backend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
    final CheckpointStreamFactory streams = backend.createStreamFactory(new JobID(), "testOperator");

    final RunnableFuture<OperatorStateHandle> pendingSnapshot =
            operatorBackend.snapshot(0L, 0L, streams, CheckpointOptions.forCheckpoint());

    final OperatorStateHandle handle = FutureUtil.runIfNotDoneAndGet(pendingSnapshot);
    assertNull(handle);
}
项目:flink    文件:TaskCheckpointingBehaviourTest.java   
/**
 * Creates an operator state backend whose {@code snapshot} method always
 * throws, i.e. fails before any future is handed back to the caller.
 */
@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
    return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig(), true) {
        @Override
        public RunnableFuture<OperatorStateHandle> snapshot(
            long checkpointId,
            long timestamp,
            CheckpointStreamFactory streamFactory,
            CheckpointOptions checkpointOptions) throws Exception {

            // Fail in the synchronous part of the snapshot.
            throw new Exception("Sync part snapshot exception.");
        }
    };
}
项目:flink    文件:TaskCheckpointingBehaviourTest.java   
/**
 * Creates an operator state backend whose {@code snapshot} method returns a
 * future that throws when it is run, i.e. fails only once executed.
 */
@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
    return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig(), true) {
        @Override
        public RunnableFuture<OperatorStateHandle> snapshot(
            long checkpointId,
            long timestamp,
            CheckpointStreamFactory streamFactory,
            CheckpointOptions checkpointOptions) throws Exception {

            // The failure is deferred to the asynchronous part: it happens when the future runs.
            final Callable<OperatorStateHandle> failingAsyncPart = new Callable<OperatorStateHandle>() {
                @Override
                public OperatorStateHandle call() throws Exception {
                    throw new Exception("Async part snapshot exception.");
                }
            };
            return new FutureTask<>(failingAsyncPart);
        }
    };
}
项目:sstable-adaptor    文件:DebuggableThreadPoolExecutor.java   
/**
 * Wraps the runnable in a {@code LocalSessionWrapper} unless it already is
 * one, in which case the superclass implementation is used.
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T result)
{
    final boolean alreadyWrapped = runnable instanceof LocalSessionWrapper;
    if (alreadyWrapped)
    {
        return super.newTaskFor(runnable, result);
    }
    return new LocalSessionWrapper<T>(Executors.callable(runnable, result));
}
项目:sstable-adaptor    文件:DebuggableThreadPoolExecutor.java   
/**
 * Wraps the callable in a {@code LocalSessionWrapper} unless it already is
 * one, in which case the superclass implementation is used.
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
{
    final boolean alreadyWrapped = callable instanceof LocalSessionWrapper;
    if (alreadyWrapped)
    {
        return super.newTaskFor(callable);
    }
    return new LocalSessionWrapper<T>(callable);
}
项目:incubator-netbeans    文件:JFXProjectProblems.java   
/**
 * Shows a platform chooser dialog. If it is confirmed, a task is posted to
 * {@code RP} that applies the selected platform to the project under write
 * access; otherwise an already finished {@code UNRESOLVED} result is returned.
 *
 * @return a future for the resolution result
 */
@NbBundle.Messages({"LBL_ResolveFXJDK=Choose FX-enabled Java Platform - \"{0}\" Project"})
@Override
public Future<Result> resolve() {
    final ChooseOtherPlatformPanel choosePlatform = new ChooseOtherPlatformPanel(type);
    final DialogDescriptor dd = new DialogDescriptor(
            choosePlatform,
            Bundle.LBL_ResolveFXJDK(ProjectUtils.getInformation(project).getDisplayName()));

    // Dialog dismissed without OK: nothing was resolved.
    if (DialogDisplayer.getDefault().notify(dd) != DialogDescriptor.OK_OPTION) {
        return new JFXProjectProblems.Done(
                Result.create(ProjectProblemsProvider.Status.UNRESOLVED));
    }

    final Callable<ProjectProblemsProvider.Result> applyPlatform = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            final JavaPlatform jp = choosePlatform.getSelectedPlatform();
            if (jp == null) {
                return ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.UNRESOLVED);
            }
            try {
                ProjectManager.mutex().writeAccess(new Mutex.ExceptionAction<Void>() {
                    @Override
                    public Void run() throws IOException {
                        platformSetter.setProjectPlatform(jp);
                        JFXProjectUtils.updateClassPathExtension(project);
                        return null;
                    }
                });
            } catch (MutexException e) {
                // Unwrap the exception thrown inside the write action.
                throw (IOException) e.getCause();
            }
            LOGGER.info("Set " + PLATFORM_ACTIVE + " to platform " + jp);
            return ProjectProblemsProvider.Result.create(ProjectProblemsProvider.Status.RESOLVED);
        }
    };
    final RunnableFuture<Result> pending = new FutureTask<Result>(applyPlatform);
    RP.post(pending);
    return pending;
}
项目:incubator-netbeans    文件:Hinter.java   
/**
 * Creates a context that simply stores the given collaborators.
 *
 * @param doc    the document
 * @param layer  the layer handle
 * @param file   the file object
 * @param lines  a future yielding a map from string keys to integers
 * @param errors the list that error descriptions can be added to
 */
Context(Document doc, LayerHandle layer, FileObject file, RunnableFuture<Map<String,Integer>> lines, List<? super ErrorDescription> errors) {
    this.errors = errors;
    this.lines = lines;
    this.file = file;
    this.layer = layer;
    this.doc = doc;
}
项目:elasticsearch_my    文件:PrioritizedEsThreadPoolExecutor.java   
/**
 * Creates a {@code PrioritizedFutureTask} for the runnable. Runnables that
 * are not already a {@code PrioritizedRunnable} are wrapped with
 * {@code Priority.NORMAL} first.
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    Runnable task = runnable;
    if (!(task instanceof PrioritizedRunnable)) {
        task = PrioritizedRunnable.wrap(task, Priority.NORMAL);
    }
    final Priority taskPriority = ((PrioritizedRunnable) task).priority();
    return new PrioritizedFutureTask<>(task, taskPriority, value, insertionOrder.incrementAndGet());
}
项目:elasticsearch_my    文件:PrioritizedEsThreadPoolExecutor.java   
/**
 * Creates a {@code PrioritizedFutureTask} for the callable. Callables that
 * are not already a {@code PrioritizedCallable} are wrapped with
 * {@code Priority.NORMAL} first.
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    if (!(callable instanceof PrioritizedCallable)) {
        callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
    }
    // Parameterized cast (was raw) so the diamond infers PrioritizedFutureTask<T> without an unchecked conversion.
    return new PrioritizedFutureTask<>((PrioritizedCallable<T>) callable, insertionOrder.incrementAndGet());
}
项目:fort_j    文件:Executor.java   
/**
 * Worker loop: repeatedly takes tasks from the queue and runs them until
 * {@code shutdown} is set or {@code checkEndThread()} returns true.
 * Any exception escaping the loop is printed and ends the loop.
 */
void processQueue()
{
   try
   {
      while (!(shutdown || checkEndThread()))
      {
         // Take at least one task, then keep going while more tasks are queued.
         do
         {
            final RunnableFuture next = take();
            next.run();
         }
         while (queue.size() > 0);
      }
   }
   catch (Exception ex)
   {
      ex.printStackTrace();
   }
}
项目:Elasticsearch    文件:PrioritizedEsThreadPoolExecutor.java   
/**
 * Creates a {@code PrioritizedFutureTask} for the runnable. Runnables that
 * are not already a {@code PrioritizedRunnable} are wrapped with
 * {@code Priority.NORMAL} first.
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    Runnable task = runnable;
    if (!(task instanceof PrioritizedRunnable)) {
        task = PrioritizedRunnable.wrap(task, Priority.NORMAL);
    }
    return new PrioritizedFutureTask<>((PrioritizedRunnable) task, value, insertionOrder.incrementAndGet());
}
项目:Elasticsearch    文件:PrioritizedEsThreadPoolExecutor.java   
/**
 * Creates a {@code PrioritizedFutureTask} for the callable. Callables that
 * are not already a {@code PrioritizedCallable} are wrapped with
 * {@code Priority.NORMAL} first.
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    Callable<T> task = callable;
    if (!(task instanceof PrioritizedCallable)) {
        task = PrioritizedCallable.wrap(task, Priority.NORMAL);
    }
    return new PrioritizedFutureTask<>((PrioritizedCallable<T>) task, insertionOrder.incrementAndGet());
}
项目:agroal    文件:PriorityScheduledExecutor.java   
/**
 * Drains and runs every queued high priority task on the current thread
 * before delegating to the superclass hook for the low priority task.
 */
@Override
protected void beforeExecute(Thread thread, Runnable lowPriorityTask) {
    // High priority tasks go first; poll() returns null once the queue is empty.
    for ( RunnableFuture<?> urgent = priorityTasks.poll(); urgent != null; urgent = priorityTasks.poll() ) {
        urgent.run();
    }
    super.beforeExecute( thread, lowPriorityTask );
}
项目:skeletoid    文件:ThreadPoolExecutor.java   
/**
 * Wraps the callable in a {@code PriorityTask}, using the callable's own
 * priority when it is {@code Important} and priority 0 otherwise.
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
    if (!(callable instanceof Important)) {
        return new PriorityTask<>(0, callable);
    }
    final Important important = (Important) callable;
    return new PriorityTask<>(important.getPriority(), callable);
}
项目:skeletoid    文件:ThreadPoolExecutor.java   
/**
 * Wraps the runnable and its result value in a {@code PriorityTask}, using
 * the runnable's own priority when it is {@code Important} and priority 0
 * otherwise.
 */
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
    if (!(runnable instanceof Important)) {
        return new PriorityTask<>(0, runnable, value);
    }
    final Important important = (Important) runnable;
    return new PriorityTask<>(important.getPriority(), runnable, value);
}
项目:skeletoid    文件:ThreadPoolExecutor.java   
/**
 * Executes the task, wrapping it via {@code newTaskFor} first unless it
 * already is a {@code PriorityTask}. A {@code null} task is logged and
 * ignored.
 */
@Override
public void execute(Runnable task) {
    if (task == null) {
        LOG.e(LOG_TAG, "Executing null runnable... ignoring");
        return;
    }

    if (!(task instanceof PriorityTask)) {
        // Plain runnables are wrapped before being handed to the pool.
        final RunnableFuture<Object> wrapped = newTaskFor(task, null);
        super.execute(wrapped);
        return;
    }
    super.execute(task);
}
项目:skeletoid    文件:ThreadPoolExecutor.java   
/**
 * Wraps the task via {@code newTaskFor}, executes it and returns the future.
 * A {@code null} task is logged and ignored.
 *
 * @return the future of the submitted task, or {@code null} if {@code task}
 * is {@code null}
 */
@Override
public Future<?> submit(final Runnable task) {
    if (task != null) {
        final RunnableFuture<Object> wrapped = newTaskFor(task, null);
        execute(wrapped);
        return wrapped;
    }
    LOG.e(LOG_TAG, "Submitting null runnable... ignoring");
    return null;
}
项目:com.zsmartsystems.zigbee    文件:ZigBeeNode.java   
/**
 * Request an update of the binding table for this node.
 * <p>
 * This method returns a future to a boolean. Upon success the caller should call {@link #getBindingTable()}
 * <p>
 * The table is read in pages on a newly started thread: each request starts at the number of entries
 * collected so far, and the collected entries are stored via {@code setBindingTable} once the loop ends.
 * The future yields {@code false} as soon as one request reports an error.
 *
 * @return {@link Future} returning a {@link Boolean}
 */
public Future<Boolean> updateBindingTable() {
    RunnableFuture<Boolean> future = new FutureTask<Boolean>(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
            // Next start index to request; also the number of entries collected so far.
            int index = 0;
            // Total table size as reported by the most recent accepted response.
            int tableSize = 0;
            List<BindingTable> bindingTable = new ArrayList<BindingTable>();

            do {
                ManagementBindRequest bindingRequest = new ManagementBindRequest();
                bindingRequest.setDestinationAddress(new ZigBeeEndpointAddress(networkAddress));
                bindingRequest.setStartIndex(index);

                // Blocks until the transaction result is available.
                CommandResult result = networkManager.unicast(bindingRequest, new ManagementBindRequest()).get();
                if (result.isError()) {
                    return false;
                }

                ManagementBindResponse response = (ManagementBindResponse) result.getResponse();
                // Only responses for the page that was asked for are accepted.
                // NOTE(review): if a response has a different start index, or an accepted page is empty,
                // index does not advance; with index < tableSize the same request is then repeated
                // without bound -- confirm whether a retry limit is needed.
                if (response.getStartIndex() == index) {
                    tableSize = response.getBindingTableEntries();
                    index += response.getBindingTableList().size();
                    bindingTable.addAll(response.getBindingTableList());
                }
            } while (index < tableSize);

            setBindingTable(bindingTable);
            return true;
        }
    });

    // start the thread to execute it
    new Thread(future).start();
    return future;
}
项目:hollow    文件:SimultaneousExecutor.java   
/**
 * Executes the command, wrapping it via {@code newTaskFor} (with result
 * {@code Boolean.TRUE}) unless it already is a {@code RunnableFuture}.
 */
@Override
public void execute(Runnable command) {
    final Runnable toRun = (command instanceof RunnableFuture)
            ? command
            : newTaskFor(command, Boolean.TRUE);
    super.execute(toRun);
}
项目:ThreadDebugger    文件:ThreadExecutor.java   
/**
 * Wraps the task and its result in a {@code FutureTask} and executes it
 * under the given name.
 *
 * @param name   the name passed on to {@code execute(name, ...)}
 * @param task   the task to run; must not be {@code null}
 * @param result the value the returned future yields on completion
 * @return the future of the submitted task, or {@code null} when the SDK
 * level is below {@code GINGERBREAD} (nothing is executed then)
 * @throws NullPointerException if {@code task} is {@code null}
 */
@TargetApi(Build.VERSION_CODES.GINGERBREAD)
@Override
public <T> Future<T> submit(String name, Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> fTask = null;
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.GINGERBREAD) {
        // Parameterized (was a raw FutureTask) to avoid an unchecked assignment.
        fTask = new FutureTask<T>(task, result);
        execute(name, fTask);
    }
    return fTask;
}
项目:ThreadDebugger    文件:ThreadExecutor.java   
/**
 * Wraps the task in a {@code FutureTask} with a {@code null} result and
 * executes it under the given name.
 *
 * @param name the name passed on to {@code execute(name, ...)}
 * @param task the task to run; must not be {@code null}
 * @return the future of the submitted task, or {@code null} when the SDK
 * level is below {@code GINGERBREAD} (nothing is executed then)
 * @throws NullPointerException if {@code task} is {@code null}
 */
@TargetApi(Build.VERSION_CODES.GINGERBREAD)
@Override
public Future<?> submit(String name, Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> fTask = null;
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.GINGERBREAD) {
        // Parameterized (was a raw FutureTask) to avoid an unchecked assignment.
        fTask = new FutureTask<Void>(task, null);
        execute(name, fTask);
    }
    return fTask;
}
项目:ThreadDebugger    文件:ThreadExecutor.java   
/**
 * Wraps the callable in a {@code FutureTask} and executes it under the
 * given name.
 *
 * @param name the name passed on to {@code execute(name, ...)}
 * @param task the callable to run; must not be {@code null}
 * @return the future of the submitted task, or {@code null} when the SDK
 * level is below {@code GINGERBREAD} (nothing is executed then)
 * @throws NullPointerException if {@code task} is {@code null}
 */
@TargetApi(Build.VERSION_CODES.GINGERBREAD)
@Override
public <T> Future<T> submit(String name, Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> fTask = null;
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.GINGERBREAD) {
        // Parameterized (was a raw FutureTask) to avoid an unchecked assignment.
        fTask = new FutureTask<T>(task);
        execute(name, fTask);
    }
    return fTask;
}
项目:flink    文件:RocksDBKeyedStateBackend.java   
/**
 * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
 * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
 * be called by the same thread.
 * <p>
 * An incremental snapshot is taken only when incremental checkpointing is enabled and the checkpoint type is
 * not a savepoint; in every other case a full snapshot is taken.
 *
 * @param checkpointId  The Id of the checkpoint.
 * @param timestamp     The timestamp of the checkpoint.
 * @param streamFactory The factory that we can use for writing our state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
 * @throws Exception if triggering the snapshot fails
 */
@Override
public RunnableFuture<KeyedStateHandle> snapshot(
    final long checkpointId,
    final long timestamp,
    final CheckpointStreamFactory streamFactory,
    CheckpointOptions checkpointOptions) throws Exception {

    final boolean isSavepoint =
        checkpointOptions.getCheckpointType() == CheckpointOptions.CheckpointType.SAVEPOINT;

    if (isSavepoint || !enableIncrementalCheckpointing) {
        return snapshotFully(checkpointId, timestamp, streamFactory);
    }
    return snapshotIncrementally(checkpointId, timestamp, streamFactory);
}
项目:flink    文件:RocksDBStateBackendTest.java   
/**
 * Cancels a snapshot future that was never run and verifies that the
 * RocksDB objects have been released.
 */
@Test
public void testDismissingSnapshot() throws Exception {
    setupRocksKeyedStateBackend();
    try {
        final RunnableFuture<KeyedStateHandle> pendingSnapshot = keyedStateBackend.snapshot(
            0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
        // Dismiss the snapshot without ever running it.
        pendingSnapshot.cancel(true);
        verifyRocksObjectsReleased();
    } finally {
        // Always dispose the backend, even if the verification failed.
        this.keyedStateBackend.dispose();
        this.keyedStateBackend = null;
    }
}
项目:flink    文件:RocksDBStateBackendTest.java   
/**
 * Cancels a snapshot while it is running on a separate thread, then expects
 * the created stream to be closed and {@code get()} to fail.
 */
@Test
public void testCancelRunningSnapshot() throws Exception {
    setupRocksKeyedStateBackend();
    try {
        RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
        Thread asyncSnapshotThread = new Thread(snapshot);
        asyncSnapshotThread.start();
        waiter.await(); // wait for snapshot to run
        waiter.reset();
        // Modify state while the snapshot is in progress.
        runStateUpdates();
        // Cancel before the snapshot is allowed to start writing.
        snapshot.cancel(true);
        blocker.trigger(); // allow checkpointing to start writing
        assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
        waiter.await(); // wait for snapshot stream writing to run
        try {
            snapshot.get();
            // Reaching this line means get() unexpectedly returned a result.
            fail();
        } catch (Exception ignored) {
            // Expected: get() on the cancelled future must throw.
        }

        asyncSnapshotThread.join();
        verifyRocksObjectsReleased();
    } finally {
        // Always dispose the backend, even if an assertion failed.
        this.keyedStateBackend.dispose();
        this.keyedStateBackend = null;
    }
}
项目:flink    文件:FutureUtil.java   
/**
 * Returns the result of the given future, running it on the calling thread
 * first if it is not done yet.
 *
 * @param future the future to complete and query; may be {@code null}
 * @return the future's result, or {@code null} if {@code future} is {@code null}
 * @throws ExecutionException   if the future's computation threw an exception
 * @throws InterruptedException if the calling thread was interrupted while waiting
 */
public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
    if (future != null) {
        final boolean pending = !future.isDone();
        if (pending) {
            // Complete the computation synchronously on this thread.
            future.run();
        }
        return future.get();
    }
    return null;
}