小编典典

Spring Integration中的REST端点使消息传递通道成为多线程

spring-boot

我有一个非常简单的Spring
Boot应用程序,该应用程序提供了几个宁静的终结点,并且应该可以驱动将sftp文件上传到sftp服务器。我的要求是,如果有多个文件,则文件应排队。我希望通过sftp
spring集成工作流的默​​认行为来实现这一目标,因为我读到DirectChannel自动将文件排队。要测试该行为,请执行以下操作:

  1. 发送一个大文件,通过调用端点来阻塞通道一段时间。
  2. 通过调用端点来发送较小的文件。

预期结果:较小的文件将排队到通道上,并在较大文件的上传完成后进行处理。实际结果:打开与sftp服务器的新连接,较小的文件上传到那里而无需排队,而较大的文件继续传输。

我的应用程序中有两个文件:

DemoApplication.java

@SpringBootApplication
@IntegrationComponentScan
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("tester");
        factory.setPassword("password");
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel")
    public MessageHandler handler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
        return handler;
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toSftpChannel")
         void sendToSftp(File file);
    }
}

DemoController.java

@RestController
public class DemoController {

    @Autowired
    MyGateway gateway;

    @RequestMapping("/sendFile")
    public void sendFile() {
        File file = new File("C:/smallFile.txt");
        gateway.sendToSftp(file);
    }

    @RequestMapping("/sendBigFile")
    public void sendBigFile() {
        File file = new File("D:/bigFile.zip");
        gateway.sendToSftp(file);
    }
}

我是一个完全的新手,我不能完全确定我的sftp通道是否在这里正确创建,我的猜测是每次我执行sendToSftp调用时都会创建一个新的sftp通道。在这种情况下,如何实现队列行为的任何帮助将不胜感激。


阅读 696

收藏
2020-05-30

共1个答案

小编典典

您在这里没有队列,因为每个HTTP请求都是在其自己的线程中执行的。正确,http线程池用尽时,您可能仍然在那儿排队,但是在您只有两个请求的简单用例中,这似乎并不存在。

无论如何,您都可以在那里实现队列行为,但是您应该将自己声明toSftpChannelQueueChannelBean。

这样,下游进程将始终在同一线程上执行,并且下一个消息恰好在第一个消息之后从队列中拉出。

有关更多信息,请参见参考手册

更新

由于您使用的FtpMessageHandler是单向组件,但是您仍然需要对MVC控制器的方法进行一些答复,因此,唯一的方法就是拥有一个不返回的@Gateway方法void,当然,我们需要以某种方式发送答复。

为此,我建议使用PublishSubscribeChannel

@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
    return new PublishSubscribeChannel();
}

@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
    SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
    return handler;
}

这样,我们就有两个订阅者toSftpChannel。使用,@Order(0)我们确保@ServiceActivator是第一个订户,因为我们需要首先执行SFTP传输。用,@BridgeTo我们BridgeHandler在相同的位置增加一秒钟PublishSubscribeChannel。它的目的只是获取replyChannel标头,然后在其中发送请求消息。由于我们不使用任何线程,因此BridgeHandler将在完成向SFTP的传输后立即执行。

当然,BridgeHandler除了您之外,您还可以拥有其他任何东西,@ServiceActivator或者@Transfromer返回其他信息而不是请求File。例如:

@ServiceActivator(inputChannel = "toSftpChannel")
@Order(1)
public String transferComplete(File payload) {
    return "The SFTP transfer complete for file: " + payload;
}
2020-05-30