flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java [257:302]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Renders a timestamp-typed field as a string via {@code timestampToString}.
   *
   * <p>Only fields whose type root is {@code TIMESTAMP_WITHOUT_TIME_ZONE} or
   * {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} are formatted. For any other type root, or when the
   * value's Java class is not one of the representations handled below, the value is handed
   * back untouched.
   *
   * @param timestampField the raw field value
   * @param fieldType the logical type of the field; selects the conversion and the precision
   * @param sessionTimeZone zone used to turn an instant into a local date-time for
   *     {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} fields
   * @return the formatted timestamp, or {@code timestampField} itself when no conversion applies
   */
  private static Object formatTimestampField(
          Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) {
    switch (fieldType.getTypeRoot()) {
      case TIMESTAMP_WITHOUT_TIME_ZONE: {
        // Precision is resolved up front, before the value's Java class is inspected.
        final int digits = getPrecision(fieldType);
        final LocalDateTime wallClock;
        if (timestampField instanceof Timestamp) {
          // java.sql.Timestamp -> TIMESTAMP_WITHOUT_TIME_ZONE
          wallClock = ((Timestamp) timestampField).toLocalDateTime();
        } else if (timestampField instanceof LocalDateTime) {
          wallClock = (LocalDateTime) timestampField;
        } else if (timestampField instanceof TimestampData) {
          wallClock = ((TimestampData) timestampField).toLocalDateTime();
        } else {
          // Unsupported representation: leave the value as it is.
          return timestampField;
        }
        return timestampToString(wallClock, digits);
      }
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
        final Instant pointInTime;
        if (timestampField instanceof Instant) {
          pointInTime = (Instant) timestampField;
        } else if (timestampField instanceof Timestamp) {
          // java.sql.Timestamp -> TIMESTAMP_WITH_LOCAL_TIME_ZONE:
          // getTime() supplies the milliseconds, the remainder of getNanos() the nanos-of-milli.
          final Timestamp sqlTimestamp = (Timestamp) timestampField;
          final long epochMillis = sqlTimestamp.getTime();
          final int nanosOfMilli = sqlTimestamp.getNanos() % 1_000_000;
          pointInTime = TimestampData.fromEpochMillis(epochMillis, nanosOfMilli).toInstant();
        } else if (timestampField instanceof TimestampData) {
          pointInTime = ((TimestampData) timestampField).toInstant();
        } else if (timestampField instanceof Integer) {
          // Integer values are taken as epoch seconds.
          pointInTime = Instant.ofEpochSecond((Integer) timestampField);
        } else if (timestampField instanceof Long) {
          // Long values are taken as epoch milliseconds.
          pointInTime = Instant.ofEpochMilli((Long) timestampField);
        } else {
          // Unsupported representation: leave the value as it is.
          return timestampField;
        }
        final LocalDateTime sessionLocal = pointInTime.atZone(sessionTimeZone).toLocalDateTime();
        return timestampToString(sessionLocal, getPrecision(fieldType));
      }
      default:
        return timestampField;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java [257:302]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Renders a timestamp-typed field as a string via {@code timestampToString}.
   *
   * <p>Only fields whose type root is {@code TIMESTAMP_WITHOUT_TIME_ZONE} or
   * {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} are formatted. For any other type root, or when the
   * value's Java class is not one of the representations handled below, the value is handed
   * back untouched.
   *
   * @param timestampField the raw field value
   * @param fieldType the logical type of the field; selects the conversion and the precision
   * @param sessionTimeZone zone used to turn an instant into a local date-time for
   *     {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} fields
   * @return the formatted timestamp, or {@code timestampField} itself when no conversion applies
   */
  private static Object formatTimestampField(
          Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) {
    switch (fieldType.getTypeRoot()) {
      case TIMESTAMP_WITHOUT_TIME_ZONE: {
        // Precision is resolved up front, before the value's Java class is inspected.
        final int digits = getPrecision(fieldType);
        final LocalDateTime wallClock;
        if (timestampField instanceof Timestamp) {
          // java.sql.Timestamp -> TIMESTAMP_WITHOUT_TIME_ZONE
          wallClock = ((Timestamp) timestampField).toLocalDateTime();
        } else if (timestampField instanceof LocalDateTime) {
          wallClock = (LocalDateTime) timestampField;
        } else if (timestampField instanceof TimestampData) {
          wallClock = ((TimestampData) timestampField).toLocalDateTime();
        } else {
          // Unsupported representation: leave the value as it is.
          return timestampField;
        }
        return timestampToString(wallClock, digits);
      }
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
        final Instant pointInTime;
        if (timestampField instanceof Instant) {
          pointInTime = (Instant) timestampField;
        } else if (timestampField instanceof Timestamp) {
          // java.sql.Timestamp -> TIMESTAMP_WITH_LOCAL_TIME_ZONE:
          // getTime() supplies the milliseconds, the remainder of getNanos() the nanos-of-milli.
          final Timestamp sqlTimestamp = (Timestamp) timestampField;
          final long epochMillis = sqlTimestamp.getTime();
          final int nanosOfMilli = sqlTimestamp.getNanos() % 1_000_000;
          pointInTime = TimestampData.fromEpochMillis(epochMillis, nanosOfMilli).toInstant();
        } else if (timestampField instanceof TimestampData) {
          pointInTime = ((TimestampData) timestampField).toInstant();
        } else if (timestampField instanceof Integer) {
          // Integer values are taken as epoch seconds.
          pointInTime = Instant.ofEpochSecond((Integer) timestampField);
        } else if (timestampField instanceof Long) {
          // Long values are taken as epoch milliseconds.
          pointInTime = Instant.ofEpochMilli((Long) timestampField);
        } else {
          // Unsupported representation: leave the value as it is.
          return timestampField;
        }
        final LocalDateTime sessionLocal = pointInTime.atZone(sessionTimeZone).toLocalDateTime();
        return timestampToString(sessionLocal, getPrecision(fieldType));
      }
      default:
        return timestampField;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java [257:302]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Renders a timestamp-typed field as a string via {@code timestampToString}.
   *
   * <p>Only fields whose type root is {@code TIMESTAMP_WITHOUT_TIME_ZONE} or
   * {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} are formatted. For any other type root, or when the
   * value's Java class is not one of the representations handled below, the value is handed
   * back untouched.
   *
   * @param timestampField the raw field value
   * @param fieldType the logical type of the field; selects the conversion and the precision
   * @param sessionTimeZone zone used to turn an instant into a local date-time for
   *     {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} fields
   * @return the formatted timestamp, or {@code timestampField} itself when no conversion applies
   */
  private static Object formatTimestampField(
          Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) {
    switch (fieldType.getTypeRoot()) {
      case TIMESTAMP_WITHOUT_TIME_ZONE: {
        // Precision is resolved up front, before the value's Java class is inspected.
        final int digits = getPrecision(fieldType);
        final LocalDateTime wallClock;
        if (timestampField instanceof Timestamp) {
          // java.sql.Timestamp -> TIMESTAMP_WITHOUT_TIME_ZONE
          wallClock = ((Timestamp) timestampField).toLocalDateTime();
        } else if (timestampField instanceof LocalDateTime) {
          wallClock = (LocalDateTime) timestampField;
        } else if (timestampField instanceof TimestampData) {
          wallClock = ((TimestampData) timestampField).toLocalDateTime();
        } else {
          // Unsupported representation: leave the value as it is.
          return timestampField;
        }
        return timestampToString(wallClock, digits);
      }
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
        final Instant pointInTime;
        if (timestampField instanceof Instant) {
          pointInTime = (Instant) timestampField;
        } else if (timestampField instanceof Timestamp) {
          // java.sql.Timestamp -> TIMESTAMP_WITH_LOCAL_TIME_ZONE:
          // getTime() supplies the milliseconds, the remainder of getNanos() the nanos-of-milli.
          final Timestamp sqlTimestamp = (Timestamp) timestampField;
          final long epochMillis = sqlTimestamp.getTime();
          final int nanosOfMilli = sqlTimestamp.getNanos() % 1_000_000;
          pointInTime = TimestampData.fromEpochMillis(epochMillis, nanosOfMilli).toInstant();
        } else if (timestampField instanceof TimestampData) {
          pointInTime = ((TimestampData) timestampField).toInstant();
        } else if (timestampField instanceof Integer) {
          // Integer values are taken as epoch seconds.
          pointInTime = Instant.ofEpochSecond((Integer) timestampField);
        } else if (timestampField instanceof Long) {
          // Long values are taken as epoch milliseconds.
          pointInTime = Instant.ofEpochMilli((Long) timestampField);
        } else {
          // Unsupported representation: leave the value as it is.
          return timestampField;
        }
        final LocalDateTime sessionLocal = pointInTime.atZone(sessionTimeZone).toLocalDateTime();
        return timestampToString(sessionLocal, getPrecision(fieldType));
      }
      default:
        return timestampField;
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



