我的情况类似于此SO问题中描述的情况。区别在于我不使用aWebFlux.outboundGateway而是Ftp.outboundGateway在其上调用AbstractRemoteFileOutboundGateway.Command.GET命令,常见的问题是我无法使用定义RequestHandlerRetryAdvice。
WebFlux.outboundGateway
Ftp.outboundGateway
AbstractRemoteFileOutboundGateway.Command.GET
RequestHandlerRetryAdvice
配置看起来像这样(向下分解到相关部分):
@RestController @RequestMapping( value = "/somepath" ) public class DownloadController { private DownloadGateway downloadGateway; public DownloadController( DownloadGateway downloadGateway ) { this.downloadGateway = downloadGateway; } @PostMapping( "/downloads" ) public void download( @RequestParam( "filename" ) String filename ) { Map<String, Object> headers = new HashMap<>(); downloadGateway.triggerDownload( filename, headers ); } } @MessagingGateway public interface DownloadGateway { @Gateway( requestChannel = "downloadFiles.input" ) void triggerDownload( Object value, Map<String, Object> headers ); } @Configuration @EnableIntegration public class FtpDefinition { private FtpProperties ftpProperties; public FtpDefinition( FtpProperties ftpProperties ) { this.ftpProperties = ftpProperties; } @Bean public DirectChannel gatewayDownloadsOutputChannel() { return new DirectChannel(); } @Bean public IntegrationFlow downloadFiles( RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile ) { return f -> f.handle( getRemoteFile, getRetryAdvice() ) .channel( "gatewayDownloadsOutputChannel" ); } private Consumer<GenericEndpointSpec<AbstractRemoteFileOutboundGateway<FTPFile>>> getRetryAdvice() { return e -> e.advice( ( (Supplier<RequestHandlerRetryAdvice>) () -> { RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice(); advice.setRetryTemplate( getRetryTemplate() ); return advice; } ).get() ); } private RetryTemplate getRetryTemplate() { RetryTemplate result = new RetryTemplate(); FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod( 5000 ); result.setBackOffPolicy( backOffPolicy ); return result; } @Bean public RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile( SessionFactory sessionFactory ) { return Ftp.outboundGateway( sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET, "payload" ) .fileExistsMode( FileExistsMode.REPLACE ) .localDirectoryExpression( "'" + ftpProperties.getLocalDir() + "'" ) .autoCreateLocalDirectory( true ); } @Bean public SessionFactory<FTPFile> ftpSessionFactory() { DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory(); sessionFactory.setHost( ftpProperties.getServers().get( 0 ).getHost() ); sessionFactory.setPort( ftpProperties.getServers().get( 0 ).getPort() ); sessionFactory.setUsername( ftpProperties.getServers().get( 0 ).getUser() ); sessionFactory.setPassword( ftpProperties.getServers().get( 0 ).getPassword() ); return sessionFactory; } } @SpringBootApplication @EnableIntegration @IntegrationComponentScan public class FtpTestApplication { public static void main(String[] args) { SpringApplication.run( FtpTestApplication.class, args ); } } @Configuration @PropertySource( "classpath:ftp.properties" ) @ConfigurationProperties( prefix = "ftp" ) @Data public class FtpProperties { @NotNull private String localDir; @NotNull private List<Server> servers; @Data public static class Server { @NotNull private String host; @NotNull private int port; @NotNull private String user; @NotNull private String password; } }
Controller主要用于测试目的,在实际的实现中有一个轮询器。我FtpProperties持有服务器列表,因为在实际实现中,我使用a DelegatingSessionFactory根据一些参数选择一个实例。
FtpProperties
DelegatingSessionFactory
根据加里·罗素(Gary Russell)的评论,我希望重试失败的下载。但是,如果我中断了下载服务器端(通过在FileZilla实例中发出“踢用户”),我将立即获得堆栈跟踪,而不会重试:
org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received. Server closed connection. [...]
我还需要上传文件,为此我使用Ftp.outboundAdapter。在这种情况下RetryTemplate,如果使用相同的方法,如果我中断了上传服务器端,SpringIntegration将再执行两次尝试,每次尝试的延迟为5s,然后再进行log java.net.SocketException: Connectionreset,所有操作均符合预期。
Ftp.outboundAdapter
RetryTemplate
java.net.SocketException: Connectionreset
我尝试调试了一下,并注意到在第一次尝试通过进行上传之前Ftp.outboundAdapter,遇到了断点RequestHandlerRetryAdvice.doInvoke()。但是,通过下载时Ftp.outboundGateway,该断点_永远不会_ 命中。
RequestHandlerRetryAdvice.doInvoke()
我的配置有问题吗,有人可以RequestHandlerRetryAdvice使用Ftp.outboundGateway/ AbstractRemoteFileOutboundGateway.Command.GET吗?
抱歉耽搁了; 我们这周在SpringOne平台上。
问题是由于网关规范是一个bean的事实-网关最终在应用建议之前被初始化。
我这样更改了您的代码…
@Bean public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) { return f -> f.handle(getRemoteFile(sessionFactory), getRetryAdvice()) .channel("gatewayDownloadsOutputChannel"); } ... private RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile(SessionFactory<FTPFile> sessionFactory) { return Ftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET, "payload") .fileExistsMode(FileExistsMode.REPLACE) .localDirectoryExpression("'/tmp'") .autoCreateLocalDirectory(true); }
…而且有效。
通常最好不要直接处理Specs,而只是将它们内联到流定义中…
@Bean public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) { return f -> f.handle(Ftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET, "payload") .fileExistsMode(FileExistsMode.REPLACE) .localDirectoryExpression("'/tmp'") .autoCreateLocalDirectory(true), getRetryAdvice()) .channel("gatewayDownloadsOutputChannel"); }