in server/internal/async/api.go [35:150]
// NewAsync assembles an Async from the supplied options.
//
// It performs the following steps in order and returns the first error hit:
//   - creates an Azure credential: a managed identity credential when
//     options.IdentityResourceID is set, otherwise the default Azure credential;
//   - requires options.EntityTableName and checks it with sanitizeTableName;
//   - creates the operation container client when options.OperationContainerAddr is set;
//   - creates the service bus client when options.ServiceBusHostName is set, and the
//     receiver when options.ServiceBusQueueName is set (which requires the client);
//   - creates the database client from options.DatabaseConnectionString, or from
//     options.DatabaseServerUrl, options.DatabasePort and options.DatabaseName;
//   - registers the long running operation with a matcher, then creates the entity
//     controller, the operation status hook and the processor.
//
// On success the returned Async has its Processor field set. On failure the
// returned *Async is nil.
func NewAsync(ctx context.Context, options Options) (*Async, error) {
	var err error
	var cred azcore.TokenCredential
	logger := ctxlogger.GetLogger(ctx)

	// NOTE(review): some failures below are reported through the package-level log
	// rather than the context logger; consider unifying on logger once the rest of
	// the file has been checked for other uses of log.

	// Use MSI in Standalone E2E env for credential
	if options.IdentityResourceID != "" {
		resourceID := azidentity.ResourceID(options.IdentityResourceID)
		opts := azidentity.ManagedIdentityCredentialOptions{ID: resourceID}
		cred, err = azidentity.NewManagedIdentityCredential(&opts)
	} else {
		cred, err = azidentity.NewDefaultAzureCredential(nil)
	}
	if err != nil {
		log.Error(err.Error())
		return nil, err
	}

	if options.EntityTableName == "" {
		logger.Error("No EntityTableName set.")
		return nil, errors.New("No EntityTableName set.")
	}
	if err := sanitizeTableName(options.EntityTableName); err != nil {
		logger.Error("Table name is not valid: " + err.Error())
		return nil, err
	}

	var operationContainerClient oc.OperationContainerClient
	if options.OperationContainerAddr != "" {
		operationContainerClient, err = ocClient.NewClient(options.OperationContainerAddr, interceptor.GetClientInterceptorLogOptions(logger, logattrs.GetAttrs()))
		if err != nil {
			log.Error("did not connect to operationContainerClient: " + err.Error())
			return nil, err
		}
	}

	var serviceBusClient servicebus.ServiceBusClientInterface
	if options.ServiceBusHostName != "" {
		serviceBusClient, err = servicebus.CreateServiceBusClient(ctx, options.ServiceBusHostName, cred, nil)
		if err != nil {
			log.Error("Something went wrong creating the service bus client: " + err.Error())
			return nil, err
		}
	}

	var receiver servicebus.ReceiverInterface
	if options.ServiceBusQueueName != "" {
		// The receiver is created from the service bus client, which only exists
		// when ServiceBusHostName was provided. Without this guard the call below
		// would be a method call on a nil interface value and would panic.
		if serviceBusClient == nil {
			logger.Error("ServiceBusQueueName is set but no ServiceBusHostName was provided.")
			return nil, errors.New("service bus queue name set without a service bus host name")
		}
		receiver, err = serviceBusClient.NewServiceBusReceiver(ctx, options.ServiceBusQueueName, nil)
		if err != nil {
			log.Error("Something went wrong creating the service bus receiver: " + err.Error())
			return nil, err
		}
	}

	// Verify that some db information was provided
	if options.DatabaseServerUrl == "" && options.DatabaseName == "" && options.DatabaseConnectionString == "" {
		logger.Error("No database information provided.")
		return nil, errors.New("No database information provided.")
	}

	// The database is created by the bicep files and deployed in the deployment of service specific resources. The entityTableName
	// might not be created yet (since the table is created by the server and async and server should initialize simultaneously)
	// but that doesn't matter because if the entityTable hasn't been created, it means that the server hasn't started and async
	// should not be receiving any messages through the service bus to process.
	var dbClient *sql.DB
	switch {
	case options.DatabaseConnectionString != "":
		dbClient, err = database.NewDbClientWithConnectionString(ctx, options.DatabaseConnectionString)
		if err != nil {
			logger.Error("Error creating connection pool: " + err.Error())
			return nil, err
		}
	case options.DatabaseServerUrl != "" && options.DatabaseName != "":
		dbClient, err = database.NewDbClient(ctx, options.DatabaseServerUrl, options.DatabasePort, options.DatabaseName)
		if err != nil {
			logger.Error("Error creating connection pool: " + err.Error())
			return nil, err
		}
	default:
		// Only one of DatabaseServerUrl / DatabaseName was given and there is no
		// connection string, so no client can be built. Fail here instead of
		// handing a nil *sql.DB to the entity controller and the status hook.
		logger.Error("Incomplete database information provided.")
		return nil, errors.New("incomplete database information: DatabaseServerUrl and DatabaseName must both be set when DatabaseConnectionString is empty")
	}

	// Instantiate a matcher. Here we would add all of our operation types.
	matcher := opbus.NewMatcher()
	lro := &longRunningOperation.LongRunningOperation{}
	matcher.Register(operations.LroName, lro)
	matcher.RegisterEntity(operations.LroName, longRunningOperation.CreateLroEntityFunc)

	entityController, err := NewEntityController(ctx, options, matcher, dbClient)
	if err != nil {
		log.Error("Something went wrong creating the entity controller: " + err.Error())
		return nil, err
	}

	operationStatusHook := &OperationStatusHook{
		dbClient:        dbClient,
		EntityTableName: options.EntityTableName,
	}
	hooks := []opbus.BaseOperationHooksInterface{operationStatusHook}

	// Add hooks from hooks.go
	processor, err := opbus.CreateProcessor(receiver, matcher, operationContainerClient, entityController, nil, nil, nil, hooks)
	if err != nil {
		return nil, err
	}

	async := &Async{
		Processor: processor,
	}
	return async, nil
}