tunnel/file_reader.go (111 lines of code) (raw):
package tunnel
import (
"bytes"
"encoding/binary"
"errors"
"io"
"os"
LOG "github.com/vinllen/log4go"
)
type FileReader struct {
File string
dataFile *DataFile
pipe []chan *TMessage
replayers []Replayer
}
func (tunnel *FileReader) Link(relativeReplayer []Replayer) error {
tunnel.replayers = relativeReplayer
tunnel.pipe = make([]chan *TMessage, 0)
for i := 0; i != len(tunnel.replayers); i++ {
ch := make(chan *TMessage)
tunnel.pipe = append(tunnel.pipe, ch)
go tunnel.consume(ch)
}
var file *os.File
var err error
if file, err = os.Open(tunnel.File); err != nil {
LOG.Critical("File tunnel reader open %s failed, %v", tunnel.File, err)
return err
}
tunnel.dataFile = &DataFile{filehandle: file}
if fileHeader := tunnel.dataFile.ReadHeader(); fileHeader.Magic != FILE_MAGIC_NUMBER || fileHeader.Protocol != FILE_PROTOCOL_NUMBER {
LOG.Critical("File is not belong to mongoshake. magic header or protocol header is invalid")
return errors.New("file magic number or protocol number is invalid")
}
go tunnel.read()
return nil
}
func (tunnel *FileReader) consume(pipe <-chan *TMessage) {
seqKey := 1
for msg := range pipe {
// hash corresponding replayer
seqKey++
switch tunnel.replayers[msg.Shard].Sync(msg, func(context *TMessage, seq int) func() {
return func() {
LOG.Info("Sync tunnel message successful, signature: %d, %d", context.Checksum, seq)
}
}(msg, seqKey)) {
case ReplyChecksumInvalid:
fallthrough
case ReplyRetransmission:
fallthrough
case ReplyCompressorNotSupported:
fallthrough
case ReplyNetworkOpFail:
LOG.Warn("File tunnel rejected by replayer-%d", msg.Shard)
case ReplyError:
fallthrough
case ReplyServerFault:
LOG.Critical("File tunnel handle server fault")
}
}
}
func (tunnel *FileReader) read() {
defer tunnel.dataFile.filehandle.Close()
bufferedReader := tunnel.dataFile.filehandle
bits := make([]byte, 4, 4)
totalLogs := 0
for {
message := new(TMessage)
// for checksum multi read() is acceptable, the underlaying reader is Buffered
if n, err := io.ReadFull(bufferedReader, bits); n != len(bits) || err != nil {
break
}
message.Checksum = binary.BigEndian.Uint32(bits[:])
// for tag
io.ReadFull(bufferedReader, bits)
message.Tag = binary.BigEndian.Uint32(bits[:])
// for shard
io.ReadFull(bufferedReader, bits)
message.Shard = binary.BigEndian.Uint32(bits[:])
// for compress
io.ReadFull(bufferedReader, bits)
message.Compress = binary.BigEndian.Uint32(bits[:])
// for 0xeeeeeeee
io.ReadFull(bufferedReader, bits)
if !bytes.Equal(bits, []byte{0xee, 0xee, 0xee, 0xee}) {
LOG.Critical("File oplog block magic is not 0xeeeeeeee. found 0x%x", bits)
break
}
io.ReadFull(bufferedReader, bits)
blockRemained := binary.BigEndian.Uint32(bits)
logs := [][]byte{}
for blockRemained > 0 {
// oplog entry length
io.ReadFull(bufferedReader, bits[:])
oplogLength := binary.BigEndian.Uint32(bits[:])
log := make([]byte, oplogLength, oplogLength)
if _, err := io.ReadFull(bufferedReader, log); err == io.EOF {
break
}
logs = append(logs, log)
// header + body
blockRemained -= (4 + oplogLength)
totalLogs++
}
message.RawLogs = logs
if message.Shard < 0 {
LOG.Warn("Oplog hashed value is bad negative")
break
}
message.Tag |= MsgRetransmission
// resharding
if message.Shard >= uint32(len(tunnel.pipe)) {
message.Shard %= uint32(len(tunnel.pipe))
}
tunnel.pipe[message.Shard] <- message
LOG.Info("File tunnel reader extract oplogs with shard[%d], compressor[%d], count (%d)", message.Shard, message.Compress, len(message.RawLogs))
}
LOG.Info("File tunnel reader complete. total oplogs %d", totalLogs)
}