I have created a simplified example that reproduces my real problem.
My example accepts a message from Google Pub/Sub, logs it, and sends an ack back to Pub/Sub.
Config:
@Slf4j
@Configuration
public class MyConfig implements FlowSupport {

    private final AppProperties properties;

    public MyConfig(AppProperties properties) {
        this.properties = properties;
    }

    @Bean
    public JacksonFactory jacksonFactory() {
        return JacksonFactory.getDefaultInstance();
    }

    @Bean
    public MessageChannel bucketNotificationChannel() {
        return MessageChannels.direct("input-notification-channel").get();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(PubSubTemplate template) {
        var adapter = new PubSubInboundChannelAdapter(template, properties.getBucketTopicSubscription());
        adapter.setOutputChannel(bucketNotificationChannel());
        adapter.setErrorChannel(errorChannel());
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(Notification.class);
        return adapter;
    }

    @Bean
    @Override
    public MessageChannel idempotentDiscardChannel() {
        return MessageChannels.direct("idempotent-discard-channel").get();
    }

    @Bean
    public MessageChannel errorChannel() {
        return MessageChannels.direct("global-error-channel").get();
    }

    @Bean
    @Override
    public ConcurrentMetadataStore idempotencyStore() {
        return new SimpleMetadataStore();
    }

    @Bean
    public IntegrationFlow bucketNotificationFlow(
            EmptyNotificationHandler handler,
            IntegrationFlow acknowledgementFlow
    ) {
        return flow -> flow.channel(bucketNotificationChannel())
                .handle(handler)
                .log(INFO, "Handler finished", m -> {
                    return "got" + m;
                })
                .gateway(acknowledgementFlow);
    }

    @Bean
    public IntegrationFlow acknowledgementFlow(PubSubAckHandler handler) {
        return flow -> flow.log(DEBUG, "acknowledgementFlow", m -> "Handling acknowledgement for message: " + m)
                .handle(handler);
    }
}
NotificationHandler:
@Component
@Slf4j
public class EmptyNotificationHandler implements GenericHandler<Notification> {

    private final ResourceLoader loader;

    public EmptyNotificationHandler(ResourceLoader loader) {
        this.loader = loader;
    }

    @Override
    public Resource handle(Notification payload, MessageHeaders headers) {
        try {
            return new Resource() {
                @Override
                public boolean exists() {
                    return false;
                }
                ...
            };
        } catch (Exception e) {
            log.error("Error occurred:", e);
            return null;
        }
    }
}
AckHandler:
@Component
public class MyPubSubAckHandler implements MessageHandler {

    private final ConcurrentMetadataStore idempotencyStore;

    public MyPubSubAckHandler(ConcurrentMetadataStore idempotencyStore, MeterRegistry meterRegistry) {
        this.idempotencyStore = idempotencyStore;
    }

    @Override
    public void handleMessage(@NonNull Message<?> message) throws MessagingException {
        Message<?> targetMessage = MessageUtils.unwrap(message);
        var pubSubMessage = getOriginalMessage(targetMessage);
        if (pubSubMessage == null) {
            removeFromIdempotentStore(targetMessage);
            return;
        }
        var generation = targetMessage.getHeaders().get(OBJECT_GENERATION_HEADER);
        if (message instanceof ErrorMessage || message.getPayload() instanceof Throwable) {
            pubSubMessage.nack().addCallback(
                    v -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Error message was nacked - {}", generation);
                    },
                    e -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Failed to nack message {}", generation, e);
                    }
            );
        } else {
            pubSubMessage.ack().addCallback(
                    v -> {
                        removeFromIdempotentStore(targetMessage);
                        log.info("Acknowledged message - {}", generation);
                    },
                    e -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Failed to acknowledge message - {}", generation, e);
                    }
            );
        }
    }

    @SuppressWarnings({"RedundantSuppression", "unchecked"}) // IDEMPOTENCY_HEADER has Set<String> underneath
    private void removeFromIdempotentStore(Message<?> targetMessage) {
        Optional.ofNullable(targetMessage.getHeaders().get(IDEMPOTENCY_HEADER, Set.class)).stream()
                .flatMap(Collection::stream)
                .forEach(key -> idempotencyStore.remove(String.valueOf(key)));
    }
}
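When I send the first message, everything works fine: I see the message in the log, and the ack is sent to Pub/Sub. I also see that the number of unacked messages on the GCP subscriptions page is 0.
But after several successfully processed messages, my application stops receiving messages. I spent a lot of time debugging and found the following:
Several threads hang at org.springframework.messaging.core.GenericMessagingTemplate.TemporaryReplyChannel#receive(long):314, on the line: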
this.replyLatch.await();
Thread dump:
"gcp-pubsub-subscriber1@7980" prio=5 tid=0x1e nid=NA waiting
  java.lang.Thread.State: WAITING
    at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
    at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:314)
    at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:306)
    at org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:207)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:240)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:46)
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:503)
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:474)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:573)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:508)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:478)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:468)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy110.exchange(Unknown Source:-1)
    at org.springframework.integration.gateway.GatewayMessageHandler.handleRequestMessage(GatewayMessageHandler.java:88)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:133)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
    at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:148)
    at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter$$Lambda$859.600858818.accept(Unknown Source:-1)
    at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:152)
    at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate$$Lambda$860.1495761010.receiveMessage(Unknown Source:-1)
    at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:379)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.util.concurrent.FutureTask.run(FutureTask.java:-1)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:834)
I think this is the cause, because inside com.google.cloud.pubsub.v1.MessageDispatcher#processOutstandingMessage the line:
executor.execute(deliverMessageTask);
is executed, but deliverMessageTask is never run by the thread executor.
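To me it looks like a bug in the library, but it may also be a misuse of the library. Either way, I am looking for any solution or workaround to avoid this problem.
I use: spring-boot 2.2.0.RELEASE, springCloudVersion = "Greenwich.SR3"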
com.google.cloud:google-cloud-pubsub:1.98.0
P.S.
I know that I can increase the thread pool size, for example:
spring:
  cloud:
    gcp:
      pubsub:
        enabled: true
        subscriber:
          executor-threads: 100
But I don't think that is a good idea.
Your problem is here:
.gateway(acknowledgementFlow);
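gateway() means request-reply, and the framework cannot guess that your acknowledgementFlow is one-way. I see that your MyPubSubAckHandler returns void from its handleMessage() implementation. So the gateway waits for a reply, but the sub-flow is never going to return any result. The thread waiting for the reply is therefore stuck, and once enough subscriber threads are stuck this way, your application stops consuming messages.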
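The effect described above can be reproduced without Spring at all. The following is a minimal plain-Java sketch, not the framework's implementation: the class and method names are hypothetical, the latch stands in for the `replyLatch` seen in the thread dump, and the small fixed pool stands in for the Pub/Sub subscriber executor. Each "gateway" task waits for a reply that a void sub-flow never sends, so a task submitted afterwards never gets a thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReplyLatchStarvation {

    /**
     * Fills a fixed pool with tasks that wait for a reply which never arrives,
     * then reports whether one more task manages to run within 500 ms.
     * A negative replyTimeoutMillis means "wait forever".
     */
    public static boolean laterTaskRuns(int poolSize, long replyTimeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        // Never counted down: models a sub-flow whose handler returns void.
        CountDownLatch replyLatch = new CountDownLatch(1);
        try {
            for (int i = 0; i < poolSize; i++) {
                pool.execute(() -> {
                    try {
                        if (replyTimeoutMillis < 0) {
                            replyLatch.await(); // blocks this worker indefinitely
                        } else {
                            replyLatch.await(replyTimeoutMillis, TimeUnit.MILLISECONDS);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The "next message": it can only run if a worker thread is free.
            Future<?> next = pool.submit(() -> { });
            try {
                next.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            pool.shutdownNow(); // interrupts any workers still waiting
        }
    }

    public static void main(String[] args) {
        System.out.println("wait forever: " + laterTaskRuns(2, -1)); // false
        System.out.println("timeout 0:    " + laterTaskRuns(2, 0));  // true
    }
}
```

With an unbounded wait every worker stays parked and the extra task is starved. With a zero timeout the workers return immediately and the extra task runs. Increasing the pool size only delays the moment when all workers are parked.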
One solution is to set GatewayEndpointSpec.replyTimeout() to 0. Then your void sub-flow will not block the main flow while it waits for a potential reply.
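As a sketch of this first option, the bucketNotificationFlow bean from the question could pass an endpoint configurer to gateway(). This assumes the Spring Integration 5.x Java DSL overload gateway(IntegrationFlow, Consumer<GatewayEndpointSpec>) and reuses the beans and static imports of the question's MyConfig. It is a fragment of that class, not a standalone program.

```java
@Bean
public IntegrationFlow bucketNotificationFlow(
        EmptyNotificationHandler handler,
        IntegrationFlow acknowledgementFlow
) {
    return flow -> flow.channel(bucketNotificationChannel())
            .handle(handler)
            .log(INFO, "Handler finished", m -> "got" + m)
            // replyTimeout(0L): do not wait for a reply that the
            // void acknowledgement sub-flow will never produce.
            .gateway(acknowledgementFlow, g -> g.replyTimeout(0L));
}
```

Because the channels here are direct, the sub-flow has already finished on the calling thread by the time the gateway checks for a reply, so a zero timeout should not cut the acknowledgement short.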
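Another way is simply not to use gateway(), and to include the sub-flow content directly in the main flow. It really does not look like you expect any reply, so this should work for you: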
return flow -> flow.channel(bucketNotificationChannel())
        .handle(handler)
        .log(INFO, "Handler finished", m -> {
            return "got" + m;
        })
        .log(DEBUG, "acknowledgementFlow", m -> "Handling acknowledgement for message: " + m)
        .handle(pubSubAckHandler);