@Override protected Promise<String> deploySelf() { List<String> dataSources = new ArrayList<String>(); for (Database db : databases) { // It is safe to call Promise.get() here as deploySelf is called after // all components WebServer depends on are already deployed dataSources.add(db.getUrl().get()); } List<String> appServerUrls = new ArrayList<String>(); for (AppServer appServer : appServers) { // It is safe to call Promise.get() here as deploySelf is called after // all components WebServer depends on are already deployed appServerUrls.add(appServer.getUrl().get()); } // Use host name as taskList to route request to appropriate host ActivitySchedulingOptions options = new ActivitySchedulingOptions(); options.setTaskList(getHost()); return activities.deployWebServer(appServerUrls, dataSources, options); }
/** * Processes the file downloaded to the local host. * * @param localInputFileName name of the file to process * @param localOutputFileName name of the file to upload to S3 * @param hostName host processing the file */ @Asynchronous private Promise<Void> processFileOnHost(String localInputFileName, String localOutputFileName, Promise<String> hostName) { state = "Downloaded to " + hostName.get(); // Process the file using the local hosts SWF task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostName.get()); return fileClient.processFile(localInputFileName, localOutputFileName, options); }
/** * Upload the file to S3. * * @param outputBucketName S3 bucket to upload to * @param outputFilename S3 file to upload to * @param localFileName local file to upload * @param hostName host processing the file */ @Asynchronous private void upload(String outputBucketName, String outputFilename, String localFileName, Promise<String> hostName) { state = "Processed at " + hostName.get(); // Upload the file using the local hosts SWF task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostName.get()); storageClient.upload(outputBucketName, localFileName, outputFilename, options); }
@Override protected Promise<String> deploySelf() { List<String> dataSources = new ArrayList<String>(); for (Database db : databases) { // It is safe to call Promise.get() here as deploySelf is called after // all Databases AppServer depends on are already deployed dataSources.add(db.getUrl().get()); } // Use host name as taskList to route request to appropriate host ActivitySchedulingOptions options = new ActivitySchedulingOptions(); options.setTaskList(getHost()); return activities.deployAppServer(dataSources, options); }
@Override protected Promise<String> deploySelf() { List<String> urls = new ArrayList<String>(); for (WebServer webServer : webServers) { // It is safe to call Promise.get() here as deploySelf is called after // all components WebServer depends on are already deployed urls.add(webServer.getUrl().get()); } // Use host name as taskList to route request to appropriate host ActivitySchedulingOptions options = new ActivitySchedulingOptions(); options.setTaskList(getHost()); return activities.deployLoadBalancer(urls, options); }
@Override protected Promise<String> deploySelf() { // Use host name as taskList to route request to appropriate host ActivitySchedulingOptions options = new ActivitySchedulingOptions(); options.setTaskList(getHost()); return activities.deployDatabase(options); }
@Asynchronous private Promise<Void> processFileOnHost(String fileToProcess, String fileToUpload, Promise<String> taskList) { state = "Downloaded to " + taskList.get(); // Call the activity to process the file using worker specific task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(taskList.get()); return processor.processFile(fileToProcess, fileToUpload, options); }
@Asynchronous private void upload(final String targetBucketName, final String targetFilename, final String localTargetFilename, Promise<String> taskList, Promise<Void> fileProcessed) { state = "Processed at " + taskList.get(); ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(taskList.get()); store.upload(targetBucketName, localTargetFilename, targetFilename, options); }
@Override protected Promise<java.util.List<java.lang.String>> searchCluster1Impl(final Promise<java.lang.String> query, final ActivitySchedulingOptions optionsOverride, Promise<?>... waitFor) { DecisionContext context = contextProvider.getDecisionContext(); WorkflowClock clock = context.getWorkflowClock(); //start a 30 second timer Promise<Void> timerFired = clock.createTimer(30); //fail test if the timer fires shouldNotGetCalled(timerFired); // this Promise will never be ready return new Settable<List<String>>(); }
@Override protected Promise<java.util.List<java.lang.String>> searchCluster2Impl(final Promise<java.lang.String> query, final ActivitySchedulingOptions optionsOverride, Promise<?>... waitFor) { List<String> results = new ArrayList<String>(); results.add("result1"); results.add("result2"); return Promise.asPromise(results); }
/** * Process the file at inputBucketName.inputFileName. * Place the result at outputBucketName.outputFileName. * * @param inputBucketName input bucket to process from * @param inputFileName input file to process from * @param outputBucketName output bucket to put result to * @param outputFileName output file to put result to * @throws IOException */ @Override public void processFile(String inputBucketName, String inputFileName, String outputBucketName, String outputFileName) throws IOException { // Settable is a Promise implementation that exposes a "chain" method that allows you to // link the state of one Promise to that of another Settable<String> hostNameChain = new Settable<String>(); // Use prepend runId to input and output name way to avoid name collisions String localInputFileName = runId + "_" + inputFileName; String localOutputFileName = runId + "_" + outputFileName; // TryCatchFinally provides an asynchronous version of a typical synchronous try-catch-finally block // For example, if any task that is started in the doTry block throws an exception, // all tasks within the doTry block are cancelled and flow moves to the doCatch block. new TryCatchFinally() { /** * Download the file from S3, process it locally through a chained task, and upload back to S3. * @throws Throwable */ @Override protected void doTry() throws Throwable { // download from S3, returns the host that downloaded the file Promise<String> hostName = storageClient.download(inputBucketName, inputFileName, localInputFileName); // chaining is a way for one promise to get assigned the value of another // when the promise is complete, it's value will be available for subsequent operations hostNameChain.chain(hostName); // zip the file on the local host processFileOnHost(localInputFileName, localOutputFileName, hostName); // upload the zipped file back to S3 upload(outputBucketName, outputFileName, localOutputFileName, hostNameChain); } @Override protected void doCatch(Throwable e) throws Throwable { state = "Failed: " + e.getMessage(); throw e; } @Override protected void doFinally() throws Throwable { if (hostNameChain.isReady()) { // File was downloaded // Set option to schedule activity in worker specific task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(hostNameChain.get()); // Call deleteLocalFile activity using the host specific task list storageClient.deleteLocalFile(localInputFileName, options); storageClient.deleteLocalFile(localOutputFileName, options); } if (!state.startsWith("Failed:")) { state = "Completed"; } } }; }
public ActivitySchedulingOptions getActivitySchedulingOptions() { return activitySchedulingOptions; }
/** * Activity scheduling options */ public void setActivitySchedulingOptions(ActivitySchedulingOptions activitySchedulingOptions) { this.activitySchedulingOptions = activitySchedulingOptions; }
@Test public void testScheduleActivity() throws Exception { Object result = camelSWFActivityClient.scheduleActivity("eventName", "version", "input"); verify(activitiesClient).scheduleActivity(any(ActivityType.class), any(Promise[].class), isNull(ActivitySchedulingOptions.class), any(Class.class), isNull(Promise.class)); }
@Override public void processFile(final String sourceBucketName, final String sourceFilename, final String targetBucketName, final String targetFilename) throws IOException { // Settable to store the worker specific task list returned by the activity final Settable<String> taskList = new Settable<String>(); // Use runId as a way to ensure that downloaded files do not get name collisions String workflowRunId = workflowContext.getWorkflowExecution().getRunId(); File localSource = new File(sourceFilename); final String localSourceFilename = workflowRunId + "_" + localSource.getName(); File localTarget = new File(targetFilename); final String localTargetFilename = workflowRunId + "_" + localTarget.getName(); new TryCatchFinally() { @Override protected void doTry() throws Throwable { Promise<String> activityWorkerTaskList = store.download(sourceBucketName, sourceFilename, localSourceFilename); // chaining is a way for one promise get assigned value of another taskList.chain(activityWorkerTaskList); // Call processFile activity to zip the file Promise<Void> fileProcessed = processFileOnHost(localSourceFilename, localTargetFilename, activityWorkerTaskList); // Call upload activity to upload zipped file upload(targetBucketName, targetFilename, localTargetFilename, taskList, fileProcessed); } @Override protected void doCatch(Throwable e) throws Throwable { state = "Failed: " + e.getMessage(); throw e; } @Override protected void doFinally() throws Throwable { if (taskList.isReady()) { // File was downloaded // Set option to schedule activity in worker specific task list ActivitySchedulingOptions options = new ActivitySchedulingOptions().withTaskList(taskList.get()); // Call deleteLocalFile activity using the host specific task list store.deleteLocalFile(localSourceFilename, options); store.deleteLocalFile(localTargetFilename, options); } if (!state.startsWith("Failed:")) { state = "Completed"; } } }; }