in pkg/webservice/handlers.go [1317:1406]
func getStream(w http.ResponseWriter, r *http.Request) {
writeHeaders(w, r.Method)
eventSystem := events.GetEventSystem()
if !eventSystem.IsEventTrackingEnabled() {
buildJSONErrorResponse(w, "Event tracking is disabled", http.StatusInternalServerError)
return
}
f, ok := w.(http.Flusher)
if !ok {
buildJSONErrorResponse(w, "Writer does not implement http.Flusher", http.StatusInternalServerError)
return
}
if !streamingLimiter.AddHost(r.Host) {
buildJSONErrorResponse(w, "Too many streaming connections", http.StatusServiceUnavailable)
return
}
defer streamingLimiter.RemoveHost(r.Host)
var count uint64
if countStr := r.URL.Query().Get("count"); countStr != "" {
var err error
count, err = strconv.ParseUint(countStr, 10, 64)
if err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
return
}
}
rc := http.NewResponseController(w)
// make sure both deadlines can be set
if err := rc.SetWriteDeadline(time.Time{}); err != nil {
log.Log(log.REST).Error("Cannot set write deadline", zap.Error(err))
buildJSONErrorResponse(w, fmt.Sprintf("Cannot set write deadline: %v", err), http.StatusInternalServerError)
return
}
if err := rc.SetReadDeadline(time.Time{}); err != nil {
log.Log(log.REST).Error("Cannot set read deadline", zap.Error(err))
buildJSONErrorResponse(w, fmt.Sprintf("Cannot set read deadline: %v", err), http.StatusInternalServerError)
return
}
enc := json.NewEncoder(w)
stream := eventSystem.CreateEventStream(r.Host, count)
defer eventSystem.RemoveStream(stream)
if err := enc.Encode(dao.YunikornID{
InstanceUUID: schedulerContext.Load().GetUUID(),
}); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
return
}
f.Flush()
// Reading events in an infinite loop until either the client disconnects or Yunikorn closes the channel.
// This results in a persistent HTTP connection where the message body is never closed.
// Write deadline is adjusted before sending data to the client.
for {
select {
case <-r.Context().Done():
log.Log(log.REST).Info("Connection closed for event stream client",
zap.String("host", r.Host))
return
case e, ok := <-stream.Events:
err := rc.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err != nil {
// should not fail at this point
log.Log(log.REST).Error("Cannot set write deadline", zap.Error(err))
buildJSONErrorResponse(w, fmt.Sprintf("Cannot set write deadline: %v", err), http.StatusOK) // status code is already 200 at this point
return
}
if !ok {
// the channel was closed by the event system itself
msg := "Event stream was closed by the producer"
buildJSONErrorResponse(w, msg, http.StatusOK) // status code is 200 at this point, cannot be changed
log.Log(log.REST).Error(msg)
return
}
if err := enc.Encode(e); err != nil {
log.Log(log.REST).Error("Marshalling error",
zap.String("host", r.Host))
buildJSONErrorResponse(w, err.Error(), http.StatusOK) // status code is 200 at this point, cannot be changed
return
}
f.Flush()
}
}
}