src/task/SendAndReceiveDataMessagesTask.ts (88 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import AudioVideoControllerState from '../audiovideocontroller/AudioVideoControllerState'; import DataMessage from '../datamessage/DataMessage'; import RemovableObserver from '../removableobserver/RemovableObserver'; import SignalingClientEvent from '../signalingclient/SignalingClientEvent'; import SignalingClientEventType from '../signalingclient/SignalingClientEventType'; import SignalingClientObserver from '../signalingclientobserver/SignalingClientObserver'; import { SdkDataMessageFrame, SdkDataMessagePayload, SdkSignalFrame, } from '../signalingprotocol/SignalingProtocol.js'; import BaseTask from './BaseTask'; export default class SendAndReceiveDataMessagesTask extends BaseTask implements RemovableObserver, SignalingClientObserver { protected taskName = 'SendAndReceiveDataMessagesTask'; private static TOPIC_REGEX = new RegExp(/^[a-zA-Z0-9_-]{1,36}$/); private static DATA_SIZE = 2048; constructor(private context: AudioVideoControllerState) { super(context.logger); } async run(): Promise<void> { this.context.removableObservers.push(this); this.context.signalingClient.registerObserver(this); this.context.realtimeController.realtimeSubscribeToSendDataMessage(this.sendDataMessageHandler); } removeObserver(): void { this.context.realtimeController.realtimeUnsubscribeFromSendDataMessage( this.sendDataMessageHandler ); this.context.signalingClient.removeObserver(this); } handleSignalingClientEvent(event: SignalingClientEvent): void { if ( event.type === SignalingClientEventType.ReceivedSignalFrame && event.message.type === SdkSignalFrame.Type.DATA_MESSAGE ) { for (const message of event.message.dataMessage.messages) { const dataMessage = new DataMessage( (message.ingestTimeNs as number) / 1000000, message.topic, message.data, message.senderAttendeeId, message.senderExternalUserId, (message.ingestTimeNs as number) === 0 ); this.context.realtimeController.realtimeReceiveDataMessage(dataMessage); } } } sendDataMessageHandler = ( topic: string, data: Uint8Array | string | any, // eslint-disable-line @typescript-eslint/no-explicit-any lifetimeMs?: number ): void => { if (this.context.signalingClient.ready()) { let uint8Data; if (data instanceof Uint8Array) { uint8Data = data; } else if (typeof data === 'string') { uint8Data = new TextEncoder().encode(data); } else { uint8Data = new TextEncoder().encode(JSON.stringify(data)); } this.validateDataMessage(topic, uint8Data, lifetimeMs); const message = SdkDataMessagePayload.create(); message.topic = topic; message.lifetimeMs = lifetimeMs; message.data = uint8Data; const messageFrame = SdkDataMessageFrame.create(); messageFrame.messages = [message]; this.context.signalingClient.sendDataMessage(messageFrame); } else { this.context.logger.error('Cannot send data message because signaling client is not ready'); } }; private validateDataMessage(topic: string, data: Uint8Array, lifetimeMs?: number): void { if (!SendAndReceiveDataMessagesTask.TOPIC_REGEX.test(topic)) { throw new Error('Invalid topic'); } if (data.length > SendAndReceiveDataMessagesTask.DATA_SIZE) { throw new Error('Data size has to be less than 2048 bytes'); } if (lifetimeMs && lifetimeMs < 0) { throw new Error('The life time of the message has to be non negative'); } } }