in odps/tunnel/tunnel.go [210:296]
// Preview requests the "/preview" resource of table and returns the rows it
// yields converted to MaxCompute records.
//
// The table's metadata is loaded first if it has not been loaded yet. A
// negative limit is normalised to -1 before being sent as the "limit" query
// parameter (presumably "no limit" on the service side; confirm against the
// API). When partitionValue is non-empty, single and double quotes are
// stripped and every "/" is rewritten to "," before it is sent as the
// "partition" query parameter.
//
// On any failure the returned slice is nil and the error is non-nil.
func (t *Tunnel) Preview(table *odps.Table, partitionValue string, limit int64) ([]data.Record, error) {
	if limit < 0 {
		limit = -1
	}
	if !table.IsLoaded() {
		if err := table.Load(); err != nil {
			return nil, err
		}
	}

	projectName := table.ProjectName()
	schemaName := table.SchemaName()
	tableName := table.Name()

	queryArgs := make(url.Values, 2)
	queryArgs.Set("limit", strconv.FormatInt(limit, 10))
	if partitionValue != "" {
		// Single pass: drop both quote characters and turn the path-style
		// separator into the comma-separated form.
		normalizer := strings.NewReplacer("'", "", "\"", "", "/", ",")
		queryArgs.Set("partition", normalizer.Replace(partitionValue))
	}

	resource := common.NewResourceBuilder(projectName)
	resourceUrl := resource.Table(schemaName, tableName) + "/preview"

	client, err := t.getRestClient(projectName)
	if err != nil {
		return nil, err
	}
	req, err := client.NewRequestWithUrlQuery(
		common.HttpMethod.GetMethod,
		resourceUrl,
		nil,
		queryArgs,
	)
	if err != nil {
		return nil, err
	}
	addCommonSessionHttpHeader(req.Header)
	req.Header.Set(common.HttpHeaderContentLength, "0")

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	// Always release the underlying HTTP body. The receiver is evaluated now,
	// so this closes the raw body even after res.Body is replaced by the
	// decompressing wrapper below.
	defer res.Body.Close()

	if res.StatusCode/100 != 2 {
		return nil, restclient.NewHttpNotOk(res)
	}
	if contentEncoding := res.Header.Get(common.HttpHeaderContentEncoding); contentEncoding != "" {
		res.Body = WrapByCompressor(res.Body, contentEncoding)
	}

	reader, err := ipc.NewReader(res.Body)
	if err != nil {
		return nil, err
	}

	schema := table.Schema()
	columns := schema.Columns
	partitionColumns := schema.PartitionColumns
	// Cap the capacity so append must allocate a fresh backing array instead
	// of writing partition columns into spare capacity of the schema's own
	// Columns slice.
	allColumns := append(columns[:len(columns):len(columns)], partitionColumns...)

	var results []data.Record
	for {
		arrowRecord, err := reader.Read()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, err
		}
		records, err := tableschema.ToMaxComputeRecords(arrowRecord, allColumns, tableschema.ArrowOptionConfig.WithExtendedMode())
		if err != nil {
			return nil, err
		}
		results = append(results, records...)
		// NOTE(review): Retain increments the batch's reference count and
		// nothing visible here releases it. Confirm whether the converted
		// records alias Arrow memory (making this intentional) or whether
		// this was meant to be Release.
		arrowRecord.Retain()
	}
	return results, nil
}