azkustoingest/internal/properties/properties.go (269 lines of code) (raw):
// Package properties provides Kusto REST properties that will need to be serialized and sent to Kusto
// based upon the type of ingestion we are doing.
package properties
import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/Azure/azure-kusto-go/azkustodata"
"github.com/Azure/azure-kusto-go/azkustoingest/ingestoptions"
"github.com/cenkalti/backoff/v4"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/Azure/azure-kusto-go/azkustodata/errors"
"github.com/google/uuid"
)
// DataFormat indicates what type of encoding format was used for source data.
// Note: This is very similar to azkustoingest.DataFormat, except this supports more formats.
// We are not using a shared list, because this list is used only internally and is for the
// data itself, not the mapping reference. The package structure prevents packages such as filesystem
// from importing ingest, because ingest imports filesystem. We would end up with recursive imports.
// More info here: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/
//
// The numeric value of each constant is used directly as an index into dfDescriptions,
// so the constants and that table must be kept in the same order.
type DataFormat int
const (
// DFUnknown indicates the EncodingType is not set.
DFUnknown DataFormat = 0
// AVRO indicates the source is encoded in Apache Avro format.
AVRO DataFormat = 1
// ApacheAVRO indicates the source is encoded in Apache avro2json format.
ApacheAVRO DataFormat = 2
// CSV indicates the source is encoded in comma separated values.
CSV DataFormat = 3
// JSON indicates the source is encoded as one or more lines, each containing a record in Javascript Object Notation.
JSON DataFormat = 4
// MultiJSON indicates the source is encoded in JSON-Array of individual records in Javascript Object Notation. Optionally,
// multiple documents can be concatenated.
MultiJSON DataFormat = 5
// ORC indicates the source is encoded in Apache Optimized Row Columnar format.
ORC DataFormat = 6
// Parquet indicates the source is encoded in Apache Parquet format.
Parquet DataFormat = 7
// PSV is pipe "|" separated values.
PSV DataFormat = 8
// Raw is a text file that has only a single string value.
Raw DataFormat = 9
// SCSV is a file containing semicolon ";" separated values.
SCSV DataFormat = 10
// SOHSV is a file containing SOH-separated values (ASCII codepoint 1).
SOHSV DataFormat = 11
// SStream indicates the source is encoded as a Microsoft Cosmos Structured Streams format.
SStream DataFormat = 12
// TSV is a file containing tab separated values ("\t").
TSV DataFormat = 13
// TSVE is a file containing escaped-tab separated values ("\t").
TSVE DataFormat = 14
// TXT is a text file with lines delimited by "\n".
TXT DataFormat = 15
// W3CLogFile indicates the source is encoded using W3C Extended Log File format.
W3CLogFile DataFormat = 16
// SingleJSON indicates the source is a single JSON value -- newlines are regular whitespace.
SingleJSON DataFormat = 17
)
// dfDescriptor is one row of the dfDescriptions table: the metadata attached to a single DataFormat.
type dfDescriptor struct {
// camelName is the name returned by DataFormat.CamelCase.
camelName string
// jsonName is the name returned by DataFormat.String and used by DataFormat.MarshalJSON.
jsonName string
// detectableExt is the lower-case file extension (including the leading dot) that
// DataFormatDiscovery matches against; empty means the format is never auto-detected.
detectableExt string
// mappingKind is the value returned by DataFormat.MappingKind.
mappingKind DataFormat
// shouldCompress is the value returned by DataFormat.ShouldCompress.
shouldCompress bool
}
// dfDescriptions is the lookup table behind the DataFormat methods. It is indexed by the
// numeric DataFormat value (dfDescriptions[d]), so entries must stay in the same order as
// the DataFormat constants. Fields are, in order: camelName, jsonName, detectableExt,
// mappingKind, shouldCompress.
// NOTE(review): the ApacheAVRO row reuses jsonName "avro" and mappingKind AVRO — confirm this is intended.
var dfDescriptions = []dfDescriptor{
{"", "", "", DFUnknown, true},
{"Avro", "avro", ".avro", AVRO, false},
{"ApacheAvro", "avro", "", AVRO, false},
{"Csv", "csv", ".csv", CSV, true},
{"Json", "json", ".json", JSON, true},
{"MultiJson", "multijson", "", JSON, true},
{"Orc", "orc", ".orc", ORC, false},
{"Parquet", "parquet", ".parquet", Parquet, false},
{"Psv", "psv", ".psv", CSV, true},
{"Raw", "raw", ".raw", CSV, true},
{"Scsv", "scsv", ".scsv", CSV, true},
{"Sohsv", "sohsv", ".sohsv", CSV, true},
{"SStream", "sstream", ".ss", DFUnknown, false},
{"Tsv", "tsv", ".tsv", CSV, true},
{"Tsve", "tsve", ".tsve", CSV, true},
{"Txt", "txt", ".txt", CSV, true},
{"W3cLogFile", "w3clogfile", ".w3clogfile", W3CLogFile, true},
{"SingleJson", "singlejson", "", JSON, true},
}
// IngestionReportLevel defines which ingestion statuses are reported by the DM.
type IngestionReportLevel int
//goland:noinspection GoUnusedConst - Part of the API
const (
// FailuresOnly tells the DM to report the ingestion status of failed ingestions only.
FailuresOnly IngestionReportLevel = 0
// None tells the DM not to report ingestion status.
None IngestionReportLevel = 1
// FailureAndSuccess tells the DM to report ingestion status for failed and successful ingestions.
FailureAndSuccess IngestionReportLevel = 2
)
// IngestionReportMethod defines where the DM reports ingestion statuses to.
type IngestionReportMethod int
//goland:noinspection GoUnusedConst - Part of the API
const (
// ReportStatusToQueue tells the DM to report ingestion status to a queue.
ReportStatusToQueue IngestionReportMethod = 0
// ReportStatusToTable tells the DM to report ingestion status to a table.
ReportStatusToTable IngestionReportMethod = 1
// ReportStatusToQueueAndTable tells the DM to report ingestion status to both queues and tables.
ReportStatusToQueueAndTable IngestionReportMethod = 2
// ReportStatusToAzureMonitoring tells the DM to report ingestion status to Azure Monitor.
ReportStatusToAzureMonitoring IngestionReportMethod = 3
)
// String implements fmt.Stringer. It yields the lower-case JSON name of the format,
// or the empty string for DFUnknown and for values outside the known range.
func (d DataFormat) String() string {
	idx := int(d)
	if idx <= 0 || idx >= len(dfDescriptions) {
		return ""
	}
	return dfDescriptions[idx].jsonName
}
// CamelCase returns the CamelCase name of the format, or the empty string for
// DFUnknown and for values outside the known range.
// This is for internal use, do not use. This can be removed in future versions.
func (d DataFormat) CamelCase() string {
	idx := int(d)
	if idx <= 0 || idx >= len(dfDescriptions) {
		return ""
	}
	return dfDescriptions[idx].camelName
}
// KnownOrDefault returns d unchanged unless it is DFUnknown, in which case CSV is
// returned as the default format.
func (d DataFormat) KnownOrDefault() azkustodata.DataFormatForStreaming {
	if d != DFUnknown {
		return d
	}
	return CSV
}
// MarshalJSON implements json.Marshaler.MarshalJSON. The format is written as the quoted
// form of String(); DFUnknown cannot be marshaled and produces an error.
func (d DataFormat) MarshalJSON() ([]byte, error) {
	if d == DFUnknown {
		return nil, fmt.Errorf("DataFormat is an invalid encoding type")
	}

	quoted := fmt.Sprintf("%q", d.String())
	return []byte(quoted), nil
}
// MappingKind returns the mapping kind associated with this DataFormat.
// Values outside the known range, including negative values, yield DFUnknown.
func (d DataFormat) MappingKind() DataFormat {
	// The lower bound matters: a negative value would otherwise index the table
	// with a negative number and panic at run time.
	if d >= 0 && int(d) < len(dfDescriptions) {
		return dfDescriptions[d].mappingKind
	}
	return DFUnknown
}
// ShouldCompress reports the shouldCompress flag recorded for this format in
// dfDescriptions. DFUnknown and values outside the known range report true.
func (d DataFormat) ShouldCompress() bool {
	idx := int(d)
	if idx <= 0 || idx >= len(dfDescriptions) {
		return true
	}
	return dfDescriptions[idx].shouldCompress
}
// DataFormatDiscovery looks at the file name and tries to discern what the file format is.
//
// fName may be a local path or a URL. For a hierarchical URL only the path component is
// inspected, so query strings do not interfere. The name is lower-cased, a trailing ".zip"
// and then a trailing ".gz" are stripped, and the remaining extension is matched against
// the detectable extensions in dfDescriptions. DFUnknown is returned when there is no
// extension or no format claims it.
func DataFormatDiscovery(fName string) DataFormat {
	name := fName
	// Only trust the parsed path for non-opaque URLs. url.Parse reads a Windows drive path
	// such as `C:\data\file.csv` as scheme "c" with an opaque remainder and an empty Path;
	// using u.Path there would throw the file name away and always yield DFUnknown.
	if u, err := url.Parse(fName); err == nil && u.Scheme != "" && u.Opaque == "" {
		name = u.Path
	}

	name = strings.ToLower(name)
	name = strings.TrimSuffix(name, ".zip")
	name = strings.TrimSuffix(name, ".gz")

	ext := filepath.Ext(name)
	if ext == "" {
		return DFUnknown
	}

	// Index 0 is the DFUnknown placeholder, so the search starts at 1.
	for i := 1; i < len(dfDescriptions); i++ {
		if dfDescriptions[i].detectableExt == ext {
			return DataFormat(i)
		}
	}
	return DFUnknown
}
// All holds the complete set of properties that might be used. It groups the
// service-facing Ingestion properties with the client-side option sets.
type All struct {
// Ingestion is a set of properties that are used across all ingestion methods.
Ingestion Ingestion
// Source provides options that are used to operate on the source data.
Source SourceOptions
// Streaming provides options that are used when doing an ingestion from a stream.
Streaming Streaming
// ManagedStreaming provides options that are used when doing an ingestion from a ManagedStreaming client.
ManagedStreaming ManagedStreaming
}
// ManagedStreaming provides options that are used when doing an ingestion from a ManagedStreaming client.
type ManagedStreaming struct {
// Backoff is the backoff strategy to use when retrying a transiently failed ingestion.
Backoff backoff.BackOff
}
// Streaming provides options that are used when doing a streaming ingestion.
type Streaming struct {
// ClientRequestId is the client request ID to use for the ingestion.
ClientRequestId string
}
// SourceOptions are options that the user provides about the source that is going to be uploaded.
type SourceOptions struct {
// ID allows someone to set the UUID for the upload themselves. We aren't providing this option at this time,
// but it is here for when we do.
ID uuid.UUID
// DeleteLocalSource indicates to delete the local file after it has been consumed.
DeleteLocalSource bool
// DontCompress indicates to not compress the file. In streaming - do not pass DontCompress if file is not already compressed.
DontCompress bool
// OriginalSource is the path to the original source file, used for deletion.
OriginalSource string
// CompressionType is the type of compression used on the file.
CompressionType ingestoptions.CompressionType
}
// Ingestion is a JSON serializable set of options that must be provided to the service.
// Use MarshalJSONString to fill in defaults, validate, and encode it.
type Ingestion struct {
// ID is the unique UUID for this upload. MarshalJSONString generates one when it is the zero UUID.
ID uuid.UUID `json:"Id"`
// BlobPath is the URI representing the blob. Required by validation.
BlobPath string
// DatabaseName is the name of the Kusto database the data will ingest into. Required by validation.
DatabaseName string
// TableName is the name of the Kusto table the data will ingest into. Required by validation.
TableName string
// RawDataSize is the uncompressed data size. Should be used to communicate the file size to the service for efficient ingestion.
RawDataSize int64 `json:",omitempty"`
// RetainBlobOnSuccess indicates if the source blob should be retained or deleted. True is preferable.
RetainBlobOnSuccess bool `json:",omitempty"`
// FlushImmediately - the service batching manager will not aggregate this file, thus overriding the batching policy.
FlushImmediately bool
// IgnoreSizeLimit - ignores the size limit for data ingestion.
IgnoreSizeLimit bool `json:",omitempty"`
// ReportLevel defines which if any ingestion states are reported.
ReportLevel IngestionReportLevel `json:",omitempty"`
// ReportMethod defines which mechanisms are used to report the ingestion status.
ReportMethod IngestionReportMethod `json:",omitempty"`
// SourceMessageCreationTime is when we created the blob. MarshalJSONString sets it to time.Now() when it is zero.
// NOTE(review): encoding/json's omitempty never omits struct values such as time.Time, so the tag option has no effect here.
SourceMessageCreationTime time.Time `json:",omitempty"`
// Additional (properties) is a set of extra properties added to the ingestion command.
Additional Additional `json:"AdditionalProperties"`
// TableEntryRef points to the status table entry used to report the status of this ingestion.
// NOTE(review): omitempty has no effect on a struct value, so this is always serialized — confirm that is intended.
TableEntryRef StatusTableDescription `json:"IngestionStatusInTable,omitempty"`
// ApplicationForTracing is the application name that is used for tracing.
ApplicationForTracing string `json:",omitempty"`
// ClientVersionForTracing is the client version that is used for tracing.
ClientVersionForTracing string `json:",omitempty"`
}
// TagsList is a list of tags whose JSON form is a string that itself contains the
// JSON-encoded array (for example "[\"a\",\"b\"]"), rather than a plain JSON array.
type TagsList []string

// MarshalJSON implements json.Marshaler. It encodes the tags as a JSON array first and
// then encodes that array text again as a JSON string.
func (t TagsList) MarshalJSON() ([]byte, error) {
	arrayJSON, err := json.Marshal([]string(t))
	if err == nil {
		return json.Marshal(string(arrayJSON))
	}
	return nil, err
}
// Additional is additional properties. It has a custom MarshalJSON that rewrites
// the ingestionMappingType value after the default encoding.
type Additional struct {
// AuthContext is the authorization string that we get from resources.Manager.AuthContext().
AuthContext string `json:"authorizationContext,omitempty"`
// IngestionMapping is a json string that maps the data being imported to the table's columns.
// See: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/
IngestionMapping string `json:"ingestionMapping,omitempty"`
// IngestionMappingRef is a string representing a mapping reference that has been uploaded to the server
// via a Mgmt() call. See: https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command
IngestionMappingRef string `json:"ingestionMappingReference,omitempty"`
// IngestionMappingType is what the mapping reference is encoded in: csv, json, avro, ...
// Additional.MarshalJSON emits it using DataFormat.CamelCase() rather than DataFormat.String().
IngestionMappingType DataFormat `json:"ingestionMappingType,omitempty"`
// ValidationPolicy is a JSON encoded string that tells our ingestion action what policies we want on the
// data being ingested and what to do when that is violated.
ValidationPolicy string `json:"validationPolicy,omitempty"`
// Format is the data format of the source data; it is omitted from the JSON when it is DFUnknown (zero).
Format DataFormat `json:"format,omitempty"`
// IgnoreFirstRecord is always serialized (no omitempty).
// NOTE(review): presumably tells the service to skip the first record, e.g. a header row — confirm.
IgnoreFirstRecord bool `json:"ignoreFirstRecord"`
// Tags is a list of tags to associate with the ingested data.
Tags TagsList `json:"tags,omitempty"`
// IngestIfNotExists is a string value that, if specified, prevents ingestion from succeeding if the table already
// has data tagged with an ingest-by: tag with the same value. This ensures idempotent data ingestion.
IngestIfNotExists string `json:"ingestIfNotExists,omitempty"`
// CreationTime is used to override the time considered for retention policies, which by default is the time of ingestion.
// NOTE(review): encoding/json's omitempty never omits struct values such as time.Time, so a zero
// CreationTime is still serialized — confirm the service treats that as "not set".
CreationTime time.Time `json:"creationTime,omitempty"`
}
// StatusTableDescription is a reference to the table status entry used for this ingestion command.
// Each field is left out of the JSON when it is the empty string.
type StatusTableDescription struct {
// TableConnectionString is a secret-free connection string to the status table.
TableConnectionString string `json:",omitempty"`
// PartitionKey is the partition key of the table entry.
PartitionKey string `json:",omitempty"`
// RowKey is the row key of the table entry.
RowKey string `json:",omitempty"`
}
// ApplyDeleteLocalSourceOption removes the original local source file when
// Source.DeleteLocalSource is set and Source.OriginalSource is non-empty; otherwise it
// does nothing. A failed removal is reported as a local-filesystem error marked no-retry.
func (p *All) ApplyDeleteLocalSourceOption() error {
	src := p.Source
	if !src.DeleteLocalSource || src.OriginalSource == "" {
		return nil
	}

	rmErr := os.Remove(src.OriginalSource)
	if rmErr == nil {
		return nil
	}
	return errors.ES(errors.OpFileIngest, errors.KLocalFileSystem, "file was uploaded successfully, but we could not delete the local file: %s",
		rmErr).SetNoRetry()
}
// MarshalJSON implements json.Marshaller. This is for use only by the SDK and may be removed at any time.
func (a Additional) MarshalJSON() ([]byte, error) {
	// TODO(daniel): Have the backend fixed.
	// In .Net, DataFormat and IngestionMappingType are two separate enumerators that are
	// encoded differently, and the server does exact string matches on them ("csv" for the
	// format, "Csv" for the mapping type). Rather than keep two enumerators on our side, the
	// struct is encoded normally and the mapping type is then swapped for its CamelCase name.

	// plainAdditional has Additional's fields but not its methods, so marshaling it uses
	// the default struct encoding instead of recursing into this method.
	type plainAdditional Additional

	encoded, err := json.Marshal(plainAdditional(a))
	if err != nil {
		return nil, err
	}

	var fields map[string]interface{}
	if err = json.Unmarshal(encoded, &fields); err != nil {
		return nil, err
	}

	// Only rewrite the key when the default encoding actually emitted it.
	if _, present := fields["ingestionMappingType"]; present {
		fields["ingestionMappingType"] = a.IngestionMappingType.CamelCase()
	}

	return json.Marshal(fields)
}
// MarshalJSONString will marshal Ingestion into a base64 encoded string.
// Defaults are applied to a copy first, the copy is validated, and its JSON encoding
// is then base64 (standard alphabet) encoded. On any failure the string is empty.
func (i Ingestion) MarshalJSONString() (base64String string, err error) {
	filled := i.defaults()
	if err = filled.validate(); err != nil {
		return "", err
	}

	var payload []byte
	if payload, err = json.Marshal(filled); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(payload), nil
}
// defaults returns a copy of i with auto-generatable values filled in when unset: a zero
// SourceMessageCreationTime becomes the current time and a zero ID becomes a new UUID.
// This is used inside our MarshalJSONString().
func (i Ingestion) defaults() Ingestion {
	filled := i
	if filled.SourceMessageCreationTime.IsZero() {
		filled.SourceMessageCreationTime = time.Now()
	}
	if uuidIsZero(filled.ID) {
		filled.ID = uuid.New()
	}
	return filled
}
// validate checks that the required fields are populated and returns an error describing
// the first one that is missing. Checks run in this order: ID, DatabaseName, TableName,
// Additional.AuthContext, BlobPath.
func (i Ingestion) validate() error {
	if uuidIsZero(i.ID) {
		return fmt.Errorf("the ID cannot be an zero value UUID")
	}
	if i.DatabaseName == "" {
		return fmt.Errorf("the database name cannot be an empty string")
	}
	if i.TableName == "" {
		return fmt.Errorf("the table name cannot be an empty string")
	}
	if i.Additional.AuthContext == "" {
		return fmt.Errorf("the authorization context was an empty string, which is not allowed")
	}
	if i.BlobPath == "" {
		return fmt.Errorf("the BlobPath was not set")
	}
	return nil
}
// RemoveQueryParamsFromUrl returns url truncated at the first "?" or ";" it contains,
// whichever comes first. A string containing neither is returned unchanged.
func RemoveQueryParamsFromUrl(url string) string {
	cut := strings.IndexAny(url, "?;")
	if cut < 0 {
		return url
	}
	return url[:cut]
}
// uuidIsZero reports whether every byte of id is zero, i.e. whether id is the zero-value UUID.
func uuidIsZero(id uuid.UUID) bool {
	// uuid.UUID is a byte array, so comparing against the zero value checks all bytes at once.
	return id == uuid.UUID{}
}