def _get_operation_by_id()

in pathology/orchestrator/pathology_operations_handler.py [0:0]


  def _get_operation_by_id(self, dpas_op_id: int) -> operations_pb2.Operation:
    """Gets an operation by looking up the DPAS Operation Id.

    Args:
      dpas_op_id: Unique id of operation to retrieve.

    Returns:
      Operation proto instance

    Raises:
      RpcFailureError if operation is not found.
    """
    row = self._spanner_client.read_data(
        _OPERATIONS_TABLE,
        schema_resources.OPERATIONS_COLS,
        spanner.KeySet([[dpas_op_id]]),
    )
    try:
      row_dict = dict(zip(schema_resources.OPERATIONS_COLS, row.one()))
    except exceptions.NotFound as exc:
      raise rpc_status.RpcFailureError(
          rpc_status.build_rpc_method_status_and_log(
              grpc.StatusCode.NOT_FOUND,
              'Could not retrieve Operation.'
              f' Operation with id {dpas_op_id} not found.',
              exp=exc,
          )
      ) from exc

    is_done = (
        row_dict[schema_resources.OPERATION_STATUS]
        == schema_resources.OperationStatus.COMPLETE.value
    )

    # Convert stored metadata to api metadata.
    stored_metadata = spanner_resources_pb2.StoredPathologyOperationMetadata()
    stored_metadata.ParseFromString(
        base64.b64decode(
            row_dict[schema_resources.STORED_PATHOLOGY_OPERATION_METADATA]
        )
    )
    op_metadata = any_pb2.Any()
    op_metadata.Pack(
        spanner_resource_converters.from_stored_operation_metadata(
            stored_metadata
        )
    )

    operation = operations_pb2.Operation(
        name=pathology_resources_util.convert_operation_id_to_name(dpas_op_id),
        metadata=op_metadata,
        done=is_done,
    )

    # If operation is complete, build response.
    if is_done:
      if row_dict[schema_resources.RPC_ERROR_CODE] != code_pb2.OK:
        operation.error.CopyFrom(
            status_pb2.Status(code=row_dict[schema_resources.RPC_ERROR_CODE])
        )
      else:
        response = any_pb2.Any()
        if stored_metadata.export_metadata.gcs_dest_path:
          response.Pack(
              cohorts_pb2.ExportPathologyCohortResponse(
                  status=status_pb2.Status(code=code_pb2.Code.OK)
              )
          )
        else:
          response.Pack(
              cohorts_pb2.TransferDeIdPathologyCohortResponse(
                  status=status_pb2.Status(code=code_pb2.Code.OK)
              )
          )
        operation.response.CopyFrom(response)

    return operation