public void processReply(ReplyHolder holder) { if (holder != null && isRunAllowed()) { try { Exchange exchange = holder.getExchange(); boolean timeout = holder.isTimeout(); if (timeout) { // timeout occurred do a WARN log so its easier to spot in the logs if (log.isWarnEnabled()) { log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}." + " Setting ExchangeTimedOutException on {} and continue routing.", holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)); } // no response, so lets set a timed out exception String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo; exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg)); } else { messageConverter.populateRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage(), true); // restore correlation id in case the remote server messed with it if (holder.getOriginalCorrelationId() != null) { if (exchange.hasOut()) { exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId()); } else { exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId()); } } } } finally { // notify callback AsyncCallback callback = holder.getCallback(); callback.done(false); } } }
@Override public void configure() throws Exception { onException(Exception.class).bean(asError).bean(toJson).handled(true); errorHandler(new LoggingErrorHandlerBuilder(log)); // from("timer:hello?period=3s") // .process(exchange -> { // exchange.getIn().setBody(new GreetingReceived("Hendy")); // }) // .to("seda:greetingReceived"); // from("timer:tell me a good story?period=1s&repeatCount=1") // .process(exchange -> { // final AgentResponse agentResponse = aimlService.process(Locale.US, "tell me a good story", logChannel); // droolsService.process(agentResponse); // }); final String agentId = "arkan"; from("rabbitmq://localhost/amq.topic?connectionFactory=#amqpConnFactory&exchangeType=topic&autoDelete=false&queue=" + AvatarChannel.CHAT_INBOX.wildcard() + "&routingKey=" + AvatarChannel.CHAT_INBOX.wildcard()) .process(exchange -> { final long startTime = System.currentTimeMillis(); final CommunicateAction inCommunicate = toJson.getMapper().readValue( exchange.getIn().getBody(byte[].class), CommunicateAction.class); inCommunicate.setAvatarId(AvatarChannel.getAvatarId((String) exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY))); log.info("Chat inbox for {}: {}", inCommunicate.getAvatarId(), inCommunicate); final Optional<Locale> origLocale = Optional.ofNullable(inCommunicate.getInLanguage()); final float[] speechTruthValue = Optional.ofNullable(inCommunicate.getSpeechTruthValue()).orElse(new float[]{0f, 0f, 0f}); final boolean speechInput = speechTruthValue.length >= 2 && speechTruthValue[1] > 0f; // AIML style // final AgentResponse agentResponse = aimlService.process(origLocale, inCommunicate.getObject(), // chatChannel, inCommunicate.getAvatarId(), speechInput); // if (!agentResponse.getCommunicateActions().isEmpty()) { // for (final CommunicateAction communicateAction : agentResponse.getCommunicateActions()) { // chatChannel.express(inCommunicate.getAvatarId(), communicateAction, null); // } // } else if (agentResponse.getUnrecognizedInput() != null) { // chatChannel.express(inCommunicate.getAvatarId(), Proposition.I_DONT_UNDERSTAND, true, null); // } // droolsService.process(agentResponse); final InteractionSession session = sessionManager.getOrCreate(chatChannel, inCommunicate.getAvatarId()); session.receiveUtterance(origLocale, inCommunicate.getObject(), inCommunicate.getAvatarId(), factService, taskRepo, scriptRepo); session.update(chatChannel, inCommunicate.getAvatarId()); // FIXME: re-implement SocialJournal // final SocialJournal socialJournal = new SocialJournal(); // socialJournal.setFromResponse(origLocale, inCommunicate.getAvatarId(), // inCommunicate.getObject(), SocialChannel.DIRECT, // agentResponse, Duration.millis(System.currentTimeMillis() - startTime)); // socialJournalRepo.save(socialJournal); exchange.getIn().setBody(new Status()); }) .bean(toJson); }