/**
 * Schedules the activity described by {@code options} and records the
 * progress of that invocation in the invocation history.
 * <p>
 * The history receives "starting" before the activity is scheduled, then
 * either "failure:" followed by the exception message (the exception is
 * rethrown afterwards) or "completed" when no failure was observed.
 *
 * @param options supplies the activity type and the activity arguments
 */
private void executeActivityUpdatingInvocationHistory(final CronWithRetryWorkflowOptions options) {
    new TryCatchFinally() {

        // Set by doCatch so that doFinally does not report a failed run as completed.
        boolean sawFailure;

        @Override
        protected void doTry() throws Throwable {
            appendToInvocationHistory("starting");
            activities.scheduleActivity(options.getActivity(), options.getActivityArguments(), null, Void.class);
        }

        @Override
        protected void doCatch(Throwable cause) throws Throwable {
            sawFailure = true;
            appendToInvocationHistory("failure:" + cause.getMessage());
            throw cause;
        }

        @Override
        protected void doFinally() throws Throwable {
            if (sawFailure) {
                return;
            }
            appendToInvocationHistory("completed");
        }
    };
}
@Override public void process() { final Settable<Boolean> retryActivity = new Settable<Boolean>(); new TryCatchFinally() { @Override protected void doTry() throws Throwable { client.unreliableActivity(); } @Override protected void doCatch(Throwable e) throws Throwable { if (++retryCount <= maxRetries) { retryActivity.set(true); } else { throw e; } } @Override protected void doFinally() throws Throwable { if (!retryActivity.isReady()) { retryActivity.set(false); } } }; //This will call process() to retry the activity if it fails. //doCatch() cannot be cancelled so we don't call process() directly from it restartRunUnreliableActivityTillSuccess(retryActivity); }
/**
 * Calls the unreliable activity and hands the outcome to
 * {@code retryOnFailure}.
 * <p>
 * If doTry() fails, doCatch() runs and stores the exception in the
 * {@code failure} settable; if nothing fails, doCatch() is skipped and
 * doFinally() settles {@code failure} with {@code null}. Either way the
 * settable is ready once the TryCatchFinally completes.
 */
@Asynchronous
public void callActivityWithRetry() {
    final Settable<Throwable> failure = new Settable<Throwable>();

    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            client.unreliableActivity();
        }

        /**
         * The code in doCatch is not cancellable. If the retry method were
         * called from doCatch, a workflow cancellation would not attempt to
         * cancel the retried method. To ensure that cancellation always
         * happens, the recursive retry is moved outside of the
         * TryCatchFinally.
         */
        @Override
        protected void doCatch(Throwable e) {
            failure.set(e);
        }

        @Override
        protected void doFinally() throws Throwable {
            // No failure was recorded: settle the promise with null.
            if (!failure.isReady()) {
                failure.set(null);
            }
        }
    };

    retryOnFailure(failure);
}
@Override public void startWorkflow() { final Promise<Integer> resourceId = client.allocateResource(); /** * This TryCatchFinally is a sibling of allocateResource() activity. If * allocateResource throws an exception, TryCatchFinally will be * cancelled. The semantics of cancellation are as follows: If the doTry * method has not been executed when the exception was thrown then the * cancellation is immediate and none of doTry, doCatch or doFinally * will be executed. If doTry was already executed then all outstanding * tasks created in doTry will be cancelled, doCatch will be called with * CancellationException and then doFinally will be called. */ new TryCatchFinally() { @Override protected void doTry() throws Throwable { client.useResource(resourceId); } @Override protected void doCatch(Throwable e) throws Throwable { //this cannot be canceled client.rollbackChanges(resourceId); } // doFinally() will be executed whether an exception is thrown or not @Override protected void doFinally() throws Throwable { // make sure that the action of clean up will be executed if (resourceId.isReady()) { client.cleanUpResource(resourceId); } } }; }
/**
 * Runs the retry workflow against activities created with
 * {@code new TestExponentialRetryActivities(false)} and expects an
 * {@link IllegalArgumentException}, with the activity counter at 1.
 */
@Test(expected = IllegalArgumentException.class)
public void testExponentialRetryWorkflowWithNonRetryableException() {
    activitiesImplementation = new TestExponentialRetryActivities(false);
    workflowTest.addActivitiesImplementation(activitiesImplementation);
    final RetryWorkflowClient workflow = retryWorkflowClientFactory.getClient();

    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            workflow.process();
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            boolean wrappedActivityFailure = e instanceof ChildWorkflowException
                    && e.getCause() instanceof ActivityTaskFailedException;
            if (!wrappedActivityFailure) {
                throw e;
            }
            // Unwrap two levels so the test sees the cause of the activity failure.
            throw e.getCause().getCause();
        }

        @Override
        protected void doFinally() throws Throwable {
            AsyncAssert.assertEquals(1, activitiesImplementation.getCounter(Promise.Void()));
        }
    };
}
/**
 * Runs the retry workflow with {@code maxFailures} set to 11 and expects an
 * {@link IllegalStateException}, with the activity counter at 11.
 */
@Test(expected = IllegalStateException.class)
public void testRetryActivityWorkflowFailure() {
    activitiesImplementation = new TestRetryActivities();
    activitiesImplementation.maxFailures = 11;
    workflowTest.addActivitiesImplementation(activitiesImplementation);
    final RetryWorkflowClient workflow = retryWorkflowClientFactory.getClient();

    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            workflow.process();
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            // Anything that is not a child-workflow failure wrapping an
            // activity failure is rethrown untouched.
            if (!(e instanceof ChildWorkflowException) || !(e.getCause() instanceof ActivityTaskFailedException)) {
                throw e;
            }
            Throwable activityFailure = e.getCause();
            throw activityFailure.getCause();
        }

        @Override
        protected void doFinally() throws Throwable {
            AsyncAssert.assertEquals(11, activitiesImplementation.getCounter(Promise.Void()));
        }
    };
}
/**
 * Starts the handle-error workflow against activities created with
 * {@code new TestResourceManagementActivities(true, false)} and expects an
 * {@link IllegalArgumentException}; the recorded trace must be
 * "allocating" followed by "using".
 */
@Test(expected = IllegalArgumentException.class)
public void testHandleErrorWorkflowWithNonHandleableException() {
    activitiesImplementation = new TestResourceManagementActivities(true, false);
    workflowTest.addActivitiesImplementation(activitiesImplementation);
    final HandleErrorWorkflowClient workflowClient = handleErrorWorkflowfactory.getClient();

    new TryCatchFinally() {

        @Override
        protected void doTry() throws Throwable {
            workflowClient.startWorkflow();
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            boolean childWorkflowFailed = e instanceof ChildWorkflowException;
            if (childWorkflowFailed && e.getCause() instanceof ActivityTaskFailedException) {
                // Surface the cause of the activity failure.
                Throwable activityFailure = e.getCause();
                throw activityFailure.getCause();
            }
            throw e;
        }

        @Override
        protected void doFinally() throws Throwable {
            List<String> wantedTrace = new ArrayList<String>();
            wantedTrace.add("allocating");
            wantedTrace.add("using");
            AsyncAssert.assertEquals(wantedTrace, activitiesImplementation.getTrace(Promise.Void()));
        }
    };
}
@Test public void test() throws IOException { new TryCatchFinally() { Throwable failure; @Override protected void doTry() throws Throwable { String template = getTemplate("/deployment/AppStack1.xml"); DeploymentWorkflowClient workflow = workflowClientFactory.getClient("AppStack1"); workflow.deploy(template); } @Override protected void doCatch(Throwable e) throws Throwable { failure = e; throw e; } @Override protected void doFinally() throws Throwable { // Without this check assertEquals fails overwriting // original exception if (failure == null) { List<String> expected = new ArrayList<String>(); expected.add("Database-host1"); expected.add("Database-host2"); expected.add("appServer-host3"); expected.add("appServer-host2"); expected.add("appServer-host3"); expected.add("WebServer-host2"); expected.add("WebServer-host1"); expected.add("LoadBalancer-host2"); Assert.assertEquals(expected, trace); } } }; }
/** * Process the file at inputBucketName.inputFileName. * Place the result at outputBucketName.outputFileName. * * @param inputBucketName input bucket to process from * @param inputFileName input file to process from * @param outputBucketName output bucket to put result to * @param outputFileName output file to put result to * @throws IOException */ @Override public void processFile(String inputBucketName, String inputFileName, String outputBucketName, String outputFileName) throws IOException { // Settable is a Promise implementation that exposes a "chain" method that allows you to // link the state of one Promise to that of another Settable<String> hostNameChain = new Settable<String>(); // Use prepend runId to input and output name way to avoid name collisions String localInputFileName = runId + "_" + inputFileName; String localOutputFileName = runId + "_" + outputFileName; // TryCatchFinally provides an asynchronous version of a typical synchronous try-catch-finally block // For example, if any task that is started in the doTry block throws an exception, // all tasks within the doTry block are cancelled and flow moves to the doCatch block. new TryCatchFinally() { /** * Download the file from S3, process it locally through a chained task, and upload back to S3. 
* @throws Throwable */ @Override protected void doTry() throws Throwable { // download from S3, returns the host that downloaded the file Promise<String> hostName = storageClient.download(inputBucketName, inputFileName, localInputFileName); // chaining is a way for one promise to get assigned the value of another // when the promise is complete, it's value will be available for subsequent operations hostNameChain.chain(hostName); // zip the file on the local host processFileOnHost(localInputFileName, localOutputFileName, hostName); // upload the zipped file back to S3 upload(outputBucketName, outputFileName, localOutputFileName, hostNameChain); } @Override protected void doCatch(Throwable e) throws Throwable { state = "Failed: " + e.getMessage(); throw e; } @Override protected void doFinally() throws Throwable { if (hostNameChain.isReady()) { // File was downloaded // Set option to schedule activity in worker specific task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostNameChain.get()); // Call deleteLocalFile activity using the host specific task list storageClient.deleteLocalFile(localInputFileName, options); storageClient.deleteLocalFile(localOutputFileName, options); } if (!state.startsWith("Failed:")) { state = "Completed"; } } }; }
@Override public void processFile(final String sourceBucketName, final String sourceFilename, final String targetBucketName, final String targetFilename) throws IOException { // Settable to store the worker specific task list returned by the activity final Settable<String> taskList = new Settable<String>(); // Use runId as a way to ensure that downloaded files do not get name collisions String workflowRunId = workflowContext.getWorkflowExecution().getRunId(); File localSource = new File(sourceFilename); final String localSourceFilename = workflowRunId + "_" + localSource.getName(); File localTarget = new File(targetFilename); final String localTargetFilename = workflowRunId + "_" + localTarget.getName(); new TryCatchFinally() { @Override protected void doTry() throws Throwable { Promise<String> activityWorkerTaskList = store.download(sourceBucketName, sourceFilename, localSourceFilename); // chaining is a way for one promise get assigned value of another taskList.chain(activityWorkerTaskList); // Call processFile activity to zip the file Promise<Void> fileProcessed = processFileOnHost(localSourceFilename, localTargetFilename, activityWorkerTaskList); // Call upload activity to upload zipped file upload(targetBucketName, targetFilename, localTargetFilename, taskList, fileProcessed); } @Override protected void doCatch(Throwable e) throws Throwable { state = "Failed: " + e.getMessage(); throw e; } @Override protected void doFinally() throws Throwable { if (taskList.isReady()) { // File was downloaded // Set option to schedule activity in worker specific task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(taskList.get()); // Call deleteLocalFile activity using the host specific task list store.deleteLocalFile(localSourceFilename, options); store.deleteLocalFile(localTargetFilename, options); } if (!state.startsWith("Failed:")) { state = "Completed"; } } }; }
@Test(timeout = 2000) public void testCronWithFailures() { workflowTest.setClockAccelerationCoefficient(SECONDS_HOUR * 24 * 7 * 2); final CronWithRetryWorkflowClient workflow = workflowClientFactory.getClient(); final ActivityType activityType = new ActivityType(); activityType.setName("CronWithRetryExampleActivities.doSomeWork"); activityType.setVersion("1.0"); final Object[] activityArguments = new Object[] { "parameter1" }; final CronWithRetryWorkflowOptions cronOptions = new CronWithRetryWorkflowOptions(); cronOptions.setActivity(activityType); cronOptions.setActivityArguments(activityArguments); cronOptions.setContinueAsNewAfterSeconds(SECONDS_HOUR * 24 + 300); cronOptions.setTimeZone("PST"); cronOptions.setInitialRetryIntervalSeconds(30); cronOptions.setMaximumRetryIntervalSeconds(600); cronOptions.setRetryExpirationIntervalSeconds(3500); final String cronExpression = "0 0 * * * *"; cronOptions.setCronExpression(cronExpression); WorkflowClock clock = workflowTest.getDecisionContext().getWorkflowClock(); clock.createTimer(SECONDS_HOUR * 24 * 7 + 1000); // true constructor argument makes TryCatchFinally a daemon which causes it get cancelled after above timer firing new TryCatchFinally(true) { Throwable failure; @Override protected void doTry() throws Throwable { workflow.startCron(cronOptions); } @Override protected void doCatch(Throwable e) throws Throwable { failure = e; throw e; } @Override protected void doFinally() throws Throwable { // Skip assertions as their failure masks original exception if (failure == null || failure instanceof CancellationException) { // Note the 2 activity invocations for each cron invocation as half of the // invocations failed and were retried. Assert.assertEquals(24 * 7 * 2, cronActivitiesImplementation.getWorkCount()); Assert.assertEquals(7, cronActivitiesImplementation.getRunCount()); } } }; }
@Test(timeout = 2000) public void testCron() { workflowTest.setClockAccelerationCoefficient(SECONDS_HOUR * 24 * 7 * 2); final CronWorkflowClient workflow = workflowClientFactory.getClient(); final ActivityType activityType = new ActivityType(); activityType.setName("CronExampleActivities.doSomeWork"); activityType.setVersion("1.0"); final Object[] activityArguments = new Object[] { "parameter1" }; final CronWorkflowOptions cronOptions = new CronWorkflowOptions(); cronOptions.setActivity(activityType); cronOptions.setActivityArguments(activityArguments); cronOptions.setContinueAsNewAfterSeconds(SECONDS_HOUR * 24 + 300); cronOptions.setTimeZone("PST"); final String cronExpression = "0 0 * * * *"; cronOptions.setCronExpression(cronExpression); WorkflowClock clock = workflowTest.getDecisionContext().getWorkflowClock(); clock.createTimer(SECONDS_HOUR * 24 * 7 + 1000); // true constructor argument makes TryCatchFinally a daemon which causes it get cancelled after above timer firing new TryCatchFinally(true) { Throwable failure; @Override protected void doTry() throws Throwable { workflow.startCron(cronOptions); } @Override protected void doCatch(Throwable e) throws Throwable { failure = e; throw e; } @Override protected void doFinally() throws Throwable { // Skip assertions as their failure masks original exception if (failure == null || failure instanceof CancellationException) { Assert.assertEquals(24 * 7, cronActivitiesImplementation.getWorkCount()); Assert.assertEquals(7, cronActivitiesImplementation.getRunCount()); } } }; }
/**
 * {@inheritDoc}
 * <p>
 * Stores the request and the workflow id, notifies that the job started and
 * derives a database name. The remaining steps (restore the snapshot, wait
 * for the database, describe the instance, dump the database, notify
 * completion) run inside a TryCatchFinally that is constructed with the
 * database-name promise. A failure is reported through
 * {@code notifyJobFailed} and is not rethrown; {@code terminateInstance} is
 * called from doFinally.
 */
@Override
public void export( final ExportSnapshotRequest request )
{
    this.request = request;
    this.workflowId = contextProvider.getDecisionContext()
            .getWorkflowContext()
            .getWorkflowExecution()
            .getWorkflowId();

    Promise<Void> jobStarted = controlActivities.notifyJobStarted();
    final Promise<String> databaseName = controlActivities.createDatabaseName( request.getSnapshotName(), jobStarted );

    new TryCatchFinally( databaseName )
    {
        @Override
        protected void doTry()
        {
            Promise<DatabaseInstance> restored = rdsActivities.restoreSnapshot(
                    Promise.asPromise( request.getSnapshotName() ),
                    databaseName,
                    Promise.asPromise( request.getSubnetGroupName() ),
                    Promise.asPromise( request.getIdentity() ) );

            Promise<Void> available = waitUntilDatabaseAvailable( databaseName, restored );

            Promise<DatabaseInstance> described = rdsActivities.describeInstance(
                    databaseName,
                    Promise.asPromise( request.getIdentity() ),
                    available );

            Promise<Void> dumped = dumpDatabase( described );
            controlActivities.notifyJobCompleted( dumped );
        }

        @Override
        protected void doCatch( Throwable cause ) throws Throwable
        {
            // The failure is reported and deliberately not rethrown.
            controlActivities.notifyJobFailed( cause );
        }

        @Override
        protected void doFinally()
        {
            rdsActivities.terminateInstance( databaseName, Promise.asPromise( request.getIdentity() ) );
        }
    };
}