Java 类com.amazonaws.services.simpleworkflow.flow.annotations.Asynchronous 实例源码

项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Announces the outcome for one horse once its final status is available.
 *
 * <p>
 * A horse whose status is {@code OK} is announced with a place while
 * {@code nextPlace} is still 3 or lower (the counter is then advanced by
 * one); later {@code OK} finishers are announced as finished. An
 * {@code INJURY} status is announced as an injury, and any other status is
 * announced as missing.
 *
 * @param name
 *            name of the horse.
 *
 * @param result
 *            promise holding the horse's final status.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return the promise returned by whichever announcement was made.
 */
@Asynchronous
private Promise<Void> announceHorseResult(final String name,
        final Promise<Status> result, final Promise<?>... waitFor) {

    switch (result.get()) {
    case OK: {
        if (this.nextPlace > 3) {
            // The first three places have already been handed out.
            return announceFinished(name);
        }
        final Promise<Void> placed = announcePlace(name, this.nextPlace);
        this.nextPlace += 1;
        return placed;
    }
    case INJURY:
        return announceInjury(name);
    default:
        return announceMissing(name);
    }
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Builds, for every horse, a chain of lap runs and lap announcements
 * followed by a final result announcement.
 *
 * <p>
 * Each lap step receives the status promise produced by the previous step,
 * starting from an already-ready {@code Status.OK}. The chains of different
 * horses share no promises with each other.
 *
 * @param laps
 *            number of laps to run.
 *
 * @param horses
 *            promise holding the names of the horses to run.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return a ready promise wrapping the list of per-horse result
 *         announcement promises, in the order of {@code horses}.
 */
@Asynchronous
private Promise<List<Promise<Void>>> runAll(final int laps,
        final Promise<List<String>> horses, final Promise<?>... waitFor) {

    final List<String> names = horses.get();
    final List<Promise<Void>> race = new ArrayList<>(names.size());

    for (final String name : names) {
        // Thread the status through every lap: run it, then announce it.
        Promise<Status> status = Promise.asPromise(Status.OK);
        for (int lapNum = 1; lapNum <= laps; lapNum++) {
            status = announceLapIfOk(name, lapNum,
                    runLapIfOk(name, lapNum, status));
        }
        race.add(announceHorseResult(name, status));
    }

    return Promise.asPromise(race);
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Runs one lap, but only when the previous status is {@code Status.OK}.
 *
 * @param name
 *            name of horse to run.
 *
 * @param lapNum
 *            the lap number to run.
 *
 * @param prevStatus
 *            promise holding the status of the previous step.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return the promise from {@code runLap} when the previous status was OK,
 *         otherwise {@code prevStatus} itself, unchanged.
 */
@Asynchronous
private Promise<Status> runLapIfOk(final String name, final int lapNum,
        final Promise<Status> prevStatus, final Promise<?>... waitFor) {

    if (prevStatus.get() != Status.OK) {
        // Something went wrong earlier: pass the old status along, do not run.
        return prevStatus;
    }

    return runLap(name, lapNum);
}
项目:aws-flow-maven-eclipse-samples    文件:MultiChoiceWorkflowImpl.java   
/**
 * Starts processing for every choice in the basket and returns one promise
 * covering all of them.
 *
 * @param basketChoice promise holding the list of choices to process
 * @return the promise built by {@code Promises.listOfPromisesToPromise} from
 *         the per-choice promises
 */
@Asynchronous
public Promise<List<Void>> processBasketOrder(Promise<List<OrderChoice>> basketChoice) {

    final List<OrderChoice> basket = basketChoice.get();
    final List<Promise<Void>> pending = new ArrayList<Promise<Void>>();

    // processSingleChoice hands back a promise, so this loop only collects
    // promises; it never reads their values before moving to the next choice.
    for (OrderChoice item : basket) {
        pending.add(processSingleChoice(item));
    }

    // Collapse the list of promises into a single promise that becomes
    // ready once every promise in the list is ready.
    return Promises.listOfPromisesToPromise(pending);
}
项目:aws-flow-maven-eclipse-samples    文件:ExclusiveChoiceWorkflowImpl.java   
/**
 * Picks the order activity that matches the chosen item and invokes it.
 *
 * @param itemChoice promise holding the item that was chosen
 * @return the promise returned by the matching {@code client.order*} call,
 *         or {@code null} when the choice is none of APPLE, ORANGE, LETTUCE
 *         or CABBAGE
 */
@Asynchronous
public Promise<Void> processItemOrder(Promise<OrderChoice> itemChoice) {
    switch (itemChoice.get()) {
    case APPLE:
        return client.orderApple();
    case ORANGE:
        return client.orderOrange();
    case LETTUCE:
        return client.orderLettuce();
    case CABBAGE:
        return client.orderCabbage();
    default:
        // No activity is scheduled for any other choice.
        return null;
    }
}
项目:aws-flow-maven-eclipse-samples    文件:PickFirstBranchWorkflowImpl.java   
/**
 * Selects the result of whichever branch is ready and cancels the other one.
 *
 * <p>
 * Branch 1 wins whenever its promise is ready; branch 2 is then cancelled
 * only if it is not ready as well. When branch 1 is not ready, branch 2's
 * promise is returned and branch 1 is cancelled.
 *
 * @param result OR-promise whose first two values are the branch promises
 * @return the promise of the selected branch
 */
@SuppressWarnings("unchecked")
@Asynchronous
Promise<List<String>> processResults(OrPromise result) {
    Promise<List<String>> first = (Promise<List<String>>) result.getValues()[0];
    Promise<List<String>> second = (Promise<List<String>>) result.getValues()[1];

    if (!first.isReady()) {
        // Branch 1 has nothing yet: use branch 2 and cancel branch 1.
        branch1.cancel(null);
        return second;
    }

    // Branch 1 has a result; stop branch 2 unless it has completed too.
    if (!second.isReady()) {
        branch2.cancel(null);
    }
    return first;
}
项目:aws-flow-maven-eclipse-samples    文件:HandleErrorWorkflowImpl.java   
/**
 * Reacts to a failure captured from an earlier step.
 *
 * <p>
 * A {@code null} failure means nothing went wrong and nothing is done. An
 * {@code ActivityTaskFailedException} is unwrapped and its cause decides
 * which clean-up activity is scheduled. Every other failure, including an
 * activity failure with an unrecognised cause, is rethrown as is.
 *
 * @param ex promise holding the captured failure, or {@code null}
 * @param resourceId promise holding the id of the affected resource
 * @throws Throwable the original failure when it cannot be handled here
 */
@Asynchronous
public void handleException(Promise<Throwable> ex, Promise<Integer> resourceId) throws Throwable {
    Throwable failure = ex.get();
    if (failure == null) {
        return;
    }
    if (!(failure instanceof ActivityTaskFailedException)) {
        throw failure;
    }

    // The exception thrown inside the activity travels as the cause.
    Throwable cause = failure.getCause();
    if (cause instanceof ResourceNoResponseException) {
        client.reportBadResource(resourceId.get());
    } else if (cause instanceof ResourceNotAvailableException) {
        client.refreshResourceCatalog(resourceId.get());
    } else {
        throw failure;
    }
}
项目:aws-flow-maven-eclipse-samples    文件:PeriodicWorkflowImpl.java   
/**
 * Schedules one run of the periodic activity and then calls itself again,
 * until the configured run window has elapsed.
 *
 * @param startTime clock reading taken when the periodic run began
 * @param waitFor promises gating this invocation; never read by the body
 */
@Asynchronous
public void callPeriodicActivity(long startTime, Promise<?>... waitFor) {
    long elapsed = clock.currentTimeMillis() - startTime;
    if (elapsed >= options.getContinueAsNewAfterSeconds() * SECOND) {
        // Run window is over: stop recursing.
        return;
    }

    // Scheduled through the dynamic client. The result type is Void because
    // the result is not used; an activity returning another type can be
    // invoked the same way.
    Promise<Void> scheduled = activities.scheduleActivity(activityType, activityArguments, null, Void.class);

    // When completion should not be waited for, substitute a promise that is
    // already ready.
    Promise<Void> activityCompletion = options.isWaitForActivityCompletion() ? scheduled : Promise.Void();

    // The next round is gated on both the activity completion and this timer,
    // so it starts no earlier than executionPeriodSeconds from now. A real
    // cron workflow would recompute the delay each time to hit a fixed
    // wall-clock schedule.
    Promise<Void> timer = clock.createTimer(options.getExecutionPeriodSeconds());

    callPeriodicActivity(startTime, timer, activityCompletion);
}
项目:aws-flow-maven-eclipse-samples    文件:MultiChoiceWorkflowTest.java   
/**
 * Verifies the activity trace: it must start with "multiOrdering", end with
 * "done", and contain exactly the expected orders in between, in any order.
 *
 * <p>
 * Entries are removed from the list returned by {@code getTrace()} as they
 * are checked.
 *
 * @param done promise gating this check; never read by the body
 */
@Asynchronous
public void assertTrace(Promise<Void> done) {
    List<String> trace = activitiesImplementation.getTrace();

    // Strip and check the fixed first and last entries.
    Assert.assertEquals("multiOrdering", trace.remove(0));
    int lastIndex = trace.size() - 1;
    Assert.assertEquals("done", trace.remove(lastIndex));

    // Every expected order must be present; take each one out as it is found.
    String[] expectedOrders = { "orderApple", "orderOrange" };
    for (String order : expectedOrders) {
        Assert.assertTrue(trace.remove(order));
    }

    // Nothing unexpected may be left over.
    Assert.assertEquals(0, trace.size());
}
项目:datamung    文件:JobWorkflowImpl.java   
/**
 * Reports a job's outcome to the control activities, unless there is no
 * action id to report against.
 *
 * @param actionId promise holding the action id; a {@code null} value
 *            disables reporting
 * @param result promise holding the job result
 * @return an already-ready promise when nothing is reported, otherwise the
 *         promise of the notification activity
 */
@Asynchronous
private Promise<Void> reportResult( Promise<String> actionId,
                                    Promise<JobResult> result )
{
    final String id = actionId.get();
    if ( id == null )
    {
        return Promise.Void();
    }

    final JobResult outcome = result.get();
    final String summary =
        String.format( "stdout=%s, stderr=%s, on=%s",
                       outcome.getStandardOutput(),
                       outcome.getErrorOutput(), outcome.getRunsOn() );

    if ( outcome.getExitCode() != 0 )
    {
        // A non-zero exit code is reported as a failure, with the code
        // appended to the summary.
        return controlActivities.notifyActionFailed( id,
                                                     summary + ", code="
                                                         + outcome.getExitCode(),
                                                     outcome.getStackTrace(),
                                                     outcome.getElapsedMillis() );
    }
    return controlActivities.notifyActionCompleted( id, summary,
                                                    outcome.getElapsedMillis() );
}
项目:swf-flow-gradle    文件:GreeterWorkflowImpl.java   
/**
 * Builds the greeting text for the given name.
 *
 * <p>
 * A warning is printed if the method body runs while {@code name} is not
 * ready. NOTE(review): going by the message, this is presumably meant to
 * reveal that the {@code @Asynchronous} weaving was not applied - confirm.
 *
 * @param name promise holding the name to greet
 * @return a ready promise holding {@code "Hello <name>!"}
 */
@Asynchronous
private Promise<String> getGreeting(Promise<String> name) {
   if(!name.isReady()) {
      System.out.println("Your weaving isn't working!");
   }
   return Promise.asPromise("Hello " + name.get() + "!");
}
项目:swf-starter    文件:ZipS3FileProcessingWorkflow.java   
/**
 * Records the download in {@code state} and processes the downloaded file,
 * using the host name as the task list for the processing activity.
 *
 * @param localInputFileName name of the file to process
 * @param localOutputFileName name of the file to upload to S3
 * @param hostName promise holding the host that downloaded the file
 * @return the promise returned by {@code fileClient.processFile}
 */
@Asynchronous
private Promise<Void> processFileOnHost(String localInputFileName, String localOutputFileName, Promise<String> hostName) {
  final String host = hostName.get();
  state = "Downloaded to " + host;

  // Route the activity to the task list named after the host.
  ActivitySchedulingOptions hostOptions = new ActivitySchedulingOptions().withTaskList(host);
  return fileClient.processFile(localInputFileName, localOutputFileName, hostOptions);
}
项目:swf-starter    文件:ZipS3FileProcessingWorkflow.java   
/**
 * Records the processing in {@code state} and uploads the local file to S3,
 * using the host name as the task list for the upload activity.
 *
 * @param outputBucketName S3 bucket to upload to
 * @param outputFilename S3 file to upload to
 * @param localFileName local file to upload
 * @param hostName promise holding the host that processed the file
 */
@Asynchronous
private void upload(String outputBucketName, String outputFilename, String localFileName, Promise<String> hostName) {
  final String host = hostName.get();
  state = "Processed at " + host;

  // Route the activity to the task list named after the host. Note the
  // argument order: bucket, local file, then the S3 file name.
  ActivitySchedulingOptions hostOptions = new ActivitySchedulingOptions().withTaskList(host);
  storageClient.upload(outputBucketName, localFileName, outputFilename, hostOptions);
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Retry-wrapped call to {@code announcer.announceFinished}.
 *
 * <p>
 * Wrapping the client call in a local method lets a retry policy be
 * attached to it: the first retry interval is 2 seconds, intervals are
 * capped at 30 seconds, and at most 5 attempts are made.
 *
 * @param name
 *            name of the horse, passed straight to the client.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return whatever the client returns.
 */
@ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5)
@Asynchronous
private Promise<Void> announceFinished(final String name,
        final Promise<?>... waitFor) {
    final Promise<Void> announced = this.announcer.announceFinished(name);
    return announced;
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Retry-wrapped call to {@code announcer.announceInjury}.
 *
 * <p>
 * Wrapping the client call in a local method lets a retry policy be
 * attached to it: the first retry interval is 2 seconds, intervals are
 * capped at 30 seconds, and at most 5 attempts are made.
 *
 * @param name
 *            name of the horse, passed straight to the client.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return whatever the client returns.
 */
@ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5)
@Asynchronous
private Promise<Void> announceInjury(final String name,
        final Promise<?>... waitFor) {
    final Promise<Void> announced = this.announcer.announceInjury(name);
    return announced;
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Retry-wrapped call to {@code announcer.announceLap}.
 *
 * <p>
 * Wrapping the client call in a local method lets a retry policy be
 * attached to it: the first retry interval is 2 seconds, intervals are
 * capped at 30 seconds, and at most 5 attempts are made.
 *
 * @param name
 *            name of the horse, passed straight to the client.
 *
 * @param lap
 *            lap number, passed straight to the client.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return whatever the client returns.
 */
@ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5)
@Asynchronous
private Promise<Void> announceLap(final String name, final int lap,
        final Promise<?>... waitFor) {
    final Promise<Void> announced = this.announcer.announceLap(name, lap);
    return announced;
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Retry-wrapped call to {@code announcer.announceMissing}.
 *
 * <p>
 * Wrapping the client call in a local method lets a retry policy be
 * attached to it: the first retry interval is 2 seconds, intervals are
 * capped at 30 seconds, and at most 5 attempts are made.
 *
 * @param name
 *            name of the horse, passed straight to the client.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return whatever the client returns.
 */
@ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5)
@Asynchronous
private Promise<Void> announceMissing(final String name,
        final Promise<?>... waitFor) {
    final Promise<Void> announced = this.announcer.announceMissing(name);
    return announced;
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Retry-wrapped call to {@code announcer.announcePlace}.
 *
 * <p>
 * Wrapping the client call in a local method lets a retry policy be
 * attached to it: the first retry interval is 2 seconds, intervals are
 * capped at 30 seconds, and at most 5 attempts are made.
 *
 * @param name
 *            name of the horse, passed straight to the client.
 *
 * @param place
 *            finishing place, passed straight to the client.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return whatever the client returns.
 */
@ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5)
@Asynchronous
private Promise<Void> announcePlace(final String name, final Integer place,
        final Promise<?>... waitFor) {
    final Promise<Void> announced = this.announcer.announcePlace(name, place);
    return announced;
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Retry-wrapped call to {@code announcer.announceRace}.
 *
 * <p>
 * Wrapping the client call in a local method lets a retry policy be
 * attached to it: the first retry interval is 2 seconds, intervals are
 * capped at 30 seconds, and at most 5 attempts are made.
 *
 * @param horses
 *            promise holding the horse names; handed to the client as a
 *            promise.
 *
 * @param laps
 *            promise holding the lap count; handed to the client as a
 *            promise.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return whatever the client returns.
 */
@ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5)
@Asynchronous
private Promise<Void> announceRace(final Promise<List<String>> horses,
        final Promise<Integer> laps, final Promise<?>... waitFor) {
    final Promise<Void> announced = this.announcer.announceRace(horses, laps);
    return announced;
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Retry-wrapped call to {@code horses.arriveGate}.
 *
 * <p>
 * Wrapping the client call in a local method lets a retry policy be
 * attached to it: the first retry interval is 2 seconds, intervals are
 * capped at 30 seconds, and at most 5 attempts are made.
 *
 * @param name
 *            name of the horse, passed straight to the client.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return whatever the client returns.
 */
@ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5)
@Asynchronous
private Promise<Void> arriveGate(final String name,
        final Promise<?>... waitFor) {
    final Promise<Void> arrived = this.horses.arriveGate(name);
    return arrived;
}
项目:swf-horserace    文件:RaceFlowImpl.java   
/**
 * Retry-wrapped call to {@code horses.runLap}.
 *
 * <p>
 * Wrapping the client call in a local method lets a retry policy be
 * attached to it: the first retry interval is 2 seconds, intervals are
 * capped at 30 seconds, and at most 5 attempts are made.
 *
 * @param name
 *            name of the horse, passed straight to the client.
 *
 * @param lapNum
 *            lap number, passed straight to the client.
 *
 * @param waitFor
 *            anonymous dependencies; never read by the body.
 *
 * @return whatever the client returns.
 */
@ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5)
@Asynchronous
private Promise<Status> runLap(final String name, final int lapNum,
        final Promise<?>... waitFor) {
    final Promise<Status> lapStatus = this.horses.runLap(name, lapNum);
    return lapStatus;
}
项目:aws-flow-maven-eclipse-samples    文件:WaitForSignalWorkflowImpl.java   
/**
 * Processes the order, using the amount from the signal when one has
 * arrived and the original amount otherwise.
 *
 * @param originalAmount amount to use when no signal value is available
 * @param waitFor promise gating this invocation; never read by the body
 */
@Asynchronous
private void processOrder(int originalAmount, Promise<?> waitFor) {
    // A ready signalReceived overrides the amount the order started with.
    int amount = signalReceived.isReady() ? signalReceived.get() : originalAmount;
    client.processOrder(amount);
}
项目:aws-flow-maven-eclipse-samples    文件:RunMultipleActivitiesConcurrentlyWorkflowImpl.java   
/**
 * Schedules one more {@code generateRandomNumber} call when the two results
 * add up to more than 5.
 *
 * @param result1 promise holding the first result
 * @param result2 promise holding the second result
 */
@Asynchronous
public void processResults(Promise<Integer> result1, Promise<Integer> result2) {
    int total = result1.get() + result2.get();
    if (total > 5) {
        client.generateRandomNumber();
    }
}
项目:aws-flow-maven-eclipse-samples    文件:CustomLogicRetryWorkflowImpl.java   
/**
 * Calls the unreliable activity once and hands the outcome to
 * {@code retryOnFailure}.
 *
 * <p>
 * The outcome travels through the {@code failure} settable: it is set to the
 * exception when doCatch runs, and to {@code null} by doFinally when doCatch
 * did not set it. The settable is passed to {@code retryOnFailure} as a
 * promise.
 */
@Asynchronous
public void callActivityWithRetry() {

    // Carries either the caught exception or null out of the TryCatchFinally.
    final Settable<Throwable> failure = new Settable<Throwable>();
    new TryCatchFinally() {

        protected void doTry() throws Throwable {
            client.unreliableActivity();
        }

        /**
         * Only records the exception; it does not retry from here. The code
         * in doCatch is not cancellable, so a retry started from doCatch
         * would not be cancelled when the workflow is cancelled. To make
         * sure cancellation always reaches the retried call, the recursive
         * retry is done outside of the TryCatchFinally.
         */
        protected void doCatch(Throwable e) {
            failure.set(e);
        }

        protected void doFinally() throws Throwable {
            // doCatch did not set a failure: mark the outcome as "no failure".
            if (!failure.isReady()) {
                failure.set(null);
            }
        }

    };

    retryOnFailure(failure);
}
项目:aws-flow-maven-eclipse-samples    文件:CustomLogicRetryWorkflowImpl.java   
/**
 * Starts another attempt when the previous one failed and
 * {@code shouldRetry} accepts the failure.
 *
 * @param failureP promise holding the failure of the previous attempt, or
 *            {@code null} when it succeeded
 */
@Asynchronous
private void retryOnFailure(Promise<Throwable> failureP) {
    Throwable cause = failureP.get();
    if (cause == null) {
        // Previous attempt succeeded; nothing to retry.
        return;
    }
    if (shouldRetry(cause)) {
        callActivityWithRetry();
    }
}
项目:aws-flow-maven-eclipse-samples    文件:ConditionalLoopWorkflowImpl.java   
/**
 * Processes the given number of records by calling itself once per record.
 * Each recursive call receives the promise of the record started just
 * before it.
 *
 * @param records number of records still to process; values below 1 end the
 *            recursion
 * @param waitFor promises gating this invocation; never read by the body
 */
@Asynchronous
public void processRecords(int records, Promise<?>... waitFor) {
    if (records < 1) {
        return;
    }
    Promise<Void> recordDone = client.processRecord();
    processRecords(records - 1, recordDone);
}
项目:aws-flow-maven-eclipse-samples    文件:JoinBranchesWorkflowImpl.java   
/**
 * Adds up the values of all branch results.
 *
 * <p>
 * The {@code @Wait} annotation on the list keeps this method from running
 * until every promise in {@code results} is ready. An alternative to
 * {@code @Wait} is to turn the list into a single promise with
 * {@code Promises.listOfPromisesToPromise}.
 *
 * @param results promises holding the per-branch values
 * @return a ready promise holding the sum of all values
 */
@Asynchronous
public Promise<Integer> joinBranches(@Wait List<Promise<Integer>> results) {
    int total = 0;
    for (Promise<Integer> branch : results) {
        total = total + branch.get();
    }
    return Promise.asPromise(total);
}
项目:aws-flow-maven-eclipse-samples    文件:PartitionedAverageCalculatorImpl.java   
/**
 * Splits the sum computation into one activity per worker and merges the
 * partial sums into an average.
 *
 * <p>
 * NOTE(review): the chunk size comes from an integer division, so when the
 * data size is not a multiple of {@code numberOfWorkers} the chunks together
 * cover fewer elements than the data size used as divisor in the merge step.
 * Confirm how {@code computeSumForChunk} treats the remainder.
 *
 * @param inputFile name of the input file, passed to every chunk activity
 * @param dataSize promise holding the total number of data elements
 * @return the promise returned by {@code mergeSumAndComputeAverage}
 */
@Asynchronous
private Promise<Double> computeAverageDistributed(String inputFile, Promise<Integer> dataSize) {
    int totalSize = dataSize.get();
    int chunkSize = totalSize / numberOfWorkers;

    // One sum activity per chunk; only the promises are collected here.
    List<Promise<Integer>> partialSums = new ArrayList<Promise<Integer>>();
    for (int chunk = 0; chunk < numberOfWorkers; chunk++) {
        partialSums.add(client.computeSumForChunk(bucketName, inputFile, chunk, chunkSize));
    }

    // Merge phase
    return mergeSumAndComputeAverage(partialSums, totalSize);
}
项目:aws-flow-maven-eclipse-samples    文件:PartitionedAverageCalculatorImpl.java   
/**
 * Adds up the per-worker sums and divides the total by the data size.
 *
 * @param results promises holding the per-worker sums; the {@code @Wait}
 *            annotation is what gates this method on the list
 * @param dataSize number of elements the average is taken over
 * @return a ready promise holding the average as a double
 */
@Asynchronous
private Promise<Double> mergeSumAndComputeAverage(@Wait List<Promise<Integer>> results, int dataSize){
    int grandTotal = 0;
    for (Promise<Integer> partial : results) {
        grandTotal = grandTotal + partial.get();
    }
    // Both operands are widened to double before dividing.
    double average = (double) grandTotal / (double) dataSize;
    return Promise.asPromise(average);
}
项目:aws-flow-maven-eclipse-samples    文件:FileProcessingWorkflowZipImpl.java   
/**
 * Records the download in {@code state} and processes the file, using the
 * given task list for the processing activity.
 *
 * @param fileToProcess name of the file to process
 * @param fileToUpload name of the file the processing produces
 * @param taskList promise holding the worker-specific task list name
 * @return the promise returned by {@code processor.processFile}
 */
@Asynchronous
private Promise<Void> processFileOnHost(String fileToProcess, String fileToUpload, Promise<String> taskList) {
    final String workerTaskList = taskList.get();
    state = "Downloaded to " + workerTaskList;

    // Route the activity to the worker-specific task list.
    ActivitySchedulingOptions routing = new ActivitySchedulingOptions().withTaskList(workerTaskList);
    return processor.processFile(fileToProcess, fileToUpload, routing);
}
项目:aws-flow-maven-eclipse-samples    文件:FileProcessingWorkflowZipImpl.java   
/**
 * Records the processing in {@code state} and uploads the processed file,
 * using the given task list for the upload activity.
 *
 * @param targetBucketName bucket to upload to
 * @param targetFilename name of the file in the bucket
 * @param localTargetFilename local file to upload
 * @param taskList promise holding the worker-specific task list name
 * @param fileProcessed promise gating this invocation; never read by the body
 */
@Asynchronous
private void upload(final String targetBucketName, final String targetFilename, final String localTargetFilename,
        Promise<String> taskList, Promise<Void> fileProcessed) {
    final String workerTaskList = taskList.get();
    state = "Processed at " + workerTaskList;

    // Note the argument order: bucket, local file, then the target file name.
    ActivitySchedulingOptions routing = new ActivitySchedulingOptions().withTaskList(workerTaskList);
    store.upload(targetBucketName, localTargetFilename, targetFilename, routing);
}
项目:aws-flow-maven-eclipse-samples    文件:HumanTaskTest.java   
/**
 * Verifies that the first three recorded activity results are the expected
 * ones, in order.
 *
 * @param done promise gating this check; never read by the body
 */
@Asynchronous
private void assertResult(Promise<Void> done) {
    ArrayList<String> recorded = activitiesImpl.getResult();
    String[] expected = { "automatedActivity", "humanActivity", "sendNotification:test" };
    for (int i = 0; i < expected.length; i++) {
        Assert.assertEquals(expected[i], recorded.get(i));
    }
}
项目:aws-flow-maven-eclipse-samples    文件:PickFirstBranchTest.java   
/**
 * Verifies that the result list holds exactly "result1" followed by
 * "result2".
 *
 * @param done promise holding the list to check
 */
@Asynchronous
private void assertGreeting(Promise<List<String>> done) {
    List<String> actual = done.get();
    String[] expected = { "result1", "result2" };

    Assert.assertEquals(2, actual.size());
    for (int i = 0; i < expected.length; i++) {
        Assert.assertEquals(expected[i], actual.get(i));
    }
}
项目:datamung    文件:ExportSnapshotWorkflowImpl.java   
/**
 * Builds a MySQL dump job for the given database and submits it through a
 * job workflow client whose id is {@code workflowId + "-job"}.
 *
 * @param database promise holding the database instance to dump
 * @return the promise returned by the job client's {@code executeCommand}
 */
@Asynchronous
private Promise<Void> dumpDatabase( Promise<DatabaseInstance> database )
{
    // Describe what to dump and where the archive goes.
    MySQLDumpJob dump = new MySQLDumpJob();
    dump.setDataArchive( request.getDestinationArchive() );
    dump.setDatabaseInstance( database.get() );
    dump.setIdentity( request.getIdentity() );
    dump.setMasterPassword( request.getDatabaseMasterPassword() );

    // Wrap the job in a run request carrying identity and worker options.
    RunJobRequest submission = new RunJobRequest();
    submission.setJob( dump );
    submission.setIdentity( request.getIdentity() );
    submission.setWorkerOptions( request.getWorkerOptions() );

    final String jobWorkflowId = workflowId + "-job";
    return jobFlowFactory.getClient( jobWorkflowId ).executeCommand( submission );
}
项目:datamung    文件:ExportSnapshotWorkflowImpl.java   
/**
 * Starts a check-and-wait workflow that waits for the database to be
 * created, using a client whose id is {@code workflowId + "-restore-db"}.
 *
 * @param databaseId promise holding the name of the database to wait for
 * @param waitFor promises gating this invocation; never read by the body
 * @return the promise returned by the wait client's {@code checkAndWait}
 */
@Asynchronous
private Promise<Void> waitUntilDatabaseAvailable( Promise<String> databaseId,
                                                  Promise<?>... waitFor )
{
    CheckAndWait probe = new CheckAndWait();
    probe.setCheckType( CheckAndWait.Type.DATABASE_CREATION );

    // The deadline is the current workflow clock time plus the restore
    // timeout taken from the request (seconds converted to milliseconds).
    final long expireOn =
        contextProvider.getDecisionContext().getWorkflowClock().currentTimeMillis()
            + request.getSnapshotRestoreTimeoutSeconds() * 1000L;
    probe.setExpireOn( expireOn );

    probe.setIdentity( request.getIdentity() );
    probe.setObjectName( databaseId.get() );

    final String waitWorkflowId = workflowId + "-restore-db";
    return waitFlowFactory.getClient( waitWorkflowId ).checkAndWait( probe );
}
项目:datamung    文件:CheckWaitWorkflowImpl.java   
/**
 * Decides what happens after one check: stop, fail with a timeout, or
 * schedule the next check after a delay.
 *
 * @param done promise holding the outcome of the check just performed
 * @param request the check-and-wait request being served
 * @throws TimeoutException when the check is not done and the workflow clock
 *             has passed {@code request.getExpireOn()}
 */
@Asynchronous
private void continueOrExit( Promise<Boolean> done, CheckAndWait request )
    throws TimeoutException
{
    // Count this check, whatever its outcome.
    checks++;
    if ( done.get() )
    {
        // The awaited condition holds: nothing more to schedule.
        return;
    }

    long now =
        contextProvider.getDecisionContext().getWorkflowClock().currentTimeMillis();
    if ( now > request.getExpireOn() )
    {
        throw new TimeoutException( "Task " + request.getCheckType()
            + " didn't complete before "
            + new DateTime( request.getExpireOn(), DateTimeZone.UTC ) );
    }

    // Timer that gates the next check by the requested wait interval.
    Promise<Void> delay =
        contextProvider.getDecisionContext().getWorkflowClock().createTimer( request.getWaitIntervalSeconds() );

    if ( checks >= request.getMaxChecksPerExecution() )
    {
        // Check budget for this execution is used up: hand the request over
        // through the self client instead of checking again here.
        // NOTE(review): presumably this continues the workflow as a new
        // execution - confirm against CheckWaitWorkflowSelfClientImpl.
        new CheckWaitWorkflowSelfClientImpl().checkAndWait( request, delay );
        return;
    }
    doCheckAndWait( request, delay );
}
项目:datamung    文件:CheckWaitWorkflowImpl.java   
/**
 * Performs the check matching the request's check type and passes its
 * outcome to {@code continueOrExit}.
 *
 * @param request the check-and-wait request being served
 * @param waitFor promises gating this invocation; never read by the body
 * @throws TimeoutException declared for {@code continueOrExit}
 * @throws IllegalStateException when the check type is not one of the three
 *             handled here
 */
@Asynchronous
private void doCheckAndWait( CheckAndWait request, Promise<?>... waitFor )
    throws TimeoutException
{
    switch ( request.getCheckType() )
    {
        case SNAPSHOT_CREATION:
            continueOrExit( isSnapshotAvailable( request.getObjectName(),
                                                 request.getIdentity() ),
                            request );
            break;
        case DATABASE_CREATION:
            continueOrExit( isDatabaseInstanceAvailable( request.getObjectName(),
                                                         request.getIdentity() ),
                            request );
            break;
        case WORKER_LAUNCH:
            continueOrExit( isEc2InstanceRunning( request.getObjectName(),
                                                  request.getIdentity() ),
                            request );
            break;
        default:
            throw new IllegalStateException( "Unexpected check type "
                + request.getCheckType() );
    }
}
项目:datamung    文件:CheckWaitWorkflowImpl.java   
/**
 * Describes the named database instance and passes the resulting promise to
 * the overload that takes a promise.
 *
 * @param instanceName name of the database instance to describe
 * @param identity identity passed to the RDS activity
 * @return the promise returned by the promise-taking overload
 */
@Asynchronous
private Promise<Boolean> isDatabaseInstanceAvailable( String instanceName,
                                                      Identity identity )
{
    return isDatabaseInstanceAvailable( rdsActivities.describeInstance( instanceName,
                                                                        identity ) );
}
项目:datamung    文件:CheckWaitWorkflowImpl.java   
/**
 * Describes the EC2 instance with the given id and passes the resulting
 * promise to the overload that takes a promise.
 *
 * @param instanceId id of the EC2 instance to describe
 * @param identity identity passed to the EC2 activity
 * @return the promise returned by the promise-taking overload
 */
@Asynchronous
private Promise<Boolean> isEc2InstanceRunning( String instanceId,
                                               Identity identity )
{
    return isEc2InstanceRunning( ec2Activities.describeInstance( instanceId,
                                                                 identity ) );
}
项目:datamung    文件:CheckWaitWorkflowImpl.java   
/**
 * Fetches the status of the named snapshot and passes it, together with the
 * text "available", to the {@code equals} helper.
 *
 * @param snapshotName name of the snapshot to look up
 * @param identity identity passed to the RDS activity
 * @return the promise returned by the {@code equals} helper
 */
@Asynchronous
private Promise<Boolean> isSnapshotAvailable( String snapshotName,
                                              Identity identity )
{
    return equals( "available",
                   rdsActivities.getSnapshotStatus( snapshotName, identity ) );
}