src/Custom/RealtimeConversation/RealtimeConversationSession.cs (292 lines of code) (raw):
using System;
using System.Buffers;
using System.ClientModel;
using System.ClientModel.Primitives;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace OpenAI.RealtimeConversation;
[Experimental("OPENAI002")]
public partial class RealtimeConversationSession : IDisposable
{
public WebSocket WebSocket { get; protected set; }
private readonly RealtimeConversationClient _parentClient;
private readonly Uri _endpoint;
private readonly ApiKeyCredential _credential;
private readonly SemaphoreSlim _audioSendSemaphore = new(1, 1);
private bool _isSendingAudioStream = false;
internal bool ShouldBufferTurnResponseData { get; set; }
protected internal RealtimeConversationSession(
RealtimeConversationClient parentClient,
Uri endpoint,
ApiKeyCredential credential)
{
Argument.AssertNotNull(endpoint, nameof(endpoint));
Argument.AssertNotNull(credential, nameof(credential));
_parentClient = parentClient;
_endpoint = endpoint;
_credential = credential;
}
/// <summary>
/// Transmits audio data from a stream, ending the client turn once the stream is complete.
/// </summary>
/// <param name="audio"> The audio stream to transmit. </param>
/// <param name="cancellationToken"> An optional cancellation token. </param>
/// <exception cref="InvalidOperationException"></exception>
public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(audio, nameof(audio));
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
{
if (_isSendingAudioStream)
{
throw new InvalidOperationException($"Only one stream of audio may be sent at once.");
}
_isSendingAudioStream = true;
}
try
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
while (true)
{
int bytesRead = await audio.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
break;
}
ReadOnlyMemory<byte> audioMemory = buffer.AsMemory(0, bytesRead);
BinaryData audioData = BinaryData.FromBytes(audioMemory);
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audioData);
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
await SendCommandAsync(requestData, cancellationToken.ToRequestOptions()).ConfigureAwait(false);
}
}
finally
{
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
{
_isSendingAudioStream = false;
}
}
}
public virtual void SendInputAudio(Stream audio, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(audio, nameof(audio));
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
{
if (_isSendingAudioStream)
{
throw new InvalidOperationException($"Only one stream of audio may be sent at once.");
}
_isSendingAudioStream = true;
}
try
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024 * 16);
while (true)
{
int bytesRead = audio.Read(buffer, 0, buffer.Length);
if (bytesRead == 0)
{
break;
}
ReadOnlyMemory<byte> audioMemory = buffer.AsMemory(0, bytesRead);
BinaryData audioData = BinaryData.FromBytes(audioMemory);
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audioData);
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
SendCommand(requestData, cancellationToken.ToRequestOptions());
}
}
finally
{
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
{
_isSendingAudioStream = false;
}
}
}
/// <summary>
/// Transmits a single chunk of audio.
/// </summary>
/// <param name="audio"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public virtual async Task SendInputAudioAsync(BinaryData audio, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(audio, nameof(audio));
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
{
if (_isSendingAudioStream)
{
throw new InvalidOperationException($"Cannot send a standalone audio chunk while a stream is already in progress.");
}
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
await SendCommandAsync(requestData, cancellationToken.ToRequestOptions()).ConfigureAwait(false);
}
}
/// <summary>
/// Transmits a single chunk of audio.
/// </summary>
/// <param name="audio"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public virtual void SendInputAudio(BinaryData audio, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(audio, nameof(audio));
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
{
if (_isSendingAudioStream)
{
throw new InvalidOperationException($"Cannot send a standalone audio chunk while a stream is already in progress.");
}
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
SendCommand(requestData, cancellationToken.ToRequestOptions());
}
}
public virtual async Task ClearInputAudioAsync(CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventInputAudioBufferClear internalCommand = new();
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual void ClearInputAudio(CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventInputAudioBufferClear internalCommand = new();
SendCommand(internalCommand, cancellationToken);
}
public virtual async Task ConfigureSessionAsync(ConversationSessionOptions sessionOptions, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(sessionOptions, nameof(sessionOptions));
InternalRealtimeClientEventSessionUpdate internalCommand = new(sessionOptions);
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual void ConfigureSession(ConversationSessionOptions sessionOptions, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(sessionOptions, nameof(sessionOptions));
InternalRealtimeClientEventSessionUpdate internalCommand = new(sessionOptions);
SendCommand(internalCommand, cancellationToken);
}
public virtual async Task AddItemAsync(ConversationItem item, CancellationToken cancellationToken = default)
=> await AddItemAsync(item, previousItemId: null, cancellationToken).ConfigureAwait(false);
public virtual void AddItem(ConversationItem item, CancellationToken cancellationToken = default)
=> AddItem(item, previousItemId: null, cancellationToken);
public virtual async Task AddItemAsync(ConversationItem item, string previousItemId, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(item, nameof(item));
InternalRealtimeClientEventConversationItemCreate internalCommand = new(item)
{
PreviousItemId = previousItemId,
};
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual void AddItem(ConversationItem item, string previousItemId, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(item, nameof(item));
InternalRealtimeClientEventConversationItemCreate internalCommand = new(item)
{
PreviousItemId = previousItemId,
};
SendCommand(internalCommand, cancellationToken);
}
public virtual async Task DeleteItemAsync(string itemId, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(itemId, nameof(itemId));
InternalRealtimeClientEventConversationItemDelete internalCommand = new(itemId);
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual void DeleteItem(string itemId, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(itemId, nameof(itemId));
InternalRealtimeClientEventConversationItemDelete internalCommand = new(itemId);
SendCommand(internalCommand, cancellationToken);
}
public virtual async Task TruncateItemAsync(string itemId, int contentPartIndex, TimeSpan audioDuration, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(itemId, nameof(itemId));
InternalRealtimeClientEventConversationItemTruncate internalCommand = new(
itemId: itemId,
contentIndex: contentPartIndex,
audioEndMs: (int)audioDuration.TotalMilliseconds);
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual void TruncateItem(string itemId, int contentPartIndex, TimeSpan audioDuration, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(itemId, nameof(itemId));
InternalRealtimeClientEventConversationItemTruncate internalCommand = new(
itemId: itemId,
contentIndex: contentPartIndex,
audioEndMs: (int)audioDuration.TotalMilliseconds);
SendCommand(internalCommand, cancellationToken);
}
public virtual async Task CommitPendingAudioAsync(CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventInputAudioBufferCommit internalCommand = new();
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual void CommitPendingAudio(CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventInputAudioBufferCommit internalCommand = new();
SendCommand(internalCommand, cancellationToken);
}
public virtual async Task InterruptResponseAsync(CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventResponseCancel internalCommand = new();
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual void InterruptResponse(CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventResponseCancel internalCommand = new();
SendCommand(internalCommand, cancellationToken);
}
public virtual async Task StartResponseAsync(ConversationResponseOptions options, CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventResponseCreate internalCommand = new(
kind: InternalRealtimeClientEventType.ResponseCreate,
eventId: null,
additionalBinaryDataProperties: null,
response: options);
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual async Task StartResponseAsync(CancellationToken cancellationToken = default)
{
await StartResponseAsync(new ConversationResponseOptions(), cancellationToken).ConfigureAwait(false);
}
public virtual void StartResponse(ConversationResponseOptions options, CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventResponseCreate internalCommand = new(
kind: InternalRealtimeClientEventType.ResponseCreate,
eventId: null,
additionalBinaryDataProperties: null,
response: options);
SendCommand(internalCommand, cancellationToken);
}
public void StartResponse(CancellationToken cancellationToken = default)
{
StartResponse(new ConversationResponseOptions(), cancellationToken);
}
public virtual async Task CancelResponseAsync(CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventResponseCancel internalCommand = new();
await SendCommandAsync(internalCommand, cancellationToken).ConfigureAwait(false);
}
public virtual void CancelResponse(CancellationToken cancellationToken = default)
{
InternalRealtimeClientEventResponseCancel internalCommand = new();
SendCommand(internalCommand, cancellationToken);
}
public virtual async IAsyncEnumerable<ConversationUpdate> ReceiveUpdatesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await foreach (ClientResult protocolEvent in ReceiveUpdatesAsync(cancellationToken.ToRequestOptions()))
{
ConversationUpdate nextUpdate = (ConversationUpdate)protocolEvent;
yield return nextUpdate;
}
}
public virtual IEnumerable<ConversationUpdate> ReceiveUpdates(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
internal virtual async Task SendCommandAsync(InternalRealtimeClientEvent command, CancellationToken cancellationToken = default)
{
BinaryData requestData = ModelReaderWriter.Write(command);
RequestOptions cancellationOptions = cancellationToken.ToRequestOptions();
await SendCommandAsync(requestData, cancellationOptions).ConfigureAwait(false);
}
internal virtual void SendCommand(InternalRealtimeClientEvent command, CancellationToken cancellationToken = default)
{
BinaryData requestData = ModelReaderWriter.Write(command);
RequestOptions cancellationOptions = cancellationToken.ToRequestOptions();
SendCommand(requestData, cancellationOptions);
}
public void Dispose()
{
WebSocket?.Dispose();
}
}