in lib/backend/hdfsbackend/webhdfs/client.go [111:180]
// Create uploads the contents of src to the HDFS file at path using the
// two-step WebHDFS CREATE protocol: a PUT to a name node yields a redirect
// to a data node, and a second PUT streams the data to that data node.
// Existing files at path are overwritten. Each configured name node is
// tried in order until one succeeds or fails with a non-retryable error;
// if all name nodes fail, the last error is wrapped in
// allNameNodesFailedError.
func (c *client) Create(path string, src io.Reader) error {
	// We must be able to replay src in the event that uploading to the data node
	// fails halfway through the upload, thus we attempt to upcast src to an io.Seeker
	// for this purpose. If src is not an io.Seeker, we drain it to an in-memory buffer
	// that can be replayed.
	readSeeker, ok := src.(io.ReadSeeker)
	if !ok {
		var b []byte
		if buf, ok := src.(*bytes.Buffer); ok {
			// Optimization to avoid draining an existing buffer.
			b = buf.Bytes()
		} else {
			// capBuffer caps in-memory buffering at BufferGuard bytes so an
			// unbounded src cannot exhaust memory.
			cbuf := &capBuffer{int64(c.config.BufferGuard), new(bytes.Buffer)}
			if _, err := io.Copy(cbuf, src); err != nil {
				return drainSrcError{err}
			}
			b = cbuf.buf.Bytes()
		}
		readSeeker = bytes.NewReader(b)
	}

	v := c.values()
	v.Set("op", "CREATE")
	v.Set("buffersize", strconv.FormatInt(int64(c.config.BufferSize), 10))
	v.Set("overwrite", "true")

	var nnErr error
	for _, nn := range c.namenodes {
		nameresp, err := httputil.Put(
			getURL(nn, path, v),
			httputil.SendRetry(httputil.RetryBackoff(c.nameNodeBackOff())),
			httputil.SendRedirect(func(req *http.Request, via []*http.Request) error {
				return http.ErrUseLastResponse
			}),
			httputil.SendAcceptedCodes(http.StatusTemporaryRedirect, http.StatusPermanentRedirect))
		if err != nil {
			nnErr = err
			if retryable(err) {
				continue
			}
			return err
		}
		// Follow redirect location manually per WebHDFS protocol. Only the
		// Location header is needed, so close the body right away instead of
		// deferring: a defer inside this loop would keep every retried
		// iteration's body open until the function returns, leaking
		// connections across retries.
		loc, ok := nameresp.Header["Location"]
		nameresp.Body.Close()
		if !ok || len(loc) == 0 {
			return fmt.Errorf("missing location field in response header: %s", nameresp.Header)
		}
		dataresp, err := httputil.Put(
			loc[0],
			httputil.SendBody(readSeeker),
			httputil.SendAcceptedCodes(http.StatusCreated))
		if err != nil {
			nnErr = err
			if retryable(err) {
				// Reset reader so the next name node attempt replays src
				// from the beginning.
				if _, err := readSeeker.Seek(0, io.SeekStart); err != nil {
					return fmt.Errorf("seek: %s", err)
				}
				continue
			}
			return err
		}
		// Success: the created-response body carries nothing we need.
		dataresp.Body.Close()
		return nil
	}
	return allNameNodesFailedError{nnErr}
}