in go-example-adaptive-batching-extension/agent/http.go [125:210]
// Init prepares the agent to receive logs. It builds a logsapi client for the
// address found in AWS_LAMBDA_RUNTIME_API, starts the agent's listener, and
// subscribes agentID to the log types selected by the
// ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES environment variable (a JSON array of
// log type names).
//
// A missing, malformed, or unusable ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES value
// is never fatal: the platform and function log types are used instead.
//
// It returns an error when AWS_LAMBDA_RUNTIME_API is unset, or when creating
// the client, starting the listener, or subscribing fails.
func (a HttpAgent) Init(agentID string) error {
	runtimeAPIAddr, ok := os.LookupEnv("AWS_LAMBDA_RUNTIME_API")
	if !ok {
		return errors.New("AWS_LAMBDA_RUNTIME_API is not set")
	}

	logsAPIClient, err := logsapi.NewClient(fmt.Sprintf("http://%s", runtimeAPIAddr))
	if err != nil {
		return fmt.Errorf("creating logs API client: %w", err)
	}

	// The listener is started before subscribing, matching the original
	// ordering, so that it is already running when the subscription is made.
	if _, err := a.listener.Start(); err != nil {
		return fmt.Errorf("starting log listener: %w", err)
	}

	// Configuration problems are handled inside the helper by falling back to
	// defaults; they must not leak out as an Init failure.
	eventTypes := eventTypesFromConfig(os.Getenv("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES"))

	bufferingCfg := logsapi.BufferingCfg{
		MaxItems:  1000,
		MaxBytes:  262144,
		TimeoutMS: 25,
	}
	destination := logsapi.Destination{
		Protocol:   logsapi.HttpProto,
		URI:        logsapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s", DefaultHttpListenerPort)),
		HttpMethod: logsapi.HttpPost,
		Encoding:   logsapi.JSON,
	}

	if _, err := logsAPIClient.Subscribe(eventTypes, bufferingCfg, destination, agentID); err != nil {
		return fmt.Errorf("subscribing to logs API: %w", err)
	}
	return nil
}

// eventTypesFromConfig converts the raw ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES
// value into the list of log types to subscribe to.
//
// raw is expected to be a JSON array of log type names. Unrecognised names are
// logged and skipped, and repeated names are kept only once. The default
// platform and function types are returned whenever raw is empty, is not valid
// JSON, cannot be decoded into a list of log types, or yields no recognised
// type. The result is therefore never empty.
func eventTypesFromConfig(raw string) []logsapi.EventType {
	defaults := []logsapi.EventType{logsapi.Platform, logsapi.Function}

	if raw == "" {
		httpLogger.Info("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES not included, subscribing to default log types")
		return defaults
	}

	data := []byte(raw)
	if !json.Valid(data) {
		httpLogger.Info("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES includes invalid JSON, subscribing to default log types")
		return defaults
	}

	var requested []logsapi.EventType
	if err := json.Unmarshal(data, &requested); err != nil {
		// Valid JSON of the wrong shape (an object, or an array with
		// non-string members). Unmarshal may have partially filled requested,
		// so discard it entirely rather than mixing it with the defaults.
		httpLogger.Info("Unable to unmarshal json from ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES, subscribing to default log types")
		return defaults
	}
	if len(requested) == 0 {
		httpLogger.Info("LogTypes in ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES does not include any elements, subscribing to default log types")
		return defaults
	}

	seen := make(map[logsapi.EventType]struct{}, len(requested))
	eventTypes := make([]logsapi.EventType, 0, len(requested))
	for _, logType := range requested {
		switch logType {
		case logsapi.Platform, logsapi.Function, logsapi.Extension:
			if _, dup := seen[logType]; dup {
				continue
			}
			seen[logType] = struct{}{}
			eventTypes = append(eventTypes, logType)
		default:
			httpLogger.Info("Log type ", logType, " is not valid. Not including")
		}
	}

	if len(eventTypes) == 0 {
		// Every requested entry was rejected; avoid subscribing to nothing.
		httpLogger.Info("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES does not include any valid log types, subscribing to default log types")
		return defaults
	}
	return eventTypes
}