tunnel/tunnel.go (160 lines of code) (raw):

package tunnel

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"

	utils "github.com/alibaba/MongoShake/v2/common"
	conf "github.com/alibaba/MongoShake/v2/collector/configure"
	"github.com/alibaba/MongoShake/v2/oplog"
	"github.com/gugemichael/nimo4go"
	LOG "github.com/vinllen/log4go"
)

// InitialStageChecking is a package-level switch. It is not referenced in
// this file. NOTE(review): confirm its purpose at its use sites.
const InitialStageChecking = false

// Message tag flags carried in TMessage.Tag. Each non-zero flag occupies a
// distinct bit and is tested with a mask, e.g. msg.Tag&MsgProbe.
const (
	MsgNormal         = 0x00000000
	MsgRetransmission = 0x00000001
	MsgProbe          = 0x00000010
	MsgResident       = 0x00000100
	MsgPersistent     = 0x00001000
	MsgStorageBackend = 0x00010000
)

// Reply codes. ReplyOK is zero and every failure code is negative, matching
// the Writer.Send contract that a negative return value signals an error.
const (
	ReplyOK                     int64 = 0
	ReplyError                  int64 = -1
	ReplyNetworkOpFail          int64 = -2
	ReplyNetworkTimeout         int64 = -3
	ReplyRetransmission         int64 = -4
	ReplyServerFault            int64 = -5
	ReplyChecksumInvalid        int64 = -6
	ReplyCompressorNotSupported int64 = -7
	// Typed like its siblings so it is an int64 in every context.
	ReplyDecompressInvalid int64 = -8
)

// WMessage wraps a TMessage (the raw serialized logs) together with the
// parsed logs, for writers that report ParsedLogsRequired.
type WMessage struct {
	*TMessage                      // whole raw log
	ParsedLogs []*oplog.PartialLog // parsed log
}

// TMessage is the unit sent through a tunnel. See ToBytes for its wire
// layout.
type TMessage struct {
	Checksum uint32
	Tag      uint32 // bit set of the Msg* flags
	Shard    uint32
	Compress uint32
	RawLogs  [][]byte
}

// Crc32 returns the XOR of the IEEE CRC-32 of every raw log. Combining with
// XOR makes the result independent of log order. Note that two identical
// logs cancel each other out.
func (msg *TMessage) Crc32() uint32 {
	var value uint32
	for _, raw := range msg.RawLogs {
		value ^= crc32.ChecksumIEEE(raw)
	}
	return value
}

// ToBytes serializes the message with the given byte order as:
//
//	Checksum | Tag | Shard | Compress | len(RawLogs)   (uint32 each)
//	then, for every raw log: length (uint32) | payload bytes
//
// FromBytes decodes this layout.
func (msg *TMessage) ToBytes(order binary.ByteOrder) []byte {
	// Five header words, plus a 4-byte length prefix and payload per log.
	size := 5 * 4
	for _, raw := range msg.RawLogs {
		size += 4 + len(raw)
	}

	var buffer bytes.Buffer
	buffer.Grow(size)
	// Writing fixed-size values into a bytes.Buffer cannot fail, so the
	// binary.Write errors are deliberately ignored.
	binary.Write(&buffer, order, msg.Checksum)
	binary.Write(&buffer, order, msg.Tag)
	binary.Write(&buffer, order, msg.Shard)
	binary.Write(&buffer, order, msg.Compress)
	binary.Write(&buffer, order, uint32(len(msg.RawLogs)))
	for _, raw := range msg.RawLogs {
		binary.Write(&buffer, order, uint32(len(raw)))
		buffer.Write(raw)
	}
	return buffer.Bytes()
}

// FromBytes decodes buf, laid out as written by ToBytes, into msg.
//
// The decoded logs are appended to msg.RawLogs as sub-slices of buf without
// copying, so buf must not be modified while msg is in use. Malformed input
// fails a nimo.AssertTrue check rather than returning an error. Examples are
// a body that disagrees with the MsgProbe flag, or a record count or length
// prefix that does not fit in buf.
func (msg *TMessage) FromBytes(buf []byte, order binary.ByteOrder) {
	buffer := bytes.NewBuffer(buf)
	binary.Read(buffer, order, &msg.Checksum)
	binary.Read(buffer, order, &msg.Tag)
	binary.Read(buffer, order, &msg.Shard)
	binary.Read(buffer, order, &msg.Compress)
	var n uint32
	binary.Read(buffer, order, &n)

	// A probe message must carry no body; any other message must have one.
	nimo.AssertTrue((buffer.Len() != 0 && msg.Tag&MsgProbe == 0) ||
		(buffer.Len() == 0 && msg.Tag&MsgProbe != 0),
		"message decode left bytes are empty")

	// Walk the length-prefixed records directly on buf rather than
	// allocating a reader per record.
	start := len(buf) - buffer.Len()
	for ; n != 0; n-- {
		// The total "n" should be exactly correct. Dirty records must fail
		// the assertion here instead of slicing out of range.
		nimo.AssertTrue(len(buf)-start >= 4, "oplogs in msg offset is invalid")
		length := uint64(order.Uint32(buf[start:]))
		start += 4

		// Compare in uint64 against the remaining bytes so a huge length
		// cannot wrap around the check.
		nimo.AssertTrue(length <= uint64(len(buf)-start), "oplogs in msg offset is invalid")
		end := start + int(length)
		msg.RawLogs = append(msg.RawLogs, buf[start:end])
		start = end
	}
}

// String returns a short human-readable summary of the message header.
func (msg *TMessage) String() string {
	return fmt.Sprintf("[cksum:%d, tag:%d, shard:%d, compress:%d, logs_len:%d]",
		msg.Checksum, msg.Tag, msg.Shard, msg.Compress, len(msg.RawLogs))
}

// ApproximateSize returns the total payload length of the raw logs. The
// header and the per-log length prefixes are not counted.
func (msg *TMessage) ApproximateSize() uint64 {
	var size uint64
	for _, raw := range msg.RawLogs {
		size += uint64(len(raw))
	}
	return size
}

// Writer is the sending side of a tunnel.
type Writer interface {
	// AckRequired indicates whether this tunnel cares about the ACK value.
	// A tunnel such as RPC_TUNNEL (ack required is true) is asynchronous.
	// Its reserved log entries may be dropped only after the peer receiver
	// has completely consumed them and the ACK for them is confirmed.
	AckRequired() bool

	// Prepare runs the tunnel's setup before any Send invocation, such as
	// creating the network connection or initializing other state. It
	// returns true on success and false on failure.
	Prepare() bool

	// Send writes the tunnel message to the tunnel.
	//
	// It returns the ACK offset as a positive number; a negative number
	// indicates an error. If AckRequired is set, this ACK offset is used to
	// purge buffered oplogs. Otherwise the upper layer uses the max oplog ts
	// as the ACK offset and discards the returned value.
	Send(message *WMessage) int64

	// ParsedLogsRequired reports whether the tunnel needs the parsed logs
	// (WMessage.ParsedLogs) rather than only the raw logs.
	ParsedLogsRequired() bool

	// Name returns the tunnel name.
	Name() string
}

// WriterFactory builds Writer implementations by tunnel name.
type WriterFactory struct {
	Name string
}

// Create builds the Writer for factory.Name, passing it the connection
// address(es) and worker id it needs. It logs and returns nil when the tunnel
// name is unknown, or when a tunnel bound to address[0] receives no address.
func (factory *WriterFactory) Create(address []string, workerId uint32) Writer {
	// These tunnels read address[0]; report a missing address instead of
	// panicking on the index below.
	switch factory.Name {
	case utils.VarTunnelKafka, utils.VarTunnelTcp, utils.VarTunnelRpc, utils.VarTunnelFile:
		if len(address) == 0 {
			LOG.Critical("Tunnel [%s] requires an address", factory.Name)
			return nil
		}
	}

	switch factory.Name {
	case utils.VarTunnelKafka:
		return &KafkaWriter{
			RemoteAddr:  address[0],
			PartitionId: int(workerId) % conf.Options.TunnelKafkaPartitionNumber,
		}
	case utils.VarTunnelTcp:
		return &TCPWriter{RemoteAddr: address[0]}
	case utils.VarTunnelRpc:
		return &RPCWriter{RemoteAddr: address[0]}
	case utils.VarTunnelMock:
		return &MockWriter{}
	case utils.VarTunnelFile:
		return &FileWriter{Local: address[0]}
	case utils.VarTunnelDirect:
		return &DirectWriter{RemoteAddrs: address, ReplayerId: workerId}
	default:
		LOG.Critical("Specific tunnel not found [%s]", factory.Name)
		return nil
	}
}

// ReaderFactory builds Reader implementations by tunnel name.
type ReaderFactory struct {
	Name string
}

// Create builds the Reader for factory.Name, passing it the address (or file
// path) it needs. It logs and returns nil for an unknown tunnel name and for
// the direct tunnel, which has no reader side.
func (factory *ReaderFactory) Create(address string) Reader {
	switch factory.Name {
	case utils.VarTunnelKafka:
		return &KafkaReader{address: address}
	case utils.VarTunnelTcp:
		return &TCPReader{listenAddress: address}
	case utils.VarTunnelRpc:
		return &RPCReader{address: address}
	case utils.VarTunnelMock:
		return &MockReader{}
	case utils.VarTunnelFile:
		return &FileReader{File: address}
	case utils.VarTunnelDirect:
		LOG.Critical("direct mode not supported in reader")
		return nil
	default:
		LOG.Critical("Specific tunnel not found [%s]", factory.Name)
		return nil
	}
}

// Reader is the receiving side of a tunnel.
type Reader interface {
	// Link bridges the tunnel reader and the aggregators (replayers) that
	// consume what it reads.
	Link(aggregate []Replayer) error
}

// Replayer consumes messages delivered by a Reader.
type Replayer interface {
	// Sync replays the batched oplog entries carried by message.
	// NOTE(review): completion is presumably invoked once the batch has been
	// handled; confirm against the implementations.
	Sync(message *TMessage, completion func()) int64

	// GetAcked returns the ACK offset value.
	GetAcked() int64
}