in AzureDataLake/DecisionServiceExtractor/CbParser.cs [57:205]
public IRow ParseEvent(IUpdatableRow output, Stream input)
{
TextReader inputReader;
output.Set(this.SkipLearnColumn, false);
if (!this.DataColumn.IsRequired)
inputReader = new StreamReader(input, Encoding.UTF8);
else
{
string data = new StreamReader(input, Encoding.UTF8).ReadToEnd();
inputReader = new StringReader(data);
output.Set(this.DataColumn, data);
}
if (this.hasJsonObject)
{
var jsonReader = new JsonTextReader(inputReader);
jsonReader.DateFormatString = "o";
var jobj = JObject.ReadFrom(jsonReader);
var chosenActionIndex = (int)jobj.Value<long>("_labelIndex");
// iterate through all expressions
foreach (var fe in this.expressions)
{
if (fe.Idx >= 0)
{
// special support for chosen action
var expr = fe.JsonPath.Replace("$.c._multi[($._labelIndex)]", $"$.c._multi[{chosenActionIndex}]");
try
{
string value = jobj.SelectToken(expr)?.Value<string>();
if (value != null)
output.Set(fe.Idx, value);
}
catch (Exception e)
{
output.Set(fe.Idx, e.Message);
}
}
}
// since we already parse it, no need to parse twice
output.Set(this.EventIdColumn, jobj.Value<string>("EventId"));
output.Set(this.TimestampColumn, jobj.Value<DateTime>("Timestamp"));
output.Set(this.EnqueuedTimeUtcColumn, jobj.Value<DateTime>("EnqueuedTimeUtc"));
if (this.SkipLearnColumn.IsRequired)
{
var optional = jobj.Value<bool?>("_skipLearn"); ;
output.Set(this.SkipLearnColumn.Idx, (bool)(optional.HasValue ? optional : false));
}
output.Set(this.CostColumn, (float)jobj.Value<double>("_label_cost"));
output.Set(this.ProbabilityColumn, (float)jobj.Value<double>("_label_probability"));
output.Set(this.ActionColumn, (int)jobj.Value<long>("_label_Action"));
if (this.PdropColumn.IsRequired)
{
var optional = jobj.Value<double?>("pdrop");
output.Set(this.PdropColumn.Idx, (float)(optional.HasValue ? optional.Value : 0.0f));
}
if (this.NumActionsColumn.IsRequired)
{
if (jobj["a"] is JArray arr)
output.Set(this.NumActionsColumn.Idx, arr.Count);
}
if (this.HasObservationsColumn.IsRequired)
{
if (jobj["o"] is JArray arr)
output.Set(this.HasObservationsColumn.Idx, 1);
}
// return early
return output.AsReadOnly();
}
// TODO: skip the dangling events
bool foundLabelCost = false;
// this is a optimized version only parsing parts of the data
using (var jsonReader = new JsonTextReader(inputReader))
{
jsonReader.DateFormatString = "o";
while (jsonReader.Read())
{
switch (jsonReader.TokenType)
{
case JsonToken.PropertyName:
{
var propertyName = (string)jsonReader.Value;
// "_label_cost":0,"_label_probability":0.200000003,"_label_Action":4,"_labelIndex":3,"Timestamp":"2018-03-30T01:48:11.5760000Z","Version":"1","EventId":
switch (propertyName)
{
case "EventId":
Helpers.ExtractPropertyString(jsonReader, output, this.EventIdColumn);
break;
case "Timestamp":
output.Set(this.TimestampColumn, (DateTime)jsonReader.ReadAsDateTime());
break;
case "_label_cost":
foundLabelCost = true;
Helpers.ExtractPropertyDouble(jsonReader, output, this.CostColumn);
output.Set(this.IsDanglingColumn, false);
break;
case "_label_probability":
Helpers.ExtractPropertyDouble(jsonReader, output, this.ProbabilityColumn);
break;
case "_label_Action":
Helpers.ExtractPropertyInteger(jsonReader, output, this.ActionColumn);
break;
case "a":
if (!this.NumActionsColumn.IsRequired)
jsonReader.Skip();
else
output.Set(this.NumActionsColumn.Idx, Helpers.CountArrayElements(jsonReader));
break;
case "o":
if (!this.HasObservationsColumn.IsRequired)
jsonReader.Skip();
else
output.Set(this.HasObservationsColumn.Idx, 1);
break;
case "pdrop":
Helpers.ExtractPropertyDouble(jsonReader, output, this.PdropColumn);
break;
case "_skipLearn":
Helpers.ExtractPropertyBool(jsonReader, output, this.SkipLearnColumn);
break;
case "EnqueuedTimeUtc":
output.Set(this.EnqueuedTimeUtcColumn, (DateTime)jsonReader.ReadAsDateTime());
output.Set(this.IsDanglingColumn, true);
break;
case "RewardValue":
Helpers.ExtractPropertyDoubleOpt(jsonReader, output, this.RewardValueColumn);
break;
default:
jsonReader.Skip();
break;
}
}
break;
}
}
}
return output.AsReadOnly();
}