in pathology/orchestrator/pathology_operations_handler.py [0:0]
def _get_operation_by_id(self, dpas_op_id: int) -> operations_pb2.Operation:
    """Builds the Operation proto for a DPAS operation id stored in Spanner.

    Reads the operation's row from the operations table, converts its stored
    metadata to the API metadata and, when the stored status is COMPLETE,
    attaches either the recorded error code or an OK response.

    Args:
      dpas_op_id: Unique id of the operation to retrieve.

    Returns:
      Operation proto instance with name, metadata and done populated. A
      completed operation additionally carries either an error or a response.

    Raises:
      RpcFailureError: If the row lookup for the given id raises NotFound.
    """
    columns = schema_resources.OPERATIONS_COLS
    result_set = self._spanner_client.read_data(
        _OPERATIONS_TABLE,
        columns,
        spanner.KeySet([[dpas_op_id]]),
    )
    try:
        fields = dict(zip(columns, result_set.one()))
    except exceptions.NotFound as exc:
        failure_status = rpc_status.build_rpc_method_status_and_log(
            grpc.StatusCode.NOT_FOUND,
            'Could not retrieve Operation.'
            f' Operation with id {dpas_op_id} not found.',
            exp=exc,
        )
        raise rpc_status.RpcFailureError(failure_status) from exc

    completed_value = schema_resources.OperationStatus.COMPLETE.value
    finished = fields[schema_resources.OPERATION_STATUS] == completed_value

    # The metadata column is base64-decoded, parsed into the stored proto and
    # then converted to the API-facing metadata before being packed.
    encoded_metadata = fields[
        schema_resources.STORED_PATHOLOGY_OPERATION_METADATA
    ]
    stored = spanner_resources_pb2.StoredPathologyOperationMetadata()
    stored.ParseFromString(base64.b64decode(encoded_metadata))
    api_metadata = spanner_resource_converters.from_stored_operation_metadata(
        stored
    )
    packed_metadata = any_pb2.Any()
    packed_metadata.Pack(api_metadata)

    result = operations_pb2.Operation(
        name=pathology_resources_util.convert_operation_id_to_name(dpas_op_id),
        metadata=packed_metadata,
        done=finished,
    )

    # An unfinished operation has neither an error nor a response.
    if not finished:
        return result

    # A finished operation with a non-OK stored code reports only the error.
    error_code = fields[schema_resources.RPC_ERROR_CODE]
    if error_code != code_pb2.OK:
        operation_error = status_pb2.Status(code=error_code)
        result.error.CopyFrom(operation_error)
        return result

    # A populated export GCS destination path selects the export response;
    # otherwise the transfer de-id response is used.
    response_type = (
        cohorts_pb2.ExportPathologyCohortResponse
        if stored.export_metadata.gcs_dest_path
        else cohorts_pb2.TransferDeIdPathologyCohortResponse
    )
    packed_response = any_pb2.Any()
    packed_response.Pack(
        response_type(status=status_pb2.Status(code=code_pb2.Code.OK))
    )
    result.response.CopyFrom(packed_response)
    return result