Java 类com.amazonaws.services.simpleworkflow.flow.core.TryCatchFinally 实例源码

项目:aws-flow-maven-eclipse-samples    文件:CronWithRetryWorkflowImpl.java   
private void executeActivityUpdatingInvocationHistory(final CronWithRetryWorkflowOptions options) {
    new TryCatchFinally() {

        boolean failed;

        @Override
        protected void doTry() throws Throwable {
            appendToInvocationHistory("starting");
            activities.scheduleActivity(options.getActivity(), options.getActivityArguments(), null, Void.class);
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            failed = true;
            appendToInvocationHistory("failure:" + e.getMessage());
            throw e;
        }

        @Override
        protected void doFinally() throws Throwable {
            if (!failed) {
                appendToInvocationHistory("completed");
            }
        }
    };
}
项目:aws-flow-maven-eclipse-samples    文件:RetryActivityWorkflowImpl.java   
@Override
public void process() {

    final Settable<Boolean> retryActivity = new Settable<Boolean>();

    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            client.unreliableActivity();
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            if (++retryCount <= maxRetries) {
                retryActivity.set(true);
            }
            else {
                throw e;
            }
        }

        @Override
        protected void doFinally() throws Throwable {
            if (!retryActivity.isReady()) {
                retryActivity.set(false);
            }

        }
    };

    //This will call process() to retry the activity if it fails. 
    //doCatch() cannot be cancelled so we don't call process() directly from it  
    restartRunUnreliableActivityTillSuccess(retryActivity);
}
项目:aws-flow-maven-eclipse-samples    文件:CustomLogicRetryWorkflowImpl.java   
/**
 * If some exception is thrown in doTry(), the doCatch method will be
 * executed. Otherwise, the doCatch method will be ignored
 */
@Asynchronous
public void callActivityWithRetry() {

    final Settable<Throwable> failure = new Settable<Throwable>();
    new TryCatchFinally() {

        protected void doTry() throws Throwable {
            client.unreliableActivity();
        }

        /**
         * The code in doCatch is not cancellable. If we call method to
         * retry from doCatch, then in case of workflow cancellation there
         * will be no attempt to cancel the retried method. To ensure that
         * cancellation is always happening, the recursive retry is moved
         * out outside of TryCatch.
         */
        protected void doCatch(Throwable e) {
            failure.set(e);
        }

        protected void doFinally() throws Throwable {
            if (!failure.isReady()) {
                failure.set(null);
            }
        }

    };

    retryOnFailure(failure);
}
项目:aws-flow-maven-eclipse-samples    文件:CleanupResourceWorkflowImpl.java   
@Override
public void startWorkflow() {

    final Promise<Integer> resourceId = client.allocateResource();

    /**
     * This TryCatchFinally is a sibling of allocateResource() activity. If
     * allocateResource throws an exception, TryCatchFinally will be
     * cancelled. The semantics of cancellation are as follows: If the doTry
     * method has not been executed when the exception was thrown then the
     * cancellation is immediate and none of doTry, doCatch or doFinally
     * will be executed. If doTry was already executed then all outstanding
     * tasks created in doTry will be cancelled, doCatch will be called with
     * CancellationException and then doFinally will be called.
     */
    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            client.useResource(resourceId);
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            //this cannot be canceled
            client.rollbackChanges(resourceId);
        }

        // doFinally() will be executed whether an exception is thrown or not
        @Override
        protected void doFinally() throws Throwable {
            // make sure that the action of clean up will be executed
            if (resourceId.isReady()) {
                client.cleanUpResource(resourceId);
            }
        }
    };

}
项目:aws-flow-maven-eclipse-samples    文件:ExponentialRetryAnnotationWorkflowTest.java   
@Test(expected = IllegalArgumentException.class)
public void testExponentialRetryWorkflowWithNonRetryableException() {

    activitiesImplementation = new TestExponentialRetryActivities(false);
    workflowTest.addActivitiesImplementation(activitiesImplementation);

    final RetryWorkflowClient workflow = retryWorkflowClientFactory.getClient();

    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            workflow.process();
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            if (e instanceof ChildWorkflowException && e.getCause() instanceof ActivityTaskFailedException) {
                throw e.getCause().getCause();
            }
            else {
                throw e;
            }
        }

        @Override
        protected void doFinally() throws Throwable {
            AsyncAssert.assertEquals(1, activitiesImplementation.getCounter(Promise.Void()));
        }

    };

}
项目:aws-flow-maven-eclipse-samples    文件:RetryActivityWorkflowTest.java   
@Test(expected = IllegalStateException.class)
public void testRetryActivityWorkflowFailure() {

    activitiesImplementation = new TestRetryActivities();
    activitiesImplementation.maxFailures = 11;
    workflowTest.addActivitiesImplementation(activitiesImplementation);

    final RetryWorkflowClient workflow = retryWorkflowClientFactory.getClient();

    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            workflow.process();
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            if (e instanceof ChildWorkflowException && e.getCause() instanceof ActivityTaskFailedException) {
                throw e.getCause().getCause();
            }
            else {
                throw e;
            }
        }

        @Override
        protected void doFinally() throws Throwable {
            AsyncAssert.assertEquals(11, activitiesImplementation.getCounter(Promise.Void()));
        }

    };

}
项目:aws-flow-maven-eclipse-samples    文件:HandleErrorWorkflowTest.java   
@Test(expected = IllegalArgumentException.class)
public void testHandleErrorWorkflowWithNonHandleableException() {
    activitiesImplementation = new TestResourceManagementActivities(true, false);
    workflowTest.addActivitiesImplementation(activitiesImplementation);

    final HandleErrorWorkflowClient workflowClient = handleErrorWorkflowfactory.getClient();

    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            workflowClient.startWorkflow();

        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            if (e instanceof ChildWorkflowException && e.getCause() instanceof ActivityTaskFailedException) {
                throw e.getCause().getCause();
            }
            else {
                throw e;
            }
        }

        @Override
        protected void doFinally() throws Throwable {
            List<String> expectedTrace = new ArrayList<String>();
            expectedTrace.add("allocating");
            expectedTrace.add("using");
            AsyncAssert.assertEquals(expectedTrace, activitiesImplementation.getTrace(Promise.Void()));
        }
    };

}
项目:aws-flow-maven-eclipse-samples    文件:DeploymentTest.java   
@Test
public void test() throws IOException {
    new TryCatchFinally() {

        Throwable failure;

        @Override
        protected void doTry() throws Throwable {
            String template = getTemplate("/deployment/AppStack1.xml");
            DeploymentWorkflowClient workflow = workflowClientFactory.getClient("AppStack1");
            workflow.deploy(template);
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            failure = e;
            throw e;
        }

        @Override
        protected void doFinally() throws Throwable {
            // Without this check assertEquals fails overwriting
            // original exception
            if (failure == null) {
                List<String> expected = new ArrayList<String>();
                expected.add("Database-host1");
                expected.add("Database-host2");
                expected.add("appServer-host3");
                expected.add("appServer-host2");
                expected.add("appServer-host3");
                expected.add("WebServer-host2");
                expected.add("WebServer-host1");
                expected.add("LoadBalancer-host2");
                Assert.assertEquals(expected, trace);
            }
        }

    };
}
项目:swf-starter    文件:ZipS3FileProcessingWorkflow.java   
/**
 * Process the file at inputBucketName.inputFileName.
 * Place the result at outputBucketName.outputFileName.
 *
 * @param inputBucketName input bucket to process from
 * @param inputFileName input file to process from
 * @param outputBucketName output bucket to put result to
 * @param outputFileName output file to put result to
 * @throws IOException
 */
@Override
public void processFile(String inputBucketName, String inputFileName, String outputBucketName, String outputFileName)
    throws IOException {
  // Settable is a Promise implementation that exposes a "chain" method that allows you to
  // link the state of one Promise to that of another
  Settable<String> hostNameChain = new Settable<String>();

  // Use prepend runId to input and output name way to avoid name collisions
  String localInputFileName = runId + "_" + inputFileName;
  String localOutputFileName = runId + "_" + outputFileName;

  // TryCatchFinally provides an asynchronous version of a typical synchronous try-catch-finally block
  // For example, if any task that is started in the doTry block throws an exception,
  // all tasks within the doTry block are cancelled and flow moves to the doCatch block.
  new TryCatchFinally() {

    /**
     * Download the file from S3, process it locally through a chained task, and upload back to S3.
     * @throws Throwable
     */
    @Override
    protected void doTry() throws Throwable {
      // download from S3, returns the host that downloaded the file
      Promise<String> hostName = storageClient.download(inputBucketName, inputFileName, localInputFileName);

      // chaining is a way for one promise to get assigned the value of another
      // when the promise is complete, it's value will be available for subsequent operations
      hostNameChain.chain(hostName);

      // zip the file on the local host
      processFileOnHost(localInputFileName, localOutputFileName, hostName);

      // upload the zipped file back to S3
      upload(outputBucketName, outputFileName, localOutputFileName, hostNameChain);
    }

    @Override
    protected void doCatch(Throwable e) throws Throwable {
      state = "Failed: " + e.getMessage();
      throw e;
    }

    @Override
    protected void doFinally() throws Throwable {
      if (hostNameChain.isReady()) { // File was downloaded

        // Set option to schedule activity in worker specific task list
        ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostNameChain.get());

        // Call deleteLocalFile activity using the host specific task list
        storageClient.deleteLocalFile(localInputFileName, options);
        storageClient.deleteLocalFile(localOutputFileName, options);
      }
      if (!state.startsWith("Failed:")) {
        state = "Completed";
      }
    }

  };
}
项目:aws-flow-maven-eclipse-samples    文件:FileProcessingWorkflowZipImpl.java   
@Override
public void processFile(final String sourceBucketName, final String sourceFilename, final String targetBucketName,
        final String targetFilename) throws IOException {
    // Settable to store the worker specific task list returned by the activity
    final Settable<String> taskList = new Settable<String>();

    // Use runId as a way to ensure that downloaded files do not get name collisions
    String workflowRunId = workflowContext.getWorkflowExecution().getRunId();
    File localSource = new File(sourceFilename);
    final String localSourceFilename = workflowRunId + "_" + localSource.getName();
    File localTarget = new File(targetFilename);
    final String localTargetFilename = workflowRunId + "_" + localTarget.getName();
    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            Promise<String> activityWorkerTaskList = store.download(sourceBucketName, sourceFilename, localSourceFilename);
            // chaining is a way for one promise get assigned value of another 
            taskList.chain(activityWorkerTaskList);
            // Call processFile activity to zip the file
            Promise<Void> fileProcessed = processFileOnHost(localSourceFilename, localTargetFilename, activityWorkerTaskList);
            // Call upload activity to upload zipped file
            upload(targetBucketName, targetFilename, localTargetFilename, taskList, fileProcessed);
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            state = "Failed: " + e.getMessage();
            throw e;
        }

        @Override
        protected void doFinally() throws Throwable {
            if (taskList.isReady()) { // File was downloaded

                // Set option to schedule activity in worker specific task list
                ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(taskList.get());

                // Call deleteLocalFile activity using the host specific task list
                store.deleteLocalFile(localSourceFilename, options);
                store.deleteLocalFile(localTargetFilename, options);
            }
            if (!state.startsWith("Failed:")) {
                state = "Completed";
            }
        }

    };
}
项目:aws-flow-maven-eclipse-samples    文件:CronWithRetryWorkflowTest.java   
@Test(timeout = 2000)
public void testCronWithFailures() {
    workflowTest.setClockAccelerationCoefficient(SECONDS_HOUR * 24 * 7 * 2);
    final CronWithRetryWorkflowClient workflow = workflowClientFactory.getClient();

    final ActivityType activityType = new ActivityType();
    activityType.setName("CronWithRetryExampleActivities.doSomeWork");
    activityType.setVersion("1.0");
    final Object[] activityArguments = new Object[] { "parameter1" };

    final CronWithRetryWorkflowOptions cronOptions = new CronWithRetryWorkflowOptions();
    cronOptions.setActivity(activityType);
    cronOptions.setActivityArguments(activityArguments);
    cronOptions.setContinueAsNewAfterSeconds(SECONDS_HOUR * 24 + 300);
    cronOptions.setTimeZone("PST");
    cronOptions.setInitialRetryIntervalSeconds(30);
    cronOptions.setMaximumRetryIntervalSeconds(600);
    cronOptions.setRetryExpirationIntervalSeconds(3500);
    final String cronExpression = "0 0 * * * *";
    cronOptions.setCronExpression(cronExpression);

    WorkflowClock clock = workflowTest.getDecisionContext().getWorkflowClock();
    clock.createTimer(SECONDS_HOUR * 24 * 7 + 1000);
    // true constructor argument makes TryCatchFinally a daemon which causes it get cancelled after above timer firing
    new TryCatchFinally(true) {

        Throwable failure;

        @Override
        protected void doTry() throws Throwable {
            workflow.startCron(cronOptions);
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            failure = e;
            throw e;
        }

        @Override
        protected void doFinally() throws Throwable {
            // Skip assertions as their failure masks original exception
            if (failure == null || failure instanceof CancellationException) {
                // Note the 2 activity invocations for each cron invocation as half of the
                // invocations failed and were retried.
                Assert.assertEquals(24 * 7 * 2, cronActivitiesImplementation.getWorkCount());
                Assert.assertEquals(7, cronActivitiesImplementation.getRunCount());
            }
        }

    };
}
项目:aws-flow-maven-eclipse-samples    文件:CronWorkflowTest.java   
@Test(timeout = 2000)
public void testCron() {
    workflowTest.setClockAccelerationCoefficient(SECONDS_HOUR * 24 * 7 * 2);
    final CronWorkflowClient workflow = workflowClientFactory.getClient();

    final ActivityType activityType = new ActivityType();
    activityType.setName("CronExampleActivities.doSomeWork");
    activityType.setVersion("1.0");
    final Object[] activityArguments = new Object[] { "parameter1" };

    final CronWorkflowOptions cronOptions = new CronWorkflowOptions();
    cronOptions.setActivity(activityType);
    cronOptions.setActivityArguments(activityArguments);
    cronOptions.setContinueAsNewAfterSeconds(SECONDS_HOUR * 24 + 300);
    cronOptions.setTimeZone("PST");
    final String cronExpression = "0 0 * * * *";
    cronOptions.setCronExpression(cronExpression);

    WorkflowClock clock = workflowTest.getDecisionContext().getWorkflowClock();
    clock.createTimer(SECONDS_HOUR * 24 * 7 + 1000);
    // true constructor argument makes TryCatchFinally a daemon which causes it get cancelled after above timer firing
    new TryCatchFinally(true) {

        Throwable failure;

        @Override
        protected void doTry() throws Throwable {
            workflow.startCron(cronOptions);
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            failure = e;
            throw e;
        }

        @Override
        protected void doFinally() throws Throwable {
            // Skip assertions as their failure masks original exception
            if (failure == null || failure instanceof CancellationException) {
                Assert.assertEquals(24 * 7, cronActivitiesImplementation.getWorkCount());
                Assert.assertEquals(7, cronActivitiesImplementation.getRunCount());
            }
        }

    };
}
项目:datamung    文件:ExportSnapshotWorkflowImpl.java   
/**
 * @inheritDoc
 */
@Override
public void export( final ExportSnapshotRequest request )
{
    this.request = request;
    this.workflowId =
        contextProvider.getDecisionContext().getWorkflowContext().getWorkflowExecution().getWorkflowId();
    Promise<Void> started = controlActivities.notifyJobStarted();

    final Promise<String> databaseName =
        controlActivities.createDatabaseName( request.getSnapshotName(),
                                              started );
    new TryCatchFinally( databaseName )
    {
        @Override
        protected void doCatch( Throwable cause )
            throws Throwable
        {
            controlActivities.notifyJobFailed( cause );
        }

        @Override
        protected void doFinally()
        {
            rdsActivities.terminateInstance( databaseName,
                                             Promise.asPromise( request.getIdentity() ) );
        }

        @Override
        protected void doTry()
        {
            Promise<DatabaseInstance> instance =
                rdsActivities.restoreSnapshot( Promise.asPromise( request.getSnapshotName() ),
                                               databaseName,
                                               Promise.asPromise( request.getSubnetGroupName() ),
                                               Promise.asPromise( request.getIdentity() ) );

            Promise<Void> sourceAvailable =
                waitUntilDatabaseAvailable( databaseName, instance );
            Promise<DatabaseInstance> source =
                rdsActivities.describeInstance( databaseName,
                                                Promise.asPromise( request.getIdentity() ),
                                                sourceAvailable );
            Promise<Void> done = dumpDatabase( source );
            controlActivities.notifyJobCompleted( done );
        }
    };
}