/**
 * Describes the stream one shard at a time.
 *
 * <p>The shard to return is the one following {@code exclusiveStartShardId}
 * (or shard {@code 0} when no start id is given). Shard ids are the decimal
 * string form of the shard's index in {@code shardedData}. At most one shard
 * is included in each result; {@code hasMoreShards} is set when at least one
 * further shard exists after the returned one.
 *
 * @param streamName           name echoed back in the stream description
 * @param exclusiveStartShardId id of the last shard already seen, or
 *                              {@code null} to start from the first shard
 * @return a result carrying HTTP status 200 metadata and the stream description
 */
@Override
public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
    final int shardIndex =
            exclusiveStartShardId == null ? 0 : parseInt(exclusiveStartShardId) + 1;
    final int shardCount = shardedData.size();

    // Only a single shard is ever placed on a page.
    final List<Shard> page = new ArrayList<>();
    if (shardIndex < shardCount) {
        page.add(new Shard().withShardId(Integer.toString(shardIndex)));
    }

    final StreamDescription description = new StreamDescription()
            .withHasMoreShards(shardIndex + 1 < shardCount)
            .withShards(page)
            .withStreamName(streamName);

    final HttpResponse okResponse = new HttpResponse(null, null);
    okResponse.setStatusCode(200);

    final DescribeStreamResult result = new DescribeStreamResult();
    result.setSdkHttpMetadata(SdkHttpMetadata.from(okResponse));
    result.withStreamDescription(description);
    return result;
}
/**
 * The size reported by the failed-event source must equal the
 * {@code ApproximateNumberOfMessages} attribute returned by SQS.
 */
@Test
public void testShouldReturnTotalNumberOfFailedEvents() {
    // given: a successful (HTTP 200) queue-attributes response carrying a random count
    final SdkHttpMetadata okMetadata = mock(SdkHttpMetadata.class);
    when(okMetadata.getHttpStatusCode()).thenReturn(200);

    final String expectedCount = RandomStringUtils.randomNumeric(4);
    final Map<String, String> queueAttributes = new HashMap<>();
    queueAttributes.put(QueueAttributeName.ApproximateNumberOfMessages.name(), expectedCount);

    final GetQueueAttributesResult attributesResult = new GetQueueAttributesResult();
    attributesResult.setSdkHttpMetadata(okMetadata);
    attributesResult.setAttributes(queueAttributes);

    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(attributesResult);

    // when / then
    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(expectedCount));
}
/**
 * When the SQS client throws while sending, the error handler must let that
 * exception propagate to the caller.
 *
 * <p>The previous version of this test also built a {@code GetQueueUrlResult},
 * a {@code SendMessageResult} and a mocked {@code SdkHttpMetadata} stubbed with
 * status 400, but none of them was ever handed to a collaborator because
 * {@code sendMessage} is stubbed to throw. That dead setup has been removed so
 * the test shows only what it actually exercises.
 */
@Test
public void testShouldGetFailedResponseAfterSendingTheEvent() {
    // given: every attempt to send a message fails
    when(amazonSQS.sendMessage(any(SendMessageRequest.class)))
            .thenThrow(new RuntimeException("expected"));

    // when / then: the failure surfaces unchanged from onError
    assertThatThrownBy(() -> sqsErrorHandler.onError(
            randomAlphabetic(10),
            new RuntimeException(),
            EventTypePartition.of(EventType.of(randomAlphabetic(10)), randomAlphabetic(1)),
            randomNumeric(10),
            randomAlphabetic(50)))
            .isInstanceOf(RuntimeException.class)
            .hasMessageContaining("expected");
}
/**
 * On error, the handler serializes a payload through the object mapper and
 * sends exactly one message to SQS.
 *
 * @throws JsonProcessingException declared only because the verified mapper
 *                                 method declares it
 */
@Test
public void testShouldSendEventToSQS() throws JsonProcessingException {
    // given: SQS accepts the message with HTTP 200
    final SdkHttpMetadata okMetadata = mock(SdkHttpMetadata.class);
    when(okMetadata.getHttpStatusCode()).thenReturn(200);

    final SendMessageResult acceptedResult = new SendMessageResult();
    acceptedResult.setSdkHttpMetadata(okMetadata);
    when(amazonSQS.sendMessage(any(SendMessageRequest.class))).thenReturn(acceptedResult);

    // when
    sqsErrorHandler.onError(
            randomAlphabetic(10),
            new RuntimeException(),
            EventTypePartition.of(EventType.of(randomAlphabetic(10)), randomAlphabetic(1)),
            randomNumeric(10),
            randomAlphabetic(50));

    // then
    verify(objectMapper).writeValueAsString(anyString());
    verify(amazonSQS).sendMessage(any(SendMessageRequest.class));
}
/**
 * Extracts the result from {@code awsResponse} and, when that result is an
 * {@code AmazonWebServiceResult}, attaches the response metadata from
 * {@code awsResponse} and HTTP metadata derived from {@code httpResponse}.
 * Results of any other type are returned untouched.
 *
 * @param awsResponse  response wrapper providing the result and its metadata
 * @param httpResponse HTTP response used to build the {@code SdkHttpMetadata}
 * @param <T>          type of the unwrapped result
 * @return the result obtained from {@code awsResponse.getResult()}
 */
@SuppressWarnings("unchecked")
private <T> T fillInResponseMetadata(AmazonWebServiceResponse<T> awsResponse, HttpResponse httpResponse) {
    final T result = awsResponse.getResult();
    if (!(result instanceof AmazonWebServiceResult<?>)) {
        return result;
    }

    // Raw type is intentional: the concrete metadata type parameter is unknown here.
    final AmazonWebServiceResult serviceResult = (AmazonWebServiceResult) result;
    serviceResult
            .setSdkResponseMetadata(awsResponse.getResponseMetadata())
            .setSdkHttpMetadata(SdkHttpMetadata.from(httpResponse));
    return result;
}
/**
 * With an empty attribute map in the SQS response, the reported size is 0.
 */
@Test
public void testShouldReturnDefaultTotalNumberOfFailedEvents() {
    // given: a successful response whose attribute map contains no entries
    final SdkHttpMetadata okMetadata = mock(SdkHttpMetadata.class);
    when(okMetadata.getHttpStatusCode()).thenReturn(200);

    final GetQueueAttributesResult emptyAttributesResult = new GetQueueAttributesResult();
    emptyAttributesResult.setSdkHttpMetadata(okMetadata);
    emptyAttributesResult.setAttributes(new HashMap<>());

    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(emptyAttributesResult);

    // when / then
    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(0L));
}
/**
 * When the SQS response never has its attributes set, the reported size is 0.
 */
@Test
public void testShouldReturnDefaultTotalNumberOfFailedEventsWhenThereIsNoQueueAttributes() {
    // given: a successful response on which setAttributes is never called
    final SdkHttpMetadata okMetadata = mock(SdkHttpMetadata.class);
    when(okMetadata.getHttpStatusCode()).thenReturn(200);

    final GetQueueAttributesResult resultWithoutAttributes = new GetQueueAttributesResult();
    resultWithoutAttributes.setSdkHttpMetadata(okMetadata);

    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(resultWithoutAttributes);

    // when / then
    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(0L));
}
/**
 * Committing a failed event must result in a delete-message call on SQS.
 */
@Test
public void testShouldCommitTheMessageSuccessfully() {
    // given: SQS acknowledges the delete with HTTP 200
    final SdkHttpMetadata okMetadata = mock(SdkHttpMetadata.class);
    when(okMetadata.getHttpStatusCode()).thenReturn(200);

    final DeleteMessageResult deleteResult = new DeleteMessageResult();
    deleteResult.setSdkHttpMetadata(okMetadata);
    when(amazonSQS.deleteMessage(anyString(), anyString())).thenReturn(deleteResult);

    // when
    final SQSFailedEvent eventToCommit = new SQSFailedEvent(new FailedEvent());
    sqsFailedEventSource.commit(eventToCommit);

    // then
    verify(amazonSQS).deleteMessage(anyString(), anyString());
}
/**
 * Returns the HTTP metadata currently stored on this result.
 *
 * @return HTTP related metadata like headers and status code.
 */
public SdkHttpMetadata getSdkHttpMetadata() {
    return this.sdkHttpMetadata;
}
/**
 * Stores the given HTTP metadata on this result.
 *
 * @param sdkHttpMetadata HTTP metadata to associate with this result
 * @return this instance, so further calls can be chained
 */
public AmazonWebServiceResult<T> setSdkHttpMetadata(final SdkHttpMetadata sdkHttpMetadata) {
    this.sdkHttpMetadata = sdkHttpMetadata;
    return this;
}