tunnel/file_writer.go (126 lines of code) (raw):
package tunnel
import (
"bytes"
"encoding/binary"
"io"
"os"
"sync/atomic"
"time"
LOG "github.com/vinllen/log4go"
)
const (
OPEN_FILE_FLAGS = os.O_CREATE | os.O_RDWR | os.O_TRUNC
)
const (
FILE_MAGIC_NUMBER uint64 = 0xeeeeeeeeee201314
FILE_PROTOCOL_NUMBER uint32 = 1
BLOCK_HEADER_SIZE = 20
)
var globalInitializer = int32(0)
var oplogMessage chan *TMessage
type FileWriter struct {
// local file folder path
Local string
// data file header
fileHeader *FileHeader
// data file handle
dataFile *DataFile
logs uint64
}
/**
* File Structure
*
* |----- Header ------|------ OplogBlock ------|------ OplogBlock --------| ......
* |<--- 32bytes ---->|
*
*/
type FileHeader struct {
Magic uint64
Protocol uint32
Checksum uint32
Reserved [16]byte
}
type DataFile struct {
filehandle *os.File
}
func (dataFile *DataFile) WriteHeader() {
fileHeader := new(FileHeader)
fileHeader.Magic = FILE_MAGIC_NUMBER
fileHeader.Protocol = FILE_PROTOCOL_NUMBER
buffer := bytes.Buffer{}
binary.Write(&buffer, binary.BigEndian, fileHeader.Magic)
binary.Write(&buffer, binary.BigEndian, fileHeader.Protocol)
binary.Write(&buffer, binary.BigEndian, fileHeader.Checksum)
binary.Write(&buffer, binary.BigEndian, fileHeader.Reserved)
dataFile.filehandle.Write(buffer.Bytes())
dataFile.filehandle.Sync()
dataFile.filehandle.Seek(32, 0)
}
func (dataFile *DataFile) ReadHeader() *FileHeader {
fileHeader := &FileHeader{}
header := [32]byte{}
io.ReadFull(dataFile.filehandle, header[:])
buffer := bytes.NewBuffer(header[:])
binary.Read(buffer, binary.BigEndian, &fileHeader.Magic)
binary.Read(buffer, binary.BigEndian, &fileHeader.Protocol)
binary.Read(buffer, binary.BigEndian, &fileHeader.Checksum)
binary.Read(buffer, binary.BigEndian, &fileHeader.Reserved)
return fileHeader
}
func (tunnel *FileWriter) Name() string {
return "file"
}
func (tunnel *FileWriter) Send(message *WMessage) int64 {
if message.Tag&MsgProbe == 0 {
oplogMessage <- message.TMessage
}
return 0
}
func (tunnel *FileWriter) SyncToDisk() {
buffer := &bytes.Buffer{}
for {
select {
case message := <-oplogMessage:
// oplogs array
for _, log := range message.RawLogs {
tunnel.logs++
binary.Write(buffer, binary.BigEndian, uint32(len(log)))
binary.Write(buffer, binary.BigEndian, log)
}
tag := message.Tag | MsgPersistent | MsgStorageBackend
headerBuffer := &bytes.Buffer{}
binary.Write(headerBuffer, binary.BigEndian, message.Checksum)
binary.Write(headerBuffer, binary.BigEndian, tag)
binary.Write(headerBuffer, binary.BigEndian, message.Shard)
binary.Write(headerBuffer, binary.BigEndian, message.Compress)
binary.Write(headerBuffer, binary.BigEndian, uint32(0xeeeeeeee))
binary.Write(headerBuffer, binary.BigEndian, uint32(buffer.Len()))
tunnel.dataFile.filehandle.Write(headerBuffer.Bytes())
tunnel.dataFile.filehandle.Write(buffer.Bytes())
buffer.Reset()
case <-time.After(time.Millisecond * 1000):
LOG.Info("File tunnel sync flush. total oplogs %d", tunnel.logs)
tunnel.dataFile.filehandle.Sync()
}
}
}
func _Open(path string) (*os.File, bool) {
if file, err := os.OpenFile(path, OPEN_FILE_FLAGS, os.ModePerm); err == nil {
return file, true
}
LOG.Critical("File tunnel create data file failed")
return nil, false
}
func (tunnel *FileWriter) Prepare() bool {
if atomic.CompareAndSwapInt32(&globalInitializer, 0, 1) {
if file, ok := _Open(tunnel.Local); ok {
tunnel.dataFile = &DataFile{filehandle: file}
} else {
LOG.Critical("File tunnel open failed")
return false
}
if info, err := os.Stat(tunnel.Local); err != nil || info.IsDir() {
LOG.Critical("File tunnel check path failed. %v", err)
return false
}
tunnel.dataFile.WriteHeader()
oplogMessage = make(chan *TMessage, 8192)
go tunnel.SyncToDisk()
}
return true
}
func (tunnel *FileWriter) AckRequired() bool {
return false
}
func (tunnel *FileWriter) ParsedLogsRequired() bool {
return false
}