DeviceBridge/Services/SubscriptionCallbackFactory.cs (172 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved.

using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using DeviceBridge.Models;
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NLog;

namespace DeviceBridge.Services
{
    /// <summary>
    /// This module contains the logic to build custom device client callbacks for subscriptions.
    /// The callbacks convert C2D/connection events into HTTP notifications.
    /// </summary>
    public class SubscriptionCallbackFactory : ISubscriptionCallbackFactory
    {
        /// <summary>Name of the HTTP client requested from the factory for every callback request.</summary>
        private const string RetryClientName = "RetryClient";

        /// <summary>Media type of every callback request body.</summary>
        private const string JsonMediaType = "application/json";

        /// <summary>
        /// Options used to parse the method callback response. Kept in a single shared instance because
        /// System.Text.Json caches serialization metadata per options instance; allocating new options
        /// on every invocation throws that cache away.
        /// </summary>
        private static readonly JsonSerializerOptions MethodResponseSerializerOptions = new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true,
        };

        private readonly Logger _logger;
        private readonly IHttpClientFactory _httpClientFactory;

        /// <summary>
        /// Initializes a new instance of the <see cref="SubscriptionCallbackFactory"/> class.
        /// </summary>
        /// <param name="logger">Logger used by the factory and by every callback it builds.</param>
        /// <param name="httpClientFactory">Factory that provides the HTTP client used to post to callback URLs.</param>
        public SubscriptionCallbackFactory(Logger logger, IHttpClientFactory httpClientFactory)
        {
            _logger = logger;
            _httpClientFactory = httpClientFactory;
        }

        /// <summary>
        /// Builds a callback that posts each desired property update of a device to the subscription callback URL.
        /// </summary>
        /// <param name="deviceId">Id of the device the callback is built for.</param>
        /// <param name="desiredPropertySubscription">Subscription holding the callback URL.</param>
        /// <returns>
        /// A callback that logs the update and posts it as JSON. Failures while building or sending the
        /// request (including non-success status codes) are logged and not rethrown.
        /// </returns>
        public DesiredPropertyUpdateCallback GetDesiredPropertyUpdateCallback(string deviceId, DeviceSubscription desiredPropertySubscription)
        {
            return async (desiredPropertyUpdate, _) =>
            {
                // Serialize the update once and reuse it for both the log entry and the request body.
                var desiredPropertiesJson = desiredPropertyUpdate.ToJson();
                _logger.Info("Got desired property update for device {deviceId}. Callback URL: {callbackUrl}. Payload: {desiredPopertyUpdate}", deviceId, desiredPropertySubscription.CallbackUrl, desiredPropertiesJson);

                try
                {
                    var body = new DesiredPropertyUpdateEventBody()
                    {
                        DeviceId = deviceId,
                        DeviceReceivedAt = DateTime.UtcNow,
                        DesiredProperties = new JRaw(desiredPropertiesJson),
                    };

                    using var httpResponse = await PostJsonAsync(desiredPropertySubscription, body);
                    httpResponse.EnsureSuccessStatusCode();

                    _logger.Info("Successfully executed desired property update callback for device {deviceId}. Callback status code {statusCode}", deviceId, httpResponse.StatusCode);
                }
                catch (Exception e)
                {
                    _logger.Error(e, "Failed to execute desired property update callback for device {deviceId}", deviceId);
                }
            };
        }

        /// <summary>
        /// Builds a callback that forwards each received C2D message of a device to the subscription callback URL
        /// and maps the HTTP response to a message disposition.
        /// </summary>
        /// <param name="deviceId">Id of the device the callback is built for.</param>
        /// <param name="messageSubscription">Subscription holding the callback URL.</param>
        /// <returns>
        /// A callback that returns <c>Accept</c> for a 2xx response, <c>Reject</c> for a 4xx response and
        /// <c>Abandon</c> for any other status code or when an exception is thrown.
        /// </returns>
        public Func<Message, Task<ReceiveMessageCallbackStatus>> GetReceiveC2DMessageCallback(string deviceId, DeviceSubscription messageSubscription)
        {
            _logger.Info("Creating C2D callback {deviceId}. Callback URL {callbackUrl}", deviceId, messageSubscription.CallbackUrl);

            return async (receivedMessage) =>
            {
                try
                {
                    // Read the body asynchronously so the calling thread is not blocked on stream I/O.
                    using var reader = new StreamReader(receivedMessage.BodyStream);
                    var messageBody = await reader.ReadToEndAsync();

                    _logger.Info("Got C2D message for device {deviceId}. Callback URL {callbackUrl}. Payload: {payload}", deviceId, messageSubscription.CallbackUrl, messageBody);

                    var body = new C2DMessageInvocationEventBody()
                    {
                        DeviceId = deviceId,
                        DeviceReceivedAt = DateTime.UtcNow,

                        // The body is embedded verbatim (not re-encoded as a JSON string).
                        MessageBody = new JRaw(messageBody),
                        Properties = receivedMessage.Properties,
                        MessageId = receivedMessage.MessageId,
                        ExpiryTimeUTC = receivedMessage.ExpiryTimeUtc,
                    };

                    // Send request to callback URL
                    using var httpResponse = await PostJsonAsync(messageSubscription, body);
                    var statusCode = (int)httpResponse.StatusCode;

                    if (statusCode >= 200 && statusCode < 300)
                    {
                        _logger.Info("Received C2D message callback with status {statusCode}, request accepted.", statusCode);
                        return ReceiveMessageCallbackStatus.Accept;
                    }

                    if (statusCode >= 400 && statusCode < 500)
                    {
                        _logger.Info("Received C2D message callback with status {statusCode}, request rejected.", statusCode);
                        return ReceiveMessageCallbackStatus.Reject;
                    }

                    // Every status outside 2xx and 4xx (including 1xx, 3xx and 5xx) abandons the message.
                    _logger.Info("Received C2D message callback with status {statusCode}, request abandoned.", statusCode);
                    return ReceiveMessageCallbackStatus.Abandon;
                }
                catch (Exception e)
                {
                    _logger.Error(e, "Failed to execute message callback, device {deviceId}. Request abandoned.", deviceId);
                    return ReceiveMessageCallbackStatus.Abandon;
                }
            };
        }

        /// <summary>
        /// Builds a callback that forwards each direct method invocation of a device to the subscription callback URL
        /// and converts the HTTP response into the method response.
        /// </summary>
        /// <param name="deviceId">Id of the device the callback is built for.</param>
        /// <param name="methodSubscription">Subscription holding the callback URL.</param>
        /// <returns>
        /// A callback that responds with the status and payload found in the callback response body, defaulting
        /// to status 200 with no payload when they are absent or the body is not valid JSON. Any other failure
        /// (including a non-success HTTP status code) produces a method response with status 500.
        /// </returns>
        public MethodCallback GetMethodCallback(string deviceId, DeviceSubscription methodSubscription)
        {
            return async (methodRequest, _) =>
            {
                _logger.Info("Got method request for device {deviceId}. Callback URL {callbackUrl}. Method: {methodName}. Payload: {payload}", deviceId, methodSubscription.CallbackUrl, methodRequest.Name, methodRequest.DataAsJson);

                try
                {
                    var body = new MethodInvocationEventBody()
                    {
                        DeviceId = deviceId,
                        DeviceReceivedAt = DateTime.UtcNow,
                        MethodName = methodRequest.Name,
                        RequestData = new JRaw(methodRequest.DataAsJson),
                    };

                    // Send request to callback URL
                    using var httpResponse = await PostJsonAsync(methodSubscription, body);
                    httpResponse.EnsureSuccessStatusCode();

                    // Read method response from callback response
                    using var responseStream = await httpResponse.Content.ReadAsStreamAsync();
                    MethodResponseBody responseBody = null;

                    try
                    {
                        responseBody = await System.Text.Json.JsonSerializer.DeserializeAsync<MethodResponseBody>(responseStream, MethodResponseSerializerOptions);
                    }
                    catch (System.Text.Json.JsonException e)
                    {
                        // A body that cannot be parsed is not fatal: the default response below is used.
                        _logger.Error(e, "Received malformed JSON response when executing method callback for device {deviceId}", deviceId);
                    }

                    MethodResponse methodResponse;
                    string serializedResponsePayload = null;
                    int status = 200;

                    // If we got a custom response, return the custom payload and status. If not, just respond with a 200.
                    if (responseBody != null && responseBody.Status != null)
                    {
                        status = responseBody.Status.Value;
                    }

                    if (responseBody != null && responseBody.Payload != null)
                    {
                        serializedResponsePayload = System.Text.Json.JsonSerializer.Serialize(responseBody.Payload);
                        methodResponse = new MethodResponse(Encoding.UTF8.GetBytes(serializedResponsePayload), status);
                    }
                    else
                    {
                        methodResponse = new MethodResponse(status);
                    }

                    _logger.Info("Successfully executed method callback for device {deviceId}. Response status: {responseStatus}. Response payload: {responsePayload}", deviceId, status, serializedResponsePayload);
                    return methodResponse;
                }
                catch (Exception e)
                {
                    _logger.Error(e, "Failed to execute method callback for device {deviceId}", deviceId);
                    return new MethodResponse(500);
                }
            };
        }

        /// <summary>
        /// Builds a callback that posts each connection status change of a device to the subscription callback URL.
        /// </summary>
        /// <param name="deviceId">Id of the device the callback is built for.</param>
        /// <param name="connectionStatusSubscription">Subscription holding the callback URL.</param>
        /// <returns>
        /// A callback that logs the change and posts the status and reason (as strings) as JSON. Failures while
        /// building or sending the request (including non-success status codes) are logged and not rethrown.
        /// </returns>
        public Func<ConnectionStatus, ConnectionStatusChangeReason, Task> GetConnectionStatusChangeCallback(string deviceId, DeviceSubscription connectionStatusSubscription)
        {
            return async (status, reason) =>
            {
                _logger.Info("Got connection status change for device {deviceId}. Callback URL: {callbackUrl}. Status: {status}. Reason: {reason}", deviceId, connectionStatusSubscription.CallbackUrl, status, reason);

                try
                {
                    var body = new ConnectionStatusChangeEventBody()
                    {
                        DeviceId = deviceId,
                        DeviceReceivedAt = DateTime.UtcNow,
                        Status = status.ToString(),
                        Reason = reason.ToString(),
                    };

                    using var httpResponse = await PostJsonAsync(connectionStatusSubscription, body);
                    httpResponse.EnsureSuccessStatusCode();

                    _logger.Info("Successfully executed connection status change callback for device {deviceId}. Callback status code {statusCode}", deviceId, httpResponse.StatusCode);
                }
                catch (Exception e)
                {
                    _logger.Error(e, "Failed to execute connection status change callback for device {deviceId}", deviceId);
                }
            };
        }

        /// <summary>
        /// Serializes <paramref name="body"/> with Json.NET and posts it as UTF-8 JSON to the callback URL of
        /// <paramref name="subscription"/>, using the HTTP client named <see cref="RetryClientName"/>.
        /// </summary>
        /// <param name="subscription">Subscription holding the callback URL.</param>
        /// <param name="body">Event body to serialize.</param>
        /// <returns>The HTTP response. The caller owns it and must dispose it.</returns>
        private async Task<HttpResponseMessage> PostJsonAsync(DeviceSubscription subscription, object body)
        {
            // The request content is no longer needed once the response is available, so release it here.
            using var content = new StringContent(JsonConvert.SerializeObject(body), Encoding.UTF8, JsonMediaType);
            return await _httpClientFactory.CreateClient(RetryClientName).PostAsync(subscription.CallbackUrl, content);
        }
    }
}