void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked()

in src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc [985:1149]


void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
    void* arg, grpc_error* /*error*/) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  // Null payload means the LB call was cancelled.
  if (lb_calld != grpclb_policy->lb_calld_.get() ||
      lb_calld->recv_message_payload_ == nullptr) {
    lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
    return;
  }
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
  lb_calld->recv_message_payload_ = nullptr;
  GrpcLbResponse response;
  upb::Arena arena;
  if (!GrpcLbResponseParse(response_slice, arena.ptr(), &response) ||
      (response.type == response.INITIAL && lb_calld->seen_initial_response_)) {
    char* response_slice_str =
        grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
    gpr_log(GPR_ERROR,
            "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
            "Ignoring.",
            grpclb_policy, lb_calld, response_slice_str);
    gpr_free(response_slice_str);
  } else {
    switch (response.type) {
      case response.INITIAL: {
        if (response.client_stats_report_interval != 0) {
          lb_calld->client_stats_report_interval_ =
              GPR_MAX(GPR_MS_PER_SEC, response.client_stats_report_interval);
          if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
            gpr_log(GPR_INFO,
                    "[grpclb %p] lb_calld=%p: Received initial LB response "
                    "message; client load reporting interval = %" PRId64
                    " milliseconds",
                    grpclb_policy, lb_calld,
                    lb_calld->client_stats_report_interval_);
          }
        } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
          gpr_log(GPR_INFO,
                  "[grpclb %p] lb_calld=%p: Received initial LB response "
                  "message; client load reporting NOT enabled",
                  grpclb_policy, lb_calld);
        }
        lb_calld->seen_initial_response_ = true;
        break;
      }
      case response.SERVERLIST: {
        GPR_ASSERT(lb_calld->lb_call_ != nullptr);
        auto serverlist_wrapper =
            MakeRefCounted<Serverlist>(std::move(response.serverlist));
        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
          grpc_core::UniquePtr<char> serverlist_text =
              serverlist_wrapper->AsText();
          gpr_log(GPR_INFO,
                  "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
                  " servers received:\n%s",
                  grpclb_policy, lb_calld,
                  serverlist_wrapper->serverlist().size(),
                  serverlist_text.get());
        }
        lb_calld->seen_serverlist_ = true;
        // Start sending client load report only after we start using the
        // serverlist returned from the current LB call.
        if (lb_calld->client_stats_report_interval_ > 0 &&
            lb_calld->client_stats_ == nullptr) {
          lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
          // Ref held by callback.
          lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
          lb_calld->ScheduleNextClientLoadReportLocked();
        }
        // Check if the serverlist differs from the previous one.
        if (grpclb_policy->serverlist_ != nullptr &&
            *grpclb_policy->serverlist_ == *serverlist_wrapper) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
            gpr_log(GPR_INFO,
                    "[grpclb %p] lb_calld=%p: Incoming server list identical "
                    "to current, ignoring.",
                    grpclb_policy, lb_calld);
          }
        } else {  // New serverlist.
          // Dispose of the fallback.
          // TODO(roth): Ideally, we should stay in fallback mode until we
          // know that we can reach at least one of the backends in the new
          // serverlist.  Unfortunately, we can't do that, since we need to
          // send the new addresses to the child policy in order to determine
          // if they are reachable, and if we don't exit fallback mode now,
          // CreateOrUpdateChildPolicyLocked() will use the fallback
          // addresses instead of the addresses from the new serverlist.
          // However, if we can't reach any of the servers in the new
          // serverlist, then the child policy will never switch away from
          // the fallback addresses, but the grpclb policy will still think
          // that we're not in fallback mode, which means that we won't send
          // updates to the child policy when the fallback addresses are
          // updated by the resolver.  This is sub-optimal, but the only way
          // to fix it is to maintain a completely separate child policy for
          // fallback mode, and that's more work than we want to put into
          // the grpclb implementation at this point, since we're deprecating
          // it in favor of the xds policy.  We will implement this the
          // right way in the xds policy instead.
          if (grpclb_policy->fallback_mode_) {
            gpr_log(GPR_INFO,
                    "[grpclb %p] Received response from balancer; exiting "
                    "fallback mode",
                    grpclb_policy);
            grpclb_policy->fallback_mode_ = false;
          }
          if (grpclb_policy->fallback_at_startup_checks_pending_) {
            grpclb_policy->fallback_at_startup_checks_pending_ = false;
            grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
            grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
          }
          // Update the serverlist in the GrpcLb instance. This serverlist
          // instance will be destroyed either upon the next update or when the
          // GrpcLb instance is destroyed.
          grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
          grpclb_policy->CreateOrUpdateChildPolicyLocked();
        }
        break;
      }
      case response.FALLBACK: {
        if (!grpclb_policy->fallback_mode_) {
          gpr_log(GPR_INFO,
                  "[grpclb %p] Entering fallback mode as requested by balancer",
                  grpclb_policy);
          if (grpclb_policy->fallback_at_startup_checks_pending_) {
            grpclb_policy->fallback_at_startup_checks_pending_ = false;
            grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
            grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
          }
          grpclb_policy->fallback_mode_ = true;
          grpclb_policy->CreateOrUpdateChildPolicyLocked();
          // Reset serverlist, so that if the balancer exits fallback
          // mode by sending the same serverlist we were previously
          // using, we don't incorrectly ignore it as a duplicate.
          grpclb_policy->serverlist_.reset();
        }
        break;
      }
    }
  }
  grpc_slice_unref_internal(response_slice);
  if (!grpclb_policy->shutting_down_) {
    // Keep listening for serverlist updates.
    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
    op.flags = 0;
    op.reserved = nullptr;
    // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
    GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_message_received_,
                      GrpcLb::BalancerCallState::OnBalancerMessageReceived,
                      lb_calld, grpc_schedule_on_exec_ctx);
    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
        lb_calld->lb_call_, &op, 1,
        &lb_calld->lb_on_balancer_message_received_);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  } else {
    lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
  }
}