# Working with Data

In this notebook you will learn how to use the AzureML SDK to:

1. Read/write data in a job.
1. Create a data asset to share with others in your team.
1. Abstract schema for tabular data using `MLTable`.

## Connect to Azure Machine Learning Workspace

To connect to a workspace, we need identifier parameters: a subscription ID, a resource group, and a workspace name. We pass these details to `MLClient` from `azure.ai.ml` to get a handle to the required Azure Machine Learning workspace. This tutorial uses [default Azure authentication](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python). Check the [configuration notebook](../../jobs/configuration.ipynb) for more details on how to configure credentials and connect to a workspace.

In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# enter details of your AML workspace
subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"

# get a handle to the workspace
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

## Reading/writing data in a job

In this example we will use the Titanic dataset in this repo ([./sample_data/titanic.csv](./sample_data/titanic.csv)) and set up a command that executes the following Python code:

```python
df = pd.read_csv(args.input_data)
print(df.head(10))
```
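
For context, a script such as `read_data.py` would typically receive the path through a command-line argument. The sketch below is an assumption about how such a script might be structured (the actual file in `./src` is not shown here); `parse_args` and `main` are hypothetical names.

```python
import argparse


def parse_args(argv=None):
    """Parse the --input_data argument supplied by the job's command string."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_data", type=str, required=True)
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    # pandas is imported lazily so that argument parsing can be checked on its own
    import pandas as pd

    df = pd.read_csv(args.input_data)
    print(df.head(10))


# Argument parsing only; at job run time ${{inputs.input_data}} is replaced
# by the resolved path to the file before the script starts.
args = parse_args(["--input_data", "./sample_data/titanic.csv"])
print(args.input_data)
```

Calling `main()` without arguments would read the path from `sys.argv`, which is how the script would run inside the job.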

Below is the code for submitting the command to the cloud. Note that both the code *and* the data are automatically uploaded to the cloud. On subsequent job submissions, the data is re-uploaded only if it has changed.

In [None]:
from azure.ai.ml import command
from azure.ai.ml.entities import Data
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes

# === Note on path ===
# `path` can be a local path or a cloud path. AzureML supports `https://`, `abfss://`, `wasbs://` and `azureml://` URIs.
# Local paths are automatically uploaded to the default datastore in the cloud.
# More details on supported paths: https://docs.microsoft.com/azure/machine-learning/how-to-read-write-data-v2#supported-paths

inputs = {
    "input_data": Input(type=AssetTypes.URI_FILE, path="./sample_data/titanic.csv")
}

job = command(
    code="./src",  # local path where the code is stored
    command="python read_data.py --input_data ${{inputs.input_data}}",
    inputs=inputs,
    environment="azureml://registries/azureml/environments/sklearn-1.5/labels/latest",
    compute="cpu-cluster",
)

# submit the command
returned_job = ml_client.jobs.create_or_update(job)
# get a URL for the status of the job
returned_job.studio_url
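
To make the local-versus-cloud distinction from the note on paths concrete, here is a small sketch that classifies a path by its URI scheme. The helper `is_cloud_path` is hypothetical and not part of the SDK; it only mirrors the list of schemes mentioned in the note, and the SDK performs its own resolution.

```python
from urllib.parse import urlparse

# URI schemes named in the note on paths
CLOUD_SCHEMES = {"https", "abfss", "wasbs", "azureml"}


def is_cloud_path(path: str) -> bool:
    """Return True if the path uses one of the cloud URI schemes, False otherwise."""
    return urlparse(path).scheme.lower() in CLOUD_SCHEMES


for p in [
    "./sample_data/titanic.csv",
    "https://<account_name>.blob.core.windows.net/<container>/titanic.csv",
    "azureml://datastores/workspaceblobstore/paths/titanic.csv",
]:
    kind = "cloud" if is_cloud_path(p) else "local (uploaded on submission)"
    print(f"{p} -> {kind}")
```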

### Reading *and* writing data in a job

By design, you cannot *write* to `Inputs`, only to `Outputs`. The code below creates an `Output` that mounts your AzureML default datastore (Azure Blob) in read-*write* mode. The Python code simply takes the CSV as input and exports it as a Parquet file, i.e.

```python
df = pd.read_csv(args.input_data)
output_path = os.path.join(args.output_folder, "my_output.parquet")
df.to_parquet(output_path)
```

In [None]:
from azure.ai.ml import command
from azure.ai.ml.entities import Data
from azure.ai.ml import Input, Output
from azure.ai.ml.constants import AssetTypes

inputs = {
    "input_data": Input(type=AssetTypes.URI_FILE, path="./sample_data/titanic.csv")
}

outputs = {
    "output_folder": Output(
        type=AssetTypes.URI_FOLDER,
        path=f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}/workspaces/{workspace}/datastores/workspaceblobstore/paths/",
    )
}

job = command(
    code="./src",  # local path where the code is stored
    command="python read_write_data.py --input_data ${{inputs.input_data}} --output_folder ${{outputs.output_folder}}",
    inputs=inputs,
    outputs=outputs,
    environment="azureml://registries/azureml/environments/sklearn-1.5/labels/latest",
    compute="cpu-cluster",
)

# submit the command
returned_job = ml_client.create_or_update(job)
# get a URL for the status of the job
returned_job.studio_url
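
The long-form datastore URI used for the output path is easy to mistype, so a small helper can assemble it. This is a sketch only: `datastore_uri` is a hypothetical function, and the placeholder values stand in for your own workspace details.

```python
def datastore_uri(subscription_id, resource_group, workspace, datastore, relative_path=""):
    """Build the long-form azureml:// URI for a location on a datastore."""
    return (
        f"azureml://subscriptions/{subscription_id}"
        f"/resourcegroups/{resource_group}"
        f"/workspaces/{workspace}"
        f"/datastores/{datastore}/paths/{relative_path.lstrip('/')}"
    )


uri = datastore_uri(
    "<SUBSCRIPTION_ID>",
    "<RESOURCE_GROUP>",
    "<AML_WORKSPACE_NAME>",
    "workspaceblobstore",
    "titanic/output/",
)
print(uri)
```

The returned string could be passed as the `path` of an `Output`, in place of the inline f-string used in the cell.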

## Create Data Assets

You can create a data asset in Azure Machine Learning, which has the following benefits:

- Easy to share with other members of the team (no need to remember file locations)
- Versioning of the metadata (location, description, etc)
- Lineage tracking

Below we show an example of versioning the sample data in this repo. The data is uploaded to cloud storage and registered as an asset.

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

try:
    registered_data_asset = ml_client.data.get(name="titanic", version="1")
    print("Found data asset. Will not create again")
except Exception as ex:
    my_data = Data(
        path="./sample_data/titanic.csv",
        type=AssetTypes.URI_FILE,
        description="Titanic Data",
        name="titanic",
        version="1",
    )
    ml_client.data.create_or_update(my_data)
    registered_data_asset = ml_client.data.get(name="titanic", version="1")
    print("Created data asset")

### Authenticate with user identity

When running a job on a compute cluster, you can also use your user identity to access data. To enable the job to access data on your behalf, specify **identity=UserIdentityConfiguration()** in the job definition, as shown below. For more details, see [Accessing storage services](https://learn.microsoft.com/azure/machine-learning/how-to-identity-based-service-authentication).

In [None]:
from azure.ai.ml import UserIdentityConfiguration

my_job_inputs = {"input_data": Input(type=AssetTypes.MLTABLE, path="./sample_data")}
job = command(
    code="./src",
    command="python read_data.py --input_data ${{inputs.input_data}}",
    inputs=my_job_inputs,
    environment="azureml://registries/azureml/environments/sklearn-1.5/labels/latest",
    compute="cpu-cluster",
    identity=UserIdentityConfiguration(),
)

## MLTable

`MLTable` is a way to abstract the schema definition for tabular data so that it is easier for consumers of the data to materialize the table into a Pandas/Dask/Spark dataframe. [A more detailed explanation and motivation is provided on docs.microsoft.com](https://docs.microsoft.com/azure/machine-learning/concept-data#mltable).

The ideal scenarios to use `MLTable` are:

- The schema of your data is complex and/or changes frequently.
- You only need a subset of data (for example: a sample of rows or files, specific columns, etc).
- AutoML jobs requiring tabular data.

If your scenario does not fit the above then it is likely that URIs are a more suitable type.

### The `MLTable` file

The `MLTable` file defines the schema for tabular data. Below is a sample:

In [None]:
!cat ./sample-mltable/MLTable
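
For readers without the repo at hand, the sketch below writes a hypothetical `MLTable` file to a temporary folder and prints it. The YAML content is an assumed example for a single delimited file named `titanic.csv`; it is not necessarily identical to the file in `./sample-mltable`.

```python
import tempfile
from pathlib import Path

# Hypothetical MLTable definition: one CSV file read as delimited text
MLTABLE_YAML = """\
paths:
  - file: ./titanic.csv
transformations:
  - read_delimited:
      delimiter: ","
      encoding: ascii
      header: all_files_same_headers
"""

# The file must be named exactly "MLTable" (no extension) inside its folder
folder = Path(tempfile.mkdtemp())
mltable_file = folder / "MLTable"
mltable_file.write_text(MLTABLE_YAML)

print(mltable_file.read_text())
```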

We recommend that you co-locate your `MLTable` file with the underlying data (i.e. the `MLTable` file should be in the same directory as the data, or in a parent directory). You can load an `MLTable` artefact using the `mltable` library, as shown below.

In [None]:
import mltable

# Note: the uri below can be a local folder or folder located in cloud storage. The folder must contain a valid MLTable file.
tbl = mltable.load(uri="./sample-mltable")
tbl.to_pandas_dataframe()

## Import Data from external sources

You can create a data asset in Azure Machine Learning by importing data from external sources like Snowflake or Amazon S3 and caching it, which has the following benefits:

- All the benefits of caching, including faster and more reliable access to data for training jobs
- Fewer points of failure than connecting to external sources directly
- Easy to share with other members of the team (no need to remember file locations)
- Versioning of the metadata (location, description, etc.), even for data from SQL/relational databases
- Lineage tracking

First, you need to create a workspace connection with details on how to connect to the external source.

Creating a connection to Snowflake DB when you want to import data from Snowflake

In [None]:
from azure.ai.ml import MLClient
from azure.ai.ml.entities import WorkspaceConnection
from azure.ai.ml.entities import UsernamePasswordConfiguration

name = "my_snowflakedb_connection"

target = "jdbc:snowflake://<myaccount>.snowflakecomputing.com/?db=<mydb>&warehouse=<mywarehouse>&role=<myrole>"
# Add the Snowflake account, database, warehouse name and role name here. If no role name is provided, it defaults to PUBLIC.

wps_connection = WorkspaceConnection(
    name=name,
    type="snowflake",
    target=target,
    credentials=UsernamePasswordConfiguration(username="XXXXX", password="XXXXXX"),
)
ml_client.connections.create_or_update(workspace_connection=wps_connection)
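
The Snowflake `target` string follows a fixed pattern, so it can be assembled from its parts. The helper below is a hypothetical sketch (not an SDK function) that follows the format shown in the cell and omits the role when none is given.

```python
def snowflake_target(account, database, warehouse, role=None):
    """Assemble the JDBC-style target string for a Snowflake workspace connection."""
    target = (
        f"jdbc:snowflake://{account}.snowflakecomputing.com/"
        f"?db={database}&warehouse={warehouse}"
    )
    if role:
        # Without an explicit role, the PUBLIC role is used by default
        target += f"&role={role}"
    return target


print(snowflake_target("<myaccount>", "<mydb>", "<mywarehouse>", role="<myrole>"))
print(snowflake_target("<myaccount>", "<mydb>", "<mywarehouse>"))
```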

Creating a connection to Azure SQL DB when you want to import data from Azure SQL

In [None]:
from azure.ai.ml import MLClient
from azure.ai.ml.entities import WorkspaceConnection
from azure.ai.ml.entities import UsernamePasswordConfiguration

name = "my_sqldb_connection"
target = "Server=tcp:<myservername>,<port>;Database=<mydatabase>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
# add the sql servername, port address and database

wps_connection = WorkspaceConnection(
    name=name,
    type="azure_sql_db",
    target=target,
    credentials=UsernamePasswordConfiguration(username="XXXXX", password="XXXXXX"),
)
ml_client.connections.create_or_update(workspace_connection=wps_connection)

Creating a connection to Amazon S3 when you want to import data from S3 bucket

In [None]:
from azure.ai.ml import MLClient
from azure.ai.ml.entities import WorkspaceConnection
from azure.ai.ml.entities import AccessKeyConfiguration

name = "my_s3_connection"
target = "<mybucket>"  # add the s3 bucket details
wps_connection = WorkspaceConnection(
    name=name,
    type="s3",
    target=target,
    credentials=AccessKeyConfiguration(
        access_key_id="XXXXXX", secret_access_key="XXXXXXXX"
    ),
)
ml_client.connections.create_or_update(workspace_connection=wps_connection)
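
The connection cells use `XXXXX` placeholders for secrets. One way to avoid typing real secrets into a notebook is to read them from environment variables. The sketch below assumes hypothetical variable names (`S3_ACCESS_KEY_ID`, `S3_SECRET_ACCESS_KEY`) and a hypothetical helper `read_secret`; the demo values are set in-process purely so the example runs.

```python
import os


def read_secret(name: str) -> str:
    """Return the value of an environment variable, failing loudly if it is missing."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set")
    return value


# For demonstration only: in practice these are set outside the notebook.
os.environ.setdefault("S3_ACCESS_KEY_ID", "demo-key-id")
os.environ.setdefault("S3_SECRET_ACCESS_KEY", "demo-secret")

access_key_id = read_secret("S3_ACCESS_KEY_ID")
secret_access_key = read_secret("S3_SECRET_ACCESS_KEY")
print("Loaded an access key id of length", len(access_key_id))
```

The two values could then be passed to `AccessKeyConfiguration(access_key_id=..., secret_access_key=...)` instead of literal strings.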

Once the connection is created, you need to initiate an import job.

Here is an example of importing data from Snowflake

In [None]:
from azure.ai.ml.entities import DataImport
from azure.ai.ml.data_transfer import Database
from azure.ai.ml import MLClient

# Supported connections include:
# Connection: azureml:<workspace_connection_name>
# Supported paths include:
# Datastore: azureml://datastores/<data_store_name>/paths/<my_path>/${{name}}

data_import = DataImport(
    name="snowflake_sample",
    source=Database(
        connection="azureml:my_snowflakedb_connection",
        query="select * from my_sample_table",
    ),
    path="azureml://datastores/workspaceblobstore/paths/snowflake/${{name}}",
)
ml_client.data.import_data(data_import=data_import)

Here is an example of importing data from Azure SQL

In [None]:
from azure.ai.ml.entities import DataImport
from azure.ai.ml.data_transfer import Database
from azure.ai.ml import MLClient

# Supported connections include:
# Connection: azureml:<workspace_connection_name>
# Supported paths include:
# Datastore: azureml://datastores/<data_store_name>/paths/<my_path>/${{name}}

data_import = DataImport(
    name="azuresql_sample",
    source=Database(
        connection="azureml:my_sqldb_connection", query="select * from my_table"
    ),
    path="azureml://datastores/workspaceblobstore/paths/azuresql/${{name}}",
)
ml_client.data.import_data(data_import=data_import)

Here is an example of importing data from Amazon S3

In [None]:
from azure.ai.ml.entities import DataImport
from azure.ai.ml.data_transfer import FileSystem
from azure.ai.ml import MLClient

# Supported connections include:
# Connection: azureml:<workspace_connection_name>
# Supported paths include:
# Datastore: azureml://datastores/<data_store_name>/paths/<my_path>/${{name}}

data_import = DataImport(
    name="s3_sample",
    source=FileSystem(
        connection="azureml:my_s3_connection", path="myfiles/titanic.csv"
    ),
    path="azureml://datastores/workspaceblobstore/paths/s3/${{name}}",
)
ml_client.data.import_data(data_import=data_import)

Importing data on a schedule:

You can import data on a schedule that is driven by either a recurrence trigger or a cron trigger.

Here is an example of importing data from Snowflake on a recurrence trigger

In [None]:
from azure.ai.ml.data_transfer import Database
from azure.ai.ml.constants import TimeZone
from azure.ai.ml.entities import (
    ImportDataSchedule,
    RecurrenceTrigger,
    RecurrencePattern,
)
from datetime import datetime, timedelta

# Use the name of the Snowflake workspace connection created earlier
source = Database(
    connection="azureml:my_snowflakedb_connection", query="select * from my_table"
)

path = "azureml://datastores/workspaceblobstore/paths/snowflake/schedule/${{name}}"


my_data = DataImport(
    type="mltable", source=source, path=path, name="my_schedule_sfds_test"
)

schedule_name = "my_simple_sdk_create_schedule_recurrence"

schedule_start_time = datetime.utcnow()

recurrence_trigger = RecurrenceTrigger(
    frequency="day",
    interval=1,
    schedule=RecurrencePattern(hours=1, minutes=[0, 1]),
    start_time=schedule_start_time,
    time_zone=TimeZone.UTC,
)


import_schedule = ImportDataSchedule(
    name=schedule_name, trigger=recurrence_trigger, import_data=my_data
)

ml_client.schedules.begin_create_or_update(import_schedule).result()
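
To illustrate what the recurrence settings mean, the sketch below enumerates the times a daily pattern with `hours=1, minutes=[0, 1]` would produce. It assumes the pattern fires at every hour/minute combination on each day after the start time; it is a plain-Python illustration, not the service's scheduler, and `daily_fire_times` is a hypothetical helper.

```python
from datetime import datetime, timedelta
from itertools import product


def daily_fire_times(start, hours, minutes, days=2):
    """List the hour/minute combinations on or after `start`, over `days` following days."""
    hours = [hours] if isinstance(hours, int) else sorted(hours)
    minutes = [minutes] if isinstance(minutes, int) else sorted(minutes)
    first_day = start.replace(hour=0, minute=0, second=0, microsecond=0)
    times = []
    for day_offset in range(days + 1):
        day = first_day + timedelta(days=day_offset)
        for hour, minute in product(hours, minutes):
            candidate = day.replace(hour=hour, minute=minute)
            if candidate >= start:
                times.append(candidate)
    return times


start = datetime(2024, 1, 1, 12, 0)  # fixed start time so the output is reproducible
times = daily_fire_times(start, hours=1, minutes=[0, 1], days=2)
for t in times:
    print(t.isoformat())
```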

Here is a similar example of importing data on a schedule, this time with a cron trigger

In [None]:
from azure.ai.ml.entities import CronTrigger, ImportDataSchedule

# Use the name of the Snowflake workspace connection created earlier
source = Database(
    connection="azureml:my_snowflakedb_connection", query="select * from my_table"
)

path = "azureml://datastores/workspaceblobstore/paths/snowflake/schedule/${{name}}"


my_data = DataImport(
    type="mltable", source=source, path=path, name="my_schedule_sfds_test"
)

schedule_name = "my_simple_sdk_create_schedule_cron"
start_time = datetime.utcnow()
end_time = start_time + timedelta(days=7)  # Set end_time to 7 days later

cron_trigger = CronTrigger(
    expression="15 10 * * 1",
    start_time=start_time,
    end_time=end_time,
)
import_schedule = ImportDataSchedule(
    name=schedule_name, trigger=cron_trigger, import_data=my_data
)
ml_client.schedules.begin_create_or_update(import_schedule).result()
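
The cron expression `"15 10 * * 1"` is easier to read once its five fields are named. The sketch below assumes the standard crontab field order (minute, hour, day of month, month, day of week); `describe_cron` is a hypothetical helper that only splits and labels the fields.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Split a five-field cron expression into a dict of named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"Expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("15 10 * * 1")
for name, value in fields.items():
    print(f"{name}: {value}")
```

Under standard crontab numbering (0 is Sunday), this expression reads as 10:15 on Mondays.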

NOTE: The import schedule is a schedule, so all the other CRUD operations of Schedule are available on this as well.

Disable the schedule

In [None]:
ml_client.schedules.begin_disable(name=schedule_name).result()

Check the details of the schedule

In [None]:
created_schedule = ml_client.schedules.get(name=schedule_name)
[created_schedule.name]

List schedules in a workspace

In [None]:
schedules = ml_client.schedules.list()
[s.name for s in schedules]

Enable a schedule

In [None]:
ml_client.schedules.begin_enable(name=schedule_name).result()

Update a schedule

In [None]:
# Update trigger expression
import_schedule.trigger.expression = "10 10 * * 1"
import_schedule = ml_client.schedules.begin_create_or_update(
    schedule=import_schedule
).result()
print(import_schedule)

Delete the schedule

In [None]:
# Only disabled schedules can be deleted
ml_client.schedules.begin_disable(name=schedule_name).result()
ml_client.schedules.begin_delete(name=schedule_name).result()

Data management on the workspace managed datastore:

Data can be imported into an AzureML-managed storage (hosted on behalf of the workspace) called `workspacemanageddatastore` by specifying
`path: azureml://datastores/workspacemanageddatastore`
The datastore is created automatically if it is not already present.

Importing to this datastore comes with the added benefit of data lifecycle management.

The following shows a simple data import into the `workspacemanageddatastore`. The same can be done using the schedules defined above.

In [None]:
from azure.ai.ml.entities import DataImport
from azure.ai.ml.data_transfer import Database
from azure.ai.ml import MLClient

# Supported connections include:
# Connection: azureml:<workspace_connection_name>
# Supported paths include:
# Datastore: azureml://datastores/<data_store_name>/paths/<my_path>/${{name}}

data_import = DataImport(
    name="my_sf_managedasset",
    source=Database(
        connection="azureml:my_snowflakedb_connection",
        query="select * from my_sample_table",
    ),
    path="azureml://datastores/workspacemanageddatastore",
)
ml_client.data.import_data(data_import=data_import)

The following examples show data lifecycle management, that is, altering the `auto_delete_setting` of an imported data asset.

Get imported data asset details:
```python
# Get data asset details
name = "my_sf_managedasset"
version = "1"
my_data = ml_client.data.get(name=name, version=version)
```

Update auto delete settings (based on creation time):

```python
from azure.ai.ml.entities import Data
from azure.ai.ml.entities._assets.auto_delete_setting import AutoDeleteSetting
from azure.ai.ml.constants import AssetTypes

# update auto delete setting
name = "my_sf_managedasset"
version = "1"
my_data = ml_client.data.get(name=name, version=version)
my_data.auto_delete_setting = AutoDeleteSetting(
    condition="created_greater_than", value="45d"
)
my_data = ml_client.data.create_or_update(my_data)
print("Update auto delete setting:", my_data)
```

Update auto delete settings (based on last access time):

```python
from azure.ai.ml.entities import Data
from azure.ai.ml.entities._assets.auto_delete_setting import AutoDeleteSetting
from azure.ai.ml.constants import AssetTypes

# update auto delete setting
name = "my_sf_managedasset"
version = "1"
my_data = ml_client.data.get(name=name, version=version)
my_data.auto_delete_setting = AutoDeleteSetting(
    condition="last_accessed_greater_than", value="30d"
)
my_data = ml_client.data.create_or_update(my_data)
print("Update auto delete setting:", my_data)
```
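
To make the two conditions concrete, the sketch below shows one way to read values such as `"45d"` and `"30d"`: as an age threshold in days measured from a reference time (creation time for `created_greater_than`, last access time for `last_accessed_greater_than`). This interpretation is an assumption for illustration; the actual evaluation is done by the service, and `parse_days` and `is_past_threshold` are hypothetical helpers.

```python
from datetime import datetime, timedelta


def parse_days(value: str) -> timedelta:
    """Convert a value such as '45d' into a timedelta of that many days."""
    if not value.endswith("d") or not value[:-1].isdigit():
        raise ValueError(f"Expected a value like '30d', got {value!r}")
    return timedelta(days=int(value[:-1]))


def is_past_threshold(reference_time: datetime, value: str, now: datetime) -> bool:
    """Return True if more than `value` has elapsed since `reference_time`."""
    return now - reference_time > parse_days(value)


now = datetime(2024, 3, 1)
created = datetime(2024, 1, 1)        # 60 days before `now`
last_accessed = datetime(2024, 2, 20)  # 10 days before `now`

print(is_past_threshold(created, "45d", now))
print(is_past_threshold(last_accessed, "30d", now))
```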

Remove auto delete settings:

```python
from azure.ai.ml.entities import Data
from azure.ai.ml.entities._assets.auto_delete_setting import AutoDeleteSetting
from azure.ai.ml.constants import AssetTypes

# remove auto delete setting
name = "my_sf_managedasset"
version = "1"
my_data = ml_client.data.get(name=name, version=version)
my_data.auto_delete_setting = None
my_data = ml_client.data.create_or_update(my_data)
print("Remove auto delete setting:", my_data)
```

> Note: Whilst the data asset example above (registering `titanic`) uses a local file, remember that `path` also supports cloud storage (`https`, `abfss`, `wasbs` protocols). Therefore, if you want to register data in a cloud location, just specify the path with any of the supported protocols.

### Consume data assets in a job

The following shows how to consume a data asset in a job:

In [None]:
from azure.ai.ml import command, Input, Output
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

registered_data_asset = ml_client.data.get(name="titanic", version="1")

my_job_inputs = {
    "input_data": Input(type=AssetTypes.URI_FILE, path=registered_data_asset.id)
}

job = command(
    code="./src",
    command="python read_data.py --input_data ${{inputs.input_data}}",
    inputs=my_job_inputs,
    environment="azureml://registries/azureml/environments/sklearn-1.5/labels/latest",
    compute="cpu-cluster",
)

# submit the command
returned_job = ml_client.create_or_update(job)
# get a URL for the status of the job
returned_job.studio_url
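
As an alternative to looking up `registered_data_asset.id`, a data asset can also be referenced by name and version in the short form `azureml:<name>:<version>`. The helper below is a hypothetical sketch that only formats that string.

```python
def asset_reference(name: str, version: str) -> str:
    """Format a short-form reference to a registered data asset."""
    return f"azureml:{name}:{version}"


reference = asset_reference("titanic", "1")
print(reference)
```

The resulting string could be used as the `path` of an `Input`, assuming the asset `titanic` with version `1` has been registered as shown earlier.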

### Read an MLTable in a job

#### Create an environment

First, you need to create an environment that contains the `mltable` Python library:

In [None]:
from azure.ai.ml.entities import Environment

env_docker_conda = Environment(
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    conda_file="env-mltable.yml",
    name="mltable",
    description="Environment created for consuming MLTable.",
)

ml_client.environments.create_or_update(env_docker_conda)

#### Create a job

In [None]:
from azure.ai.ml import command
from azure.ai.ml.entities import Data
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes

inputs = {"input_data": Input(type=AssetTypes.MLTABLE, path="./sample-mltable")}

job = command(
    code="./src",  # local path where the code is stored
    command="python read_mltable.py --input_data ${{inputs.input_data}}",
    inputs=inputs,
    environment=env_docker_conda,
    compute="cpu-cluster",
)

# submit the command
returned_job = ml_client.jobs.create_or_update(job)
# get a URL for the status of the job
returned_job.studio_url