in odps/tunnel/stream_upload_session.go [319:391]
// newUploadConnection builds a PUT request that streams reader as its body to
// the session's resource URL, dispatches it on a background goroutine, and
// returns an httpConnection built from writer, the result channel and the
// session's compressor.
//
// Query arguments:
//   - uploadid and slotid are always set (from su.id and currentSlot.id);
//   - schema_version is sent only when su.schemaVersion >= 0;
//   - partition, zorder_columns and quotaName are sent only when non-empty;
//   - record_count is sent only when recordCount > 0;
//   - check_latest_schema is always set to the negation of su.allowSchemaMismatch.
//
// Headers: a negative dataSize selects chunked transfer encoding, otherwise
// dataSize is sent as the content length. The content encoding header is set
// only when su.Compressor is non-nil.
//
// A positive timeout replaces the rest client's HttpTimeout for this request
// only.
//
// An error is returned when the request cannot be constructed, or when
// su.P2PMode is set and the rest client's endpoint cannot be parsed as a URL.
// Errors from executing the request itself are not returned here; they are
// delivered through the channel handed to the returned httpConnection.
func (su *StreamUploadSession) newUploadConnection(reader io.ReadCloser, writer io.WriteCloser, currentSlot *slot, dataSize int64, recordCount int64, timeout time.Duration) (*httpConnection, error) {
	// Up to eight keys may be set below.
	queryArgs := make(url.Values, 8)
	queryArgs.Set("uploadid", su.id)
	queryArgs.Set("slotid", currentSlot.id)
	if su.schemaVersion >= 0 {
		queryArgs.Set("schema_version", strconv.Itoa(su.schemaVersion))
	}
	if su.partitionKey != "" {
		queryArgs.Set("partition", su.partitionKey)
	}
	if recordCount > 0 {
		queryArgs.Set("record_count", strconv.FormatInt(recordCount, 10))
	}
	if len(su.Columns) > 0 {
		queryArgs.Set("zorder_columns", strings.Join(su.Columns, ","))
	}
	if su.QuotaName != "" {
		queryArgs.Set("quotaName", su.QuotaName)
	}
	queryArgs.Set("check_latest_schema", strconv.FormatBool(!su.allowSchemaMismatch))

	headers := getCommonHeaders()
	if dataSize < 0 {
		// Size unknown up front: stream the body in chunks.
		headers[common.HttpHeaderTransferEncoding] = "chunked"
	} else {
		headers[common.HttpHeaderContentLength] = strconv.FormatInt(dataSize, 10)
	}
	headers[common.HttpHeaderContentType] = "application/octet-stream"
	headers[common.HttpHeaderOdpsSlotNum] = strconv.Itoa(su.slotSelector.SlotNum())
	if su.Compressor != nil {
		headers[common.HttpHeaderContentEncoding] = su.Compressor.Name()
	}
	headers[common.HttpHeaderRoutedServer] = currentSlot.Server()

	resource := su.ResourceUrl()
	req, err := su.RestClient.NewRequestWithParamsAndHeaders(common.HttpMethod.PutMethod, resource, reader, queryArgs, headers)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// Resolve the target endpoint before spawning the goroutine so that a
	// malformed endpoint is reported to the caller instead of causing a nil
	// dereference (and process-wide panic) in the background goroutine.
	endpoint := su.RestClient.Endpoint()
	if su.P2PMode {
		defaultEndpoint, parseErr := url.Parse(endpoint)
		if parseErr != nil {
			return nil, errors.WithStack(parseErr)
		}
		// In P2P mode, keep the configured scheme but address the slot's ip.
		p2pURL := url.URL{
			Scheme: defaultEndpoint.Scheme,
			Host:   currentSlot.ip,
		}
		endpoint = p2pURL.String()
	}

	// Capacity 1 lets the goroutine below deliver its single result and exit
	// even if the result is never received.
	resChan := make(chan resOrErr, 1)
	go func() {
		client := restclient.NewOdpsRestClient(su.RestClient, endpoint)
		client.TcpConnectionTimeout = su.RestClient.TcpConnectionTimeout
		client.HttpTimeout = su.RestClient.HttpTimeout
		if timeout > 0 {
			client.HttpTimeout = timeout
		}
		res, err := client.Do(req)
		resChan <- resOrErr{err: err, res: res}
	}()

	httpConn := newHttpConnection(writer, resChan, su.Compressor)
	return httpConn, nil
}