azkustoingest/status.go (190 lines of code) (raw):

package azkustoingest import ( "fmt" "time" "github.com/Azure/azure-kusto-go/azkustoingest/internal/properties" storageuid "github.com/gofrs/uuid" "github.com/google/uuid" "github.com/kylelemons/godebug/pretty" ) // StatusCode is the ingestion status type StatusCode string //goland:noinspection GoUnusedConst - Part of the API const ( // Pending status represents a temporary status. // Might change during the course of ingestion based on the // outcome of the data ingestion operation into Kusto. Pending StatusCode = "Pending" // Succeeded status represents a permanent status. // The data has been successfully ingested to Kusto. Succeeded StatusCode = "Succeeded" // Failed Status represents a permanent status. // The data has not been ingested to Kusto. Failed StatusCode = "Failed" // Queued status represents a permanent status. // The data has been queued for ingestion & status tracking was not requested. // (This does not indicate that the ingestion was successful.) Queued StatusCode = "Queued" // Skipped status represents a permanent status. // No data was supplied for ingestion. The ingest operation was skipped. Skipped StatusCode = "Skipped" // PartiallySucceeded status represents a permanent status. // Part of the data was successfully ingested to Kusto, while other parts failed. PartiallySucceeded StatusCode = "PartiallySucceeded" // StatusRetrievalFailed means the client ran into truble reading the status from the service StatusRetrievalFailed StatusCode = "StatusRetrievalFailed" // StatusRetrievalCanceled means the user canceld the status check StatusRetrievalCanceled StatusCode = "StatusRetrievalCanceled" ) // IsFinal returns true if the ingestion status is a final status, or false if the status is temporary func (i StatusCode) IsFinal() bool { return i != Pending } // IsSuccess returns true if the status code is a final successfull status code func (i StatusCode) IsSuccess() bool { switch i { case Succeeded, Queued: return true default: return false } } // FailureStatusCode indicates the status of failed ingestion attempts type FailureStatusCode string const ( // Unknown represents an undefined or unset failure state Unknown FailureStatusCode = "Unknown" // Permanent represnets failure state that will benefit from a retry attempt Permanent FailureStatusCode = "Permanent" // Transient represnet a retryable failure state Transient FailureStatusCode = "Transient" // Exhausted represents a retryable failure that has exhusted all retry attempts Exhausted FailureStatusCode = "Exhausted" ) // IsRetryable indicates whether there's any merit in retying ingestion func (i FailureStatusCode) IsRetryable() bool { switch i { case Transient, Exhausted: return true default: return false } } // statusRecord is a record containing information regarding the status of an ingestion command type statusRecord struct { // Status is The ingestion status returned from the service. Status remains 'Pending' during the ingestion process and // is updated by the service once the ingestion completes. When <see cref="IngestionReportMethod"/> is set to 'Queue', the ingestion status // will always be 'Queued' and the caller needs to query the reports queues for ingestion status, as configured. To query statuses that were // reported to queue, see: <see href="https://docs.microsoft.com/en-us/azure/kusto/api/netfx/kusto-ingest-client-status#ingestion-status-in-azure-queue"/>. // When <see cref="IngestionReportMethod"/> is set to 'Table', call <see cref="IKustoIngestionResult.GetIngestionStatusBySourceId"/> or // <see cref="IKustoIngestionResult.GetIngestionStatusCollection"/> to retrieve the most recent ingestion status. Status StatusCode // IngestionSourceID is a unique identifier representing the ingested source. It can be supplied during the ingestion execution. IngestionSourceID uuid.UUID // IngestionSourcePath is the URI of the blob, potentially including the secret needed to access // the blob. This can be a filesystem URI (on-premises deployments only), // or an Azure Blob Storage URI (including a SAS key or a semicolon followed by the account key). IngestionSourcePath string // Database is the name of the database holding the target table. Database string // Table is the name of the target table into which the data will be ingested. Table string // UpdatedOn is the last updated time of the ingestion status. UpdatedOn time.Time // OperationID is the ingestion's operation ID. OperationID uuid.UUID // ActivityID is the ingestion's activity ID. ActivityID uuid.UUID // ErrorCode In case of a failure, indicates the failure's error code. ErrorCode string // FailureStatus - In case of a failure, indicates the failure's status. FailureStatus FailureStatusCode // Details is a human readable description of the error added in case of a failure. Details string // OriginatesFromUpdatePolicy indicates whether or not the failure originated from an Update Policy, in case of a failure. OriginatesFromUpdatePolicy bool } const ( undefinedString = "Undefined" unknownString = "Unknown" ) // newStatusRecord creates a new record initialized with defaults. func newStatusRecord() statusRecord { rec := statusRecord{ Status: Failed, IngestionSourceID: uuid.Nil, IngestionSourcePath: undefinedString, Database: undefinedString, Table: undefinedString, UpdatedOn: time.Now(), OperationID: uuid.Nil, ActivityID: uuid.Nil, ErrorCode: unknownString, FailureStatus: Unknown, Details: "", OriginatesFromUpdatePolicy: false, } return rec } // FromProps takes in data from ingestion options. func (r *statusRecord) FromProps(props properties.All) { r.IngestionSourceID = props.Source.ID r.Database = props.Ingestion.DatabaseName r.Table = props.Ingestion.TableName r.UpdatedOn = time.Now() if props.Ingestion.BlobPath != "" && r.IngestionSourcePath == undefinedString { r.IngestionSourcePath = properties.RemoveQueryParamsFromUrl(props.Ingestion.BlobPath) } } // FromMap converts an ingestion status record to a key value map. func (r *statusRecord) FromMap(data map[string]interface{}) { strStatus := safeGetString(data, "Status") if len(strStatus) > 0 { r.Status = StatusCode(strStatus) } strStatus = safeGetString(data, "FailureStatus") if len(strStatus) > 0 { r.FailureStatus = FailureStatusCode(strStatus) } r.IngestionSourcePath = properties.RemoveQueryParamsFromUrl(safeGetString(data, "IngestionSourcePath")) r.Database = safeGetString(data, "Database") r.Table = safeGetString(data, "Table") r.ErrorCode = safeGetString(data, "ErrorCode") r.Details = safeGetString(data, "Details") r.IngestionSourceID = getGoogleUUIDFromInterface(data, "IngestionSourceId") r.OperationID = getGoogleUUIDFromInterface(data, "OperationId") r.ActivityID = getGoogleUUIDFromInterface(data, "ActivityId") if data["UpdatedOn"] != nil { if t, err := getTimeFromInterface(data["UpdatedOn"]); err == nil { r.UpdatedOn = t } } if data["OriginatesFromUpdatePolicy"] != nil { if b, ok := data["OriginatesFromUpdatePolicy"].(bool); ok { r.OriginatesFromUpdatePolicy = b } } } // StatusFromMapForTests converts an ingestion status record to a key value map. This is useful for comparison in tests. func StatusFromMapForTests(data map[string]interface{}) error { r := newStatusRecord() r.FromMap(data) return r } // ToMap converts an ingestion status record to a key value map. func (r *statusRecord) ToMap() map[string]interface{} { data := make(map[string]interface{}) // Since we only create the initial record, It's not our responsibility to write the following fields: // OperationID, AcitivityID, ErrorCode, FailureStatus, Details, OriginatesFromUpdatePolicy // Those will be read from the server if they have data in them data["Status"] = r.Status data["IngestionSourceId"] = r.IngestionSourceID data["IngestionSourcePath"] = properties.RemoveQueryParamsFromUrl(r.IngestionSourcePath) data["Database"] = r.Database data["Table"] = r.Table data["UpdatedOn"] = r.UpdatedOn.Format(time.RFC3339Nano) return data } // String implements fmt.Stringer. func (r *statusRecord) String() string { return pretty.Sprint(r) } // Error converts an ingestion status to a string. Since we only provide the record in case of an error, the success branches will never be called. func (r statusRecord) Error() string { switch r.Status { case Succeeded: return fmt.Sprintf("Ingestion succeeded\n" + r.String()) case Queued: return fmt.Sprintf("Ingestion Queued\n" + r.String()) case PartiallySucceeded: return fmt.Sprintf("Ingestion succeeded partially\n" + r.String()) default: return fmt.Sprintf("Ingestion Failed\n" + r.String()) } } func getTimeFromInterface(x interface{}) (time.Time, error) { switch x := x.(type) { case string: return time.Parse(time.RFC3339Nano, x) case time.Time: return x, nil default: return time.Now(), fmt.Errorf("getTimeFromInterface: Unexpected format %T", x) } } func getGoogleUUIDFromInterface(data map[string]interface{}, key string) uuid.UUID { x := data[key] if x == nil { return uuid.Nil } switch x := x.(type) { case uuid.UUID: return x case string: uid, err := uuid.Parse(x) if err != nil { return uuid.Nil } return uid case storageuid.UUID: uid, err := uuid.ParseBytes(x.Bytes()) if err != nil { return uuid.Nil } return uid default: return uuid.Nil } } func safeGetString(data map[string]interface{}, key string) string { if v := data[key]; v != nil { if s, ok := v.(string); ok { return s } } return "" }