회사 프로젝트 중 비동기 처리로 인해 작업이 순서대로 처리되지 못하는 문제가 발생했다. @Async와 CompletableFuture를 혼합해 사용했는데 데이터 일관성 유지와 비동기 처리의 이해가 부족했기 때문! 해당문제를 인지하고 문제를 해결하는 과정을 정리해볼려고 한다
내가 원했던 프로세스는 각 원본 영상을 비동기적으로 Google Cloud Storage(GCS)에서 다운로드하고(최대 9개) 자막 파일(SRT)을 생성한 후 앞에 2가지 작업이 완료되면 FFmpeg로 srt 파일을 읽어 자막을 만들고 영상을 가공하는 작업이었다.
1. 내가 사용했던 Java와 Spring에서 비동기 처리방법
비동기처리를 하는것에는 다양한 방법이 있지만 보편적이로 많이 사용된다고 생각하고, 내가 사용하는데 문제를 겪었던 CompletableFuture와 @Async에 대해서 먼저 간단하게 설명해볼려고한다
1) Java에서의 비동기처리 CompletableFuture
Java에서는 CompletableFuture 클래스를 통해 비동기적으로 작업을 실행하고, 작업 완료 후 thenApply, thenAccept, thenCompose 등 다양한 메서드를 사용해 연속 작업을 체인 형태로 연결할 수 있다.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 비동기 작업 수행
return "작업 결과";
});
future.thenAccept(result -> {
// 작업 결과를 사용한 후속 작업
});
분류 | 메서드 | 설명 |
---|---|---|
작업 생성 | supplyAsync | 비동기적으로 값을 공급하는 작업을 시작하고 CompletableFuture를 반환 |
runAsync | 반환 값이 없는 비동기 작업을 실행 | |
후속 작업 | thenApply | 작업 완료 후 결과를 받아 다른 값을 반환 |
thenAccept | 작업 완료 후 결과를 받아 처리하지만, 반환 값이 필요 없는 경우 사용 | |
thenRun | 작업 완료 후 결과 없이 실행할 작업을 지정. 이후 추가 작업만 수행할 때 사용 | |
thenCompose | 비동기 작업 결과를 사용하여 다른 비동기 작업을 연결해 실행, CompletableFuture를 평탄화 | |
thenCombine | 두 개의 CompletableFuture 결과를 조합해 새로운 CompletableFuture를 반환 | |
thenAcceptBoth | 두 CompletableFuture의 결과를 받아 처리, 반환 값은 없음 | |
예외 처리 | exceptionally | 작업 중 발생한 예외 처리, 대체 값을 반환 |
handle | 결과나 예외 모두를 처리, 상황에 맞는 값을 반환 | |
whenComplete | 예외 유무에 관계없이 작업 완료 후 실행할 동작을 정의. 결과는 반환하지 않음 | |
결과 대기 | join | 비동기 작업이 완료될 때까지 기다린 후 결과 반환. 예외 발생 시 ExecutionException을 발생시키지 않음 |
get | 작업 완료 시까지 대기하고 결과를 반환. 예외 발생 시 ExecutionException 발생 | |
다중 작업 | allOf | 여러 CompletableFuture 작업이 모두 완료될 때까지 대기 |
anyOf | 여러 CompletableFuture 중 하나라도 완료되면 그 즉시 반환 |
2) Spring에서의 비동기처리 @Async
Spring의 @Async는 메서드가 별도의 스레드에서 비동기적(독립적으로)으로 실행되도록 하는 Spring 어노테이션이다. 주로 오래 걸리는 작업을 백그라운드에서 실행하여 주요 스레드가 블로킹되지 않도록 할 때 유용하다.
@Async
public CompletableFuture<String> performAsyncTask() {
// 비동기 작업 수행
return CompletableFuture.completedFuture("완료");
}
2. 트러블슈팅
<문제 코드: 상황설명을 위한 예시코드>
public CompletableFuture<String> initiateDownloadProcess(RequestDto requestDto) {
CompletableFuture<String> downloadUrlFuture = this.executeDownloadProcess(requestDto);
this.cleanupTemporaryDirectory(requestDto.getProjectId());
return downloadUrlFuture;
}
@Async
public CompletableFuture<String> executeDownloadProcess(RequestDto requestDto) {
Project project = projectRepository.findById(requestDto.getProjectId())
.orElseThrow(() -> new ApiException(ErrorCode.NOT_FOUND));
// 비디오 목록의 각 비디오에 대해 비동기 다운로드 및 가공 처리
List<CompletableFuture<String>> processingFutures = requestDto.getVideoList().parallelStream()
.map(video -> processVideoAsync(project))
.collect(Collectors.toList());
// 모든 비디오 처리 완료 후 압축 및 업로드 처리
return CompletableFuture.allOf(processingFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
// 모든 비디오 가공 완료 후 압축 및 업로드
return uploadAndReturnDownloadLink(processingFutures);
});
}
public CompletableFuture<String> processVideoAsync(Project project) {
return CompletableFuture.supplyAsync(() -> {
// 비디오 다운로드 및 SRT 파일 생성 비동기 작업
CompletableFuture<String> videoDownloadFuture = CompletableFuture.supplyAsync(() ->
// 저장소에서 원본영상 다운로드
this.downloadOriginalVideo(project);
);
CompletableFuture<File> srtCreationFuture = CompletableFuture.supplyAsync(() ->
// srtFile 생성
this.generateSrt(project);
);
// 다운로드 및 SRT 파일 생성 작업 완료 후, 비디오 가공
String downloadedVideoPath = videoDownloadFuture.join();
File srtFile = srtCreationFuture.join();
return createProcessedVideo(downloadedVideoPath, srtFile, project);
});
}
1. 아무리 CompletableFuture를 사용해도 순서대로 진행되지 않음
문제:
- download 메소드에서 processDownload가 완료되기 전에 작업 디렉토리가 삭제되어 작업이 정상적으로 진행되지 않음
- processDownload 메소드에서 allOf 메소드를 사용해도 processVideoAsync메소드가 완료되기 전에 다음로직으로 넘어가 압축 및 업로드 할 때 downloadFutures 값이 null로 들어가서 오류남. 앞뒤로 순서대로 처리되게끔 아무리 수정을 해도 순서대로 처리가 안되었다.
원인:
- @Async가 비동기 진입점으로 작동
- @Async가 적용된 메소드는 새로운 스레드에서 실행되며, 이 안에서 CompletableFuture를 사용해도 각 작업이 독립적으로 실행된다. CompletableFuture는 ForkJoinPool.commonPool()을 기본으로 사용하여 비동기 작업을 실행하는데, @Async로 시작한 메소드 내에서 사용하면, @Async가 제공하는 TaskExecutor와 CompletableFuture의 스레드 풀이 혼합될 수 있다.
- CompletableFuture의 병렬 작업 및 순서 불확실성
- CompletableFuture는 기본적으로 비동기 작업을 수행할 때 각 체인을 별도의 스레드에서 실행할 수 있다. 그러므로 join()이나 thenApply()를 사용해도 각 체인이 순서에 따라 같은 스레드에서 실행된다고 보장할 수 없다. 특히 @Async와 결합하면 여러 스레드에서 비동기 실행이 섞이면서 순서가 엉킬 가능성이 더욱 커진다.
해결 방법:
해결방법은 매우 단순했다. executeDownloadProcess메소드에서 @Async 어노테이션만 제거하면 순서대로 진행이 되었다. 메소드 내부가 전반적으로 순서를 보장해야하는 작업인데 @Async를 에초에 왜사용했는지.....🥲
추가로 allOf로 기다리지 않아도 순서대로 처리가 진행 되었다
// 변경 전
@Async
public CompletableFuture<String> executeDownloadProcess(RequestDto requestDto) {
Project project = projectRepository.findById(requestDto.getProjectId())
.orElseThrow(() -> new ApiException(ErrorCode.NOT_FOUND));
// 비디오 목록의 각 비디오에 대해 비동기 다운로드 및 가공 처리
List<CompletableFuture<String>> processingFutures = requestDto.getVideoList().parallelStream()
.map(video -> processVideoAsync(project))
.collect(Collectors.toList());
// 모든 비디오 처리 완료 후 압축 및 업로드 처리
return CompletableFuture.allOf(processingFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
// 모든 비디오 가공 완료 후 압축 및 업로드
return uploadAndReturnDownloadLink(processingFutures);
});
}
// 변경 후
public CompletableFuture<String> executeDownloadProcess(RequestDto requestDto) {
Project project = projectRepository.findById(requestDto.getProjectId())
.orElseThrow(() -> new ApiException(ErrorCode.NOT_FOUND));
// 비디오 목록의 각 비디오에 대해 비동기 다운로드 및 가공 처리
List<CompletableFuture<String>> processingFutures = requestDto.getVideoList().parallelStream()
.map(video -> processVideoAsync(project))
.collect(Collectors.toList());
// 모든 비디오 처리 완료 후 압축 및 업로드 처리
return uploadAndReturnDownloadLink(processingFutures);
}
2. LazyInitializationException 발생
문제: 비동기 작업 중 매핑된 데이터에 접근하는 과정에서 LazyInitializationException이 발생
원인: JPA에서 Lazy 로딩 설정이 되어 있는 경우, 연관 데이터를 로드하려면 활성 세션이 필요하지만, 비동기 작업에서 해당 세션이 유지되지 않아 예외가 발생. 데이터베이스 연결이 끊어진 후 엔티티에 접근하면 LazyInitializationException발생
해결 방법:
- 다양한 방법 이 있지만 필요한 연관 데이터를 미리 로딩하도록 @EntityGraph를 사용했다. @EntityGraph는 특정 엔티티를 조회할 때, 필요한 연관 데이터를 즉시 로딩하여 Lazy 로딩 문제를 방지 할수 있다.
- 적용해볼 만한 다른 방법으로는 관련 데이터를 DTO에 캡슐화하여 사용하는 방법이 있는데 이는 대규모 데이터나 여러 연관 관계가 있는 엔티티를 효율적으로 다루기 좋다. 나는 영상들이 1분 미만이라 자막들이 다 짧아서 성능에 영향을 많이 줄 것 같지 않아 나름 효율적이라고 생각되면서 코드 수정을 최소화 하는 방법을 선택했다.
// 변경전
public CompletableFuture<String> processVideoAsync(Project project) {
return CompletableFuture.supplyAsync(() -> {
// 비디오 다운로드 및 SRT 파일 생성 비동기 작업
CompletableFuture<String> videoDownloadFuture = CompletableFuture.supplyAsync(() ->
// 저장소에서 원본영상 다운로드
this.downloadOriginalVideo(project);
);
CompletableFuture<File> srtCreationFuture = CompletableFuture.supplyAsync(() ->
// srtFile 생성
this.generateSrt(project);
);
// 다운로드 및 SRT 파일 생성 작업 완료 후, 비디오 가공
String downloadedVideoPath = videoDownloadFuture.join();
File srtFile = srtCreationFuture.join();
return createProcessedVideo(downloadedVideoPath, srtFile, project);
});
}
// 변경 후
public CompletableFuture<String> processVideoAsync(Project project) {
return CompletableFuture.supplyAsync(() -> {
////////호출 시 Repository에서 @EntityGraph로 연관 데이터 즉시로딩//////
List<Subtitle> subtitleList = subtitleRepository.findByProjectId(project.getId());
CompletableFuture<String> videoDownloadFuture = CompletableFuture.supplyAsync(() ->
// 저장소에서 원본영상 다운로드
this.downloadOriginalVideo(project);
);
CompletableFuture<File> srtCreationFuture = CompletableFuture.supplyAsync(() ->
// srtFile 생성
this.generateSrt(project);
);
// 다운로드 및 SRT 파일 생성 작업 완료 후, 비디오 가공
String downloadedVideoPath = videoDownloadFuture.join();
File srtFile = srtCreationFuture.join();
return createProcessedVideo(downloadedVideoPath, srtFile, project);
});
}
// Repository에서 subtitle과 하위 wordList까지 호출
public interface SubtitleRepository extends JpaRepository<Subtitle, Long> {
@EntityGraph(attributePaths = {"wordList"})
List<Subtitle> findByProjectId(Long projectId);
}
5. 마치며
비동기 처리와 순서 보장의 필요성은 프로젝트에서 매우 중요한 요소이다. @Async를 사용할 때는 작업이 독립적이고 순서가 중요하지 않은 경우에 사용하고, 순서가 필요한 경우 CompletableFuture와 같은 체인형 비동기 처리를 사용하는 것이 좋다!(는 것을 알면서도 또 반복하는 나....🥲) 비동기처리를 처음하는것도 아닌데 같은 실수를 반복하지 않고자 정리를 한번 해보았다. LazyInitializationException도 마찬가지.. 이번 경험을 통해 겸사겸사 비동기 작업의 특성을 더욱 깊이 이해하게 되어 오히려 좋다. 앞으로 코드를 설계할 때 꼭 고려해서 신중하게 해야겠다!
이 비동기 처리 작업에는 CPU 바운드(srt 파일 생성, ffmpeg로 영상 작업) 작업뿐만 아니라 I/O 바운드(gcs 업로드/다운로드) 작업도 함께 처리한다. CompletableFuture로 CPU 바운드 작업을 효율적으로 처리하게 했다면, I/O 바운드 작업의 효율적인 처리를 위해 Spring의 WebFlux를 같이 적용하는 방안을 검토중이다. WebFlux는 비동기 스트림 처리와 함께 최소한의 리소스를 사용하여 높은 동시성을 제공하기 때문에, 특히 I/O 바운드 작업에서 효과적인 성능을 기대할 수 있다
기존 CompletableFuture 기반의 코드와 WebFlux를 조합하여 적용해보고 글남겨보는걸로!(빠른시일내에!!) CPU 바운드와 I/O 바운드 작업의 균형 잡힌 비동기 설계에 도전! 🚀
[reference]
- Spring Framework 공식 문서 - @Async : https://docs.spring.io/spring-framework/reference/integration/scheduling.html#scheduling-annotation-support-async
- Oracle Documentation - CompletableFuture API : https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
'개발 하나둘셋 > Java & Spring' 카테고리의 다른 글
테스트 피라미드로 보는 스타트업 테스트 전략 (1) | 2025.01.05 |
---|---|
서버-클라이언트 연결로 실시간 상태 전달하는 SSE 특징 및 적용기 (0) | 2024.11.17 |
FFmpeg로 자막(srt) 삽입하여 영상 인코딩 시 자막이 깨지는 문제! Troubleshooting (0) | 2024.06.23 |
Spring Boot 3.x 주요 변경 사항과 마이그레이션 방법 (0) | 2024.02.04 |
Redis 서버 재시작 시 데이터 초기화 문제와 해결 방법: RDB와 AOF (1) | 2024.01.21 |