awswrangler/timestream/_read.py
"""Amazon Timestream Read Module."""
from __future__ import annotations
import json
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterator, Literal, cast
import boto3
import pandas as pd
from botocore.config import Config
from awswrangler import _utils, exceptions, s3
from awswrangler._config import apply_configs
_logger: logging.Logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from mypy_boto3_timestream_query.type_defs import PaginatorConfigTypeDef, QueryResponseTypeDef, RowTypeDef
# Cast a Timestream scalar value, returned as a string, to the corresponding Python type.
def _cast_value(value: str, dtype: str) -> Any:  # noqa: PLR0911
if dtype == "VARCHAR":
return value
if dtype in ("INTEGER", "BIGINT"):
return int(value)
if dtype == "DOUBLE":
return float(value)
if dtype == "BOOLEAN":
return value.lower() == "true"
if dtype == "TIMESTAMP":
return datetime.strptime(value[:-3], "%Y-%m-%d %H:%M:%S.%f")
if dtype == "DATE":
return datetime.strptime(value, "%Y-%m-%d").date()
if dtype == "TIME":
return datetime.strptime(value[:-3], "%H:%M:%S.%f").time()
if dtype == "ARRAY":
return str(value)
raise ValueError(f"Not supported Amazon Timestream type: {dtype}")
# Convert a single Timestream result row into a list of Python values, following the column schema.
def _process_row(schema: list[dict[str, str]], row: "RowTypeDef") -> list[Any]:
row_processed: list[Any] = []
for col_schema, col in zip(schema, row["Data"]):
if col.get("NullValue", False):
row_processed.append(None)
elif "ScalarValue" in col:
row_processed.append(_cast_value(value=col["ScalarValue"], dtype=col_schema["type"]))
elif "ArrayValue" in col:
row_processed.append(_cast_value(value=col["ArrayValue"], dtype="ARRAY")) # type: ignore[arg-type]
else:
raise ValueError(
f"Query with non ScalarType/ArrayColumnInfo/NullValue for column {col_schema['name']}. "
f"Expected {col_schema['type']} instead of {col}"
)
return row_processed
# Build a DataFrame from processed rows; VARCHAR columns are cast to the pandas "string" dtype.
def _rows_to_df(
rows: list[list[Any]], schema: list[dict[str, str]], df_metadata: dict[str, str] | None = None
) -> pd.DataFrame:
df = pd.DataFrame(data=rows, columns=[c["name"] for c in schema])
if df_metadata:
try:
df.attrs = df_metadata
except AttributeError as ex:
# Modin does not support attribute assignment
_logger.error(ex)
for col in schema:
if col["type"] == "VARCHAR":
df[col["name"]] = df[col["name"]].astype("string")
return df
# Extract column names and types from the ColumnInfo section of a query response page.
def _process_schema(page: "QueryResponseTypeDef") -> list[dict[str, str]]:
schema: list[dict[str, str]] = []
for col in page["ColumnInfo"]:
if "ScalarType" in col["Type"]:
schema.append({"name": col["Name"], "type": col["Type"]["ScalarType"]})
elif "ArrayColumnInfo" in col["Type"]:
schema.append({"name": col["Name"], "type": col["Type"]["ArrayColumnInfo"]})
else:
raise ValueError(f"Query with non ScalarType or ArrayColumnInfo for column {col['Name']}: {col['Type']}")
return schema
# Run the query through the Timestream Query paginator, yielding one DataFrame per page of results.
def _paginate_query(
sql: str,
chunked: bool,
pagination_config: "PaginatorConfigTypeDef" | None,
boto3_session: boto3.Session | None = None,
) -> Iterator[pd.DataFrame]:
client = _utils.client(
service_name="timestream-query",
session=boto3_session,
botocore_config=Config(read_timeout=60, retries={"max_attempts": 10}),
)
paginator = client.get_paginator("query")
rows: list[list[Any]] = []
schema: list[dict[str, str]] = []
page_iterator = paginator.paginate(QueryString=sql, PaginationConfig=pagination_config or {})
for page in page_iterator:
if not schema:
schema = _process_schema(page=page)
_logger.debug("schema: %s", schema)
for row in page["Rows"]:
rows.append(_process_row(schema=schema, row=row))
if len(rows) > 0:
df_metadata = {}
if chunked:
if "NextToken" in page:
df_metadata["NextToken"] = page["NextToken"]
df_metadata["QueryId"] = page["QueryId"]
yield _rows_to_df(rows, schema, df_metadata)
rows = []
# Read the column names from the "_metadata.json" object written by a Timestream UNLOAD operation.
def _get_column_names_from_metadata(unload_path: str, boto3_session: boto3.Session | None = None) -> list[str]:
client_s3 = _utils.client(service_name="s3", session=boto3_session)
metadata_path = s3.list_objects(path=unload_path, suffix="_metadata.json", boto3_session=boto3_session)[0]
bucket, key = _utils.parse_path(metadata_path)
metadata_content = json.loads(client_s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8"))
columns = [column["Name"] for column in metadata_content["ColumnInfo"]]
_logger.debug("Read %d columns from metadata file in: %s", len(columns), metadata_path)
return columns
def query(
sql: str,
chunked: bool = False,
pagination_config: dict[str, Any] | None = None,
boto3_session: boto3.Session | None = None,
) -> pd.DataFrame | Iterator[pd.DataFrame]:
"""Run a query and retrieve the result as a Pandas DataFrame.
Parameters
----------
sql
SQL query.
chunked
        If True, returns an iterator of DataFrames; otherwise returns a single DataFrame. False by default.
pagination_config
        Pagination configuration dictionary of the form ``{'MaxItems': 10, 'PageSize': 10, 'StartingToken': '...'}``.
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Returns
-------
    `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_,
    or an iterator of DataFrames when ``chunked=True``.
Examples
--------
Run a query and return the result as a Pandas DataFrame or an iterable.
>>> import awswrangler as wr
>>> df = wr.timestream.query('SELECT * FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 10')
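    Run the same query in chunks, forwarding a pagination configuration (a sketch; the page size is
    illustrative, and each chunk carries its ``QueryId`` in ``df.attrs``).

    >>> import awswrangler as wr
    >>> chunks = wr.timestream.query(
    ...     'SELECT * FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 10',
    ...     chunked=True,
    ...     pagination_config={"MaxItems": 10, "PageSize": 10},
    ... )
    >>> for df in chunks:
    ...     print(df.shape)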
"""
result_iterator = _paginate_query(sql, chunked, cast("PaginatorConfigTypeDef", pagination_config), boto3_session)
if chunked:
return result_iterator
    # Return an empty DataFrame when the result iterator yields nothing
results = list(result_iterator)
if len(results) > 0:
# Modin's concat() can not concatenate empty data frames
return pd.concat(results, ignore_index=True)
return pd.DataFrame()
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session", "s3_additional_kwargs"],
)
@apply_configs
def unload(
sql: str,
path: str,
unload_format: Literal["CSV", "PARQUET"] | None = None,
compression: Literal["GZIP", "NONE"] | None = None,
partition_cols: list[str] | None = None,
encryption: Literal["SSE_KMS", "SSE_S3"] | None = None,
kms_key_id: str | None = None,
field_delimiter: str | None = ",",
escaped_by: str | None = "\\",
chunked: bool | int = False,
keep_files: bool = False,
use_threads: bool | int = True,
boto3_session: boto3.Session | None = None,
s3_additional_kwargs: dict[str, str] | None = None,
pyarrow_additional_kwargs: dict[str, Any] | None = None,
) -> pd.DataFrame | Iterator[pd.DataFrame]:
"""
    Unload query results to Amazon S3 and read the results as a Pandas DataFrame.
https://docs.aws.amazon.com/timestream/latest/developerguide/export-unload.html
Parameters
----------
sql
SQL query
path
S3 path to write stage files (e.g. ``s3://bucket_name/any_name/``)
unload_format
Format of the unloaded S3 objects from the query.
Valid values: "CSV", "PARQUET". Case sensitive. Defaults to "PARQUET"
compression
Compression of the unloaded S3 objects from the query.
Valid values: "GZIP", "NONE". Defaults to "GZIP"
partition_cols
Specifies the partition keys for the unload operation
encryption
Encryption of the unloaded S3 objects from the query.
Valid values: "SSE_KMS", "SSE_S3". Defaults to "SSE_S3"
kms_key_id
Specifies the key ID for an AWS Key Management Service (AWS KMS) key to be
used to encrypt data files on Amazon S3
field_delimiter
A single ASCII character that is used to separate fields in the output file,
        such as a pipe character (|), a comma (,), or a tab character. Only used with CSV format
escaped_by
The character that should be treated as an escape character in the data file
written to S3 bucket. Only used with CSV format
chunked
        If passed, the data is returned as an iterable of DataFrames (memory friendly).
        If `True`, awswrangler iterates over the data file by file in the most efficient way, with no guarantee of chunksize.
        If an `INTEGER` is passed, awswrangler iterates over the data in chunks of that number of rows.
keep_files
        Whether to keep the staged files in Amazon S3. If False (default), they are deleted after the read.
use_threads
        True to enable concurrent requests, False to disable multiple threads.
        If enabled, os.cpu_count() is used as the maximum number of threads.
        If an integer is provided, that number of threads is used.
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
s3_additional_kwargs
        Forwarded to botocore requests.
pyarrow_additional_kwargs
Forwarded to `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
Valid values include "split_blocks", "self_destruct", "ignore_metadata".
e.g. pyarrow_additional_kwargs={'split_blocks': True}.
Returns
-------
Result as Pandas DataFrame(s).
Examples
--------
Unload and read as Parquet (default).
>>> import awswrangler as wr
>>> df = wr.timestream.unload(
... sql="SELECT time, measure, dimension FROM database.mytable",
... path="s3://bucket/extracted_parquet_files/",
... )
    Unload and read partitioned Parquet. Note: the partition columns must be the last columns in the select list.
>>> import awswrangler as wr
>>> df = wr.timestream.unload(
... sql="SELECT time, measure, dim1, dim2 FROM database.mytable",
... path="s3://bucket/extracted_parquet_files/",
... partition_cols=["dim2"],
... )
Unload and read as CSV.
>>> import awswrangler as wr
>>> df = wr.timestream.unload(
... sql="SELECT time, measure, dimension FROM database.mytable",
... path="s3://bucket/extracted_parquet_files/",
... unload_format="CSV",
... )
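    Unload as Parquet and read it back in chunks, keeping the staged files in S3 (a sketch; the table name
    is a placeholder and chunk sizes are not guaranteed).

    >>> import awswrangler as wr
    >>> dfs = wr.timestream.unload(
    ...     sql="SELECT time, measure, dimension FROM database.mytable",
    ...     path="s3://bucket/extracted_parquet_files/",
    ...     chunked=True,
    ...     keep_files=True,
    ... )
    >>> for df in dfs:
    ...     print(len(df))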
"""
path = path if path.endswith("/") else f"{path}/"
if unload_format not in [None, "CSV", "PARQUET"]:
raise exceptions.InvalidArgumentValue("<unload_format> argument must be 'CSV' or 'PARQUET'")
unload_to_files(
sql=sql,
path=path,
unload_format=unload_format,
compression=compression,
partition_cols=partition_cols,
encryption=encryption,
kms_key_id=kms_key_id,
field_delimiter=field_delimiter,
escaped_by=escaped_by,
boto3_session=boto3_session,
)
results_path = f"{path}results/"
try:
if unload_format == "CSV":
column_names: list[str] = _get_column_names_from_metadata(path, boto3_session)
return s3.read_csv(
path=results_path,
header=None,
names=[column for column in column_names if column not in set(partition_cols)]
if partition_cols is not None
else column_names,
dataset=True,
use_threads=use_threads,
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
)
else:
return s3.read_parquet(
path=results_path,
chunked=chunked,
dataset=True,
use_threads=use_threads,
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
)
finally:
if keep_files is False:
_logger.debug("Deleting objects in S3 path: %s", path)
s3.delete_objects(
path=path,
use_threads=use_threads,
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
)
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session"],
)
@apply_configs
def unload_to_files(
sql: str,
path: str,
unload_format: Literal["CSV", "PARQUET"] | None = None,
compression: Literal["GZIP", "NONE"] | None = None,
partition_cols: list[str] | None = None,
encryption: Literal["SSE_KMS", "SSE_S3"] | None = None,
kms_key_id: str | None = None,
field_delimiter: str | None = ",",
escaped_by: str | None = "\\",
boto3_session: boto3.Session | None = None,
) -> None:
"""
Unload query results to Amazon S3.
https://docs.aws.amazon.com/timestream/latest/developerguide/export-unload.html
Parameters
----------
sql
SQL query
path
S3 path to write stage files (e.g. s3://bucket_name/any_name/)
unload_format
Format of the unloaded S3 objects from the query.
Valid values: "CSV", "PARQUET". Case sensitive. Defaults to "PARQUET"
compression
Compression of the unloaded S3 objects from the query.
Valid values: "GZIP", "NONE". Defaults to "GZIP"
partition_cols
Specifies the partition keys for the unload operation
encryption
Encryption of the unloaded S3 objects from the query.
Valid values: "SSE_KMS", "SSE_S3". Defaults to "SSE_S3"
kms_key_id
Specifies the key ID for an AWS Key Management Service (AWS KMS) key to be
used to encrypt data files on Amazon S3
field_delimiter
A single ASCII character that is used to separate fields in the output file,
        such as a pipe character (|), a comma (,), or a tab character. Only used with CSV format
escaped_by
The character that should be treated as an escape character in the data file
written to S3 bucket. Only used with CSV format
boto3_session
The default boto3 session will be used if **boto3_session** is ``None``.
Examples
--------
    Unload as Parquet (default).
>>> import awswrangler as wr
>>> wr.timestream.unload_to_files(
... sql="SELECT time, measure, dimension FROM database.mytable",
... path="s3://bucket/extracted_parquet_files/",
... )
    Unload partitioned Parquet. Note: the partition columns must be the last columns in the select list.
>>> import awswrangler as wr
>>> wr.timestream.unload_to_files(
... sql="SELECT time, measure, dim1, dim2 FROM database.mytable",
... path="s3://bucket/extracted_parquet_files/",
... partition_cols=["dim2"],
... )
    Unload as CSV.
>>> import awswrangler as wr
>>> wr.timestream.unload_to_files(
... sql="SELECT time, measure, dimension FROM database.mytable",
... path="s3://bucket/extracted_parquet_files/",
... unload_format="CSV",
... )
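    Unload with SSE-KMS encryption and no compression (a sketch; the key ID is a placeholder).

    >>> import awswrangler as wr
    >>> wr.timestream.unload_to_files(
    ...     sql="SELECT time, measure, dimension FROM database.mytable",
    ...     path="s3://bucket/extracted_parquet_files/",
    ...     compression="NONE",
    ...     encryption="SSE_KMS",
    ...     kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab",  # placeholder key ID
    ... )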
"""
timestream_client = _utils.client(service_name="timestream-query", session=boto3_session)
partitioned_by_str: str = (
f"""partitioned_by = ARRAY [{",".join([f"'{col}'" for col in partition_cols])}],\n"""
if partition_cols is not None
else ""
)
kms_key_id_str: str = f"kms_key = '{kms_key_id}',\n" if kms_key_id is not None else ""
field_delimiter_str: str = f"field_delimiter = '{field_delimiter}',\n" if unload_format == "CSV" else ""
escaped_by_str: str = f"escaped_by = '{escaped_by}',\n" if unload_format == "CSV" else ""
sql = (
f"UNLOAD ({sql})\n"
f"TO '{path}'\n"
f"WITH (\n"
f"{partitioned_by_str}"
f"format='{unload_format or 'PARQUET'}',\n"
f"compression='{compression or 'GZIP'}',\n"
f"{field_delimiter_str}"
f"{escaped_by_str}"
f"{kms_key_id_str}"
f"encryption='{encryption or 'SSE_S3'}'\n"
f")"
)
timestream_client.query(QueryString=sql)