in cmd/json2pubsub/function.go [205:424]
func RequestHandler(w http.ResponseWriter, r *http.Request) {
originIp, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
log.Warn().Err(err).Str("RemoteAddr", r.RemoteAddr).Msg("Unable to parse remote address")
w.WriteHeader(http.StatusBadRequest)
return
}
// Only accept POST requests
if r.Method != "POST" {
log.Warn().Str("RemoteAddr", r.RemoteAddr).Msg("Not a POST request.")
w.WriteHeader(http.StatusBadRequest)
return
}
postBody, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Warn().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to read request body: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if len(postBody) == 0 {
log.Warn().Str("RemoteAddr", r.RemoteAddr).Msg("Empty request body.")
w.WriteHeader(http.StatusBadRequest)
return
}
requestHeaders := make(map[string]string, 0)
for key := range r.Header {
requestHeaders[strings.ToLower(key)] = r.Header.Get(key)
}
var postValues url.Values
contentType := r.Header.Get("Content-Type")
if contentType == "application/x-www-form-urlencoded" {
postValues, err = url.ParseQuery(string(postBody))
if err != nil {
log.Warn().Str("RemoteAddr", r.RemoteAddr).Msg("Failed to parse post body for content type application/x-www-form-urlencoded")
w.WriteHeader(http.StatusBadRequest)
return
}
}
var jsonBody interface{}
if contentType == "application/json" || contentType == "text/json" {
if json.Valid([]byte(postBody)) {
json.Unmarshal([]byte(postBody), &jsonBody)
} else {
log.Warn().Str("RemoteAddr", r.RemoteAddr).Msg("Invalid JSON body.")
w.WriteHeader(http.StatusBadRequest)
return
}
} else {
jsonBody = postValues
}
// Set up request structure
currentTime := time.Now()
currentTimeUTC := currentTime.UTC()
celParams := map[string]interface{}{
"origin": map[string]interface{}{
"ip": originIp,
},
"request": map[string]interface{}{
"body": string(postBody),
"method": r.Method,
"path": r.URL.RawPath,
"scheme": r.URL.Scheme,
"query": r.URL.RawQuery,
"json": jsonBody,
"post": postValues,
"headers": requestHeaders,
"unixtime": currentTime.Unix(),
"time": map[string]int{
"year": currentTimeUTC.Year(),
"month": int(currentTimeUTC.Month()),
"day": currentTimeUTC.Day(),
"hour": currentTimeUTC.Hour(),
"minute": currentTimeUTC.Minute(),
"second": currentTimeUTC.Second(),
},
},
}
out, _, err := requestControlExpr.Eval(celParams)
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to evaluate request control check: %v", out)
w.WriteHeader(http.StatusInternalServerError)
return
}
result, ok := out.(celtypes.Bool)
if !ok || result.Equal(celtypes.False).Value().(bool) {
log.Error().Str("RemoteAddr", r.RemoteAddr).Msgf("Request control check failed: %v", out)
w.WriteHeader(http.StatusForbidden)
return
}
messageOut, _, err := extractMessage.Eval(celParams)
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to evaluate message extraction: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
var messageJson interface{}
if messageOut.Type() == celtypes.StringType {
_messageOut, err := messageOut.ConvertToNative(reflect.TypeOf(""))
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert message output to string: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if json.Valid([]byte(_messageOut.(string))) {
json.Unmarshal([]byte(_messageOut.(string)), &messageJson)
} else {
log.Error().Str("RemoteAddr", r.RemoteAddr).Str("json", _messageOut.(string)).Msg("Invalid JSON string from message CEL.")
w.WriteHeader(http.StatusBadRequest)
return
}
} else if messageOut.Type() == celtypes.MapType {
_messageOut, err := messageOut.ConvertToNative(reflect.TypeOf(map[string]interface{}{}))
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert message output to map: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
messageJson = _messageOut.(map[string]interface{})
} else if messageOut.Type() == celtypes.ListType {
_messageOut, err := messageOut.ConvertToNative(reflect.TypeOf([]interface{}{}))
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert message output to list: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
messageJson = _messageOut.([]interface{})
}
if messageJson == nil {
log.Error().Str("RemoteAddr", r.RemoteAddr).Msg("Failed to turn request into a Pub/Sub message.")
w.WriteHeader(http.StatusBadRequest)
return
}
messageData, err := json.Marshal(messageJson)
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Unable to marshal message data to JSON: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
msg := &pubsub.Message{
Data: messageData,
}
topic, err := GetPubsubTopic(r.Context(), cloudProjectId, userAgent, pubsubTopic)
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msg("Unable to get Pub/Sub client for topic.")
w.WriteHeader(http.StatusInternalServerError)
return
}
log.Info().Str("RemoteAddr", r.RemoteAddr).Msgf("Publishing a message to topic %s, len=%d bytes", topic.String(), len(msg.Data))
if _, err := topic.Publish(r.Context(), msg).Get(r.Context()); err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Unable to publish message to Pub/Sub topic: %v", err)
w.WriteHeader(http.StatusServiceUnavailable)
return
}
if !defaultResponseBody {
bodyOut, _, err := responseBody.Eval(celParams)
if err != nil || bodyOut == nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to evaluate response boy: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if bodyOut.Type() == celtypes.StringType {
_bodyOut, err := bodyOut.ConvertToNative(reflect.TypeOf(""))
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body to string: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(_bodyOut.(string)))
} else if bodyOut.Type() == celtypes.MapType {
_bodyOut, err := bodyOut.ConvertToNative(reflect.TypeOf(map[string]interface{}{}))
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body to map: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
_bodyAsMap := _bodyOut.(map[string]interface{})
_bodyJson, err := json.Marshal(_bodyAsMap)
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body map to JSON: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write(_bodyJson)
} else if bodyOut.Type() == celtypes.ListType {
_bodyOut, err := bodyOut.ConvertToNative(reflect.TypeOf([]interface{}{}))
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body to list: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
_bodyAsList := _bodyOut.([]interface{})
_bodyJson, err := json.Marshal(_bodyAsList)
if err != nil {
log.Error().Err(err).Str("RemoteAddr", r.RemoteAddr).Msgf("Failed to convert response body list to JSON: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write(_bodyJson)
}
} else {
w.WriteHeader(http.StatusOK)
}
}