# scripts/weka.py
import glob
import shlex
import subprocess
from typing import List, Union
def parse_input_paths(input_data: Union[str, List[str]]) -> List[str]:
    """
    Parse input paths from either a list or a YAML-style string.

    Args:
        input_data: Either a list of paths or a string in YAML format
            (one ``- /some/path`` entry per line)

    Returns:
        List of paths (empty entries removed)

    Raises:
        ValueError: If the string input cannot be parsed.
    """
    if isinstance(input_data, list):
        return input_data
    try:
        paths = []
        for line in input_data.strip().split("\n"):
            entry = line.strip()
            # Remove the YAML list marker only as a *prefix*. The previous
            # entry.strip("- ") treated "-" and " " as a character set and
            # also ate legitimate leading/trailing hyphens in path names.
            if entry.startswith("-"):
                entry = entry[1:].strip()
            if entry:  # filter out empty strings
                paths.append(entry)
        return paths
    except Exception as e:
        raise ValueError(f"Could not parse input paths: {str(e)}")
def get_warmup_percentages(input_paths: Union[str, List[str]]) -> dict:
    """
    Calculate the warm-up percentage for each folder by checking all files within.

    Args:
        input_paths: Either a list of paths or a YAML-style string with paths

    Returns:
        dict: Dictionary mapping folder paths to their warm-up percentages
            (rounded to 2 decimals). A folder where the ``weka`` command
            failed is mapped to -1; an empty folder maps to 0.0.
    """
    folder_paths = parse_input_paths(input_paths)
    print(f"Checking warmup status for {len(folder_paths)} folders...")
    results = {}
    # Query weka in batches: putting every file of a very large folder into a
    # single argv can exceed the OS ARG_MAX limit and fail with E2BIG.
    batch_size = 512
    for folder_path in folder_paths:
        try:
            folder_path = folder_path.rstrip("/")
            # Get all entries directly inside the folder (non-recursive).
            all_files = glob.glob(f"{folder_path}/*")
            if not all_files:
                print(f"No files found in {folder_path}")
                results[folder_path] = 0.0
                continue
            total_size = 0
            total_cached = 0
            for start in range(0, len(all_files), batch_size):
                cmd = (
                    ["weka", "fs", "tier", "location"]
                    + all_files[start : start + batch_size]
                    + ["--no-header", "--raw-units", "-o", "path,size,ssdRead"]
                )
                output = subprocess.check_output(cmd, text=True)
                for line in output.strip().split("\n"):
                    if line:
                        parts = line.split()
                        # Expected row format: <path> <size> B <ssdRead> B
                        # e.g. /path/to/file 1234 B 1234 B
                        if len(parts) >= 5:  # make sure we have all parts
                            total_size += float(parts[-4])  # size value
                            total_cached += float(parts[-2])  # ssdRead value
            if total_size > 0:
                warmup_percentage = (total_cached / total_size) * 100
                results[folder_path] = round(warmup_percentage, 2)
                print(
                    f"{folder_path}: {results[folder_path]}% warmed up ({total_cached/1e9:.2f}GB / {total_size/1e9:.2f}GB)"
                )
            else:
                results[folder_path] = 0.0
                print(f"{folder_path}: No data found")
        except subprocess.CalledProcessError as e:
            print(f"Error processing {folder_path}: {str(e)}")
            # -1 is the error sentinel callers already rely on; keep it.
            results[folder_path] = -1
    return results
def warmup_datasets(input_paths: Union[str, List[str]]) -> None:
    """
    Warm up datasets by fetching all files in the given folders.
    Uses find and xargs to parallelize the fetching.

    Args:
        input_paths: Either a list of paths or a YAML-style string with paths
    """
    folder_paths = parse_input_paths(input_paths)
    print(f"Warming up {len(folder_paths)} folders...")
    for folder_path in folder_paths:
        try:
            folder_path = folder_path.rstrip("/")
            print(f"\nWarming up {folder_path}")
            # The path is interpolated into a shell=True command string, so it
            # must be quoted: otherwise spaces or shell metacharacters in the
            # path would break the pipeline (or allow command injection).
            quoted_path = shlex.quote(folder_path)
            # find -L follows symlinks; xargs fans the file list out to 64
            # parallel `weka fs tier fetch` invocations of up to 512 files each.
            cmd = f"find -L {quoted_path} -type f | xargs -d '\\n' -r -n512 -P64 weka fs tier fetch"
            subprocess.run(cmd, shell=True, check=True, text=True)
            print(f"Finished warming up {folder_path}")
        except subprocess.CalledProcessError as e:
            print(f"Error warming up {folder_path}: {str(e)}")
# Example usage: warm up a fixed set of tokenized dataset folders, then
# report how much of each is resident on the SSD tier.
if __name__ == "__main__":
    # Folders given as a YAML-style list string (parse_input_paths also
    # accepts a plain Python list).
    dataset_folders = """
    - /fsx/loubna/datasets/llama_tokenized/fineweb-edu/merged
    - /fsx/loubna/datasets/llama_tokenized/other_sources/dclm/
    - /fsx/loubna/datasets/llama_tokenized/pes2o/standard
    - /fsx/loubna/datasets/llama_tokenized/other_sources/wiki
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hq-fra_Latn/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hq-spa_Latn/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hq-deu_Latn/
    - /fsx/loubna/datasets/llama_tokenized/fw2-hq-ita_Latn/standard
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hq-por_Latn/
    - /fsx/loubna/datasets/llama_tokenized/fw2-hq-cmn_Hani/standard
    - /fsx/loubna/datasets/llama_tokenized/fw2-hq-rus_Cyrl/standard
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hq-fas_Arab/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hq-jpn_Jpan/
    - /fsx/loubna/datasets/llama_tokenized/fw2-kor_Hang/standard
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hin_Deva/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-tha_Thai/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hq-vie_Latn/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/fw2-hq-ell_Grek/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/infiwebmath-3plus/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/finemath-3plus/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/stack-edu-Python/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/stack-edu-Java/
    - /fsx/loubna/datasets/llama_tokenized/other_sources/stack-edu-JavaScript/
    - /fsx/loubna/datasets/llama_tokenized/kaggle/standard
    """

    # Step 1: fetch everything onto the SSD tier.
    print("Starting dataset warm-up...")
    warmup_datasets(dataset_folders)

    # Step 2: verify how much of each folder is now cached.
    print("\nChecking warm-up status...")
    warmup_stats = get_warmup_percentages(dataset_folders)