Raw gRPC APIs
최종 업데이트MagicOnion은 기본적인 gRPC APIs(ClientStreaming, ServerStreaming, DuplexStreaming)를 정의하고 사용할 수 있습니다. 특히 DuplexStreaming은 StreamingHub의 기반으로 사용됩니다. 특별한 이유가 없다면 StreamingHub를 사용하는 것을 권장합니다.
ServerStreaming
ServerStreaming은 서버에서 클라이언트로 여러 값을 전송하는 스트리밍 패턴입니다. 클라이언트는 단일 요청을 보내고, 서버는 여러 응답을 반환할 수 있습니다.
서버 측 구현
ServerStreaming을 구현하려면 GetServerStreamingContext<T>()를 사용하여 스트리밍 컨텍스트를 가져옵니다.
public async Task<ServerStreamingResult<WeatherData>> GetWeatherUpdatesAsync(string location, int count)
{
    var stream = GetServerStreamingContext<WeatherData>();
    // 지정된 횟수만큼 날씨 데이터 전송
    for (int i = 0; i < count; i++)
    {
        var weatherData = new WeatherData
        {
            Temperature = Random.Shared.Next(-10, 35),
            Humidity = Random.Shared.Next(30, 90),
            Timestamp = DateTime.UtcNow
        };
        await stream.WriteAsync(weatherData);
        
        // 1초 대기 (실시간 데이터 시뮬레이션)
        await Task.Delay(1000);
    }
    return stream.Result();
}
클라이언트 측 구현
클라이언트 측에서는 ResponseStream.ReadAllAsync()를 사용하여 서버에서 전송되는 모든 값을 수신합니다.
var client = MagicOnionClient.Create<IWeatherService>(channel);
var stream = await client.GetWeatherUpdatesAsync("Tokyo", 5);
await foreach (var weatherData in stream.ResponseStream.ReadAllAsync())
{
    Console.WriteLine($"온도: {weatherData.Temperature}°C, 습도: {weatherData.Humidity}%, 시간: {weatherData.Timestamp}");
}
사용 사례
ServerStreaming은 다음과 같은 시나리오에서 유용합니다:
- 실시간 데이터 피드 (주식 가격, 센서 데이터 등)
- 대량 데이터의 청크 단위 전송
- 진행 상황 업데이트 알림
- 로그 스트리밍
ClientStreaming
ClientStreaming은 클라이언트에서 서버로 여러 값을 전송하는 스트리밍 패턴입니다. 클라이언트는 여러 메시지를 보내고, 서버는 단일 응답을 반환합니다.
서버 측 구현
ClientStreaming을 구현하려면 GetClientStreamingContext<TRequest, TResponse>()를 사용하여 스트리밍 컨텍스트를 가져옵니다.
public async Task<ClientStreamingResult<SensorData, AnalysisResult>> AnalyzeSensorDataAsync()
{
    var stream = GetClientStreamingContext<SensorData, AnalysisResult>();
    var allData = new List<SensorData>();
    
    // 클라이언트로부터 모든 데이터 수신
    await foreach (var data in stream.ReadAllAsync())
    {
        Logger.Debug($"센서 데이터 수신: {data.Value} at {data.Timestamp}");
        allData.Add(data);
    }
    // 수신된 데이터 분석
    var result = new AnalysisResult
    {
        Average = allData.Average(d => d.Value),
        Max = allData.Max(d => d.Value),
        Min = allData.Min(d => d.Value),
        Count = allData.Count
    };
    return stream.Result(result);
}
클라이언트 측 구현
클라이언트 측에서는 RequestStream.WriteAsync()를 사용하여 여러 값을 전송하고, 마지막에 CompleteAsync()를 호출하여 스트림을 완료합니다.
var client = MagicOnionClient.Create<ISensorService>(channel);
var stream = await client.AnalyzeSensorDataAsync();
// 센서 데이터 전송
for (int i = 0; i < 10; i++)
{
    var sensorData = new SensorData
    {
        Value = Random.Shared.NextDouble() * 100,
        Timestamp = DateTime.UtcNow
    };
    
    await stream.RequestStream.WriteAsync(sensorData);
    await Task.Delay(100); // 센서 읽기 간격 시뮬레이션
}
// 스트림 완료
await stream.RequestStream.CompleteAsync();
// 서버로부터 분석 결과 수신
var result = await stream.ResponseAsync;
Console.WriteLine($"평균: {result.Average}, 최대: {result.Max}, 최소: {result.Min}, 개수: {result.Count}");
사용 사례
ClientStreaming은 다음과 같은 시나리오에서 유용합니다:
- 파일 업로드 (청크 단위)
- 배치 데이터 제출
- 센서 데이터 수집
- 대량 로그 제출
DuplexStreaming
DuplexStreaming은 클라이언트와 서버가 동시에 여러 메시지를 주고받을 수 있는 양방향 스트리밍 패턴입니다. 이는 MagicOnion의 StreamingHub를 위한 기반 기술입니다.
서버 측 구현
DuplexStreaming을 구현하려면 GetDuplexStreamingContext<TRequest, TResponse>()를 사용하여 스트리밍 컨텍스트를 가져옵니다.
public async Task<DuplexStreamingResult<ChatMessage, ChatMessage>> ChatAsync()
{
    var stream = GetDuplexStreamingContext<ChatMessage, ChatMessage>();
    // 클라이언트로부터 메시지를 수신하는 태스크
    var receiveTask = Task.Run(async () =>
    {
        await foreach (var message in stream.ReadAllAsync())
        {
            Logger.Debug($"수신: {message.User}: {message.Content}");
            
            // 에코백 (수신한 메시지에 서버 응답을 추가하여 반환)
            var response = new ChatMessage
            {
                User = "Server",
                Content = $"Echo: {message.Content}",
                Timestamp = DateTime.UtcNow
            };
            
            await stream.WriteAsync(response);
        }
    });
    // 환영 메시지 전송
    await stream.WriteAsync(new ChatMessage
    {
        User = "Server",
        Content = "채팅에 오신 것을 환영합니다!",
        Timestamp = DateTime.UtcNow
    });
    await receiveTask;
    return stream.Result();
}
클라이언트 측 구현
클라이언트 측에서는 송신과 수신을 병렬로 처리합니다.
var client = MagicOnionClient.Create<IChatService>(channel);
var stream = await client.ChatAsync();
// 서버로부터 메시지를 수신하는 태스크
var receiveTask = Task.Run(async () =>
{
    await foreach (var message in stream.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine($"[{message.Timestamp}] {message.User}: {message.Content}");
    }
});
// 사용자 입력 전송
while (true)
{
    var input = Console.ReadLine();
    if (input == "exit") break;
    var message = new ChatMessage
    {
        User = "Client",
        Content = input,
        Timestamp = DateTime.UtcNow
    };
    await stream.RequestStream.WriteAsync(message);
}
// 스트림 완료
await stream.RequestStream.CompleteAsync();
await receiveTask;
사용 사례
DuplexStreaming은 다음과 같은 시나리오에서 유용합니다:
- 실시간 채팅
- 양방향 게임 통신
- 협업 도구
- 실시간 모니터링 시스템
주의사항
- 
StreamingHub 고려: DuplexStreaming이 필요한 경우, 많은 경우에 StreamingHub가 더 적합합니다. StreamingHub는 DuplexStreaming 위에 구축된 더 높은 수준의 API를 제공합니다. 
- 
오류 처리: 스트리밍 중 예외는 적절히 처리되어야 합니다. 연결 끊김 및 타임아웃에 대한 대책을 구현하세요. 
- 
리소스 관리: 장시간 실행되는 스트리밍 연결은 리소스를 적절히 관리하고 필요에 따라 타임아웃을 설정해야 합니다. 
- 
동시 처리: DuplexStreaming에서는 송신과 수신이 동시에 발생하므로 스레드 안전성에 주의하세요. 
샘플 코드
서버 샘플
// Definitions
public interface IMyFirstService : IService<IMyFirstService>
{
    UnaryResult<string> SumAsync(int x, int y);
    Task<UnaryResult<string>> SumLegacyTaskAsync(int x, int y);
    Task<ClientStreamingResult<int, string>> ClientStreamingSampleAsync();
    Task<ServerStreamingResult<string>> ServerStreamingSampleAsync(int x, int y, int z);
    Task<DuplexStreamingResult<int, string>> DuplexStreamingSampleAsync();
}
// Server
public class MyFirstService : ServiceBase<IMyFirstService>, IMyFirstService
{
    public async UnaryResult<string> SumAsync(int x, int y)
    {
        Logger.Debug($"Called SumAsync - x:{x} y:{y}");
        return (x + y).ToString();
    }
    public async Task<ClientStreamingResult<int, string>> ClientStreamingSampleAsync()
    {
        Logger.Debug($"Called ClientStreamingSampleAsync");
        // If ClientStreaming, use GetClientStreamingContext.
        var stream = GetClientStreamingContext<int, string>();
        // receive from client asynchronously
        await foreach (var x in stream.ReadAllAsync())
        {
            Logger.Debug("Client Stream Received:" + x);
        }
        // StreamingContext.Result() for result value.
        return stream.Result("finished");
    }
    public async Task<ServerStreamingResult<string>> ServerStreamingSampleAsync(int x, int y, int z)
    {
        Logger.Debug($"Called ServerStreamingSampleAsync - x:{x} y:{y} z:{z}");
        var stream = GetServerStreamingContext<string>();
        var acc = 0;
        for (int i = 0; i < z; i++)
        {
            acc = acc + x + y;
            await stream.WriteAsync(acc.ToString());
        }
        return stream.Result();
    }
    public async Task<DuplexStreamingResult<int, string>> DuplexStreamingSampleAsync()
    {
        Logger.Debug($"Called DuplexStreamingSampleAsync");
        // DuplexStreamingContext represents both server and client streaming.
        var stream = GetDuplexStreamingContext<int, string>();
        var waitTask = Task.Run(async () =>
        {
            // ForEachAsync(MoveNext, Current) can receive client streaming.
            await foreach (var x in stream.ReadAllAsync())
            {
                Logger.Debug($"Duplex Streaming Received:" + x);
            }
        });
        // WriteAsync is ServerStreaming.
        await stream.WriteAsync("test1");
        await stream.WriteAsync("test2");
        await stream.WriteAsync("finish");
        await waitTask;
        return stream.Result();
    }
}
클라이언트 샘플
static async Task ClientStreamRun(IMyFirstService client)
{
    var stream = await client.ClientStreamingSampleAsync();
    for (int i = 0; i < 3; i++)
    {
        await stream.RequestStream.WriteAsync(i);
    }
    await stream.RequestStream.CompleteAsync();
    var response = await stream.ResponseAsync;
    Console.WriteLine("Response:" + response);
}
static async Task ServerStreamRun(IMyFirstService client)
{
    var stream = await client.ServerStreamingSampleAsync(10, 20, 3);
    await foreach (var x in stream.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine("ServerStream Response:" + x);
    }
}
static async Task DuplexStreamRun(IMyFirstService client)
{
    var stream = await client.DuplexStreamingSampleAsync();
    var count = 0;
    await foreach (var x in stream.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine("DuplexStream Response:" + x);
        await stream.RequestStream.WriteAsync(count++);
        if (x == "finish")
        {
            await stream.RequestStream.CompleteAsync();
        }
    }
}