tensorflow_ops/timestamp_ops_kernel.cc (718 lines of code) (raw):
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstdint>
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/ascii.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "sql_utils/public/functions/arithmetics.h"
#include "sql_utils/public/functions/date_time_util.h"
#include "sql_utils/public/functions/datetime.pb.h"
#include "sql_utils/public/functions/parse_date_time.h"
#include "sql_utils/public/types/timestamp_util.h"
#include "tensorflow_ops/constants.h"
#include "tensorflow_ops/utils.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/op_requires.h"
using ::tensorflow::DEVICE_CPU;
using ::tensorflow::OpKernel;
using ::tensorflow::OpKernelConstruction;
using ::tensorflow::OpKernelContext;
using ::tensorflow::Tensor;
using ::tensorflow::tstring;
using ::tensorflow::errors::Internal;
using ::tensorflow::errors::InvalidArgument;
namespace bigquery_ml_utils {
class ExtractFromTimestamp : public OpKernel {
public:
explicit ExtractFromTimestamp(OpKernelConstruction* context)
: OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the part tensor
const Tensor& part_tensor = context->input(0);
std::string part = absl::AsciiStrToLower(part_tensor.flat<tstring>()(0));
static auto* supported_parts =
new absl::flat_hash_set<functions::DateTimestampPart>(
{functions::MICROSECOND, functions::MILLISECOND,
functions::SECOND, functions::MINUTE,
functions::HOUR, functions::DAYOFWEEK,
functions::DAY, functions::DAYOFYEAR,
functions::WEEK, functions::WEEK_MONDAY,
functions::WEEK_TUESDAY, functions::WEEK_WEDNESDAY,
functions::WEEK_THURSDAY, functions::WEEK_FRIDAY,
functions::WEEK_SATURDAY, functions::ISOWEEK,
functions::MONTH, functions::QUARTER,
functions::YEAR, functions::ISOYEAR});
functions::DateTimestampPart part_enum;
OP_REQUIRES_OK(context, ParseInputDateTimestampPart(
part, name(), &part_enum, *supported_parts));
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(1);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(2);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
absl::TimeZone tz;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::MakeTimeZone(time_zone, &tz)));
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<int64_t>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(context,
ParseInputTimestamp(timestamp(i), tz, name(), &ts));
// Extract part from the timestamp.
int32_t out;
OP_REQUIRES_OK(
context,
ToTslStatus(name(),
functions::ExtractFromTimestamp(
part_enum, ts, functions::kMicroseconds, tz, &out)));
// Set the output value.
// Currently, BQML util inference only supports int64.
output_flat(i) = static_cast<int64_t>(out);
}
}
};
class StringFromTimestamp : public OpKernel {
public:
explicit StringFromTimestamp(OpKernelConstruction* context)
: OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(0);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(1);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<tstring>();
// Parse and validate the timezone.
absl::TimeZone tz;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::MakeTimeZone(time_zone, &tz)));
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(context,
ParseInputTimestamp(timestamp(i), tz, name(), &ts));
// Convert timestamp to string.
std::string out;
OP_REQUIRES_OK(
context,
ToTslStatus(name(),
functions::ConvertTimestampMicrosToStringWithTruncation(
ts, tz, &out)));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class TimestampFromString : public OpKernel {
public:
explicit TimestampFromString(OpKernelConstruction* context)
: OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(0);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(1);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
// Grab the allow_tz_in_str tensor
const Tensor& allow_tz_in_str_tensor = context->input(2);
bool allow_tz_in_str = allow_tz_in_str_tensor.flat<bool>()(0);
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<tstring>();
// Parse and validate the timezone.
absl::TimeZone tz;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::MakeTimeZone(time_zone, &tz)));
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(
context,
ToTslStatus(name(), functions::ConvertStringToTimestamp(
timestamp(i), tz, functions::kMicroseconds,
allow_tz_in_str, &ts)));
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class TimestampFromDate : public OpKernel {
public:
explicit TimestampFromDate(OpKernelConstruction* context)
: OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the date tensor
const Tensor& date_tensor = context->input(0);
auto date = date_tensor.flat<tstring>();
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(1);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
// Create an output tensor with the shape of the date tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(0, date_tensor.shape(),
&output_tensor));
auto output_flat = output_tensor->flat<tstring>();
// Parse and validate the timezone.
absl::TimeZone tz;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::MakeTimeZone(time_zone, &tz)));
const int N = date.size();
for (int i = 0; i < N; i++) {
// Parse the date.
int32_t date_int;
OP_REQUIRES_OK(context, ParseInputDate(date(i), name(), &date_int));
int64_t ts;
OP_REQUIRES_OK(context,
ToTslStatus(name(), functions::ConvertDateToTimestamp(
date_int, functions::kMicroseconds,
tz, &ts)));
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class TimestampFromDatetime : public OpKernel {
public:
explicit TimestampFromDatetime(OpKernelConstruction* context)
: OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the date tensor
const Tensor& date_tensor = context->input(0);
auto datetime = date_tensor.flat<tstring>();
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(1);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
// Create an output tensor with the shape of the datetime tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(0, date_tensor.shape(),
&output_tensor));
auto output_flat = output_tensor->flat<tstring>();
// Parse and validate the timezone.
absl::TimeZone tz;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::MakeTimeZone(time_zone, &tz)));
const int N = datetime.size();
for (int i = 0; i < N; i++) {
// Parse the datetime.
DatetimeValue dt;
OP_REQUIRES_OK(context, ParseInputDatetime(datetime(i), name(), &dt));
absl::Time base_time;
OP_REQUIRES_OK(context,
ToTslStatus(name(), functions::ConvertDatetimeToTimestamp(
DatetimeValue::FromPacked64Micros(
dt.Packed64DatetimeMicros()),
tz, &base_time)));
int64_t ts = absl::ToUnixMicros(base_time);
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class TimestampAdd : public OpKernel {
public:
explicit TimestampAdd(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(0);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the interval tensor
const Tensor& diff_tensor = context->input(1);
auto interval_int = diff_tensor.flat<int64_t>();
OP_REQUIRES(
context, interval_int.size() == timestamp.size(),
InvalidArgument(absl::Substitute(
"Error in $0: timestamp and interval must have the same shape, "
"but are $1, $2",
name(), timestamp.size(), interval_int.size())));
// Grab the part tensor
const Tensor& part_tensor = context->input(2);
std::string part = part_tensor.flat<tstring>()(0);
functions::DateTimestampPart part_enum;
static auto* supported_parts =
new absl::flat_hash_set<functions::DateTimestampPart>(
{functions::MICROSECOND, functions::MILLISECOND, functions::SECOND,
functions::MINUTE, functions::HOUR, functions::DAY});
OP_REQUIRES_OK(context, ParseInputDateTimestampPart(
part, name(), &part_enum, *supported_parts));
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Default time zone.
absl::TimeZone tz = absl::UTCTimeZone();
// Parse the timestamp.
int64_t input_ts;
OP_REQUIRES_OK(context,
ParseInputTimestamp(timestamp(i), tz, name(), &input_ts));
absl::StatusOr<IntervalValue> interval =
GetIntervalValue(interval_int(i), part_enum);
OP_REQUIRES(
context, interval.ok(),
Internal("Error in getting interval of TimestampAdd with status: ",
interval.status()));
absl::Time base_time;
OP_REQUIRES_OK(context,
ToTslStatus(name(), functions::AddTimestamp(
absl::FromUnixMicros(input_ts), tz,
*interval, &base_time)));
int64_t ts = absl::ToUnixMicros(base_time);
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class TimestampSub : public OpKernel {
public:
explicit TimestampSub(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(0);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the interval tensor
const Tensor& diff_tensor = context->input(1);
auto interval_int = diff_tensor.flat<int64_t>();
OP_REQUIRES(
context, interval_int.size() == timestamp.size(),
InvalidArgument(absl::Substitute(
"Error in $0: timestamp and interval must have the same shape, "
"but are $1, $2",
name(), timestamp.size(), interval_int.size())));
// Grab the part tensor
const Tensor& part_tensor = context->input(2);
std::string part = part_tensor.flat<tstring>()(0);
functions::DateTimestampPart part_enum;
static auto* supported_parts =
new absl::flat_hash_set<functions::DateTimestampPart>(
{functions::MICROSECOND, functions::MILLISECOND, functions::SECOND,
functions::MINUTE, functions::HOUR, functions::DAY});
OP_REQUIRES_OK(context, ParseInputDateTimestampPart(
part, name(), &part_enum, *supported_parts));
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Default time zone.
absl::TimeZone tz = absl::UTCTimeZone();
// Parse the timestamp.
int64_t input_ts;
OP_REQUIRES_OK(context,
ParseInputTimestamp(timestamp(i), tz, name(), &input_ts));
absl::StatusOr<IntervalValue> interval =
GetIntervalValue(-interval_int(i), part_enum);
OP_REQUIRES(
context, interval.ok(),
Internal("Error in getting interval of TimestampSub with status: ",
interval.status()));
absl::Time base_time;
OP_REQUIRES_OK(context,
ToTslStatus(name(), functions::AddTimestamp(
absl::FromUnixMicros(input_ts), tz,
*interval, &base_time)));
int64_t ts = absl::ToUnixMicros(base_time);
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class TimestampDiff : public OpKernel {
public:
explicit TimestampDiff(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp_a tensor
const Tensor& timestamp_a_tensor = context->input(0);
auto timestamp_a = timestamp_a_tensor.flat<tstring>();
// Grab the timestamp_b tensor
const Tensor& timestamp_b_tensor = context->input(1);
auto timestamp_b = timestamp_b_tensor.flat<tstring>();
OP_REQUIRES(context, timestamp_a.size() == timestamp_b.size(),
InvalidArgument(
"Timestamps in TimestampDiff must have the same length."));
// Grab the part tensor
const Tensor& part_tensor = context->input(2);
std::string part = part_tensor.flat<tstring>()(0);
functions::DateTimestampPart part_enum;
static auto* supported_parts =
new absl::flat_hash_set<functions::DateTimestampPart>(
{functions::MICROSECOND, functions::MILLISECOND, functions::SECOND,
functions::MINUTE, functions::HOUR, functions::DAY});
OP_REQUIRES_OK(context, ParseInputDateTimestampPart(
part, name(), &part_enum, *supported_parts));
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_a_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<int64_t>();
const int N = timestamp_a.size();
for (int i = 0; i < N; i++) {
// Default time zone.
absl::TimeZone tz = absl::UTCTimeZone();
// Parse the timestamp.
int64_t ts_a;
int64_t ts_b;
OP_REQUIRES_OK(context,
ParseInputTimestamp(timestamp_a(i), tz, name(), &ts_a));
OP_REQUIRES_OK(context,
ParseInputTimestamp(timestamp_b(i), tz, name(), &ts_b));
int64_t out;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::TimestampDiff(
ts_a, ts_b, functions::kMicroseconds,
part_enum, &out)));
// Set the output value.
output_flat(i) = out;
}
}
};
class TimestampTrunc : public OpKernel {
public:
explicit TimestampTrunc(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(0);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the part tensor
const Tensor& part_tensor = context->input(1);
std::string part = part_tensor.flat<tstring>()(0);
functions::DateTimestampPart part_enum;
static auto* supported_parts =
new absl::flat_hash_set<functions::DateTimestampPart>(
{functions::MICROSECOND, functions::MILLISECOND, functions::SECOND,
functions::MINUTE, functions::HOUR, functions::DAY,
functions::WEEK, functions::WEEK_MONDAY, functions::WEEK_TUESDAY,
functions::WEEK_WEDNESDAY, functions::WEEK_THURSDAY,
functions::WEEK_FRIDAY, functions::WEEK_SATURDAY,
functions::ISOWEEK, functions::MONTH, functions::QUARTER,
functions::YEAR, functions::ISOYEAR});
OP_REQUIRES_OK(context, ParseInputDateTimestampPart(
part, name(), &part_enum, *supported_parts));
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(2);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
absl::TimeZone tz;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::MakeTimeZone(time_zone, &tz)));
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t input_ts;
OP_REQUIRES_OK(context,
ParseInputTimestamp(timestamp(i), tz, name(), &input_ts));
int64_t out_ts;
OP_REQUIRES_OK(context,
ToTslStatus(name(), functions::TruncateTimestamp(
input_ts, functions::kMicroseconds,
tz, part_enum, &out_ts)));
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(out_ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class FormatTimestamp : public OpKernel {
public:
explicit FormatTimestamp(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the format string tensor
const Tensor& format_tensor = context->input(0);
std::string format = format_tensor.flat<tstring>()(0);
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(1);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(2);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
absl::TimeZone tz;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::MakeTimeZone(time_zone, &tz)));
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(context,
ParseInputTimestamp(timestamp(i), tz, name(), &ts));
// Format the timestamp string.
functions::FormatDateTimestampOptions format_options = {
.expand_Q = true,
.expand_J = true,
};
std::string out;
OP_REQUIRES_OK(
context,
ToTslStatus(name(), functions::FormatTimestampToString(
format, ts, tz, format_options, &out)));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class ParseTimestamp : public OpKernel {
public:
explicit ParseTimestamp(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the format string tensor
const Tensor& format_tensor = context->input(0);
std::string format = format_tensor.flat<tstring>()(0);
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(1);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(2);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
absl::TimeZone tz;
OP_REQUIRES_OK(
context, ToTslStatus(name(), functions::MakeTimeZone(time_zone, &tz)));
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(context,
ToTslStatus(name(), functions::ParseStringToTimestamp(
format, timestamp(i), time_zone,
/*parse_version2=*/true, &ts)));
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class SafeParseTimestamp : public OpKernel {
public:
explicit SafeParseTimestamp(OpKernelConstruction* context)
: OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the format string tensor
const Tensor& format_tensor = context->input(0);
std::string format = format_tensor.flat<tstring>()(0);
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(1);
auto timestamp = timestamp_tensor.flat<tstring>();
// Grab the time_zone tensor
const Tensor& time_zone_tensor = context->input(2);
std::string time_zone = time_zone_tensor.flat<tstring>()(0);
absl::TimeZone tz;
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Safe parse the timestamp.
int64_t ts;
if (!functions::MakeTimeZone(time_zone, &tz).ok() ||
!functions::ParseStringToTimestamp(format, timestamp(i), time_zone,
/*parse_version2=*/true, &ts)
.ok()) {
// Set the NULL-equivalent output value for unsuccessful parsing.
OP_REQUIRES_OK(
context,
ToTslStatus(name(), functions::ParseStringToTimestamp(
kTimestampFormatString, kNullTimestamp,
absl::UTCTimeZone(),
/*parse_version2=*/true, &ts)));
}
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
::tsl::Status TimestampFromIntOperator(int64_t in, int64_t scale,
absl::string_view function_name,
int64_t* out) {
// SECONDS: 1000000, MILLIS: 1000, MICROS: 1
if (scale != 1000000 && scale != 1000 && scale != 1) {
return Internal(absl::Substitute("Invalid scale $0 called by $1", scale,
function_name));
}
*out = in;
if (scale != 1 && !functions::Multiply(in, scale, out, /*error=*/nullptr)) {
// Only the SECONDS and MILLIS versions can overflow due to multiplication,
// since the MICROS version has a scale of 1. It's possible that the
// multiplication can succeed but the result is still out of range, in which
// case we return the error message about the timestamp range.
return InvalidArgument(absl::Substitute(
"Timestamp value in $0 overflows: $1", function_name, in));
}
if (*out > types::kTimestampMax || *out < types::kTimestampMin) {
std::string ts_min;
functions::ConvertTimestampToStringWithoutTruncation(
types::kTimestampMin, functions::kMicroseconds, absl::UTCTimeZone(),
&ts_min)
.IgnoreError();
std::string ts_max;
functions::ConvertTimestampToStringWithoutTruncation(
types::kTimestampMax, functions::kMicroseconds, absl::UTCTimeZone(),
&ts_max)
.IgnoreError();
return InvalidArgument(absl::Substitute(
"Timestamp value in $0 is out of allowed range: from $1 to $2.",
function_name, ts_min, ts_max));
}
return ::tsl::OkStatus();
}
class TimestampMicros : public OpKernel {
public:
explicit TimestampMicros(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_int_tensor = context->input(0);
auto timestamp_int = timestamp_int_tensor.flat<int64_t>();
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context,
context->allocate_output(0, timestamp_int_tensor.shape(),
&output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp_int.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(context,
TimestampFromIntOperator(timestamp_int(i),
/* scale= */ 1, name(), &ts));
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class TimestampMillis : public OpKernel {
public:
explicit TimestampMillis(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_int_tensor = context->input(0);
auto timestamp_int = timestamp_int_tensor.flat<int64_t>();
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context,
context->allocate_output(0, timestamp_int_tensor.shape(),
&output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp_int.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(context,
TimestampFromIntOperator(timestamp_int(i),
/* scale= */ 1000, name(), &ts));
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
class TimestampSeconds : public OpKernel {
public:
explicit TimestampSeconds(OpKernelConstruction* context)
: OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_int_tensor = context->input(0);
auto timestamp_int = timestamp_int_tensor.flat<int64_t>();
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context,
context->allocate_output(0, timestamp_int_tensor.shape(),
&output_tensor));
auto output_flat = output_tensor->flat<tstring>();
const int N = timestamp_int.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(
context, TimestampFromIntOperator(timestamp_int(i),
/* scale= */ 1000000, name(), &ts));
// Format timestamp to string.
std::string out;
OP_REQUIRES_OK(context, FormatOutputTimestamp(ts, name(), &out));
// Set the output value.
output_flat(i).reserve(out.size());
output_flat(i) = std::move(out);
}
}
};
::tsl::Status IntFromTimestampOperator(int64_t in, int64_t scale,
absl::string_view function_name,
int64_t* out) {
// SECONDS: 1000000, MILLIS: 1000, MICROS: 1
if (scale != 1000000 && scale != 1000 && scale != 1) {
return Internal(absl::Substitute("Invalid scale $0 called by $1", scale,
function_name));
}
// No overflows possible with division, result truncated downwards;
*out = static_cast<int64_t>(in / scale);
if (in < 0 && in % scale != 0) {
(*out)--;
}
return ::tsl::OkStatus();
}
class UnixMicros : public OpKernel {
public:
explicit UnixMicros(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(0);
auto timestamp = timestamp_tensor.flat<tstring>();
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<int64_t>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(
context,
ParseInputTimestamp(timestamp(i), absl::UTCTimeZone(), name(), &ts));
// Convert timestamp to micros.
int64_t out;
OP_REQUIRES_OK(context,
IntFromTimestampOperator(ts,
/* scale= */ 1, name(), &out));
// Set the output value.
output_flat(i) = out;
}
}
};
class UnixMillis : public OpKernel {
public:
explicit UnixMillis(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(0);
auto timestamp = timestamp_tensor.flat<tstring>();
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<int64_t>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(
context,
ParseInputTimestamp(timestamp(i), absl::UTCTimeZone(), name(), &ts));
// Convert timestamp to millis.
int64_t out;
OP_REQUIRES_OK(context,
IntFromTimestampOperator(ts,
/* scale= */ 1000, name(), &out));
// Set the output value.
output_flat(i) = out;
}
}
};
class UnixSeconds : public OpKernel {
public:
explicit UnixSeconds(OpKernelConstruction* context) : OpKernel(context) {}
void Compute(OpKernelContext* context) override {
// Grab the timestamp tensor
const Tensor& timestamp_tensor = context->input(0);
auto timestamp = timestamp_tensor.flat<tstring>();
// Create an output tensor with the shape of the timestamp tensor
Tensor* output_tensor = NULL;
OP_REQUIRES_OK(context, context->allocate_output(
0, timestamp_tensor.shape(), &output_tensor));
auto output_flat = output_tensor->flat<int64_t>();
const int N = timestamp.size();
for (int i = 0; i < N; i++) {
// Parse the timestamp.
int64_t ts;
OP_REQUIRES_OK(
context,
ParseInputTimestamp(timestamp(i), absl::UTCTimeZone(), name(), &ts));
// Convert timestamp to seconds.
int64_t out;
OP_REQUIRES_OK(context, IntFromTimestampOperator(ts,
/* scale= */ 1000000,
name(), &out));
// Set the output value.
output_flat(i) = out;
}
}
};
// Register the kernels.
REGISTER_KERNEL_BUILDER(Name("ExtractFromTimestamp").Device(DEVICE_CPU),
ExtractFromTimestamp);
REGISTER_KERNEL_BUILDER(Name("StringFromTimestamp").Device(DEVICE_CPU),
StringFromTimestamp);
REGISTER_KERNEL_BUILDER(Name("TimestampFromString").Device(DEVICE_CPU),
TimestampFromString);
REGISTER_KERNEL_BUILDER(Name("TimestampFromDate").Device(DEVICE_CPU),
TimestampFromDate);
REGISTER_KERNEL_BUILDER(Name("TimestampFromDatetime").Device(DEVICE_CPU),
TimestampFromDatetime);
REGISTER_KERNEL_BUILDER(Name("TimestampAdd").Device(DEVICE_CPU), TimestampAdd);
REGISTER_KERNEL_BUILDER(Name("TimestampSub").Device(DEVICE_CPU), TimestampSub);
REGISTER_KERNEL_BUILDER(Name("TimestampDiff").Device(DEVICE_CPU),
TimestampDiff);
REGISTER_KERNEL_BUILDER(Name("TimestampTrunc").Device(DEVICE_CPU),
TimestampTrunc);
REGISTER_KERNEL_BUILDER(Name("FormatTimestamp").Device(DEVICE_CPU),
FormatTimestamp);
REGISTER_KERNEL_BUILDER(Name("ParseTimestamp").Device(DEVICE_CPU),
ParseTimestamp);
REGISTER_KERNEL_BUILDER(Name("SafeParseTimestamp").Device(DEVICE_CPU),
SafeParseTimestamp);
REGISTER_KERNEL_BUILDER(Name("TimestampMicros").Device(DEVICE_CPU),
TimestampMicros);
REGISTER_KERNEL_BUILDER(Name("TimestampMillis").Device(DEVICE_CPU),
TimestampMillis);
REGISTER_KERNEL_BUILDER(Name("TimestampSeconds").Device(DEVICE_CPU),
TimestampSeconds);
REGISTER_KERNEL_BUILDER(Name("UnixMicros").Device(DEVICE_CPU), UnixMicros);
REGISTER_KERNEL_BUILDER(Name("UnixMillis").Device(DEVICE_CPU), UnixMillis);
REGISTER_KERNEL_BUILDER(Name("UnixSeconds").Device(DEVICE_CPU), UnixSeconds);
} // namespace bigquery_ml_utils