in connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java [137:276]
static {
TRANSLATORS.put(TYPE_STRING, new TimestampTranslator() {
@Override
public Date toRaw(Config config, Object orig) {
if (!(orig instanceof String))
throw new DataException("Expected string timestamp to be a String, but found " + orig.getClass());
try {
return config.format.parse((String) orig);
} catch (ParseException e) {
throw new DataException("Could not parse timestamp: value (" + orig + ") does not match pattern ("
+ config.format.toPattern() + ")", e);
}
}
@Override
public Schema typeSchema(boolean isOptional) {
return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
}
@Override
public String toType(Config config, Date orig) {
synchronized (config.format) {
return config.format.format(orig);
}
}
});
TRANSLATORS.put(TYPE_UNIX, new TimestampTranslator() {
@Override
public Date toRaw(Config config, Object orig) {
if (!(orig instanceof Long unixTime))
throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass());
switch (config.unixPrecision) {
case UNIX_PRECISION_SECONDS:
return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime));
case UNIX_PRECISION_MICROS:
return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime));
case UNIX_PRECISION_NANOS:
return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime));
case UNIX_PRECISION_MILLIS:
default:
return Timestamp.toLogical(Timestamp.SCHEMA, unixTime);
}
}
@Override
public Schema typeSchema(boolean isOptional) {
return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA;
}
@Override
public Long toType(Config config, Date orig) {
Long unixTimeMillis = Timestamp.fromLogical(Timestamp.SCHEMA, orig);
switch (config.unixPrecision) {
case UNIX_PRECISION_SECONDS:
return TimeUnit.MILLISECONDS.toSeconds(unixTimeMillis);
case UNIX_PRECISION_MICROS:
return TimeUnit.MILLISECONDS.toMicros(unixTimeMillis);
case UNIX_PRECISION_NANOS:
return TimeUnit.MILLISECONDS.toNanos(unixTimeMillis);
case UNIX_PRECISION_MILLIS:
default:
return unixTimeMillis;
}
}
});
TRANSLATORS.put(TYPE_DATE, new TimestampTranslator() {
@Override
public Date toRaw(Config config, Object orig) {
if (!(orig instanceof Date))
throw new DataException("Expected Date to be a java.util.Date, but found " + orig.getClass());
// Already represented as a java.util.Date and Connect Dates are a subset of valid java.util.Date values
return (Date) orig;
}
@Override
public Schema typeSchema(boolean isOptional) {
return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA;
}
@Override
public Date toType(Config config, Date orig) {
Calendar result = Calendar.getInstance(UTC);
result.setTime(orig);
result.set(Calendar.HOUR_OF_DAY, 0);
result.set(Calendar.MINUTE, 0);
result.set(Calendar.SECOND, 0);
result.set(Calendar.MILLISECOND, 0);
return result.getTime();
}
});
TRANSLATORS.put(TYPE_TIME, new TimestampTranslator() {
@Override
public Date toRaw(Config config, Object orig) {
if (!(orig instanceof Date))
throw new DataException("Expected Time to be a java.util.Date, but found " + orig.getClass());
// Already represented as a java.util.Date and Connect Times are a subset of valid java.util.Date values
return (Date) orig;
}
@Override
public Schema typeSchema(boolean isOptional) {
return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
}
@Override
public Date toType(Config config, Date orig) {
Calendar origCalendar = Calendar.getInstance(UTC);
origCalendar.setTime(orig);
Calendar result = Calendar.getInstance(UTC);
result.setTimeInMillis(0L);
result.set(Calendar.HOUR_OF_DAY, origCalendar.get(Calendar.HOUR_OF_DAY));
result.set(Calendar.MINUTE, origCalendar.get(Calendar.MINUTE));
result.set(Calendar.SECOND, origCalendar.get(Calendar.SECOND));
result.set(Calendar.MILLISECOND, origCalendar.get(Calendar.MILLISECOND));
return result.getTime();
}
});
TRANSLATORS.put(TYPE_TIMESTAMP, new TimestampTranslator() {
@Override
public Date toRaw(Config config, Object orig) {
if (!(orig instanceof Date))
throw new DataException("Expected Timestamp to be a java.util.Date, but found " + orig.getClass());
return (Date) orig;
}
@Override
public Schema typeSchema(boolean isOptional) {
return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA;
}
@Override
public Date toType(Config config, Date orig) {
return orig;
}
});
}