我有一个非常简单的Spring Boot应用程序,该应用程序提供了几个宁静的终结点,并且应该可以驱动将sftp文件上传到sftp服务器。我的要求是,如果有多个文件,则文件应排队。我希望通过sftp spring集成工作流的默认行为来实现这一目标,因为我读到DirectChannel自动将文件排队。要测试该行为,请执行以下操作:
预期结果:较小的文件将排队到通道上,并在较大文件的上传完成后进行处理。实际结果:打开与sftp服务器的新连接,较小的文件上传到那里而无需排队,而较大的文件继续传输。
我的应用程序中有两个文件:
DemoApplication.java
@SpringBootApplication @IntegrationComponentScan @EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class}) public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Bean public SessionFactory<LsEntry> sftpSessionFactory() { DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); factory.setHost("localhost"); factory.setPort(22); factory.setUser("tester"); factory.setPassword("password"); factory.setAllowUnknownKeys(true); return factory; } @Bean @ServiceActivator(inputChannel = "toSftpChannel") public MessageHandler handler() { SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); handler.setRemoteDirectoryExpression(new LiteralExpression("/")); return handler; } @MessagingGateway public interface MyGateway { @Gateway(requestChannel = "toSftpChannel") void sendToSftp(File file); } }
DemoController.java
@RestController public class DemoController { @Autowired MyGateway gateway; @RequestMapping("/sendFile") public void sendFile() { File file = new File("C:/smallFile.txt"); gateway.sendToSftp(file); } @RequestMapping("/sendBigFile") public void sendBigFile() { File file = new File("D:/bigFile.zip"); gateway.sendToSftp(file); } }
我是一个完全的新手,我不能完全确定我的sftp通道是否在这里正确创建,我的猜测是每次我执行sendToSftp调用时都会创建一个新的sftp通道。在这种情况下,如何实现队列行为的任何帮助将不胜感激。
您在这里没有队列,因为每个HTTP请求都是在其自己的线程中执行的。正确,http线程池用尽时,您可能仍然在那儿排队,但是在您只有两个请求的简单用例中,这似乎并不存在。
无论如何,您都可以在那里实现队列行为,但是您应该将自己声明toSftpChannel为QueueChannelBean。
toSftpChannel
QueueChannel
这样,下游进程将始终在同一线程上执行,并且下一个消息恰好在第一个消息之后从队列中拉出。
有关更多信息,请参见参考手册。
更新
由于您使用的FtpMessageHandler是单向组件,但是您仍然需要对MVC控制器的方法进行一些答复,因此,唯一的方法就是拥有一个不返回的@Gateway方法void,当然,我们需要以某种方式发送答复。
FtpMessageHandler
@Gateway
void
为此,我建议使用PublishSubscribeChannel:
PublishSubscribeChannel
@Bean @BridgeTo public MessageChannel toSftpChannel() { return new PublishSubscribeChannel(); } @Bean @ServiceActivator(inputChannel = "toSftpChannel") @Order(0) public MessageHandler handler() { SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); handler.setRemoteDirectoryExpression(new LiteralExpression("/")); return handler; }
这样,我们就有两个订阅者toSftpChannel。使用,@Order(0)我们确保@ServiceActivator是第一个订户,因为我们需要首先执行SFTP传输。用,@BridgeTo我们BridgeHandler在相同的位置增加一秒钟PublishSubscribeChannel。它的目的只是获取replyChannel标头,然后在其中发送请求消息。由于我们不使用任何线程,因此BridgeHandler将在完成向SFTP的传输后立即执行。
@Order(0)
@ServiceActivator
@BridgeTo
BridgeHandler
replyChannel
当然,BridgeHandler除了您之外,您还可以拥有其他任何东西,@ServiceActivator或者@Transfromer返回其他信息而不是请求File。例如:
@Transfromer
File
@ServiceActivator(inputChannel = "toSftpChannel") @Order(1) public String transferComplete(File payload) { return "The SFTP transfer complete for file: " + payload; }