func()

in tunnel/file_reader.go [74:137]


func (tunnel *FileReader) read() {
	defer tunnel.dataFile.filehandle.Close()

	bufferedReader := tunnel.dataFile.filehandle
	bits := make([]byte, 4, 4)
	totalLogs := 0
	for {
		message := new(TMessage)

		// for checksum multi read() is acceptable, the underlaying reader is Buffered
		if n, err := io.ReadFull(bufferedReader, bits); n != len(bits) || err != nil {
			break
		}
		message.Checksum = binary.BigEndian.Uint32(bits[:])
		// for tag
		io.ReadFull(bufferedReader, bits)
		message.Tag = binary.BigEndian.Uint32(bits[:])
		// for shard
		io.ReadFull(bufferedReader, bits)
		message.Shard = binary.BigEndian.Uint32(bits[:])
		// for compress
		io.ReadFull(bufferedReader, bits)
		message.Compress = binary.BigEndian.Uint32(bits[:])
		// for 0xeeeeeeee
		io.ReadFull(bufferedReader, bits)
		if !bytes.Equal(bits, []byte{0xee, 0xee, 0xee, 0xee}) {
			LOG.Critical("File oplog block magic is not 0xeeeeeeee. found 0x%x", bits)
			break
		}
		io.ReadFull(bufferedReader, bits)
		blockRemained := binary.BigEndian.Uint32(bits)

		logs := [][]byte{}
		for blockRemained > 0 {
			// oplog entry length
			io.ReadFull(bufferedReader, bits[:])
			oplogLength := binary.BigEndian.Uint32(bits[:])
			log := make([]byte, oplogLength, oplogLength)
			if _, err := io.ReadFull(bufferedReader, log); err == io.EOF {
				break
			}

			logs = append(logs, log)
			// header + body
			blockRemained -= (4 + oplogLength)
			totalLogs++
		}
		message.RawLogs = logs

		if message.Shard < 0 {
			LOG.Warn("Oplog hashed value is bad negative")
			break
		}
		message.Tag |= MsgRetransmission

		// resharding
		if message.Shard >= uint32(len(tunnel.pipe)) {
			message.Shard %= uint32(len(tunnel.pipe))
		}
		tunnel.pipe[message.Shard] <- message
		LOG.Info("File tunnel reader extract oplogs with shard[%d], compressor[%d], count (%d)", message.Shard, message.Compress, len(message.RawLogs))
	}
	LOG.Info("File tunnel reader complete. total oplogs %d", totalLogs)
}