azkustoingest/file_options.go (362 lines of code) (raw):

package azkustoingest

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/Azure/azure-kusto-go/azkustodata/errors"
	"github.com/Azure/azure-kusto-go/azkustoingest/ingestoptions"
	"github.com/Azure/azure-kusto-go/azkustoingest/internal/properties"
	"github.com/cenkalti/backoff/v4"
)

// SourceScope is a bit set of the ingestion source kinds (FromFile, FromReader, FromBlob) an option may be used with.
type SourceScope uint

// ClientScope is a bit set of the ingestion client kinds (QueuedClient, StreamingClient, ManagedClient) an option
// may be used with.
type ClientScope uint

// Scope constants. SourceScope and ClientScope values share a single iota sequence, so every constant is a distinct
// power of two; values of the same type can be OR-ed together to describe several scopes at once.
const (
	FromFile SourceScope = 1 << iota
	FromReader
	FromBlob
	QueuedClient ClientScope = 1 << iota
	StreamingClient
	ManagedClient
)

// String returns the name of a single SourceScope value. Combined or unknown values are rendered as
// "SourceScope(<n>)" rather than panicking, so the method is safe to use while building error messages.
func (s SourceScope) String() string {
	switch s {
	case FromFile:
		return "FromFile"
	case FromReader:
		return "FromReader"
	case FromBlob:
		return "FromBlob"
	default:
		// uint conversion avoids any chance of recursing into String.
		return fmt.Sprintf("SourceScope(%d)", uint(s))
	}
}

// String returns the name of a single ClientScope value. Combined or unknown values are rendered as
// "ClientScope(<n>)" rather than panicking, so the method is safe to use while building error messages.
func (s ClientScope) String() string {
	switch s {
	case QueuedClient:
		return "QueuedClient"
	case StreamingClient:
		return "StreamingClient"
	case ManagedClient:
		// Must be listed: option.Run formats the client type into its rejection messages.
		return "ManagedClient"
	default:
		return fmt.Sprintf("ClientScope(%d)", uint(s))
	}
}

// FileOption is an optional argument to an ingestion call such as FromFile().
// Each option declares which sources and clients it supports and knows how to apply itself to the
// ingestion properties.
type FileOption interface {
	fmt.Stringer
	// SourceScopes reports the source kinds this option may be used with.
	SourceScopes() SourceScope
	// ClientScopes reports the client kinds this option may be used with.
	ClientScopes() ClientScope
	// Run checks that the option is allowed for clientType and sourceType and, if so, applies it to p.
	Run(p *properties.All, clientType ClientScope, sourceType SourceScope) error
}

// option is the FileOption implementation shared by every option constructor in this file.
type option struct {
	run          func(p *properties.All) error // applies the option to the ingestion properties
	clientScopes ClientScope                   // clients the option is valid for
	sourceScope  SourceScope                   // sources the option is valid for
	name         string                        // option name, used by String and in error messages
}

// SourceScopes reports the source kinds the option may be used with.
func (o option) SourceScopes() SourceScope {
	return o.sourceScope
}

// ClientScopes reports the client kinds the option may be used with.
func (o option) ClientScopes() ClientScope {
	return o.clientScopes
}

// String returns the option's name.
func (o option) String() string {
	return o.name
}

// Run checks that the option is permitted for clientType and sourceType and then applies it to p.
// Rejections are KClientArgs errors attributed to OpIngestStream when clientType includes the streaming
// client, and to OpFileIngest otherwise.
func (o option) Run(p *properties.All, clientType ClientScope, sourceType SourceScope) error {
	errType := errors.OpFileIngest
	if clientType&StreamingClient != 0 {
		errType = errors.OpIngestStream
	}

	// errors.ES formats its arguments itself; passing them through (instead of a pre-built Sprintf result)
	// keeps the format string constant.
	if o.clientScopes&clientType == 0 {
		return errors.ES(errType, errors.KClientArgs, "%s is not valid for client '%s'", o.name, clientType)
	}
	if o.sourceScope&sourceType == 0 {
		return errors.ES(
			errType,
			errors.KClientArgs,
			"%s is not valid for ingestion source type '%s' for client '%s'",
			o.name, sourceType, clientType,
		)
	}

	return o.run(p)
}

// Database overrides the default database name.
func Database(name string) FileOption {
	return option{
		run: func(p *properties.All) error {
			p.Ingestion.DatabaseName = name
			return nil
		},
		clientScopes: QueuedClient | StreamingClient | ManagedClient,
		sourceScope:  FromFile | FromReader | FromBlob,
		name:         "Database",
	}
}

// Table overrides the default table name.
func Table(name string) FileOption {
	return option{
		run: func(p *properties.All) error {
			p.Ingestion.TableName = name
			return nil
		},
		clientScopes: QueuedClient | StreamingClient | ManagedClient,
		sourceScope:  FromFile | FromReader | FromBlob,
		name:         "Table",
	}
}

// DontCompress tells the client not to compress the source data. With the streaming client, only pass
// DontCompress when the data is already compressed.
// It applies to file and reader sources only.
func DontCompress() FileOption {
	return option{
		run: func(p *properties.All) error {
			p.Source.DontCompress = true
			return nil
		},
		clientScopes: QueuedClient | StreamingClient | ManagedClient,
		sourceScope:  FromFile | FromReader,
		name:         "DontCompress",
	}
}

// backOff sets the exponential backoff policy stored on the managed-streaming properties.
// It is only valid for the managed client (presumably used when retrying streaming ingestion — confirm).
func backOff(off *backoff.ExponentialBackOff) FileOption {
	return option{
		run: func(p *properties.All) error {
			p.ManagedStreaming.Backoff = off
			return nil
		},
		clientScopes: ManagedClient,
		sourceScope:  FromFile | FromReader | FromBlob,
		name:         "BackOff",
	}
}

// FlushImmediately asks the service's batching manager not to aggregate this data, overriding the batching policy.
func FlushImmediately() FileOption {
	return option{
		run: func(p *properties.All) error {
			p.Ingestion.FlushImmediately = true
			return nil
		},
		clientScopes: QueuedClient | ManagedClient,
		sourceScope:  FromFile | FromReader | FromBlob,
		name:         "FlushImmediately",
	}
}

// IgnoreFirstRecord asks Kusto to skip the first record of the ingested data (typically a header row).
func IgnoreFirstRecord() FileOption {
	return option{
		run: func(p *properties.All) error {
			p.Ingestion.Additional.IgnoreFirstRecord = true
			return nil
		},
		clientScopes: QueuedClient | ManagedClient,
		sourceScope:  FromFile | FromReader | FromBlob,
		name:         "IgnoreFirstRecord",
	}
}

// DataFormat indicates what type of encoding format was used for source data.
// Not all options can be used in every method. type DataFormat = properties.DataFormat // note: any change here needs to be kept up to date with the properties version. // I'm not a fan of having two copies, but I don't think it is worth moving to its own package // to allow properties and ingest to both import without a cycle. // //goland:noinspection GoUnusedConst - Part of the API const ( // DFUnknown indicates the EncodingType is not set. DFUnknown DataFormat = properties.DFUnknown // AVRO indicates the source is encoded in Apache Avro format. AVRO DataFormat = properties.AVRO // ApacheAVRO indicates the source is encoded in Apache avro2json format. ApacheAVRO DataFormat = properties.ApacheAVRO // CSV indicates the source is encoded in comma seperated values. CSV DataFormat = properties.CSV // JSON indicates the source is encoded as one or more lines, each containing a record in Javascript Object Notation. JSON DataFormat = properties.JSON // MultiJSON indicates the source is encoded in JSON-Array of individual records in Javascript Object Notation. Optionally, //multiple documents can be concatenated. MultiJSON DataFormat = properties.MultiJSON // ORC indicates the source is encoded in Apache Optimized Row Columnar format. ORC DataFormat = properties.ORC // Parquet indicates the source is encoded in Apache Parquet format. Parquet DataFormat = properties.Parquet // PSV is pipe "|" separated values. PSV DataFormat = properties.PSV // Raw is a text file that has only a single string value. Raw DataFormat = properties.Raw // SCSV is a file containing semicolon ";" separated values. SCSV DataFormat = properties.SCSV // SOHSV is a file containing SOH-separated values(ASCII codepoint 1). SOHSV DataFormat = properties.SOHSV // SStream indicates the source is encoded as a Microsoft Cosmos Structured Streams format SStream DataFormat = properties.SStream // TSV is a file containing tab seperated values ("\t"). 
TSV DataFormat = properties.TSV // TSVE is a file containing escaped-tab seperated values ("\t"). TSVE DataFormat = properties.TSVE // TXT is a text file with lines ending with "\n". TXT DataFormat = properties.TXT // W3CLogFile indicates the source is encoded using W3C Extended Log File format W3CLogFile DataFormat = properties.W3CLogFile // SingleJSON indicates the source is a single JSON value -- newlines are regular whitespace. SingleJSON DataFormat = properties.SingleJSON ) // InferFormatFromFileName looks at the file name and tries to discern what the file format is func InferFormatFromFileName(fName string) DataFormat { return properties.DataFormatDiscovery(fName) } // IngestionMapping provides runtime mapping of the data being imported to the columns in the table. // "mapping" will be JSON encoded, so it can be any type that can be JSON marshalled. If you pass a string // or []byte, it will be interpreted as already being JSON encoded. // The format parameter will automatically set the FileOption.Format option. 
func IngestionMapping(mapping interface{}, format DataFormat) FileOption { return option{ run: func(p *properties.All) error { kind := format.MappingKind() if kind == DFUnknown { return errors.ES( errors.OpUnknown, errors.KClientArgs, "IngestionMapping() option does not support EncodingType %v", format, ).SetNoRetry() } var j string switch v := mapping.(type) { case string: j = v case []byte: j = string(v) default: b, err := json.Marshal(mapping) if err != nil { return errors.ES( errors.OpUnknown, errors.KClientArgs, "IngestMapping option was passed to an azkustoingest.Ingestion call that was not a string, []byte or could be JSON encoded: %s", err, ).SetNoRetry() } j = string(b) } p.Ingestion.Additional.IngestionMapping = j p.Ingestion.Additional.IngestionMappingType = kind p.Ingestion.Additional.Format = format return nil }, clientScopes: QueuedClient | ManagedClient, sourceScope: FromFile | FromReader | FromBlob, name: "IngestionMapping", } } // IngestionMappingRef provides the name of a pre-created mapping for the data being imported to the fields in the table. // For more details, see: https://docs.microsoft.com/azure/kusto/management/create-ingestion-mapping-command // The formatparameter will also automatically set the FileOption.Format option. func IngestionMappingRef(refName string, format DataFormat) FileOption { return option{ run: func(p *properties.All) error { kind := format.MappingKind() if kind == DFUnknown { return errors.ES(errors.OpUnknown, errors.KClientArgs, "IngestionMappingRef() option does not support EncodingType %v", format).SetNoRetry() } p.Ingestion.Additional.IngestionMappingRef = refName p.Ingestion.Additional.IngestionMappingType = kind p.Ingestion.Additional.Format = format return nil }, clientScopes: QueuedClient | StreamingClient | ManagedClient, sourceScope: FromFile | FromReader | FromBlob, name: "IngestionMappingRef", } } // DeleteSource deletes the source file from when it has been uploaded to Kusto. 
func DeleteSource() FileOption { return option{ run: func(p *properties.All) error { p.Source.DeleteLocalSource = true return nil }, clientScopes: QueuedClient | StreamingClient | ManagedClient, sourceScope: FromFile, name: "DeleteSource", } } // IgnoreSizeLimit ignores the size limit for data ingestion. func IgnoreSizeLimit() FileOption { return option{ run: func(p *properties.All) error { p.Ingestion.IgnoreSizeLimit = true return nil }, sourceScope: FromFile | FromReader | FromBlob, clientScopes: QueuedClient | ManagedClient, name: "IgnoreSizeLimit", } } // Tags are tags to be associated with the ingested ata. func Tags(tags []string) FileOption { return option{ run: func(p *properties.All) error { p.Ingestion.Additional.Tags = tags return nil }, sourceScope: FromFile | FromReader | FromBlob, clientScopes: QueuedClient | ManagedClient, name: "Tags", } } // IfNotExists provides a string value that, if specified, prevents ingestion from succeeding if the table already // has data tagged with an ingest-by: tag with the same value. This ensures idempotent data ingestion. // For more information see: https://docs.microsoft.com/en-us/azure/kusto/management/extents-overview#ingest-by-extent-tags func IfNotExists(ingestByTag string) FileOption { return option{ run: func(p *properties.All) error { p.Ingestion.Additional.IngestIfNotExists = ingestByTag return nil }, sourceScope: FromFile | FromReader | FromBlob, clientScopes: QueuedClient | ManagedClient, name: "IfNotExists", } } // ReportResultToTable option requests that the ingestion status will be tracked in an Azure table. // Note using Table status reporting is not recommended for high capacity ingestions, as it could slow down the ingestion. // In such cases, it's recommended to enable it temporarily for debugging failed ingestions. 
func ReportResultToTable() FileOption { return option{ run: func(p *properties.All) error { p.Ingestion.ReportLevel = properties.FailureAndSuccess p.Ingestion.ReportMethod = properties.ReportStatusToTable return nil }, sourceScope: FromFile | FromReader | FromBlob, clientScopes: QueuedClient | ManagedClient, name: "ReportResultToTable", } } // SetCreationTime option allows the user to override the data creation time the retention policies are considered against // If not set the data creation time is considered to be the time of ingestion func SetCreationTime(t time.Time) FileOption { return option{ run: func(p *properties.All) error { p.Ingestion.Additional.CreationTime = t return nil }, sourceScope: FromFile | FromReader | FromBlob, clientScopes: QueuedClient | ManagedClient, name: "SetCreationTime", } } // ValidationOption is an an option for validating the ingestion input data. // These are defined as constants within this package. type ValidationOption int8 //goland:noinspection GoUnusedConst - Part of the API const ( // VOUnknown indicates that a ValidationOption was not set. VOUnknown ValidationOption = 0 // SameNumberOfFields indicates that all records ingested must have the same number of fields. SameNumberOfFields ValidationOption = 1 // IgnoreNonDoubleQuotedFields indicates that fields that do not have double quotes should be ignored. IgnoreNonDoubleQuotedFields ValidationOption = 2 ) // ValidationImplication is a setting used to indicate what to do when a Validation Policy is violated. // These are defined as constants within this package. type ValidationImplication int8 //goland:noinspection GoUnusedConst - Part of the API const ( // FailIngestion indicates that any violation of the ValidationPolicy will cause the entire ingestion to fail. FailIngestion ValidationImplication = 0 // IgnoreFailures indicates that failure of the ValidationPolicy will be ignored. 
IgnoreFailures ValidationImplication = 1 ) // ValPolicy sets a policy for validating data as it is sent for ingestion. // For more information, see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/ type ValPolicy struct { // Options provides an option that will flag data that does not validate. Options ValidationOption `json:"ValidationOptions"` // Implications sets what to do when a policy option is violated. Implications ValidationImplication `json:"ValidationImplications"` } // ValidationPolicy uses a ValPolicy to set our ingestion data validation policy. If not set, no validation policy // is used. // For more information, see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/ func ValidationPolicy(policy ValPolicy) FileOption { return option{ run: func(p *properties.All) error { b, err := json.Marshal(policy) if err != nil { return errors.ES(errors.OpUnknown, errors.KInternal, "bug: the ValPolicy provided would not JSON encode").SetNoRetry() } // You might be asking, what if we are just using blobstore? Well, then this option doesn't matter :) p.Ingestion.Additional.ValidationPolicy = string(b) return nil }, sourceScope: FromFile | FromReader | FromBlob, clientScopes: QueuedClient | ManagedClient, name: "ValidationPolicy", } } // FileFormat can be used to indicate what type of encoding is supported for the file. This is only needed if // the file extension is not present. A file like: "input.json.gz" or "input.json" does not need this option, while // "input" would. // If an ingestion mapping is specified, there is no need to specify the file format. func FileFormat(et DataFormat) FileOption { return option{ run: func(p *properties.All) error { p.Ingestion.Additional.Format = et return nil }, sourceScope: FromFile | FromReader | FromBlob, clientScopes: QueuedClient | StreamingClient | ManagedClient, name: "FileFormat", } } // ClientRequestId is an identifier for the ingestion, that can later be queried. 
func ClientRequestId(clientRequestId string) FileOption { return option{ run: func(p *properties.All) error { p.Streaming.ClientRequestId = clientRequestId return nil }, sourceScope: FromFile | FromReader | FromBlob, clientScopes: StreamingClient | ManagedClient, name: "ClientRequestId", } } // CompressionType sets the compression type of the data. // Use this if the file name does not expose the compression type. // This sets DontCompress to true for compressed data. func CompressionType(compressionType ingestoptions.CompressionType) FileOption { return option{ run: func(p *properties.All) error { p.Source.CompressionType = compressionType return nil }, clientScopes: QueuedClient | StreamingClient | ManagedClient, sourceScope: FromFile | FromReader, name: "CompressionType", } } // RawDataSize is the uncompressed data size. Should be used to comunicate the file size to the service for efficient ingestion. // Also used by managed client in the decision to use queued ingestion instead of streaming (if > 4mb) func RawDataSize(size int64) FileOption { return option{ run: func(p *properties.All) error { p.Ingestion.RawDataSize = size return nil }, clientScopes: QueuedClient | ManagedClient, sourceScope: FromFile | FromReader | FromBlob, name: "RawDataSize", } }