Promise<List<String>> searchOnCluster1(final String query) { final Settable<List<String>> result = new Settable<List<String>>(); branch1 = new TryCatch() { @Override protected void doTry() throws Throwable { Promise<List<String>> cluster1Result = client.searchCluster1(query); result.chain(cluster1Result); } @Override protected void doCatch(Throwable e) throws Throwable { if (!(e instanceof CancellationException)) { throw e; } } }; return result; }
Promise<List<String>> searchOnCluster2(final String query) { final Settable<List<String>> result = new Settable<List<String>>(); branch2 = new TryCatch() { @Override protected void doTry() throws Throwable { Promise<List<String>> cluster2Result = client.searchCluster2(query); result.chain(cluster2Result); } @Override protected void doCatch(Throwable e) throws Throwable { if (!(e instanceof CancellationException)) { throw e; } } }; return result; }
/** * The code in doCatch() is not cancellable and an attempt to cancel * TryCatch (for example by cancelling the workflow execution) will wait * until doCatch is complete. Since we want activities to be cancellable, * they are called from the "handleException" method which is outside the * TryCatch. * */ @Override public void startWorkflow() throws Throwable { final Settable<Throwable> exception = new Settable<Throwable>(); final Promise<Integer> resourceId = client.allocateResource(); new TryCatch() { @Override protected void doTry() throws Throwable { Promise<Void> waitFor = client.useResource(resourceId); setState(exception, null, waitFor); } @Override protected void doCatch(Throwable e) throws Throwable { setState(exception, e, Promise.Void()); } }; handleException(exception, resourceId); }
@Override public void process() { final Settable<Boolean> retryActivity = new Settable<Boolean>(); new TryCatchFinally() { @Override protected void doTry() throws Throwable { client.unreliableActivity(); } @Override protected void doCatch(Throwable e) throws Throwable { if (++retryCount <= maxRetries) { retryActivity.set(true); } else { throw e; } } @Override protected void doFinally() throws Throwable { if (!retryActivity.isReady()) { retryActivity.set(false); } } }; //This will call process() to retry the activity if it fails. //doCatch() cannot be cancelled so we don't call process() directly from it restartRunUnreliableActivityTillSuccess(retryActivity); }
/** * If some exception is thrown in doTry(), the doCatch method will be * executed. Otherwise, the doCatch method will be ignored */ @Asynchronous public void callActivityWithRetry() { final Settable<Throwable> failure = new Settable<Throwable>(); new TryCatchFinally() { protected void doTry() throws Throwable { client.unreliableActivity(); } /** * The code in doCatch is not cancellable. If we call method to * retry from doCatch, then in case of workflow cancellation there * will be no attempt to cancel the retried method. To ensure that * cancellation is always happening, the recursive retry is moved * out outside of TryCatch. */ protected void doCatch(Throwable e) { failure.set(e); } protected void doFinally() throws Throwable { if (!failure.isReady()) { failure.set(null); } } }; retryOnFailure(failure); }
@Override protected Promise<java.util.List<java.lang.String>> searchCluster1Impl(final Promise<java.lang.String> query, final ActivitySchedulingOptions optionsOverride, Promise<?>... waitFor) { DecisionContext context = contextProvider.getDecisionContext(); WorkflowClock clock = context.getWorkflowClock(); //start a 30 second timer Promise<Void> timerFired = clock.createTimer(30); //fail test if the timer fires shouldNotGetCalled(timerFired); // this Promise will never be ready return new Settable<List<String>>(); }
/** * Process the file at inputBucketName.inputFileName. * Place the result at outputBucketName.outputFileName. * * @param inputBucketName input bucket to process from * @param inputFileName input file to process from * @param outputBucketName output bucket to put result to * @param outputFileName output file to put result to * @throws IOException */ @Override public void processFile(String inputBucketName, String inputFileName, String outputBucketName, String outputFileName) throws IOException { // Settable is a Promise implementation that exposes a "chain" method that allows you to // link the state of one Promise to that of another Settable<String> hostNameChain = new Settable<String>(); // Use prepend runId to input and output name way to avoid name collisions String localInputFileName = runId + "_" + inputFileName; String localOutputFileName = runId + "_" + outputFileName; // TryCatchFinally provides an asynchronous version of a typical synchronous try-catch-finally block // For example, if any task that is started in the doTry block throws an exception, // all tasks within the doTry block are cancelled and flow moves to the doCatch block. new TryCatchFinally() { /** * Download the file from S3, process it locally through a chained task, and upload back to S3. * @throws Throwable */ @Override protected void doTry() throws Throwable { // download from S3, returns the host that downloaded the file Promise<String> hostName = storageClient.download(inputBucketName, inputFileName, localInputFileName); // chaining is a way for one promise to get assigned the value of another // when the promise is complete, it's value will be available for subsequent operations hostNameChain.chain(hostName); // zip the file on the local host processFileOnHost(localInputFileName, localOutputFileName, hostName); // upload the zipped file back to S3 upload(outputBucketName, outputFileName, localOutputFileName, hostNameChain); } @Override protected void doCatch(Throwable e) throws Throwable { state = "Failed: " + e.getMessage(); throw e; } @Override protected void doFinally() throws Throwable { if (hostNameChain.isReady()) { // File was downloaded // Set option to schedule activity in worker specific task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostNameChain.get()); // Call deleteLocalFile activity using the host specific task list storageClient.deleteLocalFile(localInputFileName, options); storageClient.deleteLocalFile(localOutputFileName, options); } if (!state.startsWith("Failed:")) { state = "Completed"; } } }; }
@Asynchronous private void restartRunUnreliableActivityTillSuccess(Settable<Boolean> retryActivity) { if (retryActivity.get()) { process(); } }
@Asynchronous public void setState(@NoWait Settable<Throwable> exception, Throwable ex, Promise<Void> WaitFor) { exception.set(ex); }
@Override public void processFile(final String sourceBucketName, final String sourceFilename, final String targetBucketName, final String targetFilename) throws IOException { // Settable to store the worker specific task list returned by the activity final Settable<String> taskList = new Settable<String>(); // Use runId as a way to ensure that downloaded files do not get name collisions String workflowRunId = workflowContext.getWorkflowExecution().getRunId(); File localSource = new File(sourceFilename); final String localSourceFilename = workflowRunId + "_" + localSource.getName(); File localTarget = new File(targetFilename); final String localTargetFilename = workflowRunId + "_" + localTarget.getName(); new TryCatchFinally() { @Override protected void doTry() throws Throwable { Promise<String> activityWorkerTaskList = store.download(sourceBucketName, sourceFilename, localSourceFilename); // chaining is a way for one promise get assigned value of another taskList.chain(activityWorkerTaskList); // Call processFile activity to zip the file Promise<Void> fileProcessed = processFileOnHost(localSourceFilename, localTargetFilename, activityWorkerTaskList); // Call upload activity to upload zipped file upload(targetBucketName, targetFilename, localTargetFilename, taskList, fileProcessed); } @Override protected void doCatch(Throwable e) throws Throwable { state = "Failed: " + e.getMessage(); throw e; } @Override protected void doFinally() throws Throwable { if (taskList.isReady()) { // File was downloaded // Set option to schedule activity in worker specific task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(taskList.get()); // Call deleteLocalFile activity using the host specific task list store.deleteLocalFile(localSourceFilename, options); store.deleteLocalFile(localTargetFilename, options); } if (!state.startsWith("Failed:")) { state = "Completed"; } } }; }