func()

in lib/backend/hdfsbackend/webhdfs/client.go [111:180]


func (c *client) Create(path string, src io.Reader) error {
	// We must be able to replay src in the event that uploading to the data node
	// fails halfway through the upload, thus we attempt to upcast src to an io.Seeker
	// for this purpose. If src is not an io.Seeker, we drain it to an in-memory buffer
	// that can be replayed.
	readSeeker, ok := src.(io.ReadSeeker)
	if !ok {
		var b []byte
		if buf, ok := src.(*bytes.Buffer); ok {
			// Optimization to avoid draining an existing buffer.
			b = buf.Bytes()
		} else {
			cbuf := &capBuffer{int64(c.config.BufferGuard), new(bytes.Buffer)}
			if _, err := io.Copy(cbuf, src); err != nil {
				return drainSrcError{err}
			}
			b = cbuf.buf.Bytes()
		}
		readSeeker = bytes.NewReader(b)
	}

	v := c.values()
	v.Set("op", "CREATE")
	v.Set("buffersize", strconv.FormatInt(int64(c.config.BufferSize), 10))
	v.Set("overwrite", "true")

	var nameresp, dataresp *http.Response
	var nnErr error
	for _, nn := range c.namenodes {
		nameresp, nnErr = httputil.Put(
			getURL(nn, path, v),
			httputil.SendRetry(httputil.RetryBackoff(c.nameNodeBackOff())),
			httputil.SendRedirect(func(req *http.Request, via []*http.Request) error {
				return http.ErrUseLastResponse
			}),
			httputil.SendAcceptedCodes(http.StatusTemporaryRedirect, http.StatusPermanentRedirect))
		if nnErr != nil {
			if retryable(nnErr) {
				continue
			}
			return nnErr
		}
		defer nameresp.Body.Close()

		// Follow redirect location manually per WebHDFS protocol.
		loc, ok := nameresp.Header["Location"]
		if !ok || len(loc) == 0 {
			return fmt.Errorf("missing location field in response header: %s", nameresp.Header)
		}

		dataresp, nnErr = httputil.Put(
			loc[0],
			httputil.SendBody(readSeeker),
			httputil.SendAcceptedCodes(http.StatusCreated))
		if nnErr != nil {
			if retryable(nnErr) {
				// Reset reader for next retry.
				if _, err := readSeeker.Seek(0, io.SeekStart); err != nil {
					return fmt.Errorf("seek: %s", err)
				}
				continue
			}
			return nnErr
		}
		defer dataresp.Body.Close()

		return nil
	}
	return allNameNodesFailedError{nnErr}
}