in azkustoingest/streaming.go [143:175]
// streamImpl performs a single streaming ingestion call through c and reports
// the outcome as a Result.
//
// The steps, in order:
//  1. queued.ShouldCompress is consulted; when it reports true and the payload
//     is not a blob URI, payload is wrapped with gzip.Compress.
//  2. A format of DFUnknown is replaced with CSV.
//  3. c.StreamIngest is invoked with the database, table, payload, format,
//     mapping reference, client request id and the isBlobUri flag.
//  4. props.ApplyDeleteLocalSourceOption is run after a successful ingest.
//
// On an ingest failure the error extracted by errors.GetKustoError is returned
// when available; otherwise the error is wrapped with errors.OpIngestStream and
// errors.KClientArgs. An error from ApplyDeleteLocalSourceOption is returned
// unchanged. On success the Result carries props and a status of "Success".
func streamImpl(c streamIngestor, ctx context.Context, payload io.Reader, props properties.All, isBlobUri bool) (*Result, error) {
	// The compression decision is taken before the format default is applied,
	// and the check runs regardless of isBlobUri.
	if wantGzip := queued.ShouldCompress(&props, ingestoptions.CTUnknown); wantGzip && !isBlobUri {
		payload = gzip.Compress(payload)
	}

	additional := &props.Ingestion.Additional
	if additional.Format == DFUnknown {
		additional.Format = CSV
	}

	ingestErr := c.StreamIngest(
		ctx,
		props.Ingestion.DatabaseName,
		props.Ingestion.TableName,
		payload,
		additional.Format,
		additional.IngestionMappingRef,
		props.Streaming.ClientRequestId,
		isBlobUri,
	)
	if ingestErr != nil {
		kustoErr, isKusto := errors.GetKustoError(ingestErr)
		if !isKusto {
			return nil, errors.E(errors.OpIngestStream, errors.KClientArgs, ingestErr)
		}
		return nil, kustoErr
	}

	if cleanupErr := props.ApplyDeleteLocalSourceOption(); cleanupErr != nil {
		return nil, cleanupErr
	}

	res := newResult()
	res.putProps(props)
	res.record.Status = "Success"
	return res, nil
}