odps/tableschema/trunc_time.go (148 lines of code) (raw):
package tableschema
import (
"fmt"
"strings"
"time"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
)
// TruncTime Implementation class of GenerateExpression, corresponding to SQL trunc_time('dateColumnName', 'datePart') syntax
type TruncTime struct {
dateColumnName string
cachedDataColumnIndex int
datePart DatePart
formatter *time.Time
}
// DatePart Enumeration value, used to represent the unit of time type trunc
type DatePart int
const (
_ DatePart = iota
YEAR // YEAR trunc time to string like YYYY
MONTH // MONTH trunc time to string like YYYY-MM
DAY // DAY trunc time to string like YYYY-MM-DD
HOUR // HOUR trunc time to string like YYYY-MM-DD HH:00:00
)
func (p DatePart) String() string {
switch p {
case YEAR:
return "year"
case MONTH:
return "month"
case DAY:
return "day"
case HOUR:
return "hour"
default:
return "unknown"
}
}
const (
name = "trunc_time"
nullValue = "__NULL__"
minEpochMillis = -315619200000 // MinEpochMillis Timestamp of '1960-01-01'
)
// NewTruncTime Create a TruncTime instance based on column name and date part
func NewTruncTime(dateColumnName string, datePart DatePart) *TruncTime {
truncTime := &TruncTime{
dateColumnName: dateColumnName,
datePart: datePart,
cachedDataColumnIndex: -1,
}
return truncTime
}
func newTruncTimeWithConstant(dateColumnName string, constant string) (*TruncTime, error) {
datePart, err := stringToDatePart(constant)
if err != nil {
return nil, err
}
if dateColumnName == "" {
return nil, fmt.Errorf("TruncTime dateColumnName cannot be empty")
}
truncTime := &TruncTime{
dateColumnName: dateColumnName,
datePart: datePart,
cachedDataColumnIndex: -1,
}
return truncTime, nil
}
func stringToDatePart(constant string) (DatePart, error) {
constant = strings.ToLower(constant)
switch constant {
case "year":
return YEAR, nil
case "month":
return MONTH, nil
case "day":
return DAY, nil
case "hour":
return HOUR, nil
default:
return -1, fmt.Errorf("unknown date part: %s", constant)
}
}
func (t *TruncTime) generate(record *data.Record, schema *TableSchema) (string, error) {
var dataValue data.Data
if t.cachedDataColumnIndex != -1 {
dataValue = record.Get(t.cachedDataColumnIndex)
} else {
for i, column := range schema.Columns {
if column.Name == t.dateColumnName {
dataValue = record.Get(i)
t.cachedDataColumnIndex = i
break
}
}
}
if dataValue == nil {
return nullValue, nil
}
var epochMillis int64
switch v := dataValue.(type) {
case data.Date:
t := v.Time()
epochMillis = t.UnixNano() / int64(time.Millisecond)
case data.DateTime:
t := v.Time()
epochMillis = t.UnixNano() / int64(time.Millisecond)
case data.Timestamp:
t := v.Time()
epochMillis = t.UnixNano() / int64(time.Millisecond)
case data.TimestampNtz:
t := v.Time()
epochMillis = t.UnixNano() / int64(time.Millisecond)
default:
return nullValue, fmt.Errorf("unknown data type: %T", v)
}
return t.truncEpochMillis(epochMillis), nil
}
func (t *TruncTime) truncEpochMillis(epochMillis int64) string {
if epochMillis < minEpochMillis {
return t.minGenerateValue()
}
localDateTime := time.Unix(epochMillis/1000, 0).UTC()
switch t.datePart {
case YEAR:
return localDateTime.Format("2006")
case MONTH:
return localDateTime.Format("2006-01")
case DAY:
return localDateTime.Format("2006-01-02")
case HOUR:
return localDateTime.Format("2006-01-02 15:00:00")
default:
return ""
}
}
func (t *TruncTime) minGenerateValue() string {
switch t.datePart {
case YEAR:
return "1959"
case MONTH:
return "1959-12"
case DAY:
return "1959-12-31"
case HOUR:
return "1959-12-31 23:00:00"
default:
return ""
}
}
// String Returns a string representation like "trunc_time(dateColumnName, 'datePart')"
func (t *TruncTime) String() string {
return fmt.Sprintf("%s(`%s`, '%s')", name, t.dateColumnName, t.datePart)
}