in azkustoingest/ingest.go [83:135]
// prepForIngestion finalizes props for a queued ingestion and creates the
// Result that carries them.
//
// It performs these steps in order:
//  1. fetches the auth context from the resource manager and stores it in the
//     additional ingestion properties;
//  2. runs every FileOption against props for the QueuedClient and the given
//     source scope;
//  3. defaults the data format to CSV when ingesting from a reader and no
//     format was set;
//  4. rejects a mapping type that does not match the format's mapping kind;
//  5. when a report level other than None is requested, makes sure the source
//     has an ID and, for the table-based report methods, fills in the status
//     table entry reference.
//
// On success it returns the Result (with props stored in it) and the updated
// props. On any failure it returns a nil Result, a zero properties.All and the
// error.
func (i *Ingestion) prepForIngestion(ctx context.Context, options []FileOption, props properties.All, source SourceScope) (*Result, properties.All, error) {
	res := newResult()

	authCtx, err := i.mgr.AuthContext(ctx)
	if err != nil {
		return nil, properties.All{}, err
	}
	props.Ingestion.Additional.AuthContext = authCtx

	// Options mutate props in place; stop at the first one that fails.
	for idx := range options {
		if runErr := options[idx].Run(&props, QueuedClient, source); runErr != nil {
			return nil, properties.All{}, runErr
		}
	}

	// A reader source with no explicit format falls back to CSV.
	if props.Ingestion.Additional.Format == DFUnknown && source == FromReader {
		props.Ingestion.Additional.Format = CSV
	}

	// When a mapping type was set, it has to agree with the format's mapping kind.
	mappingType := props.Ingestion.Additional.IngestionMappingType
	if mappingType != DFUnknown && props.Ingestion.Additional.Format.MappingKind() != mappingType {
		return nil, properties.All{}, errors.ES(
			errors.OpUnknown,
			errors.KClientArgs,
			"format and ingestion mapping type must match (hint: using ingestion mapping sets the format automatically)",
		).SetNoRetry()
	}

	reporting := props.Ingestion.ReportLevel != properties.None

	// Status reporting is keyed by the source ID, so generate one if it is unset.
	if reporting && props.Source.ID == uuid.Nil {
		props.Source.ID = uuid.New()
	}

	// Only the table-based report methods need a status table entry reference.
	method := props.Ingestion.ReportMethod
	toTable := method == properties.ReportStatusToTable || method == properties.ReportStatusToQueueAndTable
	if reporting && toTable {
		tables, err := i.mgr.GetTables()
		if err != nil {
			return nil, properties.All{}, err
		}
		if len(tables) == 0 {
			return nil, properties.All{}, fmt.Errorf("User requested reporting status to table, yet status table resource URI is not found")
		}

		// The first table resource is used; the entry is addressed by the
		// source ID as partition key and the nil UUID as row key.
		props.Ingestion.TableEntryRef.TableConnectionString = tables[0].URL().String()
		props.Ingestion.TableEntryRef.PartitionKey = props.Source.ID.String()
		props.Ingestion.TableEntryRef.RowKey = uuid.Nil.String()
	}

	res.putProps(props)
	return res, props, nil
}