in lambda/core/directinvoke/directinvoke.go [95:212]
func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.Token) (*interop.Invoke, error) {
log.Infof("Received Invoke(invokeID: %s) Request", token.InvokeID)
w.Header().Set("Trailer", EndOfResponseTrailer)
custHeaders := CustomerHeaders{}
if err := custHeaders.Load(r.Header.Get(CustomerHeadersHeader)); err != nil {
renderBadRequest(w, r, interop.ErrMalformedCustomerHeaders.Error())
return nil, interop.ErrMalformedCustomerHeaders
}
now := metering.Monotime()
MaxDirectResponseSize = interop.MaxPayloadSize
if maxPayloadSize := r.Header.Get(MaxPayloadSizeHeader); maxPayloadSize != "" {
if n, err := strconv.ParseInt(maxPayloadSize, 10, 64); err == nil && n >= -1 {
MaxDirectResponseSize = n
} else {
log.Error("MaxPayloadSize header is not a valid number")
renderBadRequest(w, r, interop.ErrInvalidMaxPayloadSize.Error())
return nil, interop.ErrInvalidMaxPayloadSize
}
}
if valueFromHeader := r.Header.Get(InvokeResponseModeHeader); valueFromHeader != "" {
invokeResponseMode, err := convertToInvokeResponseMode(valueFromHeader)
if err != nil {
log.Errorf(
"InvokeResponseMode header is not a valid string. Was: %#v, Allowed: %#v.",
valueFromHeader,
strings.Join(interop.AllInvokeResponseModes, ", "),
)
renderBadRequest(w, r, err.Error())
return nil, err
}
InvokeResponseMode = invokeResponseMode
}
// TODO: stop using `MaxDirectResponseSize`
if isStreamingInvoke(int(MaxDirectResponseSize), InvokeResponseMode) {
w.Header().Add("Trailer", FunctionErrorTypeTrailer)
w.Header().Add("Trailer", FunctionErrorBodyTrailer)
// FIXME
// Until WorkerProxy stops sending MaxDirectResponseSize == -1 to identify streaming
// invokes, we need to override InvokeResponseMode to avoid setting InvokeResponseMode to buffered (default) for a streaming invoke (MaxDirectResponseSize == -1).
InvokeResponseMode = interop.InvokeResponseModeStreaming
ResponseBandwidthRate = interop.ResponseBandwidthRate
if responseBandwidthRate := r.Header.Get(ResponseBandwidthRateHeader); responseBandwidthRate != "" {
if n, err := strconv.ParseInt(responseBandwidthRate, 10, 64); err == nil &&
interop.MinResponseBandwidthRate <= n && n <= interop.MaxResponseBandwidthRate {
ResponseBandwidthRate = n
} else {
log.Error("ResponseBandwidthRate header is not a valid number or is out of the allowed range")
renderBadRequest(w, r, interop.ErrInvalidResponseBandwidthRate.Error())
return nil, interop.ErrInvalidResponseBandwidthRate
}
}
ResponseBandwidthBurstSize = interop.ResponseBandwidthBurstSize
if responseBandwidthBurstSize := r.Header.Get(ResponseBandwidthBurstSizeHeader); responseBandwidthBurstSize != "" {
if n, err := strconv.ParseInt(responseBandwidthBurstSize, 10, 64); err == nil &&
interop.MinResponseBandwidthBurstSize <= n && n <= interop.MaxResponseBandwidthBurstSize {
ResponseBandwidthBurstSize = n
} else {
log.Error("ResponseBandwidthBurstSize header is not a valid number or is out of the allowed range")
renderBadRequest(w, r, interop.ErrInvalidResponseBandwidthBurstSize.Error())
return nil, interop.ErrInvalidResponseBandwidthBurstSize
}
}
}
inv := &interop.Invoke{
ID: r.Header.Get(InvokeIDHeader),
ReservationToken: chi.URLParam(r, "reservationtoken"),
InvokedFunctionArn: r.Header.Get(InvokedFunctionArnHeader),
VersionID: r.Header.Get(VersionIDHeader),
ContentType: r.Header.Get(ContentTypeHeader),
CognitoIdentityID: custHeaders.CognitoIdentityID,
CognitoIdentityPoolID: custHeaders.CognitoIdentityPoolID,
TraceID: token.TraceID,
LambdaSegmentID: token.LambdaSegmentID,
ClientContext: custHeaders.ClientContext,
Payload: r.Body,
DeadlineNs: fmt.Sprintf("%d", now+token.FunctionTimeout.Nanoseconds()),
NeedDebugLogs: token.NeedDebugLogs,
InvokeReceivedTime: now,
InvokeResponseMode: InvokeResponseMode,
RestoreDurationNs: token.RestoreDurationNs,
RestoreStartTimeMonotime: token.RestoreStartTimeMonotime,
}
if inv.ID != token.InvokeID {
renderBadRequest(w, r, interop.ErrInvalidInvokeID.Error())
return nil, interop.ErrInvalidInvokeID
}
if inv.ReservationToken != token.ReservationToken {
renderBadRequest(w, r, interop.ErrInvalidReservationToken.Error())
return nil, interop.ErrInvalidReservationToken
}
if inv.VersionID != token.VersionID {
renderBadRequest(w, r, interop.ErrInvalidFunctionVersion.Error())
return nil, interop.ErrInvalidFunctionVersion
}
if now > token.InvackDeadlineNs {
renderBadRequest(w, r, interop.ErrReservationExpired.Error())
return nil, interop.ErrReservationExpired
}
w.Header().Set(VersionIDHeader, token.VersionID)
w.Header().Set(ReservationTokenHeader, token.ReservationToken)
w.Header().Set(InvokeIDHeader, token.InvokeID)
return inv, nil
}