There is a middleware between two other software components. In the middleware I route Apache ActiveMQ messages with Apache Camel.
It looks like this:
1stComponent <<=>> Middleware <<=>> 3rdComponent
Problem:
I use ConcurrentConsumers in the middleware.
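While a large number of messages are being sent sequentially, the middleware suddenly stops all processing! There is no exception or error message! For example, the first 100 of 500 messages are processed and the rest remain in the queue as pending messages.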
This warning is sometimes logged in the middle of processing:
[WARN ] TemporaryQueueReplyManager(Camel (camel-1) thread #11 - TemporaryQueueReplyManager[Q.MyQ]):91 - Reply received for unknown correlationID [c551c7aa061f501c]. The message will be ignored: ActiveMQMapMessage {commandId = 2161, responseRequired = true, messageId = ID:xxxxxxx, originalDestination = null, originalTransactionId = null, producerId = ID:xxxxxxx, destination = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, transactionId = null, expiration = 0, timestamp = 1389890272360, arrival = 0, brokerInTime = 1389890272360, brokerOutTime = 1389890272360, correlationId = c551c7aa061f501c, replyTo = temp-queue://ID:localhost.localdomain-40961-1389890357282-3:1:1, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@19e19da, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID:xxxxxxxxxxxxxx, Title=300, CamelJmsDeliveryMode=1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} }
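One way a warning like this can arise is when two in-flight requests share the same correlation ID. The sketch below is a simplified, hypothetical model of a request/reply correlator (the class `ReplyCorrelator` and its methods are invented for illustration and are not Camel's actual `TemporaryQueueReplyManager`). It assumes pending requests are tracked in a map keyed by correlation ID, so a second request with the same ID replaces the first waiter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of a request/reply correlator.
// It only illustrates the failure mode; it is not Camel's implementation.
class ReplyCorrelator {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Plain list is enough here because the demo is single-threaded.
    final List<String> warnings = new ArrayList<>();

    // Register a request; a duplicate ID silently replaces the earlier waiter.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    // Deliver a reply to whoever is waiting for this ID, if anyone.
    void onReply(String correlationId, String body) {
        CompletableFuture<String> waiter = pending.remove(correlationId);
        if (waiter == null) {
            warnings.add("Reply received for unknown correlationID [" + correlationId + "]");
        } else {
            waiter.complete(body);
        }
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        CompletableFuture<String> first = correlator.register("c551c7aa061f501c");
        CompletableFuture<String> second = correlator.register("c551c7aa061f501c"); // collision
        correlator.onReply("c551c7aa061f501c", "reply-1");
        correlator.onReply("c551c7aa061f501c", "reply-2");
        System.out.println("first done:  " + first.isDone());
        System.out.println("second done: " + second.isDone());
        System.out.println("warnings:    " + correlator.warnings);
    }
}
```

In this model the first waiter never completes, the second waiter receives the reply meant for the first, and the second reply is reported as unknown. A caller blocked on the first request would wait until its timeout, which would look like processing that stops without any exception.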
This is the middleware's code:
private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("activemq:queue:Q.Middleware?concurrentConsumers=1&maxConcurrentConsumers=10")
            .threads(1, 100)
            .process(new Processor() {
                public void process(Exchange exchange) {
                    // some code
                }
            })
            .inOut("activemq2:queue:Q.3RD");
    }
}
And the 3rdComponent:
private static class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        from("activemq:queue:Q.3RD")
            .threads(1, 100)
            .process(new Processor() {
                public void process(Exchange exchange) {
                    // some code
                }
            });
    }
}
Update:
The bug was that non-unique CorrelationIDs were being generated (an error in the random string generator)! It was that simple! :|
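For anyone hitting the same thing, here is a minimal sketch of one way to generate correlation IDs that will not collide in practice. The original generator is not shown in the post, so the helper `CorrelationIds.next()` is a hypothetical name; it simply relies on the JDK's `UUID.randomUUID()` instead of a hand-written random string generator.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper; the generator from the original post is not shown.
class CorrelationIds {
    // A random (version 4) UUID carries 122 random bits from a
    // cryptographically strong source, so duplicates are practically impossible.
    static String next() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        // Sanity check: generate many IDs and fail loudly on any duplicate.
        Set<String> seen = new HashSet<>();
        for (int i = 0; i < 100_000; i++) {
            if (!seen.add(next())) {
                throw new IllegalStateException("duplicate correlation ID generated");
            }
        }
        System.out.println("generated " + seen.size() + " unique IDs");
    }
}
```

The loop in `main` is the kind of quick check that would have exposed the faulty generator early: a set of generated IDs should never reject an insert.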