Darabonba/Utils/StreamUtils.cs (224 lines of code) (raw):
using System;
using System.IO;
using System.Text;
using System.Collections.Generic;
using Newtonsoft.Json;
using System.Threading.Tasks;
using Darabonba.Models;
namespace Darabonba.Utils
{
public class StreamUtils
{
private const string DATA_PREFIX = "data:";
private const string EVENT_PREFIX = "event:";
private const string ID_PREFIX = "id:";
private const string RETRY_PREFIX = "retry:";
public static Stream BytesReadable(string str)
{
return BytesReadable(Encoding.UTF8.GetBytes(str));
}
public static Stream BytesReadable(byte[] bytes)
{
MemoryStream stream = new MemoryStream();
stream.Write(bytes, 0, bytes.Length);
stream.Seek(0, SeekOrigin.Begin);
return stream;
}
public static string ToString(byte[] val)
{
return Encoding.UTF8.GetString(val);
}
public static object ParseJSON(string val)
{
return JsonConvert.DeserializeObject(val);
}
public static byte[] Read(Stream stream, int length)
{
byte[] data = new byte[length];
stream.Read(data, 0, length);
return data;
}
public static void Pipe(Stream readStream, Stream writeStream)
{
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = readStream.Read(buffer, 0, buffer.Length)) > 0)
{
writeStream.Write(buffer, 0, bytesRead);
}
}
public static Stream StreamFor(object data)
{
if (data is Stream)
{
Stream stream = data as Stream;
if (stream.CanRead)
{
Stream copy = new MemoryStream();
stream.Position = 0;
stream.CopyTo(copy);
copy.Position = 0;
return copy;
}
throw new Exception("stream is not readable");
}
if (data is string)
{
string str = data as string;
Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(str));
return stream;
}
throw new Exception("data is not Stream or String");
}
public static byte[] ReadAsBytes(Stream stream)
{
int bufferLength = 4096;
using (var ms = new MemoryStream())
{
var buffer = new byte[bufferLength];
while (true)
{
var length = stream.Read(buffer, 0, bufferLength);
if (length == 0)
{
break;
}
ms.Write(buffer, 0, length);
}
ms.Seek(0, SeekOrigin.Begin);
var bytes = new byte[ms.Length];
ms.Read(bytes, 0, bytes.Length);
stream.Close();
stream.Dispose();
return bytes;
}
}
public async static Task<byte[]> ReadAsBytesAsync(Stream stream)
{
int bufferLength = 4096;
using (var ms = new MemoryStream())
{
var buffer = new byte[bufferLength];
while (true)
{
var length = await stream.ReadAsync(buffer, 0, bufferLength);
if (length == 0)
{
break;
}
await ms.WriteAsync(buffer, 0, length);
}
ms.Seek(0, SeekOrigin.Begin);
var bytes = new byte[ms.Length];
await ms.ReadAsync(bytes, 0, bytes.Length);
stream.Close();
stream.Dispose();
return bytes;
}
}
public static string ReadAsString(Stream stream)
{
return ToString(ReadAsBytes(stream));
}
public static async Task<string> ReadAsStringAsync(Stream stream)
{
return ToString(await ReadAsBytesAsync(stream));
}
public static object ReadAsJSON(Stream stream)
{
object jResult = ParseJSON(ReadAsString(stream));
object result = JSONUtils.Deserialize(jResult);
return result;
}
public async static Task<object> ReadAsJSONAsync(Stream stream)
{
object jResult = ParseJSON(await ReadAsStringAsync(stream));
object result = JSONUtils.Deserialize(jResult);
return result;
}
public class EventResult
{
public List<SSEEvent> Events { get; set; }
public string Remain { get; set; }
public EventResult(List<SSEEvent> events, string remain)
{
Events = events;
Remain = remain;
}
}
private static EventResult TryGetEvents(string head, string chunk)
{
string all = head + chunk;
var events = new List<SSEEvent>();
var start = 0;
for (var i = 0; i < all.Length - 1; i++)
{
// message separated by \n\n
if (all[i] == '\n' && i + 1 < all.Length && all[i + 1] == '\n')
{
var rawEvent = all.Substring(start, i - start).Trim();
var sseEvent = ParseEvent(rawEvent);
events.Add(sseEvent);
start = i + 2;
i++;
}
}
string remain = all.Substring(start);
return new EventResult(events, remain);
}
private static SSEEvent ParseEvent(string rawEvent)
{
var sseEvent = new SSEEvent();
var lines = rawEvent.Split('\n');
foreach (var line in lines)
{
if (line.StartsWith(DATA_PREFIX))
{
sseEvent.Data = line.Substring(DATA_PREFIX.Length).Trim();
}
else if (line.StartsWith(EVENT_PREFIX))
{
sseEvent.Event = line.Substring(EVENT_PREFIX.Length).Trim();
}
else if (line.StartsWith(ID_PREFIX))
{
sseEvent.Id = line.Substring(ID_PREFIX.Length).Trim();
}
else if (line.StartsWith(RETRY_PREFIX))
{
var retryData = line.Substring(RETRY_PREFIX.Length).Trim();
int retryValue;
if (int.TryParse(retryData, out retryValue))
{
sseEvent.Retry = retryValue;
}
}
else if (line.StartsWith(":"))
{
// ignore the line
}
}
return sseEvent;
}
public static IEnumerable<SSEEvent> ReadAsSSE(Stream stream)
{
using (var reader = new StreamReader(stream))
{
var buffer = new char[4096];
var rest = string.Empty;
int count;
while ((count = reader.Read(buffer, 0, buffer.Length)) > 0)
{
var chunk = new string(buffer, 0, count);
var eventResult = TryGetEvents(rest, chunk);
rest = eventResult.Remain;
if (eventResult.Events != null && eventResult.Events.Count > 0)
{
foreach (var @event in eventResult.Events)
{
yield return @event;
}
}
}
}
}
}
}