public List<QueueMessage> getMessages() { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(_queueDns); List<Message> messages = _sqs.receiveMessage(receiveMessageRequest).getMessages(); List<QueueMessage> deserializedMessages = new ArrayList<>(); for (Message message : messages) { String body = message.getBody(); QueueMessage qm = _gson.fromJson(body, QueueMessage.class); deserializedMessages.add(qm); System.out.println("query time: " + qm.queryExecutionTime); System.out.println("exec time: " + qm.totalExecutionTime); System.out.println("Has ex: " + qm.hasException); System.out.println("ex message: " + qm.exceptionMessage + "\n"); String receiptHandle = message.getReceiptHandle(); _sqs.deleteMessage(new DeleteMessageRequest(_queueDns, receiptHandle)); } return deserializedMessages; }
public List<S3SNSNotification> parse(Message message) { List<S3SNSNotification> notifications = Lists.newArrayList(); try { SQSMessage envelope = om.readValue(message.getBody(), SQSMessage.class); if (envelope.message == null) { return Collections.emptyList(); } S3EventNotification s3EventNotification = S3EventNotification.parseJson(envelope.message); notifications.addAll(s3EventNotification.getRecords().stream().map(record -> new S3SNSNotification( message.getReceiptHandle(), record.getS3().getBucket().getName(), record.getS3().getObject().getUrlDecodedKey() )).collect(Collectors.toList())); } catch (Exception e) { LOG.error("Could not parse SNS notification: " + message.getBody(), e); throw new RuntimeException("Could not parse SNS notification: " + message.getBody(), e); } return notifications; }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessage() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(1) // .test() // .awaitDone(10, TimeUnit.SECONDS) // .assertComplete() // .assertValue("body1"); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessageAndHonoursBackpressure() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .test(0) // .requestMore(1) // .assertValue("body1")// .assertNotComplete() // .cancel(); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsNoMessagesThenSecondCallReturnsTwoMessages() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())).thenReturn(new ReceiveMessageResult()) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"), new Message().withBody("body2"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(2) // .test() // .awaitDone(10, TimeUnit.SECONDS) // .assertComplete() // .assertValues("body1", "body2"); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(2)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Override public List<Event> parseMessage(final Message message) { List<Event> events = Collections.emptyList(); try { MessageBody body = gson.fromJson(message.getBody(), MessageBody.class); final String json = body.getMessage(); log.debug("Parse message %s", json); if (StringUtils.isEmpty(json)) { log.warning("Message contains no text"); return Collections.emptyList(); } if (!json.startsWith("{") || !json.endsWith("}")) { log.warning("Message text is no JSON"); return Collections.emptyList(); } events = this.parseRecords(json); } catch (final com.google.gson.JsonSyntaxException e) { log.error("JSON syntax exception, cannot parse message: %s", e); } return events; }
private boolean handleMessage(final Message message) { log.debug("Parse and do match against events, message: %s", this.job, message.getBody()); final MessageParser parser = this.messageParserFactory.createParser(message); final List<Event> events = parser.parseMessage(message); boolean matched = this.eventTriggerMatcher.matches(events, this.sqsJob); String messageId = com.ribose.jenkins.plugin.awscodecommittrigger.utils.StringUtils.getMessageId(message); log.info("Any event matched? %s. Message: %s", this.job, matched, messageId); if (matched) { log.debug("Hurray! Execute it", this.job); //TODO use java8 lambda for this loop List<String> userarns = new ArrayList<>(); for (Event event : events) { userarns.add(event.getUser()); } this.execute(message, userarns); return true; } return false; }
private void processTask(Message message) { String path = message.getBody(); PathSplit pathComp = new PathSplit(path); String bucket = pathComp.bucket; String key = pathComp.key; Logger.Info("Processing %s %s", bucket, key); // Rekognition: Detect Labels from S3 object DetectLabelsRequest req = new DetectLabelsRequest() .withImage(new Image().withS3Object(new S3Object().withBucket(bucket).withName(key))) .withMinConfidence(minConfidence); DetectLabelsResult result; result = rek.detectLabels(req); List<Label> labels = result.getLabels(); Logger.Debug("In %s, found: %s", key, labels); // Process downstream actions: for (LabelProcessor processor : processors) { processor.process(labels, path); } }
@Override public List<ScanRangeTask> claimScanRangeTasks(int max, Duration ttl) { if (max == 0) { return ImmutableList.of(); } List<Message> messages = _sqs.receiveMessage(new ReceiveMessageRequest() .withQueueUrl(getQueueUrl(_pendingScanRangeQueue)) .withMaxNumberOfMessages(Math.min(max, 10)) // SQS cannot claim more than 10 messages .withVisibilityTimeout(toSeconds(ttl)) ).getMessages(); return FluentIterable.from(messages) .transform(new Function<Message, ScanRangeTask>() { @Override public ScanRangeTask apply(Message message) { QueueScanRangeTask task = JsonHelper.fromJson(message.getBody(), QueueScanRangeTask.class); task.setMessageId(message.getReceiptHandle()); return task; } }) .toList(); }
@Override public List<ScanRangeComplete> claimCompleteScanRanges(Duration ttl) { List<Message> messages = _sqs.receiveMessage(new ReceiveMessageRequest() .withQueueUrl(getQueueUrl(_completeScanRangeQueue)) .withMaxNumberOfMessages(10) .withVisibilityTimeout(toSeconds(ttl)) ).getMessages(); return FluentIterable.from(messages) .transform(new Function<Message, ScanRangeComplete>() { @Override public ScanRangeComplete apply(Message message) { QueueScanRangeComplete completion = JsonHelper.fromJson(message.getBody(), QueueScanRangeComplete.class); completion.setMessageId(message.getReceiptHandle()); return completion; } }) .toList(); }
private void pollMessages(AmazonSQS sqs) { log.info("Polling messages"); while (true) { List<Message> messages = sqs.receiveMessage(QUEUE_URL).getMessages(); messages.forEach(m -> { log.info("Message Received: " + m.getBody()); System.out.println(m.getBody()); DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(QUEUE_URL, m.getReceiptHandle()); sqs.deleteMessage(deleteMessageRequest); }); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace(); } } }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessage() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(1) // .to(test()) // .awaitTerminalEvent() // .assertCompleted() // .assertValue("body1"); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessageAndHonoursBackpressure() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .to(testWithRequest(0)) // .requestMore(1) // .assertValue("body1")// .assertNotCompleted() // .unsubscribe(); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsNoMessagesThenSecondCallReturnsTwoMessages() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())).thenReturn(new ReceiveMessageResult()) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"), new Message().withBody("body2"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(2) // .to(test()) // .awaitTerminalEvent() // .assertCompleted() // .assertValues("body1", "body2"); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(2)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test public void defaultsToDisabled() throws Exception { this.mock.expectedMessageCount(1); this.mock.whenAnyExchangeReceived(new Processor() { @Override public void process(Exchange exchange) throws Exception { // Simulate message that takes a while to receive. Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT. } }); Message message = new Message(); message.setBody("Message 1"); message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee"); message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458"); message.setReceiptHandle(RECEIPT_HANDLE); this.clientMock.messages.add(message); assertMockEndpointsSatisfied(); // Wait for message to arrive. assertTrue("Expected no changeMessageVisibility requests.", this.clientMock.changeMessageVisibilityRequests.size() == 0); }
@Override protected JndiRegistry createRegistry() throws Exception { JndiRegistry registry = super.createRegistry(); AmazonSQSClientMock clientMock = new AmazonSQSClientMock(); // add 6 messages, one more we will poll for (int counter = 0; counter < 6; counter++) { Message message = new Message(); message.setBody("Message " + counter); message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee"); message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458"); message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5"); clientMock.messages.add(message); } registry.bind("amazonSQSClient", clientMock); return registry; }
@Override public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest) throws AmazonServiceException, AmazonClientException { Message message = new Message(); message.setBody(sendMessageRequest.getMessageBody()); message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee"); message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458"); message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5zC9+4QMqJZ0DJ3gVOmjI2Gh/oFnb0IeJqy5Zc8kH4JX7GVpfjcEDjaAPSeOkXQZRcaBqt" + "4lOtyfj0kcclVV/zS7aenhfhX5Ixfgz/rHhsJwtCPPvTAdgQFGYrqaHly+etJiawiNPVc="); synchronized (messages) { messages.add(message); } SendMessageResult result = new SendMessageResult(); result.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458"); result.setMD5OfMessageBody("6a1559560f67c5e7a7d5d838bf0272ee"); return result; }
@Override public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonServiceException, AmazonClientException { Integer maxNumberOfMessages = receiveMessageRequest.getMaxNumberOfMessages() != null ? receiveMessageRequest.getMaxNumberOfMessages() : Integer.MAX_VALUE; ReceiveMessageResult result = new ReceiveMessageResult(); Collection<Message> resultMessages = new ArrayList<Message>(); synchronized (messages) { int fetchSize = 0; for (Iterator<Message> iterator = messages.iterator(); iterator.hasNext() && fetchSize < maxNumberOfMessages; fetchSize++) { Message rc = iterator.next(); resultMessages.add(rc); iterator.remove(); scheduleCancelInflight(receiveMessageRequest.getQueueUrl(), rc); } } result.setMessages(resultMessages); return result; }
private void scheduleCancelInflight(final String queueUrl, final Message message) { if (scheduler != null) { int visibility = getVisibilityForQueue(queueUrl); if (visibility > 0) { ScheduledFuture task = scheduler.schedule(new Runnable() { @Override public void run() { synchronized (messages) { // put it back! messages.add(message); } } }, visibility, TimeUnit.SECONDS); inFlight.put(message.getReceiptHandle(), task); } } }
public List<JobStatusNotification> pollMessageFromQueueByJobId(String queueUrl, String jobId) { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest() .withQueueUrl(queueUrl) .withMaxNumberOfMessages(MAX_NUMBER_OF_MESSAGES) .withVisibilityTimeout(VISIBILITY_TIMEOUT) .withWaitTimeSeconds(WAIT_TIME_SECONDS); List<JobStatusNotification> jobStatusNotifications = new ArrayList<>(); for (Message message : sqsClient.receiveMessage(receiveMessageRequest).getMessages()) { try { JobStatusNotification jobStatusNotification = parseMessage(message.getBody()); if (jobStatusNotification.getJobId().equalsIgnoreCase(jobId)) { jobStatusNotifications.add(jobStatusNotification); sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(message.getReceiptHandle())); } } catch (IOException e) { logger.error(e.getMessage(), e); } } return jobStatusNotifications; }
@SuppressWarnings("unchecked") public T peekMessage(int waitFor) throws Exception { // Receive messages logger.info("Trying to recieve message from: " + _queueName); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(_queueURL); receiveMessageRequest.setMaxNumberOfMessages(1); receiveMessageRequest.setWaitTimeSeconds(waitFor); List<Message> messages = _sqs.receiveMessage(receiveMessageRequest).getMessages(); for (Message message : messages) { logger.info(" Got Message"); logger.info(" Body: " + message.getBody()); logger.info(" Handle: " + message.getReceiptHandle()); _lastMessage = message; GenericMessage msg = GenericMessage.fromXML(message.getBody()); if (!msg.type.equals(_msgClass.getName())) throw new Exception("Invalid message type recieved."); return (T) msg.body; } return null; }
/** * Delete a message from the SQS queue * * @param messageHandle * Message handle to delete * @return true if the delete was successful, otherwise false */ public boolean deleteMessage(@Nullable final Message message) { if (message == null) { return false; } try { LOGGER.debug("Deleting message from SQS: {}", message.getMessageId()); deleteRequests.inc(); sqs.deleteMessage(queueUrl, message.getReceiptHandle()); return true; } catch (Exception e) { LOGGER.error("Failed to delete message: " + message.getMessageId(), e); } return false; }
public <T> void registerReceiver(String queueName, SqsReceiver<T> receiver) { Optional<String> queueUrl = getUrlForQueue(queueName); if (queueUrl.isPresent()) { SqsReceiverHandler<T> handler = new SqsReceiverHandler<>( sqs, queueUrl.get(), receiver, new SqsBaseExceptionHandler() { // not replaced with lambda because jacoco fails with lambdas @Override public boolean onException(Message message, Exception exception) { LOGGER.error("Error processing received message - acknowledging it anyway"); return true; } } ); internalRegisterReceiver(queueName, handler); } else { LOGGER.error("Cannot register receiver for queue name : " + queueName); } }
@Test public void messageShouldBeProcessedAfterBeingConsumed() throws Exception { //GIVEN ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(); Message message1 = new Message() .withMessageId("aaaa-bbbb-cccc-dddd-eeee") .withBody("Sample test message"); Message message2 = new Message() .withMessageId("ffff-gggg-hhhh-iiii-jjjj") .withBody("Another sample test message"); receiveMessageResult.setMessages(Lists.newArrayList(message1, message2)); when(sqs.receiveMessage((ReceiveMessageRequest) anyObject())).thenReturn(receiveMessageResult, new ReceiveMessageResult()); //WHEN receiverHandler.start(); //THEN Thread.sleep(1000); verify(receiver, times(2)).receive(any()); verify(receiver, times(1)).receive(message1); verify(receiver, times(1)).receive(message2); }
@Test public void sendAndReceiveMessage() { final String queueName = "bizo"; final String messageBody = "hi everybody"; final CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName); sqs.createQueue(createQueueRequest); final GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest().withQueueName(queueName); final GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(getQueueUrlRequest); final String queueUrl = getQueueUrlResult.getQueueUrl(); final SendMessageRequest sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody); sqs.sendMessage(sendMessageRequest); final int maxNumberOfMessages = 10; final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(maxNumberOfMessages); final ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest); final List<Message> messages = receiveMessageResult.getMessages(); assertThat(messages.size(), equalTo(1)); assertThat(messages.get(0).getBody(), equalTo(messageBody)); }
@Test public void testParse() throws Exception { final Message message = new Message() .withBody("{\n" + " \"Type\" : \"Notification\",\n" + " \"MessageId\" : \"55508fe9-870b-590c-960f-c34960b669f0\",\n" + " \"TopicArn\" : \"arn:aws:sns:eu-west-1:459220251735:cloudtrail-write\",\n" + " \"Message\" : \"{\\\"s3Bucket\\\":\\\"cloudtrailbucket\\\",\\\"s3ObjectKey\\\":[\\\"example/AWSLogs/459220251735/CloudTrail/eu-west-1/2014/09/27/459220251735_CloudTrail_eu-west-1_20140927T1625Z_UPwzr7ft2mf0Q1SS.json.gz\\\"]}\",\n" + " \"Timestamp\" : \"2014-09-27T16:27:41.258Z\",\n" + " \"SignatureVersion\" : \"1\",\n" + " \"Signature\" : \"O05joR97NvGHqMJQwsSNXzeSHrtbLqbRcqsXB7xmqARyaCGXjaVh2duwTUL93s4YvoNENnOEMzkILKI5PwmQQPha5/cmj6FSjblwRMMga6Xzf6cMnurT9TphQO7z35foHG49IejW05IkzIwD/DW0GvafJLah+fQI3EFySnShzXLFESGQuumdS8bxnM5r96ne8t+MEAHfBCVyQ/QrduO9tTtfXAz6OeWg1IEwV3TeZ5c5SS5vRxxhsD4hOJSmXAUM99CeQfcG9s7saBcvyyGPZrhPEh8S1uhiTmLvr6h1voM9vgiCbCCUujExvg+bnqsXWTZBmnatF1iOyxFfYcZ6kw==\",\n" + " \"SigningCertURL\" : \"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-d6d679a1d18e95c2f9ffcf11f4f9e198.pem\",\n" + " \"UnsubscribeURL\" : \"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:459220251735:cloudtrail-write:9a3a4e76-4173-4c8c-b488-0126315ba643\"\n" + "}"); CloudtrailSNSNotificationParser parser = new CloudtrailSNSNotificationParser(objectMapper); List<CloudtrailSNSNotification> notifications = parser.parse(message); assertEquals(1, notifications.size()); CloudtrailSNSNotification notification = notifications.get(0); assertEquals(notification.getS3Bucket(), "cloudtrailbucket"); assertEquals(notification.getS3ObjectKey(), "example/AWSLogs/459220251735/CloudTrail/eu-west-1/2014/09/27/459220251735_CloudTrail_eu-west-1_20140927T1625Z_UPwzr7ft2mf0Q1SS.json.gz"); }
@Test public void issue_44() throws Exception { // https://github.com/Graylog2/graylog-plugin-aws/issues/44 final Message message = new Message() .withBody("{\n" + " \"Type\" : \"Notification\",\n" + " \"MessageId\" : \"5b0a73e6-a4f8-11e7-8dfb-8f76310a10a8\",\n" + " \"TopicArn\" : \"arn:aws:sns:eu-west-1:123456789012:cloudtrail-log-write\",\n" + " \"Subject\" : \"[AWS Config:eu-west-1] AWS::RDS::DBSnapshot rds:instance-2017-09-03-23-11 Dele...\",\n" + " \"Message\" : \"{\\\"configurationItemDiff\\\":{\\\"changedProperties\\\":{\\\"Relationships.0\\\":{\\\"previousValue\\\":{\\\"resourceId\\\":\\\"vpc-12345678\\\",\\\"resourceName\\\":null,\\\"resourceType\\\":\\\"AWS::EC2::VPC\\\",\\\"name\\\":\\\"Is associated with Vpc\\\"},\\\"updatedValue\\\":null,\\\"changeType\\\":\\\"DELETE\\\"},\\\"SupplementaryConfiguration.Tags\\\":{\\\"previousValue\\\":[],\\\"updatedValue\\\":null,\\\"changeType\\\":\\\"DELETE\\\"},\\\"SupplementaryConfiguration.DBSnapshotAttributes\\\":{\\\"previousValue\\\":[{\\\"attributeName\\\":\\\"restore\\\",\\\"attributeValues\\\":[]}],\\\"updatedValue\\\":null,\\\"changeType\\\":\\\"DELETE\\\"},\\\"Configuration\\\":{\\\"previousValue\\\":{\\\"dBSnapshotIdentifier\\\":\\\"rds:instance-2017-09-03-23-11\\\",\\\"dBInstanceIdentifier\\\":\\\"instance\\\",\\\"snapshotCreateTime\\\":\\\"2017-09-03T23:11:38.218Z\\\",\\\"engine\\\":\\\"mysql\\\",\\\"allocatedStorage\\\":200,\\\"status\\\":\\\"available\\\",\\\"port\\\":3306,\\\"availabilityZone\\\":\\\"eu-west-1b\\\",\\\"vpcId\\\":\\\"vpc-12345678\\\",\\\"instanceCreateTime\\\":\\\"2015-04-09T07:08:07.476Z\\\",\\\"masterUsername\\\":\\\"root\\\",\\\"engineVersion\\\":\\\"5.6.34\\\",\\\"licenseModel\\\":\\\"general-public-license\\\",\\\"snapshotType\\\":\\\"automated\\\",\\\"iops\\\":null,\\\"optionGroupName\\\":\\\"default:mysql-5-6\\\",\\\"percentProgress\\\":100,\\\"sourceRegion\\\":null,\\\"sourceDBSnapshotIdentifier\\\":null,\\\"storageType\\\":\\\"standard\\\",\\\"tdeCredentialArn\\\":null,\\\"encrypted\\\":false,\\\"kmsKeyId\\\":null,\\\"dBSnapshotArn\\\":\\\"arn:aws:rds:eu-west-1:123456789012:snapshot:rds:instance-2017-09-03-23-11\\\",\\\"timezone\\\":null,\\\"iAMDatabaseAuthenticationEnabled\\\":false},\\\"updatedValue\\\":null,\\\"changeType\\\":\\\"DELETE\\\"}},\\\"changeType\\\":\\\"DELETE\\\"},\\\"configurationItem\\\":{\\\"relatedEvents\\\":[],\\\"relationships\\\":[],\\\"configuration\\\":null,\\\"supplementaryConfiguration\\\":{},\\\"tags\\\":{},\\\"configurationItemVersion\\\":\\\"1.2\\\",\\\"configurationItemCaptureTime\\\":\\\"2017-09-28T19:54:47.815Z\\\",\\\"configurationStateId\\\":1234567890123,\\\"awsAccountId\\\":\\\"123456789012\\\",\\\"configurationItemStatus\\\":\\\"ResourceDeleted\\\",\\\"resourceType\\\":\\\"AWS::RDS::DBSnapshot\\\",\\\"resourceId\\\":\\\"rds:instance-2017-09-03-23-11\\\",\\\"resourceName\\\":\\\"rds:instance-2017-09-03-23-11\\\",\\\"ARN\\\":\\\"arn:aws:rds:eu-west-1:123456789012:snapshot:rds:instance-2017-09-03-23-11\\\",\\\"awsRegion\\\":\\\"eu-west-1\\\",\\\"availabilityZone\\\":null,\\\"configurationStateMd5Hash\\\":\\\"b026324c6904b2a9cb4b88d6d61c81d1\\\",\\\"resourceCreationTime\\\":null},\\\"notificationCreationTime\\\":\\\"2017-09-28T19:54:48.311Z\\\",\\\"messageType\\\":\\\"ConfigurationItemChangeNotification\\\",\\\"recordVersion\\\":\\\"1.2\\\"}\",\n" + " \"Timestamp\" : \"2017-09-28T19:54:58.543Z\",\n" + " \"SignatureVersion\" : \"1\",\n" + " \"Signature\" : \"...\",\n" + " \"SigningCertURL\" : \"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-....pem\",\n" + " \"UnsubscribeURL\" : \"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:123456789012:cloudtrail-log-write:5b0a73e6-a4f8-11e7-8dfb-8f76310a10a8\"\n" + "}"); CloudtrailSNSNotificationParser parser = new CloudtrailSNSNotificationParser(objectMapper); List<CloudtrailSNSNotification> notifications = parser.parse(message); assertTrue(notifications.isEmpty()); }
/** * As long as there is at least one CloudTrail log object: * <p> * <li>Add the CloudTrail log object key to the list.</li> * <li>Add <code>accountId</code> extracted from log object key to <code>sqsMessage</code>.</li> * <li>Add {@link SourceType#CloudTrailLog} to the <code>sqsMessage</code>.</li> * </p> * * If there is no CloudTrail log object and it is a valid CloudTrail message, CPL adds only {@link SourceType#Other} * to the <code>sqsMessage</code>. * */ private void addCloudTrailLogsAndMessageAttributes(Message sqsMessage, List<CloudTrailLog> cloudTrailLogs, JsonNode messageNode) throws IOException { SourceType sourceType = SourceType.Other; String bucketName = messageNode.get(S3_BUCKET_NAME).textValue(); List<String> objectKeys = mapper.readValue(messageNode.get(S3_OBJECT_KEY).traverse(), new TypeReference<List<String>>() {}); for (String objectKey: objectKeys) { SourceType currSourceType = sourceIdentifier.identify(objectKey); if (currSourceType == SourceType.CloudTrailLog) { cloudTrailLogs.add(new CloudTrailLog(bucketName, objectKey)); sourceType = currSourceType; LibraryUtils.setMessageAccountId(sqsMessage, objectKey); } } sqsMessage.addAttributesEntry(SourceAttributeKeys.SOURCE_TYPE.getAttributeKey(), sourceType.name()); }
/** * As long as there is at least one CloudTrail log object: * <p> * <li>Add the CloudTrail log object key to the list.</li> * <li>Add <code>accountId</code> extracted from log object key to <code>sqsMessage</code>.</li> * <li>Add {@link SourceType#CloudTrailLog} to the <code>sqsMessage</code>.</li> * </p> * * If there is no CloudTrail log object and it is a valid S3 message, CPL adds only {@link SourceType#Other} * to the <code>sqsMessage</code>. * */ private void addCloudTrailLogsAndMessageAttributes(Message sqsMessage, JsonNode s3RecordsNode, List<CloudTrailLog> cloudTrailLogs) { SourceType sourceType = SourceType.Other; for (JsonNode s3Record: s3RecordsNode) { String bucketName = s3Record.at(S3_BUCKET_NAME).textValue(); String objectKey = s3Record.at(S3_OBJECT_KEY).textValue(); String eventName = s3Record.get(EVENT_NAME).textValue(); SourceType currSourceType = sourceIdentifier.identifyWithEventName(objectKey, eventName); if (currSourceType == SourceType.CloudTrailLog) { cloudTrailLogs.add(new CloudTrailLog(bucketName, objectKey)); sourceType = currSourceType; LibraryUtils.setMessageAccountId(sqsMessage, objectKey); } } sqsMessage.addAttributesEntry(SourceAttributeKeys.SOURCE_TYPE.getAttributeKey(), sourceType.name()); }
public static String popFrom(String name) { try { String queueUrl = getConnection().createQueue( new CreateQueueRequest(name)).getQueueUrl(); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( queueUrl); receiveMessageRequest.setMaxNumberOfMessages(1); if (null != receiveMessageRequest) { List<Message> messages = getConnection().receiveMessage( receiveMessageRequest).getMessages(); if (messages.size() > 0) { String messageRecieptHandle = messages.get(0) .getReceiptHandle(); getConnection().deleteMessage( new DeleteMessageRequest(receiveMessageRequest .getQueueUrl(), messageRecieptHandle)); return messages.get(0).getBody(); } } } catch (Exception e) { e.printStackTrace(); } return null; }
private SQSMessageConsumerPrefetch.MessageManager createFifoMessageManager(String queueUrl, String groupId, String messageId, String receiptHandle) throws JMSException { Message message = new Message(); message.setBody("body"); message.setMessageId(messageId); message.setReceiptHandle(receiptHandle); Map<String, String> attributes = new HashMap<String, String>(); attributes.put(SQSMessagingClientConstants.SEQUENCE_NUMBER, "728374687246872364"); attributes.put(SQSMessagingClientConstants.MESSAGE_DEDUPLICATION_ID, messageId); attributes.put(SQSMessagingClientConstants.MESSAGE_GROUP_ID, groupId); attributes.put(SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "0"); message.setAttributes(attributes); SQSMessage sqsMessage = new SQSTextMessage(acknowledger, queueUrl, message); PrefetchManager prefetchManager = mock(PrefetchManager.class); when(prefetchManager.getMessageConsumer()) .thenReturn(consumer); SQSMessageConsumerPrefetch.MessageManager msgManager = new SQSMessageConsumerPrefetch.MessageManager(prefetchManager, sqsMessage); return msgManager; }
public static MessageQueueProcessor createQueueProcessor( AmazonSQS amazonSQS, String name, String queueUrl, String deadLetterQueueUrl, MessageHandler<Message> messageHandler ) { return new QueueProcessor( name, queueUrl, deadLetterQueueUrl, amazonSQS, messageHandler, null ); }
public static MessageQueueProcessor createQueueProcessor( AmazonSQS amazonSQS, String name, String queueUrl, String deadLetterQueueUrl, MessageHandler<Message> messageHandler, ExecutorService executorService ) { return new QueueProcessor( name, queueUrl, deadLetterQueueUrl, amazonSQS, messageHandler, executorService ); }
@Test public void shouldPassAllPolledMessagesToSpecifiedHandler() throws Exception { // Arrange Message msg1 = createMessage("msg1"); Message msg2 = createMessage("msg2"); receivedMessages.add(msg1); receivedMessages.add(msg2); // Act queueProcessor.poll(); // Assert verify(mockHandler).handle(msg1); verify(mockHandler).handle(msg2); verifyNoMoreInteractions(mockHandler); }
@Test public void pollAndDeleteMessageShouldWork() throws Exception { ReceiveMessageResult receiveMessageResult = mock(ReceiveMessageResult.class); Message message = mock(Message.class); when(message.getBody()).thenReturn("{}"); when(receiveMessageResult.getMessages()).thenReturn(Arrays.asList(message)); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); List<PolledMessage<TestMessage>> receivedMessages1 = queueServicePoller.poll(); assertThat(receivedMessages1).hasSize(1); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mock(ReceiveMessageResult.class)); queueServicePoller.delete(receivedMessages1.get(0)); List<PolledMessage<TestMessage>> receivedMessages2 = queueServicePoller.poll(); assertThat(receivedMessages2).isEmpty(); }
@Test public void deleteBatchMessagesShouldWork() throws Exception { ReceiveMessageResult receiveMessageResult = mock(ReceiveMessageResult.class); Message message = mock(Message.class); when(message.getBody()).thenReturn("{}"); when(receiveMessageResult.getMessages()).thenReturn(Arrays.asList(message, message)); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); List<PolledMessage<TestMessage>> receivedMessages1 = queueServicePoller.poll(); assertEquals(2, receivedMessages1.size()); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mock(ReceiveMessageResult.class)); queueServicePoller.delete(receivedMessages1.get(0)); queueServicePoller.delete(receivedMessages1.get(1)); List<PolledMessage<TestMessage>> receivedMessages2 = queueServicePoller.poll(); assertEquals(0, receivedMessages2.size()); }
@Override public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(receiveMessageRequest.getQueueUrl(), false); //make sure we have a default for max number of messages. int maxNumberOfMessages = Objects.firstNonNull(receiveMessageRequest.getMaxNumberOfMessages(), 10); //10 is amazon spec default //and a default visibility timeout int visibilityTimeout = Objects.firstNonNull(receiveMessageRequest.getVisibilityTimeout(), _defaultVisibilitySeconds); //also a wait time int waitTime = Objects.firstNonNull(receiveMessageRequest.getWaitTimeSeconds(), 0); if (waitTime < 0 || waitTime > 20) { throw new AmazonServiceException("wait time of " + waitTime + " is not between 0 and 20"); } try { List<Message> messageList = queue.receive(maxNumberOfMessages, visibilityTimeout, waitTime); return new ReceiveMessageResult().withMessages(messageList); } catch (IOException e) { throw new AmazonServiceException("error reading messages from " + queue.getQueuePath().toUri().toString(), e); } }
@Test public void testConvertMessageToEvent() { Message message = new Message(); message.setBody("{\n" + " \"Type\" : \"Notification\",\n" + " \"MessageId\" : \"942c624a-5f4f-5fe4-a2d7-28e60ff48e15\",\n" + " \"TopicArn\" : \"t\",\n" + " \"Subject\" : \"Operation\",\n" + " \"Message\" : \"{\\\"operation\\\":\\\"CREATE\\\",\\\"file\\\":\\\"f4\\\",\\\"originator\\\":\\\"me\\\"}\",\n" + " \"Timestamp\" : \"2013-09-12T23:30:03.434Z\",\n" + " \"SignatureVersion\" : \"1\",\n" + " \"Signature\" : \"n4tdSNZBDlaOgCS2ILxbhCGq3Cl/5LlG2SAh1OKUbat3qWzgtlD5PvgjJUq/heCb5Eo0KnnDYAJbeuiw7X9HmotCtq50OiqCtz6uq6EuApy1LhiBzOhyC5S4yarmQqGV0PBZCGLrvPn/So1HiVsFoBujJZNQuw0ysQQ/ILi6TFA=\",\n" + " \"SigningCertURL\" : \"cert\",\n" + " \"UnsubscribeURL\" : \"foo\"\n" + "}"); message.setReceiptHandle("receiptHandle"); message.setMessageId("messageId"); Event event = SQSUtils.convertMessageToEvent(baseDir, message); assertEquals("operation is incorrect", FileOperation.CREATE, event.getOperation()); assertEquals("originator is correct", "me", event.getOriginator()); assertEquals("file path is correct", "/foo/bar/f4", event.getFile().toPath().toString()); assertEquals("isDirectory is correct", false, event.isDirectory()); }
@Test public void testConvertMessageToEventIsDirPositive() { Message message = new Message(); message.setBody("{\n" + " \"Type\" : \"Notification\",\n" + " \"MessageId\" : \"942c624a-5f4f-5fe4-a2d7-28e60ff48e15\",\n" + " \"TopicArn\" : \"t\",\n" + " \"Subject\" : \"Operation\",\n" + " \"Message\" : \"{\\\"operation\\\":\\\"CREATE\\\",\\\"file\\\":\\\"dir/\\\",\\\"originator\\\":\\\"me\\\"}\",\n" + " \"Timestamp\" : \"2013-09-12T23:30:03.434Z\",\n" + " \"SignatureVersion\" : \"1\",\n" + " \"Signature\" : \"sig/sig=\",\n" + " \"SigningCertURL\" : \"cert\",\n" + " \"UnsubscribeURL\" : \"foo\"\n" + "}"); message.setReceiptHandle("receiptHandle"); message.setMessageId("messageId"); Event event = SQSUtils.convertMessageToEvent(baseDir, message); assertEquals("operation is incorrect", FileOperation.CREATE, event.getOperation()); assertEquals("originator is correct", "me", event.getOriginator()); assertEquals("file path is correct", "/foo/bar/dir", event.getFile().toPath().toString()); assertEquals("isDirectory is correct", true, event.isDirectory()); }
/** * Writes out logs to the given path as a separate JSON message per line. * * @param queue * @param path * @throws IOException */ public void writeLogs(String queue, Path path) throws IOException { FileSystem fs = FileSystem.get(path.toUri(), conf); DataOutputStream fout = fs.create(path); do { List<Message> messages = pull(queue, batchCount); if(messages.isEmpty()) { break; } for(Message m : messages) { fout.write((m.getBody().replaceAll("[\n|\r]", " ")+"\n").getBytes("UTF8")); } delete(queue, messages); } while(true); fout.close(); fs.close(); }