1. Amazon SageMaker Processing/preprocess.py [57:175]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    parser.add_argument('--base_dir', type=str, default="/opt/ml/processing")
    args, _ = parser.parse_known_args()
    return args

def extract_zones(zones_file: str, zones_dir: str):
    logger.info(f"Extracting zone file: {zones_file}")
    with ZipFile(zones_file, "r") as zip:
        zip.extractall(zones_dir)


def load_zones(zones_dir: str):
    logging.info(f"Loading zones from {zones_dir}")
    # Load the shape file and get the geometry and lat/lon
    zone_df = gpd.read_file(os.path.join(zones_dir, "taxi_zones.shp"))
    # Get centroids as EPSG code of 3310 to measure distance
    zone_df["centroid"] = zone_df.geometry.centroid.to_crs(epsg=3310)
    # Convert cordinates to the WSG84 lat/long CRS has a EPSG code of 4326.
    zone_df["latitude"] = zone_df.centroid.to_crs(epsg=4326).x
    zone_df["longitude"] = zone_df.centroid.to_crs(epsg=4326).y
    return zone_df


def load_data(file_list: list):
    # Define dates, and columns to use
    use_cols = [
        "fare_amount",
        "lpep_pickup_datetime",
        "lpep_dropoff_datetime",
        "passenger_count",
        "PULocationID",
        "DOLocationID",
    ]
    # Concat input files with select columns
    dfs = []
    for file in file_list:
        dfs.append(pd.read_csv(file, usecols=use_cols))
    return pd.concat(dfs, ignore_index=True)


def enrich_data(trip_df: pd.DataFrame, zone_df: pd.DataFrame):
    # Join trip DF to zones for poth pickup and drop off locations
    trip_df = gpd.GeoDataFrame(
        trip_df.join(zone_df, on="PULocationID").join(
            zone_df, on="DOLocationID", rsuffix="_DO", lsuffix="_PU"
        )
    )
    trip_df["geo_distance"] = (
        trip_df["centroid_PU"].distance(trip_df["centroid_DO"]) / 1000
    )

    # Add date parts
    trip_df["lpep_pickup_datetime"] = pd.to_datetime(trip_df["lpep_pickup_datetime"])
    trip_df["hour"] = trip_df["lpep_pickup_datetime"].dt.hour
    trip_df["weekday"] = trip_df["lpep_pickup_datetime"].dt.weekday
    trip_df["month"] = trip_df["lpep_pickup_datetime"].dt.month

    # Get calculated duration in minutes
    trip_df["lpep_dropoff_datetime"] = pd.to_datetime(trip_df["lpep_dropoff_datetime"])
    trip_df["duration_minutes"] = (
        trip_df["lpep_dropoff_datetime"] - trip_df["lpep_pickup_datetime"]
    ).dt.seconds / 60

    # Rename and filter cols
    trip_df = trip_df.rename(
        columns={
            "latitude_PU": "pickup_latitude",
            "longitude_PU": "pickup_longitude",
            "latitude_DO": "dropoff_latitude",
            "longitude_DO": "dropoff_longitude",
        }
    )
    
    trip_df['FS_ID'] = trip_df.index + 1000
    current_time_sec = int(round(time.time()))
    trip_df["FS_time"] = pd.Series([current_time_sec]*len(trip_df), dtype="float64")
    return trip_df


def clean_data(trip_df: pd.DataFrame):
    # Remove outliers
    trip_df = trip_df[
        (trip_df.fare_amount > 0)
        & (trip_df.fare_amount < 200)
        & (trip_df.passenger_count > 0)
        & (trip_df.duration_minutes > 0)
        & (trip_df.duration_minutes < 120)
        & (trip_df.geo_distance > 0)
        & (trip_df.geo_distance < 121)
    ].dropna()

    # Filter columns
    cols = [
        "fare_amount",
        "passenger_count",
        "pickup_latitude",
        "pickup_longitude",
        "dropoff_latitude",
        "dropoff_longitude",
        "geo_distance",
        "hour",
        "weekday",
        "month",
    ]
    
    cols_fg = [
        "fare_amount",
        "passenger_count",
        "pickup_latitude",
        "pickup_longitude",
        "dropoff_latitude",
        "dropoff_longitude",
        "geo_distance",
        "hour",
        "weekday",
        "month",
        "FS_ID",
        "FS_time"
    ]
    return trip_df[cols], trip_df[cols_fg]
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



5. MLOps SageMaker Project/sagemaker-workshop-preprocess-seedcode-v1/pipelines/preprocess/preprocess.py [36:154]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    parser.add_argument('--base_dir', type=str, default="/opt/ml/processing")
    args, _ = parser.parse_known_args()
    return args

def extract_zones(zones_file: str, zones_dir: str):
    logger.info(f"Extracting zone file: {zones_file}")
    with ZipFile(zones_file, "r") as zip:
        zip.extractall(zones_dir)


def load_zones(zones_dir: str):
    logging.info(f"Loading zones from {zones_dir}")
    # Load the shape file and get the geometry and lat/lon
    zone_df = gpd.read_file(os.path.join(zones_dir, "taxi_zones.shp"))
    # Get centroids as EPSG code of 3310 to measure distance
    zone_df["centroid"] = zone_df.geometry.centroid.to_crs(epsg=3310)
    # Convert cordinates to the WSG84 lat/long CRS has a EPSG code of 4326.
    zone_df["latitude"] = zone_df.centroid.to_crs(epsg=4326).x
    zone_df["longitude"] = zone_df.centroid.to_crs(epsg=4326).y
    return zone_df


def load_data(file_list: list):
    # Define dates, and columns to use
    use_cols = [
        "fare_amount",
        "lpep_pickup_datetime",
        "lpep_dropoff_datetime",
        "passenger_count",
        "PULocationID",
        "DOLocationID",
    ]
    # Concat input files with select columns
    dfs = []
    for file in file_list:
        dfs.append(pd.read_csv(file, usecols=use_cols))
    return pd.concat(dfs, ignore_index=True)


def enrich_data(trip_df: pd.DataFrame, zone_df: pd.DataFrame):
    # Join trip DF to zones for poth pickup and drop off locations
    trip_df = gpd.GeoDataFrame(
        trip_df.join(zone_df, on="PULocationID").join(
            zone_df, on="DOLocationID", rsuffix="_DO", lsuffix="_PU"
        )
    )
    trip_df["geo_distance"] = (
        trip_df["centroid_PU"].distance(trip_df["centroid_DO"]) / 1000
    )

    # Add date parts
    trip_df["lpep_pickup_datetime"] = pd.to_datetime(trip_df["lpep_pickup_datetime"])
    trip_df["hour"] = trip_df["lpep_pickup_datetime"].dt.hour
    trip_df["weekday"] = trip_df["lpep_pickup_datetime"].dt.weekday
    trip_df["month"] = trip_df["lpep_pickup_datetime"].dt.month

    # Get calculated duration in minutes
    trip_df["lpep_dropoff_datetime"] = pd.to_datetime(trip_df["lpep_dropoff_datetime"])
    trip_df["duration_minutes"] = (
        trip_df["lpep_dropoff_datetime"] - trip_df["lpep_pickup_datetime"]
    ).dt.seconds / 60

    # Rename and filter cols
    trip_df = trip_df.rename(
        columns={
            "latitude_PU": "pickup_latitude",
            "longitude_PU": "pickup_longitude",
            "latitude_DO": "dropoff_latitude",
            "longitude_DO": "dropoff_longitude",
        }
    )
    
    trip_df['FS_ID'] = trip_df.index + 1000
    current_time_sec = int(round(time.time()))
    trip_df["FS_time"] = pd.Series([current_time_sec]*len(trip_df), dtype="float64")
    return trip_df


def clean_data(trip_df: pd.DataFrame):
    # Remove outliers
    trip_df = trip_df[
        (trip_df.fare_amount > 0)
        & (trip_df.fare_amount < 200)
        & (trip_df.passenger_count > 0)
        & (trip_df.duration_minutes > 0)
        & (trip_df.duration_minutes < 120)
        & (trip_df.geo_distance > 0)
        & (trip_df.geo_distance < 121)
    ].dropna()

    # Filter columns
    cols = [
        "fare_amount",
        "passenger_count",
        "pickup_latitude",
        "pickup_longitude",
        "dropoff_latitude",
        "dropoff_longitude",
        "geo_distance",
        "hour",
        "weekday",
        "month",
    ]
    
    cols_fg = [
        "fare_amount",
        "passenger_count",
        "pickup_latitude",
        "pickup_longitude",
        "dropoff_latitude",
        "dropoff_longitude",
        "geo_distance",
        "hour",
        "weekday",
        "month",
        "FS_ID",
        "FS_time"
    ]
    return trip_df[cols], trip_df[cols_fg]
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



