code/KustoCopyConsole/Entity/InMemory/RowItemInMemoryCache.cs (289 lines of code) (raw):
using KustoCopyConsole.Entity.RowItems;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace KustoCopyConsole.Entity.InMemory
{
internal class RowItemInMemoryCache
{
#region Constructors
private RowItemInMemoryCache(IImmutableDictionary<string, ActivityCache> activityMap)
{
ActivityMap = activityMap;
}
public RowItemInMemoryCache()
: this(ImmutableDictionary<string, ActivityCache>.Empty)
{
}
public RowItemInMemoryCache(IEnumerable<RowItemBase> items)
: this()
{
foreach (var item in items)
{
ActivityMap = AppendItemToActivityCache(item);
}
}
#endregion
public IImmutableDictionary<string, ActivityCache> ActivityMap { get; }
public IEnumerable<ActivityFlatHierarchy> GetActivityFlatHierarchy(
Func<ActivityCache, bool> activityPredicate,
Func<IterationCache, bool> iterationPredicate)
{
return ActivityMap
.Values
.Where(a => activityPredicate(a))
.SelectMany(a => a.IterationMap.Values.Where(i => iterationPredicate(i)).Select(i => new
{
Activity = a,
Iteration = i,
TempTableItem = i.TempTable
}))
.SelectMany(o => o.Iteration.BlockMap.Values.Select(b => new ActivityFlatHierarchy(
o.Activity.RowItem,
o.Iteration.RowItem,
o.TempTableItem,
b.RowItem,
b.UrlMap.Values.Select(u => u.RowItem),
b.ExtentMap.Values.Select(e => e.RowItem))));
}
public IEnumerable<RowItemBase> GetItems()
{
foreach (var activity in ActivityMap.Values)
{
yield return activity.RowItem;
foreach (var iteration in activity.IterationMap.Values)
{
yield return iteration.RowItem;
if (iteration.TempTable != null)
{
yield return iteration.TempTable;
}
foreach (var block in iteration.BlockMap.Values)
{
yield return block.RowItem;
foreach (var url in block.UrlMap.Values)
{
yield return url.RowItem;
}
}
}
}
}
public RowItemInMemoryCache CleanOnRestart()
{
var newActivityMap = ActivityMap.Values
.Select(a => a.CleanOnRestart())
.ToImmutableDictionary(a => a.RowItem.ActivityName);
return new RowItemInMemoryCache(newActivityMap);
}
public RowItemInMemoryCache AppendItem(RowItemBase item)
{
return new RowItemInMemoryCache(AppendItemToActivityCache(item));
}
private IImmutableDictionary<string, ActivityCache> AppendItemToActivityCache(
RowItemBase item)
{
switch (item)
{
case ActivityRowItem a:
return AppendActivity(a);
case IterationRowItem i:
return AppendIteration(i);
case TempTableRowItem t:
return AppendTempTable(t);
case BlockRowItem sb:
return AppendBlock(sb);
case UrlRowItem url:
return AppendUrl(url);
case ExtentRowItem extent:
return AppendExtent(extent);
default:
throw new NotSupportedException(
$"Not supported row item type: {item.GetType().Name}");
}
}
private IImmutableDictionary<string, ActivityCache> AppendActivity(
ActivityRowItem item)
{
var activityName = item.ActivityName;
if (ActivityMap.ContainsKey(activityName))
{
var activity = ActivityMap[activityName];
return ActivityMap.SetItem(
activityName,
new ActivityCache(item, activity.IterationMap));
}
else
{
return ActivityMap.Add(activityName, new ActivityCache(item));
}
}
private IImmutableDictionary<string, ActivityCache> AppendIteration(
IterationRowItem item)
{
var activityName = item.ActivityName;
if (ActivityMap.ContainsKey(activityName))
{
var table = ActivityMap[activityName];
if (table.IterationMap.ContainsKey(item.IterationId))
{
var iteration = table.IterationMap[item.IterationId];
return ActivityMap.SetItem(
activityName,
table.AppendIteration(
new IterationCache(item, iteration.TempTable, iteration.BlockMap)));
}
else
{
return ActivityMap.SetItem(
activityName,
table.AppendIteration(new IterationCache(item)));
}
}
else
{
throw new NotSupportedException("Activity should come before block in logs");
}
}
private IImmutableDictionary<string, ActivityCache> AppendTempTable(TempTableRowItem item)
{
var activityName = item.ActivityName;
if (ActivityMap.ContainsKey(activityName))
{
var activity = ActivityMap[activityName];
if (activity.IterationMap.ContainsKey(item.IterationId))
{
var iteration = activity.IterationMap[item.IterationId];
return ActivityMap.SetItem(
activityName,
activity.AppendIteration(
new IterationCache(iteration.RowItem, item, iteration.BlockMap)));
}
else
{
throw new NotSupportedException("Iteration should come before block in logs");
}
}
else
{
throw new NotSupportedException("Activity should come before block in logs");
}
}
private IImmutableDictionary<string, ActivityCache> AppendBlock(BlockRowItem item)
{
var activityName = item.ActivityName;
if (ActivityMap.ContainsKey(activityName))
{
var activity = ActivityMap[activityName];
if (activity.IterationMap.ContainsKey(item.IterationId))
{
var iteration = activity.IterationMap[item.IterationId];
if (iteration.BlockMap.ContainsKey(item.BlockId))
{
var block = iteration.BlockMap[item.BlockId];
return ActivityMap.SetItem(
activityName,
activity.AppendIteration(
iteration.AppendBlock(
new BlockCache(item, block.UrlMap, block.ExtentMap))));
}
else
{
return ActivityMap.SetItem(
activityName,
activity.AppendIteration(
iteration.AppendBlock(new BlockCache(item))));
}
}
else
{
throw new NotSupportedException("Iteration should come before block in logs");
}
}
else
{
throw new NotSupportedException("Activity should come before block in logs");
}
}
private IImmutableDictionary<string, ActivityCache> AppendUrl(UrlRowItem item)
{
var activityName = item.ActivityName;
if (ActivityMap.ContainsKey(activityName))
{
var activity = ActivityMap[activityName];
if (activity.IterationMap.ContainsKey(item.IterationId))
{
var iteration = activity.IterationMap[item.IterationId];
if (iteration.BlockMap.ContainsKey(item.BlockId))
{
var block = iteration.BlockMap[item.BlockId];
var na = ActivityMap.SetItem(
activityName,
activity.AppendIteration(
iteration.AppendBlock(
block.AppendUrl(new UrlCache(item)))));
var ni = activity.AppendIteration(
iteration.AppendBlock(
block.AppendUrl(new UrlCache(item))));
var nii = iteration.AppendBlock(
block.AppendUrl(new UrlCache(item)));
return ActivityMap.SetItem(
activityName,
activity.AppendIteration(
iteration.AppendBlock(
block.AppendUrl(new UrlCache(item)))));
}
else
{
throw new NotSupportedException("Block should come before url in logs");
}
}
else
{
throw new NotSupportedException("Iteration should come before block in logs");
}
}
else
{
throw new NotSupportedException("Activity should come before iteration in logs");
}
}
private IImmutableDictionary<string, ActivityCache> AppendExtent(ExtentRowItem item)
{
var activityName = item.ActivityName;
if (ActivityMap.ContainsKey(activityName))
{
var activity = ActivityMap[activityName];
if (activity.IterationMap.ContainsKey(item.IterationId))
{
var iteration = activity.IterationMap[item.IterationId];
if (iteration.BlockMap.ContainsKey(item.BlockId))
{
var block = iteration.BlockMap[item.BlockId];
var newActivityMap = ActivityMap.SetItem(
activityName,
activity.AppendIteration(
iteration.AppendBlock(
block.AppendExtent(new ExtentCache(item)))));
return newActivityMap;
}
else
{
throw new NotSupportedException("Block should come before extent in logs");
}
}
else
{
throw new NotSupportedException("Iteration should come before block in logs");
}
}
else
{
throw new NotSupportedException("Activity should come before iteration in logs");
}
}
}
}