stream과 함께 CompletableFuture multi task join 하기
이번에 어쩌다 보니 completablefuture를 사용하면서 stream으로 병렬로 작업하게 되어 겸사겸사 기록합니다.
(사실 쓰면서 굳이 써야했을까 싶으면서도 나쁘진 않은것 같습니다.)
작업자체는 db에서 데이터를 날짜 range를 조회한 후, 2차가공을 거쳐 join을 하는 코드였습니다.
이부분을 어떻게 할까 하다가 작업을 다음과 같이 나눴습니다.
1. 날짜 range를 list로 나눈다.
2. list를 completablefuture를 돌려 전체 작업을 마무리한다.
3. 각 마무리된 작업을 join하고 결과를 리턴한다.
해당 작업을 간단하게 조금 바꿔서 다른 코드로 작성해보았습니다.
list - 특정 조건 class
CompletableFuture - 작업목록 (시간 체크상 1초 딜레이도 줘봤습니다.)
후작업 : join 및 병렬처리, list리턴
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(10); // 최대 스레드 수
taskExecutor.setQueueCapacity(100); // Queue 사이즈
taskExecutor.initialize();
Student student1 = new Student("du");
Student student2 = new Student("young");
Student student3 = new Student("me");
Student student4 = new Student("so");
Student student5 = new Student("nam");
List<Student> students = Arrays.asList(student1, student2, student3, student4, student5);
long startTime = System.currentTimeMillis();
List<Student> response = students.stream()
.map(student -> {
return CompletableFuture.supplyAsync(() -> {
student.setName("hi " + student.getName());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return student;
}).thenApply(s -> {
s.setName(s.getName() + "!");
return s;
});
})
.map(CompletableFuture::join)
.parallel()
.collect(Collectors.toList());
response.forEach(System.out::println);
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("경과시간 :" + elapsedTime);
taskExecutor.shutdown();
---------------------------------------------------------------
@ToString
@Getter
@Setter
@AllArgsConstructor
static class Student{
String name;
}
코드가 살짝 애매하게 짜치지만... 중요한건 map에서 completablefuture 작업을 하고 이를 join하고 list 로 받습니다.
이때 가장 중요한게, taskExecutor와, .parallel() 입니다.
1. parallel을 사용하지 않았을때
만약 parallel을 사용하지 않는다면.. 정말 아름답게 이로직은 동기로 동작합니다.
왜냐면 map은 순차적으로 join 을 하기때문에 하나씩 호출됩니다.
이부분을 방지하기위해 꼭 챙겨갑시다..
2. taskExecutor 적용
이부분은 개인적으로 미리선언해둔 taskExecutor를 사용하는것을 선호합니다.
실제로 사용하지않는경우, default forkjoinpool을 사용하지만, 특정 서비스용으로 선언해둔 Executor를 사용합니다.
default를 어디서 어떻게 사용하고 있을지도 모르며.. 특정 비동기로직 로깅, 최소 thread수 유지 등 여러 이유로 별도로 선언한것을 사용합니다.
위 코드는 completablefuture에서 supplyAsync 에만 executor를 변수로 넘겼지만, 만약 체인된 작업도 async라면 같은 threadpool을 넘기면 될것같습니다.