void perform_stream_op()

in src/core/ext/transport/inproc/inproc_transport.cc [914:1092]


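// Applies a single grpc_transport_stream_op_batch to an in-process stream
// while holding the transport mutex. Send ops are written straight into the
// peer stream's read buffers (or parked in this stream's write buffers if the
// peer doesn't exist yet); recv ops are recorded and the op state machine is
// kicked if the matching data is already available. On error (prior
// cancellation, a shut-down endpoint, or duplicate initial metadata) the
// batch's callbacks are failed immediately instead.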
void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                       grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed).  This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
                                    nullptr, grpc_schedule_on_exec_ctx);
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible for making sure that field gets unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (s->cancel_self_error != GRPC_ERROR_NONE) {
    // already self-canceled so still give it an error
    error = GRPC_ERROR_REF(s->cancel_self_error);
  } else {
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

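  // The paired stream on the other side of the in-process transport; nullptr
  // if it doesn't currently exist, in which case outgoing metadata is
  // buffered locally.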
  inproc_stream* other = s->other_side;
  if (error == GRPC_ERROR_NONE &&
      (op->send_initial_metadata || op->send_trailing_metadata)) {
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
    }
    if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
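      // If the peer stream doesn't exist yet, stash the metadata in this
      // stream's write buffer; otherwise write it directly into the peer's
      // read buffer.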
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      uint32_t* destflags = (other == nullptr)
                                ? &s->write_buffer_initial_md_flags
                                : &other->to_read_initial_md_flags;
      bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
                                            : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
      } else {
        if (!s->other_side_closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata,
              op->payload->send_initial_metadata.send_initial_metadata_flags,
              dest, destflags, destfilled);
        }
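        // Only the client propagates a deadline to the peer; keep the
        // earlier of the existing deadline and the one in the metadata.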
        if (s->t->is_client) {
          grpc_millis* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          *dl = GPR_MIN(*dl, op->payload->send_initial_metadata
                                 .send_initial_metadata->deadline);
          s->initial_md_sent = true;
        }
      }
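      // Give the other side a chance to act on the new metadata (or on the
      // error).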
      maybe_process_ops_locked(other, error);
    }
  }

  if (error == GRPC_ERROR_NONE &&
      (op->send_message || op->send_trailing_metadata ||
       op->recv_initial_metadata || op->recv_message ||
       op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the state machine
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the state machine if:
    // 1. We want to send a message and the other side wants to receive one
    // 2. We want to send trailing metadata and either there isn't an
    //    unmatched send or the other side wants trailing metadata
    // 3. We want initial metadata and the other side has already sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants it,
    //    because trailing metadata can also shut down a pending message
    //    receive
    if ((op->send_message && other && other->recv_message_op != nullptr) ||
        (op->send_trailing_metadata &&
         (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && other->send_message_op != nullptr) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      op_state_machine_locked(s, error);
    } else {
      s->ops_needed = true;
    }
  } else {
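    // Nothing was handed to the op state machine: fail any receive callbacks
    // if there was an error, then complete the batch immediately.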
    if (error != GRPC_ERROR_NONE) {
      // Consume any send message that was sent here but that we are not pushing
      // to the other side
      if (op->send_message) {
        op->payload->send_message.send_message.reset();
      }
      // Schedule the op's closures that we didn't push to the op state
      // machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway.
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %p",
            s, error);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            GRPC_ERROR_REF(error));
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %p", s,
            error);
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                op->payload->recv_message.recv_message_ready,
                                GRPC_ERROR_REF(error));
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling trailing-metadata-ready %p",
            s, error);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            GRPC_ERROR_REF(error));
      }
    }
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
               error);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
  }
  gpr_mu_unlock(mu);
  GRPC_ERROR_UNREF(error);
}
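
The TODO near the top of the function mentions a "barrier closure": the
batch's on_complete must fire only once every op in the batch has finished,
and the current code leans on data stored inside the closure itself for that
bookkeeping. The sketch below shows the barrier idea in isolation as plain,
standalone C++; it deliberately does not use gRPC's grpc_closure / grpc_error
machinery, and the BarrierClosure name is illustrative, not a gRPC API.

// Minimal sketch of a barrier closure: run a callback once all N ops in a
// batch have reported completion, surfacing the first error seen.
// Illustrative only; real gRPC closures and errors are not used here.
#include <cassert>
#include <functional>
#include <string>
#include <utility>

class BarrierClosure {
 public:
  // `pending` is the number of ops in the batch; `on_complete` runs once all
  // of them have reported in (an empty string stands in for "no error").
  BarrierClosure(int pending, std::function<void(std::string)> on_complete)
      : pending_(pending), on_complete_(std::move(on_complete)) {}

  // Called once per op; remembers the first error and fires the callback when
  // the last op finishes.
  void OpDone(const std::string& error) {
    assert(pending_ > 0);
    if (error_.empty() && !error.empty()) error_ = error;
    if (--pending_ == 0) on_complete_(error_);
  }

 private:
  int pending_;
  std::string error_;
  std::function<void(std::string)> on_complete_;
};

int main() {
  bool batch_done = false;
  std::string first_error;
  BarrierClosure barrier(3, [&](const std::string& err) {
    batch_done = true;
    first_error = err;  // "" here, since every op reported success
  });
  barrier.OpDone("");  // e.g. send_initial_metadata finished
  barrier.OpDone("");  // e.g. send_message finished
  barrier.OpDone("");  // e.g. send_trailing_metadata finished
  assert(batch_done && first_error.empty());
  return 0;
}

In the real transport the pending count and error would be updated under the
transport mutex taken at the top of perform_stream_op, so no atomics are shown
here.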