pipe/hdfs.go (123 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package pipe
import (
"database/sql"
"io"
"os"
"strings"
"time"
"github.com/efirs/hdfs/v2"
"github.com/uber/storagetapper/config"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/metrics"
//"context"
)
// hdfsClient wraps an *hdfs.Client and is installed as the `fs` of the file
// producers and consumers created by hdfsPipe. Its methods add retries on
// errors classified by retriable (see withRetry) around the raw client calls.
type hdfsClient struct {
	*hdfs.Client
}
// hdfsWriter wraps an *hdfs.FileWriter returned by openWriteLow, adding
// retries to Write and Close and providing the Flush method required by
// flushWriteCloser.
type hdfsWriter struct {
	*hdfs.FileWriter
}
// OpenRead opens the HDFS file name for reading and positions it at byte
// offset from the start of the file. The caller owns the returned reader and
// must close it. If positioning fails, the file is closed before the error is
// returned, so no reader is leaked.
func (p *hdfsClient) OpenRead(name string, offset int64) (io.ReadCloser, error) {
	f, err := p.Client.Open(name)
	if err != nil {
		return nil, err
	}
	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		// Best-effort cleanup: the Seek error is the one worth reporting.
		_ = f.Close()
		return nil, err
	}
	return f, nil
}
// openWriteLow makes a single attempt to open name for writing. It tries to
// append to an existing file and falls back to creating it. OpenWrite wraps
// this in withRetry. The returned io.Seeker is always nil.
//
// On failure it returns a nil flushWriteCloser. Wrapping a nil *hdfs.FileWriter
// in hdfsWriter would produce a non-nil interface that would fail (likely
// panic) on first use.
func (p *hdfsClient) openWriteLow(name string) (flushWriteCloser, io.Seeker, error) {
	f, err := p.Client.Append(name)
	if err != nil && !retriable(err) {
		// Only fall back to Create for non-transient Append failures (e.g. the
		// file does not exist). A transient error is returned as-is so the
		// caller's withRetry retries it. Otherwise, Create's error (typically
		// non-retriable, e.g. "file exists") would mask it and stop the retries.
		f, err = p.Client.Create(name)
	}
	if err != nil {
		return nil, nil, err
	}
	return &hdfsWriter{f}, nil, nil
}
// OpenWrite opens name for writing via openWriteLow, retrying with withRetry
// while the failure is classified as transient by retriable. fc and sc are
// those produced by the last openWriteLow attempt.
func (p *hdfsClient) OpenWrite(name string) (fc flushWriteCloser, sc io.Seeker, err error) {
	// Run the retry loop before building the return values. The Go spec does
	// not specify whether the operands fc and sc in `return fc, sc, call()` are
	// read before or after call() assigns them, so they are read afterwards.
	err = withRetry(func() error {
		fc, sc, err = p.openWriteLow(name)
		return err
	})
	return fc, sc, err
}
// retryTimeout bounds, in seconds, how long the retry loops in this file keep
// retrying transient errors. They poll every 100ms, for retryTimeout*10
// additional attempts.
var retryTimeout = 10 //seconds

// retriable reports whether err is a transient Hadoop IPC error worth retrying.
// Those are errors whose message mentions StandbyException or
// RetriableException. A nil error is never retriable.
func retriable(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "org.apache.hadoop.ipc.StandbyException") ||
		strings.Contains(msg, "org.apache.hadoop.ipc.RetriableException")
}

// withRetry calls fn. While fn keeps failing with a retriable error, it calls
// fn again every 100ms, for at most retryTimeout*10 additional attempts
// (roughly retryTimeout seconds). It returns the last error from fn, or nil on
// success. Non-retriable errors are returned immediately.
func withRetry(fn func() error) error {
	err := fn()
	for attempt := 0; attempt < retryTimeout*10 && retriable(err); attempt++ {
		time.Sleep(100 * time.Millisecond)
		err = fn()
	}
	return err
}
// MkdirAll creates path, along with any missing parents, using perm. The call
// is repeated via withRetry while it fails with a retriable error.
func (p *hdfsClient) MkdirAll(path string, perm os.FileMode) error {
	mkdir := func() error {
		return p.Client.MkdirAll(path, perm)
	}
	return withRetry(mkdir)
}
// Rename moves oldpath to newpath. The call is repeated via withRetry while it
// fails with a retriable error.
func (p *hdfsClient) Rename(oldpath, newpath string) error {
	move := func() error {
		return p.Client.Rename(oldpath, newpath)
	}
	return withRetry(move)
}
// Remove deletes path. The call is repeated via withRetry while it fails with
// a retriable error.
func (p *hdfsClient) Remove(path string) error {
	del := func() error {
		return p.Client.Remove(path)
	}
	return withRetry(del)
}
// Cancel is a no-op for HDFS: it ignores f and always returns nil.
// NOTE(review): f is neither closed nor removed here; confirm that callers
// dispose of it themselves.
func (p *hdfsClient) Cancel(f io.Closer) error {
	return nil
}
// ReadDir lists the entries of dir. The second argument is ignored by the
// HDFS implementation.
//
// Listing is idempotent, so, like MkdirAll, Rename and Remove, it is retried
// via withRetry while it fails with a retriable error (e.g. a standby
// exception). The entries and error come from the last attempt.
func (p *hdfsClient) ReadDir(dir string, _ string) ([]os.FileInfo, error) {
	var infos []os.FileInfo
	err := withRetry(func() error {
		var err error
		infos, err = p.Client.ReadDir(dir)
		return err
	})
	return infos, err
}
// Write writes b to the underlying HDFS file and returns the number of bytes
// written together with the last error seen.
//
// After each underlying write, while the error is retriable and bytes remain,
// the remainder is rewritten every 100ms. This happens at most
// retryTimeout*10 times. The retry budget starts over whenever a write returns
// no error but still leaves bytes unwritten.
func (p *hdfsWriter) Write(b []byte) (int, error) {
	var (
		written int
		err     error
	)
	for written < len(b) {
		var n int
		n, err = p.FileWriter.Write(b[written:])
		written += n
		for retry := 0; retry < retryTimeout*10; retry++ {
			if err == nil || !retriable(err) || written >= len(b) {
				break
			}
			time.Sleep(100 * time.Millisecond)
			n, err = p.FileWriter.Write(b[written:])
			written += n
		}
		if err != nil {
			break
		}
	}
	return written, err
}
// Flush is currently a no-op that always returns nil.
// NOTE(review): the retried FileWriter.Flush call below is commented out and
// the reason isn't visible here. Confirm the durability expectations of
// callers before relying on Flush.
func (p *hdfsWriter) Flush() error {
	return nil
	// return withRetry(func() error { return p.FileWriter.Flush() })
}
// Close closes the underlying HDFS file writer. The close is repeated via
// withRetry while it fails with a retriable error.
func (p *hdfsWriter) Close() error {
	closeFile := func() error {
		return p.FileWriter.Close()
	}
	return withRetry(closeFile)
}
// hdfsPipe is a filePipe whose files are stored in HDFS.
type hdfsPipe struct {
	filePipe
	// hdfs is the client shared by every producer and consumer created from
	// this pipe; it is closed by hdfsPipe.Close.
	hdfs *hdfs.Client
}
// hdfsConsumer consumes messages for the topic given to hdfsPipe.NewConsumer
// from files stored in HDFS. All behavior comes from the embedded
// fileConsumer.
type hdfsConsumer struct {
	fileConsumer
}
// init registers initHdfsPipe under the name "hdfs" via registerPlugin
// (presumably the pipe-type registry; verify against registerPlugin).
func init() {
	registerPlugin("hdfs", initHdfsPipe)
}
// initHdfsPipe connects to the HDFS cluster described by cfg.Hadoop (user and
// namenode addresses). It returns an hdfsPipe rooted at cfg.Hadoop.BaseDir.
// db is unused. A connection error is logged and returned.
func initHdfsPipe(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
	opts := hdfs.ClientOptions{
		User:      cfg.Hadoop.User,
		Addresses: cfg.Hadoop.Addresses,
	}
	client, err := hdfs.NewClient(opts)
	if log.E(err) {
		return nil, err
	}
	log.Infof("Connected to HDFS cluster at: %v", cfg.Hadoop.Addresses)

	base := filePipe{cfg.Hadoop.BaseDir, *cfg}
	return &hdfsPipe{filePipe: base, hdfs: client}, nil
}
// Type returns the pipe type name, "hdfs".
func (p *hdfsPipe) Type() string {
	return "hdfs"
}
// Close releases resources associated with the pipe by closing its HDFS
// client. Producers and consumers created by this pipe wrap that same client,
// so they should not be used after Close.
func (p *hdfsPipe) Close() error {
	return p.hdfs.Close()
}
// NewProducer returns a fileProducer for topic that writes through this
// pipe's HDFS client. Metrics are tagged with the topic and pipeType "hdfs".
func (p *hdfsPipe) NewProducer(topic string) (Producer, error) {
	tags := map[string]string{"topic": topic, "pipeType": "hdfs"}
	m := metrics.NewFilePipeMetrics("pipe_producer", tags)

	producer := &fileProducer{
		filePipe: &p.filePipe,
		topic:    topic,
		files:    make(map[string]*file),
		fs:       &hdfsClient{p.hdfs},
		metrics:  m,
		stats:    make(map[string]*stat),
	}
	return producer, nil
}
// NewConsumer creates an hdfsConsumer for topic that reads through this pipe's
// HDFS client, then initializes it with initConsumer, passing fetchNextPoll.
// The consumer is returned together with any initialization error.
func (p *hdfsPipe) NewConsumer(topic string) (Consumer, error) {
	tags := map[string]string{"topic": topic, "pipeType": "hdfs"}
	m := metrics.NewFilePipeMetrics("pipe_consumer", tags)

	consumer := &hdfsConsumer{
		fileConsumer{
			filePipe: &p.filePipe,
			topic:    topic,
			fs:       &hdfsClient{p.hdfs},
			metrics:  m,
		},
	}
	_, err := p.initConsumer(&consumer.fileConsumer, consumer.fetchNextPoll)
	return consumer, err
}