in tunnel/file_reader.go [74:137]
func (tunnel *FileReader) read() {
defer tunnel.dataFile.filehandle.Close()
bufferedReader := tunnel.dataFile.filehandle
bits := make([]byte, 4, 4)
totalLogs := 0
for {
message := new(TMessage)
// for checksum multi read() is acceptable, the underlaying reader is Buffered
if n, err := io.ReadFull(bufferedReader, bits); n != len(bits) || err != nil {
break
}
message.Checksum = binary.BigEndian.Uint32(bits[:])
// for tag
io.ReadFull(bufferedReader, bits)
message.Tag = binary.BigEndian.Uint32(bits[:])
// for shard
io.ReadFull(bufferedReader, bits)
message.Shard = binary.BigEndian.Uint32(bits[:])
// for compress
io.ReadFull(bufferedReader, bits)
message.Compress = binary.BigEndian.Uint32(bits[:])
// for 0xeeeeeeee
io.ReadFull(bufferedReader, bits)
if !bytes.Equal(bits, []byte{0xee, 0xee, 0xee, 0xee}) {
LOG.Critical("File oplog block magic is not 0xeeeeeeee. found 0x%x", bits)
break
}
io.ReadFull(bufferedReader, bits)
blockRemained := binary.BigEndian.Uint32(bits)
logs := [][]byte{}
for blockRemained > 0 {
// oplog entry length
io.ReadFull(bufferedReader, bits[:])
oplogLength := binary.BigEndian.Uint32(bits[:])
log := make([]byte, oplogLength, oplogLength)
if _, err := io.ReadFull(bufferedReader, log); err == io.EOF {
break
}
logs = append(logs, log)
// header + body
blockRemained -= (4 + oplogLength)
totalLogs++
}
message.RawLogs = logs
if message.Shard < 0 {
LOG.Warn("Oplog hashed value is bad negative")
break
}
message.Tag |= MsgRetransmission
// resharding
if message.Shard >= uint32(len(tunnel.pipe)) {
message.Shard %= uint32(len(tunnel.pipe))
}
tunnel.pipe[message.Shard] <- message
LOG.Info("File tunnel reader extract oplogs with shard[%d], compressor[%d], count (%d)", message.Shard, message.Compress, len(message.RawLogs))
}
LOG.Info("File tunnel reader complete. total oplogs %d", totalLogs)
}