front/admin_ui/app.py (672 lines of code) (raw):

import json import os import urllib.parse from itertools import product import duckdb import gradio as gr import huggingface_hub as hfh import matplotlib import matplotlib.pyplot as plt import networkx as nx import pandas as pd import requests from libcommon.processing_graph import processing_graph from tqdm.contrib.concurrent import thread_map matplotlib.use("SVG") DEV = os.environ.get("DEV", False) HF_ENDPOINT = os.environ.get("HF_ENDPOINT", "https://huggingface.co") PROD_DV_ENDPOINT = os.environ.get( "PROD_DV_ENDPOINT", "https://datasets-server.huggingface.co" ) DEV_DV_ENDPOINT = os.environ.get("DEV_DV_ENDPOINT", "http://localhost:8100") ADMIN_HF_ORGANIZATION = os.environ.get("ADMIN_HF_ORGANIZATION", "huggingface") HF_TOKEN = os.environ.get("HF_TOKEN") DV_ENDPOINT = DEV_DV_ENDPOINT if DEV else PROD_DV_ENDPOINT # global state (shared with all the user sessions) pending_jobs_df = None def healthcheck(): try: response = requests.head(f"{DV_ENDPOINT}/admin/healthcheck", timeout=10) except requests.ConnectionError as error: return f"❌ Failed to connect to {DV_ENDPOINT} (error {error})" if response.status_code == 200: return f"*Connected to {DV_ENDPOINT}*" else: return f"❌ Failed to connect to {DV_ENDPOINT} (error {response.status_code})" def draw_graph(width, height): graph = processing_graph._nx_graph pos = nx.nx_agraph.graphviz_layout(graph, prog="dot") fig = plt.figure(figsize=(width, height)) nx.draw_networkx(graph, pos=pos, node_color="#d1b2f8", node_size=500) return fig with gr.Blocks() as demo: gr.Markdown("## Datasets-server admin page") gr.Markdown(healthcheck) with gr.Row(visible=HF_TOKEN is None) as auth_page: with gr.Column(): auth_title = gr.Markdown( "Enter your token ([settings](https://huggingface.co/settings/tokens)):" ) token_box = gr.Textbox( HF_TOKEN or "", label="token", placeholder="hf_xxx", type="password" ) auth_error = gr.Markdown("", visible=False) with gr.Row(visible=HF_TOKEN is not None) as main_page: with gr.Column(): welcome_title = gr.Markdown("### Welcome") with gr.Tab("Home dashboard"): home_dashboard_fetch_button = gr.Button("Fetch") gr.Markdown("### Dataset infos") home_dashboard_trending_datasets_infos_by_builder_name_table = ( gr.DataFrame( pd.DataFrame( { "Builder name": [], "Count": [], r"% of all datasets with infos": [], r"% of all public datasets": [], } ) ) ) gr.Markdown("### Trending datasets coverage (is-valid)") home_dashboard_trending_datasets_coverage_stats_table = gr.DataFrame( pd.DataFrame( { "Num trending datasets": [], "HTTP Status": [], "Preview": [], "Viewer": [], "Search": [], "Filter": [], "Statistics": [], } ) ) home_dashboard_trending_datasets_coverage_table = gr.DataFrame( pd.DataFrame( { "All trending datasets": [], "HTTP Status": [], "Preview": [], "Viewer": [], "Search": [], "Filter": [], "Statistics": [], } ) ) def fetch_home_dashboard(token): out = { home_dashboard_trending_datasets_infos_by_builder_name_table: gr.DataFrame( value=None ), home_dashboard_trending_datasets_coverage_stats_table: gr.DataFrame( value=None ), home_dashboard_trending_datasets_coverage_table: gr.DataFrame( value=None ), } headers = {"Authorization": f"Bearer {token}"} response = requests.get( f"{DV_ENDPOINT}/admin/num-dataset-infos-by-builder-name", headers=headers, timeout=60, ) if response.status_code == 200: num_infos_by_builder_name = response.json() total_num_infos = sum(num_infos_by_builder_name.values()) num_public_datasets = sum( 1 for _ in hfh.HfApi(endpoint=HF_ENDPOINT).list_datasets() ) out[ home_dashboard_trending_datasets_infos_by_builder_name_table ] = gr.DataFrame( visible=True, value=pd.DataFrame( { "Builder name": list( num_infos_by_builder_name.keys() ), "Count": list(num_infos_by_builder_name.values()), r"% of all datasets with infos": [ f"{round(100 * num_infos / total_num_infos, 2)}%" for num_infos in num_infos_by_builder_name.values() ], r"% of all public datasets": [ f"{round(100 * num_infos / num_public_datasets, 2)}%" for num_infos in num_infos_by_builder_name.values() ], } ), ) else: out[ home_dashboard_trending_datasets_infos_by_builder_name_table ] = gr.DataFrame( visible=True, value=pd.DataFrame( { "Error": [ f"❌ Failed to fetch dataset infos from {DV_ENDPOINT} (error {response.status_code})" ] } ), ) response = requests.get( f"{HF_ENDPOINT}/api/trending?type=dataset&limit=20", timeout=60 ) if response.status_code == 200: trending_datasets = [ repo_info["repoData"]["id"] for repo_info in response.json()["recentlyTrending"] ] def get_is_valid_response(dataset: str): return requests.get( f"{DV_ENDPOINT}/is-valid?dataset={dataset}", headers=headers, timeout=60, ) is_valid_responses = thread_map( get_is_valid_response, trending_datasets, desc="get_is_valid_response", ) trending_datasets_coverage = {"All trending datasets": []} error_datasets = [] unauthorized_datasets = [] for dataset, is_valid_response in zip( trending_datasets, is_valid_responses ): if is_valid_response.status_code == 200: response_json = is_valid_response.json() trending_datasets_coverage[ "All trending datasets" ].append(dataset) for is_valid_field in response_json: pretty_field = is_valid_field.replace( "_", " " ).capitalize() if pretty_field not in trending_datasets_coverage: trending_datasets_coverage[pretty_field] = [] trending_datasets_coverage[pretty_field].append( "✅" if response_json[is_valid_field] is True else "❌" ) elif is_valid_response.status_code == 500: error_datasets.append(dataset) else: unauthorized_datasets.append(dataset) def fill_empty_cells(datasets, sign): trending_datasets_coverage[ "All trending datasets" ] += datasets for pretty_field in trending_datasets_coverage: trending_datasets_coverage[pretty_field] += [sign] * ( len( trending_datasets_coverage[ "All trending datasets" ] ) - len(trending_datasets_coverage[pretty_field]) ) fill_empty_cells(error_datasets, "❌") fill_empty_cells(unauthorized_datasets, "🚫") out[ home_dashboard_trending_datasets_coverage_table ] = gr.DataFrame( visible=True, value=pd.DataFrame(trending_datasets_coverage) ) trending_datasets_coverage_stats = { "Num trending datasets": [len(trending_datasets)], **{ is_valid_field: [ f"{round(100 * sum(1 for coverage in trending_datasets_coverage[is_valid_field] if coverage == '✅') / len(trending_datasets), 2)}%" ] for is_valid_field in trending_datasets_coverage if is_valid_field != "All trending datasets" }, } out[ home_dashboard_trending_datasets_coverage_stats_table ] = gr.DataFrame( visible=True, value=pd.DataFrame(trending_datasets_coverage_stats), ) else: out[ home_dashboard_trending_datasets_coverage_table ] = gr.DataFrame( visible=True, value=pd.DataFrame( { "Error": [ f"❌ Failed to fetch trending datasets from {HF_ENDPOINT} (error {response.status_code})" ] } ), ) return out home_dashboard_fetch_button.click( fetch_home_dashboard, inputs=[token_box], outputs=[ home_dashboard_trending_datasets_infos_by_builder_name_table, home_dashboard_trending_datasets_coverage_stats_table, home_dashboard_trending_datasets_coverage_table, ], ) with gr.Tab("View pending jobs"): fetch_pending_jobs_button = gr.Button("Fetch pending jobs") gr.Markdown("### Pending jobs summary") pending_jobs_summary_table = gr.DataFrame( pd.DataFrame({"Jobs": [], "Waiting": [], "Started": []}) ) gr.Markdown("### Most recent") recent_pending_jobs_table = gr.DataFrame() gr.Markdown("### Query the pending jobs table") pending_jobs_query = gr.Textbox( label="Query pending_jobs_df", placeholder="SELECT * FROM pending_jobs_df WHERE dataset LIKE 'allenai/c4", value="SELECT * FROM pending_jobs_df WHERE dataset LIKE 'allenai/c4'", lines=3, ) query_pending_jobs_button = gr.Button("Run") pending_jobs_query_result_df = gr.DataFrame() def view_jobs(token): global pending_jobs_df headers = {"Authorization": f"Bearer {token}"} response = requests.get( f"{DV_ENDPOINT}/admin/pending-jobs", headers=headers, timeout=60, ) if response.status_code == 200: pending_jobs = response.json() df = pd.DataFrame( [ job for job_type in pending_jobs for job_state in pending_jobs[job_type] for job in pending_jobs[job_type][job_state] ] ) if "started_at" in df.columns: df["started_at"] = pd.to_datetime( df["started_at"], errors="coerce" ) if "last_heartbeat" in df.columns: df["last_heartbeat"] = pd.to_datetime( df["last_heartbeat"], errors="coerce", ) if "created_at" in df.columns: df["created_at"] = pd.to_datetime( df["created_at"], errors="coerce" ) most_recent = df.nlargest(5, "created_at") else: most_recent = pd.DataFrame() pending_jobs_df = df return { pending_jobs_summary_table: gr.DataFrame( visible=True, value=pd.DataFrame( { "Jobs": list(pending_jobs), "Waiting": [ len(pending_jobs[job_type]["waiting"]) for job_type in pending_jobs ], "Started": [ len(pending_jobs[job_type]["started"]) for job_type in pending_jobs ], } ), ), recent_pending_jobs_table: gr.DataFrame(value=most_recent), } else: return { pending_jobs_summary_table: gr.DataFrame( visible=True, value=pd.DataFrame( { "Error": [ f"❌ Failed to view pending jobs to {DV_ENDPOINT} (error {response.status_code})" ] } ), ), recent_pending_jobs_table: gr.DataFrame(value=None), } def query_jobs(pending_jobs_query): global pending_jobs_df if pending_jobs_df is None: return { pending_jobs_query_result_df: gr.DataFrame( value=pd.DataFrame( { "Error": [ "❌ Please, fetch the pending jobs first" ] } ) ) } try: result = duckdb.query(pending_jobs_query).to_df() except ( duckdb.ParserException, duckdb.CatalogException, duckdb.BinderException, ) as error: return { pending_jobs_query_result_df: gr.DataFrame( value=pd.DataFrame({"Error": [f"❌ {str(error)}"]}) ) } return {pending_jobs_query_result_df: gr.DataFrame(value=result)} fetch_pending_jobs_button.click( view_jobs, inputs=token_box, outputs=[recent_pending_jobs_table, pending_jobs_summary_table], ) query_pending_jobs_button.click( query_jobs, inputs=pending_jobs_query, outputs=[pending_jobs_query_result_df], ) with gr.Tab("Refresh dataset step"): job_types = [ processing_step.job_type for processing_step in processing_graph.get_alphabetically_ordered_processing_steps() ] def on_change_refresh_job_type(job_type): return processing_graph.get_processing_step(job_type).difficulty refresh_type = gr.Dropdown( job_types, multiselect=False, type="value", label="job type", value=job_types[0], ) refresh_dataset_name = gr.Textbox( label="dataset", placeholder="allenai/c4" ) refresh_config_name = gr.Textbox( label="config (optional)", placeholder="en" ) refresh_split_name = gr.Textbox( label="split (optional)", placeholder="train, test" ) gr.Markdown( "*you can select multiple values by separating them with commas, e.g. split='train, test'*" ) refresh_difficulty = gr.Slider( 0, 100, processing_graph.get_processing_step(job_types[0]).difficulty, step=10, interactive=True, label="difficulty", ) refresh_type.change( on_change_refresh_job_type, refresh_type, refresh_difficulty ) refresh_priority = gr.Dropdown( ["low", "normal", "high"], multiselect=False, label="priority", value="high", ) refresh_dataset_button = gr.Button("Force refresh dataset") refresh_dataset_output = gr.Markdown("") def refresh_dataset( token, refresh_type, refresh_dataset_names, refresh_config_names, refresh_split_names, refresh_priority, refresh_difficulty, ): headers = {"Authorization": f"Bearer {token}"} all_results = "" for ( refresh_dataset_name, refresh_config_name, refresh_split_name, ) in product( refresh_dataset_names.split(","), refresh_config_names.split(","), refresh_split_names.split(","), ): refresh_dataset_name = refresh_dataset_name.strip() params = { "dataset": refresh_dataset_name, "priority": refresh_priority, } if refresh_config_name: refresh_config_name = refresh_config_name.strip() params["config"] = refresh_config_name if refresh_split_name: refresh_split_name = refresh_split_name.strip() params["split"] = refresh_split_name if refresh_difficulty: params["difficulty"] = refresh_difficulty params = urllib.parse.urlencode(params) response = requests.post( f"{DV_ENDPOINT}/admin/force-refresh/{refresh_type}?{params}", headers=headers, timeout=60, ) if response.status_code == 200: result = f"[{refresh_dataset_name}] ✅ Added processing step to the queue: '{refresh_type}'" if refresh_config_name: result += f", for config '{refresh_config_name}'" if refresh_split_name: result += f", for split '{refresh_split_name}'" else: result = f"[{refresh_dataset_name}] ❌ Failed to add processing step to the queue. Error {response.status_code}" try: if response.json().get("error"): result += f": {response.json()['error']}" except requests.JSONDecodeError: result += f": {response.content}" all_results += result.strip("\n") + "\n" return "```\n" + all_results + "\n```" refresh_dataset_button.click( refresh_dataset, inputs=[ token_box, refresh_type, refresh_dataset_name, refresh_config_name, refresh_split_name, refresh_priority, refresh_difficulty, ], outputs=refresh_dataset_output, ) with gr.Tab("Recreate dataset"): delete_and_recreate_dataset_name = gr.Textbox( label="dataset", placeholder="stanfordnlp/imdb" ) delete_and_recreate_priority = gr.Dropdown( ["low", "normal", "high"], multiselect=False, label="priority", value="high", ) gr.Markdown( "Beware: this will delete all the jobs, cache entries and assets for the dataset (for all the revisions). The dataset viewer will be unavailable until the cache is rebuilt." ) delete_and_recreate_dataset_button = gr.Button("Delete and recreate") delete_and_recreate_dataset_output = gr.Markdown("") def delete_and_recreate_dataset( token, delete_and_recreate_dataset_name, delete_and_recreate_priority, ): headers = {"Authorization": f"Bearer {token}"} delete_and_recreate_dataset_name = ( delete_and_recreate_dataset_name.strip() ) params = { "dataset": delete_and_recreate_dataset_name, "priority": delete_and_recreate_priority, } params = urllib.parse.urlencode(params) response = requests.post( f"{DV_ENDPOINT}/admin/recreate-dataset?{params}", headers=headers, timeout=60, ) if response.status_code == 200: result = f"[{delete_and_recreate_dataset_name}] ✅ All the assets have been deleted. A new job has been created to generate the cache again." else: result = f"[{refresh_dataset_name}] ❌ Failed to delete and recreate the dataset. Error {response.status_code}" try: if response.json().get("error"): result += f": {response.json()['error']}" except requests.JSONDecodeError: result += f": {response.content}" return result.strip("\n") + "\n" delete_and_recreate_dataset_button.click( delete_and_recreate_dataset, inputs=[ token_box, delete_and_recreate_dataset_name, delete_and_recreate_priority, ], outputs=delete_and_recreate_dataset_output, ) with gr.Tab("Dataset status"): dataset_name = gr.Textbox(label="dataset", placeholder="allenai/c4") dataset_status_button = gr.Button("Get dataset status") gr.Markdown("### Pending jobs") jobs_table = gr.DataFrame() gr.Markdown("### Cached responses") cached_responses_table = gr.DataFrame() def get_dataset_status(token, dataset): headers = {"Authorization": f"Bearer {token}"} response = requests.get( f"{DV_ENDPOINT}/admin/dataset-status?dataset={dataset}", headers=headers, timeout=60, ) if response.status_code == 200: dataset_status = response.json() cached_responses_df = pd.DataFrame( [ { "kind": cached_response["kind"], "dataset": cached_response["dataset"], "config": cached_response["config"], "split": cached_response["split"], "http_status": cached_response["http_status"], "error_code": cached_response["error_code"], "job_runner_version": cached_response[ "job_runner_version" ], "dataset_git_revision": cached_response[ "dataset_git_revision" ], "progress": cached_response["progress"], "updated_at": cached_response["updated_at"], "failed_runs": cached_response["failed_runs"], "details": json.dumps(cached_response["details"]), } for content in dataset_status.values() for cached_response in content["cached_responses"] ] ) jobs_df = pd.DataFrame( [ { "type": job["type"], "dataset": job["dataset"], "revision": job["revision"], "config": job["config"], "split": job["split"], "namespace": job["namespace"], "priority": job["priority"], "status": job["status"], "difficulty": job["difficulty"], "created_at": job["created_at"], "started_at": job["started_at"], "last_heartbeat": job["last_heartbeat"], } for content in dataset_status.values() for job in content["jobs"] ] ) return { cached_responses_table: gr.DataFrame( value=cached_responses_df ), jobs_table: gr.DataFrame(value=jobs_df), } else: return { cached_responses_table: gr.DataFrame( value=pd.DataFrame( [ { "error": f"❌ Failed to get status for {dataset} (error {response.status_code})" } ] ) ), jobs_table: gr.DataFrame( value=pd.DataFrame([{"content": str(response.content)}]) ), } dataset_status_button.click( get_dataset_status, inputs=[token_box, dataset_name], outputs=[cached_responses_table, jobs_table], ) with gr.Tab("Processing graph"): gr.Markdown( "## 💫 Please, don't forget to rebuild (factory reboot) this space immediately after each deploy 💫" ) gr.Markdown( "### so that we get the 🚀 production 🚀 version of the graph here " ) with gr.Row(): width = gr.Slider(1, 30, 19, step=1, label="Width") height = gr.Slider(1, 30, 15, step=1, label="Height") output = gr.Plot() draw_button = gr.Button("Plot processing graph") draw_button.click(draw_graph, inputs=[width, height], outputs=output) def auth(token): if not token: return {auth_error: gr.Markdown(value="", visible=False)} try: user = hfh.whoami(token=token) except requests.HTTPError as err: return {auth_error: gr.Markdown(value=f"❌ Error ({err})", visible=True)} orgs = [org["name"] for org in user["orgs"]] if ADMIN_HF_ORGANIZATION in orgs: return { auth_page: gr.Row(visible=False), welcome_title: gr.Markdown(value=f"### Welcome {user['name']}"), main_page: gr.Row(visible=True), } else: return { auth_error: gr.Markdown( value=f"❌ Unauthorized (user '{user['name']} is not a member of '{ADMIN_HF_ORGANIZATION}')" ) } token_box.change( auth, inputs=token_box, outputs=[auth_error, welcome_title, auth_page, main_page], ) if __name__ == "__main__": demo.launch()