void op_state_machine_locked()

in src/core/ext/transport/inproc/inproc_transport.cc [586:854]


void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
  // This function gets called when we have contents in the unprocessed reads
  // Get what we want based on our ops wanted
  // Schedule our appropriate closures
  // and then return to ops_needed state if still needed

  grpc_error* new_err = GRPC_ERROR_NONE;

  bool needs_close = false;

  INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
  // cancellation takes precedence
  inproc_stream* other = s->other_side;

  if (s->cancel_self_error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
    goto done;
  } else if (s->cancel_other_error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
    goto done;
  } else if (error != GRPC_ERROR_NONE) {
    fail_helper_locked(s, GRPC_ERROR_REF(error));
    goto done;
  }

  if (s->send_message_op && other) {
    if (other->recv_message_op) {
      message_transfer_locked(s, other);
      maybe_process_ops_locked(other, GRPC_ERROR_NONE);
    } else if (!s->t->is_client && s->trailing_md_sent) {
      // A server send will never be matched if the server already sent status
      s->send_message_op->payload->send_message.send_message.reset();
      complete_if_batch_end_locked(
          s, GRPC_ERROR_NONE, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete");
      s->send_message_op = nullptr;
    }
  }
  // Pause a send trailing metadata if there is still an outstanding
  // send message unless we know that the send message will never get
  // matched to a receive. This happens on the client if the server has
  // already sent status or on the server if the client has requested
  // status
  if (s->send_trailing_md_op &&
      (!s->send_message_op ||
       (s->t->is_client &&
        (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
       (!s->t->is_client && other &&
        (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
         other->recv_trailing_md_op)))) {
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    if (*destfilled || s->trailing_md_sent) {
      // The buffer is already in use; that's an error!
      INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
      new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
      goto done;
    } else {
      if (!other || !other->closed) {
        fill_in_metadata(s,
                         s->send_trailing_md_op->payload->send_trailing_metadata
                             .send_trailing_metadata,
                         0, dest, nullptr, destfilled);
      }
      s->trailing_md_sent = true;
      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-metadata-ready", s);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            GRPC_ERROR_NONE);
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete", s);
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                GRPC_ERROR_NONE);
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      }
    }
    maybe_process_ops_locked(other, GRPC_ERROR_NONE);
    complete_if_batch_end_locked(
        s, GRPC_ERROR_NONE, s->send_trailing_md_op,
        "op_state_machine scheduling send-trailing-metadata-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_initial_md_op) {
    if (s->initial_md_recvd) {
      new_err =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd initial md %p",
          s, new_err);
      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
      goto done;
    }

    if (s->to_read_initial_md_filled) {
      s->initial_md_recvd = true;
      new_err = fill_in_metadata(
          s, &s->to_read_initial_md, s->to_read_initial_md_flags,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      s->recv_initial_md_op->payload->recv_initial_metadata
          .recv_initial_metadata->deadline = s->deadline;
      if (s->recv_initial_md_op->payload->recv_initial_metadata
              .trailing_metadata_available != nullptr) {
        *s->recv_initial_md_op->payload->recv_initial_metadata
             .trailing_metadata_available =
            (other != nullptr && other->send_trailing_md_op != nullptr);
      }
      grpc_metadata_batch_clear(&s->to_read_initial_md);
      s->to_read_initial_md_filled = false;
      INPROC_LOG(GPR_INFO,
                 "op_state_machine %p scheduling initial-metadata-ready %p", s,
                 new_err);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata_ready,
          GRPC_ERROR_REF(new_err));
      complete_if_batch_end_locked(
          s, new_err, s->recv_initial_md_op,
          "op_state_machine scheduling recv-initial-metadata-on-complete");
      s->recv_initial_md_op = nullptr;

      if (new_err != GRPC_ERROR_NONE) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling on_complete errors2 %p", s,
                   new_err);
        fail_helper_locked(s, GRPC_ERROR_REF(new_err));
        goto done;
      }
    }
  }
  if (s->recv_message_op) {
    if (other && other->send_message_op) {
      message_transfer_locked(other, s);
      maybe_process_ops_locked(other, GRPC_ERROR_NONE);
    }
  }
  if (s->to_read_trailing_md_filled) {
    if (s->trailing_md_recvd) {
      new_err =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd trailing md %p",
          s, new_err);
      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
      goto done;
    }
    if (s->recv_message_op != nullptr) {
      // This message needs to be wrapped up because it will never be
      // satisfied
      *s->recv_message_op->payload->recv_message.recv_message = nullptr;
      INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_message_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      complete_if_batch_end_locked(
          s, new_err, s->recv_message_op,
          "op_state_machine scheduling recv-message-on-complete");
      s->recv_message_op = nullptr;
    }
    if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
      // Nothing further will try to receive from this stream, so finish off
      // any outstanding send_message op
      s->send_message_op->payload->send_message.send_message.reset();
      complete_if_batch_end_locked(
          s, new_err, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete");
      s->send_message_op = nullptr;
    }
    if (s->recv_trailing_md_op != nullptr) {
      // We wanted trailing metadata and we got it
      s->trailing_md_recvd = true;
      new_err =
          fill_in_metadata(s, &s->to_read_trailing_md, 0,
                           s->recv_trailing_md_op->payload
                               ->recv_trailing_metadata.recv_trailing_metadata,
                           nullptr, nullptr);
      grpc_metadata_batch_clear(&s->to_read_trailing_md);
      s->to_read_trailing_md_filled = false;

      // We should schedule the recv_trailing_md_op completion if
      // 1. this stream is the client-side
      // 2. this stream is the server-side AND has already sent its trailing md
      //    (If the server hasn't already sent its trailing md, it doesn't have
      //     a final status, so don't mark this op complete)
      if (s->t->is_client || s->trailing_md_sent) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete %p",
                   s, new_err);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            GRPC_ERROR_REF(new_err));
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                GRPC_ERROR_REF(new_err));
        s->recv_trailing_md_op = nullptr;
        needs_close = s->trailing_md_sent;
      } else {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p server needs to delay handling "
                   "trailing-md-on-complete %p",
                   s, new_err);
      }
    } else {
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p has trailing md but not yet waiting for it", s);
    }
  }
  if (s->trailing_md_recvd && s->recv_message_op) {
    // No further message will come on this stream, so finish off the
    // recv_message_op
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
    *s->recv_message_op->payload->recv_message.recv_message = nullptr;
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_NONE);
    complete_if_batch_end_locked(
        s, new_err, s->recv_message_op,
        "op_state_machine scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
      s->send_message_op) {
    // Nothing further will try to receive from this stream, so finish off
    // any outstanding send_message op
    s->send_message_op->payload->send_message.send_message.reset();
    complete_if_batch_end_locked(
        s, new_err, s->send_message_op,
        "op_state_machine scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
      s->recv_message_op || s->recv_trailing_md_op) {
    // Didn't get the item we wanted so we still need to get
    // rescheduled
    INPROC_LOG(
        GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
        s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
        s->recv_message_op, s->recv_trailing_md_op);
    s->ops_needed = true;
  }
done:
  if (needs_close) {
    close_other_side_locked(s, "op_state_machine");
    close_stream_locked(s);
  }
  GRPC_ERROR_UNREF(new_err);
}