func()

in azkustoingest/ingest.go [83:135]


// prepForIngestion assembles the Result and the finalized properties used by an
// ingestion call.
//
// The steps run in this order:
//  1. The auth context is fetched from the resource manager and stored in
//     props.Ingestion.Additional.AuthContext.
//  2. Every FileOption is run against props, passing QueuedClient and source.
//  3. If source is FromReader and the format is still DFUnknown, the format is
//     set to CSV.
//  4. If an ingestion mapping type is set and differs from the format's
//     MappingKind(), a non-retryable client-argument error is returned.
//  5. If the report level is not properties.None, props.Source.ID is given a
//     fresh UUID when it is uuid.Nil, and for the table-based report methods
//     props.Ingestion.TableEntryRef is filled from the first table resource.
//
// props is received by value; the adjusted copy is stored in the returned
// Result via putProps and is also handed back to the caller. On any failure the
// function returns a nil Result, a zero properties.All and the error.
func (i *Ingestion) prepForIngestion(ctx context.Context, options []FileOption, props properties.All, source SourceScope) (*Result, properties.All, error) {
	res := newResult()

	authCtx, err := i.mgr.AuthContext(ctx)
	if err != nil {
		return nil, properties.All{}, err
	}
	props.Ingestion.Additional.AuthContext = authCtx

	// Options mutate props in place; the first failing option aborts the prep.
	for idx := range options {
		if err := options[idx].Run(&props, QueuedClient, source); err != nil {
			return nil, properties.All{}, err
		}
	}

	// Reader sources with no format chosen by the options fall back to CSV.
	if source == FromReader && props.Ingestion.Additional.Format == DFUnknown {
		props.Ingestion.Additional.Format = CSV
	}

	// The mapping check only applies once a mapping type has been set.
	if mappingType := props.Ingestion.Additional.IngestionMappingType; mappingType != DFUnknown {
		if props.Ingestion.Additional.Format.MappingKind() != mappingType {
			return nil, properties.All{}, errors.ES(
				errors.OpUnknown,
				errors.KClientArgs,
				"format and ingestion mapping type must match (hint: using ingestion mapping sets the format automatically)",
			).SetNoRetry()
		}
	}

	if props.Ingestion.ReportLevel != properties.None {
		// The source ID doubles as the status-table partition key below, so it
		// must be set before the table entry reference is populated.
		if props.Source.ID == uuid.Nil {
			props.Source.ID = uuid.New()
		}

		method := props.Ingestion.ReportMethod
		if method == properties.ReportStatusToTable || method == properties.ReportStatusToQueueAndTable {
			tables, err := i.mgr.GetTables()
			if err != nil {
				return nil, properties.All{}, err
			}
			if len(tables) == 0 {
				return nil, properties.All{}, fmt.Errorf("User requested reporting status to table, yet status table resource URI is not found")
			}

			// Only the first table resource is used.
			props.Ingestion.TableEntryRef.TableConnectionString = tables[0].URL().String()
			props.Ingestion.TableEntryRef.PartitionKey = props.Source.ID.String()
			props.Ingestion.TableEntryRef.RowKey = uuid.Nil.String()
		}
	}

	res.putProps(props)
	return res, props, nil
}