in mobile_cv/predictor/model_wrappers.py [0:0]
def forward(self, inputs):
    """Run the wrapped Caffe2 predict net on `inputs`.

    Args:
        inputs: sequence of torch.Tensor, one per blob in `self._input_blobs`,
            in matching order.

    Returns:
        tuple of torch.Tensor, one per external output of the net, placed on
        the devices inferred by `self._infer_output_devices` (or CPU when all
        inputs are on CPU).

    Raises:
        RuntimeError: if a fetched output blob is not a numpy array (e.g. the
            net failed before producing that output).
    """
    assert len(inputs) == len(
        self._input_blobs
    ), "Number of input tensors ({}) doesn't match the required input blobs: {}".format(
        len(inputs), self._input_blobs
    )

    # Hoist the repeated Proto() lookups; the proto does not change below.
    net_proto = self.net.Proto()
    external_outputs = net_proto.external_output

    with ScopedWS(self.ws_name, is_reset=False, is_cleanup=False) as ws:
        # Feed inputs
        for b, tensor in zip(self._input_blobs, inputs):
            # feed torch.Tensor directly, maybe need to cast to numpy first
            ws.FeedBlob(b, tensor)
        # Run predict net
        try:
            ws.RunNet(net_proto.name)
        except RuntimeError as e:
            # Log each distinct error message only once to avoid log spam,
            # then continue with whatever partial outputs the net produced.
            if str(e) not in self._error_msgs:
                self._error_msgs.add(str(e))
                logger.warning("Encountered new RuntimeError: \n{}".format(str(e)))
            logger.warning("Catch the error and use partial results.")

        c2_outputs = [ws.FetchBlob(b) for b in external_outputs]
        # Remove outputs of current run, this is necessary in order to
        # prevent fetching the result from previous run if the model fails
        # in the middle.
        for b in external_outputs:
            # Needs to create uninitialized blob to make the net runnable.
            # This is "equivalent" to: ws.RemoveBlob(b) then ws.CreateBlob(b),
            # but there's no such API.
            ws.FeedBlob(
                b, f"{b}, a C++ native class of type nullptr (uninitialized)."
            )

    # Cast output to torch.Tensor on the desired device
    output_devices = (
        self._infer_output_devices(inputs)
        if any(t.device.type != "cpu" for t in inputs)
        else ["cpu" for _ in external_outputs]
    )

    outputs = []
    for name, c2_output, device in zip(external_outputs, c2_outputs, output_devices):
        if not isinstance(c2_output, np.ndarray):
            # A non-ndarray blob means the net failed before producing it
            # (see the uninitialized-blob placeholder fed above).
            raise RuntimeError(
                "Invalid output for blob {}, received: {}".format(name, c2_output)
            )
        # NOTE(review): torch.Tensor(...) casts every output to float32, so
        # integer-valued outputs (e.g. class indices) lose their dtype;
        # torch.tensor(...) would preserve it. Kept as-is to preserve
        # existing behavior — confirm whether the cast is intended.
        outputs.append(torch.Tensor(c2_output).to(device=device))
    return tuple(outputs)