개발/Spring(boot)

SSE를 활용해 작업 진행률 전달하기

Ski_ 2025. 2. 16. 23:59

서버에서 특정 작업이 오래 걸릴 수 있고, 이 경우 클라이언트에게 진행률을 알려주는것이 UX 관점에서 좋다고 생각한다.

그럼 서버가 클라이언트에게 작업 진행률을 어떻게 전달할 수 있는지에 대해 작성해보겠다.

 

그래서 이번 글에서는 Springboot의 WebFlux를 활용해 이를 전달하는 방법에 대해

0. WebFlux 선택 이유

1. 초기 환경 세팅

2. SSE 코드 작성

3. SSE와 WebSocket 통신의 차이

4. 내가 작성한 SSE의 한계점

순서로 작성해보고자 한다.


0. WebFlux 선택 이유

SSE를 사용하는 방법은 크게 두가지가 있다.

1) ResponseBodyEmitter를 사용하는 방법 (Spring MVC 기반)

2) WebFlux의 Flux<T>를 사용하는 방법 (Reactive Streams 기반)

 

그럼 두 방식에 대한 간단한 설명과 장단점에 대해 작성해보겠다.

1) ResponseBodyEmitter 방식

  • Spring MVC에서 제공하는 비동기 응답 스트림 처리 도구.
  • 클라이언트가 연결을 유지하고 있는 동안 서버가 데이터를 지속적으로 전송할 수 있도록 함.
  • Blocking I/O(스레드당 요청 처리) 기반이며, 내부적으로는 DeferredResult를 확장한 형태.

장점

  • Spring MVC 기반으로 기존 Spring 프로젝트에서 쉽게 적용 가능.
  • 단순한 구조로 학습 곡선이 낮음.
  • JVM 기반의 Blocking I/O로 작은 규모의 애플리케이션에서는 빠르게 적용할 수 있음.

단점

  • Blocking I/O 기반으로 많은 SSE 연결 시 스레드 리소스가 부족할 수 있음.
  • 대규모 연결 처리에는 부적합(스케일링 이슈).
  • ResponseBodyEmitter는 기본적으로 WebFlux와 같은 Reactive Streams의 장점을 활용하지 못함.
  • Spring MVC의 전통적인 스레드 모델이므로, 고성능 대규모 SSE가 필요한 경우 성능 저하 가능.

2) WebFlux 기반 SSE

  • Spring WebFlux는 비동기 논블로킹 I/OReactive Streams를 기반으로 하는 웹 프레임워크.
  • Netty와 같은 논블로킹 서버를 사용할 수 있으며, 대규모 SSE 연결을 효율적으로 처리할 수 있음.
  • Flux(Reactive Stream)로 SSE를 지원

장점

  1. Reactive Streams 기반으로 대규모 동시 연결을 효율적으로 처리.
  2. 비동기 논블로킹 I/O 덕분에 적은 리소스로 많은 SSE 연결을 지원.
  3. WebFlux API와 잘 통합되어 있어, Flux/Mono 기반의 Reactive Programming 패러다임을 활용.
  4. Spring Boot 2.0+ 이후 SSE, WebSocket, HTTP Streaming에 최적화되어 있음.

단점

  1. Reactive Streams의 개념(Flux, Mono)을 학습해야 하며, 학습 곡선이 높음.
  2. 기존 Spring MVC 프로젝트에서는 WebFlux 의존성을 추가해야 하고, 일부 리팩토링이 필요할 수 있음.
  3. 단순한 SSE 구현에는 오히려 과한 기술 스택이 될 수도 있음.

테스트를 위한 프로젝트를 새로 만들것이고, Kotlin의 Flux를 사용해보고 싶었기에 WebFlux 기반 SSE 방식을 선택하고 이를 활용해 작업 진행률을 받아오는 방법에 대해 작성해보겠다.


1. 초기 환경 세팅

ssetest라는 프로젝트를 만들어보자. 앞서 말했듯이 Kotlin과 WebFlux를 사용할 예정이다.

언어는 Koltin, JDK는 correto-17, Springboot는 3.4.2, 의존성은 SpringWeb과 SpringReactiveWeb 두 가지를 추가하겠다.

 

간단한 테스트용 컨트롤러를 만들어 테스트해보면

@Controller
class SseController {

    @GetMapping
    fun test() : ResponseEntity<String> {
        return ResponseEntity.ok("Hello SSE")
    }
}

잘 나오므로 이제 SSE를 발생시키는(?) 코드를 작성해보겠다.


2. SSE 코드 작성

테스트는 단일 서버로 해야 하므로 몇가지 가정이 필요하다.

1. 서버에 요청이 들어왔고

2. 서버에서 오래 걸리는 작업이 있고 이 작업의 진행률은 파일에 기록된다.

3. 클라이언트에게 작업의 진행률이 바뀔 때 마다 알려줘야 한다.

4. 작업이 완료되었을 때 어떤 작업이 완료됬는지 알려준다.

 

간단하게 이정도 가정만 가지고 코드를 작성해보겠다.

그래서 서버에서는 3개의 요청이 들어온다고 가정하겠다.

1. 처음 서버에 작업 요청을 보내는 "/start"

2. 작업 요청의 진행률을 sse로 확인하는 "/progress" 

3. 작업이 끝났을 때 결과값을 요청하는 "/result" 

@Controller
class SseController(
    private val sseService: SseService
) {

    @GetMapping("/start")
    fun startTask(@RequestParam name: String) : ResponseEntity<String> {
        return ResponseEntity.ok(sseService.startTask(name))
    }

    @PostMapping("/progress", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun getProgress(@RequestParam name: String) : Flow<Any> {
        return sseService.streamProgress(name)
    }

    @GetMapping("/result")
    fun getResult(@RequestParam name: String) : ResponseEntity<String> {
        return ResponseEntity.ok(sseService.getResult(name))
    }
}

SSE는 뭔가 복잡한 설정이 필요할 줄 알았지만 Controller는 그냥 Flow 타입을 반환하기만 하면 돼서 생각보다 간단했다.

 

이제 Service를 작성해보자.

먼저 CoroutineScope와 유저가 붙인 작업 이름과 실제 작업 이름을 가지는 Map을 하나 정의하겠다.

private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val taskFileMap = ConcurrentHashMap<String, String>()

CoroutineScope을 정의하는 방법도 굉장히 다양한 것으로 알고 있는데 이는 나중에 공부하려고 한다.

Dispatchers.IO는 I/O 최적화된 스레드에서 실행하고,(파일, DB, 네트워크 요청 등)

SupervisorJob()은 하위 코루틴이 실패해도 다른 코루틴이 취소되지 않도록 관리한다.

 

이후 오래 걸리는 작업이 있다고 가정할 것 이므로 단순하게 10초동안 진행률을 올리고, 이를 파일에 적는 메서드를 하나 작성했다.

private fun execute(taskName: String) {
    val file = File(taskName)
    scope.launch {
        for (i in 1..100 step 10) {
            delay(1000)
            updateProgress(i, file)
        }
        delay(1000)
        updateProgress(Integer.MAX_VALUE, file)
    }
}

private fun updateProgress(progress: Int, file: File) {
        file.writeText(progress.toString())
}

그럼 본격적으로 Controller에서 호출하는 메서드들을 작성해보겠다.

먼저 "/start"에서 호출하는 startTask 함수이다

fun startTask(taskName: String): String {
    if (taskFileMap.containsKey(taskName)) {
        return "task ${taskFileMap.get(taskName)} already started"
    } else {
        val taskId = UUID.randomUUID().toString()
        taskFileMap.put(taskName, taskId)
        execute(taskId)

        return "task ${taskFileMap.get(taskName)} started"
    }
}

내용은 간단하게 이미 동일한 이름의 작업이 있다면 오류를, 아니면 작업을 실행해주는 함수이다.

바로 위에 작성한 execute 함수를 호출한다.

 

"/progress"에서 호출하는 streamProgress 함수이다.

fun streamProgress(taskName: String): Flow<Any> {

    return flow {
        try {
            while (true) {
                delay(1000)
                val taskId = taskFileMap.get(taskName)!!
                val file = File(taskId)
                val progress = file.readText().toIntOrNull() ?: 0

                emit("Progress: $progress%")
                if (progress >= 100) {
                    emit("task $taskName completed")
                    break
                }
            }
        } catch (e: Exception) {
            println("SSE Connection Closed: ${e.message}")
        }
    }
}

특이하게 return type이 Flow인데, 이게 바로 SSE 방식으로 클라이언트에게 값을 전달해주게 된다.

Flow를 사용하면 Client와 단방향 통신이 가능하게 되고 emit 안의 있는 값들이 실시간으로 Client에게 전달되게 된다.

별개로 sse와 소켓 통신 방식에 대한 차이점도 있는데 이는 이후에 작성해보겠다.

 

"/result"에서 호출하는 getResult 함수이다.

fun getResult(taskName: String): Int {
    validateTaskName(taskName)
    val result = validateProgressCompleted(taskName)
    taskFileMap.remove(taskName)

    return result
}

private fun validateTaskName(taskName: String) {
    if (!taskFileMap.containsKey(taskName)) {
        throw BadRequestException("task $taskName not found")
    }
}

private fun validateProgressCompleted(taskName: String) : Int {
    val result = File(taskFileMap.get(taskName)).readText().toInt()
    if (result < 100) {
        throw BadRequestException("task $taskName running")
    }
    return result
}

결과를 조회하려는 작업이 유효한지, 완료되었는지 확인하고 완료되었다면 결과값을 반환한다.

 

여기까지가 큰 틀에서의 SSE 구현이며 아래와 같이 테스트 결과도 잘 나오는것을 확인할 수 있다.

내가 작성한 Service 코드는 아래와 같다.

package com.ssetest.service

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.apache.coyote.BadRequestException
import org.springframework.stereotype.Service
import java.io.File
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

@Service
class SseService {

    private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    private val taskFileMap = ConcurrentHashMap<String, String>()

    fun startTask(taskName: String): String {
        if (taskFileMap.containsKey(taskName)) {
            return "task ${taskFileMap.get(taskName)} already started"
        } else {
            val taskId = UUID.randomUUID().toString()
            taskFileMap.put(taskName, taskId)
            execute(taskId)

            return "task ${taskFileMap.get(taskName)} started"
        }
    }

    fun streamProgress(taskName: String): Flow<Any> {
        validateTaskName(taskName)
        validateProgress(taskName)

        return flow {
            try {
                while (true) {
                    delay(1000)
                    val taskId = taskFileMap.get(taskName)!!
                    val file = File(taskId)
                    val progress = file.readText().toIntOrNull() ?: 0

                    emit("Progress: $progress%")
                    if (progress >= 100) {
                        emit("task $taskName completed")
                        break
                    }
                }
            } catch (e: Exception) {
                println("SSE Connection Closed: ${e.message}")
            }
        }
    }

    fun getResult(taskName: String): Int {
        validateTaskName(taskName)
        val result = validateProgressCompleted(taskName)
        taskFileMap.remove(taskName)

        return result
    }

    private fun execute(taskName: String) {
        val file = File(taskName)
        scope.launch {
            for (i in 1..100 step 10) {
                delay(1000)
                updateProgress(i, file)
            }
            delay(1000)
            updateProgress(Integer.MAX_VALUE, file)
        }
    }

    private fun updateProgress(progress: Int, file: File) {
        file.writeText(progress.toString())
    }

    private fun validateTaskName(taskName: String) {
        if (!taskFileMap.containsKey(taskName)) {
            throw BadRequestException("task $taskName not found")
        }
    }

    private fun validateProgress(taskName: String) {
        val taskId = taskFileMap.get(taskName)
        val file = File(taskId)
        val progress = file.readText().toIntOrNull() ?: 0
        if (progress >= 100) {
            throw BadRequestException("task $taskName already completed")
        }
    }

    private fun validateProgressCompleted(taskName: String) : Int {
        val result = File(taskFileMap.get(taskName)).readText().toInt()
        if (result < 100) {
            throw BadRequestException("task $taskName running")
        }
        return result
    }
}

그럼 이제 SSE와 Websock방식의 차이점에 대해 작성하고, 내가 구현한 SSE의 한계점에 대해 작성해보겠다.


3. SSE와 Websock방식의 차이점

0. 결론

  • SSE 구현이 간단하고 HTTP 기반으로 동작하여 단방향 실시간 데이터 스트리밍에 적합.
  • WebSocket 양방향 통신 낮은 오버헤드를 제공하여, 양쪽 모두에서 적극적인 데이터 교환이 필요한 복잡한 실시간 애플리케이션에 적합.

1. 통신 방향 및 프로토콜

  • SSE 
    • 단방향 통신: 서버에서 클라이언트로만 데이터가 푸시. 클라이언트는 단순히 데이터를 수신하기만 하며, 서버에 데이터를 보내려면 별도의 HTTP 요청을 사용해야 함.
    • HTTP 기반: 기존 HTTP 프로토콜 위에서 동작하며, text/event-stream 콘텐츠 타입으로 스트리밍 데이터를 전송.
    • 자동 재연결: 기본적으로 브라우저의 EventSource API가 자동 재연결 기능을 내장하고 있어, 연결이 끊어졌을 때 클라이언트가 자동으로 재연결을 시도.
  • WebSocket 
    • 양방향 통신: 클라이언트와 서버가 모두 데이터를 자유롭게 주고받을 수 있는 전이중(Full-Duplex) 통신을 지원.
    • 별도 프로토콜: 초기 HTTP 핸드셰이크 후 전용 WebSocket 프로토콜로 전환되어, 보다 효율적인 실시간 통신 가능.
    • 낮은 오버헤드: 헤더 오버헤드가 작고, 지속적인 연결을 유지하면서 데이터를 빠르게 교환 가능.

2. 구현의 용이성과 지원

  • SSE
    • 구현 용이성: 브라우저 내장 EventSource API 덕분에 클라이언트 구현이 간단.
    • HTTP 인프라와 호환: 프록시, 로드 밸런서, 캐시 서버 등 기존 HTTP 인프라와 자연스럽게 통합.
    • 제한 사항: 단방향 통신이기 때문에, 클라이언트에서 서버로의 실시간 데이터 전송이 필요한 경우에는 추가 메커니즘 필요. 또한, 일부 오래된 브라우저에서는 지원x
  • WebSocket
    • 구현 복잡성: 서버와 클라이언트 모두에서 WebSocket 핸드셰이크, 메시지 처리, 연결 관리 등 추가적인 로직이 필요
    • 양방향 통신: 실시간 채팅, 게임, 협업 도구 등 클라이언트와 서버 간의 상호작용이 중요한 애플리케이션에 적합.

3. 사용 사례

  • SSE
    • 실시간 알림, 진행률 업데이트, 뉴스 피드와 같이 서버에서 주기적으로 업데이트를 전송하는 경우
    • 단순하고 가벼운 구현이 필요한 경우
    • 클라이언트가 주로 수신만 하면 되는 상황
  • WebSocket
    • 실시간 채팅, 게임, 협업 애플리케이션 등 양방향 통신이 필수적인 경우
    • 높은 빈도의 메시지 교환이나 대용량 데이터 전송이 필요한 경우
    • 서버와 클라이언트 모두 적극적으로 데이터를 주고받아야 하는 상황

4. 내가 구현한 SSE의 한계점

개인적으로 가장 큰 한계점은 Polling 방식과 동일하게 동작한다는 점 이었다.

fun streamProgress(taskName: String): Flow<Any> {

    return flow {
        try {
            while (true) {
                delay(1000)
                val taskId = taskFileMap.get(taskName)!!
                val file = File(taskId)
                val progress = file.readText().toIntOrNull() ?: 0

                emit("Progress: $progress%")
                if (progress >= 100) {
                    emit("task $taskName completed")
                    break
                }
            }
        } catch (e: Exception) {
            println("SSE Connection Closed: ${e.message}")
        }
    }
}

이 코드는 무한 루프에 1초마다 딜레이를 주고 동작하고 있으므로 1초마다 Polling하는 것과 동일하다고 생각했다.

그래서 파일을 관찰하다가 파일의 변경사항이 생길 때 마다 SSE를 실행하는것이 적합하다고 생각했지만 복잡도가 너무 늘어나서 이는 포기했다.


오늘은 SSE에 대해 작성해봤다.

생각보다 구현이 간단했지만 그래도 생각할 점이 꽤나 있었던 것 같다. 그리고 Android의 ViewModel에서 봤던 Flux가 갑자기 나와서 신기했지만 결국 ViewModel에서 사용하는 Flux도 데이터를 동기화하기 위해 사용했고, SSE 역시 데이터를 클라이언트와 동기화 한다는 점에서 어떻게 보면 비슷한 성격으로 사용하는 것 같다고 느꼈다.

반응형