func streamImpl()

in azkustoingest/streaming.go [143:175]


func streamImpl(c streamIngestor, ctx context.Context, payload io.Reader, props properties.All, isBlobUri bool) (*Result, error) {
	compress := queued.ShouldCompress(&props, ingestoptions.CTUnknown)
	if compress && !isBlobUri {
		payload = gzip.Compress(payload)
	}

	if props.Ingestion.Additional.Format == DFUnknown {
		props.Ingestion.Additional.Format = CSV
	}

	err := c.StreamIngest(ctx, props.Ingestion.DatabaseName, props.Ingestion.TableName, payload, props.Ingestion.Additional.Format,
		props.Ingestion.Additional.IngestionMappingRef,
		props.Streaming.ClientRequestId,
		isBlobUri)

	if err != nil {
		if e, ok := errors.GetKustoError(err); ok {
			return nil, e
		}
		return nil, errors.E(errors.OpIngestStream, errors.KClientArgs, err)
	}

	err = props.ApplyDeleteLocalSourceOption()
	if err != nil {
		return nil, err
	}

	result := newResult()
	result.putProps(props)
	result.record.Status = "Success"

	return result, nil
}