/**
 * Helper method that parses a JSON object from a resource on the classpath
 * as an instance of the provided type.
 *
 * <p>{@code S3Event} is built from {@code S3EventNotification.parseJson},
 * {@code SNSEvent} and {@code DynamodbEvent} are read with their dedicated
 * mappers, and every other type is read with the default mapper.
 *
 * @param resource
 *            the path to the resource (relative to this class)
 * @param clazz
 *            the type to parse the JSON into
 * @return the parsed instance
 * @throws IOException
 *             if the resource does not exist on the classpath or cannot be read
 */
public static <T> T parse(String resource, Class<T> clazz) throws IOException {
    // try-with-resources skips close() for a null resource, so a missing file is
    // reported by the explicit IOException below rather than by a
    // NullPointerException thrown while closing.
    try (InputStream stream = TestUtils.class.getResourceAsStream(resource)) {
        if (stream == null) {
            throw new IOException("Resource not found on classpath: " + resource);
        }

        if (clazz == S3Event.class) {
            String json = IOUtils.toString(stream);
            S3EventNotification event = S3EventNotification.parseJson(json);

            // Safe: this branch is only taken when T is S3Event.
            @SuppressWarnings("unchecked")
            T result = (T) new S3Event(event.getRecords());
            return result;
        } else if (clazz == SNSEvent.class) {
            return snsEventMapper.readValue(stream, clazz);
        } else if (clazz == DynamodbEvent.class) {
            return dynamodbEventMapper.readValue(stream, clazz);
        } else {
            return mapper.readValue(stream, clazz);
        }
    }
}
/**
 * Feeds every record of the S3 event, mapped through {@code toJson}, into the
 * configured event processor and blocks until the pipeline has completed.
 *
 * @param s3 the incoming S3 event
 * @param context the Lambda context; passed to the processor first when the
 *        processor is a {@code LambdaContextAwareEventProcessor}
 * @return the literal {@code "done"} once the pipeline has finished
 * @throws RuntimeException wrapping any failure raised while processing
 */
@Override
public String handleRequest(S3Event s3, Context context) {
    boolean contextAware = eventProcessor instanceof LambdaContextAwareEventProcessor;
    if (contextAware) {
        LambdaContextAwareEventProcessor awareProcessor =
                (LambdaContextAwareEventProcessor) eventProcessor;
        awareProcessor.initialize(context);
    }

    try {
        Observable.from(s3.getRecords())
                .map(this::toJson)
                .compose(eventProcessor)
                .toBlocking()
                .subscribe();
        return "done";
    } catch (Throwable failure) {
        // Print the trace before rethrowing so it is emitted even if the caller drops the cause.
        failure.printStackTrace();
        throw new RuntimeException(failure);
    }
}
/**
 * Helper method that parses a JSON object from a resource on the classpath
 * as an instance of the provided type.
 *
 * <p>{@code S3Event} is built from {@code S3EventNotification.parseJson};
 * every other type is read with the default mapper.
 *
 * @param resource the path to the resource (relative to this class)
 * @param clazz the type to parse the JSON into
 * @return the parsed instance
 * @throws IOException if the resource does not exist on the classpath or cannot be read
 */
public static <T> T parse(String resource, Class<T> clazz) throws IOException {
    // try-with-resources skips close() for a null resource, so a missing file is
    // reported by the explicit IOException below rather than by a
    // NullPointerException thrown while closing.
    try (InputStream stream = TestUtils.class.getResourceAsStream(resource)) {
        if (stream == null) {
            throw new IOException("Resource not found on classpath: " + resource);
        }

        if (clazz == S3Event.class) {
            String json = IOUtils.toString(stream);
            S3EventNotification event = S3EventNotification.parseJson(json);

            // Safe: this branch is only taken when T is S3Event.
            @SuppressWarnings("unchecked")
            T result = (T) new S3Event(event.getRecords());
            return result;
        } else {
            return mapper.readValue(stream, clazz);
        }
    }
}
/**
 * Collects a partition for every record whose object has a date-time key and
 * adds to the table those that {@code determineMissingPartitions} reports as missing.
 *
 * @param s3Event the S3 event whose records are inspected
 * @param context the Lambda context (not used here)
 * @return always {@code null}
 */
@Override
public Void handleRequest(S3Event s3Event, Context context) {
    Collection<Partition> wanted = new HashSet<>();
    TableService tables = new TableService();

    for (S3EventNotification.S3EventNotificationRecord entry : s3Event.getRecords()) {
        String bucketName = entry.getS3().getBucket().getName();
        String objectKey = entry.getS3().getObject().getKey();
        System.out.printf("S3 event [Event: %s, Bucket: %s, Key: %s]%n",
                entry.getEventName(), bucketName, objectKey);

        S3Object candidate = new S3Object(bucketName, objectKey);
        if (!candidate.hasDateTimeKey()) {
            // Objects without a date-time key do not map to a partition.
            continue;
        }
        wanted.add(partitionConfig.createPartitionFor(candidate));
    }

    if (wanted.isEmpty()) {
        return null;
    }

    Collection<Partition> absent =
            determineMissingPartitions(partitionConfig.tableName(), wanted, tables);
    tables.addPartitions(partitionConfig.tableName(), absent);
    return null;
}
/**
 * Collects a partition for every record whose object has a date-time key and
 * asks the table service to remove the corresponding partition specs.
 *
 * @param s3Event the S3 event whose records are inspected
 * @param context the Lambda context (not used here)
 * @return always {@code null}
 */
@Override
public Void handleRequest(S3Event s3Event, Context context) {
    Collection<Partition> obsolete = new HashSet<>();
    TableService tables = new TableService();

    for (S3EventNotification.S3EventNotificationRecord entry : s3Event.getRecords()) {
        String bucketName = entry.getS3().getBucket().getName();
        String objectKey = entry.getS3().getObject().getKey();
        System.out.printf("S3 event [Event: %s, Bucket: %s, Key: %s]%n",
                entry.getEventName(), bucketName, objectKey);

        S3Object candidate = new S3Object(bucketName, objectKey);
        if (!candidate.hasDateTimeKey()) {
            // Objects without a date-time key do not map to a partition.
            continue;
        }
        obsolete.add(partitionConfig.createPartitionFor(candidate));
    }

    if (obsolete.isEmpty()) {
        return null;
    }

    tables.removePartitions(
            partitionConfig.tableName(),
            obsolete.stream()
                    .map(Partition::spec)
                    .collect(Collectors.toList()));
    return null;
}
@Override public Void handleRequest(S3Event s3Event, Context context){ Collection<Partition>requiredPartitions = new HashSet<>(); TableService tableService = new TableService(); DynamoDB dynamoDBClient=new DynamoDB(new AmazonDynamoDBClient(new EnvironmentVariableCredentialsProvider())); for(S3EventNotification.S3EventNotificationRecord record:s3Event.getRecords()){ String bucket=record.getS3().getBucket().getName(); String key=record.getS3().getObject().getKey(); System.out.printf("S3event[Event:%s,Bucket:%s,Key:%s]%n",record.getEventName(),bucket,key); S3Object s3Object=new S3Object(bucket,key); if(s3Object.hasDateTimeKey()){ Partition partition = partitionConfig.createPartitionFor(s3Object); //Check if the partition exists in DynamoDBtable, if not add the partition details to the table, skip otherwise if (tryAddMissingPartition(partitionConfig.dynamoDBTableName(), dynamoDBClient, partition)) { requiredPartitions.add(partition); } } } if(!requiredPartitions.isEmpty()){ tableService.addPartitions(partitionConfig.tableName(),requiredPartitions, true); } return null; }
/**
 * Starts a Step Functions execution for the first record of the S3 event.
 *
 * <p>The bucket name and object key of the first record are wrapped in a
 * {@code Parameters} object, serialized to JSON and used as the execution input.
 * The state machine ARN is read from the {@code STEP_MACHINE_ARN} environment variable.
 *
 * @param event the incoming S3 event; only its first record is read
 * @param context the Lambda context, used for logging and the function name
 * @return the parameters that were sent as execution input
 * @throws AmazonServiceException if the parameters cannot be serialized to JSON
 */
@Override
public Parameters handleRequest(S3Event event, Context context) {
    String inputMessage = "Input Function [" + context.getFunctionName() + "], S3Event ["
            + event.toJson().toString() + "]";
    context.getLogger().log(inputMessage);

    String bucketName = event.getRecords().get(0).getS3().getBucket().getName();
    String objectKey = event.getRecords().get(0).getS3().getObject().getKey();
    Parameters parameters = new Parameters(bucketName, objectKey);

    AWSStepFunctions client = AWSStepFunctionsClientBuilder.defaultClient();
    ObjectMapper jsonMapper = new ObjectMapper();

    StartExecutionRequest request = new StartExecutionRequest();
    request.setStateMachineArn(System.getenv("STEP_MACHINE_ARN"));

    String serializedParameters;
    try {
        serializedParameters = jsonMapper.writeValueAsString(parameters);
    } catch (JsonProcessingException e) {
        throw new AmazonServiceException("Error in [" + context.getFunctionName() + "]", e);
    }
    request.setInput(serializedParameters);

    String callMessage = "Step Function [" + request.getStateMachineArn() + "] will be called with ["
            + request.getInput() + "]";
    context.getLogger().log(callMessage);

    StartExecutionResult result = client.startExecution(request);

    String outputMessage = "Output Function [" + context.getFunctionName() + "], Result ["
            + result.toString() + "]";
    context.getLogger().log(outputMessage);

    return parameters;
}
/**
 * A record whose key is "access.log.gz" must be handed to the registered
 * processor exactly once.
 */
@Test
public void testThatHandleEventCallsProcessEventsOnTheProcessors() throws IOException {
    final String arn = "foo";
    final String bucketName = "bucketname";

    // Register a single mocked processor on the handler.
    Processor logProcessor = mock(Processor.class);
    List<Processor> registered = Lists.newLinkedList();
    registered.add(logProcessor);
    handler.overrideProcessors(registered);

    // The handler looks its configuration up by the ARN reported by the context.
    CloudFrontLogHandlerConfig config = new CloudFrontLogHandlerConfig();
    doReturn(config).when(handler).getConfiguration(arn);
    Context lambdaContext = mock(Context.class);
    when(lambdaContext.getInvokedFunctionArn()).thenReturn(arn);

    // Build an S3 event with one record, wiring the mocks from the inside out.
    S3EventNotification.S3BucketEntity bucket = mock(S3EventNotification.S3BucketEntity.class);
    when(bucket.getName()).thenReturn(bucketName);
    S3EventNotification.S3ObjectEntity object = mock(S3EventNotification.S3ObjectEntity.class);
    when(object.getKey()).thenReturn("access.log.gz");

    S3EventNotification.S3Entity entity = mock(S3EventNotification.S3Entity.class);
    when(entity.getBucket()).thenReturn(bucket);
    when(entity.getObject()).thenReturn(object);

    S3EventNotification.S3EventNotificationRecord onlyRecord =
            mock(S3EventNotification.S3EventNotificationRecord.class);
    when(onlyRecord.getS3()).thenReturn(entity);
    List<S3EventNotification.S3EventNotificationRecord> eventRecords = Lists.newArrayList();
    eventRecords.add(onlyRecord);

    S3Event s3Event = mock(S3Event.class);
    when(s3Event.getRecords()).thenReturn(eventRecords);

    // The mocked S3 object yields a null content stream, so ingestion is stubbed for null.
    when(amazonS3Client.getObject(isA(GetObjectRequest.class))).thenReturn(mock(S3Object.class));
    doReturn(null).when(handler).ingestLogStream(null);

    handler.handleNewS3Event(s3Event, lambdaContext);

    verify(logProcessor, times(1)).processLogEvents(null, config, bucketName);
}
/**
 * A record whose key is "data.json" must never reach the registered processor.
 */
@Test
public void testThatHandleEventCallsDoesNotProcessEventsOnTheProcessorsWhenNotALogFile() throws IOException {
    final String arn = "foo";
    final String bucketName = "bucketname";

    // Register a single mocked processor on the handler.
    Processor logProcessor = mock(Processor.class);
    List<Processor> registered = Lists.newLinkedList();
    registered.add(logProcessor);
    handler.overrideProcessors(registered);

    // The handler looks its configuration up by the ARN reported by the context.
    CloudFrontLogHandlerConfig config = new CloudFrontLogHandlerConfig();
    doReturn(config).when(handler).getConfiguration(arn);
    Context lambdaContext = mock(Context.class);
    when(lambdaContext.getInvokedFunctionArn()).thenReturn(arn);

    // Build an S3 event with one record, wiring the mocks from the inside out.
    S3EventNotification.S3BucketEntity bucket = mock(S3EventNotification.S3BucketEntity.class);
    when(bucket.getName()).thenReturn(bucketName);
    S3EventNotification.S3ObjectEntity object = mock(S3EventNotification.S3ObjectEntity.class);
    when(object.getKey()).thenReturn("data.json");

    S3EventNotification.S3Entity entity = mock(S3EventNotification.S3Entity.class);
    when(entity.getBucket()).thenReturn(bucket);
    when(entity.getObject()).thenReturn(object);

    S3EventNotification.S3EventNotificationRecord onlyRecord =
            mock(S3EventNotification.S3EventNotificationRecord.class);
    when(onlyRecord.getS3()).thenReturn(entity);
    List<S3EventNotification.S3EventNotificationRecord> eventRecords = Lists.newArrayList();
    eventRecords.add(onlyRecord);

    S3Event s3Event = mock(S3Event.class);
    when(s3Event.getRecords()).thenReturn(eventRecords);

    when(amazonS3Client.getObject(isA(GetObjectRequest.class))).thenReturn(mock(S3Object.class));
    doReturn(null).when(handler).ingestLogStream(null);

    handler.handleNewS3Event(s3Event, lambdaContext);

    verify(logProcessor, times(0)).processLogEvents(null, config, bucketName);
}
/**
 * When the first processor throws, the handler must still invoke the second
 * processor; both are expected to be called exactly once.
 */
@Test
public void testThatHandleEventDoesNotExplodeWhenTheFirstProcessorErrorsOut() throws IOException {
    final String arn = "foo";
    final String bucketName = "bucketname";

    // The first processor always fails, the second behaves normally.
    Processor failingProcessor = mock(Processor.class);
    doThrow(new RuntimeException("foo")).when(failingProcessor).processLogEvents(any(), any(), any());
    Processor healthyProcessor = mock(Processor.class);

    List<Processor> registered = Lists.newLinkedList();
    registered.add(failingProcessor);
    registered.add(healthyProcessor);
    handler.overrideProcessors(registered);

    // The handler looks its configuration up by the ARN reported by the context.
    CloudFrontLogHandlerConfig config = new CloudFrontLogHandlerConfig();
    doReturn(config).when(handler).getConfiguration(arn);
    Context lambdaContext = mock(Context.class);
    when(lambdaContext.getInvokedFunctionArn()).thenReturn(arn);

    // Build an S3 event with one record, wiring the mocks from the inside out.
    S3EventNotification.S3BucketEntity bucket = mock(S3EventNotification.S3BucketEntity.class);
    when(bucket.getName()).thenReturn(bucketName);
    S3EventNotification.S3ObjectEntity object = mock(S3EventNotification.S3ObjectEntity.class);
    when(object.getKey()).thenReturn("access.log.gz");

    S3EventNotification.S3Entity entity = mock(S3EventNotification.S3Entity.class);
    when(entity.getBucket()).thenReturn(bucket);
    when(entity.getObject()).thenReturn(object);

    S3EventNotification.S3EventNotificationRecord onlyRecord =
            mock(S3EventNotification.S3EventNotificationRecord.class);
    when(onlyRecord.getS3()).thenReturn(entity);
    List<S3EventNotification.S3EventNotificationRecord> eventRecords = Lists.newArrayList();
    eventRecords.add(onlyRecord);

    S3Event s3Event = mock(S3Event.class);
    when(s3Event.getRecords()).thenReturn(eventRecords);

    // The mocked S3 object yields a null content stream, so ingestion is stubbed for null.
    when(amazonS3Client.getObject(isA(GetObjectRequest.class))).thenReturn(mock(S3Object.class));
    doReturn(null).when(handler).ingestLogStream(null);

    handler.handleNewS3Event(s3Event, lambdaContext);

    verify(failingProcessor, times(1)).processLogEvents(null, config, bucketName);
    verify(healthyProcessor, times(1)).processLogEvents(null, config, bucketName);
}
/**
 * Runs the statement configured under {@code sql.auditValidatedFile} once per
 * record of the S3 event, as a single JDBC batch.
 *
 * <p>For each record, parameter 1 is bound to {@code <bucket>/<key>} and
 * parameters 2 and 3 are both bound to the literal {@code VALIDATED}.
 *
 * @param event the S3 event whose records are audited
 * @param ctx the Lambda context (not used here)
 * @throws Exception if connecting to the database or executing the batch fails
 */
public void auditValidatedFile(S3Event event, Context ctx) throws Exception {
    List<S3EventNotificationRecord> notificationRecords = event.getRecords();

    // try-with-resources closes the statement and then the connection even when
    // binding or executing the batch throws; previously both leaked on failure.
    try (Connection conn = new com.mysql.jdbc.Driver().connect(props.getProperty("url"), props);
         PreparedStatement ps = conn.prepareStatement(props.getProperty("sql.auditValidatedFile"))) {
        for (S3EventNotificationRecord record : notificationRecords) {
            String fileURL = record.getS3().getBucket().getName() + "/" + record.getS3().getObject().getKey();
            ps.setString(1, fileURL);
            ps.setString(2, "VALIDATED");
            ps.setString(3, "VALIDATED");
            ps.addBatch();
        }
        ps.executeBatch();
    }
}
/** * The handler that will get triggered by the CloudFront adding a new log chunk into the CloudFront Log S3 Bucket. * Streams the log from S3 and processes each line, which represents a request to Cerberus. * http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#LogFileFormat * * @param context, the context of the lambda fn */ public void handleNewS3Event(S3Event event, Context context) throws IOException { CloudFrontLogHandlerConfig config = getConfiguration(context.getInvokedFunctionArn()); log.info(String.format("Found CloudFormation stack and derived params: %s", objectMapper.writeValueAsString(config))); for (S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord : event.getRecords()){ String bucketName = s3EventNotificationRecord.getS3().getBucket().getName(); String key = s3EventNotificationRecord.getS3().getObject().getKey(); // Only process the log files from CF they end in .gz if (! key.endsWith(".gz")) { return; } log.info(String.format("Triggered from %s/%s", bucketName, key)); S3Object logObject = amazonS3Client.getObject(new GetObjectRequest(bucketName, key)); List<CloudFrontLogEvent> logEvents = ingestLogStream(logObject.getObjectContent()); logEventProcessors.forEach(processor -> { try { processor.processLogEvents(logEvents, config, bucketName); } catch (Throwable t) { log.error(String.format("Failed to run log processor %s", processor.getClass()), t); // Send a message to slack if its configured to do so if (StringUtils.isNotBlank(config.getSlackWebHookUrl())) { String text = String.format("Failed to run log processor %s, env: %s reason: %s", processor.getClass(), config.getEnv(), t.getMessage()); Message.Builder builder = new Message.Builder(text).userName("Cloud-Front-Event-Handler"); if (StringUtils.startsWith(config.getSlackIcon(), "http")) { builder.iconUrl(config.getSlackIcon()); } else { builder.iconEmoji(config.getSlackIcon()); } new 
SlackClient(config.getSlackWebHookUrl()).sendMessage(builder.build()); } } }); } }
public String handleRequest(S3Event s3event, Context context) { _logger = context.getLogger(); _logger.log("Received S3 Event: " + s3event.toJson()); try { S3EventNotificationRecord record = s3event.getRecords().get(0); String bucket = record.getS3().getBucket().getName(); String extractBucket = "extracts." + bucket; // Object key may have spaces or unicode non-ASCII characters. String key = URLDecoder.decode(record.getS3().getObject().getKey().replace('+', ' '), "UTF-8"); // Short-circuit ignore .extract files because they have already been extracted, this prevents an endless loop if (key.toLowerCase().endsWith(".extract")) { _logger.log("Ignoring extract file " + key); return "Ignored"; } AmazonS3 s3Client = new AmazonS3Client(); S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, key)); try (InputStream objectData = s3Object.getObjectContent()) { String extractJson = doTikaStuff(bucket, key, objectData); byte[] extractBytes = extractJson.getBytes(Charset.forName("UTF-8")); int extractLength = extractBytes.length; ObjectMetadata metaData = new ObjectMetadata(); metaData.setContentLength(extractLength); _logger.log("Saving extract file to S3"); InputStream inputStream = new ByteArrayInputStream(extractBytes); s3Client.putObject(extractBucket, key + ".extract", inputStream, metaData); } } catch (IOException | TransformerConfigurationException | SAXException e) { _logger.log("Exception: " + e.getLocalizedMessage()); throw new RuntimeException(e); } return "Success"; }
@Override public String handleRequest(S3Event s3Event, Context context) { byte[] buffer = new byte[1024]; try { for (S3EventNotificationRecord record: s3Event.getRecords()) { String srcBucket = record.getS3().getBucket().getName(); // Object key may have spaces or unicode non-ASCII characters. String srcKey = record.getS3().getObject().getKey() .replace('+', ' '); srcKey = URLDecoder.decode(srcKey, "UTF-8"); // Detect file type Matcher matcher = Pattern.compile(".*\\.([^\\.]*)").matcher(srcKey); if (!matcher.matches()) { System.out.println("Unable to detect file type for key " + srcKey); return ""; } String extension = matcher.group(1).toLowerCase(); if (!"zip".equals(extension)) { System.out.println("Skipping non-zip file " + srcKey + " with extension " + extension); return ""; } System.out.println("Extracting zip file " + srcBucket + "/" + srcKey); // Download the zip from S3 into a stream AmazonS3 s3Client = new AmazonS3Client(); S3Object s3Object = s3Client.getObject(new GetObjectRequest(srcBucket, srcKey)); ZipInputStream zis = new ZipInputStream(s3Object.getObjectContent()); ZipEntry entry = zis.getNextEntry(); while(entry != null) { String fileName = entry.getName(); String mimeType = FileMimeType.fromExtension(FilenameUtils.getExtension(fileName)).mimeType(); System.out.println("Extracting " + fileName + ", compressed: " + entry.getCompressedSize() + " bytes, extracted: " + entry.getSize() + " bytes, mimetype: " + mimeType); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); int len; while ((len = zis.read(buffer)) > 0) { outputStream.write(buffer, 0, len); } InputStream is = new ByteArrayInputStream(outputStream.toByteArray()); ObjectMetadata meta = new ObjectMetadata(); meta.setContentLength(outputStream.size()); meta.setContentType(mimeType); s3Client.putObject(srcBucket, FilenameUtils.getFullPath(srcKey) + fileName, is, meta); is.close(); outputStream.close(); entry = zis.getNextEntry(); } zis.closeEntry(); zis.close(); //delete zip file 
when done System.out.println("Deleting zip file " + srcBucket + "/" + srcKey + "..."); s3Client.deleteObject(new DeleteObjectRequest(srcBucket, srcKey)); System.out.println("Done deleting"); } return "Ok"; } catch (IOException e) { throw new RuntimeException(e); } }
/**
 * Sample S3 event handler: it does not inspect its arguments and always
 * reports the same response code.
 *
 * @param s3Event the event object (not inspected).
 * @param context the AWS lambda context object (not used).
 * @return always the string {@code "ok"}.
 */
@Override
public String handleRequest(S3Event s3Event, Context context) {
    final String responseCode = "ok";
    return responseCode;
}