in cmd/aws-lambda-rie/handlers.go [76:190]
// InvokeHandler serves one invoke request: it forwards the request body to the
// sandbox as the invoke payload and relays the buffered result to w.
//
// Request inputs:
//   - the body is read in full and used as the payload;
//   - X-Amz-Client-Context is base64-decoded and passed as the client context;
//   - X-Amzn-Trace-Id and X-Amzn-Segment-Id are copied into the invoke as-is.
//
// Environment inputs (read through GetenvWithDefault):
//   - AWS_LAMBDA_FUNCTION_TIMEOUT, whole seconds, default "300";
//   - AWS_LAMBDA_FUNCTION_VERSION, default "$LATEST";
//   - AWS_LAMBDA_FUNCTION_MEMORY_SIZE, default "3008";
//   - AWS_LAMBDA_FUNCTION_NAME, default "test_function".
//
// If the package-level initDone flag is false, InitHandler is run first, the
// init duration is recorded for the end report, and initDone is set to true.
//
// A body that cannot be read, a client context that is not valid base64, or a
// timeout value that cannot be parsed is logged and answered with a 500.
// Errors returned by sandbox.Invoke are mapped to HTTP statuses in the switch
// below; an error that matches none of the listed cases falls through to the
// normal response path.
func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs interop.Bootstrap) {
	log.Debugf("invoke: -> %s %s %v", r.Method, r.URL, r.Header)

	bodyBytes, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Errorf("Failed to read invoke body: %s", err)
		w.WriteHeader(500)
		return
	}

	rawClientContext, err := base64.StdEncoding.DecodeString(r.Header.Get("X-Amz-Client-Context"))
	if err != nil {
		log.Errorf("Failed to decode X-Amz-Client-Context: %s", err)
		w.WriteHeader(500)
		return
	}

	initDuration := ""

	// The timeout is needed both as a whole number of seconds (for InitHandler
	// and the timeout message) and as a time.Duration (for the reports).
	// A malformed value is reported as a 500 instead of panicking the handler.
	inv := GetenvWithDefault("AWS_LAMBDA_FUNCTION_TIMEOUT", "300")
	timeout, err := strconv.ParseInt(inv, 10, 64)
	if err != nil {
		log.Errorf("Failed to parse AWS_LAMBDA_FUNCTION_TIMEOUT %q: %s", inv, err)
		w.WriteHeader(500)
		return
	}
	timeoutDuration, err := time.ParseDuration(inv + "s")
	if err != nil {
		log.Errorf("Failed to parse AWS_LAMBDA_FUNCTION_TIMEOUT %q as a duration: %s", inv, err)
		w.WriteHeader(500)
		return
	}

	functionVersion := GetenvWithDefault("AWS_LAMBDA_FUNCTION_VERSION", "$LATEST")
	memorySize := GetenvWithDefault("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "3008")

	if !initDone {
		initStart, initEnd := InitHandler(sandbox, functionVersion, timeout, bs)

		// Calculate InitDuration; the reported value is capped at the
		// configured timeout.
		initTimeMS := math.Min(float64(initEnd.Sub(initStart).Nanoseconds()),
			float64(timeoutDuration.Nanoseconds())) / float64(time.Millisecond)
		initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS)

		// Set initDone so next invokes do not try to Init the function again
		initDone = true
	}

	invokeStart := time.Now()
	invokePayload := &interop.Invoke{
		ID:                 uuid.New().String(),
		InvokedFunctionArn: fmt.Sprintf("arn:aws:lambda:us-east-1:012345678912:function:%s", GetenvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function")),
		TraceID:            r.Header.Get("X-Amzn-Trace-Id"),
		LambdaSegmentID:    r.Header.Get("X-Amzn-Segment-Id"),
		Payload:            bytes.NewReader(bodyBytes),
		ClientContext:      string(rawClientContext),
	}
	fmt.Println("START RequestId: " + invokePayload.ID + " Version: " + functionVersion)

	// If we write to 'w' directly and waitUntilRelease fails, we won't be able to propagate error anymore
	invokeResp := &ResponseWriterProxy{}
	if err := sandbox.Invoke(invokeResp, invokePayload); err != nil {
		switch err {
		// Reserve errors:
		case rapidcore.ErrAlreadyReserved:
			log.Errorf("Failed to reserve: %s", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		case rapidcore.ErrInternalServerError:
			w.WriteHeader(http.StatusInternalServerError)
			return
		case rapidcore.ErrInitDoneFailed:
			w.WriteHeader(http.StatusBadGateway)
			w.Write(invokeResp.Body)
			return
		case rapidcore.ErrReserveReservationDone:
			// TODO use http.StatusBadGateway
			w.WriteHeader(http.StatusGatewayTimeout)
			return

		// Invoke errors:
		// Go switch cases do not fall through, so these three errors must
		// share a single case to share the same handling.
		case rapidcore.ErrNotReserved,
			rapidcore.ErrAlreadyReplied,
			rapidcore.ErrAlreadyInvocating:
			log.Errorf("Failed to set reply stream: %s", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		case rapidcore.ErrInvokeReservationDone:
			// TODO use http.StatusBadGateway
			w.WriteHeader(http.StatusGatewayTimeout)
			return
		case rapidcore.ErrInvokeResponseAlreadyWritten:
			return

		// AwaitRelease errors:
		case rapidcore.ErrInvokeDoneFailed:
			w.WriteHeader(http.StatusBadGateway)
			w.Write(invokeResp.Body)
			return
		case rapidcore.ErrReleaseReservationDone:
			// TODO return sandbox status when we implement async reset handling
			// TODO use http.StatusOK
			w.WriteHeader(http.StatusGatewayTimeout)
			return
		case rapidcore.ErrInvokeTimeout:
			printEndReports(invokePayload.ID, initDuration, memorySize, invokeStart, timeoutDuration)
			// No explicit status is written here; only the message body.
			fmt.Fprintf(w, "Task timed out after %d.00 seconds", timeout)
			time.Sleep(100 * time.Millisecond)
			//initDone = false
			return
		}
		// Any other error is not mapped to a status and is answered with
		// whatever the sandbox buffered in invokeResp.
	}

	printEndReports(invokePayload.ID, initDuration, memorySize, invokeStart, timeoutDuration)

	// A zero StatusCode means the proxy never had a status set; leave the
	// status to the ResponseWriter's default in that case.
	if invokeResp.StatusCode != 0 {
		w.WriteHeader(invokeResp.StatusCode)
	}
	w.Write(invokeResp.Body)
}