个人知识:我从javacodegeeks中读到:“ … SimpleAsyncTaskExecutor对于玩具项目是可以的,但对于任何大于此的项目,它都有一定的风险,因为它不限制并发线程且不重用线程。因此,为了安全起见,我们还将添加任务执行器bean …”,并从baeldung中获得了一个非常简单的示例,说明如何添加我们自己的任务执行器。但是我可以找到任何指导来说明后果,以及一些值得应用的案例。
个人需求:我正在努力为在Kafka主题上发布的微服务日志提供一个企业架构。在我的基于日志的案例中,“由不限制并发线程数和不重用它引起的风险”这一说法似乎是合理的。
我正在本地桌面成功运行以下代码,但是我想知道我是否正确提供了一个自定义Task Executor。
我的问题:这种配置是否考虑到我已经在使用kafkatempla(即,至少同步同步,单例和线程安全,至少就产生/发送消息而言是默认的),实际上是朝正确的方向重用线程并避免意外传播使用SimpleAsyncTaskExecutor时创建线程?
生产者配置
@EnableAsync @Configuration public class KafkaProducerConfig { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class); @Value("${kafka.brokers}") private String servers; @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(500); executor.setThreadNamePrefix("KafkaMsgExecutor-"); executor.initialize(); return executor; } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class); return props; } }
制片人
@Service public class Producer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Async public void send(String topic, String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(final SendResult<String, String> message) { LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset()); } @Override public void onFailure(final Throwable throwable) { LOGGER.error("unable to send message= " + message, throwable); } }); } }
出于演示目的:
@SpringBootApplication public class KafkaDemoApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(KafkaDemoApplication.class, args); } @Autowired private Producer p; @Override public void run(String... strings) throws Exception { p.send("test", " qualquer messagem demonstrativa"); } }
这是默认的实现 SimpleAsyncTaskExecutor
SimpleAsyncTaskExecutor
protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); }
为每个任务创建新线程,在Java中创建线程并不便宜:(参考)
线程对象占用大量内存,在大型应用程序中,分配和取消分配许多线程对象会产生大量内存管理开销。
=>使用此任务执行程序重复执行任务将对应用程序性能产生负面影响(此外,此执行程序默认情况下不限制并发任务的数量)
这就是为什么建议您使用线程池实现的原因,线程创建的开销仍然存在,但是由于线程被重用而不是create-fire-forget而大大减少了。
配置时ThreadPoolTaskExecutor,应根据您的应用程序负载正确定义两个值得注意的参数:
ThreadPoolTaskExecutor
private int maxPoolSize = Integer.MAX_VALUE
这是池中的最大线程数。
private int queueCapacity = Integer.MAX_VALUE;
这是排队的最大任务数。当队列已满时,默认值可能会导致OutOfMemory异常。
使用默认值(Integer.MAX_VALUE)可能会导致资源不足/服务器崩溃。
Integer.MAX_VALUE
您可以通过增加最大池大小的数量来提高吞吐量setMaxPoolSize(),以减少加载增加时的预热,将核心池大小设置为更高的值setCorePoolSize()(加载增加时,将初始化两个之间的线程数不相同maxPoolSize - corePoolSize)
setMaxPoolSize()
setCorePoolSize()
maxPoolSize - corePoolSize