小编典典

自己的执行器替换默认SimpleAsyncTaskExecutor的缺点和风险是什么

spring-boot

个人知识:我从javacodegeeks中读到:“ …
SimpleAsyncTaskExecutor对于玩具项目是可以的,但对于任何大于此的项目,它都有一定的风险,因为它不限制并发线程且不重用线程。因此,为了安全起见,我们还将添加任务执行器bean
…”,并从baeldung中获得了一个非常简单的示例,说明如何添加我们自己的任务执行器。但是我可以找到任何指导来说明后果,以及一些值得应用的案例。

个人需求:我正在努力为在Kafka主题上发布的微服务日志提供一个企业架构。在我的基于日志的案例中,“由不限制并发线程数和不重用它引起的风险”这一说法似乎是合理的。

我正在本地桌面成功运行以下代码,但是我想知道我是否正确提供了一个自定义Task Executor。

我的问题:这种配置是否考虑到我已经在使用kafkatempla(即,至少同步同步,单例和线程安全,至少就产生/发送消息而言是默认的),实际上是朝正确的方向重用线程并避免意外传播使用SimpleAsyncTaskExecutor时创建线程?

生产者配置

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

}

制片人

@Service
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Async
    public void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(final SendResult<String, String> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}

出于演示目的:

@SpringBootApplication
public class KafkaDemoApplication  implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);

    }

    @Autowired
    private Producer p;

    @Override
    public void run(String... strings) throws Exception {
        p.send("test", " qualquer messagem demonstrativa");
    }

}

阅读 4500

收藏
2020-05-30

共1个答案

小编典典

这是默认的实现 SimpleAsyncTaskExecutor

protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
    thread.start();
}

为每个任务创建新线程,在Java中创建线程并不便宜:(参考

线程对象占用大量内存,在大型应用程序中,分配和取消分配许多线程对象会产生大量内存管理开销。

=>使用此任务执行程序重复执行任务将对应用程序性能产生负面影响(此外,此执行程序默认情况下不限制并发任务的数量)

这就是为什么建议您使用线程池实现的原因,线程创建的开销仍然存在,但是由于线程被重用而不是create-fire-forget而大大减少了。

配置时ThreadPoolTaskExecutor,应根据您的应用程序负载正确定义两个值得注意的参数:

  1. private int maxPoolSize = Integer.MAX_VALUE;

这是池中的最大线程数。

  1. private int queueCapacity = Integer.MAX_VALUE;

这是排队的最大任务数。当队列已满时,默认值可能会导致OutOfMemory异常。

使用默认值(Integer.MAX_VALUE)可能会导致资源不足/服务器崩溃。

您可以通过增加最大池大小的数量来提高吞吐量setMaxPoolSize(),以减少加载增加时的预热,将核心池大小设置为更高的值setCorePoolSize()(加载增加时,将初始化两个之间的线程数不相同maxPoolSize - corePoolSize

2020-05-30