/** * Helper method that parses a JSON object from a resource on the classpath * as an instance of the provided type. * * @param resource * the path to the resource (relative to this class) * @param clazz * the type to parse the JSON into */ public static <T> T parse(String resource, Class<T> clazz) throws IOException { InputStream stream = TestUtils.class.getResourceAsStream(resource); try { if (clazz == S3Event.class) { String json = IOUtils.toString(stream); S3EventNotification event = S3EventNotification.parseJson(json); @SuppressWarnings("unchecked") T result = (T) new S3Event(event.getRecords()); return result; } else if (clazz == SNSEvent.class) { return snsEventMapper.readValue(stream, clazz); } else if (clazz == DynamodbEvent.class) { return dynamodbEventMapper.readValue(stream, clazz); } else { return mapper.readValue(stream, clazz); } } finally { stream.close(); } }
public List<S3SNSNotification> parse(Message message) { List<S3SNSNotification> notifications = Lists.newArrayList(); try { SQSMessage envelope = om.readValue(message.getBody(), SQSMessage.class); if (envelope.message == null) { return Collections.emptyList(); } S3EventNotification s3EventNotification = S3EventNotification.parseJson(envelope.message); notifications.addAll(s3EventNotification.getRecords().stream().map(record -> new S3SNSNotification( message.getReceiptHandle(), record.getS3().getBucket().getName(), record.getS3().getObject().getUrlDecodedKey() )).collect(Collectors.toList())); } catch (Exception e) { LOG.error("Could not parse SNS notification: " + message.getBody(), e); throw new RuntimeException("Could not parse SNS notification: " + message.getBody(), e); } return notifications; }
@SuppressWarnings("unchecked") @Test public void testS3EventViaSNSEventHandlerSuccessfully() throws Exception { InputStream input = this.getClass().getClassLoader().getResourceAsStream(S3_VIA_SNS_REQUEST_JSON_TEMPLATE); new Expectations(S3EventAction.class) { { new S3EventAction().handle((EventActionRequest<S3EventNotification>)any, (Context)any); times = 1; } }; Optional<S3EventResult> result = executor.apply(readJson(input), mockContext); assertEquals(1, result.get().getSuccessItems().size()); assertEquals(0, result.get().getFailureItems().size()); assertEquals(0, result.get().getSkippedItems().size()); }
@SuppressWarnings("unchecked") @Test public void testS3EventViaSNSEventHandlerSuccessfully() throws Exception { InputStream input = this.getClass().getClassLoader().getResourceAsStream(S3_REQUEST_JSON_TEMPLATE); new Expectations(S3EventAction.class) { { new S3EventAction().handle((EventActionRequest<S3EventNotification>)any, (Context)any); times = 1; } }; S3EventResult result = handler.handleRequest(supplyEvent(input), mockContext); assertEquals(1, result.getSuccessItems().size()); assertEquals(0, result.getFailureItems().size()); assertEquals(0, result.getSkippedItems().size()); }
@Override public S3EventNotification getTestEvent() throws Exception { /* * Upload a test resoruce to the mock S3 */ String payload = IOUtils.toString( new InputStreamReader(this.getClass().getResourceAsStream("basic_input.log"), "UTF-8")); this.client.putObject(S3_BUCKET, "basic_input.log", payload); /* * Create a S3EventNotification event */ S3ObjectEntity objEntity = new S3ObjectEntity("basic_input.log", 1L, null, null); S3BucketEntity bucketEntity = new S3BucketEntity(S3_BUCKET, null, null); S3Entity entity = new S3Entity(null, bucketEntity, objEntity, null); S3EventNotificationRecord rec = new S3EventNotificationRecord(null, null, null, "1970-01-01T00:00:00.000Z", null, null, null, entity, null); List<S3EventNotificationRecord> notifications = new ArrayList<S3EventNotificationRecord>(2); notifications.add(rec); return new S3EventNotification(notifications); }
private S3EventNotification getTestEvent(String bucket, boolean doPut) throws Exception { /* * Upload a test resoruce to the mock S3 */ if (doPut) { String payload = IOUtils.toString( new InputStreamReader(this.getClass().getResourceAsStream("basic_input.log"), "UTF-8")); this.client.putObject(bucket, "basic_input.log", payload); } /* * Create a S3EventNotification event */ S3ObjectEntity objEntity = new S3ObjectEntity("basic_input.log", 1L, null, null); S3BucketEntity bucketEntity = new S3BucketEntity(bucket, null, null); S3Entity entity = new S3Entity(null, bucketEntity, objEntity, null); S3EventNotificationRecord rec = new S3EventNotificationRecord(null, null, null, "1970-01-01T00:00:00.000Z", null, null, null, entity, null); List<S3EventNotificationRecord> notifications = new ArrayList<S3EventNotificationRecord>(2); notifications.add(rec); return new S3EventNotification(notifications); }
@Test public void testS3MessageS3FileNoExists() throws Exception { setLogLevel(UploadDownloadHelperServiceImpl.class, LogLevel.OFF); uploadDownloadServiceTestHelper.createDatabaseEntitiesForUploadDownloadTesting(); UploadSingleInitiationResponse resultUploadSingleInitiationResponse = uploadDownloadService.initiateUploadSingle(uploadDownloadServiceTestHelper .createUploadSingleInitiationRequest(NAMESPACE, BDEF_NAME, FORMAT_USAGE_CODE, FORMAT_FILE_TYPE_CODE, FORMAT_VERSION, NAMESPACE, BDEF_NAME_2, FORMAT_USAGE_CODE_2, FORMAT_FILE_TYPE_CODE_2, FORMAT_VERSION_2, TARGET_S3_KEY)); String filePath = resultUploadSingleInitiationResponse.getSourceBusinessObjectData().getStorageUnits().get(0).getStorageFiles().get(0).getFilePath(); S3Entity s3Entity = new S3Entity(null, null, new S3ObjectEntity(filePath, 0L, null, null), null); List<S3EventNotificationRecord> records = new ArrayList<>(); records.add(new S3EventNotificationRecord(null, null, null, null, null, null, null, s3Entity, null)); S3EventNotification s3EventNotification = new S3EventNotification(records); setLogLevel(UploadDownloadServiceImpl.class, LogLevel.OFF); setLogLevel(HerdJmsMessageListener.class, LogLevel.OFF); // Try to process an S3 JMS message, when source S3 file does not exist. herdJmsMessageListener.processMessage(jsonHelper.objectToJson(s3EventNotification), null); }
/** * Helper method that parses a JSON object from a resource on the classpath * as an instance of the provided type. * * @param resource the path to the resource (relative to this class) * @param clazz the type to parse the JSON into */ public static <T> T parse(String resource, Class<T> clazz) throws IOException { InputStream stream = TestUtils.class.getResourceAsStream(resource); try { if (clazz == S3Event.class) { String json = IOUtils.toString(stream); S3EventNotification event = S3EventNotification.parseJson(json); @SuppressWarnings("unchecked") T result = (T) new S3Event(event.getRecords()); return result; } else { return mapper.readValue(stream, clazz); } } finally { stream.close(); } }
@Override public Void handleRequest(S3Event s3Event, Context context) { Collection<Partition> requiredPartitions = new HashSet<>(); TableService tableService = new TableService(); for (S3EventNotification.S3EventNotificationRecord record : s3Event.getRecords()) { String bucket = record.getS3().getBucket().getName(); String key = record.getS3().getObject().getKey(); System.out.printf("S3 event [Event: %s, Bucket: %s, Key: %s]%n", record.getEventName(), bucket, key); S3Object s3Object = new S3Object(bucket, key); if (s3Object.hasDateTimeKey()) { requiredPartitions.add(partitionConfig.createPartitionFor(s3Object)); } } if (!requiredPartitions.isEmpty()) { Collection<Partition> missingPartitions = determineMissingPartitions( partitionConfig.tableName(), requiredPartitions, tableService); tableService.addPartitions(partitionConfig.tableName(), missingPartitions); } return null; }
@Override public Void handleRequest(S3Event s3Event, Context context) { Collection<Partition> partitionsToRemove = new HashSet<>(); TableService tableService = new TableService(); for (S3EventNotification.S3EventNotificationRecord record : s3Event.getRecords()) { String bucket = record.getS3().getBucket().getName(); String key = record.getS3().getObject().getKey(); System.out.printf("S3 event [Event: %s, Bucket: %s, Key: %s]%n", record.getEventName(), bucket, key); S3Object s3Object = new S3Object(bucket, key); if (s3Object.hasDateTimeKey()) { partitionsToRemove.add(partitionConfig.createPartitionFor(s3Object)); } } if (!partitionsToRemove.isEmpty()) { tableService.removePartitions( partitionConfig.tableName(), partitionsToRemove.stream().map(Partition::spec).collect(Collectors.toList())); } return null; }
@Override public Void handleRequest(S3Event s3Event, Context context){ Collection<Partition>requiredPartitions = new HashSet<>(); TableService tableService = new TableService(); DynamoDB dynamoDBClient=new DynamoDB(new AmazonDynamoDBClient(new EnvironmentVariableCredentialsProvider())); for(S3EventNotification.S3EventNotificationRecord record:s3Event.getRecords()){ String bucket=record.getS3().getBucket().getName(); String key=record.getS3().getObject().getKey(); System.out.printf("S3event[Event:%s,Bucket:%s,Key:%s]%n",record.getEventName(),bucket,key); S3Object s3Object=new S3Object(bucket,key); if(s3Object.hasDateTimeKey()){ Partition partition = partitionConfig.createPartitionFor(s3Object); //Check if the partition exists in DynamoDBtable, if not add the partition details to the table, skip otherwise if (tryAddMissingPartition(partitionConfig.dynamoDBTableName(), dynamoDBClient, partition)) { requiredPartitions.add(partition); } } } if(!requiredPartitions.isEmpty()){ tableService.addPartitions(partitionConfig.tableName(),requiredPartitions, true); } return null; }
@Override protected S3EventResult handleEvent(S3EventNotification event, Context context) { AwsEventRequest request = readEvent(event); S3EventResult result = new S3EventResult(); AwsEventResponse res = actionRouterHandle(request, context); if (res.isSuccessful()) { result.addSuccessItem(request); } else { logger.error("Failed processing S3Event", res.getCause()); result.addFailureItem(request); } return result; }
@Test public void testResolveS3EventViaSNSEventSuccessfully() throws Exception { InputStream input = this.getClass().getClassLoader().getResourceAsStream(S3_VIA_SNS_REQUEST_JSON_TEMPLATE); Optional<S3EventNotification> event = executor.resolve(readJson(input)); assertEquals(1, event.get().getRecords().size()); assertEquals("aws:s3", event.get().getRecords().get(0).getEventSource()); assertEquals("us-east-1", event.get().getRecords().get(0).getAwsRegion()); assertEquals("dev-nsmg-logs-temp", event.get().getRecords().get(0).getS3().getBucket().getName()); assertEquals("transformed_sample_logs.json", event.get().getRecords().get(0).getS3().getObject().getKey()); }
private S3EventNotification supplyEvent(InputStream input) { try { return om.readValue(input, S3EventNotification.class); } catch (IOException e) { throw new UncheckedIOException(e); } }
@Override public Handler<S3EventNotification> getHandler() { S3Handler handler = new S3Handler(); handler.s3ClientFactory = this.clientFactory; return handler; }
@Override public void handler(SNSEvent event, Context context) throws HandlerException { if (!initialized) { init(context); } this.source = this.sources.get(0); this.inputFiles = new ArrayList<String>(0); for (SNSRecord record : event.getRecords()) { /* * Parse SNS as a S3 notification */ String json = record.getSNS().getMessage(); S3EventNotification s3Event = S3EventNotification.parseJson(json); /* * Validate the S3 file matches the regex */ List<S3EventNotificationRecord> toProcess = new ArrayList<S3EventNotificationRecord>(s3Event.getRecords()); for (S3EventNotificationRecord s3Record : s3Event.getRecords()) { String s3Path = String.format("s3://%s/%s", s3Record.getS3().getBucket().getName(), s3Record.getS3().getObject().getKey()); try { this.source = SourceUtils.getSource(s3Path, this.sources); } catch (SourceNotFoundException e) { logger.warn("skipping processing " + s3Path); toProcess.remove(s3Record); } } if (toProcess.size() == 0) { logger.warn("Nothing to process"); return; } this.inputFiles.addAll(toProcess.stream().map(m -> { return m.getS3().getObject().getKey(); }).collect(Collectors.toList())); this.recordIterator = new S3EventIterator(context, toProcess, s3ClientFactory); super.process(context); } }
public void handler(S3EventNotification event, Context context) throws HandlerException { if (!initialized) { init(context); } /* * Validate the S3 file matches the regex */ List<S3EventNotificationRecord> toProcess = new ArrayList<S3EventNotificationRecord>(event.getRecords()); for (S3EventNotificationRecord record : event.getRecords()) { String s3Path = String.format("s3://%s/%s", record.getS3().getBucket().getName(), record.getS3().getObject().getKey()); try { this.source = SourceUtils.getSource(s3Path, this.sources); } catch (SourceNotFoundException e) { logger.warn("Skipping processing " + s3Path); toProcess.remove(record); } } if (toProcess.size() == 0) { logger.warn("Nothing to process"); return; } this.recordIterator = new S3EventIterator(context, toProcess, s3ClientFactory); super.process(context); }
@Test public void testSourceRegexFail() throws Throwable { BaseHandler.CONFIG_FILE = "/com/nextdoor/bender/handler/config_s3_source.json"; TestContext ctx = new TestContext(); ctx.setFunctionName("unittest"); ctx.setInvokedFunctionArn("arn:aws:lambda:us-east-1:123:function:test-function:staging"); BaseHandler<S3EventNotification> handler = (BaseHandler) getHandler(); handler.init(ctx); handler.handler(getTestEvent("foo", false), ctx); assertEquals(0, DummyTransportHelper.BufferedTransporter.output.size()); }
@Test public void testSourceRegex() throws Throwable { BaseHandler.CONFIG_FILE = "/com/nextdoor/bender/handler/config_s3_source.json"; TestContext ctx = new TestContext(); ctx.setFunctionName("unittest"); ctx.setInvokedFunctionArn("arn:aws:lambda:us-east-1:123:function:test-function:staging"); BaseHandler<S3EventNotification> handler = (BaseHandler) getHandler(); handler.init(ctx); handler.handler(getTestEvent(), ctx); assertEquals(1, DummyTransportHelper.BufferedTransporter.output.size()); }
public static S3EventNotification getS3Notification(String key, String bucket, long size) { S3ObjectEntity objEntity = new S3ObjectEntity(key, size, null, null); S3BucketEntity bucketEntity = new S3BucketEntity(bucket, null, null); S3Entity entity = new S3Entity(null, bucketEntity, objEntity, null); String timestamp = formatter.print(System.currentTimeMillis()); S3EventNotificationRecord rec = new S3EventNotificationRecord(null, null, null, timestamp, null, null, null, entity, null); List<S3EventNotificationRecord> notifications = new ArrayList<S3EventNotificationRecord>(1); notifications.add(rec); return new S3EventNotification(notifications); }
@Before public void before() { initMocks(this); handler = new CloudFrontLogEventHandler(); String arn = System.getProperty("arn"); String bucketName = System.getProperty("bucketName"); String logKey = System.getProperty("logKey"); Preconditions.checkNotNull(arn, "You must pass the arn for this lambda to run in a mocked manner, the arn is used to get the env name ex: arn:aws:lambda:us-west-2:1111111:function:dev-gateway-db2599d1-6d86-LambdaWAFBlacklistingFun-1LSORI5GUP95H"); Preconditions.checkNotNull(bucketName, "You must supply a bucket for the lambda to read a log from"); Preconditions.checkNotNull(logKey, "You must supply a key to a log for the lambda to read"); List<S3EventNotification.S3EventNotificationRecord> records = Lists.newArrayList(); S3EventNotification.S3EventNotificationRecord record = mock(S3EventNotification.S3EventNotificationRecord.class); records.add(record); when(event.getRecords()).thenReturn(records); S3EventNotification.S3Entity s3Entity = mock(S3EventNotification.S3Entity.class); S3EventNotification.S3BucketEntity bucketEntity = mock(S3EventNotification.S3BucketEntity.class); S3EventNotification.S3ObjectEntity objectEntity = mock(S3EventNotification.S3ObjectEntity.class); when(s3Entity.getBucket()).thenReturn(bucketEntity); when(s3Entity.getObject()).thenReturn(objectEntity); when(record.getS3()).thenReturn(s3Entity); when(context.getInvokedFunctionArn()).thenReturn(arn); when(bucketEntity.getName()).thenReturn(bucketName); when(objectEntity.getKey()).thenReturn(logKey); }
@Test public void testThatHandleEventCallsProcessEventsOnTheProcessors() throws IOException { String bucketName = "bucketname"; String arn = "foo"; Processor processor = mock(Processor.class); List<Processor> processors = Lists.newLinkedList(); processors.add(processor); handler.overrideProcessors(processors); CloudFrontLogHandlerConfig params = new CloudFrontLogHandlerConfig(); doReturn(params).when(handler).getConfiguration(arn); Context context = mock(Context.class); when(context.getInvokedFunctionArn()).thenReturn(arn); S3Event event = mock(S3Event.class); List<S3EventNotification.S3EventNotificationRecord> records = Lists.newArrayList(); S3EventNotification.S3EventNotificationRecord record = mock(S3EventNotification.S3EventNotificationRecord.class); records.add(record); when(event.getRecords()).thenReturn(records); S3EventNotification.S3Entity s3Entity = mock(S3EventNotification.S3Entity.class); S3EventNotification.S3BucketEntity bucketEntity = mock(S3EventNotification.S3BucketEntity.class); S3EventNotification.S3ObjectEntity objectEntity = mock(S3EventNotification.S3ObjectEntity.class); when(s3Entity.getBucket()).thenReturn(bucketEntity); when(s3Entity.getObject()).thenReturn(objectEntity); when(record.getS3()).thenReturn(s3Entity); when(bucketEntity.getName()).thenReturn(bucketName); when(objectEntity.getKey()).thenReturn("access.log.gz"); when(amazonS3Client.getObject(isA(GetObjectRequest.class))).thenReturn(mock(S3Object.class)); doReturn(null).when(handler).ingestLogStream(null); handler.handleNewS3Event(event, context); verify(processor, times(1)).processLogEvents(null, params, bucketName); }
@Test public void testThatHandleEventCallsDoesNotProcessEventsOnTheProcessorsWhenNotALogFile() throws IOException { String bucketName = "bucketname"; String arn = "foo"; Processor processor = mock(Processor.class); List<Processor> processors = Lists.newLinkedList(); processors.add(processor); handler.overrideProcessors(processors); CloudFrontLogHandlerConfig params = new CloudFrontLogHandlerConfig(); doReturn(params).when(handler).getConfiguration(arn); Context context = mock(Context.class); when(context.getInvokedFunctionArn()).thenReturn(arn); S3Event event = mock(S3Event.class); List<S3EventNotification.S3EventNotificationRecord> records = Lists.newArrayList(); S3EventNotification.S3EventNotificationRecord record = mock(S3EventNotification.S3EventNotificationRecord.class); records.add(record); when(event.getRecords()).thenReturn(records); S3EventNotification.S3Entity s3Entity = mock(S3EventNotification.S3Entity.class); S3EventNotification.S3BucketEntity bucketEntity = mock(S3EventNotification.S3BucketEntity.class); S3EventNotification.S3ObjectEntity objectEntity = mock(S3EventNotification.S3ObjectEntity.class); when(s3Entity.getBucket()).thenReturn(bucketEntity); when(s3Entity.getObject()).thenReturn(objectEntity); when(record.getS3()).thenReturn(s3Entity); when(bucketEntity.getName()).thenReturn(bucketName); when(objectEntity.getKey()).thenReturn("data.json"); when(amazonS3Client.getObject(isA(GetObjectRequest.class))).thenReturn(mock(S3Object.class)); doReturn(null).when(handler).ingestLogStream(null); handler.handleNewS3Event(event, context); verify(processor, times(0)).processLogEvents(null, params, bucketName); }
@Test public void testThatHandleEventDoesNotExplodeWhenTheFirstProcessorErrorsOut() throws IOException { String bucketName = "bucketname"; String arn = "foo"; Processor processor = mock(Processor.class); Processor processor2 = mock(Processor.class); List<Processor> processors = Lists.newLinkedList(); processors.add(processor); doThrow(new RuntimeException("foo")).when(processor).processLogEvents(any(), any(), any()); processors.add(processor2); handler.overrideProcessors(processors); CloudFrontLogHandlerConfig params = new CloudFrontLogHandlerConfig(); doReturn(params).when(handler).getConfiguration(arn); Context context = mock(Context.class); when(context.getInvokedFunctionArn()).thenReturn(arn); S3Event event = mock(S3Event.class); List<S3EventNotification.S3EventNotificationRecord> records = Lists.newArrayList(); S3EventNotification.S3EventNotificationRecord record = mock(S3EventNotification.S3EventNotificationRecord.class); records.add(record); when(event.getRecords()).thenReturn(records); S3EventNotification.S3Entity s3Entity = mock(S3EventNotification.S3Entity.class); S3EventNotification.S3BucketEntity bucketEntity = mock(S3EventNotification.S3BucketEntity.class); S3EventNotification.S3ObjectEntity objectEntity = mock(S3EventNotification.S3ObjectEntity.class); when(s3Entity.getBucket()).thenReturn(bucketEntity); when(s3Entity.getObject()).thenReturn(objectEntity); when(record.getS3()).thenReturn(s3Entity); when(bucketEntity.getName()).thenReturn(bucketName); when(objectEntity.getKey()).thenReturn("access.log.gz"); when(amazonS3Client.getObject(isA(GetObjectRequest.class))).thenReturn(mock(S3Object.class)); doReturn(null).when(handler).ingestLogStream(null); handler.handleNewS3Event(event, context); verify(processor, times(1)).processLogEvents(null, params, bucketName); verify(processor2, times(1)).processLogEvents(null, params, bucketName); }
protected JsonEvent toJson(S3EventNotification.S3EventNotificationRecord record) { return new JsonEvent() .put("bucket_name", record.getS3().getBucket().getName()) .put("bucket_arn", record.getS3().getBucket().getArn()) .put("key", record.getS3().getObject().getKey()) .put("etag", record.getS3().getObject().geteTag()) .put("size", record.getS3().getObject().getSizeAsLong()); }
/** * Process the message as S3 notification. * * @param payload the JMS message payload. * * @return boolean whether message was processed. */ private boolean processS3Notification(String payload) { boolean messageProcessed = false; try { // Process messages coming from S3 bucket. S3EventNotification s3EventNotification = S3EventNotification.parseJson(payload); String objectKey = URLDecoder.decode(s3EventNotification.getRecords().get(0).getS3().getObject().getKey(), CharEncoding.UTF_8); // Perform the complete upload single file. CompleteUploadSingleMessageResult completeUploadSingleMessageResult = uploadDownloadService.performCompleteUploadSingleMessage(objectKey); if (LOGGER.isDebugEnabled()) { LOGGER.debug("completeUploadSingleMessageResult={}", jsonHelper.objectToJson(completeUploadSingleMessageResult)); } messageProcessed = true; } catch (RuntimeException | UnsupportedEncodingException e) { // The logging is set to DEBUG level, since the method is expected to fail when message is not of the expected type. LOGGER.debug("Failed to process message from the JMS queue for an S3 notification. jmsQueueName=\"{}\" jmsMessagePayload={}", HerdJmsDestinationResolver.SQS_DESTINATION_HERD_INCOMING, payload, e); } return messageProcessed; }
@Test public void testS3MessageWithWrongFormat() throws Exception { // Create and persist database entities required for testing. businessObjectDefinitionServiceTestHelper.createDatabaseEntitiesForBusinessObjectDefinitionTesting(); storageDaoTestHelper.createStorageEntity(StorageEntity.SAMPLE_DATA_FILE_STORAGE, Arrays.asList(new Attribute(configurationHelper.getProperty(ConfigurationValue.S3_ATTRIBUTE_NAME_BUCKET_NAME), S3_BUCKET_NAME))); // Create a business object definition. BusinessObjectDefinitionCreateRequest request = new BusinessObjectDefinitionCreateRequest(NAMESPACE, BDEF_NAME, DATA_PROVIDER_NAME, BDEF_DESCRIPTION, BDEF_DISPLAY_NAME, businessObjectDefinitionServiceTestHelper.getNewAttributes()); businessObjectDefinitionService.createBusinessObjectDefinition(request); String fileName = "test1.csv"; String filePath = NAMESPACE + "/" + BDEF_NAME + fileName; long fileSize = 1024L; S3Entity s3Entity = new S3Entity(null, null, new S3ObjectEntity(filePath, fileSize, null, null), null); List<S3EventNotificationRecord> records = new ArrayList<>(); records.add(new S3EventNotificationRecord(null, null, null, null, null, null, null, s3Entity, null)); S3EventNotification s3EventNotification = new S3EventNotification(records); try { sampleDataJmsMessageListener.processMessage(jsonHelper.objectToJson(s3EventNotification), null); } catch (IllegalArgumentException ex) { //this exception should be caught inside the processMessage method fail(); } }
@Test public void testS3MessageNoKey() throws Exception { S3Entity s3Entity = new S3Entity(null, null, new S3ObjectEntity("key_does_not_exist", 0L, null, null), null); List<S3EventNotificationRecord> records = new ArrayList<>(); records.add(new S3EventNotificationRecord(null, null, null, null, null, null, null, s3Entity, null)); S3EventNotification s3EventNotification = new S3EventNotification(records); setLogLevel(UploadDownloadServiceImpl.class, LogLevel.OFF); setLogLevel(HerdJmsMessageListener.class, LogLevel.OFF); herdJmsMessageListener.processMessage(jsonHelper.objectToJson(s3EventNotification), null); }
private File retrieveS3File(String sqsdMessageBody) throws UnsupportedEncodingException { File localFile = null; if(!sqsdMessageBody.isEmpty()){ AmazonS3 s3 = new AmazonS3Client(); List<S3EventNotificationRecord> records = S3EventNotification.parseJson(sqsdMessageBody).getRecords(); S3EventNotificationRecord firstRecord = records.get(0); String bucketName = firstRecord.getS3().getBucket().getName(); String objectRegion = firstRecord.getAwsRegion(); Region s3Region = Region.getRegion(Regions.fromName(objectRegion)); s3.setRegion(s3Region); // Object key may have spaces or unicode non-ASCII characters. String keyName = firstRecord.getS3().getObject().getKey().replace('+', ' '); keyName = URLDecoder.decode(keyName, "UTF-8"); localFile = new File(keyName); System.out.println("Downloading file: " + objectRegion + "/" + bucketName + "/" + keyName); s3.getObject(new GetObjectRequest(bucketName, keyName), localFile); if(!localFile.canRead()){ localFile = null; } } return localFile; }
@Override public AwsEventRequest readEvent(S3EventNotification event) { return new AwsEventRequest() .action(s3Action.getName()) .body(event.toJson()); }
@Override public Supplier<RequestHandler<S3EventNotification, S3EventResult>> getEventHandler() { return () -> new S3EventHandler(getApplicationContext(), s3Action); }
@Override public Class<S3EventNotification> actionBodyType() { return S3EventNotification.class; }
@Override protected void handleEvent(EventActionRequest<S3EventNotification> request, Context context) throws Exception { logger.info("{}", () -> request.getBody().toJson()); }
@Override public SNSEvent getTestEvent() throws Exception { /* * Upload a test resoruce to the mock S3 */ String payload = IOUtils.toString( new InputStreamReader(this.getClass().getResourceAsStream("basic_input.log"), "UTF-8")); this.client.putObject(S3_BUCKET, "basic_input.log", payload); /* * Create a S3EventNotification event */ S3ObjectEntity objEntity = new S3ObjectEntity("basic_input.log", 1L, null, null); S3BucketEntity bucketEntity = new S3BucketEntity(S3_BUCKET, null, null); S3Entity entity = new S3Entity(null, bucketEntity, objEntity, null); S3EventNotificationRecord rec = new S3EventNotificationRecord(null, null, null, "1970-01-01T00:00:00.000Z", null, null, null, entity, null); List<S3EventNotificationRecord> notifications = new ArrayList<S3EventNotificationRecord>(2); notifications.add(rec); /* * Wrap as an SNS Event */ S3EventNotification event = new S3EventNotification(notifications); SNSEvent.SNS sns = new SNSEvent.SNS(); sns.setMessage(event.toJson()); SNSEvent snsEvent = new SNSEvent(); ArrayList<SNSRecord> snsRecords = new ArrayList<SNSRecord>(1); SNSRecord snsRecord = new SNSRecord(); snsRecord.setEventSource("aws:sns"); snsRecord.setEventVersion("1.0"); snsRecord.setEventSubscriptionArn("arn"); snsRecord.setSns(sns); snsRecords.add(snsRecord); snsEvent.setRecords(snsRecords); return snsEvent; }
private SNSEvent getTestEvent(String bucket, boolean doPut) throws Exception { /* * Upload a test resoruce to the mock S3 */ if (doPut) { String payload = IOUtils.toString( new InputStreamReader(this.getClass().getResourceAsStream("basic_input.log"), "UTF-8")); this.client.putObject(bucket, "basic_input.log", payload); } /* * Create a S3EventNotification event */ S3ObjectEntity objEntity = new S3ObjectEntity("basic_input.log", 1L, null, null); S3BucketEntity bucketEntity = new S3BucketEntity(bucket, null, null); S3Entity entity = new S3Entity(null, bucketEntity, objEntity, null); S3EventNotificationRecord rec = new S3EventNotificationRecord(null, null, null, "1970-01-01T00:00:00.000Z", null, null, null, entity, null); List<S3EventNotificationRecord> notifications = new ArrayList<S3EventNotificationRecord>(2); notifications.add(rec); /* * Wrap as an SNS Event */ S3EventNotification event = new S3EventNotification(notifications); SNSEvent.SNS sns = new SNSEvent.SNS(); sns.setMessage(event.toJson()); SNSEvent snsEvent = new SNSEvent(); ArrayList<SNSRecord> snsRecords = new ArrayList<SNSRecord>(1); SNSRecord snsRecord = new SNSRecord(); snsRecord.setEventSource("aws:sns"); snsRecord.setEventVersion("1.0"); snsRecord.setEventSubscriptionArn("arn"); snsRecord.setSns(sns); snsRecords.add(snsRecord); snsEvent.setRecords(snsRecords); return snsEvent; }
@Override public S3EventNotification getTestEvent() throws Exception { return getTestEvent(S3_BUCKET, true); }
/** * The handler that will get triggered by the CloudFront adding a new log chunk into the CloudFront Log S3 Bucket. * Streams the log from S3 and processes each line, which represents a request to Cerberus. * http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#LogFileFormat * * @param context, the context of the lambda fn */ public void handleNewS3Event(S3Event event, Context context) throws IOException { CloudFrontLogHandlerConfig config = getConfiguration(context.getInvokedFunctionArn()); log.info(String.format("Found CloudFormation stack and derived params: %s", objectMapper.writeValueAsString(config))); for (S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord : event.getRecords()){ String bucketName = s3EventNotificationRecord.getS3().getBucket().getName(); String key = s3EventNotificationRecord.getS3().getObject().getKey(); // Only process the log files from CF they end in .gz if (! key.endsWith(".gz")) { return; } log.info(String.format("Triggered from %s/%s", bucketName, key)); S3Object logObject = amazonS3Client.getObject(new GetObjectRequest(bucketName, key)); List<CloudFrontLogEvent> logEvents = ingestLogStream(logObject.getObjectContent()); logEventProcessors.forEach(processor -> { try { processor.processLogEvents(logEvents, config, bucketName); } catch (Throwable t) { log.error(String.format("Failed to run log processor %s", processor.getClass()), t); // Send a message to slack if its configured to do so if (StringUtils.isNotBlank(config.getSlackWebHookUrl())) { String text = String.format("Failed to run log processor %s, env: %s reason: %s", processor.getClass(), config.getEnv(), t.getMessage()); Message.Builder builder = new Message.Builder(text).userName("Cloud-Front-Event-Handler"); if (StringUtils.startsWith(config.getSlackIcon(), "http")) { builder.iconUrl(config.getSlackIcon()); } else { builder.iconEmoji(config.getSlackIcon()); } new SlackClient(config.getSlackWebHookUrl()).sendMessage(builder.build()); } } }); } }
/** * Processes a JMS message. * * @param payload the message payload * @param allHeaders the JMS headers */ @JmsListener(id = HerdJmsDestinationResolver.SQS_DESTINATION_SAMPLE_DATA_QUEUE, containerFactory = "jmsListenerContainerFactory", destination = HerdJmsDestinationResolver.SQS_DESTINATION_SAMPLE_DATA_QUEUE) public void processMessage(String payload, @Headers Map<Object, Object> allHeaders) { LOGGER.info("Message received from the JMS queue. jmsQueueName=\"{}\" jmsMessageHeaders=\"{}\" jmsMessagePayload={}", HerdJmsDestinationResolver.SQS_DESTINATION_SAMPLE_DATA_QUEUE, allHeaders, payload); try { // Process messages coming from S3 bucket. S3EventNotification s3EventNotification = S3EventNotification.parseJson(payload); String objectKey = URLDecoder.decode(s3EventNotification.getRecords().get(0).getS3().getObject().getKey(), CharEncoding.UTF_8); long fileSize = s3EventNotification.getRecords().get(0).getS3().getObject().getSizeAsLong(); // parse the objectKey, it should be in the format of namespace/businessObjectDefinitionName/fileName String[] objectKeyArrays = objectKey.split("/"); Assert.isTrue(objectKeyArrays.length == 3, String.format("S3 notification message %s is not in expected format", objectKey)); String namespace = objectKeyArrays[0]; String businessObjectDefinitionName = objectKeyArrays[1]; String fileName = objectKeyArrays[2]; String path = namespace + "/" + businessObjectDefinitionName + "/"; BusinessObjectDefinitionSampleFileUpdateDto businessObjectDefinitionSampleFileUpdateDto = new BusinessObjectDefinitionSampleFileUpdateDto(path, fileName, fileSize); String convertedNamespaece = convertS3KeyFormat(namespace); String convertedBusinessObjectDefinitionName = convertS3KeyFormat(businessObjectDefinitionName); BusinessObjectDefinitionKey businessObjectDefinitionKey = new BusinessObjectDefinitionKey(convertedNamespaece, convertedBusinessObjectDefinitionName); try { businessObjectDefinitionService.updateBusinessObjectDefinitionEntitySampleFile(businessObjectDefinitionKey, businessObjectDefinitionSampleFileUpdateDto); } catch (ObjectNotFoundException ex) { LOGGER.info("Failed to find the business object definition, next try the original namespace and business oject defination name " + ex); // if Business object definition is not found, use the original name space and bdef name businessObjectDefinitionKey = new BusinessObjectDefinitionKey(namespace, businessObjectDefinitionName); businessObjectDefinitionService.updateBusinessObjectDefinitionEntitySampleFile(businessObjectDefinitionKey, businessObjectDefinitionSampleFileUpdateDto); } } catch (RuntimeException | IOException e) { LOGGER.error("Failed to process message from the JMS queue. jmsQueueName=\"{}\" jmsMessagePayload={}", HerdJmsDestinationResolver.SQS_DESTINATION_SAMPLE_DATA_QUEUE, payload, e); } }
@Test public void testS3Message() throws Exception { // Create and persist database entities required for testing. businessObjectDefinitionServiceTestHelper.createDatabaseEntitiesForBusinessObjectDefinitionTesting(); storageDaoTestHelper.createStorageEntity(StorageEntity.SAMPLE_DATA_FILE_STORAGE, Arrays.asList(new Attribute(configurationHelper.getProperty(ConfigurationValue.S3_ATTRIBUTE_NAME_BUCKET_NAME), S3_BUCKET_NAME))); // Create a business object definition. BusinessObjectDefinitionCreateRequest request = new BusinessObjectDefinitionCreateRequest(NAMESPACE, BDEF_NAME, DATA_PROVIDER_NAME, BDEF_DESCRIPTION, BDEF_DISPLAY_NAME, businessObjectDefinitionServiceTestHelper.getNewAttributes()); businessObjectDefinitionService.createBusinessObjectDefinition(request); // Get the business object definition entity. BusinessObjectDefinitionEntity businessObjectDefinitionEntity = businessObjectDefinitionDao.getBusinessObjectDefinitionByKey(new BusinessObjectDefinitionKey(NAMESPACE, BDEF_NAME)); assertNotNull(businessObjectDefinitionEntity); String fileName = "test1.csv"; String filePath = NAMESPACE + "/" + BDEF_NAME + "/" + fileName; long fileSize = 1024L; S3Entity s3Entity = new S3Entity(null, null, new S3ObjectEntity(filePath, fileSize, null, null), null); List<S3EventNotificationRecord> records = new ArrayList<>(); records.add(new S3EventNotificationRecord(null, null, null, null, null, null, null, s3Entity, null)); S3EventNotification s3EventNotification = new S3EventNotification(records); sampleDataJmsMessageListener.processMessage(jsonHelper.objectToJson(s3EventNotification), null); BusinessObjectDefinitionKey businessObjectDefinitionKey = new BusinessObjectDefinitionKey(NAMESPACE, BDEF_NAME); BusinessObjectDefinition updatedBusinessObjectDefinition = businessObjectDefinitionService.getBusinessObjectDefinition(businessObjectDefinitionKey, false); List<SampleDataFile> sampleDataFiles = Arrays.asList(new SampleDataFile(NAMESPACE + "/" + BDEF_NAME + "/", fileName)); // Validate the returned object. assertEquals(new BusinessObjectDefinition(updatedBusinessObjectDefinition.getId(), NAMESPACE, BDEF_NAME, DATA_PROVIDER_NAME, BDEF_DESCRIPTION, NO_BDEF_SHORT_DESCRIPTION, BDEF_DISPLAY_NAME, businessObjectDefinitionServiceTestHelper.getNewAttributes(), NO_DESCRIPTIVE_BUSINESS_OBJECT_FORMAT, sampleDataFiles, businessObjectDefinitionEntity.getCreatedBy(), businessObjectDefinitionEntity.getUpdatedBy(), HerdDateUtils.getXMLGregorianCalendarValue(businessObjectDefinitionEntity.getUpdatedOn()), NO_BUSINESS_OBJECT_DEFINITION_CHANGE_EVENTS), updatedBusinessObjectDefinition); }