/**
 * Produces the next history event, transparently fetching further pages of the
 * decision task when the current page is exhausted.
 * <p>
 * Pages are fetched in a loop rather than once: a page may come back with no
 * events (or a {@code null} event list) while still carrying a next-page token,
 * and in that case iteration must continue with the following page instead of
 * ending early.
 *
 * @return the next event, or the value of {@code endOfData()} handling (i.e. {@code null}
 *         after {@code endOfData()} has been called) once every page is consumed
 */
@Override
protected HistoryEvent computeNext() {
    // Keep fetching until a page actually has events or there are no more pages.
    while (!currentPage.hasNext() && !isLastPage) {
        DecisionTask decisionTask = nextDecisionTask(nextPageToken);
        nextPageToken = decisionTask.getNextPageToken();
        // A missing token marks the final page.
        isLastPage = nextPageToken == null;
        List<HistoryEvent> pageEvents = decisionTask.getEvents();
        currentPage = pageEvents == null
                ? Collections.<HistoryEvent>emptyIterator()
                : pageEvents.iterator();
    }
    if (currentPage.hasNext()) {
        return currentPage.next();
    }
    endOfData();
    return null;
}
@Test public void markerWorkflowTest() { //submit workflow Workflow<Integer, Integer> workflow = ImmutableSimpleMarkerWorkflow.builder() .taskList(config.taskList()) .executionStartToCloseTimeout(Duration.ofMinutes(5)) .taskStartToCloseTimeout(Duration.ofSeconds(30)) .childPolicy(TERMINATE) .description(Description.of("A Marker Example Workflow")) .dataConverter(config.dataConverter()).build(); workflowExecution = config.submit(workflow, 100); await().ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).atMost(3, TimeUnit.MINUTES).until(() -> getWorkflowExecutionResult(workflowExecution), is("201")); List<HistoryEvent> historyEvents = WorkflowExecutionUtils.getHistory(config.swf(), config.domain().value(), workflowExecution); List<Event> events = Event.fromHistoryEvents(historyEvents); List<Event> recordMarkerEvents = events.stream() .filter(event -> event.task() == TaskType.RECORD_MARKER).collect(toList()); assertThat("incorrect number of marker events", recordMarkerEvents, hasSize(1)); Event recordMarkerEvent = Iterables.getOnlyElement(recordMarkerEvents); assertThat(recordMarkerEvent.actionId(), is(SimpleMarkerWorkflow.MARKER_NAME)); assertThat(recordMarkerEvent.details(), is("101")); }
@Test public void timerWorkflowTest() { //submit workflow Workflow<Integer, Integer> workflow = ImmutableTimerWorkflow.builder() .taskList(config.taskList()) .executionStartToCloseTimeout(Duration.ofMinutes(5)) .taskStartToCloseTimeout(Duration.ofSeconds(30)) .childPolicy(TERMINATE) .description(Description.of("A Timer Example Workflow")) .dataConverter(config.dataConverter()).build(); workflowExecution = config.submit(workflow, 100); await().ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).atMost(3, TimeUnit.MINUTES).until(() -> getWorkflowExecutionResult(workflowExecution), is("201")); List<HistoryEvent> historyEvents = WorkflowExecutionUtils.getHistory(config.swf(), config.domain().value(), workflowExecution); List<Event> events = Event.fromHistoryEvents(historyEvents); List<Event> timerEvents = events.stream() .filter(event -> event.task() == TaskType.TIMER).collect(toList()); //TimerStarted & TimerFired assertThat("incorrect number of marker events", timerEvents, hasSize(2)); Optional<Event> timerEvent = timerEvents.stream().filter(e -> e.type() == EventType.TimerStarted).findFirst(); assertThat(timerEvent.get().actionId(), is(TimerWorkflow.TIMER_NAME)); assertThat(timerEvent.get().control(), is("101")); }
/**
 * Splits the fixture history into four pages, hands the first page to a
 * {@code DecisionTaskIterator} and has the mocked SWF client serve the remaining three,
 * then checks that iterating yields every event of the fixture in order.
 */
@Test
public void multiplePages(@Mocked AmazonSimpleWorkflow swf) {
    final int pageCount = 4;
    PollForDecisionTaskRequest pollRequest = new PollForDecisionTaskRequest();
    List<HistoryEvent> allEvents = loadHistoryEvents("fixtures/simple_workflow.json");

    // Round the page size up so the events fit into exactly pageCount pages.
    double exactPageSize = (double) allEvents.size() / pageCount;
    int pageSize = (int) Math.ceil(exactPageSize);
    List<List<HistoryEvent>> pagedEvents = Lists.partition(allEvents, pageSize);
    assertThat("partitioned incorrectly", pagedEvents, hasSize(pageCount));

    // The first page is supplied directly; it points at page "1".
    DecisionTask firstTask = new DecisionTask()
            .withTaskToken("xyz")
            .withNextPageToken("1")
            .withEvents(pagedEvents.get(0));
    DecisionTaskIterator iterator = new DecisionTaskIterator(swf, pollRequest, firstTask);

    // Follow-up pages served by the mock; the last one has no next-page token.
    DecisionTask secondTask = new DecisionTask()
            .withTaskToken("abc")
            .withNextPageToken("2")
            .withEvents(pagedEvents.get(1));
    DecisionTask thirdTask = new DecisionTask()
            .withTaskToken("def")
            .withNextPageToken("3")
            .withEvents(pagedEvents.get(2));
    DecisionTask lastTask = new DecisionTask()
            .withTaskToken("ghi")
            .withEvents(pagedEvents.get(3));

    new Expectations() {{
        swf.pollForDecisionTask(pollRequest);
        times = (pageCount - 1);
        returns(secondTask, thirdTask, lastTask);
    }};

    assertThat(Lists.newArrayList(iterator), is(allEvents));
}
/**
 * Iterates a decision task loaded from the fixture file, with no SWF client and no
 * poll request supplied, and checks that the iterator yields exactly the task's own events.
 */
@Test
public void basicTest() {
    String fixtureJson = readFile("fixtures/simple_workflow.json");
    DecisionTask fixtureTask = unmarshalDecisionTask(fixtureJson);

    DecisionTaskIterator iterator = new DecisionTaskIterator(null, null, fixtureTask);
    List<HistoryEvent> iterated = Lists.newArrayList(iterator);

    assertThat(iterated, is(fixtureTask.getEvents()));
}
/**
 * Applies a status change to the activity registered under
 * {@code workflowId + "/" + originalActivityId} in {@code activityMap}.
 * <p>
 * The activity's status is always set. For any status other than RUNNING the completion
 * date is taken from the event timestamp. For FAILED the error detail and reason are copied
 * from the event's failed-attributes; for COMPLETED the result is copied from the event's
 * completed-attributes.
 *
 * @param originalActivityId id of the activity as used in the map key
 * @param workflowId         workflow id used as the map key prefix
 * @param newStatus          status to set on the activity
 * @param event              history event supplying timestamp and attributes
 * @param activityMap        activities keyed by {@code workflowId/activityId}
 * @return the updated activity, or {@code null} when no activity is registered under the key
 */
private static WorkflowActivity updateActivityEvent( long originalActivityId, String workflowId,
        WorkflowActivity.Status newStatus, HistoryEvent event, Map<String, WorkflowActivity> activityMap )
{
    final String activityKey = workflowId + "/" + originalActivityId;
    final WorkflowActivity tracked = activityMap.get( activityKey );

    if ( tracked != null )
    {
        tracked.setActivityStatus( newStatus );

        // Anything that is not RUNNING is treated as finished and gets a completion date.
        final boolean stillRunning = ( newStatus == WorkflowActivity.Status.RUNNING );
        if ( !stillRunning )
        {
            tracked.setCompleteDate( new DateTime( event.getEventTimestamp() ) );
        }

        if ( newStatus == WorkflowActivity.Status.FAILED )
        {
            tracked.setErrorDetail( event.getActivityTaskFailedEventAttributes().getDetails() );
            tracked.setErrorReason( event.getActivityTaskFailedEventAttributes().getReason() );
        }
        else if ( newStatus == WorkflowActivity.Status.COMPLETED )
        {
            tracked.setResult( event.getActivityTaskCompletedEventAttributes().getResult() );
        }
    }

    return tracked;
}
/**
 * Wraps every given history event in an {@link Event} and returns the wrappers sorted
 * by their natural ordering. Each wrapper is also given the complete input list.
 *
 * @param historyEvents the full workflow history
 * @return sorted list with one {@link Event} per history event
 */
public static List<Event> fromHistoryEvents(List<HistoryEvent> historyEvents) {
    List<Event> sortedEvents = historyEvents.stream()
            .map(single -> {
                // Every Event carries both its own history event and the whole history.
                return ImmutableEvent.builder()
                        .historyEvent(single)
                        .historyEvents(historyEvents)
                        .build();
            })
            .sorted()
            .collect(toList());
    return sortedEvents;
}
/**
 * Unmarshals the given JSON as a decision task and returns a fresh, mutable list
 * containing that task's history events.
 *
 * @param json JSON representation of a decision task
 * @return a new list holding the events of the unmarshalled decision task
 */
public static List<HistoryEvent> parseHistoryEvents(String json) {
    // Copy into a new list rather than handing out the decision task's own list.
    return Lists.newArrayList(unmarshalDecisionTask(json).getEvents());
}
/**
 * Reads the named file and parses its content into history events.
 *
 * @param fileName name of the file to read, as accepted by {@code readFile}
 * @return the history events parsed from the file's content
 */
public static List<HistoryEvent> loadHistoryEvents(String fileName) {
    String fileContent = readFile(fileName);
    return parseHistoryEvents(fileContent);
}
/**
 * Fetches the history of one workflow execution in the configured domain.
 *
 * @param workflowId id of the workflow
 * @param runId      id of the specific run
 * @return the history events returned by {@code WorkflowExecutionUtils.getHistory}
 */
public List<HistoryEvent> getWorkflowExecutionHistory(String workflowId, String runId) {
    // Identify the execution by its workflow id / run id pair.
    WorkflowExecution targetExecution = new WorkflowExecution()
            .withWorkflowId(workflowId)
            .withRunId(runId);

    return WorkflowExecutionUtils.getHistory(
            endpoint.getSWClient(),
            configuration.getDomainName(),
            targetExecution);
}
/**
 * Returns the full list of history events supplied when this object was built.
 * <p>
 * All history events are required since we need to find the previous events
 * they might refer to.
 * <p>
 * NOTE(review): the attribute is marked {@code @Value.Auxiliary}; presumably this keeps
 * the (potentially large) list out of the generated equals/hashCode/toString — confirm
 * against the Immutables documentation.
 *
 * @return the complete workflow history
 */
@Value.Auxiliary public abstract List<HistoryEvent> historyEvents();
/**
 * Parses a workflow history from a JSON-formatted string into a sorted list of {@link Event}s.
 * <p>
 * The JSON format is the same as the native format used by Amazon SWF responses.
 * Wrapping and sorting are delegated to {@link Event#fromHistoryEvents(List)} so that this
 * method and that factory cannot drift apart. The original documentation describes the
 * resulting order as descending event id order; the actual order is whatever
 * {@link Event}'s natural ordering defines.
 *
 * @param json json to parse
 * @return the parsed events, sorted by {@link Event}'s natural ordering
 */
public static List<Event> parseActionEvents(String json) {
    return Event.fromHistoryEvents(parseHistoryEvents(json));
}
/**
 * Returns the {@link HistoryEvent} held by this object.
 * <p>
 * NOTE(review): presumably the single SWF history event this object represents —
 * confirm against the enclosing class.
 *
 * @return the history event
 */
public abstract HistoryEvent historyEvent();