func()

in go-example-adaptive-batching-extension/agent/http.go [125:210]


func (a HttpAgent) Init(agentID string) error {
	extensions_api_address, ok := os.LookupEnv("AWS_LAMBDA_RUNTIME_API")
	if !ok {
		return errors.New("AWS_LAMBDA_RUNTIME_API is not set")
	}

	logsApiBaseUrl := fmt.Sprintf("http://%s", extensions_api_address)

	logsApiClient, err := logsapi.NewClient(logsApiBaseUrl)
	if err != nil {
		return err
	}

	_, err = a.listener.Start()
	if err != nil {
		return err
	}

	// Read environment variable ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES
	inputJson := os.Getenv("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES")
	inputJsonBytes := []byte(inputJson)

	var eventTypes []logsapi.EventType

	// No Json included
	if inputJson == "" {
		// Hold defaults
		eventTypes = append(eventTypes, logsapi.Platform, logsapi.Function)
		httpLogger.Info("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES not included, subscribing to default log types")
	} else if !json.Valid(inputJsonBytes) {
		// Invalid JSON provided
		eventTypes = append(eventTypes, logsapi.Platform, logsapi.Function)
		httpLogger.Info("ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES includes invalid JSON, subscribing to default log types")
	} else {

		// Unmarshal json into structure
		var jsonArray []logsapi.EventType

		err = json.Unmarshal(inputJsonBytes, &jsonArray)
		if err != nil {
			// Error unmarshaling json
			eventTypes = append(eventTypes, logsapi.Platform, logsapi.Function)
			httpLogger.Info("Unable to unmarshal json from ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES, subscribing to default log types")
		}

		// If array is empty, use default values
		if len(jsonArray) == 0 {
			eventTypes = append(eventTypes, logsapi.Platform, logsapi.Function)
			httpLogger.Info("LogTypes in ADAPTIVE_BATCHING_EXTENSION_LOG_TYPES does not include any elements, subscribing to default log types")
		}

		// loop through elements, and check if required elements are included

		for _, logType := range jsonArray {
			switch logType {
			case logsapi.Platform:
				eventTypes = append(eventTypes, logsapi.Platform)
			case logsapi.Function:
				eventTypes = append(eventTypes, logsapi.Function)
			case logsapi.Extension:
				eventTypes = append(eventTypes, logsapi.Extension)
			default:
				httpLogger.Info("Log type ", logType, " is not valid. Not including")
			}
		}

	}

	bufferingCfg := logsapi.BufferingCfg{
		MaxItems:  1000,
		MaxBytes:  262144,
		TimeoutMS: 25,
	}
	if err != nil {
		return err
	}
	destination := logsapi.Destination{
		Protocol:   logsapi.HttpProto,
		URI:        logsapi.URI(fmt.Sprintf("http://sandbox.localdomain:%s", DefaultHttpListenerPort)),
		HttpMethod: logsapi.HttpPost,
		Encoding:   logsapi.JSON,
	}

	_, err = logsApiClient.Subscribe(eventTypes, bufferingCfg, destination, agentID)
	return err
}