// lib/backend/hdfsbackend/webhdfs/client.go
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webhdfs

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"time"
"github.com/cenkalti/backoff"
"github.com/uber/kraken/lib/backend/backenderrors"
"github.com/uber/kraken/utils/httputil"
"github.com/uber/kraken/utils/memsize"
)

// Client wraps webhdfs operations. All paths must be absolute.
type Client interface {
Create(path string, src io.Reader) error
Rename(from, to string) error
Mkdirs(path string) error
Open(path string, dst io.Writer) error
GetFileStatus(path string) (FileStatus, error)
ListFileStatus(path string) ([]FileStatus, error)
}
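
// A minimal usage sketch (the namenode address and username are hypothetical):
//
//	c, err := NewClient(Config{}, []string{"nn01:50070"}, "kraken")
//	if err != nil {
//		...
//	}
//	var buf bytes.Buffer
//	if err := c.Open("/some/blob", &buf); err != nil {
//		...
//	}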

type allNameNodesFailedError struct {
err error
}

func (e allNameNodesFailedError) Error() string {
return fmt.Sprintf("all name nodes failed: %s", e.err)
}
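
// retryable returns true for errors that warrant failing over to the next
// namenode: network errors, and 403s such as those a standby namenode returns
// for operations it cannot serve.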
func retryable(err error) bool {
return httputil.IsForbidden(err) || httputil.IsNetworkError(err)
}

type client struct {
config Config
namenodes []string
username string
}

// NewClient creates a new Client.
func NewClient(config Config, namenodes []string, username string) (Client, error) {
config.applyDefaults()
if len(namenodes) == 0 {
return nil, errors.New("namenodes required")
}
return &client{config, namenodes, username}, nil
}

// nameNodeBackOff returns the backoff used on all http requests to namenodes.
// We use a fairly aggressive backoff to handle failover.
//
// TODO(codyg): Normally it would be the responsibility of the farthest
// downstream client to handle retries; however, the Mesos fetcher does not
// currently support retry, and namenode failovers have thus caused task launch
// failures. Ideally we can remove this once the Mesos fetcher is more reliable.
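//
// With these settings the retry intervals are roughly 2s, 4s, 8s, 16s, and 30s
// (MaxInterval caps further growth), each jittered by ±5%, so a namenode is
// abandoned after about a minute of retries.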
func (c *client) nameNodeBackOff() backoff.BackOff {
b := &backoff.ExponentialBackOff{
InitialInterval: 2 * time.Second,
RandomizationFactor: 0.05,
Multiplier: 2,
MaxInterval: 30 * time.Second,
Clock: backoff.SystemClock,
}
return backoff.WithMaxRetries(b, 5)
}

type exceededCapError error

// capBuffer is a buffer that returns an error once writes would exceed cap.
type capBuffer struct {
cap int64
buf *bytes.Buffer
}

func (b *capBuffer) Write(p []byte) (n int, err error) {
if int64(len(p)+b.buf.Len()) > b.cap {
return 0, exceededCapError(
fmt.Errorf("buffer exceeded max capacity %s", memsize.Format(uint64(b.cap))))
}
return b.buf.Write(p)
}
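
// For example, draining an untrusted reader fails fast once the guard is hit
// instead of exhausting memory (a sketch; the 4GB cap is hypothetical):
//
//	cbuf := &capBuffer{cap: int64(4 * memsize.GB), buf: new(bytes.Buffer)}
//	if _, err := io.Copy(cbuf, src); err != nil {
//		// err is an exceededCapError if src exceeded 4GB.
//	}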

type drainSrcError struct {
err error
}

func (e drainSrcError) Error() string { return fmt.Sprintf("drain src: %s", e.err) }

func (c *client) Create(path string, src io.Reader) error {
// We must be able to replay src in the event that uploading to the datanode
// fails halfway through the upload, so we attempt a type assertion of src to
// an io.ReadSeeker. If src is not seekable, we drain it into an in-memory
// buffer which can be replayed.
readSeeker, ok := src.(io.ReadSeeker)
if !ok {
var b []byte
if buf, ok := src.(*bytes.Buffer); ok {
// Optimization to avoid draining an existing buffer.
b = buf.Bytes()
} else {
cbuf := &capBuffer{int64(c.config.BufferGuard), new(bytes.Buffer)}
if _, err := io.Copy(cbuf, src); err != nil {
return drainSrcError{err}
}
b = cbuf.buf.Bytes()
}
readSeeker = bytes.NewReader(b)
}
v := c.values()
v.Set("op", "CREATE")
v.Set("buffersize", strconv.FormatInt(int64(c.config.BufferSize), 10))
v.Set("overwrite", "true")
var nameresp, dataresp *http.Response
var nnErr error
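// Attempt each namenode in turn; only errors that look like failover (see
// retryable) move us on to the next one, any other error is terminal.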
for _, nn := range c.namenodes {
nameresp, nnErr = httputil.Put(
getURL(nn, path, v),
httputil.SendRetry(httputil.RetryBackoff(c.nameNodeBackOff())),
httputil.SendRedirect(func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}),
httputil.SendAcceptedCodes(http.StatusTemporaryRedirect, http.StatusPermanentRedirect))
if nnErr != nil {
if retryable(nnErr) {
continue
}
return nnErr
}
defer nameresp.Body.Close()
// Per the WebHDFS protocol, the namenode responds with a redirect whose
// Location header points at a datanode; we follow it manually so we can
// PUT the file contents to that datanode ourselves.
loc, ok := nameresp.Header["Location"]
if !ok || len(loc) == 0 {
return fmt.Errorf("missing location field in response header: %s", nameresp.Header)
}
dataresp, nnErr = httputil.Put(
loc[0],
httputil.SendBody(readSeeker),
httputil.SendAcceptedCodes(http.StatusCreated))
if nnErr != nil {
if retryable(nnErr) {
// Reset reader for next retry.
if _, err := readSeeker.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("seek: %s", err)
}
continue
}
return nnErr
}
defer dataresp.Body.Close()
return nil
}
return allNameNodesFailedError{nnErr}
}

func (c *client) Rename(from, to string) error {
v := c.values()
v.Set("op", "RENAME")
v.Set("destination", to)
var resp *http.Response
var nnErr error
for _, nn := range c.namenodes {
resp, nnErr = httputil.Put(
getURL(nn, from, v),
httputil.SendRetry(httputil.RetryBackoff(c.nameNodeBackOff())))
if nnErr != nil {
if retryable(nnErr) {
continue
}
return nnErr
}
resp.Body.Close()
return nil
}
return allNameNodesFailedError{nnErr}
}

func (c *client) Mkdirs(path string) error {
v := c.values()
v.Set("op", "MKDIRS")
v.Set("permission", "777")
var resp *http.Response
var nnErr error
for _, nn := range c.namenodes {
resp, nnErr = httputil.Put(
getURL(nn, path, v),
httputil.SendRetry(httputil.RetryBackoff(c.nameNodeBackOff())))
if nnErr != nil {
if retryable(nnErr) {
continue
}
return nnErr
}
resp.Body.Close()
return nil
}
return allNameNodesFailedError{nnErr}
}

func (c *client) Open(path string, dst io.Writer) error {
v := c.values()
v.Set("op", "OPEN")
v.Set("buffersize", strconv.FormatInt(int64(c.config.BufferSize), 10))
var resp *http.Response
var nnErr error
for _, nn := range c.namenodes {
// We retry 400s here because experience has shown that the datanode this
// request is redirected to is sometimes invalid and returns a 400 error. By
// retrying the request, we hope to eventually be redirected to a valid
// datanode.
resp, nnErr = httputil.Get(
getURL(nn, path, v),
httputil.SendRetry(
httputil.RetryBackoff(c.nameNodeBackOff()),
httputil.RetryCodes(http.StatusBadRequest)))
if nnErr != nil {
if retryable(nnErr) {
continue
}
if httputil.IsNotFound(nnErr) {
return backenderrors.ErrBlobNotFound
}
return nnErr
}
defer resp.Body.Close()
if n, err := io.Copy(dst, resp.Body); err != nil {
return fmt.Errorf("copy response: %s", err)
} else if n != resp.ContentLength {
return fmt.Errorf(
"transferred bytes %d does not match content length %d", n, resp.ContentLength)
}
return nil
}
return allNameNodesFailedError{nnErr}
}

func (c *client) GetFileStatus(path string) (FileStatus, error) {
v := c.values()
v.Set("op", "GETFILESTATUS")
var resp *http.Response
var nnErr error
for _, nn := range c.namenodes {
resp, nnErr = httputil.Get(
getURL(nn, path, v),
httputil.SendRetry(httputil.RetryBackoff(c.nameNodeBackOff())))
if nnErr != nil {
if retryable(nnErr) {
continue
}
if httputil.IsNotFound(nnErr) {
return FileStatus{}, backenderrors.ErrBlobNotFound
}
return FileStatus{}, nnErr
}
defer resp.Body.Close()
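// Per the WebHDFS REST API, the body is a JSON envelope of the form
// {"FileStatus": {...}}, which fileStatusResponse mirrors.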
var fsr fileStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&fsr); err != nil {
return FileStatus{}, fmt.Errorf("decode body: %s", err)
}
return fsr.FileStatus, nil
}
return FileStatus{}, allNameNodesFailedError{nnErr}
}

func (c *client) ListFileStatus(path string) ([]FileStatus, error) {
v := c.values()
v.Set("op", "LISTSTATUS")
var resp *http.Response
var nnErr error
for _, nn := range c.namenodes {
resp, nnErr = httputil.Get(
getURL(nn, path, v),
httputil.SendRetry(httputil.RetryBackoff(c.nameNodeBackOff())))
if nnErr != nil {
if retryable(nnErr) {
continue
}
return nil, nnErr
}
defer resp.Body.Close()
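// Per the WebHDFS REST API, the body has the form
// {"FileStatuses": {"FileStatus": [...]}}, which listStatusResponse mirrors.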
var lsr listStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&lsr); err != nil {
return nil, fmt.Errorf("decode body: %s", err)
}
return lsr.FileStatuses.FileStatus, nil
}
return nil, allNameNodesFailedError{nnErr}
}
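
// values returns the base query parameters included on every request,
// currently just user.name when a username is configured.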
func (c *client) values() url.Values {
v := url.Values{}
if c.username != "" {
v.Set("user.name", c.username)
}
return v
}
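
// getURL builds the WebHDFS REST URL for an operation, e.g. with hypothetical
// values:
//
//	getURL("nn01:50070", "/some/blob", v)
//	// => "http://nn01:50070/webhdfs/v1/some/blob?op=OPEN&user.name=kraken"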
func getURL(namenode, p string, v url.Values) string {
endpoint := path.Join("/webhdfs/v1", p)
return fmt.Sprintf("http://%s%s?%s", namenode, endpoint, v.Encode())
}