func()

in logger/common.go [265:412]


// Read pulls bytes from pipe into a buffer of bufferSizeInBytes and forwards
// them, one message at a time, through sendLogMsgToDest.
//
// Every run of bytes terminated by a newline is sent as one message, without
// the newline itself. When bufferIsFull reports that the buffer has filled up
// before a newline was found, or when EOF is reported while unterminated bytes
// are still buffered, the pending bytes are sent as a partial message. All
// partials of the same line share one partial ID and one timestamp and carry
// an ordinal counting up from 1; the piece that finally ends at the newline is
// additionally flagged as the last partial.
//
// Read returns nil when ctx is cancelled or when EOF has been reached and the
// buffered bytes have been flushed. Otherwise it returns the first error
// produced by readFromContainerPipe, generateRandomID, or sendLogMsgToDest.
func (l *Logger) Read(
	ctx context.Context,
	pipe io.Reader,
	source string,
	bufferSizeInBytes int,
	sendLogMsgToDest sendLogToDestFunc,
) error {
	// In-memory staging area for bytes read from the container pipe.
	buf := make([]byte, bufferSizeInBytes)

	var (
		lineTime time.Time // timestamp attached to the message being sent
		filled   int       // number of valid bytes currently held in buf
		eof      bool
		err      error
	)

	// State describing the (possibly split) line currently being assembled.
	var (
		// firstPartial is true while no partial of the current line has been sent yet.
		firstPartial = true
		// inPartial is true once at least one partial of the current line has been sent.
		inPartial = false
		// lastPartial marks the message that completes a split line.
		lastPartial = false
		// partialID is the random ID shared by all pieces of a split line.
		partialID = ""
		// ordinal orders the pieces of a split line, starting at 1.
		ordinal = 1
	)

	for {
		// Stop as soon as cancellation is observed; otherwise fall through
		// without blocking and do one round of read-and-dispatch.
		select {
		case <-ctx.Done():
			debug.SendEventsToLog(l.Info.ContainerID,
				fmt.Sprintf("Logging stopped in pipe %s", source),
				debug.DEBUG, 0)
			return nil
		default:
		}

		eof, filled, err = readFromContainerPipe(pipe, buf, filled, l.maxReadBytes)
		if err != nil {
			return err
		}

		// EOF with nothing buffered: there is nothing left to send.
		if eof && filled == 0 {
			return nil
		}

		// Walk the buffered bytes and emit one message per newline-terminated line.
		start := 0
		for {
			// IndexByte yields -1 when no newline remains in the unread part.
			n := bytes.IndexByte(buf[start:filled], newline)
			if n < 0 {
				break
			}

			// A fresh line gets a fresh timestamp. The tail of a split line
			// keeps the timestamp of its first partial and is marked as last.
			if !inPartial {
				lineTime = time.Now().UTC()
			} else {
				lastPartial = true
			}

			line := buf[start : start+n]
			if err = sendLogMsgToDest(
				line, source, inPartial, lastPartial, partialID, ordinal, lineTime,
			); err != nil {
				return err
			}
			atomic.AddUint64(&bytesSentToDst, uint64(len(line)))
			atomic.AddUint64(&numberOfNewLineChars, 1)

			// The newline closed the current line, so clear the split-line state.
			firstPartial, inPartial, lastPartial = true, false, false
			partialID, ordinal = "", 1

			// Skip past the newline to the start of the next line.
			start += n + 1
		}

		// Either EOF was reported or the buffer is full without a newline:
		// whatever is still unread goes out as a partial message.
		if eof || bufferIsFull(buf, start, filled) {
			if start < filled {
				chunk := buf[start:filled]

				inPartial = true
				if firstPartial {
					// The first piece fixes the timestamp and ID for the whole line.
					lineTime = time.Now().UTC()
					if partialID, err = generateRandomID(); err != nil {
						return err
					}
				}

				if err = sendLogMsgToDest(
					chunk, source, inPartial, lastPartial, partialID, ordinal, lineTime,
				); err != nil {
					return err
				}
				atomic.AddUint64(&bytesSentToDst, uint64(len(chunk)))

				// Everything buffered has been sent; start over with an empty buffer.
				start, filled = 0, 0
				// Any further piece of this line is a later, non-first partial.
				ordinal++
				firstPartial = false
			}

			// Leftover bytes (if any) have been flushed, so EOF ends the read loop.
			if eof {
				return nil
			}
		}

		// Slide the unread tail to the front of the buffer for the next round.
		if start > 0 {
			copy(buf, buf[start:filled])
			filled -= start
		}
	}
}