func InvokeHandler()

in cmd/aws-lambda-rie/handlers.go [76:190]


// InvokeHandler serves one invoke request against the sandbox.
//
// It reads the request body as the invoke payload, base64-decodes the
// X-Amz-Client-Context header, runs InitHandler on the first call (tracked by
// the package-level initDone flag) and then calls sandbox.Invoke. The
// function's response is captured in a ResponseWriterProxy and copied to w
// only after the invoke has finished, so that an error returned by
// sandbox.Invoke can still be translated into an HTTP status code.
//
// The timeout (seconds), function version and memory size are obtained through
// GetenvWithDefault from AWS_LAMBDA_FUNCTION_TIMEOUT,
// AWS_LAMBDA_FUNCTION_VERSION and AWS_LAMBDA_FUNCTION_MEMORY_SIZE.
//
// A body that cannot be read, a client context that cannot be decoded, or a
// timeout setting that cannot be parsed is answered with 500.
func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs interop.Bootstrap) {
	log.Debugf("invoke: -> %s %s %v", r.Method, r.URL, r.Header)
	bodyBytes, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Errorf("Failed to read invoke body: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	rawClientContext, err := base64.StdEncoding.DecodeString(r.Header.Get("X-Amz-Client-Context"))
	if err != nil {
		log.Errorf("Failed to decode X-Amz-Client-Context: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// The timeout is needed both as an integer (passed to InitHandler and
	// echoed in the timeout message) and as a Duration (to cap the reported
	// init time and for the end-of-invoke report). A bad setting is reported
	// to the client instead of panicking inside the handler.
	timeoutSetting := GetenvWithDefault("AWS_LAMBDA_FUNCTION_TIMEOUT", "300")
	timeout, err := strconv.ParseInt(timeoutSetting, 10, 64)
	if err != nil {
		log.Errorf("Invalid AWS_LAMBDA_FUNCTION_TIMEOUT %q: %s", timeoutSetting, err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	timeoutDuration, err := time.ParseDuration(timeoutSetting + "s")
	if err != nil {
		log.Errorf("Invalid AWS_LAMBDA_FUNCTION_TIMEOUT %q: %s", timeoutSetting, err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	functionVersion := GetenvWithDefault("AWS_LAMBDA_FUNCTION_VERSION", "$LATEST")
	memorySize := GetenvWithDefault("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "3008")

	// Only the invoke that performs the init reports an init duration.
	initDuration := ""
	if !initDone {
		initStart, initEnd := InitHandler(sandbox, functionVersion, timeout, bs)

		// Reported init time is capped at the function timeout.
		initTimeMS := math.Min(float64(initEnd.Sub(initStart).Nanoseconds()),
			float64(timeoutDuration.Nanoseconds())) / float64(time.Millisecond)

		initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS)

		// Set initDone so next invokes do not try to Init the function again
		initDone = true
	}

	invokeStart := time.Now()
	invokePayload := &interop.Invoke{
		ID:                 uuid.New().String(),
		InvokedFunctionArn: fmt.Sprintf("arn:aws:lambda:us-east-1:012345678912:function:%s", GetenvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function")),
		TraceID:            r.Header.Get("X-Amzn-Trace-Id"),
		LambdaSegmentID:    r.Header.Get("X-Amzn-Segment-Id"),
		Payload:            bytes.NewReader(bodyBytes),
		ClientContext:      string(rawClientContext),
	}
	fmt.Println("START RequestId: " + invokePayload.ID + " Version: " + functionVersion)

	// If we write to 'w' directly and waitUntilRelease fails, we won't be able to propagate error anymore
	invokeResp := &ResponseWriterProxy{}
	if err := sandbox.Invoke(invokeResp, invokePayload); err != nil {
		switch err {

		// Reserve errors:
		case rapidcore.ErrAlreadyReserved:
			log.Errorf("Failed to reserve: %s", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		case rapidcore.ErrInternalServerError:
			w.WriteHeader(http.StatusInternalServerError)
			return
		case rapidcore.ErrInitDoneFailed:
			w.WriteHeader(http.StatusBadGateway)
			w.Write(invokeResp.Body)
			return
		case rapidcore.ErrReserveReservationDone:
			// TODO use http.StatusBadGateway
			w.WriteHeader(http.StatusGatewayTimeout)
			return

		// Invoke errors:
		// Go cases do not fall through, so the three errors must share a
		// single case to all receive the 400 response.
		case rapidcore.ErrNotReserved, rapidcore.ErrAlreadyReplied, rapidcore.ErrAlreadyInvocating:
			log.Errorf("Failed to set reply stream: %s", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		case rapidcore.ErrInvokeReservationDone:
			// TODO use http.StatusBadGateway
			w.WriteHeader(http.StatusGatewayTimeout)
			return
		case rapidcore.ErrInvokeResponseAlreadyWritten:
			return

		// AwaitRelease errors:
		case rapidcore.ErrInvokeDoneFailed:
			w.WriteHeader(http.StatusBadGateway)
			w.Write(invokeResp.Body)
			return
		case rapidcore.ErrReleaseReservationDone:
			// TODO return sandbox status when we implement async reset handling
			// TODO use http.StatusOK
			w.WriteHeader(http.StatusGatewayTimeout)
			return
		case rapidcore.ErrInvokeTimeout:
			printEndReports(invokePayload.ID, initDuration, memorySize, invokeStart, timeoutDuration)

			// No explicit status is set, so the message goes out with the
			// implicit 200 of the first Write.
			fmt.Fprintf(w, "Task timed out after %d.00 seconds", timeout)
			time.Sleep(100 * time.Millisecond)
			//initDone = false
			return

		default:
			// Unrecognised errors still fall through to the normal response
			// path below; log them so they are not lost silently.
			log.Errorf("Invoke failed: %s", err)
		}
	}

	printEndReports(invokePayload.ID, initDuration, memorySize, invokeStart, timeoutDuration)

	// A zero StatusCode means the proxy never had a status set; leave w to
	// apply its default in that case.
	if invokeResp.StatusCode != 0 {
		w.WriteHeader(invokeResp.StatusCode)
	}
	w.Write(invokeResp.Body)
}