azkustoingest/internal/queued/queued.go

// Package queued provides a client with the ability to import data into Kusto via a variety of filesystems
// such as local storage or blobstore.
package queued

import (
	"context"
	"fmt"
	"github.com/Azure/azure-kusto-go/azkustoingest/ingestoptions"
	"github.com/Azure/azure-kusto-go/azkustoingest/internal/utils"
	"io"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"time"

	"github.com/Azure/azure-kusto-go/azkustodata/errors"
	"github.com/Azure/azure-kusto-go/azkustoingest/internal/gzip"
	"github.com/Azure/azure-kusto-go/azkustoingest/internal/properties"
	"github.com/Azure/azure-kusto-go/azkustoingest/internal/resources"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
	"github.com/google/uuid"
)

const (
	_1MiB = 1024 * 1024

	// The numbers below are magic numbers. They were derived from doing Azure to Azure tests of azcopy for various file sizes
	// to prove that changes weren't going to make azcopy slower. It was found that multiplying azcopy's concurrency by 10x (to 50)
	// made a 5x improvement in speed. We don't have any numbers from the service side to give us numbers we should use, so this
	// is our best guess from observation. DO NOT CHANGE UNLESS YOU KNOW BETTER.
	BlockSize             = 8 * _1MiB
	Concurrency           = 50
	StorageMaxRetryPolicy = 3
)

// Queued provides methods for taking data from various sources and ingesting it into Kusto using queued ingestion.
type Queued interface {
	io.Closer
	Local(ctx context.Context, from string, props properties.All) error
	Reader(ctx context.Context, reader io.Reader, props properties.All) (string, error)
	Blob(ctx context.Context, from string, fileSize int64, props properties.All) error
}

// uploadStream provides a type that mimics `azblob.UploadStream` to allow fakes for testing.
type uploadStream func(context.Context, io.Reader, *azblob.Client, string, string, *azblob.UploadStreamOptions) (azblob.UploadStreamResponse, error)

// uploadBlob provides a type that mimics `azblob.UploadFile` to allow fakes for testing.
type uploadBlob func(context.Context, *os.File, *azblob.Client, string, string, *azblob.UploadFileOptions) (azblob.UploadFileResponse, error)

// Ingestion provides methods for taking data from a filesystem of some type and ingesting it into Kusto.
// This object is scoped for a single database and table.
type Ingestion struct {
	http  *http.Client
	db    string
	table string

	mgr *resources.Manager

	uploadStream uploadStream
	uploadBlob   uploadBlob

	bufferSize int
	maxBuffers int

	applicationForTracing   string
	clientVersionForTracing string
}

// Option is an optional argument to New().
type Option func(s *Ingestion)

// WithStaticBuffer sets a static buffer with a buffer size and maximum number of buffers for uploading blobs to Kusto.
func WithStaticBuffer(bufferSize int, maxBuffers int) Option {
	return func(s *Ingestion) {
		s.bufferSize = bufferSize
		s.maxBuffers = maxBuffers
	}
}
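// Illustrative sketch: constructing an Ingestion with a custom upload buffer.
// The mgr, httpClient, and buffer values are assumed placeholders that would
// normally be supplied by the surrounding azkustoingest client setup.
//
//	ing, err := New("MyDatabase", "MyTable", mgr, httpClient, "myApp", "1.0.0",
//		WithStaticBuffer(4*_1MiB, 8))
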
// New is the constructor for Ingestion.
func New(db, table string, mgr *resources.Manager, http *http.Client, applicationForTracing string, clientVersionForTracing string, options ...Option) (*Ingestion, error) {
	i := &Ingestion{
		db:    db,
		table: table,
		mgr:   mgr,
		http:  http,
		uploadStream: func(ctx context.Context, reader io.Reader, client *azblob.Client, container, blob string, options *azblob.UploadStreamOptions) (azblob.UploadStreamResponse, error) {
			return client.UploadStream(ctx, container, blob, reader, options)
		},
		uploadBlob: func(ctx context.Context, file *os.File, client *azblob.Client, container, blob string, options *azblob.UploadFileOptions) (azblob.UploadFileResponse, error) {
			return client.UploadFile(ctx, container, blob, file, options)
		},
		applicationForTracing:   applicationForTracing,
		clientVersionForTracing: clientVersionForTracing,
	}

	for _, opt := range options {
		opt(i)
	}

	return i, nil
}

// Local ingests a local file into Kusto.
func (i *Ingestion) Local(ctx context.Context, from string, props properties.All) error {
	containers, err := i.mgr.GetRankedStorageContainers()
	if err != nil {
		return err
	}

	if len(containers) == 0 {
		return errors.ES(
			errors.OpFileIngest,
			errors.KBlobstore,
			"no Blob Storage container resources are defined, there is no container to upload to",
		).SetNoRetry()
	}

	queues, err := i.mgr.GetRankedStorageQueues()
	if err != nil {
		return err
	}

	// We want to check the queue size here so we don't upload a file and then find we don't have a Kusto queue to stick
	// it in. If we don't have a container, that is handled by containerQueue().
	if len(queues) == 0 {
		return errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry()
	}

	// Go over all the containers and try to upload the file to each one. If we succeed, we are done.
	for attempts, containerUri := range containers {
		if attempts >= StorageMaxRetryPolicy {
			return errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
		}

		client, containerName, err := i.upstreamContainer(containerUri)
		if err != nil {
			i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
			continue
		}

		blobURL, size, err := i.localToBlob(ctx, from, client, containerName, &props)
		if err == nil {
			i.mgr.ReportStorageResourceResult(containerUri.Account(), true)
			return i.Blob(ctx, blobURL, size, props)
		}

		// Check if the error is retryable.
		if errors.Retry(err) {
			i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
			continue
		} else {
			return err
		}
	}

	return errors.ES(errors.OpFileIngest, errors.KBlobstore, "could not upload file to any container")
}
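// Note on the retry strategy shared by Local, Reader, and Blob: each method walks
// the ranked storage resources returned by the resources.Manager, reports every
// success or failure back via ReportStorageResourceResult so the ranking can
// adapt, and gives up with a non-retryable error once StorageMaxRetryPolicy
// attempts have been made.
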
// Reader uploads a file via an io.Reader.
// If the function succeeds, it returns the path of the created blob.
func (i *Ingestion) Reader(ctx context.Context, reader io.Reader, props properties.All) (string, error) {
	containers, err := i.mgr.GetRankedStorageContainers()
	if err != nil {
		return "", err
	}

	if len(containers) == 0 {
		return "", errors.ES(
			errors.OpFileIngest,
			errors.KBlobstore,
			"no Blob Storage container resources are defined, there is no container to upload to",
		).SetNoRetry()
	}

	queues, err := i.mgr.GetRankedStorageQueues()
	if err != nil {
		return "", err
	}

	// We want to check the queue size here so we don't upload a file and then find we don't have a Kusto queue to stick
	// it in. If we don't have a container, that is handled by containerQueue().
	if len(queues) == 0 {
		return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry()
	}

	compression := utils.CompressionDiscovery(props.Source.OriginalSource)
	shouldCompress := ShouldCompress(&props, compression)
	blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(props.Source.OriginalSource), compression, shouldCompress, props.Ingestion.Additional.Format.String())

	size := int64(0)

	if shouldCompress {
		reader = gzip.Compress(reader)
	}

	// Go over all the containers and try to upload the file to each one. If we succeed, we are done.
	for attempts, containerUri := range containers {
		if attempts >= StorageMaxRetryPolicy {
			return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
		}

		client, containerName, err := i.upstreamContainer(containerUri)
		if err != nil {
			i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
			continue
		}

		_, err = i.uploadStream(
			ctx,
			reader,
			client,
			containerName,
			blobName,
			&azblob.UploadStreamOptions{BlockSize: int64(i.bufferSize), Concurrency: i.maxBuffers},
		)
		if err != nil {
			i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
			continue
		}

		i.mgr.ReportStorageResourceResult(containerUri.Account(), true)

		if gz, ok := reader.(*gzip.Streamer); ok {
			size = gz.InputSize()
		}

		err = i.Blob(ctx, fullUrl(client, containerName, blobName), size, props)
		return blobName, err
	}

	return blobName, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage")
}
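// Illustrative sketch: Blob can also be called directly for data that is already
// staged in Azure Blob Storage. The SAS URL and the zero size are assumed
// placeholder values; passing 0 simply leaves RawDataSize unset.
//
//	err := ing.Blob(ctx, "https://account.blob.core.windows.net/container/events.csv.gz?<sas>", 0, props)
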
// Blob ingests a file from Azure Blob Storage into Kusto.
func (i *Ingestion) Blob(ctx context.Context, from string, fileSize int64, props properties.All) error {
	// To learn more about ingestion properties, go to:
	// https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/#ingestion-properties
	// To learn more about ingestion methods go to:
	// https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
	props.Ingestion.BlobPath = from
	if fileSize != 0 {
		props.Ingestion.RawDataSize = fileSize
	}

	props.Ingestion.RetainBlobOnSuccess = !props.Source.DeleteLocalSource
	props.Ingestion.ApplicationForTracing = i.applicationForTracing
	props.Ingestion.ClientVersionForTracing = i.clientVersionForTracing

	err := CompleteFormatFromFileName(&props, from)
	if err != nil {
		return err
	}

	j, err := props.Ingestion.MarshalJSONString()
	if err != nil {
		return errors.ES(errors.OpFileIngest, errors.KInternal, "could not marshal the ingestion blob info: %s", err).SetNoRetry()
	}

	queueResources, err := i.mgr.GetRankedStorageQueues()
	if err != nil {
		return err
	}

	// Go over all the queues and try to upload the file to each one. If we succeed, we are done.
	for attempts, queueUri := range queueResources {
		if attempts >= StorageMaxRetryPolicy {
			return errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
		}

		queue, err := i.upstreamQueue(queueUri)
		if err != nil {
			i.mgr.ReportStorageResourceResult(queueUri.Account(), false)
			continue
		}

		if _, err := queue.EnqueueMessage(ctx, j, nil); err != nil {
			i.mgr.ReportStorageResourceResult(queueUri.Account(), false)
			continue
		} else {
			i.mgr.ReportStorageResourceResult(queueUri.Account(), true)
			return props.ApplyDeleteLocalSourceOption()
		}
	}

	return errors.ES(errors.OpFileIngest, errors.KBlobstore, "could not upload file to any queue")
}

func CompleteFormatFromFileName(props *properties.All, from string) error {
	// If they did not tell us how the file was encoded, try to discover it from the file extension.
	if props.Ingestion.Additional.Format != properties.DFUnknown {
		return nil
	}

	et := properties.DataFormatDiscovery(from)
	if et == properties.DFUnknown {
		// If we can't figure out the file type, default to CSV.
		et = properties.CSV
	}
	props.Ingestion.Additional.Format = et

	return nil
}

func (i *Ingestion) upstreamContainer(resourceUri *resources.URI) (*azblob.Client, string, error) {
	storageUrl := resourceUri.URL()
	serviceURL := fmt.Sprintf("%s://%s?%s", storageUrl.Scheme, storageUrl.Host, resourceUri.SAS().Encode())

	client, err := azblob.NewClientWithNoCredential(serviceURL, &azblob.ClientOptions{
		ClientOptions: azcore.ClientOptions{
			Transport: i.http,
		},
	})
	if err != nil {
		return nil, "", errors.E(errors.OpFileIngest, errors.KBlobstore, err)
	}

	return client, resourceUri.ObjectName(), nil
}

func (i *Ingestion) upstreamQueue(resourceUri *resources.URI) (*azqueue.QueueClient, error) {
	queueUrl := resourceUri.URL()
	serviceUrl := fmt.Sprintf("%s://%s?%s", queueUrl.Scheme, queueUrl.Host, resourceUri.SAS().Encode())

	service, err := azqueue.NewServiceClientWithNoCredential(serviceUrl, &azqueue.ClientOptions{
		ClientOptions: azcore.ClientOptions{
			Transport: i.http,
		},
	})
	if err != nil {
		return nil, errors.E(errors.OpFileIngest, errors.KBlobstore, err)
	}

	return service.NewQueueClient(resourceUri.ObjectName()), nil
}

var nower = time.Now
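// Both localToBlob and Reader derive the upload target the same way: discover any
// existing compression from the source name, decide via ShouldCompress whether to
// gzip on the fly, and build the blob name with GenBlobName. The resulting name
// has the following shape (illustrative, with assumed placeholder values):
//
//	"MyDatabase_MyTable_<timestamp>_<uuid>_events.csv.gz"
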
// localToBlob copies a local file to an Azure Blob Storage blob. It returns the URL of the blob, the size of the
// uploaded data, and an error if there was one.
func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob.Client, container string, props *properties.All) (string, int64, error) {
	compression := utils.CompressionDiscovery(from)
	shouldCompress := ShouldCompress(props, compression)
	blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(from), compression, shouldCompress, props.Ingestion.Additional.Format.String())

	file, err := os.Open(from)
	if err != nil {
		return "", 0, errors.ES(
			errors.OpFileIngest,
			errors.KLocalFileSystem,
			"problem retrieving source file %q: %s", from, err,
		).SetNoRetry()
	}
	defer file.Close()

	stat, err := file.Stat()
	if err != nil {
		return "", 0, errors.ES(
			errors.OpFileIngest,
			errors.KLocalFileSystem,
			"could not Stat the file(%s): %s", from, err,
		).SetNoRetry()
	}

	if shouldCompress {
		gstream := gzip.New()
		gstream.Reset(file)

		_, err = i.uploadStream(
			ctx,
			gstream,
			client,
			container,
			blobName,
			&azblob.UploadStreamOptions{BlockSize: int64(i.bufferSize), Concurrency: i.maxBuffers},
		)
		if err != nil {
			return "", 0, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage: %s", err)
		}

		return fullUrl(client, container, blobName), gstream.InputSize(), nil
	}

	// The high-level UploadFile API uploads blocks in parallel for optimal performance, and can handle large files as well.
	// It calls StageBlock/CommitBlockList for files larger than 256 MB, and calls Upload for any smaller file.
	_, err = i.uploadBlob(
		ctx,
		file,
		client,
		container,
		blobName,
		&azblob.UploadFileOptions{
			BlockSize:   BlockSize,
			Concurrency: Concurrency,
		},
	)
	if err != nil {
		return "", 0, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage: %s", err)
	}

	return fullUrl(client, container, blobName), stat.Size(), nil
}

func GenBlobName(databaseName string, tableName string, time time.Time, guid string, fileName string, compressionFileExtension ingestoptions.CompressionType, shouldCompress bool, dataFormat string) string {
	extension := "gz"
	if !shouldCompress {
		if compressionFileExtension == ingestoptions.CTNone {
			extension = dataFormat
		} else {
			extension = compressionFileExtension.String()
		}
	}

	blobName := fmt.Sprintf("%s_%s_%s_%s_%s.%s", databaseName, tableName, time, guid, fileName, extension)

	return blobName
}

// Do not compress if the user specified DontCompress or a CompressionType, if the file extension shows compression,
// or if the format is binary.
func ShouldCompress(props *properties.All, compressionFileExtension ingestoptions.CompressionType) bool {
	if props.Source.DontCompress {
		return false
	}

	if props.Source.CompressionType != ingestoptions.CTUnknown {
		if props.Source.CompressionType != ingestoptions.CTNone {
			return false
		}
	} else {
		if compressionFileExtension != ingestoptions.CTUnknown && compressionFileExtension != ingestoptions.CTNone {
			return false
		}
	}

	return props.Ingestion.Additional.Format.ShouldCompress()
}

// This allows mocking the stat func later on.
var statFunc = os.Stat
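// Illustrative behavior of IsLocalPath (values are assumed examples):
//
//	IsLocalPath("https://account.blob.core.windows.net/c/b") // false, nil (treated as a blob path)
//	IsLocalPath("./data/events.csv")                         // true, nil (if the file exists and is not a directory)
//	IsLocalPath("ftp://host/file")                           // false, error (cannot be resolved or stat'ed locally)
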
case "http", "https": return false, nil } } // By this point, we know its not blobstore, so it needs to be something that gets resolved to a file. // So we are going to Stat() the file and see if it exists and is not a directory. // In your tests, this would fail "file://" which we don't support. Also, because of this method, your tests // are going to be broken. Again, fileystems, blah.... stat, err := statFunc(s) if err != nil { return false, fmt.Errorf("It is not a valid local file path (could not stat file) and not a valid blob path") } if stat.IsDir() { return false, fmt.Errorf("path is a local directory and not a valid file") } return true, nil } func fullUrl(client *azblob.Client, container string, blob string) string { parseURL, err := azblob.ParseURL(client.URL()) if err != nil { return "" } parseURL.ContainerName = container parseURL.BlobName = blob return parseURL.String() } func (i *Ingestion) Close() error { i.mgr.Close() return nil }