func ReceiveDirectInvoke()

in lambda/core/directinvoke/directinvoke.go [95:212]


// ReceiveDirectInvoke checks an incoming direct-invoke HTTP request against the
// supplied reservation token and builds the interop.Invoke that describes it.
//
// While doing so it updates the package-level MaxDirectResponseSize,
// InvokeResponseMode, ResponseBandwidthRate and ResponseBandwidthBurstSize
// from the request headers (the two bandwidth settings are only touched for
// streaming invokes), declares the response trailers, and on success echoes
// the token's version ID, reservation token and invoke ID as response headers.
//
// On any validation failure it calls renderBadRequest with the error text and
// returns a nil invoke together with that error.
func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.Token) (*interop.Invoke, error) {
	log.Infof("Received Invoke(invokeID: %s) Request", token.InvokeID)
	w.Header().Set("Trailer", EndOfResponseTrailer)

	reqHeader := r.Header

	// reject reports err to the client as a bad request and produces the
	// (nil, err) pair returned on every failure path.
	reject := func(err error) (*interop.Invoke, error) {
		renderBadRequest(w, r, err.Error())
		return nil, err
	}

	var customer CustomerHeaders
	if loadErr := customer.Load(reqHeader.Get(CustomerHeadersHeader)); loadErr != nil {
		// The underlying parse error is intentionally replaced by the sentinel.
		return reject(interop.ErrMalformedCustomerHeaders)
	}

	// Captured after the customer headers are parsed; used both as the invoke
	// receive time and as the base for the deadline and expiry check below.
	now := metering.Monotime()

	// Start from the default and let the header override it; -1 is the lowest
	// accepted value.
	MaxDirectResponseSize = interop.MaxPayloadSize
	if raw := reqHeader.Get(MaxPayloadSizeHeader); raw != "" {
		size, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil || size < -1 {
			log.Error("MaxPayloadSize header is not a valid number")
			return reject(interop.ErrInvalidMaxPayloadSize)
		}
		MaxDirectResponseSize = size
	}

	// InvokeResponseMode is only overwritten when the header is present.
	if raw := reqHeader.Get(InvokeResponseModeHeader); raw != "" {
		mode, convErr := convertToInvokeResponseMode(raw)
		if convErr != nil {
			log.Errorf(
				"InvokeResponseMode header is not a valid string. Was: %#v, Allowed: %#v.",
				raw,
				strings.Join(interop.AllInvokeResponseModes, ", "),
			)
			return reject(convErr)
		}
		InvokeResponseMode = mode
	}

	// TODO: stop using `MaxDirectResponseSize`
	if isStreamingInvoke(int(MaxDirectResponseSize), InvokeResponseMode) {
		w.Header().Add("Trailer", FunctionErrorTypeTrailer)
		w.Header().Add("Trailer", FunctionErrorBodyTrailer)

		// FIXME
		// WorkerProxy still signals streaming invokes with MaxDirectResponseSize == -1,
		// so force the mode to streaming here; otherwise such an invoke would keep
		// the buffered (default) InvokeResponseMode.
		InvokeResponseMode = interop.InvokeResponseModeStreaming

		// Default rate, optionally overridden by a header value within the allowed range.
		ResponseBandwidthRate = interop.ResponseBandwidthRate
		if raw := reqHeader.Get(ResponseBandwidthRateHeader); raw != "" {
			rate, parseErr := strconv.ParseInt(raw, 10, 64)
			if parseErr != nil || rate < interop.MinResponseBandwidthRate || rate > interop.MaxResponseBandwidthRate {
				log.Error("ResponseBandwidthRate header is not a valid number or is out of the allowed range")
				return reject(interop.ErrInvalidResponseBandwidthRate)
			}
			ResponseBandwidthRate = rate
		}

		// Default burst size, optionally overridden by a header value within the allowed range.
		ResponseBandwidthBurstSize = interop.ResponseBandwidthBurstSize
		if raw := reqHeader.Get(ResponseBandwidthBurstSizeHeader); raw != "" {
			burst, parseErr := strconv.ParseInt(raw, 10, 64)
			if parseErr != nil || burst < interop.MinResponseBandwidthBurstSize || burst > interop.MaxResponseBandwidthBurstSize {
				log.Error("ResponseBandwidthBurstSize header is not a valid number or is out of the allowed range")
				return reject(interop.ErrInvalidResponseBandwidthBurstSize)
			}
			ResponseBandwidthBurstSize = burst
		}
	}

	deadline := now + token.FunctionTimeout.Nanoseconds()

	inv := &interop.Invoke{
		ID:                       reqHeader.Get(InvokeIDHeader),
		ReservationToken:         chi.URLParam(r, "reservationtoken"),
		InvokedFunctionArn:       reqHeader.Get(InvokedFunctionArnHeader),
		VersionID:                reqHeader.Get(VersionIDHeader),
		ContentType:              reqHeader.Get(ContentTypeHeader),
		CognitoIdentityID:        customer.CognitoIdentityID,
		CognitoIdentityPoolID:    customer.CognitoIdentityPoolID,
		TraceID:                  token.TraceID,
		LambdaSegmentID:          token.LambdaSegmentID,
		ClientContext:            customer.ClientContext,
		Payload:                  r.Body,
		DeadlineNs:               fmt.Sprintf("%d", deadline),
		NeedDebugLogs:            token.NeedDebugLogs,
		InvokeReceivedTime:       now,
		InvokeResponseMode:       InvokeResponseMode,
		RestoreDurationNs:        token.RestoreDurationNs,
		RestoreStartTimeMonotime: token.RestoreStartTimeMonotime,
	}

	// The request must match the reservation it claims to belong to, and the
	// reservation must not have passed its invack deadline. First mismatch wins.
	switch {
	case inv.ID != token.InvokeID:
		return reject(interop.ErrInvalidInvokeID)
	case inv.ReservationToken != token.ReservationToken:
		return reject(interop.ErrInvalidReservationToken)
	case inv.VersionID != token.VersionID:
		return reject(interop.ErrInvalidFunctionVersion)
	case now > token.InvackDeadlineNs:
		return reject(interop.ErrReservationExpired)
	}

	w.Header().Set(VersionIDHeader, token.VersionID)
	w.Header().Set(ReservationTokenHeader, token.ReservationToken)
	w.Header().Set(InvokeIDHeader, token.InvokeID)

	return inv, nil
}