/** * * @param email * @return */ public String getOrCreateNotification(String email) { String ret = null; String topicName = getTopicName(email); String nextToken = null; do { ListTopicsResult listTopics = asyncSnsClient.listTopics(nextToken); List<Topic> topics = listTopics.getTopics(); for (Topic s : topics) { if (s.getTopicArn().endsWith(topicName)) { ret = s.getTopicArn(); break; } } nextToken = listTopics.getNextToken(); } while (ret == null && nextToken != null); if (ret == null) { // create the topic and the subscription CreateTopicResult topic = asyncSnsClient.createTopic(topicName); SubscribeRequest req = new SubscribeRequest(topic.getTopicArn(), "email", email); asyncSnsClient.subscribeAsync(req); ret = topic.getTopicArn(); } return ret; }
@Test public void shouldCreateSnsTopicResource_withName() throws Exception { // Given final String name = randomString(); final String topicArn = topicArnForName(name); final List<Topic> topics = Arrays.asList(randomTopic(), randomTopic(), topicForArn(topicArn), randomTopic()); final ListTopicsResult mockListTopicsResult = mock(ListTopicsResult.class); when(mockListTopicsResult.getTopics()).thenReturn(topics); when(mockListTopicsResult.getNextToken()).thenReturn(null); final String nextToken = null; when(mockAmazonSnsClient.listTopics(nextToken)).thenReturn(mockListTopicsResult); final SnsTopicResource mockSnsTopicResource = mock(SnsTopicResource.class); whenNew(SnsTopicResource.class).withArguments(name, topicArn, mockAmazonSnsClient) .thenReturn(mockSnsTopicResource); // When final SnsTopicResource result = factory.createSnsTopicResource(name); // Then assertSame(mockSnsTopicResource, result); }
@Test public void resolveDestination_withExistentTopic_returnsTopicArnFoundWhileListingTopic() throws Exception { // Arrange String topicArn = "arn:aws:sns:eu-west:123456789012:test"; AmazonSNS sns = mock(AmazonSNS.class); when(sns.listTopics(new ListTopicsRequest(null))).thenReturn(new ListTopicsResult().withTopics(new Topic().withTopicArn(topicArn))); DynamicTopicDestinationResolver resolver = new DynamicTopicDestinationResolver(sns); // Act String resolvedDestinationName = resolver.resolveDestination("test"); // Assert assertEquals(topicArn, resolvedDestinationName); }
@Test public void resolveDestination_withExistentTopicAndMarker_returnsTopicArnFoundWhileListingTopic() throws Exception { // Arrange AmazonSNS sns = mock(AmazonSNS.class); when(sns.listTopics(new ListTopicsRequest(null))).thenReturn(new ListTopicsResult().withNextToken("mark")); String topicArn = "arn:aws:sns:eu-west:123456789012:test"; when(sns.listTopics(new ListTopicsRequest("mark"))).thenReturn(new ListTopicsResult().withTopics(new Topic().withTopicArn(topicArn))); DynamicTopicDestinationResolver resolver = new DynamicTopicDestinationResolver(sns); // Act String resolvedDestinationName = resolver.resolveDestination("test"); // Assert assertEquals(topicArn, resolvedDestinationName); }
@Test public void resolveDestination_withResourceIdResolver_shouldCallIt() throws Exception { // Arrange String physicalTopicName = "arn:aws:sns:eu-west:123456789012:myTopic"; String logicalTopicName = "myTopic"; ResourceIdResolver resourceIdResolver = mock(ResourceIdResolver.class); when(resourceIdResolver.resolveToPhysicalResourceId(logicalTopicName)).thenReturn(physicalTopicName); AmazonSNS sns = mock(AmazonSNS.class); when(sns.listTopics(new ListTopicsRequest(null))).thenReturn(new ListTopicsResult().withTopics(new Topic().withTopicArn(physicalTopicName))); DynamicTopicDestinationResolver resolver = new DynamicTopicDestinationResolver(sns, resourceIdResolver); // Assert String resolvedDestinationName = resolver.resolveDestination(logicalTopicName); // Assert assertEquals(physicalTopicName, resolvedDestinationName); }
@Test public void send_validTextMessage_usesTopicChannel() throws Exception { // Arrange AmazonSNS amazonSns = mock(AmazonSNS.class); NotificationMessagingTemplate notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns); String physicalTopicName = "arn:aws:sns:eu-west:123456789012:test"; when(amazonSns.listTopics(new ListTopicsRequest(null))).thenReturn(new ListTopicsResult().withTopics(new Topic().withTopicArn(physicalTopicName))); notificationMessagingTemplate.setDefaultDestinationName(physicalTopicName); // Act notificationMessagingTemplate.send(MessageBuilder.withPayload("Message content").build()); // Assert verify(amazonSns).publish(new PublishRequest(physicalTopicName, "Message content", null).withMessageAttributes(isNotNull())); }
/**
 * Returns the ARN of the SNS notification topic, looking it up on first use.
 *
 * <p>While the cached value in {@code topicANR} is empty, the account's topics are listed
 * page by page and the first ARN containing {@code TOPIC_NAME} is stored in the field.
 * Later calls return the stored value without contacting SNS.
 *
 * @return the topic ARN, or an empty string when no listed topic matches or when the
 *         listing is rejected with an {@code AuthorizationErrorException}
 */
public String getTopicARN() {
    try {
        if (topicANR.isEmpty()) {
            // The listing is paginated; follow the next-token chain so topics beyond the
            // first page are not missed.
            String nextToken = null;
            do {
                ListTopicsResult topics = (nextToken == null)
                        ? snsClient.listTopics()
                        : snsClient.listTopics(nextToken);
                for (Topic topic : topics.getTopics()) {
                    String foundArn = topic.getTopicArn();
                    if (foundArn.contains(TOPIC_NAME)) {
                        logger.info("Found notification topic for SNS, ARN is: " + foundArn);
                        topicANR = foundArn;
                        break;
                    }
                }
                nextToken = topics.getNextToken();
            } while (topicANR.isEmpty() && nextToken != null);
        }
        if (topicANR.isEmpty()) {
            logger.info("Did not find notification topic for SNS, to receive updates create topic: " + TOPIC_NAME);
        }
        return topicANR;
    } catch (AuthorizationErrorException authException) {
        logger.error("Did not send SNS notification. You may need to update permissions for user via IAM. Exception was " + authException);
        return "";
    }
}
/**
 * Ensures {@code topicArn} refers to the topic named {@code topicName}.
 *
 * <p>The listed topics are searched for an ARN whose final {@code ':'}-separated segment
 * equals {@code topicName}. If {@code topicArn} is still {@code null} afterwards, the topic
 * is requested via {@code createTopic} and the returned ARN is stored.
 *
 * <p>NOTE(review): only the result of a single {@code listTopics()} call is inspected. The
 * SNS CreateTopic action is documented as returning the existing ARN for an existing name,
 * so a topic missed by the listing should still resolve correctly. Confirm for the client
 * in use.
 */
private void createTopicIfNotExists() {
    // Match the exact name, not a substring: with contains(), "orders" would also match
    // "orders-archive" and the wrong topic would be adopted.
    String arnSuffix = ":" + topicName;
    for (Topic topic : client.listTopics().getTopics()) {
        if (topic.getTopicArn().endsWith(arnSuffix)) {
            topicArn = topic.getTopicArn();
            break;
        }
    }

    if (topicArn == null) {
        CreateTopicRequest request = new CreateTopicRequest(topicName);
        CreateTopicResult result = client.createTopic(request);
        topicArn = result.getTopicArn();
        log.debug("Topic created, arn: " + topicArn);
    } else {
        log.debug("Topic already created: " + topicArn);
    }
}
/**
 * Looks up the ARN of the topic on which AutoScaling publishes system events.
 *
 * <p>The first listed topic whose ARN ends with {@code ESS_TOPIC_NAME} wins.
 *
 * @return ARN of the matching topic
 * @throws ConfigurationException if none of the listed topics matches
 */
private String getEssTopicArn() {
    for (Topic candidate : amazonSNS.listTopics().getTopics()) {
        if (candidate.getTopicArn().endsWith(ESS_TOPIC_NAME)) {
            return candidate.getTopicArn();
        }
    }
    throw new ConfigurationException("Topic " + ESS_TOPIC_NAME + " does not exist.");
}
/**
 * Returns the next topic, fetching further listing pages on demand.
 *
 * @return the next topic in listing order
 * @throws NoSuchElementException if the current page is exhausted and no further page
 *                                token is available
 */
@Override
public Topic next() {
    // Loop rather than fetch once: a page may be empty yet still carry a next token, and
    // stopping there would end the iteration early.
    while (cursor >= topics.size() && result.getNextToken() != null) {
        result = sns.listTopics(result.getNextToken());
        topics = result.getTopics();
        cursor = 0;
    }
    // Check the bound explicitly so a failed call does not advance the cursor.
    if (cursor >= topics.size()) {
        throw new NoSuchElementException();
    }
    return topics.get(cursor++);
}
/**
 * Finds the ARN of the topic with the given name.
 *
 * @param name   topic name, expected as the text after the last {@code ':' + name} match
 * @param topics topics to search, in order
 * @return the first ARN that ends with {@code ":" + name}, or {@code null} if none does
 */
private String topicArnForNameInTopics(final String name, final List<Topic> topics) {
    final String wantedSuffix = ":" + name;
    for (final Topic candidate : topics) {
        final String arn = candidate.getTopicArn();
        // Same test as taking the text after the last ":" + name and comparing it to name.
        if (arn.endsWith(wantedSuffix)) {
            return arn;
        }
    }
    return null;
}
/**
 * Searches the topic listing, starting at the given page marker, for a topic whose parsed
 * ARN resource type equals {@code topicName}.
 *
 * @param marker    page token to start from; the visible tests use {@code null} for the first page
 * @param topicName name to look for
 * @return the ARN of the first matching topic
 * @throws IllegalArgumentException if the listing is exhausted without a match
 */
private String getTopicResourceName(String marker, String topicName) {
    String pageMarker = marker;
    while (true) {
        ListTopicsResult page = this.amazonSns.listTopics(new ListTopicsRequest(pageMarker));

        for (Topic candidate : page.getTopics()) {
            String arn = candidate.getTopicArn();
            if (AmazonResourceName.fromString(arn).getResourceType().equals(topicName)) {
                return arn;
            }
        }

        String followingMarker = page.getNextToken();
        if (!StringUtils.hasText(followingMarker)) {
            throw new IllegalArgumentException("No topic found for name :'" + topicName + "'");
        }
        // Another page exists: continue the search from there.
        pageMarker = followingMarker;
    }
}
@Test public void convertAndSend_withDestinationPayloadAndSubject_shouldSetSubject() throws Exception { // Arrange AmazonSNS amazonSns = mock(AmazonSNS.class); NotificationMessagingTemplate notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns); String physicalTopicName = "arn:aws:sns:eu-west:123456789012:test"; when(amazonSns.listTopics(new ListTopicsRequest(null))).thenReturn(new ListTopicsResult().withTopics(new Topic().withTopicArn(physicalTopicName))); // Act notificationMessagingTemplate.sendNotification(physicalTopicName, "My message", "My subject"); // Assert verify(amazonSns).publish(new PublishRequest(physicalTopicName, "My message", "My subject").withMessageAttributes(isNotNull())); }
@Test public void convertAndSend_withPayloadAndSubject_shouldSetSubject() throws Exception { // Arrange AmazonSNS amazonSns = mock(AmazonSNS.class); NotificationMessagingTemplate notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns); String physicalTopicName = "arn:aws:sns:eu-west:123456789012:test"; when(amazonSns.listTopics(new ListTopicsRequest(null))).thenReturn(new ListTopicsResult().withTopics(new Topic().withTopicArn(physicalTopicName))); notificationMessagingTemplate.setDefaultDestinationName(physicalTopicName); // Act notificationMessagingTemplate.sendNotification("My message", "My subject"); // Assert verify(amazonSns).publish(new PublishRequest(physicalTopicName, "My message", "My subject").withMessageAttributes(isNotNull())); }
/**
 * Creates a new {@code Itr} over the topics, backed by this object's {@code sns} client.
 *
 * @return a fresh iterator; each call constructs a new {@code Itr}
 */
@Override
public Iterator<Topic> iterator() {
    final Iterator<Topic> topicIterator = new Itr(sns);
    return topicIterator;
}
/**
 * Extracts the ARN of every given topic.
 *
 * @param topics topics to read the ARNs from
 * @return the ARNs in the same order as {@code topics}
 */
public List<String> arnValues(List<Topic> topics) {
    return topics.stream()
            .map(topic -> topic.getTopicArn())
            .collect(Collectors.toList());
}
/**
 * Builds a {@code Topic} carrying the given ARN.
 *
 * @param topicArn ARN to set on the new topic
 * @return the topic returned by {@code withTopicArn}
 */
private Topic topicForArn(final String topicArn) {
    final Topic blankTopic = new Topic();
    return blankTopic.withTopicArn(topicArn);
}
/**
 * Builds a topic for a name obtained from {@code randomString(20)}.
 *
 * @return a topic whose ARN is derived from that name via {@code topicArnForName}
 */
private Topic randomTopic() {
    final String generatedName = randomString(20);
    final String generatedArn = topicArnForName(generatedName);
    return topicForArn(generatedArn);
}
/**
 * Maps an ARN string to a {@code Topic} carrying that ARN.
 *
 * @param s ARN to set on the new topic
 * @return the topic returned by {@code withTopicArn}
 */
@Override
public Topic apply(String s) {
    final Topic mapped = new Topic().withTopicArn(s);
    return mapped;
}
/**
 * Looks up the ARN of the SNS topic whose name equals {@code queueName}, optionally
 * creating the topic when it is not listed.
 *
 * <p>The topic listing is read page by page until a match is found or no further page
 * token is returned. A topic matches when the text after the last {@code ':'} of its ARN
 * equals {@code queueName}.
 *
 * @param sns             client used to list and, if requested, create topics
 * @param queueName       name of the topic to look up (the topic is named after the queue)
 * @param createOnMissing whether to create the topic when no listed topic matches
 * @return the ARN of the found or created topic, or {@code null} when the topic is not
 *         listed and {@code createOnMissing} is {@code false}
 * @throws Exception declared for callers; any exception raised by the client propagates
 */
public static String getTopicArn( final AmazonSNSClient sns, final String queueName, final boolean createOnMissing )
    throws Exception {

    logger.trace( "Looking up Topic ARN: {}", queueName );

    String topicArn = null;
    String nextToken = null;
    do {
        ListTopicsResult listTopicsResult = ( nextToken == null )
            ? sns.listTopics()
            : sns.listTopics( nextToken );

        for ( Topic topic : listTopicsResult.getTopics() ) {
            String arn = topic.getTopicArn();
            // +1 skips the ':' itself; without it the extracted text is ":name" and the
            // comparison can never succeed.
            if ( queueName.equals( arn.substring( arn.lastIndexOf( ':' ) + 1 ) ) ) {
                topicArn = arn;
                logger.trace( "Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName );
                break;
            }
        }
        nextToken = listTopicsResult.getNextToken();
    } while ( topicArn == null && nextToken != null );

    if ( topicArn == null ) {
        if ( createOnMissing ) {
            logger.trace( "Creating topic for queue=[{}]...", queueName );
            CreateTopicResult createTopicResult = sns.createTopic( queueName );
            topicArn = createTopicResult.getTopicArn();
            logger.trace( "Successfully created topic with name {} and arn {}", queueName, topicArn );
        }
        else {
            // Log an error only when the lookup actually failed, not when the topic was found.
            logger.error( "Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName,
                createOnMissing );
        }
    }

    logger.trace( "Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName );

    return topicArn;
}