Raw gRPC API
に最終更新MagicOnion はプリミティブな gRPC API(ClientStreaming、ServerStreaming、DuplexStreaming)を定義して使用することができます。特に DuplexStreaming は StreamingHub の基礎となって使用されています。特別な理由がない限り、StreamingHub の使用を推奨します。
ServerStreaming の使い方
ServerStreaming はサーバーからクライアントへ複数の値を送信するストリーミングパターンです。クライアントは単一のリクエストを送信し、サーバーは複数のレスポンスを返すことができます。
サーバー側の実装
ServerStreaming を実装するには、GetServerStreamingContext<T>()
を使用してストリーミングコンテキストを取得します。
public async Task<ServerStreamingResult<WeatherData>> GetWeatherUpdatesAsync(string location, int count)
{
var stream = GetServerStreamingContext<WeatherData>();
// 指定された回数だけ天気データを送信
for (int i = 0; i < count; i++)
{
var weatherData = new WeatherData
{
Temperature = Random.Shared.Next(-10, 35),
Humidity = Random.Shared.Next(30, 90),
Timestamp = DateTime.UtcNow
};
await stream.WriteAsync(weatherData);
// 1秒待機(リアルタイムデータのシミュレーション)
await Task.Delay(1000);
}
return stream.Result();
}
クライアント側の実装
クライアント側では、ResponseStream.ReadAllAsync()
を使用してサーバーから送信されるすべての値を受信します。
var client = MagicOnionClient.Create<IWeatherService>(channel);
var stream = await client.GetWeatherUpdatesAsync("Tokyo", 5);
await foreach (var weatherData in stream.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"気温: {weatherData.Temperature}°C, 湿度: {weatherData.Humidity}%, 時刻: {weatherData.Timestamp}");
}
使用例
ServerStreaming は以下のようなシナリオで有用です:
- リアルタイムデータフィード(株価、センサーデータなど)
- 大量データの分割送信
- プログレス更新の通知
- ログのストリーミング
ClientStreaming の使い方
ClientStreaming はクライアントからサーバーへ複数の値を送信するストリーミングパターンです。クライアントは複数のメッセージを送信し、サーバーは単一のレスポンスを返します。
サーバー側の実装
ClientStreaming を実装するには、GetClientStreamingContext<TRequest, TResponse>()
を使用してストリーミングコンテキストを取得します。
public async Task<ClientStreamingResult<SensorData, AnalysisResult>> AnalyzeSensorDataAsync()
{
var stream = GetClientStreamingContext<SensorData, AnalysisResult>();
var allData = new List<SensorData>();
// クライアントからのすべてのデータを受信
await foreach (var data in stream.ReadAllAsync())
{
Logger.Debug($"Received sensor data: {data.Value} at {data.Timestamp}");
allData.Add(data);
}
// 受信したデータを分析
var result = new AnalysisResult
{
Average = allData.Average(d => d.Value),
Max = allData.Max(d => d.Value),
Min = allData.Min(d => d.Value),
Count = allData.Count
};
return stream.Result(result);
}
クライアント側の実装
クライアント側では、RequestStream.WriteAsync()
を使用して複数の値を送信し、最後に CompleteAsync()
を呼び出してストリームを完了します。
var client = MagicOnionClient.Create<ISensorService>(channel);
var stream = await client.AnalyzeSensorDataAsync();
// センサーデータを送信
for (int i = 0; i < 10; i++)
{
var sensorData = new SensorData
{
Value = Random.Shared.NextDouble() * 100,
Timestamp = DateTime.UtcNow
};
await stream.RequestStream.WriteAsync(sensorData);
await Task.Delay(100); // センサー読み取り間隔のシミュレーション
}
// ストリームを完了
await stream.RequestStream.CompleteAsync();
// サーバーからの分析結果を受信
var result = await stream.ResponseAsync;
Console.WriteLine($"平均: {result.Average}, 最大: {result.Max}, 最小: {result.Min}, 件数: {result.Count}");
使用例
ClientStreaming は以下のようなシナリオで有用です:
- ファイルアップロード(チャンク単位)
- バッチデータの送信
- センサーデータの収集
- ログの一括送信
DuplexStreaming の使い方
DuplexStreaming は双方向のストリーミングパターンで、クライアントとサーバーが同時に複数のメッセージを送受信できます。これは MagicOnion の StreamingHub の基礎となる技術です。
サーバー側の実装
DuplexStreaming を実装するには、GetDuplexStreamingContext<TRequest, TResponse>()
を使用してストリーミングコンテキストを取得します。
public async Task<DuplexStreamingResult<ChatMessage, ChatMessage>> ChatAsync()
{
var stream = GetDuplexStreamingContext<ChatMessage, ChatMessage>();
// クライアントからのメッセージを受信するタスク
var receiveTask = Task.Run(async () =>
{
await foreach (var message in stream.ReadAllAsync())
{
Logger.Debug($"Received: {message.User}: {message.Content}");
// エコーバック(受信したメッセージにサーバー応答を付けて返す)
var response = new ChatMessage
{
User = "Server",
Content = $"Echo: {message.Content}",
Timestamp = DateTime.UtcNow
};
await stream.WriteAsync(response);
}
});
// ウェルカムメッセージを送信
await stream.WriteAsync(new ChatMessage
{
User = "Server",
Content = "チャットへようこそ!",
Timestamp = DateTime.UtcNow
});
await receiveTask;
return stream.Result();
}
クライアント側の実装
クライアント側では、送信と受信を並行して処理します。
var client = MagicOnionClient.Create<IChatService>(channel);
var stream = await client.ChatAsync();
// サーバーからのメッセージを受信するタスク
var receiveTask = Task.Run(async () =>
{
await foreach (var message in stream.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"[{message.Timestamp}] {message.User}: {message.Content}");
}
});
// ユーザー入力を送信
while (true)
{
var input = Console.ReadLine();
if (input == "exit") break;
var message = new ChatMessage
{
User = "Client",
Content = input,
Timestamp = DateTime.UtcNow
};
await stream.RequestStream.WriteAsync(message);
}
// ストリームを完了
await stream.RequestStream.CompleteAsync();
await receiveTask;
使用例
DuplexStreaming は以下のようなシナリオで有用です:
- リアルタイムチャット
- ゲームの双方向通信
- コラボレーションツール
- リアルタイム監視システム
注意事項
-
StreamingHub の検討: DuplexStreaming が必要な場合、多くのケースで StreamingHub の方が適しています。StreamingHub は DuplexStreaming の上に構築された、より高レベルな API を提供します。
-
エラーハンドリング: ストリーミング中の例外は適切に処理する必要があります。接続の切断やタイムアウトに対する対策を実装してください。
-
リソース管理: 長時間実行されるストリーミング接続は、適切にリソースを管理し、必要に応じてタイムアウトを設定してください。
-
並行処理: DuplexStreaming では送信と受信が並行して行われるため、スレッドセーフティに注意してください。
サンプルコード
サーバーのサンプル
// 定義
public interface IMyFirstService : IService<IMyFirstService>
{
UnaryResult<string> SumAsync(int x, int y);
Task<UnaryResult<string>> SumLegacyTaskAsync(int x, int y);
Task<ClientStreamingResult<int, string>> ClientStreamingSampleAsync();
Task<ServerStreamingResult<string>> ServerStreamingSampleAsync(int x, int y, int z);
Task<DuplexStreamingResult<int, string>> DuplexStreamingSampleAsync();
}
// サーバー
public class MyFirstService : ServiceBase<IMyFirstService>, IMyFirstService
{
public async UnaryResult<string> SumAsync(int x, int y)
{
Logger.Debug($"Called SumAsync - x:{x} y:{y}");
return (x + y).ToString();
}
public async Task<ClientStreamingResult<int, string>> ClientStreamingSampleAsync()
{
Logger.Debug($"Called ClientStreamingSampleAsync");
// ClientStreaming の場合は、GetClientStreamingContext を使用します。
var stream = GetClientStreamingContext<int, string>();
// クライアントから非同期で受信
await foreach (var x in stream.ReadAllAsync())
{
Logger.Debug("Client Stream Received:" + x);
}
// StreamingContext.Result() で結果値を返します。
return stream.Result("finished");
}
public async Task<ServerStreamingResult<string>> ServerStreamingSampleAsync(int x, int y, int z)
{
Logger.Debug($"Called ServerStreamingSampleAsync - x:{x} y:{y} z:{z}");
var stream = GetServerStreamingContext<string>();
var acc = 0;
for (int i = 0; i < z; i++)
{
acc = acc + x + y;
await stream.WriteAsync(acc.ToString());
}
return stream.Result();
}
public async Task<DuplexStreamingResult<int, string>> DuplexStreamingSampleAsync()
{
Logger.Debug($"Called DuplexStreamingSampleAsync");
// DuplexStreamingContext はサーバーとクライアントの両方のストリーミングを表します。
var stream = GetDuplexStreamingContext<int, string>();
var waitTask = Task.Run(async () =>
{
// ForEachAsync(MoveNext, Current) でクライアントストリーミングを受信できます。
await foreach (var x in stream.ReadAllAsync())
{
Logger.Debug($"Duplex Streaming Received:" + x);
}
});
// WriteAsync は ServerStreaming です。
await stream.WriteAsync("test1");
await stream.WriteAsync("test2");
await stream.WriteAsync("finish");
await waitTask;
return stream.Result();
}
}
クライアントのサンプル
static async Task ClientStreamRun(IMyFirstService client)
{
var stream = await client.ClientStreamingSampleAsync();
for (int i = 0; i < 3; i++)
{
await stream.RequestStream.WriteAsync(i);
}
await stream.RequestStream.CompleteAsync();
var response = await stream.ResponseAsync;
Console.WriteLine("Response:" + response);
}
static async Task ServerStreamRun(IMyFirstService client)
{
var stream = await client.ServerStreamingSampleAsync(10, 20, 3);
await foreach (var x in stream.ResponseStream.ReadAllAsync())
{
Console.WriteLine("ServerStream Response:" + x);
}
}
static async Task DuplexStreamRun(IMyFirstService client)
{
var stream = await client.DuplexStreamingSampleAsync();
var count = 0;
await foreach (var x in stream.ResponseStream.ReadAllAsync())
{
Console.WriteLine("DuplexStream Response:" + x);
await stream.RequestStream.WriteAsync(count++);
if (x == "finish")
{
await stream.RequestStream.CompleteAsync();
}
}
}