프로젝트 진행중 API를 많이 타는 조금 복잡한 로직이 있다. 대신 비동기로 처리해서 즉시 응답을 주고 서버에서 처리가 완료가 되면 상태값을 바꾸는 걸로 완료를 알린다. 그런데 상태값이 바뀌더라도 사용자가 새로고침을 하지 않으면 계속 진행중으로 보이는 문제가 있다. 문제를 안삼으면 문제가되지 않지만 사용자 경험상 충분히 개선의 여지가 있을것 같아 이를 개선할 수 있는 방법이 무엇이 있는 지 찾아보았다
우리가사용하는 웹페이지는 대부분의 경우 웹 브라우저인 클라이언트가 HTTP 요청을 서버에 보내고, 서버는 적절한 응답을 반환한다. 하지만 내가 위에서 말한것 같은 실시간 업데이트 정보나 연속적인 정보를 클라이언트에게 보내야하는 경우 계속 HTTP 요청을 하게 되면 여러가지 비용면에서 좋지 않다. 이런 상황에서 사용할 수 있는 폴링, 웹소켓, SSE에 대해서 알아보고 구현방법을 정리해봤다
1. 풀링(Polling)
폴링에는 short 폴링과 long polling이 있다
- short polling
클라이언트가 서버로부터 정보를 받기위해 주기적으로 서버에 요청을 보내 응답을 받는 방법이다.
과정
- 클라이언트가 HTTP 요청을 보냄(짧은 주기로 요청을 반복)
- 새로운 정보를 클라이언트에게 반환
구현이 단순하다는 장점이 있지만
호출주기가 짧을경우 업데이트 여부에 상관 없이 서버에 계속 요청을 하기 때문이 리소스가 낭비될 수 있다
실시간성이 중요하지 않아 호출 주기가 긴경우는 고려해볼 수 있을것 같다
- long polling
클라이언트가 서버로 요청을 하면 일정시간 서버에서 대기하다가 업데이트가 되면 응답을 보낸다.
과정
- 클라이언트가 HTTP 요청을 보냄
- 서버에서 대기하다가 업데이트 시 클라이언트에 응답
- 클라이언트는 응답을 받고 다시 서버에 요청(반복)
short polling 보다 요청수를 줄일 수 있어 리소스 낭비가 덜하다
다만 데이터 상태가 자주 바뀐다면 서버에 부담이 될 수 있다
2. WebSocket
TCP에 기반한 양방향 메세지 전달 프로토콜로 서버와 웹브라우저 사이 양방향 통신이 가능하다.
과정
- 클라이언트와 서버는 HTTP를 통해 연결을 설정하고 WebSocket handshake를 통해 연결
- 이후 데이터는 클라이언트와 서버간 양방향으로 전송
웹소켓은 데이터를 보낼 때마다 새로 연결할 필요가 없어 속도가 빠르고 순서가 보장된다. 보통 채팅기능이나 데이터 동시 편집 등 실시간 업데이트와 긴밀한 동기화가 필요할 때 주로 사용된다.
하지만 구현이 구현이 복잡하여 꼭 실시간 양방향 통신이 필요할 경우에만 사용하는것이 좋다
3. SSE(Server-Sent Event)
한번 연결을 하면 일정시간 동안 서버에서 클라이언트로 데이터를 전송할 수 있다. 연결이 종료될 때까지 데이터를 계속 보낼 수 있다.
과정
- 클라이언트는 요청을 통해 서버와 연결
- 서버는 데이터가 변경될 때마다 클라이언트에게 알려줌
HTTP 기반으로 서버에서 클라이언트로 이벤트(데이터)를 푸시할 수 있고, 클라이언트는 EventSource API를 사용하여 서버에서 보낸 데이터를 실시간으로 수신할 수 있다. 서버에서 클라이언트로 단방향통신만 가능하며 데이터를 보낼때는 text 메세지를 이용한다. HTTP/2에서는 100개 까지도 연결을 할 수 있다.
4. 상황에 따른 의사결정 과정
프론트에서 데이터 요청을 하고 처리 완료 후 목록에 실시간 반영이 되지 않았을 때 문제를 인지했다. 맨처음 이야기가 나온것은 풀링방법이었다. 백엔드 코드 수정도 필요 없었고 제일 쉬운 방법이었어서. 하지만 로그인 후 메인페이지에서 목록조회를 하기 때문에 사용자가 많아지고 목록에서 대기할 경우 리소스 소모가 큰 누가봐도 비효율적인 방법이었다.
서버에서 클라이언트로 데이터를 보내는 방법을 찾아봤는데 SSE와 웹소켓에 대해 알게되었다. 웹소켓은 개발자로 취업하기 전 프로젝트에서 사용해본적이 있는데 아무리봐도 우리 프로젝트에 적용하기에는 조금 오바스러운 느낌이 있었다. 양방향 통신이 필요 없기 때문에 굳이 복잡한 적용과정을 거칠 필요가 없다고 생각했다. SSE에 대해 알아본 후 팀에 제안을 하게되었다. 아무래도 사용해본적 없는 딱히 반가워하지 않았는데 적용이 어렵지 않았고 문제해결에 대해 모두 관심이 있었기 때문에 결국에는 적용하기로 했다.
5. Spring에서 SSE 구현하기
[SseEmitterRegistry]
- Spring MVC에서 SSE 프로토콜을 지원하는 SseEmitter 클래스를 사용하였다. SeeEmitter 클래스는 서버가 클라이언트에게 비동기적으로 데이터를 푸시할 수 있도록 해준다.
- Java에서 제공하는 스레드 안전한 HashMap인 ConcurrentHashMap을 사용하였고 여러 스레드가 동시에 데이터를 읽고 쓰는 환경에서도 동시성을 지원하며, 동기화로 인한 성능 저하를 줄이는 특징이 있다.
- SseEmitterRegistry를 통해 중앙에서 Ssemitter를 관리 한다. 클라이언트가 많아질 수록 중앙에서 관리하지 않으면 각 클라이언트의 Ssemitter를 추적하가 연결 상태를 유지 하기 어렵다. 또한 각각 생성하고 관리하면 동일한 클라이언트에 대해 여러 SseEmitter 객체가 생성(중복 생성) 될 수 있으며, 동시성 문제 등도 발생할 수 있다.
@Component
public class SseEmitterRegistry {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
/**
* Emitter 추가
* @param memberId 사용자 ID
* @param emitter SseEmitter 객체
*/
public void addEmitter(String memberId, SseEmitter emitter) {
emitters.put(memberId, emitter);
}
/**
* Emitter 가져오기
* @param memberId 사용자 ID
* @return SseEmitter 객체
*/
public SseEmitter getEmitter(String memberId) {
return emitters.get(memberId);
}
/**
* Emitter 제거
* @param memberId 사용자 ID
*/
public void removeEmitter(String memberId) {
emitters.remove(memberId);
}
/**
* Emitter가 완료되었거나 에러가 발생한 경우 안전하게 제거
* @param memberId 사용자 ID
*/
public void completeAndRemoveEmitter(String memberId) {
SseEmitter emitter = emitters.get(memberId);
if (emitter != null) {
try {
emitter.complete();
} catch (IllegalStateException e) {
// 이미 완료된 상태인 경우 무시
} finally {
emitters.remove(memberId);
}
}
}
}
[SseController]
- SseController에서 이벤트를 푸시할 API를 생성해 준다
- 각 클라이언트의 SSE 연결(SseEmitter)을 사용자 ID로 관리하기 위해 키값으로 memberId를 사용하였다
- 클라이언트가 연결을 종료하거나 서버 연결이 타임아웃되면 해당 클라이언트의 SseEmitter를 제거하여 메모리 누수를 방지 한다
- SseEmitter 관리가 제대로 되지 않을 경우 에러의 연속 이다. 타임아웃 시에도 필수로 연결 종료 후 삭제해야한다. 자세한 내용은 아래 트러블슈팅에서.....
@RestController
@RequiredArgsConstructor
public class SseController {
private final SseEmitterRegistry emitterRegistry;
@GetMapping("/api/sse/project")
public SseEmitter projectStreamSse(@AuthenticationPrincipal CustomUser customUser) {
String memberId = customUser.getId();
SseEmitter emitter = new SseEmitter(30 * 60 * 1000L); // sse 연결시간 30분으로 설정
emitterRegistry.addEmitter(memberId, emitter); // 사용자 ID별로 emitter 저장
// 연결 시작 시 메시지 전송
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("Connection established."));
} catch (IOException e) {
e.printStackTrace();
}
// 클라이언트가 연결을 종료 시 emitter 제거
emitter.onCompletion(() -> {
System.out.println("Connection completed for memberId: " + memberId);
emitterRegistry.removeEmitter(memberId); // emitters 삭제
});
// 타임아웃 시 연결 종료 후 emitter 제거
emitter.onTimeout(() -> {
System.out.println("Connection timed out for memberId: " + memberId);
emitter.complete(); // 연결 종료
emitterRegistry.removeEmitter(memberId); // emitters 삭제
});
return emitter;
}
}
주요 메서드
- send(Object object): 데이터를 클라이언트로 전송
- send(SseEventBuilder eventBuilder): 커스텀 이벤트 객체를 클라이언트로 전송
- complete(): SSE 연결을 정상적으로 종료
- completeWithError(Throwable ex): 오류로 연결 종료
- onCompletion(Runnable callback): 클라이언트와의 연결이 완료될 때 실행할 콜백 정의
- onTimeout(Runnable callback): 연결 타임아웃 시 실행할 콜백 정의
[SseService]
- 특정 처리가 완료된 경우 notifyAnalysisStatusChange 메소드를 호출해서 완료상태를 클라이언트에게 알리도록 했다
- 프로젝트 저장 처리가 완료되어 알림을 보낼 때 실질적으로 알림을 보내지 않아도 저장 메소드는 온전히 처리되어야 하기 때문에 혹시 emitter가 없어도 오류가 발생하지 않도록 null에 대한 처리를 해줬다
- send메세지를 통해 클라이언트에게 메세지를 전달
@Slf4j
@Service
@RequiredArgsConstructor
@Transactional
public class SseService {
private final SseEmitterRegistry emitterRegistry;
public void saveProject(ProjectDto.RequestDto requestDto, CustomUser customUser) {
// 1. 프로젝트 생성
// 2. 분석 등 추가 작업
// 3. 분석상태 업데이트(진행중 -> 완료)
// 4. 분석상태 변경알림
this.notifyAnalysisStatusChange(project)
}
// 분석완료 시 알림
public void notifyAnalysisStatusChange(Project project) {
String memberId = project.getMember().getId(); // 프로젝트의 사용자 ID를 가져옴
SseEmitter emitter = emitterRegistry.getEmitter(memberId); // 특정 사용자에 대한 emitter 가져오기 (수정된 부분)
// 처리해주지 않으면 emitter가 없어서 'saveProject'메소드가 완료되어도 예외처리됨
if (emitter == null) {
// SSE 연결이 없는 경우 로그 추가
log.warn("SSE 연결이 존재하지 않습니다. 사용자 ID: {}", memberId);
return; // 예외 방지용 반환
}
if (project.getAnalysisStatus() == AnalysisStatus.COMPLETED) {
try {
String message = "프로젝트 " + project.getId() + " 분석 완료";
emitter.send(message); // 특정 사용자에게 메세지 전송
} catch (IOException e) {
log.info("SSE 연결이 끊어졌습니다. 사용자 ID: {}", memberId, e);
emitter.completeWithError(e);
emitterRegistry.removeEmitter(memberId); // 오류 발생 시 emitter 제거
}
}
}
}
[SseHeartbeatScheduler]
- 연결된 emitters가 있을 때 스케줄러를 통해 1분마다 클라이언트에게 이벤트 메세지("ping")를 보내 SSE 연결이 유효하다는 것을 알려주었다.
- if문으로 emitters가 비어 있거나 null인 경우를 확인하여 불필요한 루프 실행을 방지하고 예외 발생을 예방
@Component
@AllArgsConstructor
public class SseHeartbeatScheduler {
private final SseEmitterRegistry emitterRegistry;
@Scheduled(fixedRate = 60000) // 1분마다 호출
public void sendHeartbeat() {
Map<String, SseEmitter> emitters = emitterRegistry.getEmitters();
if (emitters == null || emitters.isEmpty()) {
return;
}
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
String memberId = entry.getKey();
SseEmitter emitter = entry.getValue();
try {
System.out.println(memberId + " ping");
emitter.send(SseEmitter.event().name("heartbeat").data("ping"));
} catch (IllegalStateException e) { // SseEmitter가 이미 완료된 경우 제거
emitterRegistry.completeAndRemoveEmitter(memberId);
System.err.println("SseEmitter already completed for memberId: " + memberId);
} catch (IOException e) { // 연결이 끊어 진경우 제거
emitterRegistry.completeAndRemoveEmitter(memberId);
System.out.println("Disconnected: " + memberId);
}
}
}
}
postman test
sse api를 호출 하면 내가 설정한 30분 까지 연결되어 있다
1분마다 heartbeat 이벤트를 실행하고 프로젝트가 완료되면 완료 메세지를 보낸다
아래와 같이 한번연결로 클라이언트에게 메세지가 지속적으로 전달된다
sse는 관련 헤더설정을 해야하는 spring에서 자동으로 해주지만 제대로 들어오는지 한번 더 확인하기!
"Content-Type": "text/event-stream"
"Cache-Control": "no-cache"
"Connection": "keep-alive"
6. 트러블슈팅
1. heartbeat 이벤트가 제대로 들어오지 않는 문제
문제: SSE 연결을 했는데 heartbeat 이벤트도 찍히지 않고, 프로젝트 상태변경이 되도 text 메세지도 들어오지 않았다
원인: 스케줄러와 controller에서 각각 SseEmitter를 필드로 생성하여 각각 따로 돌아가고 있었다. 로그를 찍어봤을 때 SseEmitter는 생성되었지만 내가 연결했던 api로 메세지가 전달되지 않은 것은 SseEmitter가 중복으로 생성되었기 때문!
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
해결: 앞서 구현한것 처럼 SseEmitterRegistry로 공통으로 SseEmitter를 관리해주었다!
@Component
public class SseEmitterRegistry {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
public Map<String, SseEmitter> getEmitters() {
return emitters;
}
}
2. AsyncRequestTimeoutException 발생
문제: 로그로 Connection time이 종료되어 emitter를 삭제하도록 했는데, AsyncRequestTimeoutException 발생했다.
원인:
- SseEmitter에서 타임아웃 시간이 설정되어 있다면 이 시간 내에 데이터 전송이 이루어지지 않아 발생하는 오류이다. SseEmitter 객체가 제대로 종료되지 않은 상태였다.
- 실제로 처음에는 remove만 수행하여 SseEmitter 객체가 유지될 수 있었다. 이 경우 불필요한 연결 상태가 남아 있고 스케줄러의 sendHeartbeat() 메소드로 emitter로 데이터를 전송하려고 시도 했기 때문에 위와 같은 문제가 발생했다
해결: emitter.complete()로 연결을 종료 후 삭제 했다. 동기화를 위해 전역으로 관리하는 SseEmitterRegistry에서 삭제.
// 변경 전
emitter.onTimeout(() -> {
System.out.println("Connection timed out for memberId: " + memberId);
emitterRegistry.getEmitters().remove(memberId); // emitters 삭제
});
// 변경 후
emitter.onTimeout(() -> {
System.out.println("Connection timed out for memberId: " + memberId);
emitter.complete(); // 연결 종료
emitterRegistry.getEmitters().remove(memberId); // emitters 삭제
});
3. 프론트에 메세지가 전달이 안되는 문제 (feat. nginx)
문제: 로컬에서는 메세지 전달이 잘 되는데 개발서버에 올린뒤로는 설정한 시작 메세지도, ping 메세지도 안들어왔다. 일정시간 동안 메세지가 수신이 되지 않아 연결이 끊어졌다.
원인:
- 로컬에서는 메세지가 잘 오는데 서버에서 안된다는건 서버 설정문제라고 판단되어 서버 설정 위주로 찾아보았다
- SSE 요청은 일반 API 요청과 다르게 장시간 연결과 데이터 스트리밍을 요구하기 때문에 분리 설정이 필요하다는 것을 발견!
해결:
- nginx에 sse에 필수적인 설정을 추가했다. 다른 API에 영향을 줄 수 있으니 SSE 경로(/api/sse/)에 따로 설정.
proxy_buffering off;
chunked_transfer_encoding off;
- proxy_buffeing off;
- Nginx는 기본적으로 프록시 요청의 응답을 클라이언트로 바로 전달하지 않고, 메모리 또는 디스크에 버퍼링한 후 클라이언트로 전송하는데, SSE는 서버에서 데이터를 스트리밍 방식으로 전송하므로 버퍼링이 활성화된 상태에서는 데이터를 즉시 클라이언트로 전달할 수 없고, 연결이 끊어질 수 있다
- proxy_buffering off;을 통해 Nginx가 데이터를 버퍼링하지 않고 클라이언트로 즉시 전달하도록 한다.
- chunked_transfer_encoding off;
- HTTP/1.1에서는 데이터를 "청크(chunk)" 단위로 클라이언트에 전송할 수 있는데 SSE는 청크 단위로 데이터를 보내는 방식과 충돌할 수 있으므로, chunked_transfer_encoding off;를 설정해 데이터가 잘못 처리되지 않도록 해야한다.
- 이후 서버에도 메세지가 잘 들어오는 것을 확인!
7. 마치며
막상 구현해보니 SSE는 HTTP를 기반으로 동작하기 때문에 특별한 라이브러리나 프로토콜 추가 없이 빠르게 도입할 수 있다는게 정말 큰 장점이 었다. 특히, Spring의 SseEmitter를 활용하면 클라이언트와의 연결을 효율적으로 관리할 수 있어 편리했다
다만, SseEmitter를 관리하는 과정에서 클라이언트에 전달하려는 메세지가 제대로 들어오지 않거나 타임아웃 오류 등을 만나는 어려움을 겪었다. 이를 해결하는 과정에서 SSE나 SseEmitter의 수명 주기를 대해 이해 하게 되었고 추가로 SSE 뿐만아니라 nginx에 대한 이해와 경험도 추가 되었다. 추가로 안정성도 높이고 상태 관리의 중요성도 다시 한번 느꼈👍
지금은 단일 인스턴스라 쉽게 구현했지만 SseEmitter 객체는 서버의 메모리에 저장되기 때문에 서버 인스턴스를 늘려야 하는 상황에서는 추가 작업이 필요하다. 클라이언트가 어느 서버 인스턴스에 연결되었는지 알 수 없기 때문! 인스턴스를 늘려야할경우에는 Redis Pub/Sub이나 Kafka와 같은 메시지 브로커나 분산 캐시를 통해 중앙에서 연결을 관리함으로써 문제를 해결할 수 있다.
다음에 서버 인스턴스가 늘어나게 된다면 또 적용기를 공유해보도록 해야겠다!
[reference]
- https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/
- SseEmitter API
- https://serverfault.com/questions/801628/for-server-sent-events-sse-what-nginx-proxy-configuration-is-appropriate
'개발 하나둘셋 > Java & Spring' 카테고리의 다른 글
테스트 피라미드로 보는 스타트업 테스트 전략 (1) | 2025.01.05 |
---|---|
Java와 Spring에서의 비동기 처리 @Async와 CompletableFuture (2) | 2024.11.08 |
FFmpeg로 자막(srt) 삽입하여 영상 인코딩 시 자막이 깨지는 문제! Troubleshooting (0) | 2024.06.23 |
Spring Boot 3.x 주요 변경 사항과 마이그레이션 방법 (0) | 2024.02.04 |
Redis 서버 재시작 시 데이터 초기화 문제와 해결 방법: RDB와 AOF (1) | 2024.01.21 |