scripts/render/pipeline.py (383 lines of code) (raw):

#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.

# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Defines independent building blocks (stages) of the render pipeline.

Defines a Pipeline class with various stages corresponding to different stages of the
render process. The class can easily be extended to allow for additional stages. The
stages are designed to be independent of one another to allow the pipeline to be
modifiable and create many entrypoints.

Example:
    The standard pipeline is defined in render.py, but a modified version that uses the
    yet undefined awesome_special_effects pipeline stage can be seen as:

        >>> pipeline_stages = [
                (pipeline.depth_estimation, FLAGS.run_depth_estimation),
                (pipeline.convert_to_binary, FLAGS.run_convert_to_binary),
                (pipeline.fusion, FLAGS.run_fusion),
                (pipeline.awesome_special_effects, FLAGS.run_awesome_special_effects),
            ]
        >>> pipeline.run(pipeline_stages)
"""

import json
import os
import sys
import time
from copy import copy

import pika
import progressbar

dir_scripts = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
dir_root = os.path.dirname(dir_scripts)
sys.path.append(dir_root)
sys.path.append(os.path.join(dir_scripts, "util"))

import config
import setup
from network import (
    Address,
    download,
    get_frame_fns,
    get_frame_name,
    get_frame_range,
    listdir,
    remote_image_type_path,
)
from scripts.util.system_util import image_type_paths


class Pipeline:
    """Pipeline class for rendering stages. Pipeline stages process sequentially.

    Attributes:
        background_frame (list[dict[str, str]] or None): Frame chunk(s) of the background
            frame, each with keys "first" and "last" corresponding to the appropriate
            frame names for the chunk. It is handed to run_halted_queue() exactly like
            frame_chunks, so it must be a list of chunks rather than a bare dict.
        base_params (dict[str, _]): Map of all the FLAGS defined in render.py.
        force_recompute (bool): Whether or not to overwrite previous computations.
        frame_chunks (list[dict[str, str]]): List of frame chunk with keys
            "first" and "last" corresponding to the appropriate frame names for the chunk.
        master_ip (str): IP of the master host.
    """

    def __init__(
        self,
        master_ip,
        base_params,
        frame_chunks,
        background_frame,
        force_recompute=False,
    ):
        """Constructs empty pipeline.

        Spawns the master (via setup.setup_master) if no RabbitMQ node answers on
        master_ip, then blocks until the work queue can be purged.

        Args:
            master_ip (str): IP of the master host.
            base_params (dict[str, _]): Map of all the FLAGS defined in render.py.
            frame_chunks (list[dict[str, str]]): List of frame chunk with keys
                "first" and "last" corresponding to the appropriate frame names for
                the chunk.
            background_frame (list[dict[str, str]] or None): Frame chunk(s) of the
                background frame with keys "first" and "last" corresponding to the
                appropriate frame names for the chunk.
            force_recompute (bool, optional): Whether or not to overwrite previous
                computations.
        """
        self.base_params = base_params
        self.frame_chunks = frame_chunks
        self.background_frame = background_frame
        self.master_ip = master_ip
        self.force_recompute = force_recompute

        # We only need to spawn the master if there is no RabbitMQ node already available
        try:
            probe = pika.BlockingConnection(pika.ConnectionParameters(master_ip))
        except Exception:
            setup.setup_master(base_params)
        else:
            # The probe only tests reachability; release it instead of leaking it.
            self._close_connection(probe)

        # Since the master is spawned asynchronously, we have to await the queue to
        # be available
        while True:
            try:
                self.purge_queue(config.QUEUE_NAME)
                break
            except Exception:
                time.sleep(1)

    @staticmethod
    def _close_connection(connection):
        """Closes a pika connection unless it has already been closed.

        Args:
            connection (pika.BlockingConnection): Connection to release.
        """
        if connection.is_open:
            connection.close()

    def purge_queue(self, queue_name):
        """Clears contents of a queue.

        Opens a short-lived connection to the master, declares the queue (so that
        purging a queue that does not exist yet does not fail) and purges it.

        Args:
            queue_name (str): Name of the queue to clear.
        """
        connection = pika.BlockingConnection(pika.ConnectionParameters(self.master_ip))
        try:
            channel = connection.channel()
            channel.queue_declare(queue=queue_name)
            channel.queue_purge(queue=queue_name)  # clears queue from previous runs
        finally:
            # This method is called repeatedly (including in a retry loop), so the
            # connection must not be left open.
            self._close_connection(connection)

    def _get_missing_chunks_level(self, params, level, frame_chunks):
        """Finds the frames whose outputs are absent from one destination level.

        Args:
            params (dict[str, _]): Stage parameters; "dst_image_type" selects the
                destination image type.
            level (int or None): Destination level to inspect.
            frame_chunks (list[dict[str, str]]): Chunks whose overall range, from the
                first chunk's "first" to the last chunk's "last", is checked.

        Returns:
            list[str] or None: Names of the frames with at least one missing output
                file, or None if the destination could not be inspected.
        """
        dst_dir = remote_image_type_path(params, params["dst_image_type"], level)
        dst_frames = get_frame_range(frame_chunks[0]["first"], frame_chunks[-1]["last"])
        remote = Address(dst_dir)
        uncompressed = remote.protocol != "s3"
        try:
            expected_frame_fns = set(
                get_frame_fns(params, dst_frames, uncompressed, dst_dir)
            )
            # Coerce to a set so the difference below works for any iterable result.
            actual_frame_fns = set(listdir(dst_dir, run_silently=True, recursive=True))
        except Exception as e:
            print(e)
            return None
        missing_frames_fns = expected_frame_fns - actual_frame_fns
        return [
            os.path.splitext(os.path.basename(frames_fn))[0]
            for frames_fn in missing_frames_fns
        ]

    def _get_missing_chunks(self, params, frame_chunks):
        """Filters frame chunks down to those that still have to be computed.

        Args:
            params (dict[str, _]): Stage parameters. "dst_level" may be a single level
                or a list of levels; "force_recompute" optionally forces this stage.
            frame_chunks (list[dict[str, str]]): Candidate chunks.

        Returns:
            list[dict[str, str]]: All chunks if recomputation is forced (for the whole
                pipeline or for this stage) or the cache could not be inspected;
                otherwise only the chunks containing at least one missing frame.
        """
        # force_recompute can be specified over the entire pipeline or particular stages
        if self.force_recompute or params.get("force_recompute"):
            return frame_chunks

        print(f"Checking cache for {params['app']}...")
        dst_level = params["dst_level"]
        dst_levels = dst_level if isinstance(dst_level, list) else [dst_level]

        missing_frames = set()
        for level in dst_levels:
            level_missing = self._get_missing_chunks_level(params, level, frame_chunks)
            if level_missing is None:
                # Cache state unknown: fall back to recomputing everything.
                return frame_chunks
            missing_frames.update(level_missing)

        if not missing_frames:
            return []

        return [
            frame_chunk
            for frame_chunk in frame_chunks
            if any(
                frame in missing_frames
                for frame in get_frame_range(frame_chunk["first"], frame_chunk["last"])
            )
        ]

    def run_halted_queue(self, params, frame_chunks):
        """Runs a queue with params for each of the frame chunks. The program halts while
        awaiting the completion of tasks in the queue and shows a progress bar meanwhile.
        Any frame chunks that have been previously completed will be marked as complete
        unless running with force_recompute.

        Note that params is updated in place with the keys of every published chunk.

        Args:
            params (dict[str, _]): Message to be published to RabbitMQ.
            frame_chunks (list[dict[str, str]]): List of frame chunk with keys
                "first" and "last" corresponding to the appropriate frame names for
                the chunk.

        Raises:
            Exception: If the work queue has had no consumers for longer than
                config.NO_WORKER_TIMEOUT seconds.
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.master_ip, heartbeat=0)
        )
        try:
            channel = connection.channel()
            channel.queue_declare(queue=config.QUEUE_NAME)
            channel.queue_declare(queue=config.RESPONSE_QUEUE_NAME)

            self.purge_queue(config.QUEUE_NAME)
            self.purge_queue(config.RESPONSE_QUEUE_NAME)

            # force_recompute can be specified over the entire pipeline or particular
            # stages
            frame_chunks = self._get_missing_chunks(params, frame_chunks)
            num_chunks = len(frame_chunks)
            if num_chunks == 0:
                return

            for frame_chunk in frame_chunks:
                params.update(frame_chunk)
                msg = json.dumps(params)
                channel.basic_publish(
                    exchange="",
                    routing_key=config.QUEUE_NAME,
                    body=msg,
                    properties=pika.BasicProperties(
                        delivery_mode=2
                    ),  # make message persistent
                )

            # Waits until the queue is empty before returning for next step
            queue_state = channel.queue_declare(config.RESPONSE_QUEUE_NAME)
            queue_size = queue_state.method.message_count

            progress = "█"
            widgets = [
                f"{progress} ",
                f"{params['app']}:",
                progressbar.Bar(progress, "|", "|"),
                progressbar.Percentage(),
                " (Workers: ",
                progressbar.FormatLabel("0"),
                ") (",
                progressbar.FormatLabel("%(elapsed)s"),
                ")",
            ]

            bar = progressbar.ProgressBar(maxval=num_chunks, widgets=widgets)
            bar.start()
            no_worker_period = None

            # "<" rather than "!=" so that an unexpected surplus of responses ends the
            # wait instead of spinning forever.
            while queue_size < num_chunks:
                time.sleep(1.0)
                queue_size = channel.queue_declare(
                    config.RESPONSE_QUEUE_NAME
                ).method.message_count
                num_workers = channel.queue_declare(
                    config.QUEUE_NAME
                ).method.consumer_count
                # Slot 5 of widgets is the worker-count label declared above.
                widgets[5] = str(num_workers)
                if num_workers != 0:
                    no_worker_period = None
                else:
                    if no_worker_period is None:
                        no_worker_period = time.time()
                    if time.time() - no_worker_period > config.NO_WORKER_TIMEOUT:
                        raise Exception(
                            "No workers for extended time! Check worker logs for errors..."
                        )
                # Never report more than maxval to the progress bar.
                bar.update(min(queue_size, num_chunks))
            bar.finish()
        finally:
            # Release the connection on every exit path (early return, timeout, success).
            self._close_connection(connection)

    def generate_foreground_masks(self):
        """Runs distributed foreground mask generation."""
        depth_params = copy(self.base_params)
        depth_params.update(
            {
                "app": "GenerateForegroundMasks",
                "level": 0,
                "dst_level": None,
                "dst_image_type": "foreground_masks",
            }
        )
        self.run_halted_queue(depth_params, self.frame_chunks)

    def _resize_job(self, resize_params, image_type, frame_chunks, threshold=None):
        """Runs distributed arbitrary image type resizing.

        Args:
            resize_params (dict[str, _]): Message to be published to RabbitMQ. It is
                updated in place with the resize-specific keys.
            image_type (str): Name of an image type (re: source/util/ImageTypes.h).
            frame_chunks (list[dict[str, str]]): List of frame chunk with keys
                "first" and "last" corresponding to the appropriate frame names for
                the chunk.
            threshold (int, optional): Binary threshold to be applied to resizing
                operation. If None is passed in, no thresholding is performed.
        """
        resize_params["app"] = f"Resize: {image_type.capitalize()}"
        resize_params["image_type"] = image_type
        resize_params["threshold"] = threshold
        resize_params["dst_image_type"] = image_type
        # One destination level per entry of config.WIDTHS.
        resize_params["dst_level"] = [level for level, _ in enumerate(config.WIDTHS)]
        self.run_halted_queue(resize_params, frame_chunks)

    def precompute_resizes_foreground(self):
        """Runs distributed foreground mask resizing."""
        resize_params = copy(self.base_params)
        self._resize_job(
            resize_params, "foreground_masks", self.frame_chunks, threshold=127
        )

    def precompute_resizes(self):
        """Runs distributed color, background color, and background disparity resizing.

        Raises:
            Exception: If base_params["disparity_type"] is neither "background_disp"
                nor "disparity".
        """
        resize_params = copy(self.base_params)
        if resize_params["disparity_type"] == "background_disp":
            self._resize_job(resize_params, "background_color", self.background_frame)
        elif resize_params["disparity_type"] == "disparity":
            self._resize_job(resize_params, "color", self.frame_chunks)
            if self.background_frame is not None:
                self._resize_job(
                    resize_params, "background_color", self.background_frame
                )
                self._resize_job(
                    resize_params, "background_disp", self.background_frame
                )
        else:
            raise Exception(
                f"Invalid disparity type: {resize_params['disparity_type']}"
            )

    def depth_estimation(self):
        """Runs distributed depth estimation with temporal filtering.

        Levels are processed from the start level down to the end level. Each level is
        optionally followed by temporal filtering, and the final result is either
        upsampled or transferred to the level-less destination.

        Raises:
            ValueError: If no end level is given and the requested resolution is
                smaller than every width in config.WIDTHS.
            Exception: If the disparity type is invalid when upsampling.
        """
        post_resize_params = copy(self.base_params)
        if self.base_params["disparity_type"] == "disparity":
            post_resize_params["color"] = os.path.join(
                self.base_params["input_root"],
                image_type_paths[config.type_to_levels_type["color"]],
            )
            post_resize_params["foreground_masks"] = os.path.join(
                self.base_params["input_root"],
                image_type_paths[config.type_to_levels_type["foreground_masks"]],
            )
            post_resize_params["background_disp"] = os.path.join(
                self.base_params["input_root"],
                image_type_paths[config.type_to_levels_type["background_disp"]],
            )
        elif self.base_params["disparity_type"] == "background_disp":
            post_resize_params["color"] = os.path.join(
                self.base_params["input_root"],
                image_type_paths[config.type_to_levels_type["color"]],
            )

        # -1 means "not specified": start from the last level in config.WIDTHS.
        start_level = (
            post_resize_params["level_start"]
            if post_resize_params["level_start"] != -1
            else len(config.WIDTHS) - 1
        )
        if post_resize_params["level_end"] != -1:
            end_level = post_resize_params["level_end"]
        else:
            # First level whose width does not exceed the requested resolution.
            end_level = next(
                (
                    level
                    for level, width in enumerate(config.WIDTHS)
                    if post_resize_params["resolution"] >= width
                ),
                None,
            )
            if end_level is None:
                raise ValueError(
                    f"Resolution {post_resize_params['resolution']} is smaller than "
                    f"every supported width: {config.WIDTHS}"
                )

        # Ranges for temporal filtering (only used if performing temporal filtering)
        filter_ranges = [
            {
                "first": frame_chunk["first"],
                "last": frame_chunk["last"],
                "filter_first": get_frame_name(
                    max(
                        int(post_resize_params["first"]),
                        int(frame_chunk["first"]) - post_resize_params["time_radius"],
                    )
                ),
                "filter_last": get_frame_name(
                    min(
                        int(post_resize_params["last"]),
                        int(frame_chunk["last"]) + post_resize_params["time_radius"],
                    )
                ),
            }
            for frame_chunk in self.frame_chunks
        ]

        for level in range(start_level, end_level - 1, -1):
            depth_params = copy(post_resize_params)
            if level != end_level:
                depth_params[
                    "output_formats"
                ] = "pfm"  # Force only PFM at non-finest levels
            depth_params.update(
                {
                    "app": f"DerpCLI: Level {level}",
                    "level_start": level,
                    "level_end": level,
                    "image_type": depth_params["disparity_type"],
                    "dst_level": level,
                    "dst_image_type": depth_params["disparity_type"],
                }
            )
            self.run_halted_queue(depth_params, self.frame_chunks)

            if post_resize_params["do_temporal_filter"]:
                filter_params = copy(post_resize_params)
                filter_params.update(
                    {
                        "app": "TemporalBilateralFilter",
                        "level": level,
                        "use_foreground_masks": post_resize_params[
                            "do_temporal_masking"
                        ],
                        "dst_level": level,
                        "dst_image_type": "disparity_time_filtered",
                    }
                )
                self.run_halted_queue(filter_params, filter_ranges)

                # Copy the filtered result over the unfiltered disparity of this level;
                # always forced because the destination already exists.
                transfer_params = copy(post_resize_params)
                transfer_params.update(
                    {
                        "app": "Transfer",
                        "src_level": level,
                        "src_image_type": "disparity_time_filtered",
                        "dst_level": level,
                        "dst_image_type": "disparity",
                        "force_recompute": True,
                    }
                )
                self.run_halted_queue(transfer_params, self.frame_chunks)

        if post_resize_params["resolution"] > config.WIDTHS[end_level]:
            # The upsampling color level is the smallest one larger than our last level
            dst_level = end_level - 1 if end_level > 0 else None
            upsample_params = copy(self.base_params)
            upsample_params.update(
                {
                    "app": "UpsampleDisparity",
                    "level": end_level,
                    "image_type": post_resize_params["disparity_type"],
                    "dst_level": dst_level,
                    "dst_image_type": config.type_to_upsample_type["disparity"],
                }
            )
            if post_resize_params["disparity_type"] == "background_disp":
                frame_chunks = self.background_frame
            elif post_resize_params["disparity_type"] == "disparity":
                frame_chunks = self.frame_chunks
            else:
                raise Exception(
                    f"Invalid disparity type {post_resize_params['disparity_type']}"
                )
            self.run_halted_queue(upsample_params, frame_chunks)

            transfer_params = copy(post_resize_params)
            transfer_params.update(
                {
                    "app": "Transfer",
                    "src_level": None,
                    "src_image_type": config.type_to_upsample_type["disparity"],
                    "dst_level": None,
                    "dst_image_type": post_resize_params["disparity_type"],
                }
            )
            self.run_halted_queue(transfer_params, frame_chunks)
        else:
            transfer_params = copy(post_resize_params)
            transfer_params.update(
                {
                    "app": "Transfer",
                    "src_level": end_level,
                    "src_image_type": post_resize_params["disparity_type"],
                    "dst_level": None,
                    "dst_image_type": post_resize_params["disparity_type"],
                }
            )
            self.run_halted_queue(transfer_params, self.frame_chunks)

    def convert_to_binary(self):
        """Runs distributed binary conversion."""
        convert_to_binary_params = copy(self.base_params)
        convert_to_binary_params.update(
            {
                "app": "ConvertToBinary: Meshing",
                "level": None,
                "foreground_masks": "",
                "run_conversion": True,
                "dst_level": None,
                "dst_image_type": "bin",
            }
        )
        self.run_halted_queue(convert_to_binary_params, self.frame_chunks)

    def fusion(self):
        """Runs distributed binary striping.

        Striping is published as a single chunk spanning from the first frame of the
        first chunk to the last frame of the last chunk.
        """
        fusion_params = copy(self.base_params)
        fusion_params.update(
            {
                "app": "ConvertToBinary: Striping",
                "run_conversion": False,
                "dst_level": None,
                "dst_image_type": "fused",
            }
        )
        self.run_halted_queue(
            fusion_params,
            [
                {
                    "first": self.frame_chunks[0]["first"],
                    "last": self.frame_chunks[-1]["last"],
                }
            ],
        )

    def simple_mesh_renderer(self):
        """Runs distributed SimpleMeshRenderer exports.

        The destination image type is "exports_<format>", where <format> is read from
        base_params["format"].
        """
        simple_mesh_renderer_params = copy(self.base_params)
        simple_mesh_renderer_params.update(
            {
                "app": "SimpleMeshRenderer",
                "level": None,
                "dst_level": None,
                "dst_image_type": f"exports_{simple_mesh_renderer_params['format']}",
            }
        )
        self.run_halted_queue(simple_mesh_renderer_params, self.frame_chunks)

    def run(self, stages):
        """Runs the pipeline stages.

        Args:
            stages (list[tuple(func : void -> void), bool]): List of functions and
                whether or not they are to be executed.
        """
        for stage, run_stage in stages:
            if run_stage:
                stage()