in bsp_server/scip_sync_util/scip_utils.py [0:0]
import json
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, as_completed

# get_thread_pool_size is assumed to be defined elsewhere in this module.


def _get_all_outputs(json_data):
    """Build a mapping of output file paths from the action graph JSON data,
    keyed by target label and action mnemonic."""
    data = json.loads(json_data)
    path_fragments = []
    if "pathFragments" in data:
        path_fragments = data["pathFragments"]
    artifacts = []
    if "artifacts" in data:
        artifacts = data["artifacts"]
    actions = []
    if "actions" in data:
        actions = data["actions"]
    targets = []
    if "targets" in data:
        targets = data["targets"]
    # Calculate thread pool size as half of available CPUs
    max_workers = get_thread_pool_size()
    # Create a manager for thread-safe shared objects
    manager = multiprocessing.Manager()

    # Pre-process path fragments to create a lookup dictionary for parent fragments.
    # This avoids repeated searches in the process_fragment function.
    parent_lookup = {}
    for fragment in path_fragments:
        parent_id = fragment.get("parentId")
        if parent_id:
            parent_lookup[fragment["id"]] = parent_id

    # Create a dictionary to map fragment IDs to their labels for quick lookup
    fragment_labels = {fragment["id"]: fragment["label"] for fragment in path_fragments}

    # Create a thread-safe dictionary to map pathFragmentId to the full path
    path_dict = manager.dict()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:

        def process_fragment(fragment):
            """Process a single path fragment to build its full path."""
            fragment_id = fragment["id"]
            # Check if we've already processed this fragment
            if fragment_id in path_dict:
                return None
            # Build the full path by traversing parent IDs
            path_parts = []
            current_id = fragment_id
            # Collect all path parts by traversing up the parent chain
            while current_id:
                path_parts.append(fragment_labels[current_id])
                current_id = parent_lookup.get(current_id)
            # Combine path parts in reverse order (from root to leaf)
            full_path = "/".join(reversed(path_parts))
            return fragment_id, full_path

        # Submit all fragments for processing
        future_to_fragment = {
            executor.submit(process_fragment, fragment): fragment
            for fragment in path_fragments
        }
        # Collect results as they complete
        for future in as_completed(future_to_fragment):
            result = future.result()
            if result:  # Skip None results (already processed fragments)
                fragment_id, path = result
                path_dict[fragment_id] = path
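
    # Worked example (illustrative values, not from the source data): fragments
    #   {"id": 1, "label": "bazel-out"} <- {"id": 2, "label": "bin", "parentId": 1}
    #   <- {"id": 3, "label": "foo.o", "parentId": 2}
    # resolve to path_dict[3] == "bazel-out/bin/foo.o".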
    # Create a dictionary to map artifactId to pathFragmentId
    artifact_dict = {
        artifact["id"]: artifact["pathFragmentId"] for artifact in artifacts
    }

    # Optimize the target output dictionary creation:
    # group actions by target ID and mnemonic to reduce dictionary updates
    target_output_dict = {}
    for action in actions:
        target_id = action["targetId"]
        if target_id not in target_output_dict:
            target_output_dict[target_id] = {}
        if action["mnemonic"] not in target_output_dict[target_id]:
            target_output_dict[target_id][action["mnemonic"]] = []
        target_output_dict[target_id][action["mnemonic"]].extend(action["outputIds"])
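    # Resulting shape (illustrative): {targetId: {"CppCompile": [outputId, ...], ...}}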

    # Create a thread-safe dictionary for the final output
    target_output_paths = manager.dict()

    # Batch targets for processing to reduce thread overhead
    batch_size = max(1, len(targets) // (max_workers * 2))
    target_batches = [
        targets[i : i + batch_size] for i in range(0, len(targets), batch_size)
    ]

    def process_target_batch(target_batch):
        """Process a batch of targets to build their output paths."""
        batch_results = {}
        for target in target_batch:
            target_id = target["id"]
            target_label = target["label"]
            mnemonic_to_output_ids = target_output_dict.get(target_id, {})
            target_results = {}
            for mnemonic, output_ids in mnemonic_to_output_ids.items():
                # Use list comprehension with pre-filtering to improve performance
                valid_output_ids = [oid for oid in output_ids if oid in artifact_dict]
                output_paths = [
                    path_dict[artifact_dict[oid]] for oid in valid_output_ids
                ]
                if output_paths:  # Only add non-empty results
                    target_results[mnemonic] = output_paths
            if target_results:  # Only add targets with results
                batch_results[target_label] = target_results
        return batch_results

    # Process target batches in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        batch_results = list(executor.map(process_target_batch, target_batches))

    # Merge all batch results into the final dictionary
    final_results = {}
    for batch_result in batch_results:
        for target_label, mnemonics in batch_result.items():
            if target_label not in final_results:
                final_results[target_label] = {}
            for mnemonic, paths in mnemonics.items():
                if mnemonic not in final_results[target_label]:
                    final_results[target_label][mnemonic] = []
                final_results[target_label][mnemonic].extend(paths)
    return final_results
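

# A minimal usage sketch (not part of the source file), assuming the JSON string comes
# from a Bazel action-graph dump such as `bazel aquery --output=jsonproto //...`; the
# input path and the __main__ guard are illustrative.
if __name__ == "__main__":
    with open("aquery_output.json") as f:  # hypothetical dump produced beforehand
        target_outputs = _get_all_outputs(f.read())
    # target_outputs maps target label -> mnemonic -> list of output paths, e.g.
    # {"//foo:bar": {"CppCompile": ["bazel-out/k8-fastbuild/bin/foo/bar.o"]}}
    for label, mnemonics in target_outputs.items():
        for mnemonic, paths in mnemonics.items():
            print(label, mnemonic, len(paths))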