std::unique_ptr<IOBuf> RFC1867Codec::onIngress()

in proxygen/lib/http/experimental/RFC1867.cpp [56:170]


/**
 * Feed the next chunk of a multipart body into the parser.
 *
 * The chunk is appended to the internal input_ queue and the state machine
 * (START -> HEADERS_START -> HEADERS -> FIELD_DATA -> HEADERS_START ... ->
 * DONE) runs until the queue is drained or a state needs more bytes than are
 * currently buffered.
 *
 * @param data  next chunk of ingress bytes; ownership is taken.
 * @return      the bytes that could not be consumed yet (moved out of
 *              input_, so the internal queue is left empty), or nullptr when
 *              everything was consumed, parsing finished, or an error
 *              occurred. NOTE(review): the caller is presumably expected to
 *              hand the returned bytes back together with the next chunk --
 *              confirm against the call sites.
 */
std::unique_ptr<IOBuf> RFC1867Codec::onIngress(std::unique_ptr<IOBuf> data) {
  // Shared, read-only buffer wrapping kDummyGet; it is fed to the header
  // parser before each part's headers (see HEADERS_START below).
  static auto dummyBuf =
      IOBuf::wrapBuffer(kDummyGet.data(), kDummyGet.length());
  IOBufQueue result{IOBufQueue::cacheChainLength()};
  bool foundBoundary = false;
  BoundaryResult br = BoundaryResult::NO;

  input_.append(std::move(data));
  while (!input_.empty()) {
    switch (state_) {
      case ParserState::START:
        // first time, must start with boundary without leading \n
        // (hence the first character of boundary_ is skipped here)
        br = isBoundary(
            *input_.front(), 0, boundary_.data() + 1, boundary_.length() - 1);
        if (br == BoundaryResult::NO) {
          if (callback_) {
            LOG(ERROR) << "Invalid starting sequence";
            callback_->onError();
          }
          state_ = ParserState::ERROR;
          return nullptr;
        } else if (br == BoundaryResult::PARTIAL) {
          // Not enough bytes to decide yet; give everything back.
          return input_.move();
        }
        input_.trimStart(boundary_.length() - 1);
        bytesProcessed_ += boundary_.length() - 1;
        state_ = ParserState::HEADERS_START;
        // fall through

      case ParserState::HEADERS_START: {
        // Just past a boundary. A following "--" marks the closing
        // delimiter; anything else starts the headers of the next part.
        if (input_.chainLength() < 3) {
          return input_.move();
        }
        Cursor c(input_.front());
        char firstTwo[2];
        c.pull(firstTwo, 2);
        // We have at least 3 chars available to read
        // toTrim is a size_t on purpose: it grows by one per '\r' in
        // untrusted input, and a narrower counter would wrap around, defeat
        // the length check below and let the cursor read past the buffered
        // data.
        size_t toTrim = 3;
        if (memcmp(firstTwo, "--", 2) == 0) {
          do {
            auto ch = c.read<char>();
            if (ch == '\n') {
              // NOTE(review): the trimmed terminator bytes are not added to
              // bytesProcessed_ -- confirm whether that is intentional.
              input_.trimStart(toTrim);
              state_ = ParserState::DONE;
            } else if (ch == '\r') {
              // Every \r we encounter is a char we must trim but we must
              // make sure we have sufficient data available in input_ to
              // keep reading (toTrim is always one pos ahead to handle the
              // expected \n)
              ++toTrim;
              if (input_.chainLength() < toTrim) {
                return input_.move();
              }
            } else {
              // Closing "--" followed by something other than \r / \n.
              // NOTE(review): unlike the other error paths, callback_ is not
              // notified via onError() here -- confirm whether intentional.
              state_ = ParserState::ERROR;
            }
          } while (state_ == ParserState::HEADERS_START);
          // state_ is now DONE or ERROR; the next loop iteration (if any
          // input is left) returns nullptr from the matching case.
          break;
        }
      }
        // Not a closing delimiter: unpause the header parser and feed it
        // the dummy buffer before the part's own headers. NOTE(review):
        // presumably this supplies a synthetic request line so a regular
        // HTTP parser can handle the part headers -- confirm.
        headerParser_.setParserPaused(false);
        headerParser_.onIngress(*dummyBuf);
        CHECK(!parseError_);
        state_ = ParserState::HEADERS;
        // fall through

      case ParserState::HEADERS:
        // Keep feeding buffered input to the header parser until it reports
        // an error, input runs out, or state_ leaves HEADERS. NOTE(review):
        // state_ and parseError_ are presumably updated by the header
        // parser's callbacks, which are not visible in this function.
        while (!parseError_ && input_.front() &&
               state_ == ParserState::HEADERS) {
          size_t bytesParsed = headerParser_.onIngress(*input_.front());
          input_.trimStart(bytesParsed);
          bytesProcessed_ += bytesParsed;
        }
        if (parseError_) {
          if (callback_) {
            LOG(ERROR) << "Error parsing header data: ";
            VLOG(3) << IOBufPrinter::printHexFolly(input_.front());
            callback_->onError();
          }
          state_ = ParserState::ERROR;
          return nullptr;
        }
        break;

      case ParserState::FIELD_DATA:
        // Collect the field bytes returned by readToBoundary() and hand
        // them to the callback together with the running byte count.
        result = readToBoundary(foundBoundary);
        value_.append(result.move());
        if (!value_.empty() && callback_) {
          if (callback_->onFieldData(value_.move(), bytesProcessed_) < 0) {
            LOG(ERROR) << "Callback returned error";
            state_ = ParserState::ERROR;
            return nullptr;
          }
        }
        if (foundBoundary) {
          if (callback_) {
            callback_->onFieldEnd(true, bytesProcessed_);
          }
          state_ = ParserState::HEADERS_START;
        } else {
          // No boundary found in the buffered data: return whatever
          // readToBoundary() left in input_ and wait for more.
          if (input_.chainLength() > 0) {
            VLOG(5) << "Trailing input="
                    << IOBufPrinter::printHexFolly(input_.front());
          }
          return input_.move();
        }
        break;
      case ParserState::DONE:
      case ParserState::ERROR:
        // abort, consume all input
        return nullptr;
    }
  }
  return nullptr;
}