in python/pyspark/pandas/resample.py [0:0]
def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
    """
    Build the Column expression that maps each timestamp to its resampling bin label.

    The expression is selected by the offset's rule code and multiple ``n``
    (both read from ``self._offset``), and by which side of a bin is closed
    (``self._closed``) and which side provides the label (``self._label``).
    Timestamps sitting exactly on a bin edge and timestamps strictly inside a
    bin get separate label expressions, combined with ``F.when``.

    Parameters
    ----------
    origin : pd.Timestamp
        Reference timestamp from which bins are counted. It is asserted to be
        aligned with the rule (e.g. Dec 31 midnight for ``"A-DEC"``, a
        month end at midnight for ``"ME"``/``"M"``).
    ts_scol : Column
        The timestamp column to bin.

    Returns
    -------
    Column
        The bin label per input timestamp; passed through
        ``F.to_timestamp_ntz`` when ``self._resamplekey_type`` is a
        ``TimestampNTZType``.

    Raises
    ------
    ValueError
        If the rule code is not one of the handled units.
    AssertionError
        If ``origin`` is not aligned with the rule.
    """
    is_ntz = isinstance(self._resamplekey_type, TimestampNTZType)
    origin_col = F.lit(origin)
    rule = self._offset.rule_code
    step = getattr(self._offset, "n")

    closed_left = self._closed == "left"
    closed_right = self._closed == "right"
    label_left = self._label == "left"
    label_right = self._label == "right"
    # The two mixed closed/label combinations are the ones that move the
    # label of a timestamp lying on a bin edge by one full step.
    shift_forward = closed_left and label_right
    shift_backward = closed_right and label_left

    # Rule codes of the sub-day frequencies and the Spark unit each maps to.
    sub_day_units = {
        "h": "HOUR",
        "min": "MINUTE",
        "s": "SECOND",
        "H": "HOUR",
        "T": "MINUTE",
        "S": "SECOND",
    }

    if rule == "A-DEC":
        assert (origin.month, origin.day) == (12, 31) and (
            origin.hour,
            origin.minute,
            origin.second,
        ) == (0, 0, 0)

        ts_year = F.year(ts_scol)
        year_gap = ts_year - F.year(origin_col)
        if step == 1:
            remainder = F.lit(0)
        else:
            remainder = year_gap % step
        aligned = remainder == 0
        # On an edge: Dec 31 of a year that is a whole number of steps from origin.
        on_edge = aligned & (F.month(ts_scol) == 12) & (F.dayofmonth(ts_scol) == 31)

        edge_year = ts_year
        if shift_forward:
            edge_year = edge_year + step
        elif shift_backward:
            edge_year = edge_year - step

        if label_left:
            inner_year = F.when(aligned, ts_year - step).otherwise(ts_year - remainder)
        else:
            inner_year = F.when(aligned, ts_year).otherwise(ts_year - (remainder - step))

        label_year = F.when(on_edge, edge_year).otherwise(inner_year)
        binned = F.to_timestamp(F.make_date(label_year, F.lit(12), F.lit(31)))

    elif rule in ("ME", "M"):
        assert origin.is_month_end and (origin.hour, origin.minute, origin.second) == (0, 0, 0)

        month_gap = (
            (F.year(ts_scol) - F.year(origin_col)) * 12
            + F.month(ts_scol)
            - F.month(origin_col)
        )
        if step == 1:
            remainder = F.lit(0)
        else:
            remainder = month_gap % step
        aligned = remainder == 0
        # On an edge: last day of a month that is a whole number of steps from origin.
        on_edge = aligned & (F.dayofmonth(ts_scol) == F.dayofmonth(F.last_day(ts_scol)))

        month_start = F.date_trunc("MONTH", ts_scol)
        edge_month = month_start
        if shift_forward:
            edge_month = edge_month + SF.make_interval("MONTH", step)
        elif shift_backward:
            edge_month = edge_month - SF.make_interval("MONTH", step)

        if label_left:
            inner_month = F.when(
                aligned,
                month_start - SF.make_interval("MONTH", step),
            ).otherwise(month_start - SF.make_interval("MONTH", remainder))
        else:
            inner_month = F.when(aligned, month_start).otherwise(
                month_start - SF.make_interval("MONTH", remainder - step)
            )

        # Labels are month ends, so snap the chosen month to its last day.
        label_month = F.when(on_edge, edge_month).otherwise(inner_month)
        binned = F.to_timestamp(F.last_day(label_month))

    elif rule == "D":
        assert (origin.hour, origin.minute, origin.second) == (0, 0, 0)

        if step == 1:
            # NOTE: '1D' is handled differently from the n > 1 case: here the
            # hour/minute/second parts are what decide whether a timestamp is on an edge.
            at_midnight = (
                (F.hour(ts_scol) == 0) & (F.minute(ts_scol) == 0) & (F.second(ts_scol) == 0)
            )

            if closed_left and label_left:
                binned = F.date_trunc("DAY", ts_scol)
            elif shift_forward:
                following_day = F.date_add(ts_scol, 1)
                binned = F.date_trunc("DAY", following_day)
            elif shift_backward:
                previous_day_start = F.date_trunc("DAY", F.date_sub(ts_scol, 1))
                binned = F.when(at_midnight, previous_day_start).otherwise(
                    F.date_trunc("DAY", ts_scol)
                )
            else:
                binned = F.when(at_midnight, F.date_trunc("DAY", ts_scol)).otherwise(
                    F.date_trunc("DAY", F.date_add(ts_scol, 1))
                )
        else:
            day_gap = F.datediff(end=ts_scol, start=origin_col)
            remainder = day_gap % step
            # For n > 1 only the day distance from origin determines an edge.
            aligned = remainder == 0

            day_start = F.date_trunc("DAY", ts_scol)
            if shift_forward:
                edge_day = F.date_add(day_start, step)
            elif shift_backward:
                edge_day = F.date_sub(day_start, step)
            else:
                edge_day = day_start

            if label_left:
                inner_day = F.date_sub(day_start, remainder)
            else:
                inner_day = F.date_sub(day_start, remainder - step)

            binned = F.when(aligned, edge_day).otherwise(inner_day)

    elif rule in sub_day_units:
        unit = sub_day_units[rule]

        unit_start = F.date_trunc(unit, ts_scol)
        if is_ntz:
            unit_start = F.to_timestamp_ntz(unit_start)
        unit_gap = F.timestamp_diff(unit, origin_col, unit_start)
        if step == 1:
            remainder = F.lit(0)
        else:
            remainder = unit_gap % F.lit(step)
        aligned = remainder == 0

        # A timestamp is on an edge only if every component finer than the
        # unit (down to seconds) is zero as well.
        if unit == "HOUR":
            assert (origin.minute, origin.second) == (0, 0)
            on_edge = aligned & (F.minute(ts_scol) == 0) & (F.second(ts_scol) == 0)
        elif unit == "MINUTE":
            assert origin.second == 0
            on_edge = aligned & (F.second(ts_scol) == 0)
        else:
            on_edge = aligned

        edge_ts = unit_start
        if shift_forward:
            edge_ts = edge_ts + SF.make_interval(unit, step)
        elif shift_backward:
            edge_ts = edge_ts - SF.make_interval(unit, step)

        if label_left:
            inner_ts = F.when(aligned, unit_start).otherwise(
                unit_start - SF.make_interval(unit, remainder)
            )
        else:
            inner_ts = F.when(
                aligned,
                unit_start + SF.make_interval(unit, step),
            ).otherwise(unit_start - SF.make_interval(unit, remainder - step))

        binned = F.when(on_edge, edge_ts).otherwise(inner_ts)

    else:
        raise ValueError("Got the unexpected unit {}".format(rule))

    if is_ntz:
        return F.to_timestamp_ntz(binned)
    return binned