Can't get RequestHandlerRetryAdvice to work with Ftp.outboundGateway in Spring Integration

spring-boot

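My situation is similar to the one described in this SO question. The difference is that I don't use a WebFlux.outboundGateway but an Ftp.outboundGateway, on which I call the AbstractRemoteFileOutboundGateway.Command.GET command. The common problem is that I can't get the defined RequestHandlerRetryAdvice to be used.

The configuration looks like this (stripped down to the relevant parts):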

@RestController
@RequestMapping( value = "/somepath" )
public class DownloadController
{
   private DownloadGateway downloadGateway;

   public DownloadController( DownloadGateway downloadGateway )
   {
      this.downloadGateway = downloadGateway;
   }

   @PostMapping( "/downloads" )
   public void download( @RequestParam( "filename" ) String filename )
   {
      Map<String, Object> headers = new HashMap<>();

      downloadGateway.triggerDownload( filename, headers );
   }
}



@MessagingGateway
public interface DownloadGateway
{
   @Gateway( requestChannel = "downloadFiles.input" )
   void triggerDownload( Object value, Map<String, Object> headers );
}



@Configuration
@EnableIntegration
public class FtpDefinition
{
   private FtpProperties ftpProperties;

   public FtpDefinition( FtpProperties ftpProperties )
   {
      this.ftpProperties = ftpProperties;
   }

   @Bean
   public DirectChannel gatewayDownloadsOutputChannel()
   {
      return new DirectChannel();
   }

   @Bean
   public IntegrationFlow downloadFiles( RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile )
   {
      return f -> f.handle( getRemoteFile, getRetryAdvice() )
                   .channel( "gatewayDownloadsOutputChannel" );
   }

   private Consumer<GenericEndpointSpec<AbstractRemoteFileOutboundGateway<FTPFile>>> getRetryAdvice()
   {
      return e -> e.advice( ( (Supplier<RequestHandlerRetryAdvice>) () -> {
         RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
         advice.setRetryTemplate( getRetryTemplate() );
         return advice;
      } ).get() );
   }

   private RetryTemplate getRetryTemplate()
   {
      RetryTemplate result = new RetryTemplate();

      FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
      backOffPolicy.setBackOffPeriod( 5000 );

      result.setBackOffPolicy( backOffPolicy );
      return result;
   }

   @Bean
   public RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile( SessionFactory sessionFactory )
   {
      return 
         Ftp.outboundGateway( sessionFactory,
                              AbstractRemoteFileOutboundGateway.Command.GET,
                              "payload" )
            .fileExistsMode( FileExistsMode.REPLACE )
            .localDirectoryExpression( "'" + ftpProperties.getLocalDir() + "'" )
            .autoCreateLocalDirectory( true );
   }

   @Bean
   public SessionFactory<FTPFile> ftpSessionFactory()
   {
      DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
      sessionFactory.setHost( ftpProperties.getServers().get( 0 ).getHost() );
      sessionFactory.setPort( ftpProperties.getServers().get( 0 ).getPort() );
      sessionFactory.setUsername( ftpProperties.getServers().get( 0 ).getUser() );
      sessionFactory.setPassword( ftpProperties.getServers().get( 0 ).getPassword() );
      return sessionFactory;
   }
}



@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class FtpTestApplication {

    public static void main(String[] args) {
        SpringApplication.run( FtpTestApplication.class, args );
    }
}



@Configuration
@PropertySource( "classpath:ftp.properties" )
@ConfigurationProperties( prefix = "ftp" )
@Data
public class FtpProperties
{
   @NotNull
   private String localDir;

   @NotNull
   private List<Server> servers;

   @Data
   public static class Server
   {
      @NotNull
      private String host;

      @NotNull
      private int port;

      @NotNull
      private String user;

      @NotNull
      private String password;
   }
}

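The Controller is mainly there for testing purposes; in the actual implementation there is a poller. My FtpProperties hold a list of servers because in the actual implementation I use a DelegatingSessionFactory to pick an instance based on some parameters.

According to Gary Russell's comment, I would expect a failed download to be retried. But if I interrupt a download on the server side (by issuing "Kick user" in a FileZilla instance), I get an immediate stack trace and no retries: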

org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received.  Server closed connection.
[...]

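I also need to upload files, for which I use an Ftp.outboundAdapter. In this case, with the same RetryTemplate, if I interrupt an upload on the server side, Spring Integration performs two more attempts with a delay of 5s each and only then logs java.net.SocketException: Connection reset, all as expected.

I tried to debug a little and noticed that right before the first attempt to upload through the Ftp.outboundAdapter, a breakpoint in RequestHandlerRetryAdvice.doInvoke() is hit. But when downloading through the Ftp.outboundGateway, that breakpoint is _never_ hit.

Is there a problem with my configuration? Has anyone got RequestHandlerRetryAdvice to work with Ftp.outboundGateway/AbstractRemoteFileOutboundGateway.Command.GET?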
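
For readers unfamiliar with the retry semantics described for the upload case, the following is a minimal plain-Java sketch of "retry with a fixed back-off". It is not Spring Retry's implementation; the class name FixedBackOffRetry and the method execute are hypothetical names chosen for illustration. Calling it with maxAttempts = 3 and backOffMillis = 5000 would mirror the observed behaviour of one initial attempt plus two retries, 5s apart.

```java
import java.util.concurrent.Callable;

// Illustrative only: a hand-rolled stand-in for what a retry template
// with a fixed back-off policy does conceptually.
public class FixedBackOffRetry {

    /**
     * Runs the task up to maxAttempts times, sleeping backOffMillis
     * between failed attempts. Rethrows the last failure if every
     * attempt fails.
     */
    public static <T> T execute(Callable<T> task, int maxAttempts, long backOffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                // Back off only if another attempt is still to come.
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis);
                }
            }
        }
        throw last;
    }
}
```

The point of the sketch is that the retry loop only helps if the call actually passes through it, which is exactly what does not happen in the download case.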


2020-05-30

1 answer

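Sorry for the delay; we are at SpringOne Platform this week.

The problem is caused by the gateway spec being a bean: the gateway ends up being initialized before the advice is applied.

I changed your code like this...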
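
To make the ordering problem concrete, here is a toy model in plain Java. It is not Spring Integration's code: ToyHandler, addAdvice, init and run are hypothetical names, and the sketch simply assumes a handler that builds its advice chain once, at initialization time. Under that assumption, an advice added after initialization is silently ignored, which matches the symptom of the breakpoint in the advice never being hit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model (an assumption for illustration, not framework code):
// the handler wraps its core logic with whatever advices are
// registered at the moment init() first runs.
public class InitOrderDemo {

    static class ToyHandler {
        private final List<UnaryOperator<Runnable>> advices = new ArrayList<>();
        private Runnable invoker; // built once, in init()
        final List<String> log = new ArrayList<>();

        void addAdvice(UnaryOperator<Runnable> advice) {
            advices.add(advice);
        }

        void init() {
            if (invoker != null) {
                return; // already initialized; later advices are not picked up
            }
            Runnable chain = () -> log.add("core");
            for (UnaryOperator<Runnable> advice : advices) {
                chain = advice.apply(chain);
            }
            invoker = chain;
        }

        void handle() {
            init();
            invoker.run();
        }
    }

    /** Returns the call log for one handled message. */
    static List<String> run(boolean initBeforeAdvice) {
        ToyHandler handler = new ToyHandler();
        if (initBeforeAdvice) {
            handler.init(); // models the spec bean being initialized early
        }
        handler.addAdvice(next -> () -> {
            handler.log.add("advice");
            next.run();
        });
        handler.handle();
        return handler.log;
    }
}
```

In this model, run(false) records the advice followed by the core call, while run(true) records only the core call.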

@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(getRemoteFile(sessionFactory), getRetryAdvice())
            .channel("gatewayDownloadsOutputChannel");
}

...

private RemoteFileOutboundGatewaySpec<FTPFile, FtpOutboundGatewaySpec> getRemoteFile(SessionFactory<FTPFile> sessionFactory) {
    return Ftp.outboundGateway(sessionFactory,
            AbstractRemoteFileOutboundGateway.Command.GET,
            "payload")
            .fileExistsMode(FileExistsMode.REPLACE)
            .localDirectoryExpression("'/tmp'")
            .autoCreateLocalDirectory(true);
}

...and it worked.

It is generally better not to deal with Specs directly and to inline them in the flow definition instead...

@Bean
public IntegrationFlow downloadFiles(SessionFactory<FTPFile> sessionFactory) {
    return f -> f.handle(Ftp.outboundGateway(sessionFactory,
            AbstractRemoteFileOutboundGateway.Command.GET,
            "payload")
            .fileExistsMode(FileExistsMode.REPLACE)
            .localDirectoryExpression("'/tmp'")
            .autoCreateLocalDirectory(true), getRetryAdvice())
        .channel("gatewayDownloadsOutputChannel");
}
2020-05-30