@Override public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { log.info("Checking subscriptions to {}", config.aws.topic.subscribeArn); String thisEndpoint = "http://" + appInfo.getPublicHostname() + SnsCamEchoResponseEndpoint.SNS_EP_CAM_ECHO_RESPONSE; List<Subscription> subscriptions = amazonSns.listSubscriptionsByTopic(config.aws.topic.subscribeArn).getSubscriptions(); boolean notSubscribed = true; for (Subscription subscription : subscriptions) { if (thisEndpoint.equals(subscription.getEndpoint())) { log.info("Found subscription {} on topic {} for endpoint {}", subscription.getSubscriptionArn(), subscription.getTopicArn(), thisEndpoint); notSubscribed = false; } } if (notSubscribed) { log.info("Subscribing to topic {} with endpoint {}", config.aws.topic.subscribeArn, thisEndpoint); SubscribeResult subscribeResult = amazonSns.subscribe(config.aws.topic.subscribeArn, "http", thisEndpoint); log.info("Subscription: {}", subscribeResult.getSubscriptionArn()); } }
private static boolean isSQSQueueSubscribingToSNSTopic( AmazonSNS amazonSNS, String queueARN, String topicARN ) { // This statement will throw if the topic does not exist. ListSubscriptionsByTopicResult subscriptions = amazonSNS.listSubscriptionsByTopic( new ListSubscriptionsByTopicRequest() .withTopicArn(topicARN) ); for (Subscription subscription : subscriptions.getSubscriptions()) { if (subscription.getEndpoint().equals(queueARN)) { return true; } } return false; }
@Override public PublishResult publish(PublishRequest publishRequest) throws AmazonClientException { String topicArn = publishRequest.getTopicArn(); if (!_subscriptionsForTopic.containsKey(topicArn)) { throw new NotFoundException("no such topic " + topicArn); } List<Subscription> topicSubscriptions = FluentIterable. from(_subscriptionsForTopic.get(topicArn)). transform(Functions.forMap(_subscriptionsByArn)). toList(); for (Subscription subscription : topicSubscriptions) { String queueName = getLast(subscription.getEndpoint().split(":")); String queueUrl = _sqsClient. getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName)). getQueueUrl(); _sqsClient.sendMessage(new SendMessageRequest(). withQueueUrl(queueUrl). withMessageBody(publishRequest.getMessage())); } return new PublishResult(); }
@Override public SubscribeResult subscribe(SubscribeRequest subscribeRequest) throws AmazonClientException { final String protocol = subscribeRequest.getProtocol().toLowerCase(); if (!protocol.equals("sqs")) { throw new InvalidParameterException("endpoint protocol " + protocol + " not supported"); } final String topicArn = subscribeRequest.getTopicArn(); if (!_subscriptionsForTopic.containsKey(topicArn)) { throw new InvalidParameterException("no such topic " + topicArn); } String subscriptionArn = topicArn + ":" + RandomStringUtils.randomNumeric(7); if (!_subscriptionsByArn.containsKey(subscriptionArn)) { _subscriptionsByArn.put(subscriptionArn, new Subscription(). withTopicArn(topicArn). withProtocol(protocol). withSubscriptionArn(subscriptionArn). withEndpoint(subscribeRequest.getEndpoint())); _subscriptionsForTopic.get(topicArn).add(subscriptionArn); } return new SubscribeResult().withSubscriptionArn(subscriptionArn); }
public void publishAndReceiveSeparateSQSClients() { final String queueName = someQueueName(); final String queueUrl = someNewQueue(queueName); final String topicName = "publishAndReceiveSeparateSQSClients"; final String message = "hi from " + topicName; AmazonSNS amazonSNS = new InMemorySNS(_amazonSQS1, new Subscription(). withTopicArn(makeTopicArn(topicName)). withProtocol("sqs"). withSubscriptionArn(makeSomeSubArn(topicName)). withEndpoint(getQueueArn(queueName))); amazonSNS.publish(new PublishRequest(makeTopicArn(topicName), message)); ReceiveMessageResult result = _amazonSQS2.receiveMessage(new ReceiveMessageRequest(queueUrl). withWaitTimeSeconds(15)); Assert.assertEquals(result.getMessages().size(), 1); Assert.assertEquals(result.getMessages().get(0).getBody(), message); }
public InMemorySNS(AmazonSQS sqsClient, Subscription... subscriptions) { _sqsClient = sqsClient; for (Subscription subscription : subscriptions) { _subscriptionsByArn.put(subscription.getSubscriptionArn(), subscription); if (!_subscriptionsForTopic.containsKey(subscription.getTopicArn())) { _subscriptionsForTopic.put(subscription.getTopicArn(), new ArrayList<String>()); } _subscriptionsForTopic.get(subscription.getTopicArn()).add(subscription.getSubscriptionArn()); } }
@Override public UnsubscribeResult unsubscribe(UnsubscribeRequest unsubscribeRequest) throws AmazonClientException { if (!_subscriptionsByArn.containsKey(unsubscribeRequest.getSubscriptionArn())) throw new NotFoundException("no such subscription"); Subscription removed = _subscriptionsByArn.remove(unsubscribeRequest.getSubscriptionArn()); _subscriptionsForTopic.get(removed.getSubscriptionArn()).remove(removed.getSubscriptionArn()); return new UnsubscribeResult(); }
public void publishAndReceive() { final String queueName = someQueueName(); final String queueUrl = someNewQueue(queueName); final String topicName = "publishAndReceive"; final String message = "hi from " + topicName; AmazonSNS amazonSNS = new InMemorySNS(_amazonSQS1, new Subscription(). withTopicArn(makeTopicArn(topicName)). withProtocol("sqs"). withSubscriptionArn(makeSomeSubArn(topicName)). withEndpoint(getQueueArn(queueName))); Assert.assertEquals(amazonSNS.listTopics().getTopics().size(), 1); Assert.assertEquals(amazonSNS.listSubscriptions().getSubscriptions().size(), 1); Assert.assertEquals(amazonSNS. listSubscriptionsByTopic(new ListSubscriptionsByTopicRequest(makeTopicArn(topicName))) .getSubscriptions() .size(), 1); amazonSNS.publish(new PublishRequest(makeTopicArn(topicName), message)); ReceiveMessageResult result = _amazonSQS1.receiveMessage(new ReceiveMessageRequest(queueUrl)); Assert.assertEquals(result.getMessages().size(), 1); Assert.assertEquals(result.getMessages().get(0).getBody(), message); }
private void processSubscriptions() { if (subscriptionList != null && !subscriptionList.isEmpty()) { for (Subscription subscription : subscriptionList) { if (subscription.getProtocol().startsWith("http")) { processUrlSubscription(subscription); } else { // sqs subscription processSqsSubscription(subscription); } } } }
private void processUrlSubscription(Subscription urlSubscription) { String snsUrlSubscriptionArn = null; for (Subscription subscription : client.listSubscriptions() .getSubscriptions()) { if (subscription.getTopicArn().equals(topicArn) && subscription.getProtocol().equals( urlSubscription.getProtocol()) && subscription.getEndpoint().contains( urlSubscription.getEndpoint())) { if (!subscription.getSubscriptionArn().equals( "PendingConfirmation")) { snsUrlSubscriptionArn = subscription.getSubscriptionArn(); break; } } } if (snsUrlSubscriptionArn == null) { SubscribeRequest request = new SubscribeRequest(topicArn, urlSubscription.getProtocol(), urlSubscription.getEndpoint()); SubscribeResult result = client.subscribe(request); snsUrlSubscriptionArn = result.getSubscriptionArn(); log.info("Subscribed URL to SNS with subscription ARN: " + snsUrlSubscriptionArn); } else { log.info("Already subscribed with ARN: " + snsUrlSubscriptionArn); } }
@Test public void testSnsChannelParser() { setUp("SnsChannelParserTests.xml", getClass(), "snsChannel"); final SnsExecutor snsExecutor = TestUtils.getPropertyValue( this.channel, "snsExecutor", SnsExecutor.class); assertNotNull(snsExecutor); final String topicNameProperty = TestUtils.getPropertyValue( snsExecutor, "topicName", String.class); assertEquals("testTopic", topicNameProperty); assertEquals(true, TestUtils.getPropertyValue(channel, "autoStartup", Boolean.class)); assertTrue(TestUtils.getPropertyValue(channel, "phase", Integer.class) == 0); @SuppressWarnings("unchecked") final List<Subscription> subscriptions = TestUtils.getPropertyValue( snsExecutor, "subscriptionList", List.class); assertThat(subscriptions, is(not(empty()))); Subscription defS = subscriptions.get(0); assertThat(defS.getEndpoint(), containsString("www.example.com")); Object snsExecutorProxy = context.getBean("snsExecutorProxy"); assertNotNull(snsExecutorProxy); assertEquals(SnsExecutorProxy.class, snsExecutorProxy.getClass()); SnsExecutor proxiedExecutor = TestUtils.getPropertyValue( snsExecutorProxy, "snsExecutor", SnsExecutor.class); assertNotNull(proxiedExecutor); SnsExecutor innerBean = context.getBean(SnsExecutor.class); assertSame(innerBean, proxiedExecutor); }
private void processSqsSubscription(Subscription sqsSubscription) { Assert.state(sqsExecutorMap != null, "'sqsExecutorMap' must not be null"); SqsExecutor sqsExecutor = null; String endpointValue = sqsSubscription.getEndpoint(); if (sqsExecutorMap.containsKey(endpointValue)) { sqsExecutor = sqsExecutorMap.get(endpointValue); sqsSubscription.setEndpoint(sqsExecutor.getQueueArn()); } else { // endpointValue is the queue-arn sqsSubscription.setEndpoint(endpointValue); } String snsSqsSubscriptionArn = null; for (Subscription subscription : client.listSubscriptions() .getSubscriptions()) { if (subscription.getTopicArn().equals(topicArn) && subscription.getProtocol().equals( sqsSubscription.getProtocol()) && subscription.getEndpoint().equals( sqsSubscription.getEndpoint())) { snsSqsSubscriptionArn = subscription.getSubscriptionArn(); break; } } if (snsSqsSubscriptionArn == null) { SubscribeRequest request = new SubscribeRequest(topicArn, sqsSubscription.getProtocol(), sqsSubscription.getEndpoint()); SubscribeResult result = client.subscribe(request); snsSqsSubscriptionArn = result.getSubscriptionArn(); log.info("Subscribed SQS to SNS with subscription ARN: " + snsSqsSubscriptionArn); } else { log.info("Already subscribed with ARN: " + snsSqsSubscriptionArn); } if (sqsExecutor != null) { sqsExecutor.addSnsPublishPolicy(topicName, topicArn); } }
public void setSubscriptionList(List<Subscription> subscriptionList) { this.subscriptionList = subscriptionList; }