func()

in odps/tunnel/stream_upload_session.go [319:391]


// newUploadConnection builds the PUT request that uploads one block of data
// through currentSlot and starts sending it in a background goroutine.
//
// reader is used as the request body and writer is handed to the returned
// connection. A negative dataSize selects chunked transfer encoding;
// otherwise dataSize is sent as the Content-Length. recordCount is only added
// to the query string when it is positive. A positive timeout overrides the
// HTTP timeout inherited from su.RestClient for this request only.
//
// An error is returned when the request cannot be constructed or, in P2P
// mode, when the configured endpoint cannot be parsed. Failures of the HTTP
// round trip itself are delivered through the returned connection.
func (su *StreamUploadSession) newUploadConnection(reader io.ReadCloser, writer io.WriteCloser, currentSlot *slot, dataSize int64, recordCount int64, timeout time.Duration) (*httpConnection, error) {
	// Up to eight keys may be set below; the capacity is only a sizing hint.
	queryArgs := make(url.Values, 8)
	queryArgs.Set("uploadid", su.id)
	queryArgs.Set("slotid", currentSlot.id)
	if su.schemaVersion >= 0 {
		queryArgs.Set("schema_version", strconv.Itoa(su.schemaVersion))
	}

	if su.partitionKey != "" {
		queryArgs.Set("partition", su.partitionKey)
	}

	if recordCount > 0 {
		queryArgs.Set("record_count", strconv.FormatInt(recordCount, 10))
	}

	if len(su.Columns) > 0 {
		queryArgs.Set("zorder_columns", strings.Join(su.Columns, ","))
	}
	if su.QuotaName != "" {
		queryArgs.Set("quotaName", su.QuotaName)
	}

	queryArgs.Set("check_latest_schema", strconv.FormatBool(!su.allowSchemaMismatch))

	headers := getCommonHeaders()
	if dataSize < 0 {
		// Size unknown up front: stream the body in chunks.
		headers[common.HttpHeaderTransferEncoding] = "chunked"
	} else {
		headers[common.HttpHeaderContentLength] = strconv.FormatInt(dataSize, 10)
	}
	headers[common.HttpHeaderContentType] = "application/octet-stream"
	headers[common.HttpHeaderOdpsSlotNum] = strconv.Itoa(su.slotSelector.SlotNum())

	if su.Compressor != nil {
		headers[common.HttpHeaderContentEncoding] = su.Compressor.Name()
	}
	headers[common.HttpHeaderRoutedServer] = currentSlot.Server()

	resource := su.ResourceUrl()
	req, err := su.RestClient.NewRequestWithParamsAndHeaders(common.HttpMethod.PutMethod, resource, reader, queryArgs, headers)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// Resolve the target endpoint before spawning the goroutine so that a
	// malformed endpoint is reported to the caller instead of causing a nil
	// dereference (and process-wide panic) in the background.
	endpoint := su.RestClient.Endpoint()
	if su.P2PMode {
		defaultEndpoint, parseErr := url.Parse(endpoint)
		if parseErr != nil {
			return nil, errors.WithStack(parseErr)
		}

		// In P2P mode keep the configured scheme but address the slot's IP.
		p2pUrl := url.URL{
			Scheme: defaultEndpoint.Scheme,
			Host:   currentSlot.ip,
		}
		endpoint = p2pUrl.String()
	}

	// Buffered with capacity one: the goroutine sends exactly one result, so
	// it can always finish even if nobody ever receives from the channel.
	resChan := make(chan resOrErr, 1)
	go func() {
		client := restclient.NewOdpsRestClient(su.RestClient, endpoint)
		client.TcpConnectionTimeout = su.RestClient.TcpConnectionTimeout
		client.HttpTimeout = su.RestClient.HttpTimeout
		if timeout > 0 {
			client.HttpTimeout = timeout
		}

		res, doErr := client.Do(req)
		resChan <- resOrErr{err: doErr, res: res}
	}()

	httpConn := newHttpConnection(writer, resChan, su.Compressor)
	return httpConn, nil
}