func RequestHandler()

in cmd/json2pubsub/function.go [205:424]


func RequestHandler(w http.ResponseWriter, r *http.Request) {
	originIp, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		log.Warn().Err(err).Str("RemoteAddr", r.RemoteAddr).Msg("Unable to parse remote address")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Only accept POST requests
	if r.Method != "POST" {
		log.Warn().Str("RemoteAddr", r.RemoteAddr).Msg("Not a POST request.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	postBody, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Warn().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to read request body: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	if len(postBody) == 0 {
		log.Warn().Str("RemoteAddr", r.RemoteAddr).Msg("Empty request body.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	requestHeaders := make(map[string]string, 0)
	for key := range r.Header {
		requestHeaders[strings.ToLower(key)] = r.Header.Get(key)
	}

	var postValues url.Values
	contentType := r.Header.Get("Content-Type")
	if contentType == "application/x-www-form-urlencoded" {
		postValues, err = url.ParseQuery(string(postBody))
		if err != nil {
			log.Warn().Str("RemoteAddr", r.RemoteAddr).Msg("Failed to parse post body for content type application/x-www-form-urlencoded")
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	}

	var jsonBody interface{}
	if contentType == "application/json" || contentType == "text/json" {
		if json.Valid([]byte(postBody)) {
			json.Unmarshal([]byte(postBody), &jsonBody)
		} else {
			log.Warn().Str("RemoteAddr", r.RemoteAddr).Msg("Invalid JSON body.")
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	} else {
		jsonBody = postValues
	}

	// Set up request structure
	currentTime := time.Now()
	currentTimeUTC := currentTime.UTC()
	celParams := map[string]interface{}{
		"origin": map[string]interface{}{
			"ip": originIp,
		},
		"request": map[string]interface{}{
			"body":     string(postBody),
			"method":   r.Method,
			"path":     r.URL.RawPath,
			"scheme":   r.URL.Scheme,
			"query":    r.URL.RawQuery,
			"json":     jsonBody,
			"post":     postValues,
			"headers":  requestHeaders,
			"unixtime": currentTime.Unix(),
			"time": map[string]int{
				"year":   currentTimeUTC.Year(),
				"month":  int(currentTimeUTC.Month()),
				"day":    currentTimeUTC.Day(),
				"hour":   currentTimeUTC.Hour(),
				"minute": currentTimeUTC.Minute(),
				"second": currentTimeUTC.Second(),
			},
		},
	}

	out, _, err := requestControlExpr.Eval(celParams)
	if err != nil {
		log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to evaluate request control check: %v", out)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	result, ok := out.(celtypes.Bool)
	if !ok || result.Equal(celtypes.False).Value().(bool) {
		log.Error().Str("RemoteAddr", r.RemoteAddr).Msgf("Request control check failed: %v", out)
		w.WriteHeader(http.StatusForbidden)
		return
	}

	messageOut, _, err := extractMessage.Eval(celParams)
	if err != nil {
		log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to evaluate message extraction: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	var messageJson interface{}
	if messageOut.Type() == celtypes.StringType {
		_messageOut, err := messageOut.ConvertToNative(reflect.TypeOf(""))
		if err != nil {
			log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert message output to string: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		if json.Valid([]byte(_messageOut.(string))) {
			json.Unmarshal([]byte(_messageOut.(string)), &messageJson)
		} else {
			log.Error().Str("RemoteAddr", r.RemoteAddr).Str("json", _messageOut.(string)).Msg("Invalid JSON string from message CEL.")
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	} else if messageOut.Type() == celtypes.MapType {
		_messageOut, err := messageOut.ConvertToNative(reflect.TypeOf(map[string]interface{}{}))
		if err != nil {
			log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert message output to map: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		messageJson = _messageOut.(map[string]interface{})
	} else if messageOut.Type() == celtypes.ListType {
		_messageOut, err := messageOut.ConvertToNative(reflect.TypeOf([]interface{}{}))
		if err != nil {
			log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert message output to list: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		messageJson = _messageOut.([]interface{})
	}
	if messageJson == nil {
		log.Error().Str("RemoteAddr", r.RemoteAddr).Msg("Failed to turn request into a Pub/Sub message.")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	messageData, err := json.Marshal(messageJson)
	if err != nil {
		log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Unable to marshal message data to JSON: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	msg := &pubsub.Message{
		Data: messageData,
	}

	topic, err := GetPubsubTopic(r.Context(), cloudProjectId, userAgent, pubsubTopic)
	if err != nil {
		log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msg("Unable to get Pub/Sub client for topic.")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	log.Info().Str("RemoteAddr", r.RemoteAddr).Msgf("Publishing a message to topic %s, len=%d bytes", topic.String(), len(msg.Data))
	if _, err := topic.Publish(r.Context(), msg).Get(r.Context()); err != nil {
		log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Unable to publish message to Pub/Sub topic: %v", err)
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}

	if !defaultResponseBody {
		bodyOut, _, err := responseBody.Eval(celParams)
		if err != nil || bodyOut == nil {
			log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to evaluate response boy: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		if bodyOut.Type() == celtypes.StringType {
			_bodyOut, err := bodyOut.ConvertToNative(reflect.TypeOf(""))
			if err != nil {
				log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body to string: %v", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.WriteHeader(http.StatusOK)
			w.Write([]byte(_bodyOut.(string)))
		} else if bodyOut.Type() == celtypes.MapType {
			_bodyOut, err := bodyOut.ConvertToNative(reflect.TypeOf(map[string]interface{}{}))
			if err != nil {
				log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body to map: %v", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			_bodyAsMap := _bodyOut.(map[string]interface{})
			_bodyJson, err := json.Marshal(_bodyAsMap)
			if err != nil {
				log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body map to JSON: %v", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.WriteHeader(http.StatusOK)
			w.Write(_bodyJson)
		} else if bodyOut.Type() == celtypes.ListType {
			_bodyOut, err := bodyOut.ConvertToNative(reflect.TypeOf([]interface{}{}))
			if err != nil {
				log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body to list: %v", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			_bodyAsList := _bodyOut.([]interface{})
			_bodyJson, err := json.Marshal(_bodyAsList)
			if err != nil {
				log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body list to JSON: %v", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.WriteHeader(http.StatusOK)
			w.Write(_bodyJson)
		}
	} else {
		w.WriteHeader(http.StatusOK)
	}
}