in logger/common.go [234:335]
func (l *Logger) Read(
ctx context.Context,
pipe io.Reader,
source string,
bufferSizeInBytes int,
sendLogMsgToDest sendLogToDestFunc,
) error {
var (
partialTimestamp time.Time
bytesInBuffer int
err error
eof bool
)
// Initiate an in-memory buffer to hold bytes read from container pipe.
buf := make([]byte, bufferSizeInBytes)
// isFirstPartial indicates if current message saved in buffer is not a complete line,
// and is the first partial of the whole log message. Initialize to true.
isFirstPartial := true
// isPartialMsg indicates if current message is a partial log message. Initialize to false.
isPartialMsg := false
for {
select {
case <-ctx.Done():
debug.SendEventsToLog(l.Info.ContainerID,
fmt.Sprintf("Logging stopped in pipe %s", source),
debug.DEBUG, 0)
return nil
default:
eof, bytesInBuffer, err = readFromContainerPipe(pipe, buf, bytesInBuffer, l.maxReadBytes)
if err != nil {
return err
}
// If container pipe is closed and no bytes left in our buffer, directly return.
if eof && bytesInBuffer == 0 {
return nil
}
// Iteratively scan the unread part in our own buffer and read logs line by line.
// Then send it to destination.
head := 0
// This function returns -1 if '\n' in not present in buffer.
lenOfLine := bytes.IndexByte(buf[head:bytesInBuffer], newline)
for lenOfLine >= 0 {
curLine := buf[head : head+lenOfLine]
err, partialTimestamp, _, _ = sendLogMsgToDest(
curLine,
source,
isFirstPartial,
isPartialMsg,
partialTimestamp,
)
if err != nil {
return err
}
// Since we have found a newline symbol, it means this line has ended.
// Reset flags.
isFirstPartial = true
isPartialMsg = false
// Update the index of head of next line message.
head += lenOfLine + 1
lenOfLine = bytes.IndexByte(buf[head:bytesInBuffer], newline)
}
// If the pipe is closed and the last line does not end with a newline symbol, send whatever left
// in the buffer to destination as a single log message. Or if our buffer is full but there is
// no newline symbol yet, record it as a partial log message and send it as a single log message
// to destination.
if eof || bufferIsFull(buf, head, bytesInBuffer) {
// Still bytes left in the buffer after we identified all newline symbols.
if head < bytesInBuffer {
curLine := buf[head:bytesInBuffer]
err, partialTimestamp, isFirstPartial, isPartialMsg = sendLogMsgToDest(
curLine,
source,
isFirstPartial,
true, // Record as a partial message.
partialTimestamp,
)
if err != nil {
return err
}
// reset head and bytesInBuffer
head = 0
bytesInBuffer = 0
}
// If pipe is closed after we send all bytes left in buffer, then directly return.
if eof {
return nil
}
}
// If there are any bytes left in the buffer, move them to the head and handle them in the
// next round.
if head > 0 {
copy(buf[0:], buf[head:bytesInBuffer])
bytesInBuffer -= head
}
}
}
}