/** * Helper method that parses a JSON object from a resource on the classpath * as an instance of the provided type. * * @param resource * the path to the resource (relative to this class) * @param clazz * the type to parse the JSON into */ public static <T> T parse(String resource, Class<T> clazz) throws IOException { InputStream stream = TestUtils.class.getResourceAsStream(resource); try { if (clazz == S3Event.class) { String json = IOUtils.toString(stream); S3EventNotification event = S3EventNotification.parseJson(json); @SuppressWarnings("unchecked") T result = (T) new S3Event(event.getRecords()); return result; } else if (clazz == SNSEvent.class) { return snsEventMapper.readValue(stream, clazz); } else if (clazz == DynamodbEvent.class) { return dynamodbEventMapper.readValue(stream, clazz); } else { return mapper.readValue(stream, clazz); } } finally { stream.close(); } }
@Override public ApiGatewayResponse handleRequest(SNSEvent input, Context context) { if (CollectionUtils.isNotEmpty(input.getRecords())) { SNSRecord record = input.getRecords().get(0); if (StringUtils.containsIgnoreCase(record.getSNS().getTopicArn(), "handle-emoji")) { LOG.info("Got message to handle-emoji topic."); handleSlackEvent(record); } else if (StringUtils.containsIgnoreCase(record.getSNS().getTopicArn(), "s3-file-ready")) { LOG.info("Got message to s3-file-ready topic."); postImageToSlack(record); } else if (StringUtils.containsIgnoreCase(record.getSNS().getTopicArn(), "gif-generator-error")) { LOG.info("Got message to gif-generator-error topic."); postErrorMessageToSlack(record); } } Response responseBody = new Response("pprxmtr-file-fetcher called succesfully.", new HashMap<>()); return ApiGatewayResponse.builder().setStatusCode(200).setObjectBody(responseBody).build(); }
@Test public void shouldWorkCompletely() throws Exception { // Given Context context = mock(Context.class); SNSEvent snsEvent = createSnsEvent("push"); when(config.isWatchedBranch(new Branch("changes"))).thenReturn(true); when(worker.call()).thenReturn(Status.SUCCESS); // When Integer response = uut.handleRequest(snsEvent, context); // Then assertThat(response, is(HttpStatus.SC_OK)); verify(config, times(1)).isWatchedBranch(new Branch("changes")); verify(worker, times(1)).call(); }
@Test public void shouldFailOnWrongWorkerCall() throws Exception { // Given Context context = mock(Context.class); SNSEvent snsEvent = createSnsEvent("push"); when(config.isWatchedBranch(new Branch("changes"))).thenReturn(true); when(worker.call()).thenReturn(Status.FAILED); // When Integer response = uut.handleRequest(snsEvent, context); // Then assertThat(response, is(HttpStatus.SC_BAD_REQUEST)); verify(config, times(1)).isWatchedBranch(new Branch("changes")); verify(worker, times(1)).call(); }
@Test public void shouldFailOnOtherBranch() throws Exception { // Given Context context = mock(Context.class); SNSEvent snsEvent = createSnsEvent("push"); when(config.isWatchedBranch(new Branch("changes"))).thenReturn(false); // When Integer response = uut.handleRequest(snsEvent, context); // Then assertThat(response, is(HttpStatus.SC_BAD_REQUEST)); verify(config, times(1)).isWatchedBranch(any()); verify(worker, times(0)).call(); }
@Test public void shouldFailOnOtherError() throws Exception { // Given Context context = mock(Context.class); SNSEvent snsEvent = createSnsEvent("push"); doThrow(new IllegalArgumentException("Expected test exception")).when(config).isWatchedBranch(any()); // When Integer response = uut.handleRequest(snsEvent, context); // Then assertThat(response, is(HttpStatus.SC_INTERNAL_SERVER_ERROR)); verify(config, times(1)).isWatchedBranch(any()); verify(worker, times(0)).call(); }
private SNSEvent createSnsEvent(final String githubEvent) { SNSEvent.SNS sns = new SNSEvent.SNS(); sns.setMessageAttributes(new HashMap<String, SNSEvent.MessageAttribute>(1, 1) { { SNSEvent.MessageAttribute attr = new SNSEvent.MessageAttribute(); attr.setValue(githubEvent); put("X-Github-Event", attr); } }); try (InputStream is = getClass().getResourceAsStream("/github-push-payload.json")) { sns.setMessage(IOUtils.toString(is)); } catch (IOException e) { throw new IllegalArgumentException(e); } SNSEvent.SNSRecord record = new SNSEvent.SNSRecord(); record.setSns(sns); SNSEvent snsEvent = new SNSEvent(); snsEvent.setRecords(Collections.singletonList(record)); return snsEvent; }
public void testHandleRequest() { System.out.println("handleRequest"); SNS sns = new SNS(); sns.setMessage("test message"); SNSRecord record = new SNSRecord(); record.setSns(sns); SNSEvent request = new SNSEvent(); List<SNSRecord> records = new ArrayList<>(); records.add(record); request.setRecords(records); Context context = new TestContext(); SnsEventHandler instance = new SnsEventHandler(); instance.handleRequest(request, context); }
@Override public Object handleRequest(SNSEvent snsEvent, Context context) { logger = context.getLogger(); awsHelper = new AWSHelper(logger); logger.log("Input: " + snsEvent); List<SNSRecord> records = snsEvent.getRecords(); logger.log("Passed 001"); SNSRecord record = records.get(0); logger.log("Passed 002"); SNS sns = record.getSNS(); logger.log("Passed 003"); String boardToProcess = sns.getMessage(); logger.log("Passed 004"); processBoard(boardToProcess); logger.log("Passed 005"); return null; }
@Override public Object handleRequest(SNSEvent snsEvent, Context context) { awsHelper = new AWSHelper(context.getLogger()); logger = context.getLogger(); logger.log("Input: " + snsEvent); // You gotta love Java's deep objects... List<SNSRecord> records = snsEvent.getRecords(); logger.log("Passed 001"); SNSRecord record = records.get(0); logger.log("Passed 002"); SNS sns = record.getSNS(); logger.log("Passed 003"); String sessionId = sns.getMessage(); logger.log("Passed 004"); context.getLogger().log("Game of Life Session ID: " + sessionId); logger.log("Passed 005"); calculateBoards(sessionId, NUMBER_OF_BOARDS); logger.log("Passed 006"); return sessionId; }
public Integer handleRequest(SNSEvent event, Context context) { try { // SNS Events could be possible more than one even if this looks a bit unusual for the deploy case. for (SNSEvent.SNSRecord record : event.getRecords()) { SNSEvent.SNS sns = record.getSNS(); // Check SNS header for event type. SNSEvent.MessageAttribute attr = sns.getMessageAttributes().get(X_GITHUB_EVENT); // Only watch pushes to master. if (EVENT_PUSH.equalsIgnoreCase(attr.getValue())) { PushPayload value = MAPPER.readValue(sns.getMessage(), PushPayload.class); if (config.isWatchedBranch(new Branch(value.getRef()))) { LOG.info(format("Processing '%s' on '%s': '%s'", attr.getValue(), value.getRef(), value.getHeadCommit().getId())); switch (worker.call()) { case SUCCESS: return HttpStatus.SC_OK; case FAILED: return HttpStatus.SC_BAD_REQUEST; } } // Different branch was found. else { LOG.info(format("Push received for: '%s'", value.getRef())); } } // Different event was found. else { LOG.info(format("Event was: '%s'", attr.getValue())); } } } catch (Exception e) { LOG.error(e.getMessage(), e); return HttpStatus.SC_INTERNAL_SERVER_ERROR; } return HttpStatus.SC_BAD_REQUEST; }
@Test public void shouldFailOnOtherEvent() throws Exception { // Given Context context = mock(Context.class); SNSEvent snsEvent = createSnsEvent("commit"); // When Integer response = uut.handleRequest(snsEvent, context); // Then assertThat(response, is(HttpStatus.SC_BAD_REQUEST)); verify(config, times(0)).isWatchedBranch(any()); verify(worker, times(0)).call(); }
@Override public Handler<SNSEvent> getHandler() { SNSS3Handler handler = new SNSS3Handler(); handler.s3ClientFactory = this.clientFactory; return handler; }
@Override public void handler(SNSEvent event, Context context) throws HandlerException { if (!initialized) { init(context); } this.source = this.sources.get(0); this.inputFiles = new ArrayList<String>(0); for (SNSRecord record : event.getRecords()) { /* * Parse SNS as a S3 notification */ String json = record.getSNS().getMessage(); S3EventNotification s3Event = S3EventNotification.parseJson(json); /* * Validate the S3 file matches the regex */ List<S3EventNotificationRecord> toProcess = new ArrayList<S3EventNotificationRecord>(s3Event.getRecords()); for (S3EventNotificationRecord s3Record : s3Event.getRecords()) { String s3Path = String.format("s3://%s/%s", s3Record.getS3().getBucket().getName(), s3Record.getS3().getObject().getKey()); try { this.source = SourceUtils.getSource(s3Path, this.sources); } catch (SourceNotFoundException e) { logger.warn("skipping processing " + s3Path); toProcess.remove(s3Record); } } if (toProcess.size() == 0) { logger.warn("Nothing to process"); return; } this.inputFiles.addAll(toProcess.stream().map(m -> { return m.getS3().getObject().getKey(); }).collect(Collectors.toList())); this.recordIterator = new S3EventIterator(context, toProcess, s3ClientFactory); super.process(context); } }
@Test public void testExceptionHandlingd() throws Throwable { BaseHandler.CONFIG_FILE = "/com/nextdoor/bender/handler/config_test_sns.json"; TestContext ctx = new TestContext(); ctx.setFunctionName("unittest"); ctx.setInvokedFunctionArn("arn:aws:lambda:us-east-1:123:function:test-function:staging"); /* * Invoke handler */ SNSS3Handler fhandler = (SNSS3Handler) getHandler(); fhandler.init(ctx); IpcSenderService ipcSpy = spy(fhandler.getIpcService()); doThrow(new TransportException("expected")).when(ipcSpy).shutdown(); fhandler.setIpcService(ipcSpy); AmazonSNSClient mockClient = mock(AmazonSNSClient.class); AmazonSNSClientFactory mockClientFactory = mock(AmazonSNSClientFactory.class); doReturn(mockClient).when(mockClientFactory).newInstance(); fhandler.snsClientFactory = mockClientFactory; SNSEvent event = getTestEvent(); try { fhandler.handler(event, ctx); } catch (Exception e) { } verify(mockClient, times(1)).publish("foo", "basic_input.log", "SNSS3Handler Failed"); }
@Override public Void handleRequest(SNSEvent snsEvent, Context context) { for (SNSRecord snsRecord : snsEvent.getRecords()) { delegateRequest(new SnsRecordAndLambdaContext(snsRecord, context)); } return null; }
@Test public void testLambdaContextMemberInjection() { when(context.getAwsRequestId()).thenReturn("0", "1"); for (int i = 0; i <= 1; i++) { SNSEvent snsEvent = createSnsEvent("inject-lambda-context-member" + i); handler.handleRequest(snsEvent, context); verify(testService).injectedStringArg("" + i); } }
@Test public void testSnsRecordInjectionAsMember() { SNSEvent snsEvent = createSnsEvent("inject-sns-record-member0"); handler.handleRequest(snsEvent, context); verify(testService).injectedSns(snsEvent.getRecords().get(0).getSNS()); snsEvent = createSnsEvent("inject-sns-record-member1"); handler.handleRequest(snsEvent, context); verify(testService).injectedSns(snsEvent.getRecords().get(0).getSNS()); }
@Test public void testMultipleRecordsCreateMultipleRequest() { SNSEvent snsEvent = new SNSEvent(); SNSRecord snsRecord0 = createSnsRecord("a:b:mytopic", "inject-sns-record-member0"); SNSRecord snsRecord1 = createSnsRecord("a:b:mytopic", "inject-sns-record-member1"); snsEvent.setRecords(ImmutableList.of(snsRecord0, snsRecord1)); handler.handleRequest(snsEvent, context); InOrder inOrder = Mockito.inOrder(testService); inOrder.verify(testService).injectedSns(snsRecord0.getSNS()); inOrder.verify(testService).injectedSns(snsRecord1.getSNS()); }
@Test public void testPostString() { SNSEvent snsEvent = createSnsEvent("plain-data"); snsEvent.getRecords().get(0).getSNS().setMessage("123"); handler.handleRequest(snsEvent, context); verify(testService).injectedStringArg("123"); }
@Test public void testPostJson() { SNSEvent snsEvent = createSnsEvent("entities"); snsEvent.getRecords().get(0).getSNS().setMessage("{\"value\":\"some data\"}"); handler.handleRequest(snsEvent, context); verify(testService).injectedStringArg("some data"); }
@Override public ApiGatewayResponse handleRequest(SNSEvent input, Context context) { LOG.info("Loading Gif Generator Java Lambda handler."); ObjectMapper mapper = new ObjectMapper(); if (CollectionUtils.isNotEmpty(input.getRecords())) { try { JsonNode json = mapper.readTree(input.getRecords().get(0).getSNS().getMessage()); byte[] gif = ArrayUtils.EMPTY_BYTE_ARRAY; if (json.has("emojiUrl")) { HttpClient client = HttpClientBuilder.create().build(); String emojiUrl = json.get("emojiUrl").asText(); HttpGet getImageRequest = new HttpGet(emojiUrl); HttpResponse getImageResponse = client.execute(getImageRequest); int getImageStatus = getImageResponse.getStatusLine().getStatusCode(); LOG.info("Get image status: {}.", getImageStatus); if (StringUtils.contains(getImageResponse.getFirstHeader(HttpHeaders.CONTENT_TYPE).getValue(), "image")) { byte[] imageFile = IOUtils.toByteArray(getImageResponse.getEntity().getContent()); gif = GifGenerator.generateGif(imageFile); } else { LOG.error("Given image URL did not return an image according to mime type!"); } } if (ArrayUtils.isNotEmpty(gif)) { LOG.info("Gif created successfully, storing in S3."); String emoji = json.get("text").asText(); String emojiName = StringUtils.removeEnd(StringUtils.removeStart(StringUtils.strip(emoji), ":"), ":"); emojiName = emojiName.replaceAll("ä", "a").replaceAll("ö", "o").replaceAll("å", "o"); InputStream is = new ByteArrayInputStream(gif); ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentLength(gif.length); metadata.setContentType("image/gif"); if (UrlValidator.getInstance().isValid(emojiName)) { emojiName = CharMatcher.inRange('a', 'z').or(CharMatcher.inRange('0', '9')) .retainFrom(StringUtils.substringAfterLast(emojiName, "/")); } String filenamePrefix = emojiName + "_approximated_"; if (!S3.fileExistsInBucket(filenamePrefix)) { S3.storeFileInBucket(filenamePrefix + System.currentTimeMillis() + ".gif", is, metadata); } LOG.info("Image stored in S3, publishing to topic s3-file-ready"); SNS.publish("s3-file-ready", mapper.writeValueAsString(json)).get(); } else { LOG.error("Gif generator returned an empty byte array, sending error response"); SNS.publish("gif-generator-error", mapper.writeValueAsString(json)).get(); } } catch (IOException | InterruptedException | ExecutionException e) { LOG.error("Exception occured when creating GIF.", e); } } Response responseBody = new Response("pprxmtr-gif-generator called.", new HashMap<>()); return ApiGatewayResponse.builder().setStatusCode(200).setObjectBody(responseBody).build(); }
@Override public SNSEvent getTestEvent() throws Exception { /* * Upload a test resoruce to the mock S3 */ String payload = IOUtils.toString( new InputStreamReader(this.getClass().getResourceAsStream("basic_input.log"), "UTF-8")); this.client.putObject(S3_BUCKET, "basic_input.log", payload); /* * Create a S3EventNotification event */ S3ObjectEntity objEntity = new S3ObjectEntity("basic_input.log", 1L, null, null); S3BucketEntity bucketEntity = new S3BucketEntity(S3_BUCKET, null, null); S3Entity entity = new S3Entity(null, bucketEntity, objEntity, null); S3EventNotificationRecord rec = new S3EventNotificationRecord(null, null, null, "1970-01-01T00:00:00.000Z", null, null, null, entity, null); List<S3EventNotificationRecord> notifications = new ArrayList<S3EventNotificationRecord>(2); notifications.add(rec); /* * Wrap as an SNS Event */ S3EventNotification event = new S3EventNotification(notifications); SNSEvent.SNS sns = new SNSEvent.SNS(); sns.setMessage(event.toJson()); SNSEvent snsEvent = new SNSEvent(); ArrayList<SNSRecord> snsRecords = new ArrayList<SNSRecord>(1); SNSRecord snsRecord = new SNSRecord(); snsRecord.setEventSource("aws:sns"); snsRecord.setEventVersion("1.0"); snsRecord.setEventSubscriptionArn("arn"); snsRecord.setSns(sns); snsRecords.add(snsRecord); snsEvent.setRecords(snsRecords); return snsEvent; }
@Override public SNSEvent getTestEvent() throws Exception { return getTestEvent(S3_BUCKET, true); }
private SNSEvent getTestEvent(String bucket, boolean doPut) throws Exception { /* * Upload a test resoruce to the mock S3 */ if (doPut) { String payload = IOUtils.toString( new InputStreamReader(this.getClass().getResourceAsStream("basic_input.log"), "UTF-8")); this.client.putObject(bucket, "basic_input.log", payload); } /* * Create a S3EventNotification event */ S3ObjectEntity objEntity = new S3ObjectEntity("basic_input.log", 1L, null, null); S3BucketEntity bucketEntity = new S3BucketEntity(bucket, null, null); S3Entity entity = new S3Entity(null, bucketEntity, objEntity, null); S3EventNotificationRecord rec = new S3EventNotificationRecord(null, null, null, "1970-01-01T00:00:00.000Z", null, null, null, entity, null); List<S3EventNotificationRecord> notifications = new ArrayList<S3EventNotificationRecord>(2); notifications.add(rec); /* * Wrap as an SNS Event */ S3EventNotification event = new S3EventNotification(notifications); SNSEvent.SNS sns = new SNSEvent.SNS(); sns.setMessage(event.toJson()); SNSEvent snsEvent = new SNSEvent(); ArrayList<SNSRecord> snsRecords = new ArrayList<SNSRecord>(1); SNSRecord snsRecord = new SNSRecord(); snsRecord.setEventSource("aws:sns"); snsRecord.setEventVersion("1.0"); snsRecord.setEventSubscriptionArn("arn"); snsRecord.setSns(sns); snsRecords.add(snsRecord); snsEvent.setRecords(snsRecords); return snsEvent; }
@Test public void testLambdaContextInjection() { SNSEvent snsEvent = createSnsEvent("inject-lambda-context"); handler.handleRequest(snsEvent, context); verify(testService).injectedLambdaContext(context); }
@Test public void testSnsRecordInjection() { SNSEvent snsEvent = createSnsEvent("inject-sns-record"); handler.handleRequest(snsEvent, context); verify(testService).injectedSns(same(snsEvent.getRecords().get(0).getSNS())); }
@Test public void testSnsRecordInjectionAsMock() { SNSEvent snsEvent = createSnsEvent("inject-sns-record-mock"); handler.handleRequest(snsEvent, context); assertThrows(IllegalStateException.class, () -> verify(testService).injectedSnsRecord(same(snsEvent.getRecords().get(0)))); }
@Test public void testNoSubjectIsValid() { SNSEvent snsEvent = createSnsEvent(null); handler.handleRequest(snsEvent, context); verify(testService).hitRoot(); }
private SNSEvent createSnsEvent(String subject) { return createSnsEvent("a:b:mytopic", subject); }
private SNSEvent createSnsEvent(String topicArn, String subject) { SNSEvent snsEvent = new SNSEvent(); snsEvent.setRecords(Collections.singletonList(createSnsRecord(topicArn, subject))); return snsEvent; }