data-analytics/minigolf-demo/upload.py (64 lines of code) (raw):
# Copyright 2024 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This script monitors a designated folder for new video files and uploads them to a Google Cloud Storage bucket.
It is designed to run continuously, checking for new files at regular intervals.
It waits until a file's size has stopped growing between checks before considering the file "complete" and uploading it.
- Replace `VIDEO_BUCKET` and `PROJECT_ID` with your Google Cloud Storage bucket name and project ID.
- (Optional) Modify `MONITORING_INTERVAL` to change how often the script checks for new files.
- Run the Script: python3 upload.py /path/to/folder
"""
import os
import time
import sys
from pathlib import Path
from google.cloud import storage
# Destination bucket name and Google Cloud project ID. Both are empty
# placeholders here and must be replaced with real values before running.
VIDEO_BUCKET = ""
PROJECT_ID = ""
# Directory paths
# NOTE(review): TEMP_FOLDER is not referenced by any code visible in this
# file -- confirm whether it is still needed.
TEMP_FOLDER = "/tmp"
# Seconds the monitoring loop sleeps between scans of the watched folder.
MONITORING_INTERVAL = 3
# Cloud Storage client and bucket handle, created once at import time and
# shared by the functions in this module that list and upload blobs.
storage_client = storage.Client(project=PROJECT_ID)
bucket = storage_client.bucket(VIDEO_BUCKET)
# Helper Functions
def get_latest_file(directory):
    """
    Returns the regular file in a directory with the newest modification time.

    Sub-directories and other non-file entries are skipped. When several files
    share the newest modification time, the first one encountered wins.

    Args:
        directory (str): The path to the directory to search.

    Returns:
        Path: The most recently modified file, or None if the directory
        contains no files.
    """
    newest_file = None
    newest_mtime = None
    for entry in Path(directory).iterdir():
        if not entry.is_file():
            continue
        entry_mtime = os.path.getmtime(entry)
        # Strict comparison keeps the earliest entry on ties.
        if newest_file is None or entry_mtime > newest_mtime:
            newest_file = entry
            newest_mtime = entry_mtime
    return newest_file
def get_file_number(existing_names=None):
    """
    Gets the next available file name based on existing files in Cloud Storage.

    The next number is one more than the highest number found among names of
    the form "minigolf_<digits>" with an optional ".<extension>" suffix. Names
    that start with "minigolf_" but carry no such number are ignored, and the
    numbering starts at 1 when no numbered name exists.

    Args:
        existing_names (Iterable[str] | None): Object names to consider. When
            None (the default), the names of all blobs in the bucket whose
            name starts with "minigolf_" are listed and used.

    Returns:
        str: The formatted file name string, like "minigolf_0001.mp4".
    """
    prefix = "minigolf_"
    if existing_names is None:
        existing_names = (blob.name for blob in bucket.list_blobs(prefix=prefix))

    highest_number = 0
    for name in existing_names:
        if not name.startswith(prefix):
            continue
        # Take everything between the prefix and the first dot, so numbers
        # wider than four digits are read in full rather than truncated.
        digits = name[len(prefix):].split(".", 1)[0]
        if not digits.isdecimal():
            continue  # e.g. "minigolf_notes.txt" -- not a numbered recording
        # Compare as integers: as text, "9999" would sort above "10000".
        highest_number = max(highest_number, int(digits))

    # Zero-padded to at least four digits; larger numbers keep all digits.
    return f"{prefix}{highest_number + 1:04d}.mp4"
def upload_file_to_gcs(src_path):
    """
    Sends one local file to the Cloud Storage bucket under the next free name.

    The destination object name comes from get_file_number(). A line is printed
    before the upload starts and another, including the elapsed wall-clock
    time, once it has finished.

    Args:
        src_path (str): The path to the local file to upload.
    """
    started_at = time.time()
    print(f"Uploading {src_path}")

    destination_name = get_file_number()
    # Uploads use a 300-second timeout.
    bucket.blob(destination_name).upload_from_filename(src_path, timeout=300)

    elapsed = time.time() - started_at
    print(f"Uploaded {src_path} to gs://{VIDEO_BUCKET}/{destination_name} in {elapsed:.2f} seconds.")
# Main Monitoring Logic
def monitor_and_upload(folder_path):
    """
    Continuously monitors a folder for new video files and uploads them to GCS.

    Files already present when monitoring starts are treated as handled and are
    not uploaded. Every file that appears afterwards is uploaded once, oldest
    first, after two size readings taken three seconds apart are equal. Files
    that are still empty are left for a later pass.

    Args:
        folder_path (str): The path to the folder to monitor.

    This function loops forever. If folder_path is not a directory it prints an
    error and exits the process with status 1. Errors raised while scanning the
    folder, reading file sizes or uploading are not caught here.
    """
    if not os.path.isdir(folder_path):
        print(f"Error: Invalid folder path '{folder_path}'.")
        sys.exit(1)  # Exit with an error code

    folder = Path(folder_path)
    # Names that must not be uploaded (again): everything present at startup
    # plus everything uploaded since.
    handled_names = {entry.name for entry in folder.iterdir() if entry.is_file()}

    while True:
        # Look at every unseen file, not only the newest one, so that several
        # recordings appearing within one polling interval are all uploaded.
        pending_files = sorted(
            (
                entry
                for entry in folder.iterdir()
                if entry.is_file() and entry.name not in handled_names
            ),
            key=os.path.getmtime,
        )
        if not pending_files:
            print("No new file detected. Monitoring...")

        for new_file in pending_files:
            file_path = str(new_file.absolute())
            previous_size = os.path.getsize(file_path)
            if previous_size == 0:
                # Created but nothing written yet. Retry on a later pass
                # instead of uploading an empty object or blocking the other
                # pending files.
                print("File is recording, waiting until recording is completed...")
                continue

            # Poll until two consecutive size readings match.
            while True:
                print("File is recording, waiting until recording is completed...")
                time.sleep(3)
                current_size = os.path.getsize(file_path)
                if current_size == previous_size:
                    break
                previous_size = current_size

            print("recording is completed. Uploading...")
            upload_file_to_gcs(file_path)
            handled_names.add(new_file.name)

        time.sleep(MONITORING_INTERVAL)
if __name__ == "__main__":
    # The folder to watch is the first command-line argument; any further
    # arguments are ignored.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python3 upload.py /path/to/folder")
        sys.exit(1)
    monitor_and_upload(cli_args[0])