/**
 * Builds a decision that schedules an activity task.
 *
 * <p>Each optional argument that is absent is forwarded to the attributes as {@code null}.
 *
 * @param actionId               supplies the activity id
 * @param name                   supplies the activity type name
 * @param version                supplies the activity type version
 * @param input                  optional activity input
 * @param control                optional control value
 * @param taskListName           optional name of the task list to schedule on
 * @param heartbeatTimeout       optional heartbeat timeout
 * @param scheduleToCloseTimeout optional schedule-to-close timeout
 * @param scheduleToStartTimeout optional schedule-to-start timeout
 * @param startToCloseTimeout    optional start-to-close timeout
 * @param taskPriority           optional task priority, forwarded in its string form
 * @return decision of type {@link DecisionType#ScheduleActivityTask}
 */
@Builder.Factory
public static Decision scheduleActivityTaskDecision(
        @Nonnull ActionId actionId,
        @Nonnull Name name,
        @Nonnull Version version,
        Optional<String> input,
        Optional<Control> control,
        Optional<TaskListName> taskListName,
        Optional<String> heartbeatTimeout,
        Optional<String> scheduleToCloseTimeout,
        Optional<String> scheduleToStartTimeout,
        Optional<String> startToCloseTimeout,
        Optional<Integer> taskPriority) {
    // Resolve the two derived values first; both collapse to null when absent.
    Optional<String> listName = taskListName.map(TaskListName::value);
    TaskList targetList = listName.map(n -> new TaskList().withName(n)).orElse(null);
    String priorityText = taskPriority.map(Object::toString).orElse(null);

    Decision decision = new Decision().withDecisionType(DecisionType.ScheduleActivityTask);

    ActivityType activityType = new ActivityType()
            .withName(name.value())
            .withVersion(version.value());

    ScheduleActivityTaskDecisionAttributes attributes = new ScheduleActivityTaskDecisionAttributes()
            .withActivityType(activityType)
            .withActivityId(actionId.value())
            .withTaskList(targetList)
            .withInput(input.orElse(null))
            .withControl(control.map(Control::value).orElse(null))
            .withHeartbeatTimeout(heartbeatTimeout.orElse(null))
            .withScheduleToCloseTimeout(scheduleToCloseTimeout.orElse(null))
            .withScheduleToStartTimeout(scheduleToStartTimeout.orElse(null))
            .withStartToCloseTimeout(startToCloseTimeout.orElse(null))
            .withTaskPriority(priorityText);

    return decision.withScheduleActivityTaskDecisionAttributes(attributes);
}
/**
 * Returns the single activity type described by the configuration's event name and version.
 *
 * @return a one-element collection holding that activity type
 */
@Override
public Iterable<ActivityType> getActivityTypesToRegister() {
    ActivityType configuredType = new ActivityType();
    configuredType.setName(configuration.getEventName());
    configuredType.setVersion(configuration.getVersion());

    ArrayList<ActivityType> toRegister = new ArrayList<ActivityType>(1);
    toRegister.add(configuredType);
    return toRegister;
}
/**
 * Creates the Camel-backed activity implementation.
 *
 * <p>Execution options, registration options and the data converter are taken from the
 * configuration; whichever of them is {@code null} is replaced by a freshly constructed default
 * ({@link JsonDataConverter} for the converter). The {@code activityType} argument is not consulted.
 *
 * @param activityType the activity type an implementation is requested for
 * @return a new {@link CamelActivityImplementation} bound to the workflow consumer
 */
@Override
public ActivityImplementation getActivityImplementation(ActivityType activityType) {
    final ActivityTypeExecutionOptions executionOptions;
    if (configuration.getActivityTypeExecutionOptions() == null) {
        executionOptions = new ActivityTypeExecutionOptions();
    } else {
        executionOptions = configuration.getActivityTypeExecutionOptions();
    }

    final ActivityTypeRegistrationOptions registrationOptions;
    if (configuration.getActivityTypeRegistrationOptions() == null) {
        registrationOptions = new ActivityTypeRegistrationOptions();
    } else {
        registrationOptions = configuration.getActivityTypeRegistrationOptions();
    }

    final DataConverter converter;
    if (configuration.getDataConverter() == null) {
        converter = new JsonDataConverter();
    } else {
        converter = configuration.getDataConverter();
    }

    return new CamelActivityImplementation(swfWorkflowConsumer, registrationOptions, executionOptions, converter);
}
/**
 * Schedules an activity identified by the given name and version through the dynamic
 * activities client, using the scheduling options from the configuration.
 *
 * @param eventName name of the activity type
 * @param version   version of the activity type
 * @param input     activity input; converted to a promise array before scheduling
 * @return the promise returned by the activities client
 */
public Object scheduleActivity(String eventName, String version, Object input) {
    ActivityType target = new ActivityType();
    target.setName(eventName);
    target.setVersion(version);

    Promise<?>[] arguments = asPromiseArray(input);
    return dynamicActivitiesClient.scheduleActivity(
            target,
            arguments,
            configuration.getActivitySchedulingOptions(),
            Object.class,
            null);
}
/**
 * Runs the periodic workflow under an accelerated clock and checks, once it finishes, the total
 * and per-run work counts and that no failure was reported.
 */
@Test(timeout = 2000)
public void testPeriodic() {
    final int periodSeconds = 10;
    final int continueAsNewSeconds = 30;
    final int completeSeconds = 120;

    workflowTest.setClockAccelerationCoefficient(100);
    workflowTest.setDisableOutstandingTasksCheck(true);
    final PeriodicWorkflowClient client = workflowClientFactory.getClient();

    final PeriodicWorkflowOptions schedule = new PeriodicWorkflowOptions();
    schedule.setExecutionPeriodSeconds(periodSeconds);
    schedule.setContinueAsNewAfterSeconds(continueAsNewSeconds);
    schedule.setCompleteAfterSeconds(completeSeconds);
    schedule.setWaitForActivityCompletion(true);

    final ActivityType workActivity = new ActivityType();
    workActivity.setName("PeriodicWorkflowActivities.doSomeWork");
    workActivity.setVersion("1.0");

    final Object[] activityArguments = { "parameter1" };

    new TryFinally() {

        @Override
        protected void doTry() throws Throwable {
            client.startPeriodicWorkflow(workActivity, activityArguments, schedule);
        }

        @Override
        protected void doFinally() throws Throwable {
            // One activity execution per period over the whole workflow lifetime...
            Assert.assertEquals(completeSeconds / periodSeconds, periodicActivitiesImplementation.getWorkCount());
            // ...and one per period within a single run.
            Assert.assertEquals(continueAsNewSeconds / periodSeconds,
                    periodicActivitiesImplementation.getCurrentRunWorkCount());
            Assert.assertNull(errorReportingActivitiesImplementation.getFailure());
        }
    };
}
/**
 * Creates a key from the name and version of the given activity type.
 *
 * @param activityType activity type whose name and version form the key
 * @return the key for that name/version pair
 */
public static Key of(ActivityType activityType) {
    Name name = Name.of(activityType.getName());
    Version version = Version.of(activityType.getVersion());
    return Key.of(name, version);
}
/**
 * Verifies that scheduling through the Camel client delegates to the underlying activities
 * client with an activity type, a promise array, no scheduling options, a return class and
 * no wait-for promise.
 */
@Test
public void testScheduleActivity() throws Exception {
    // The returned value is not inspected; only the delegation is verified.
    camelSWFActivityClient.scheduleActivity("eventName", "version", "input");

    verify(activitiesClient).scheduleActivity(
            any(ActivityType.class),
            any(Promise[].class),
            isNull(ActivitySchedulingOptions.class),
            any(Class.class),
            isNull(Promise.class));
}
/**
 * Returns the activity type held by this object.
 *
 * @return the current activity type
 */
public ActivityType getActivity() {
    return this.activity;
}
/**
 * Replaces the activity type held by this object.
 *
 * @param activity the activity type to store
 */
public void setActivity(ActivityType activity) {
    this.activity = activity;
}
public static void main(String[] args) throws Exception { // Load configuration ConfigHelper configHelper = ConfigHelper.createConfig(); // Create the client for Simple Workflow Service swfService = configHelper.createSWFClient(); domain = configHelper.getDomain(); // Start Workflow execution PeriodicWorkflowClientExternalFactory clientFactory = new PeriodicWorkflowClientExternalFactoryImpl(swfService, domain); // Passing instance id to ensure that only one periodic workflow can be active at a time. // Use different id for each schedule. PeriodicWorkflowClientExternal workflow = clientFactory.getClient("periodic1"); // Execute activity every two 10 seconds, wait for it to complete before starting the new one, // create new run every 30 seconds and stop the workflow after two minutes. // Obviously these periods are so low to make example run fast enough to not be boring. // In production case there is no need to create new runs so frequently. PeriodicWorkflowOptions options = new PeriodicWorkflowOptions(); options.setExecutionPeriodSeconds(10); options.setContinueAsNewAfterSeconds(30); options.setCompleteAfterSeconds(120); options.setWaitForActivityCompletion(true); ActivityType activityType = new ActivityType(); activityType.setName("PeriodicWorkflowActivities.doSomeWork"); activityType.setVersion("1.0"); Object[] parameters = new Object[] { "parameter1" }; try { workflow.startPeriodicWorkflow(activityType, parameters, options); // WorkflowExecution is available after workflow creation WorkflowExecution workflowExecution = workflow.getWorkflowExecution(); System.out.println("Started periodic workflow with workflowId=\"" + workflowExecution.getWorkflowId() + "\" and runId=\"" + workflowExecution.getRunId() + "\""); } catch (WorkflowExecutionAlreadyStartedException e) { // It is expected to get this exception if start is called before workflow run is completed. 
System.out.println("Periodic workflow with workflowId=\"" + workflow.getWorkflowExecution().getWorkflowId() + " is already running"); } System.exit(0); }
@Test(timeout = 2000) public void testCronWithFailures() { workflowTest.setClockAccelerationCoefficient(SECONDS_HOUR * 24 * 7 * 2); final CronWithRetryWorkflowClient workflow = workflowClientFactory.getClient(); final ActivityType activityType = new ActivityType(); activityType.setName("CronWithRetryExampleActivities.doSomeWork"); activityType.setVersion("1.0"); final Object[] activityArguments = new Object[] { "parameter1" }; final CronWithRetryWorkflowOptions cronOptions = new CronWithRetryWorkflowOptions(); cronOptions.setActivity(activityType); cronOptions.setActivityArguments(activityArguments); cronOptions.setContinueAsNewAfterSeconds(SECONDS_HOUR * 24 + 300); cronOptions.setTimeZone("PST"); cronOptions.setInitialRetryIntervalSeconds(30); cronOptions.setMaximumRetryIntervalSeconds(600); cronOptions.setRetryExpirationIntervalSeconds(3500); final String cronExpression = "0 0 * * * *"; cronOptions.setCronExpression(cronExpression); WorkflowClock clock = workflowTest.getDecisionContext().getWorkflowClock(); clock.createTimer(SECONDS_HOUR * 24 * 7 + 1000); // true constructor argument makes TryCatchFinally a daemon which causes it get cancelled after above timer firing new TryCatchFinally(true) { Throwable failure; @Override protected void doTry() throws Throwable { workflow.startCron(cronOptions); } @Override protected void doCatch(Throwable e) throws Throwable { failure = e; throw e; } @Override protected void doFinally() throws Throwable { // Skip assertions as their failure masks original exception if (failure == null || failure instanceof CancellationException) { // Note the 2 activity invocations for each cron invocation as half of the // invocations failed and were retried. Assert.assertEquals(24 * 7 * 2, cronActivitiesImplementation.getWorkCount()); Assert.assertEquals(7, cronActivitiesImplementation.getRunCount()); } } }; }
@Test(timeout = 2000) public void testCron() { workflowTest.setClockAccelerationCoefficient(SECONDS_HOUR * 24 * 7 * 2); final CronWorkflowClient workflow = workflowClientFactory.getClient(); final ActivityType activityType = new ActivityType(); activityType.setName("CronExampleActivities.doSomeWork"); activityType.setVersion("1.0"); final Object[] activityArguments = new Object[] { "parameter1" }; final CronWorkflowOptions cronOptions = new CronWorkflowOptions(); cronOptions.setActivity(activityType); cronOptions.setActivityArguments(activityArguments); cronOptions.setContinueAsNewAfterSeconds(SECONDS_HOUR * 24 + 300); cronOptions.setTimeZone("PST"); final String cronExpression = "0 0 * * * *"; cronOptions.setCronExpression(cronExpression); WorkflowClock clock = workflowTest.getDecisionContext().getWorkflowClock(); clock.createTimer(SECONDS_HOUR * 24 * 7 + 1000); // true constructor argument makes TryCatchFinally a daemon which causes it get cancelled after above timer firing new TryCatchFinally(true) { Throwable failure; @Override protected void doTry() throws Throwable { workflow.startCron(cronOptions); } @Override protected void doCatch(Throwable e) throws Throwable { failure = e; throw e; } @Override protected void doFinally() throws Throwable { // Skip assertions as their failure masks original exception if (failure == null || failure instanceof CancellationException) { Assert.assertEquals(24 * 7, cronActivitiesImplementation.getWorkCount()); Assert.assertEquals(7, cronActivitiesImplementation.getRunCount()); } } }; }
/**
 * Starts a workflow that executes the given activity according to the supplied options.
 *
 * @param activity
 *            activity type to execute
 * @param activityArguments
 *            arguments for the activity
 * @param options
 *            define the schedule of the execution
 */
@Execute(name = "PeriodicWorkflow", version = "1.0")
void startPeriodicWorkflow(ActivityType activity, Object[] activityArguments, PeriodicWorkflowOptions options);