/** * Announce (or not) a single horse's result. * * @param name * name of the horse. * * @param result * horse's result ("ok" or "injured") * * @param waitFor * anonymous dependencies * * @return promise to respond. * */ @Asynchronous private Promise<Void> announceHorseResult(final String name, final Promise<Status> result, final Promise<?>... waitFor) { final Promise<Void> rval; switch (result.get()) { case OK: if (this.nextPlace <= 3) { rval = announcePlace(name, this.nextPlace); this.nextPlace = this.nextPlace + 1; } else { rval = announceFinished(name); } break; case INJURY: rval = announceInjury(name); break; default: rval = announceMissing(name); break; } return rval; }
/** * Run all the horses in parallel. * * @param laps * number of laps to run. * * @param horses * horses to run. * * @param waitFor * anonymous dependencies. * * @return a list of promises to run, one for each horse. */ @Asynchronous private Promise<List<Promise<Void>>> runAll(final int laps, final Promise<List<String>> horses, final Promise<?>... waitFor) { final List<Promise<Void>> race = new ArrayList<>(horses.get().size()); for (final String name : horses.get()) { Promise<Status> horseRun = Promise.asPromise(Status.OK); for (int lapNum = 1; lapNum <= laps; lapNum = lapNum + 1) { horseRun = runLapIfOk(name, lapNum, horseRun); horseRun = announceLapIfOk(name, lapNum, horseRun); } final Promise<Void> done = announceHorseResult(name, horseRun); race.add(done); } return Promise.asPromise(race); }
/** * Run one lap if not injured. * * @param name * name of horse to run. * * @param lapNum * the lap number to run. * * @param prevStatus * previous lap run status. * * @param waitFor * anonymous dependencies. * * @return the result of running one lap or the previous lap result. */ @Asynchronous private Promise<Status> runLapIfOk(final String name, final int lapNum, final Promise<Status> prevStatus, final Promise<?>... waitFor) { if (prevStatus.get() == Status.OK) { /* * horse is ok, run it. */ return runLap(name, lapNum); } else { /* * something wrong, do not run. */ return prevStatus; } }
@Asynchronous public Promise<List<Void>> processBasketOrder(Promise<List<OrderChoice>> basketChoice) { List<OrderChoice> choices = basketChoice.get(); List<Promise<Void>> results = new ArrayList<Promise<Void>>(); /** * All activities that are started in this loop are executed in parallel * as their invocation from the switch case is non blocking. */ for (OrderChoice choice : choices) { Promise<Void> result = processSingleChoice(choice); results.add(result); } /** * listOfPromisesToPromise is an utility method that accepts a list of * promises and returns a single promise that becomes ready when all * promises of an input list are ready. */ return Promises.listOfPromisesToPromise(results); }
/** * chooses an activity to execute */ @Asynchronous public Promise<Void> processItemOrder(Promise<OrderChoice> itemChoice) { OrderChoice choice = itemChoice.get(); Promise<Void> result = null; switch (choice) { case APPLE: result = client.orderApple(); break; case ORANGE: result = client.orderOrange(); break; case LETTUCE: result = client.orderLettuce(); break; case CABBAGE: result = client.orderCabbage(); break; } return result; }
@SuppressWarnings("unchecked") @Asynchronous Promise<List<String>> processResults(OrPromise result) { Promise<List<String>> output = null; Promise<List<String>> branch1Result = (Promise<List<String>>) result.getValues()[0]; Promise<List<String>> branch2Result = (Promise<List<String>>) result.getValues()[1]; //branch1 has result if (branch1Result.isReady()) { output = branch1Result; //cancel branch2 if it is not complete yet if (!branch2Result.isReady()) { branch2.cancel(null); } } //branch2 has result, cancel branch 1 else { output = branch2Result; branch1.cancel(null); } return output; }
@Asynchronous public void handleException(Promise<Throwable> ex, Promise<Integer> resourceId) throws Throwable { Throwable e = ex.get(); if (e != null) { if (e instanceof ActivityTaskFailedException) { //get the original exception thrown from the activity Throwable inner = e.getCause(); //schedule different activities to handle different types of exception if (inner instanceof ResourceNoResponseException) { client.reportBadResource(resourceId.get()); } else if (inner instanceof ResourceNotAvailableException) { client.refreshResourceCatalog(resourceId.get()); } else { throw e; } } else { throw e; } } }
@Asynchronous public void callPeriodicActivity(long startTime, Promise<?>... waitFor) { long currentTime = clock.currentTimeMillis(); if ((currentTime - startTime) < options.getContinueAsNewAfterSeconds() * SECOND) { // Call activity using dynamic client. Return type is specified as Void as it is not used, but activity that // returns some other type can be called this way. Promise<Void> activityCompletion = activities.scheduleActivity(activityType, activityArguments, null, Void.class); if (!options.isWaitForActivityCompletion()) { // Promise.Void() returns already ready promise of type Void activityCompletion = Promise.Void(); } // Create a timer to re-run your periodic activity after activity completion, // but not earlier then after delay of executionPeriodSeconds. // However in real cron workflows, delay should be calculated everytime to run activity at // a predefined time. Promise<Void> timer = clock.createTimer(options.getExecutionPeriodSeconds()); callPeriodicActivity(startTime, timer, activityCompletion); } }
@Asynchronous public void assertTrace(Promise<Void> done) { List<String> obtainedTrace = activitiesImplementation.getTrace(); // Check if the first and last elements of the trace are in order Assert.assertEquals("multiOrdering", obtainedTrace.remove(0)); Assert.assertEquals("done", obtainedTrace.remove(obtainedTrace.size() - 1)); // Create the expected Trace with all the orders List<String> expectedTrace = new ArrayList<String>(); expectedTrace.add("orderApple"); expectedTrace.add("orderOrange"); // Compare the traces out of order allowing repeated orders // if present in expected trace. for (String traceElement : expectedTrace) { Assert.assertTrue(obtainedTrace.contains(traceElement)); obtainedTrace.remove(traceElement); } Assert.assertEquals(0, obtainedTrace.size()); }
@Asynchronous private Promise<Void> reportResult( Promise<String> actionId, Promise<JobResult> result ) { if ( actionId.get() == null ) { return Promise.Void(); } JobResult r = result.get(); String summary = String.format( "stdout=%s, stderr=%s, on=%s", r.getStandardOutput(), r.getErrorOutput(), r.getRunsOn() ); if ( r.getExitCode() == 0 ) { return controlActivities.notifyActionCompleted( actionId.get(), summary, r.getElapsedMillis() ); } return controlActivities.notifyActionFailed( actionId.get(), summary + ", code=" + r.getExitCode(), r.getStackTrace(), r.getElapsedMillis() ); }
@Asynchronous private Promise<String> getGreeting(Promise<String> name) { if(!name.isReady()) { System.out.println("Your weaving isn't working!"); } String returnString = "Hello " + name.get() + "!"; return Promise.asPromise(returnString); }
/** * Processes the file downloaded to the local host. * * @param localInputFileName name of the file to process * @param localOutputFileName name of the file to upload to S3 * @param hostName host processing the file */ @Asynchronous private Promise<Void> processFileOnHost(String localInputFileName, String localOutputFileName, Promise<String> hostName) { state = "Downloaded to " + hostName.get(); // Process the file using the local hosts SWF task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostName.get()); return fileClient.processFile(localInputFileName, localOutputFileName, options); }
/** * Upload the file to S3. * * @param outputBucketName S3 bucket to upload to * @param outputFilename S3 file to upload to * @param localFileName local file to upload * @param hostName host processing the file */ @Asynchronous private void upload(String outputBucketName, String outputFilename, String localFileName, Promise<String> hostName) { state = "Processed at " + hostName.get(); // Upload the file using the local hosts SWF task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostName.get()); storageClient.upload(outputBucketName, localFileName, outputFilename, options); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceFinished(final String name, final Promise<?>... waitFor) { return this.announcer.announceFinished(name); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceInjury(final String name, final Promise<?>... waitFor) { return this.announcer.announceInjury(name); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceLap(final String name, final int lap, final Promise<?>... waitFor) { return this.announcer.announceLap(name, lap); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceMissing(final String name, final Promise<?>... waitFor) { return this.announcer.announceMissing(name); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announcePlace(final String name, final Integer place, final Promise<?>... waitFor) { return this.announcer.announcePlace(name, place); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceRace(final Promise<List<String>> horses, final Promise<Integer> laps, final Promise<?>... waitFor) { return this.announcer.announceRace(horses, laps); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> arriveGate(final String name, final Promise<?>... waitFor) { return this.horses.arriveGate(name); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Status> runLap(final String name, final int lapNum, final Promise<?>... waitFor) { return this.horses.runLap(name, lapNum); }
@Asynchronous private void processOrder(int originalAmount, Promise<?> waitFor) { int amount = originalAmount; if (signalReceived.isReady()) amount = signalReceived.get(); client.processOrder(amount); }
@Asynchronous public void processResults(Promise<Integer> result1, Promise<Integer> result2) { if (result1.get() + result2.get() > 5) { client.generateRandomNumber(); } }
/** * If some exception is thrown in doTry(), the doCatch method will be * executed. Otherwise, the doCatch method will be ignored */ @Asynchronous public void callActivityWithRetry() { final Settable<Throwable> failure = new Settable<Throwable>(); new TryCatchFinally() { protected void doTry() throws Throwable { client.unreliableActivity(); } /** * The code in doCatch is not cancellable. If we call method to * retry from doCatch, then in case of workflow cancellation there * will be no attempt to cancel the retried method. To ensure that * cancellation is always happening, the recursive retry is moved * out outside of TryCatch. */ protected void doCatch(Throwable e) { failure.set(e); } protected void doFinally() throws Throwable { if (!failure.isReady()) { failure.set(null); } } }; retryOnFailure(failure); }
@Asynchronous private void retryOnFailure(Promise<Throwable> failureP) { Throwable failure = failureP.get(); if (failure != null && shouldRetry(failure)) { callActivityWithRetry(); } }
@Asynchronous public void processRecords(int records, Promise<?>... waitFor) { if (records >= 1) { Promise<Void> nextWaitFor = client.processRecord(); processRecords(records - 1, nextWaitFor); } }
/** * This method will not be executed until all promises in the 'results' * argument are in the ready state. An alternative to using the {@link Wait} * annotation, is to convert the list of {@link Promise}s to an * {@link AndPromise} using {@link Promises.listOfPromisesToPromise}. */ @Asynchronous public Promise<Integer> joinBranches(@Wait List<Promise<Integer>> results) { int sum = 0; for (Promise<Integer> result : results) { sum += result.get(); } return Promise.asPromise(sum); }
@Asynchronous private Promise<Double> computeAverageDistributed(String inputFile, Promise<Integer> dataSize) { int chunkSize = dataSize.get() / numberOfWorkers; // Create an array list to hold the result returned by each worker List<Promise<Integer>> results = new ArrayList<Promise<Integer>>(); for (int chunkNumber = 0; chunkNumber < numberOfWorkers; chunkNumber++) { // Splitting computation for each chunk as separate activity results.add(client.computeSumForChunk(bucketName, inputFile, chunkNumber, chunkSize)); } // Merge phase return mergeSumAndComputeAverage(results, dataSize.get()); }
@Asynchronous private Promise<Double> mergeSumAndComputeAverage(@Wait List<Promise<Integer>> results, int dataSize){ int totalSum = 0; for(Promise<Integer> workerSum: results){ totalSum += workerSum.get(); } return Promise.asPromise((double) totalSum / (double) dataSize); }
@Asynchronous private Promise<Void> processFileOnHost(String fileToProcess, String fileToUpload, Promise<String> taskList) { state = "Downloaded to " + taskList.get(); // Call the activity to process the file using worker specific task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(taskList.get()); return processor.processFile(fileToProcess, fileToUpload, options); }
@Asynchronous private void upload(final String targetBucketName, final String targetFilename, final String localTargetFilename, Promise<String> taskList, Promise<Void> fileProcessed) { state = "Processed at " + taskList.get(); ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(taskList.get()); store.upload(targetBucketName, localTargetFilename, targetFilename, options); }
@Asynchronous private void assertResult(Promise<Void> done) { ArrayList<String> result = activitiesImpl.getResult(); Assert.assertEquals("automatedActivity", result.get(0)); Assert.assertEquals("humanActivity", result.get(1)); Assert.assertEquals("sendNotification:test", result.get(2)); }
@Asynchronous private void assertGreeting(Promise<List<String>> done) { List<String> results = done.get(); Assert.assertEquals(2, results.size()); Assert.assertEquals("result1", results.get(0)); Assert.assertEquals("result2", results.get(1)); }
@Asynchronous private Promise<Void> dumpDatabase( Promise<DatabaseInstance> database ) { MySQLDumpJob job = new MySQLDumpJob(); job.setDataArchive( request.getDestinationArchive() ); job.setDatabaseInstance( database.get() ); job.setIdentity( request.getIdentity() ); job.setMasterPassword( request.getDatabaseMasterPassword() ); RunJobRequest runJob = new RunJobRequest(); runJob.setJob( job ); runJob.setIdentity( request.getIdentity() ); runJob.setWorkerOptions( request.getWorkerOptions() ); return jobFlowFactory.getClient( workflowId + "-job" ).executeCommand( runJob ); }
@Asynchronous private Promise<Void> waitUntilDatabaseAvailable( Promise<String> databaseId, Promise<?>... waitFor ) { CheckAndWait check = new CheckAndWait(); check.setCheckType( CheckAndWait.Type.DATABASE_CREATION ); // Hardcoded 1 hour wait for now check.setExpireOn( contextProvider.getDecisionContext().getWorkflowClock().currentTimeMillis() + request.getSnapshotRestoreTimeoutSeconds() * 1000L ); check.setIdentity( request.getIdentity() ); check.setObjectName( databaseId.get() ); return waitFlowFactory.getClient( workflowId + "-restore-db" ).checkAndWait( check ); }
@Asynchronous private void continueOrExit( Promise<Boolean> done, CheckAndWait request ) throws TimeoutException { checks++; if ( done.get() ) { return; } long now = contextProvider.getDecisionContext().getWorkflowClock().currentTimeMillis(); if ( now > request.getExpireOn() ) { throw new TimeoutException( "Task " + request.getCheckType() + " didn't complete before " + new DateTime( request.getExpireOn(), DateTimeZone.UTC ) ); } Promise<Void> delay = contextProvider.getDecisionContext().getWorkflowClock().createTimer( request.getWaitIntervalSeconds() ); if ( checks >= request.getMaxChecksPerExecution() ) { new CheckWaitWorkflowSelfClientImpl().checkAndWait( request, delay ); return; } doCheckAndWait( request, delay ); }
@Asynchronous private void doCheckAndWait( CheckAndWait request, Promise<?>... waitFor ) throws TimeoutException { Promise<Boolean> successful; switch ( request.getCheckType() ) { case SNAPSHOT_CREATION: successful = isSnapshotAvailable( request.getObjectName(), request.getIdentity() ); break; case DATABASE_CREATION: successful = isDatabaseInstanceAvailable( request.getObjectName(), request.getIdentity() ); break; case WORKER_LAUNCH: successful = isEc2InstanceRunning( request.getObjectName(), request.getIdentity() ); break; default: throw new IllegalStateException( "Unexpected check type " + request.getCheckType() ); } continueOrExit( successful, request ); }
@Asynchronous private Promise<Boolean> isDatabaseInstanceAvailable( String instanceName, Identity identity ) { Promise<DatabaseInstance> instance = rdsActivities.describeInstance( instanceName, identity ); return isDatabaseInstanceAvailable( instance ); }
@Asynchronous private Promise<Boolean> isEc2InstanceRunning( String instanceId, Identity identity ) { Promise<WorkerInstance> instance = ec2Activities.describeInstance( instanceId, identity ); return isEc2InstanceRunning( instance ); }
@Asynchronous private Promise<Boolean> isSnapshotAvailable( String snapshotName, Identity identity ) { Promise<String> status = rdsActivities.getSnapshotStatus( snapshotName, identity ); return equals( "available", status ); }