extensions/usb-camera/GetUSBCamera.cpp (327 lines of code) (raw):
/**
 * @file GetUSBCamera.cpp
 * GetUSBCamera class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "GetUSBCamera.h"
#include <png.h>
#include <utility>
#include <string>
#include <vector>
#include <algorithm>
#include "utils/gsl.h"
#include "core/ProcessSessionFactory.h"
#include "core/Resource.h"
namespace org::apache::nifi::minifi::processors {
void GetUSBCamera::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}
void GetUSBCamera::onFrame(uvc_frame_t *frame, void *ptr) {
  auto cb_data = reinterpret_cast<GetUSBCamera::CallbackData *>(ptr);
  std::unique_lock<std::recursive_mutex> lock(*(cb_data->dev_access_mtx), std::try_to_lock);
  if (!lock.owns_lock()) {
    return;
  }
  auto now = std::chrono::steady_clock::now();
  if (now - cb_data->last_frame_time < std::chrono::milliseconds(static_cast<int>(1000.0 / cb_data->target_fps))) {
    return;
  }
  cb_data->last_frame_time = now;
  try {
    uvc_error_t ret;
    cb_data->logger->log_debug("Got frame");
    ret = uvc_any2rgb(frame, cb_data->frame_buffer);
    if (ret) {
      cb_data->logger->log_error("Failed to convert frame to RGB: %s", uvc_strerror(ret));
      return;
    }
    auto session = cb_data->session_factory->createSession();
    auto flow_file = session->create();
    std::string flow_file_name;
    flow_file->getAttribute("filename", flow_file_name);
    cb_data->logger->log_info("Created flow file: %s", flow_file_name);
    if (cb_data->format == "RAW") {
      session->writeBuffer(flow_file, gsl::make_span(static_cast<const std::byte*>(cb_data->frame_buffer->data), cb_data->frame_buffer->data_bytes));
    } else {
      if (cb_data->format != "PNG") {
        cb_data->logger->log_warn("Invalid format specified (%s); defaulting to PNG", cb_data->format);
      }
      session->write(flow_file, GetUSBCamera::PNGWriteCallback{
          cb_data->png_write_mtx,
          cb_data->frame_buffer,
          cb_data->device_width,
          cb_data->device_height});
    }
    session->transfer(flow_file, GetUSBCamera::Success);
    session->commit();
  } catch (std::exception &exception) {
    cb_data->logger->log_debug("GetUSBCamera Caught Exception %s", exception.what());
  } catch (...) {
    cb_data->logger->log_debug("GetUSBCamera Caught Unknown Exception");
  }
}
void GetUSBCamera::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) {
  std::lock_guard<std::recursive_mutex> lock(*dev_access_mtx_);
  double default_fps = 1;
  double target_fps = default_fps;
  std::string conf_fps_str;
  context->getProperty("FPS", conf_fps_str);
  if (conf_fps_str.empty()) {
    logger_->log_info("FPS property was not set; using default %f", default_fps);
  } else {
    try {
      target_fps = std::stod(conf_fps_str);
    } catch (std::invalid_argument &e) {
      logger_->log_error("Could not parse configured FPS value (will use default %f): %s", default_fps, conf_fps_str);
    }
  }
  uint16_t target_width = 0;
  uint16_t target_height = 0;
  std::string conf_width_str;
  context->getProperty("Width", conf_width_str);
  if (!conf_width_str.empty()) {
    auto target_width_ul = std::stoul(conf_width_str);
    if (target_width_ul > UINT16_MAX) {
      logger_->log_error("Configured target width %s is out of range", conf_width_str);
    } else {
      target_width = static_cast<uint16_t>(target_width_ul);
    }
    logger_->log_info("Using configured target width: %i", target_width);
  }
  std::string conf_height_str;
  context->getProperty("Height", conf_height_str);
  if (!conf_height_str.empty()) {
    auto target_height_ul = std::stoul(conf_height_str);
    if (target_height_ul > UINT16_MAX) {
      logger_->log_error("Configured target height %s is out of range", conf_height_str);
    } else {
      target_height = static_cast<uint16_t>(target_height_ul);
    }
    logger_->log_info("Using configured target height: %i", target_height);
  }
  std::string conf_format_str;
  context->getProperty("Format", conf_format_str);
  int usb_vendor_id;
  std::string conf_vendor_id;
  context->getProperty("USB Vendor ID", conf_vendor_id);
  std::stringstream(conf_vendor_id) >> std::hex >> usb_vendor_id;
  logger_->log_info("Using USB Vendor ID: %x", usb_vendor_id);
  int usb_product_id;
  std::string conf_product_id;
  context->getProperty("USB Product ID", conf_product_id);
  std::stringstream(conf_product_id) >> std::hex >> usb_product_id;
  logger_->log_info("Using USB Product ID: %x", usb_product_id);
  const char *usb_serial_no = nullptr;
  std::string conf_serial;
  context->getProperty("USB Serial No.", conf_serial);
  if (!conf_serial.empty()) {
    usb_serial_no = conf_serial.c_str();
    logger_->log_info("Using USB Serial No.: %s", conf_serial);
  }
  cleanupUvc();
  logger_->log_info("Beginning to capture frames from USB camera");
  uvc_stream_ctrl_t ctrl{};
  uvc_error_t res;
  res = uvc_init(&ctx_, nullptr);
  if (res < 0) {
    logger_->log_error("Failed to initialize UVC service context");
    ctx_ = nullptr;
    return;
  }
  logger_->log_debug("UVC initialized");
  // Locate device
  res = uvc_find_device(ctx_, &dev_, usb_vendor_id, usb_product_id, usb_serial_no);
  if (res < 0) {
    logger_->log_error("Unable to find device: %s", uvc_strerror(res));
    dev_ = nullptr;
  } else {
    logger_->log_info("Device found");
    // Open the device
    res = uvc_open(dev_, &devh_);
    if (res < 0) {
      logger_->log_error("Unable to open device: %s", uvc_strerror(res));
      devh_ = nullptr;
    } else {
      logger_->log_info("Device opened");
      // Iterate resolutions & framerates >= context fps, or nearest
      uint16_t width = 0;
      uint16_t height = 0;
      uint32_t max_size = 0;
      uint32_t fps = 0;
      double min_diff = -1;
      double current_diff = -1;
      double current_width_diff = -1;
      double current_height_diff = -1;
      for (auto fmt_desc = uvc_get_format_descs(devh_); fmt_desc; fmt_desc = fmt_desc->next) {
        uvc_frame_desc_t *frame_desc;
        switch (fmt_desc->bDescriptorSubtype) {
          case UVC_VS_FORMAT_UNCOMPRESSED:
          case UVC_VS_FORMAT_FRAME_BASED:
            for (frame_desc = fmt_desc->frame_descs; frame_desc; frame_desc = frame_desc->next) {
              uint32_t frame_fps = 10000000 / frame_desc->dwDefaultFrameInterval;
              logger_->log_info("Discovered supported format %ix%i @ %i",
                                frame_desc->wWidth,
                                frame_desc->wHeight,
                                frame_fps);
              if (target_height > 0 && target_width > 0) {
                if (frame_fps >= target_fps) {
                  current_width_diff = abs(frame_desc->wWidth - target_width) / static_cast<double>(target_width);
                  logger_->log_debug("Current frame format width difference is %f", current_width_diff);
                  current_height_diff = abs(frame_desc->wHeight - target_height) / static_cast<double>(target_height);
                  logger_->log_debug("Current frame format height difference is %f", current_height_diff);
                  current_diff = (current_width_diff + current_height_diff) / 2;
                  logger_->log_debug("Current frame format difference is %f", current_diff);
                  if (min_diff < 0 || current_diff < min_diff) {
                    logger_->log_info("Format %ix%i @ %i is now closest to target",
                                      frame_desc->wWidth,
                                      frame_desc->wHeight,
                                      frame_fps);
                    width = frame_desc->wWidth;
                    height = frame_desc->wHeight;
                    max_size = frame_desc->dwMaxVideoFrameBufferSize;
                    fps = frame_fps;
                    min_diff = current_diff;
                  }
                }
              } else {
                if (frame_desc->dwMaxVideoFrameBufferSize > max_size && frame_fps >= target_fps) {
                  width = frame_desc->wWidth;
                  height = frame_desc->wHeight;
                  max_size = frame_desc->dwMaxVideoFrameBufferSize;
                  fps = frame_fps;
                }
              }
            }
            break;
          case UVC_VS_FORMAT_MJPEG:
            logger_->log_info("Skipping MJPEG frame formats");
            break;
          default:logger_->log_warn("Found unknown format");
        }
      }
      if (fps == 0) {
        logger_->log_error("Could not find suitable frame format from device. "
                           "Try changing configuration (lower FPS) or device.");
        return;
      }
      logger_->log_info("Negotiating stream profile (looking for %dx%d @ %d)", width, height, fps);
      res = uvc_get_stream_ctrl_format_size(devh_, &ctrl, UVC_FRAME_FORMAT_UNCOMPRESSED, width, height, fps);
      if (res < 0) {
        logger_->log_error("Failed to find a matching stream profile: %s", uvc_strerror(res));
      } else {
        cb_data_.session_factory = session_factory;
        if (frame_buffer_ != nullptr) {
          uvc_free_frame(frame_buffer_);
        }
        frame_buffer_ = uvc_allocate_frame(width * height * 3);
        if (!frame_buffer_) {
          printf("unable to allocate bgr frame!");
          logger_->log_error("Unable to allocate RGB frame");
          return;
        }
        cb_data_.frame_buffer = frame_buffer_;
        cb_data_.context = context;
        cb_data_.png_write_mtx = png_write_mtx_;
        cb_data_.dev_access_mtx = dev_access_mtx_;
        cb_data_.logger = logger_;
        cb_data_.format = conf_format_str;
        cb_data_.device_width = width;
        cb_data_.device_height = height;
        cb_data_.device_fps = fps;
        cb_data_.target_fps = target_fps;
        cb_data_.last_frame_time = std::chrono::steady_clock::time_point();
        res = uvc_start_streaming(devh_, &ctrl, onFrame, &cb_data_, 0);
        if (res < 0) {
          logger_->log_error("Unable to start streaming: %s", uvc_strerror(res));
        } else {
          logger_->log_info("Streaming...");
          // Enable auto-exposure
          uvc_set_ae_mode(devh_, 1);
        }
      }
    }
  }
}
void GetUSBCamera::cleanupUvc() {
  std::lock_guard<std::recursive_mutex> lock(*dev_access_mtx_);
  if (frame_buffer_ != nullptr) {
    logger_->log_debug("Deallocating frame buffer");
    uvc_free_frame(frame_buffer_);
  }
  if (devh_ != nullptr) {
    logger_->log_debug("Stopping UVC streaming");
    uvc_stop_streaming(devh_);
    logger_->log_debug("Closing UVC device handle");
    uvc_close(devh_);
  }
  if (dev_ != nullptr) {
    logger_->log_debug("Closing UVC device descriptor");
    uvc_unref_device(dev_);
  }
  if (ctx_ != nullptr) {
    logger_->log_debug("Closing UVC context");
    uvc_exit(ctx_);
  }
  if (camera_thread_ != nullptr) {
    camera_thread_->join();
    logger_->log_debug("UVC thread ended");
  }
}
void GetUSBCamera::onTrigger(core::ProcessContext* /*context*/, core::ProcessSession *session) {
  auto flowFile = session->get();
  if (flowFile) {
    logger_->log_error("Received flowfile, but this processor does not support input flow files; routing to failure");
    session->transfer(flowFile, Failure);
  }
}
GetUSBCamera::PNGWriteCallback::PNGWriteCallback(std::shared_ptr<std::mutex> write_mtx,
                                                 uvc_frame_t *frame,
                                                 uint32_t width,
                                                 uint32_t height)
    : png_write_mtx_(std::move(write_mtx)),
      frame_(frame),
      width_(width),
      height_(height) {
}
int64_t GetUSBCamera::PNGWriteCallback::operator()(const std::shared_ptr<io::OutputStream>& stream) {
  std::lock_guard<std::mutex> lock(*png_write_mtx_);
  logger_->log_info("Writing %d bytes of raw capture data to PNG output", frame_->data_bytes);
  png_structp png = png_create_write_struct(PNG_LIBPNG_VER_STRING, nullptr, nullptr, nullptr);
  if (!png) {
    logger_->log_error("Failed to create PNG write struct");
    return 0;
  }
  png_infop info = png_create_info_struct(png);
  if (!info) {
    logger_->log_error("Failed to create PNG info struct");
    return 0;
  }
  if (setjmp(png_jmpbuf(png))) {  // NOLINT(cert-err52-cpp)
    logger_->log_error("Failed to set PNG jmpbuf");
    return 0;
  }
  try {
    png_set_write_fn(png, this, [](png_structp out_png,
                                   png_bytep out_data,
                                   png_size_t num_bytes) {
                       auto this_callback = reinterpret_cast<PNGWriteCallback *>(png_get_io_ptr(out_png));
                       std::copy(out_data, out_data + num_bytes, std::back_inserter(this_callback->png_output_buf_));
                     },
                     [](png_structp /*flush_png*/) {});
    png_set_IHDR(png, info, width_, height_, 8,
                 PNG_COLOR_TYPE_RGB,
                 PNG_INTERLACE_NONE,
                 PNG_COMPRESSION_TYPE_DEFAULT,
                 PNG_FILTER_TYPE_DEFAULT);
    png_write_info(png, info);
    std::vector<png_bytep> row_pointers;
    row_pointers.resize(height_);
    for (uint32_t y = 0; y < height_; y++) {
      row_pointers[y] = reinterpret_cast<png_byte *>(frame_->data) + width_ * y * 3;
    }
    png_write_image(png, row_pointers.data());
    png_write_end(png, nullptr);
    png_destroy_write_struct(&png, &info);
  } catch (...) {
    if (png && info) {
      png_destroy_write_struct(&png, &info);
    }
    throw;
  }
  const auto write_ret = stream->write(png_output_buf_.data(), png_output_buf_.size());
  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
}
REGISTER_RESOURCE(GetUSBCamera, Processor);
}  // namespace org::apache::nifi::minifi::processors