#!/usr/bin/env python
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Flask web service for the timeseries classification sample: endpoints to
create datasets (Dataflow), train the model (Vertex AI), and get predictions."""

from datetime import datetime
import os
import re
import subprocess

import flask
import numpy as np

app = flask.Flask(__name__)

# Default values for dataset creation.
DEFAULT_TRAIN_EVAL_SPLIT = [80, 20]

# Default values for training in Vertex AI.
DEFAULT_TRAIN_EPOCHS = 100
DEFAULT_BATCH_SIZE = 128
DEFAULT_MACHINE_TYPE = "n1-standard-4"
DEFAULT_GPU_TYPE = "NVIDIA_TESLA_T4"
DEFAULT_GPU_COUNT = 2

# Google Cloud resources.
PROJECT = os.environ["PROJECT"]
REGION = os.environ["REGION"]
STORAGE_PATH = os.environ["STORAGE_PATH"]
CONTAINER_IMAGE = os.environ["CONTAINER_IMAGE"]
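
# These four environment variables must be set before the service starts, for
# example when deploying the container. The values below are illustrative only:
#   PROJECT=my-project
#   REGION=us-central1
#   STORAGE_PATH=gs://my-bucket/global-fishing-watch
#   CONTAINER_IMAGE=gcr.io/my-project/timeseries-classification:latest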

# Storage locations derived from STORAGE_PATH.
RAW_DATA_DIR = f"{STORAGE_PATH}/data"
RAW_LABELS_DIR = f"{STORAGE_PATH}/labels"
TRAIN_DATA_DIR = f"{STORAGE_PATH}/datasets/train"
EVAL_DATA_DIR = f"{STORAGE_PATH}/datasets/eval"
TRAINING_DIR = f"{STORAGE_PATH}/training"
TEMP_DIR = f"{STORAGE_PATH}/temp"
@app.route("/ping", methods=["POST"])
def run_root() -> dict:
args = flask.request.get_json() or {}
return {
"response": "Your request was successful! 🎉",
"args": args,
}
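
# An illustrative request to the ping endpoint, assuming the service is
# reachable at $SERVICE_URL (URL and payload are placeholders only):
#   curl -X POST "$SERVICE_URL/ping" \
#       -H "Content-Type: application/json" \
#       -d '{"hello": "world"}'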
@app.route("/create-datasets", methods=["POST"])
def run_create_datasets() -> dict:
try:
args = flask.request.get_json() or {}
job_name = f"global-fishing-watch-create-datasets-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
cmd = [
"python",
"create_datasets.py",
f"--raw-data-dir={args.get('raw_data_dir', RAW_DATA_DIR)}",
f"--raw-labels-dir={args.get('raw_labels_dir', RAW_LABELS_DIR)}",
f"--train-data-dir={args.get('train_data_dir', TRAIN_DATA_DIR)}",
f"--eval-data-dir={args.get('eval_data_dir', EVAL_DATA_DIR)}",
f"--train-eval-split={args.get('train_eval_split', DEFAULT_TRAIN_EVAL_SPLIT)}",
"--runner=DataflowRunner",
f"--job_name={job_name}",
f"--project={args.get('project', PROJECT)}",
f"--region={args.get('region', REGION)}",
f"--temp_location={args.get('temp_location', TEMP_DIR)}",
f"--sdk_container_image={CONTAINER_IMAGE}",
]
result = subprocess.run(cmd, check=True, capture_output=True)
output = result.stdout.decode("utf-8")
        # The ^ and $ anchors need re.MULTILINE to find the "job_id: ..." line
        # anywhere in the multi-line subprocess output.
        m = re.search(r"^job_id: (.*)$", output, re.MULTILINE)
        if m:
            job_id = m.group(1)
            job_url = (
                f"https://console.cloud.google.com/dataflow/jobs/{REGION}/{job_id}"
                f"?project={PROJECT}"
            )
        else:
            job_id = ""
            job_url = ""
        return {
            "method": "create-datasets",
            "job_name": job_name,
            "job_id": job_id,
            "job_url": job_url,
        }
    except Exception as e:
        return {"error": f"{type(e).__name__}: {e}"}
@app.route("/train-model", methods=["POST"])
def run_train_model() -> dict:
import train_model
try:
args = flask.request.get_json() or {}
params = {
"project": args.get("project", PROJECT),
"region": args.get("region", REGION),
"train_data_dir": args.get("train_data_dir", TRAIN_DATA_DIR),
"eval_data_dir": args.get("eval_data_dir", EVAL_DATA_DIR),
"training_dir": args.get("training_dir", TRAINING_DIR),
"train_epochs": args.get("train_epochs", DEFAULT_TRAIN_EPOCHS),
"batch_size": args.get("batch_size", DEFAULT_BATCH_SIZE),
"machine_type": args.get("machine_type", DEFAULT_MACHINE_TYPE),
"gpu_type": args.get("gpu_type", DEFAULT_GPU_TYPE),
"gpu_count": args.get("gpu_count", DEFAULT_GPU_COUNT),
"sync": args.get("sync", False),
}
train_model.run(**params)
return {
"method": "train-model",
"params": params,
}
except Exception as e:
return {"error": f"{type(e).__name__}: {e}"}
@app.route("/predict", methods=["POST"])
def run_predict() -> dict:
import predict
try:
args = flask.request.get_json() or {}
params = {
"model_dir": args.get("model_dir", f"{TRAINING_DIR}/model"),
"inputs": args["inputs"],
}
predictions = predict.run(**params)

        # Convert the numpy arrays to Python lists to make them JSON-encodable.
        # This handles either a single array or a mapping of named arrays.
        if isinstance(predictions, dict):
            predictions = {
                name: np.asarray(values).tolist()
                for name, values in predictions.items()
            }
        else:
            predictions = np.asarray(predictions).tolist()

        return {
            "method": "predict",
            "model_dir": params["model_dir"],
            "input_shapes": {
                name: np.shape(values) for name, values in params["inputs"].items()
            },
            "predictions": predictions,
        }
    except Exception as e:
        return {"error": f"{type(e).__name__}: {e}"}


if __name__ == "__main__":
    # Only used when running locally; PORT defaults to 8080 if unset.
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
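
# To try the service locally (illustrative), export the environment variables
# listed at the top of this file and run:
#   python main.py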