in logger/common.go [265:412]
// Read consumes pipe until it reports EOF or ctx is cancelled, forwarding the
// data to sendLogMsgToDest one log message at a time.
//
// Data is staged in a buffer of bufferSizeInBytes bytes. Every
// newline-terminated line found in the buffer is forwarded without its
// newline. Bytes that are still unterminated when the pipe reports EOF, or when
// bufferIsFull reports the buffer as full, are forwarded as a partial message.
// All pieces split from one message share a timestamp and a random ID and carry
// an ordinal counting up from 1; the piece that is closed by the newline is
// additionally flagged as the last partial.
//
// Read returns nil on cancellation and on EOF. Otherwise it returns the first
// error produced by readFromContainerPipe, generateRandomID or sendLogMsgToDest.
func (l *Logger) Read(
	ctx context.Context,
	pipe io.Reader,
	source string,
	bufferSizeInBytes int,
	sendLogMsgToDest sendLogToDestFunc,
) error {
	var (
		stamp    time.Time
		buffered int
		err      error
		eof      bool
	)
	// Staging area for bytes read from the container pipe.
	chunk := make([]byte, bufferSizeInBytes)

	// Bookkeeping for a message that does not fit in a single send.
	var (
		// firstPiece is true while the next partial piece would open a new split message.
		firstPiece = true
		// splitting is true once at least one partial piece of the current message was sent.
		splitting = false
		// finalPiece marks the piece that completes a split message.
		finalPiece = false
		// splitID is the random ID shared by all pieces of one split message.
		splitID = ""
		// splitSeq is the ordinal of the next piece, counting up from 1.
		splitSeq = 1
	)

	// forward sends one piece with the current split state and, on success,
	// adds its length to the sent-bytes counter.
	forward := func(piece []byte) error {
		if err = sendLogMsgToDest(piece, source, splitting, finalPiece, splitID, splitSeq, stamp); err != nil {
			return err
		}
		atomic.AddUint64(&bytesSentToDst, uint64(len(piece)))
		return nil
	}

	for {
		// Cancellation is only checked here, before each read.
		select {
		case <-ctx.Done():
			debug.SendEventsToLog(l.Info.ContainerID,
				fmt.Sprintf("Logging stopped in pipe %s", source),
				debug.DEBUG, 0)
			return nil
		default:
		}

		eof, buffered, err = readFromContainerPipe(pipe, chunk, buffered, l.maxReadBytes)
		if err != nil {
			return err
		}
		// Pipe is done and nothing is pending: we are finished.
		if eof && buffered == 0 {
			return nil
		}

		// Walk the buffered bytes and forward every complete line.
		lineStart := 0
		for {
			// IndexByte yields -1 once no newline remains in the unread part.
			nl := bytes.IndexByte(chunk[lineStart:buffered], newline)
			if nl < 0 {
				break
			}
			if splitting {
				// This line closes a split message; keep the timestamp of
				// its earlier pieces so they all match.
				finalPiece = true
			} else {
				stamp = time.Now().UTC()
			}
			if err = forward(chunk[lineStart : lineStart+nl]); err != nil {
				return err
			}
			atomic.AddUint64(&numberOfNewLineChars, 1)

			// The message ended with this newline, so start from a clean slate.
			firstPiece, splitting, finalPiece = true, false, false
			splitID, splitSeq = "", 1

			// Skip past the newline to the start of the next line.
			lineStart += nl + 1
		}

		// Flush an unterminated remainder either because the pipe is done or
		// because the buffer has no room left to wait for a newline.
		if eof || bufferIsFull(chunk, lineStart, buffered) {
			if lineStart < buffered {
				splitting = true
				if firstPiece {
					// Opening piece of a split message: fix the timestamp and
					// ID that all later pieces will reuse.
					stamp = time.Now().UTC()
					if splitID, err = generateRandomID(); err != nil {
						return err
					}
				}
				if err = forward(chunk[lineStart:buffered]); err != nil {
					return err
				}
				// Everything buffered has been sent; the buffer is empty again.
				lineStart, buffered = 0, 0
				splitSeq++
				// Any further partial piece continues this message.
				firstPiece = false
			}
			// Nothing more will arrive once the pipe has reported EOF.
			if eof {
				return nil
			}
		}

		// Slide unread bytes to the front so the next read appends after them.
		if lineStart > 0 {
			copy(chunk, chunk[lineStart:buffered])
			buffered -= lineStart
		}
	}
}