processing-pipelines/common/csharp/CloudEventReader.cs (103 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System.Threading.Tasks;
using CloudNative.CloudEvents;
using CloudNative.CloudEvents.AspNetCore;
using CloudNative.CloudEvents.NewtonsoftJson;
using Google.Events.Protobuf.Cloud.Audit.V1;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Google.Events.Protobuf.Cloud.Scheduler.V1;
using Google.Events.Protobuf.Cloud.Storage.V1;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
namespace Common
{
public class CloudEventReader
{
private readonly ILogger _logger;
public CloudEventReader(ILogger logger)
{
_logger = logger;
}
public async Task<(string, string)> ReadCloudStorageData(HttpContext context)
{
_logger.LogInformation("Reading cloud storage data");
string bucket = null, name = null;
CloudEvent cloudEvent;
CloudEventFormatter formatter;
var ceType = context.Request.Headers["ce-type"];
switch (ceType)
{
case LogEntryData.WrittenCloudEventType:
//"protoPayload" : {"resourceName":"projects/_/buckets/events-atamel-images-input/objects/atamel.jpg}";
formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(LogEntryData));
cloudEvent = await context.Request.ToCloudEventAsync(formatter);
_logger.LogInformation($"Received CloudEvent\n{cloudEvent.GetLog()}");
var logEntryData = (LogEntryData)cloudEvent.Data;
var tokens = logEntryData.ProtoPayload.ResourceName.Split('/');
bucket = tokens[3];
name = tokens[5];
break;
case StorageObjectData.FinalizedCloudEventType:
formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(StorageObjectData));
cloudEvent = await context.Request.ToCloudEventAsync(formatter);
_logger.LogInformation($"Received CloudEvent\n{cloudEvent.GetLog()}");
var storageObjectData = (StorageObjectData)cloudEvent.Data;
bucket = storageObjectData.Bucket;
name = storageObjectData.Name;
break;
case MessagePublishedData.MessagePublishedCloudEventType:
// {"message": {
// "data": "eyJidWNrZXQiOiJldmVudHMtYXRhbWVsLWltYWdlcy1pbnB1dCIsIm5hbWUiOiJiZWFjaC5qcGcifQ==",
// },"subscription": "projects/events-atamel/subscriptions/cre-europe-west1-trigger-resizer-sub-000"}
formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(MessagePublishedData));
cloudEvent = await context.Request.ToCloudEventAsync(formatter);
_logger.LogInformation($"Received CloudEvent\n{cloudEvent.GetLog()}");
var messagePublishedData = (MessagePublishedData)cloudEvent.Data;
var pubSubMessage = messagePublishedData.Message;
_logger.LogInformation($"Type: {ceType} data: {pubSubMessage.Data.ToBase64()}");
var decoded = pubSubMessage.Data.ToStringUtf8();
_logger.LogInformation($"decoded: {decoded}");
var parsed = JValue.Parse(decoded);
bucket = (string)parsed["bucket"];
name = (string)parsed["name"];
break;
default:
// Data:
// {"bucket":"knative-atamel-images-input","name":"beach.jpg"}
formatter = new JsonEventFormatter();
cloudEvent = await context.Request.ToCloudEventAsync(formatter);
_logger.LogInformation($"Received CloudEvent\n{cloudEvent.GetLog()}");
dynamic data = cloudEvent.Data;
bucket = data.bucket;
name = data.name;
break;
}
_logger.LogInformation($"Extracted bucket: {bucket} and name: {name}");
return (bucket, name);
}
public async Task<string> ReadCloudSchedulerData(HttpContext context)
{
_logger.LogInformation("Reading cloud scheduler data");
string country = null;
CloudEvent cloudEvent;
CloudEventFormatter formatter;
var ceType = context.Request.Headers["ce-type"];
switch (ceType)
{
case MessagePublishedData.MessagePublishedCloudEventType:
formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(MessagePublishedData));
cloudEvent = await context.Request.ToCloudEventAsync(formatter);
_logger.LogInformation($"Received CloudEvent\n{cloudEvent.GetLog()}");
var messagePublishedData = (MessagePublishedData)cloudEvent.Data;
var pubSubMessage = messagePublishedData.Message;
_logger.LogInformation($"Type: {ceType} data: {pubSubMessage.Data.ToBase64()}");
country = pubSubMessage.Data.ToStringUtf8();
break;
case SchedulerJobData.ExecutedCloudEventType:
// Data: {"custom_data":"Q3lwcnVz"}
formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(SchedulerJobData));
cloudEvent = await context.Request.ToCloudEventAsync(formatter);
_logger.LogInformation($"Received CloudEvent\n{cloudEvent.GetLog()}");
var schedulerJobData = (SchedulerJobData)cloudEvent.Data;
_logger.LogInformation($"Type: {ceType} data: {schedulerJobData.CustomData.ToBase64()}");
country = schedulerJobData.CustomData.ToStringUtf8();
break;
}
_logger.LogInformation($"Extracted country: {country}");
return country;
}
}
}