DarabonbaUnitTests/Utils/StreamUtilsTest.cs (460 lines of code) (raw):

using System.Collections.Generic; using System.IO; using System.Text; using System.Threading.Tasks; using System.Threading; using Darabonba.Models; using Darabonba.Utils; using Xunit; using System.Net; using System.Net.Http; using Newtonsoft.Json; using System; namespace DaraUnitTests.Utils { public class SseServer : IDisposable { private readonly HttpListener _httpListener; private CancellationTokenSource _cancellationTokenSource; public SseServer(string uriPrefix) { _httpListener = new HttpListener(); _httpListener.Prefixes.Add(uriPrefix); } public void Start() { _cancellationTokenSource = new CancellationTokenSource(); _httpListener.Start(); Task.Run(() => HandleIncomingConnections(_cancellationTokenSource.Token)); } private async Task HandleIncomingConnections(CancellationToken cancellationToken) { while (!_cancellationTokenSource.IsCancellationRequested) { try { var context = await _httpListener.GetContextAsync().ConfigureAwait(false); if (context.Request.Url?.AbsolutePath == "/sse") { HandleSseResponse(context.Response); } else if (context.Request.Url?.AbsolutePath == "/sse_with_no_spaces") { HandleSseWithNoSpacesResponse(context.Response); } else if (context.Request.Url?.AbsolutePath == "/sse_invalid_retry") { HandleSseWithInvalidRetryResponse(context.Response); } else if (context.Request.Url?.AbsolutePath == "/sse_with_data_divided") { HandleSseWithDataDividedResponse(context.Response); } } catch (HttpListenerException) when (cancellationToken.IsCancellationRequested) { throw new HttpListenerException(); } } } private void HandleSseResponse(HttpListenerResponse response) { int count = 0; Timer timer = null; var cts = new CancellationTokenSource(); var token = cts.Token; timer = new Timer(_ => { if (token.IsCancellationRequested) { return; } if (count >= 5) { cts.Cancel(); timer.Dispose(); response.Close(); return; } try { byte[] buffer = Encoding.UTF8.GetBytes(string.Format( "data: {0}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n", JsonConvert.SerializeObject(new { count = count }))); response.OutputStream.Write(buffer, 0, buffer.Length); response.OutputStream.Flush(); count++; } catch (ObjectDisposedException ex) { Console.WriteLine($"ObjectDisposedException caught: {ex.Message}"); } }, null, 0, 100); } private void HandleSseWithNoSpacesResponse(HttpListenerResponse response) { int count = 0; Timer timer = null; var cts = new CancellationTokenSource(); var token = cts.Token; timer = new Timer(_ => { if (token.IsCancellationRequested) { return; } if (count >= 5) { cts.Cancel(); timer.Dispose(); response.Close(); return; } try { byte[] buffer = Encoding.UTF8.GetBytes(string.Format("data: {0}\nevent:flow\nid:sse-test\nretry:3\n\n", JsonConvert.SerializeObject(new { count = count }))); response.OutputStream.Write(buffer, 0, buffer.Length); response.OutputStream.Flush(); count++; } catch (ObjectDisposedException ex) { Console.WriteLine($"ObjectDisposedException caught: {ex.Message}"); } }, null, 0, 100); } private void HandleSseWithInvalidRetryResponse(HttpListenerResponse response) { int count = 0; Timer timer = null; var cts = new CancellationTokenSource(); var token = cts.Token; timer = new Timer(_ => { if (token.IsCancellationRequested) { return; } if (count >= 5) { cts.Cancel(); timer.Dispose(); response.Close(); return; } try { byte[] buffer = Encoding.UTF8.GetBytes(string.Format("data: {0}\nevent:flow\nid:sse-test\nretry: abc\n\n", JsonConvert.SerializeObject(new { count = count }))); response.OutputStream.Write(buffer, 0, buffer.Length); response.OutputStream.Flush(); count++; } catch (ObjectDisposedException ex) { Console.WriteLine($"ObjectDisposedException caught: {ex.Message}"); } }, null, 0, 100); } private void HandleSseWithDataDividedResponse(HttpListenerResponse response) { int count = 0; Timer timer = null; var cts = new CancellationTokenSource(); var token = cts.Token; timer = new Timer(_ => { if (token.IsCancellationRequested) { return; } if (count >= 5) { cts.Cancel(); timer.Dispose(); response.Close(); return; } if (count == 1) { byte[] buffer = Encoding.UTF8.GetBytes("data:{\"count\":"); response.OutputStream.Write(buffer, 0, buffer.Length); response.OutputStream.Flush(); count++; return; } if (count == 2) { byte[] buffer = Encoding.UTF8.GetBytes(string.Format("{0},\"tag\":\"divided\"}}\nevent:flow\nid:sse-test\nretry:3\n\n", count++)); response.OutputStream.Write(buffer, 0, buffer.Length); response.OutputStream.Flush(); return; } try { byte[] buffer1 = Encoding.UTF8.GetBytes(string.Format("data: {0}\nevent:flow\nid:sse-test\nretry:3\n\n", JsonConvert.SerializeObject(new { count = count++ }))); response.OutputStream.Write(buffer1, 0, buffer1.Length); response.OutputStream.Flush(); } catch (ObjectDisposedException ex) { Console.WriteLine($"ObjectDisposedException caught: {ex.Message}"); } }, null, 0, 100); } public void Stop() { _cancellationTokenSource.Cancel(); _httpListener.Stop(); _httpListener.Close(); } public void Dispose() { Stop(); ((IDisposable)_httpListener)?.Dispose(); _cancellationTokenSource?.Dispose(); } } public class StreamUtilsTest : IAsyncLifetime { private SseServer server = new SseServer("http://localhost:8384/"); public async Task InitializeAsync() { server.Start(); await Task.Delay(1000); } public Task DisposeAsync() { server.Dispose(); #if NET45 return Task.FromResult(0); #else return Task.CompletedTask; #endif } [Fact] public void Test_ReadAsString() { using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes("test"))) { Assert.Equal("test", StreamUtils.ReadAsString(stream)); } } [Fact] public async void Test_ReadAsStringAsync() { using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes("test"))) { Assert.Equal("test", await StreamUtils.ReadAsStringAsync(stream)); } } [Fact] public void Test_ReadAsJSON() { string jsonStr = "{\"arrayObj\":[[{\"itemName\":\"item\",\"itemInt\":1},{\"itemName\":\"item2\",\"itemInt\":2}],[{\"itemName\":\"item3\",\"itemInt\":3}]],\"arrayList\":[[[1,2],[3,4]],[[5,6],[7]],[]],\"listStr\":[1,2,3],\"items\":[{\"total_size\":18,\"partNumber\":1,\"tags\":[{\"aa\":\"11\"}]},{\"total_size\":20,\"partNumber\":2,\"tags\":[{\"aa\":\"22\"}]}],\"next_marker\":\"\",\"test\":{\"total_size\":19,\"partNumber\":1,\"tags\":[{\"aa\":\"11\"}]}}"; byte[] array = Encoding.UTF8.GetBytes(jsonStr); using (MemoryStream stream = new MemoryStream(array)) { Dictionary<string, object> dic = (Dictionary<string, object>)StreamUtils.ReadAsJSON(stream); Assert.NotNull(dic); List<object> listResult = (List<object>)dic["items"]; Dictionary<string, object> item1 = (Dictionary<string, object>)listResult[0]; Assert.Equal(18L, item1["total_size"]); Assert.Empty((string)dic["next_marker"]); Assert.Equal(2, ((List<object>)dic["arrayObj"]).Count); } jsonStr = "[{\"itemName\":\"item\",\"itemInt\":1},{\"itemName\":\"item2\",\"itemInt\":2}]"; array = Encoding.UTF8.GetBytes(jsonStr); using (MemoryStream stream = new MemoryStream(array)) { List<object> listResult = (List<object>)StreamUtils.ReadAsJSON(stream); Assert.NotNull(listResult); Dictionary<string, object> item1 = (Dictionary<string, object>)listResult[0]; Assert.Equal("item", item1["itemName"]); Assert.Equal(1L, item1["itemInt"]); } } [Fact] public void Test_Read() { using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes("test"))) { Assert.NotNull(StreamUtils.Read(stream, 3)); Assert.Equal(3, StreamUtils.Read(stream, 3).Length); } } [Fact] public void Test_Pipe() { byte[] inputData = new byte[] {1, 2, 3, 4, 5}; using (MemoryStream readStream = new MemoryStream(inputData)) using (MemoryStream writeStream = new MemoryStream()) { StreamUtils.Pipe(readStream, writeStream); byte[] outputData = writeStream.ToArray(); Assert.Equal(inputData, outputData); } byte[] inputData1 = new byte[] { }; using (MemoryStream readStream1 = new MemoryStream(inputData1)) using (MemoryStream writeStream1 = new MemoryStream()) { StreamUtils.Pipe(readStream1, writeStream1); byte[] outputData1 = writeStream1.ToArray(); Assert.Empty(outputData1); } } [Fact] public void Test_StreamFor() { byte[] data = Encoding.UTF8.GetBytes("test"); using (MemoryStream stream = new MemoryStream(data)) { Stream copy = StreamUtils.StreamFor(stream); Assert.NotNull(copy); Assert.True(copy.CanRead); string str = new StreamReader(copy).ReadToEnd(); Assert.Equal("test", str); string data1 = "test1"; Stream copy1 = StreamUtils.StreamFor(data1); string str1 = new StreamReader(copy1).ReadToEnd(); Assert.Equal("test1", str1); int data2 = 111; Exception ex = Assert.Throws<Exception>(() => StreamUtils.StreamFor(data2)); Assert.Equal("data is not Stream or String", ex.Message); } } [Fact] public async void Test_ReadAsJSONAsync() { string jsonStr = "{\"arrayObj\":[[{\"itemName\":\"item\",\"itemInt\":1},{\"itemName\":\"item2\",\"itemInt\":2}],[{\"itemName\":\"item3\",\"itemInt\":3}]],\"arrayList\":[[[1,2],[3,4]],[[5,6],[7]],[]],\"listStr\":[1,2,3],\"items\":[{\"total_size\":18,\"partNumber\":1,\"tags\":[{\"aa\":\"11\"}]},{\"total_size\":20,\"partNumber\":2,\"tags\":[{\"aa\":\"22\"}]}],\"next_marker\":\"\",\"test\":{\"total_size\":19,\"partNumber\":1,\"tags\":[{\"aa\":\"11\"}]}}"; byte[] array = Encoding.UTF8.GetBytes(jsonStr); using (MemoryStream stream = new MemoryStream(array)) { Dictionary<string, object> dic = (Dictionary<string, object>)await StreamUtils.ReadAsJSONAsync(stream); Assert.NotNull(dic); List<object> listResult = (List<object>)dic["items"]; Dictionary<string, object> item1 = (Dictionary<string, object>)listResult[0]; Assert.Equal(18L, item1["total_size"]); Assert.Empty((string)dic["next_marker"]); Assert.Equal(2, ((List<object>)dic["arrayObj"]).Count); } jsonStr = "[{\"itemName\":\"item\",\"itemInt\":1},{\"itemName\":\"item2\",\"itemInt\":2}]"; array = Encoding.UTF8.GetBytes(jsonStr); using (MemoryStream stream = new MemoryStream(array)) { List<object> listResult = (List<object>)await StreamUtils.ReadAsJSONAsync(stream); Assert.NotNull(listResult); Dictionary<string, object> item1 = (Dictionary<string, object>)listResult[0]; Assert.Equal("item", item1["itemName"]); Assert.Equal(1L, item1["itemInt"]); } } [Fact] public void Test_ReadAsBytes() { using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes("test"))) { Assert.NotNull(StreamUtils.ReadAsBytes(stream)); } } [Fact] public async void Test_ReadAsBytesAsync() { using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes("test"))) { Assert.NotNull(await StreamUtils.ReadAsBytesAsync(stream)); } } // [Fact] // public async Task Test_ReadAsSSEAsync() // { // using (var client = new HttpClient()) // { // var response = await client.GetStreamAsync("http://localhost:8384/sse"); // // var events = new List<SSEEvent>(); // // await foreach (var sseEvent in StreamUtil.ReadAsSSEAsync(response)) // { // events.Add(sseEvent); // } // // for (int i = 0; i < 5; i++) // { // Assert.Equal(JsonConvert.SerializeObject(new { count = i }), events[i].Data); // Assert.Equal("sse-test", events[i].Id); // Assert.Equal("flow", events[i].Event); // Assert.Equal(3, events[i].Retry); // } // } // } // // [Fact] // public async Task Test_ReadAsSSEAsync_WithNoSpaces() // { // using (var client = new HttpClient()) // { // var response = await client.GetStreamAsync("http://localhost:8384/sse_with_no_spaces"); // // var events = new List<SSEEvent>(); // // await foreach (var sseEvent in StreamUtil.ReadAsSSEAsync(response)) // { // events.Add(sseEvent); // } // // Assert.Equal(5, events.Count); // // for (int i = 0; i < 5; i++) // { // Assert.Equal(JsonConvert.SerializeObject(new { count = i }), events[i].Data); // Assert.Equal("sse-test", events[i].Id); // Assert.Equal("flow", events[i].Event); // Assert.Equal(3, events[i].Retry); // } // } // } // // [Fact] // public async Task Test_ReadAsSSEAsync_WithInvalidRetry() // { // using (var client = new HttpClient()) // { // var response = await client.GetStreamAsync("http://localhost:8384/sse_invalid_retry"); // // var events = new List<SSEEvent>(); // // await foreach (var sseEvent in StreamUtil.ReadAsSSEAsync(response)) // { // events.Add(sseEvent); // } // // Assert.Equal(5, events.Count); // // for (int i = 0; i < 5; i++) // { // Assert.Equal(JsonConvert.SerializeObject(new { count = i }), events[i].Data); // Assert.Equal("sse-test", events[i].Id); // Assert.Equal("flow", events[i].Event); // Assert.Null(events[i].Retry); // } // } // } // // [Fact] // public async Task Test_ReadAsSSEAsync_WithDividedData() // { // using (var client = new HttpClient()) // { // var response = await client.GetStreamAsync("http://localhost:8384/sse_with_data_divided"); // // var events = new List<SSEEvent>(); // // await foreach (var sseEvent in StreamUtil.ReadAsSSEAsync(response)) // { // events.Add(sseEvent); // } // Assert.Equal(4, events.Count); // Assert.Equal(JsonConvert.SerializeObject(new { count = 0 }), events[0].Data); // Assert.Equal("sse-test", events[0].Id); // Assert.Equal("flow", events[0].Event); // Assert.Equal(3, events[0].Retry); // // Assert.Equal(JsonConvert.SerializeObject(new { count = 2, tag = "divided" }), events[1].Data); // Assert.Equal("sse-test", events[1].Id); // Assert.Equal("flow", events[1].Event); // Assert.Equal(3, events[1].Retry); // // Assert.Equal(JsonConvert.SerializeObject(new { count = 3 }), events[2].Data); // Assert.Equal("sse-test", events[2].Id); // Assert.Equal("flow", events[2].Event); // Assert.Equal(3, events[2].Retry); // // Assert.Equal(JsonConvert.SerializeObject(new { count = 4 }), events[3].Data); // Assert.Equal("sse-test", events[3].Id); // Assert.Equal("flow", events[3].Event); // Assert.Equal(3, events[3].Retry); // } // } [Fact] public void Test_ReadAsSSE() { using (var client = new HttpClient()) { var response = client.GetStreamAsync("http://localhost:8384/sse").Result; var events = new List<SSEEvent>(); foreach (var sseEvent in StreamUtils.ReadAsSSE(response)) { events.Add(sseEvent); } for (int i = 0; i < 5; i++) { Assert.Equal(JsonConvert.SerializeObject(new { count = i }), events[i].Data); Assert.Equal("sse-test", events[i].Id); Assert.Equal("flow", events[i].Event); Assert.Equal(3, events[i].Retry); } } } [Fact] public async Task Test_ReadAsSSE_WithNoSpaces() { using (var client = new HttpClient()) { var response = await client.GetStreamAsync("http://localhost:8384/sse_with_no_spaces"); var events = new List<SSEEvent>(); foreach (var sseEvent in StreamUtils.ReadAsSSE(response)) { events.Add(sseEvent); } Assert.Equal(5, events.Count); for (int i = 0; i < 5; i++) { Assert.Equal(JsonConvert.SerializeObject(new { count = i }), events[i].Data); Assert.Equal("sse-test", events[i].Id); Assert.Equal("flow", events[i].Event); Assert.Equal(3, events[i].Retry); } } } [Fact] public async Task Test_ReadAsSSE_WithInvalidRetry() { using (var client = new HttpClient()) { var response = await client.GetStreamAsync("http://localhost:8384/sse_invalid_retry"); var events = new List<SSEEvent>(); foreach (var sseEvent in StreamUtils.ReadAsSSE(response)) { events.Add(sseEvent); } Assert.Equal(5, events.Count); for (int i = 0; i < 5; i++) { Assert.Equal(JsonConvert.SerializeObject(new { count = i }), events[i].Data); Assert.Equal("sse-test", events[i].Id); Assert.Equal("flow", events[i].Event); Assert.Null(events[i].Retry); } } } [Fact] public async Task Test_ReadAsSSE_WithDividedData() { using (var client = new HttpClient()) { var response = await client.GetStreamAsync("http://localhost:8384/sse_with_data_divided"); var events = new List<SSEEvent>(); foreach (var sseEvent in StreamUtils.ReadAsSSE(response)) { events.Add(sseEvent); } Assert.Equal(4, events.Count); Assert.Equal(JsonConvert.SerializeObject(new { count = 0 }), events[0].Data); Assert.Equal("sse-test", events[0].Id); Assert.Equal("flow", events[0].Event); Assert.Equal(3, events[0].Retry); Assert.Equal(JsonConvert.SerializeObject(new { count = 2, tag = "divided" }), events[1].Data); Assert.Equal("sse-test", events[1].Id); Assert.Equal("flow", events[1].Event); Assert.Equal(3, events[1].Retry); Assert.Equal(JsonConvert.SerializeObject(new { count = 3 }), events[2].Data); Assert.Equal("sse-test", events[2].Id); Assert.Equal("flow", events[2].Event); Assert.Equal(3, events[2].Retry); Assert.Equal(JsonConvert.SerializeObject(new { count = 4 }), events[3].Data); Assert.Equal("sse-test", events[3].Id); Assert.Equal("flow", events[3].Event); Assert.Equal(3, events[3].Retry); } } } }