protected Promise<?>[] asPromiseArray(Object input) { Promise<?>[] promises; if (input instanceof Object[]) { Object[] inputArray = (Object[])input; promises = new Promise[inputArray.length]; for (int i = 0; i < inputArray.length; i++) { promises[i] = Promise.asPromise(inputArray[i]); } } else { promises = new Promise[1]; if (input instanceof Promise) { promises[0] = (Promise<?>) input; } else { promises[0] = Promise.asPromise(input); } } return promises; }
/** * Announce (or not) a single horse's result. * * @param name * name of the horse. * * @param result * horse's result ("ok" or "injured") * * @param waitFor * anonymous dependencies * * @return promise to respond. * */ @Asynchronous private Promise<Void> announceHorseResult(final String name, final Promise<Status> result, final Promise<?>... waitFor) { final Promise<Void> rval; switch (result.get()) { case OK: if (this.nextPlace <= 3) { rval = announcePlace(name, this.nextPlace); this.nextPlace = this.nextPlace + 1; } else { rval = announceFinished(name); } break; case INJURY: rval = announceInjury(name); break; default: rval = announceMissing(name); break; } return rval; }
/** * Run all the horses in parallel. * * @param laps * number of laps to run. * * @param horses * horses to run. * * @param waitFor * anonymous dependencies. * * @return a list of promises to run, one for each horse. */ @Asynchronous private Promise<List<Promise<Void>>> runAll(final int laps, final Promise<List<String>> horses, final Promise<?>... waitFor) { final List<Promise<Void>> race = new ArrayList<>(horses.get().size()); for (final String name : horses.get()) { Promise<Status> horseRun = Promise.asPromise(Status.OK); for (int lapNum = 1; lapNum <= laps; lapNum = lapNum + 1) { horseRun = runLapIfOk(name, lapNum, horseRun); horseRun = announceLapIfOk(name, lapNum, horseRun); } final Promise<Void> done = announceHorseResult(name, horseRun); race.add(done); } return Promise.asPromise(race); }
/** * Run one lap if not injured. * * @param name * name of horse to run. * * @param lapNum * the lap number to run. * * @param prevStatus * previous lap run status. * * @param waitFor * anonymous dependencies. * * @return the result of running one lap or the previous lap result. */ @Asynchronous private Promise<Status> runLapIfOk(final String name, final int lapNum, final Promise<Status> prevStatus, final Promise<?>... waitFor) { if (prevStatus.get() == Status.OK) { /* * horse is ok, run it. */ return runLap(name, lapNum); } else { /* * something wrong, do not run. */ return prevStatus; } }
@Asynchronous public Promise<List<Void>> processBasketOrder(Promise<List<OrderChoice>> basketChoice) { List<OrderChoice> choices = basketChoice.get(); List<Promise<Void>> results = new ArrayList<Promise<Void>>(); /** * All activities that are started in this loop are executed in parallel * as their invocation from the switch case is non blocking. */ for (OrderChoice choice : choices) { Promise<Void> result = processSingleChoice(choice); results.add(result); } /** * listOfPromisesToPromise is an utility method that accepts a list of * promises and returns a single promise that becomes ready when all * promises of an input list are ready. */ return Promises.listOfPromisesToPromise(results); }
public Promise<Void> processSingleChoice(OrderChoice choice) { Promise<Void> result = null; switch (choice) { case APPLE: result = client.orderApple(); break; case ORANGE: result = client.orderOrange(); break; case LETTUCE: result = client.orderLettuce(); break; case CABBAGE: result = client.orderCabbage(); break; } return result; }
/** * chooses an activity to execute */ @Asynchronous public Promise<Void> processItemOrder(Promise<OrderChoice> itemChoice) { OrderChoice choice = itemChoice.get(); Promise<Void> result = null; switch (choice) { case APPLE: result = client.orderApple(); break; case ORANGE: result = client.orderOrange(); break; case LETTUCE: result = client.orderLettuce(); break; case CABBAGE: result = client.orderCabbage(); break; } return result; }
Promise<List<String>> searchOnCluster1(final String query) { final Settable<List<String>> result = new Settable<List<String>>(); branch1 = new TryCatch() { @Override protected void doTry() throws Throwable { Promise<List<String>> cluster1Result = client.searchCluster1(query); result.chain(cluster1Result); } @Override protected void doCatch(Throwable e) throws Throwable { if (!(e instanceof CancellationException)) { throw e; } } }; return result; }
Promise<List<String>> searchOnCluster2(final String query) { final Settable<List<String>> result = new Settable<List<String>>(); branch2 = new TryCatch() { @Override protected void doTry() throws Throwable { Promise<List<String>> cluster2Result = client.searchCluster2(query); result.chain(cluster2Result); } @Override protected void doCatch(Throwable e) throws Throwable { if (!(e instanceof CancellationException)) { throw e; } } }; return result; }
/** * The code in doCatch() is not cancellable and an attempt to cancel * TryCatch (for example by cancelling the workflow execution) will wait * until doCatch is complete. Since we want activities to be cancellable, * they are called from the "handleException" method which is outside the * TryCatch. * */ @Override public void startWorkflow() throws Throwable { final Settable<Throwable> exception = new Settable<Throwable>(); final Promise<Integer> resourceId = client.allocateResource(); new TryCatch() { @Override protected void doTry() throws Throwable { Promise<Void> waitFor = client.useResource(resourceId); setState(exception, null, waitFor); } @Override protected void doCatch(Throwable e) throws Throwable { setState(exception, e, Promise.Void()); } }; handleException(exception, resourceId); }
@Asynchronous public void handleException(Promise<Throwable> ex, Promise<Integer> resourceId) throws Throwable { Throwable e = ex.get(); if (e != null) { if (e instanceof ActivityTaskFailedException) { //get the original exception thrown from the activity Throwable inner = e.getCause(); //schedule different activities to handle different types of exception if (inner instanceof ResourceNoResponseException) { client.reportBadResource(resourceId.get()); } else if (inner instanceof ResourceNotAvailableException) { client.refreshResourceCatalog(resourceId.get()); } else { throw e; } } else { throw e; } } }
@Override protected Promise<String> deploySelf() { List<String> dataSources = new ArrayList<String>(); for (Database db : databases) { // It is safe to call Promise.get() here as deploySelf is called after // all components WebServer depends on are already deployed dataSources.add(db.getUrl().get()); } List<String> appServerUrls = new ArrayList<String>(); for (AppServer appServer : appServers) { // It is safe to call Promise.get() here as deploySelf is called after // all components WebServer depends on are already deployed appServerUrls.add(appServer.getUrl().get()); } // Use host name as taskList to route request to appropriate host ActivitySchedulingOptions options = new ActivitySchedulingOptions(); options.setTaskList(getHost()); return activities.deployWebServer(appServerUrls, dataSources, options); }
@Asynchronous public void callPeriodicActivity(long startTime, Promise<?>... waitFor) { long currentTime = clock.currentTimeMillis(); if ((currentTime - startTime) < options.getContinueAsNewAfterSeconds() * SECOND) { // Call activity using dynamic client. Return type is specified as Void as it is not used, but activity that // returns some other type can be called this way. Promise<Void> activityCompletion = activities.scheduleActivity(activityType, activityArguments, null, Void.class); if (!options.isWaitForActivityCompletion()) { // Promise.Void() returns already ready promise of type Void activityCompletion = Promise.Void(); } // Create a timer to re-run your periodic activity after activity completion, // but not earlier then after delay of executionPeriodSeconds. // However in real cron workflows, delay should be calculated everytime to run activity at // a predefined time. Promise<Void> timer = clock.createTimer(options.getExecutionPeriodSeconds()); callPeriodicActivity(startTime, timer, activityCompletion); } }
@Asynchronous public void assertTrace(Promise<Void> done) { List<String> obtainedTrace = activitiesImplementation.getTrace(); // Check if the first and last elements of the trace are in order Assert.assertEquals("multiOrdering", obtainedTrace.remove(0)); Assert.assertEquals("done", obtainedTrace.remove(obtainedTrace.size() - 1)); // Create the expected Trace with all the orders List<String> expectedTrace = new ArrayList<String>(); expectedTrace.add("orderApple"); expectedTrace.add("orderOrange"); // Compare the traces out of order allowing repeated orders // if present in expected trace. for (String traceElement : expectedTrace) { Assert.assertTrue(obtainedTrace.contains(traceElement)); obtainedTrace.remove(traceElement); } Assert.assertEquals(0, obtainedTrace.size()); }
@Asynchronous private Promise<String> getGreeting(Promise<String> name) { if(!name.isReady()) { System.out.println("Your weaving isn't working!"); } String returnString = "Hello " + name.get() + "!"; return Promise.asPromise(returnString); }
/** * Processes the file downloaded to the local host. * * @param localInputFileName name of the file to process * @param localOutputFileName name of the file to upload to S3 * @param hostName host processing the file */ @Asynchronous private Promise<Void> processFileOnHost(String localInputFileName, String localOutputFileName, Promise<String> hostName) { state = "Downloaded to " + hostName.get(); // Process the file using the local hosts SWF task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostName.get()); return fileClient.processFile(localInputFileName, localOutputFileName, options); }
/** * Upload the file to S3. * * @param outputBucketName S3 bucket to upload to * @param outputFilename S3 file to upload to * @param localFileName local file to upload * @param hostName host processing the file */ @Asynchronous private void upload(String outputBucketName, String outputFilename, String localFileName, Promise<String> hostName) { state = "Processed at " + hostName.get(); // Upload the file using the local hosts SWF task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostName.get()); storageClient.upload(outputBucketName, localFileName, outputFilename, options); }
public Object scheduleActivity(String eventName, String version, Object input) { ActivityType activity = new ActivityType(); activity.setName(eventName); activity.setVersion(version); Promise<?>[] promises = asPromiseArray(input); Promise<?> promise = dynamicActivitiesClient.scheduleActivity(activity, promises, configuration.getActivitySchedulingOptions(), Object.class, null); return promise; }
public static boolean isPromiseType(TypeMirror typeMirror) { if (typeMirror != null && !isVoidType(typeMirror)) { String fullName = typeMirror.toString(); if (fullName != null && !fullName.isEmpty()) { return fullName.startsWith(Promise.class.getName()); } } return false; }
@Asynchronous private void assertGreeting(Promise<List<String>> done) { List<String> results = done.get(); Assert.assertEquals(2, results.size()); Assert.assertEquals("result1", results.get(0)); Assert.assertEquals("result2", results.get(1)); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceFinished(final String name, final Promise<?>... waitFor) { return this.announcer.announceFinished(name); }
@Test(expected = IllegalArgumentException.class) public void testHandleErrorWorkflowWithNonHandleableException() { activitiesImplementation = new TestResourceManagementActivities(true, false); workflowTest.addActivitiesImplementation(activitiesImplementation); final HandleErrorWorkflowClient workflowClient = handleErrorWorkflowfactory.getClient(); new TryCatchFinally() { @Override protected void doTry() throws Throwable { workflowClient.startWorkflow(); } @Override protected void doCatch(Throwable e) throws Throwable { if (e instanceof ChildWorkflowException && e.getCause() instanceof ActivityTaskFailedException) { throw e.getCause().getCause(); } else { throw e; } } @Override protected void doFinally() throws Throwable { List<String> expectedTrace = new ArrayList<String>(); expectedTrace.add("allocating"); expectedTrace.add("using"); AsyncAssert.assertEquals(expectedTrace, activitiesImplementation.getTrace(Promise.Void())); } }; }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceLap(final String name, final int lap, final Promise<?>... waitFor) { return this.announcer.announceLap(name, lap); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceMissing(final String name, final Promise<?>... waitFor) { return this.announcer.announceMissing(name); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announcePlace(final String name, final Integer place, final Promise<?>... waitFor) { return this.announcer.announcePlace(name, place); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> announceRace(final Promise<List<String>> horses, final Promise<Integer> laps, final Promise<?>... waitFor) { return this.announcer.announceRace(horses, laps); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Void> arriveGate(final String name, final Promise<?>... waitFor) { return this.horses.arriveGate(name); }
/** * Local wrapper for activities client invocation. This lets us specify * retry policy that handles failures from workers and from SWF itself. All * parameters passed directly to client and return whatever the client * returns. */ @ExponentialRetry(initialRetryIntervalSeconds = 2, maximumRetryIntervalSeconds = 30, maximumAttempts = 5) @Asynchronous private Promise<Status> runLap(final String name, final int lapNum, final Promise<?>... waitFor) { return this.horses.runLap(name, lapNum); }
@Override public void loop(int n) { if (n > 0) { Promise<Void> hasRun = Promise.Void(); for (int i = 0; i < n; i++) { hasRun = client.doNothing(hasRun); } } }
@Asynchronous private void processOrder(int originalAmount, Promise<?> waitFor) { int amount = originalAmount; if (signalReceived.isReady()) amount = signalReceived.get(); client.processOrder(amount); }
@Override public void runMultipleActivitiesConcurrently() { //running first activity Promise<Integer> result1 = client.generateRandomNumber(); //running second activity Promise<Integer> result2 = client.generateRandomNumber(); //join the results processResults(result1, result2); }
@Asynchronous public void processResults(Promise<Integer> result1, Promise<Integer> result2) { if (result1.get() + result2.get() > 5) { client.generateRandomNumber(); } }
@Asynchronous private void retryOnFailure(Promise<Throwable> failureP) { Throwable failure = failureP.get(); if (failure != null && shouldRetry(failure)) { callActivityWithRetry(); } }
@Test public void testHandleErrorWorkflowWithException() { activitiesImplementation = new TestResourceManagementActivities(true); workflowTest.addActivitiesImplementation(activitiesImplementation); CleanupResourceWorkflowClient workflowClient = cleanupResourceWorkflowfactory.getClient(); Promise<Void> done = workflowClient.startWorkflow(); expectedTrace.add("allocating"); expectedTrace.add("using"); expectedTrace.add("rollbackChanges"); expectedTrace.add("cleanUp"); AsyncAssert.assertEquals(expectedTrace, activitiesImplementation.getTrace(done)); }
@Asynchronous public void processRecords(int records, Promise<?>... waitFor) { if (records >= 1) { Promise<Void> nextWaitFor = client.processRecord(); processRecords(records - 1, nextWaitFor); } }
@Test public void testReserveNone() { BookingWorkflowClient workflow = workflowFactory.getClient(); Promise<Void> booked = workflow.makeBooking(123, 345, false, false); List<String> expected = new ArrayList<String>(); expected.add("sendConfirmation-345"); AsyncAssert.assertEqualsWaitFor(expected, trace, booked); }
@Test public void testReserveBoth() { BookingWorkflowClient workflow = workflowFactory.getClient(); Promise<Void> booked = workflow.makeBooking(123, 345, true, true); List<String> expected = new ArrayList<String>(); expected.add("reserveCar-123"); expected.add("reserveAirline-123"); expected.add("sendConfirmation-345"); AsyncAssert.assertEqualsWaitFor(expected, trace, booked); }
@Override public Promise<List<String>> search(final String query) { //start parallel branches to run same query on 2 clusters Promise<List<String>> branch1Result = searchOnCluster1(query); Promise<List<String>> branch2Result = searchOnCluster2(query); //branch1OrBranch2 will be ready when either branch completes OrPromise branch1OrBranch2 = new OrPromise(branch1Result, branch2Result); return processResults(branch1OrBranch2); }
@Test public void testHandleErrorWorkflowWithoutException() { activitiesImplementation = new TestResourceManagementActivities(false); workflowTest.addActivitiesImplementation(activitiesImplementation); CleanupResourceWorkflowClient workflowClient = cleanupResourceWorkflowfactory.getClient(); Promise<Void> done = workflowClient.startWorkflow(); expectedTrace.add("allocating"); expectedTrace.add("using"); expectedTrace.add("cleanUp"); AsyncAssert.assertEquals(expectedTrace, activitiesImplementation.getTrace(done)); }
@Override public void startWorkflow() { final Promise<Integer> resourceId = client.allocateResource(); /** * This TryCatchFinally is a sibling of allocateResource() activity. If * allocateResource throws an exception, TryCatchFinally will be * cancelled. The semantics of cancellation are as follows: If the doTry * method has not been executed when the exception was thrown then the * cancellation is immediate and none of doTry, doCatch or doFinally * will be executed. If doTry was already executed then all outstanding * tasks created in doTry will be cancelled, doCatch will be called with * CancellationException and then doFinally will be called. */ new TryCatchFinally() { @Override protected void doTry() throws Throwable { client.useResource(resourceId); } @Override protected void doCatch(Throwable e) throws Throwable { //this cannot be canceled client.rollbackChanges(resourceId); } // doFinally() will be executed whether an exception is thrown or not @Override protected void doFinally() throws Throwable { // make sure that the action of clean up will be executed if (resourceId.isReady()) { client.cleanUpResource(resourceId); } } }; }