func getStream()

in pkg/webservice/handlers.go [1317:1406]


// getStream streams events to the client as a sequence of JSON documents over
// a persistent HTTP connection.
//
// The request is rejected before streaming starts when:
//   - event tracking is disabled (500),
//   - the response writer does not implement http.Flusher (500),
//   - the streaming limiter refuses the request host (503),
//   - the optional "count" query parameter is not a base-10 unsigned integer (400),
//   - the read or write deadline of the connection cannot be changed (500).
//
// Once streaming starts, the first document written is a dao.YunikornID holding
// the instance UUID, followed by one document per event received from the
// stream. The handler returns when the request context is done, when the event
// channel is closed by the producer, or when writing to the client fails.
func getStream(w http.ResponseWriter, r *http.Request) {
	// Upper bound for a single write of an event to the client.
	const writeTimeout = 5 * time.Second

	writeHeaders(w, r.Method)
	eventSystem := events.GetEventSystem()
	if !eventSystem.IsEventTrackingEnabled() {
		buildJSONErrorResponse(w, "Event tracking is disabled", http.StatusInternalServerError)
		return
	}

	f, ok := w.(http.Flusher)
	if !ok {
		buildJSONErrorResponse(w, "Writer does not implement http.Flusher", http.StatusInternalServerError)
		return
	}

	if !streamingLimiter.AddHost(r.Host) {
		buildJSONErrorResponse(w, "Too many streaming connections", http.StatusServiceUnavailable)
		return
	}
	// Registered first, so it runs last: the host is released on every return path below.
	defer streamingLimiter.RemoveHost(r.Host)

	// "count" is optional; when absent the zero value is handed to CreateEventStream.
	var count uint64
	if countStr := r.URL.Query().Get("count"); countStr != "" {
		var err error
		count, err = strconv.ParseUint(countStr, 10, 64)
		if err != nil {
			buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
			return
		}
	}

	rc := http.NewResponseController(w)
	// Make sure both deadlines can be set. The zero time means "no deadline",
	// which is what a long-lived streaming connection needs while it is idle.
	if err := rc.SetWriteDeadline(time.Time{}); err != nil {
		log.Log(log.REST).Error("Cannot set write deadline", zap.Error(err))
		buildJSONErrorResponse(w, fmt.Sprintf("Cannot set write deadline: %v", err), http.StatusInternalServerError)
		return
	}
	if err := rc.SetReadDeadline(time.Time{}); err != nil {
		log.Log(log.REST).Error("Cannot set read deadline", zap.Error(err))
		buildJSONErrorResponse(w, fmt.Sprintf("Cannot set read deadline: %v", err), http.StatusInternalServerError)
		return
	}
	enc := json.NewEncoder(w)
	stream := eventSystem.CreateEventStream(r.Host, count)
	defer eventSystem.RemoveStream(stream)

	// The instance identifier is always the first document on the stream.
	if err := enc.Encode(dao.YunikornID{
		InstanceUUID: schedulerContext.Load().GetUUID(),
	}); err != nil {
		buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
		return
	}
	f.Flush()

	// The request context does not change for the lifetime of the handler, so
	// its Done channel is fetched once instead of on every loop iteration.
	done := r.Context().Done()

	// Reading events in an infinite loop until either the client disconnects or Yunikorn closes the channel.
	// This results in a persistent HTTP connection where the message body is never closed.
	// Write deadline is adjusted before sending data to the client.
	for {
		select {
		case <-done:
			log.Log(log.REST).Info("Connection closed for event stream client",
				zap.String("host", r.Host))
			return
		case e, ok := <-stream.Events:
			// The deadline is set before the closed-channel check as well, so
			// that the final error response is also written under a time limit.
			err := rc.SetWriteDeadline(time.Now().Add(writeTimeout))
			if err != nil {
				// should not fail at this point
				log.Log(log.REST).Error("Cannot set write deadline", zap.Error(err))
				buildJSONErrorResponse(w, fmt.Sprintf("Cannot set write deadline: %v", err), http.StatusOK) // status code is already 200 at this point
				return
			}

			if !ok {
				// the channel was closed by the event system itself
				msg := "Event stream was closed by the producer"
				buildJSONErrorResponse(w, msg, http.StatusOK) // status code is 200 at this point, cannot be changed
				log.Log(log.REST).Error(msg)
				return
			}

			if err := enc.Encode(e); err != nil {
				// Include the error itself: Encode also fails on write errors
				// (e.g. an expired write deadline), not only on marshalling.
				log.Log(log.REST).Error("Marshalling error",
					zap.String("host", r.Host),
					zap.Error(err))
				buildJSONErrorResponse(w, err.Error(), http.StatusOK) // status code is 200 at this point, cannot be changed
				return
			}
			f.Flush()
		}
	}
}