CompletableFuture使用

本文地址:CompletableFuture使用

1、自定义线程池配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/* Copyright © 2022 Yuech and/or its affiliates. All rights reserved. */

package ...;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
* Configuration properties for the common thread pool, bound from the
* {@code common.pool} property prefix. Accessors are generated by Lombok's
* {@code @Data}.
*
* @author Yuech
* @version 1.0
* @since 2022-11-24 16:01
*/
@Data
@ConfigurationProperties(prefix = "common.pool")
public class CommonThreadPoolProperties {

/**
* Whether the thread pool is enabled. Defaults to {@code true}.
*/
private boolean enabled = true;
/**
* Number of core threads. Defaults to 20.
*/
private int corePoolSize = 20;

/**
* Maximum number of threads. Defaults to 40.
*/
private int maxPoolSize = 40;

/**
* Idle time the pool allows its threads before they may be reclaimed
* (seconds, per the field name). Defaults to 300.
*/
private int keepAliveSeconds = 300;

/**
* Maximum queue length. Defaults to -1.
* NOTE(review): if this value is passed to Spring's
* ThreadPoolTaskExecutor#setQueueCapacity, a non-positive capacity is
* documented to select a SynchronousQueue (no buffering) rather than an
* unbounded queue - confirm that -1 is the intended default.
*/
private int queueCapacity = -1;

/**
* Prefix used for the names of the pool's threads.
*/
private String threadNamePrefix = "Common-Execute-";
}

2、自定义线程池

使用 CallerRunsPolicy 拒绝策略:当线程池已达到最大线程数且无法再接收新任务时,新任务不会被丢弃,而是由提交该任务的调用者线程直接执行(调用者不一定是主线程)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/* Copyright © 2022 Yuech and/or its affiliates. All rights reserved. */

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Spring configuration that exposes the common asynchronous executor.
 *
 * <p>The configuration is active unless {@code common.pool.enabled} is set to a
 * non-matching value (it also applies when the property is missing), and it
 * enables {@code @Async} processing for the application.
 *
 * @author Yuech
 * @version 1.0
 * @since 2022-11-24 15:57
 */
@Configuration
@EnableAsync
@ConditionalOnProperty(prefix = "common.pool", value = "enabled", matchIfMissing = true)
@EnableConfigurationProperties({CommonThreadPoolProperties.class})
@ComponentScan(value = "com.yuech.config")
public class CommonThreadPoolConfig {

    @Autowired
    private CommonThreadPoolProperties properties;

    /**
     * Builds and initializes the {@code commonAsyncTaskExecutor} bean from the
     * bound {@link CommonThreadPoolProperties}.
     *
     * @return the initialized executor
     */
    @Bean
    public ThreadPoolTaskExecutor commonAsyncTaskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Naming: prefix applied to every worker thread created by the pool.
        pool.setThreadNamePrefix(properties.getThreadNamePrefix());

        // Sizing: core threads, upper bound, idle keep-alive and queue capacity.
        pool.setCorePoolSize(properties.getCorePoolSize());
        pool.setMaxPoolSize(properties.getMaxPoolSize());
        pool.setKeepAliveSeconds(properties.getKeepAliveSeconds());
        pool.setQueueCapacity(properties.getQueueCapacity());

        // Saturation handling: when the pool cannot accept a new task, run it
        // on the thread that submitted it instead of on a pool thread.
        final ThreadPoolExecutor.CallerRunsPolicy callerRuns = new ThreadPoolExecutor.CallerRunsPolicy();
        pool.setRejectedExecutionHandler(callerRuns);

        pool.initialize();
        return pool;
    }

}

3、CompletableFuture调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package ...;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
* 日志服务实现类
*
* @author Yuech
* @since 2022-10-20 17:01
*/
@Slf4j
@Service
public class AccessLogServiceImpl extends ServiceImpl<AccessLogMapper, AccessLog> implements AccessLogService {

@Resource
private AccessLogInfoService infoService;

@Resource
private AccessLogManager accessLogManager;

@Resource(name = "commonAsyncTaskExecutor")
private ThreadPoolTaskExecutor commonAsyncTaskExecutor;

@Transactional(rollbackFor = Exception.class)
@Override
public Result<AccessLogPushResponseDTO> resendOrPushAgain(List<AccessLogVO> accessLogList) {

if (CollectionUtils.isEmpty(accessLogList)) {
return Result.success(new AccessLogPushResponseDTO(CommonConstant.ZERO_LONG,
CommonConstant.ZERO_LONG, null));
}
// 重新下发或重新推送
List<CompletableFuture<AccessLogPushResponseDetailDTO>> responseFutureList =
accessLogList.stream().map(accessLog -> doResendOrPushAgain(accessLog)).collect(Collectors.toList());

// 使用allOf方法来表示所有的并行任务
CompletableFuture<Void> allFutures =
CompletableFuture
.allOf(responseFutureList.toArray(new CompletableFuture[responseFutureList.size()]));

// 下面的方法可以帮助我们获得所有子任务的处理结果
CompletableFuture<List<AccessLogPushResponseDetailDTO>> finalResults = allFutures
.thenApply(v -> responseFutureList.stream().map(accountFindingFuture -> accountFindingFuture.join())
.collect(Collectors.toList()));

AccessLogPushResponseDTO responseDto = new AccessLogPushResponseDTO();
Result<AccessLogPushResponseDTO> result = Result.success(responseDto);
try {
List<AccessLogPushResponseDetailDTO> responseDetailList = finalResults.get();
if (CollectionUtils.isEmpty(responseDetailList)) {
return Result.success(new AccessLogPushResponseDTO(CommonConstant.ZERO_LONG,
CommonConstant.ZERO_LONG, null));
}

Long successCount = CommonConstant.ZERO_LONG;
Long failureCount = CommonConstant.ZERO_LONG;
for (AccessLogPushResponseDetailDTO dto : responseDetailList) {

if (dto == null) {
continue;
}
if (AccessResponseStatus.SUCCESS.getStatus().equals(dto.getResult())) {
successCount += 1;
} else if (AccessResponseStatus.FAIL.getStatus().equals(dto.getResult())) {
failureCount += 1;
}
}
responseDto.setDetailList(responseDetailList);
responseDto.setSuccessCount(successCount);
responseDto.setFailureCount(failureCount);
} catch (InterruptedException e) {
log.error("resendOrPushAgain InterruptedException异常", e);
result = Result.DefaultFailure("resendOrPushAgain ExecutionException异常 : " + e.getMessage());
} catch (ExecutionException e) {
log.error("resendOrPushAgain ExecutionException异常", e);
result = Result.DefaultFailure("resendOrPushAgain ExecutionException异常 : " + e.getMessage());
}
return result;
}

private CompletableFuture<AccessLogPushResponseDetailDTO> doResendOrPushAgain(AccessLogVO accessLog) {

return CompletableFuture.supplyAsync(() -> {
AccessLogAndDetailDTO dto = getAccessLogAndDetailInfo(accessLog);
if (dto == null) {
return new AccessLogPushResponseDetailDTO(null, AccessResponseStatus.FAIL.getStatus(),
AccessLogConstants.SEND_OR_PUSH_ACCESS_LOG_IS_NULL);
}
HttpReq httpReq = null;
String config = dto.getConfig();
String id = dto.getId();
if (StringUtils.isNotBlank(config)) {
httpReq = JacksonUtils.toObj(config, HttpReq.class);
}
if (httpReq == null) {
return new AccessLogPushResponseDetailDTO(id, AccessResponseStatus.FAIL.getStatus(),
AccessLogConstants.SEND_OR_PUSH_ACCESS_LOG_CONFIG_IS_NULL);
}

// ...

Integer result = AccessResponseStatus.SUCCESS.getStatus();
AccessLogDto accessLogDto = new AccessLogDto();

return new AccessLogPushResponseDetailDTO(id, result, accessLogDto.getRes());
}, commonAsyncTaskExecutor);
}
}

4、参考

http://events.jianshu.io/p/8c9dc192fa63