in XForm/XForm/WorkflowRunner.cs [64:172]
/// <summary>
///  Return an IXTable for the named table as of outerContext.RequestedAsOfDateTime,
///  rebuilding the persisted copy first when it is missing or stale and a builder is available.
/// </summary>
/// <remarks>
///  Resolution order:
///   1. A version previously recorded in _currentTableVersions which still satisfies the cutoff and still exists.
///   2. A saved query (LocationType.Query, ".xql"), returned as a pipeline without persisting anything.
///   3. A table built from its config (LocationType.Config, ".xql") or, with no config, from ReadSource.
///
///  Side effects on <paramref name="outerContext"/>: a date-only RequestedAsOfDateTime is moved to 23:59:59
///  of that day, NewestDependency is assigned on a cache hit, and innerContext.Pop(outerContext) is called
///  on the query path and the normal completion path.
/// </remarks>
/// <param name="tableName">Name of the table, source, or query to build</param>
/// <param name="outerContext">Context of the caller requesting the table</param>
/// <param name="deferred">
///  If true and the table needs to be (re)built, return the builder itself instead of running it now.
/// </param>
/// <returns>
///  A reader over the persisted table, the query pipeline (for saved queries), or the
///  un-run builder (when <paramref name="deferred"/> is true and a rebuild is needed).
/// </returns>
/// <exception cref="UsageException">
///  The name is not a known source or table, or there is neither a built table nor a way to build one.
/// </exception>
public IXTable Build(string tableName, XDatabaseContext outerContext, bool deferred)
{
    // Validate the source name is recognized
    if (!Sources.Contains(tableName))
    {
        // If it wasn't in cache, check individually for it live
        if (!XDatabaseContext.StreamProvider.ContainsTable(tableName)) throw new UsageException(tableName, "Table", Sources);

        // If found, update the cache
        UpdateSources();
    }

    // If only a Date was passed for AsOfDate, look for the last version as of that day
    if (outerContext.RequestedAsOfDateTime.TimeOfDay == TimeSpan.Zero)
    {
        // Move to 23:59:59 of the same day in a single step. Going via AddDays(1).AddSeconds(-1)
        // throws ArgumentOutOfRangeException for 9999-12-31 (DateTime.MaxValue.Date), because the
        // intermediate "next day" is not representable; the net offset is identical otherwise.
        outerContext.RequestedAsOfDateTime = outerContext.RequestedAsOfDateTime.AddTicks(TimeSpan.TicksPerDay - TimeSpan.TicksPerSecond);
    }

    // If we previously found the latest for this table, just return it again.
    // The remembered version is only reused if it was found for a cutoff at or after the one requested now,
    // is itself not newer than the requested cutoff, and its metadata still exists in the stream provider.
    LatestTableForCutoff previousLatest;
    if (_currentTableVersions.TryGet(tableName, out previousLatest)
        && previousLatest.Cutoff >= outerContext.RequestedAsOfDateTime
        && previousLatest.TableVersion.AsOfDate <= outerContext.RequestedAsOfDateTime
        && TableMetadataSerializer.UncachedExists(outerContext.StreamProvider, previousLatest.TableVersion.Path))
    {
        outerContext.NewestDependency = previousLatest.TableVersion.AsOfDate;
        return BinaryTableReader.Build(outerContext.StreamProvider, previousLatest.TableVersion.Path);
    }

    // Create a context to track what we're building now
    XDatabaseContext innerContext = XDatabaseContext.Push(outerContext);
    innerContext.CurrentTable = tableName;

    // If this is a query, there won't be a cached table - just build a pipeline to make it
    StreamAttributes queryAttributes = innerContext.StreamProvider.Attributes(innerContext.StreamProvider.Path(LocationType.Query, tableName, ".xql"));
    if (queryAttributes.Exists)
    {
        IXTable queryPipeline = innerContext.Query(innerContext.StreamProvider.ReadAllText(queryAttributes.Path));
        innerContext.Pop(outerContext);
        return queryPipeline;
    }

    // Find the latest already built result, and associated query
    ItemVersions tableVersions = innerContext.StreamProvider.ItemVersions(LocationType.Table, tableName);
    ItemVersion latestTable = tableVersions.LatestBeforeCutoff(CrawlType.Full, outerContext.RequestedAsOfDateTime);
    string latestTableQuery = null;
    if (latestTable != null)
    {
        latestTableQuery = TableMetadataSerializer.Read(outerContext.StreamProvider, latestTable.Path).Query;
    }

    // Set the dependency date to the latest table we've already built (if any).
    // This must happen before the builder is constructed below: NewestDependency is read again
    // afterwards (for the table path and the out-of-date check), after constructing the builder.
    innerContext.NewestDependency = (latestTable == null ? DateTime.MinValue : latestTable.AsOfDate);

    // Determine the XQL to build the table and construct a builder which can do so
    string xql;
    IXTable builder;

    // Find the config to build the table and scan dependency versions to determine whether table is out-of-date
    StreamAttributes configAttributes = innerContext.StreamProvider.Attributes(innerContext.StreamProvider.Path(LocationType.Config, tableName, ".xql"));
    if (!configAttributes.Exists)
    {
        // If this is a simple source, just reading it is how to build it
        xql = $"read {XqlScanner.Escape(tableName, TokenType.Value)}";

        // Build a reader concatenating all needed pieces
        builder = ReadSource(tableName, innerContext);
    }
    else
    {
        // If there is a config, the config is how to build it
        xql = innerContext.StreamProvider.ReadAllText(configAttributes.Path);

        // Build a pipeline for the query, recursively creating dependencies
        builder = innerContext.Query(xql);
    }

    // If we don't have the table or the source, we have to throw
    if (latestTable == null && builder == null) throw new UsageException(tableName, "Table", innerContext.StreamProvider.Tables());

    // Get the path we're either reading or building
    string tablePath = innerContext.StreamProvider.Path(LocationType.Table, tableName, CrawlType.Full, innerContext.NewestDependency);

    // If we can rebuild this table and we need to (sources rebuilt, query changed, out-of-date, deleted), rebuild it
    if (builder != null)
    {
        // The 'latestTable == null' test is first so the later latestTable.* accesses are never reached on null
        if (latestTable == null
            || innerContext.RebuiltSomething
            || (latestTableQuery != null && xql != latestTableQuery)
            || IsOutOfDate(latestTable.AsOfDate, innerContext.NewestDependency)
            || !TableMetadataSerializer.UncachedExists(outerContext.StreamProvider, latestTable.Path))
        {
            // If we're not running now, just return how to build it
            // NOTE(review): this return skips innerContext.Pop(outerContext), unlike the query path above - confirm that is intended
            if (deferred) return builder;

            // Otherwise, build it now; we'll return the query to read the output
            innerContext.CurrentQuery = xql;
            Trace.WriteLine($"COMPUTE: [{innerContext.NewestDependency.ToString(StreamProviderExtensions.DateTimeFolderFormat)}] {tableName}");
            BinaryTableWriter.Build(builder, innerContext, tablePath).RunAndDispose();
            innerContext.RebuiltSomething = true;
        }

        // NOTE(review): when no rebuild was needed, 'builder' is neither run nor disposed here - confirm it holds nothing that must be released
    }

    // Report the newest dependency in this chain to the components above
    innerContext.Pop(outerContext);

    // Remember which version satisfied this cutoff so the check at the top can return it directly next time
    _currentTableVersions.Add(tableName, new LatestTableForCutoff(outerContext.RequestedAsOfDateTime, new ItemVersion(LocationType.Table, tableName, CrawlType.Full, innerContext.NewestDependency)));

    return BinaryTableReader.Build(innerContext.StreamProvider, tablePath);
}