void PaxOrcEncoder::AppendInternal()

in contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.cc [287:709]


void PaxOrcEncoder::AppendInternal(const int64 data, bool is_flush) {
  bool already_append_before_delta_changed = false;
  bool keep_push_status = true;

  if (is_flush) {
    SwitchStatusTo(kFlush);
  }

  while (keep_push_status) {
    keep_push_status = false;

    switch (status_) {
      case EncoderStatus::kInvalid: {
        Assert(false);
        break;
      }
      case EncoderStatus::kInit: {
        AppendData(data);
        encoder_context_.fixed_len = 1;
        encoder_context_.var_len = 1;
        SwitchStatusTo(kTwoElements);
        break;
      }
      case EncoderStatus::kTwoElements: {
        encoder_context_.prev_delta =
            data - ((*data_buffer_)[data_buffer_->GetSize() - 1]);

        AppendData(data);

        if (encoder_context_.prev_delta == 0) {
          encoder_context_.fixed_len = 2;
          encoder_context_.var_len = 0;
        } else {
          encoder_context_.fixed_len = 0;
          encoder_context_.var_len = 2;
        }

        SwitchStatusTo(kUntreated);
        break;
      }
      case kUntreated: {
        if (data_buffer_->UnTreated() == 0) {
          keep_push_status = true;
          SwitchStatusTo(kInit);
          break;
        }

        encoder_context_.current_delta =
            data - ((*data_buffer_)[data_buffer_->GetSize() - 1]);

        // Temporary data is duplicated(minimum of 3 repetitions, eq.
        // ORC_MIN_REPEAT) But it is not confirmed whether it is already
        // duplicated at the beginning
        if (encoder_context_.current_delta == encoder_context_.prev_delta &&
            encoder_context_.current_delta == 0) {
          AppendData(data);
          if (encoder_context_.var_len > 0) {
            // If variable run is non-zero then we are seeing repeating
            // values at the end of variable run in which case fixed Run
            // length is 2
            encoder_context_.fixed_len = 2;
          }
          encoder_context_.fixed_len++;

          if (encoder_context_.fixed_len >= ORC_MIN_REPEAT &&
              encoder_context_.var_len > 0) {
            // Ok, got at lease 3 repetitions
            // Encode and flush the non-repeating data
            keep_push_status = true;
            SwitchStatusTo(kTreatPrevBuffer);
            break;
          }

          // The data becomes repeated at the beginning
          if (encoder_context_.fixed_len == ORC_MAX_LITERAL_SIZE) {
            encoder_context_.delta_ctx->is_fixed_delta = true;

            keep_push_status = true;
            SwitchStatusTo(kTreatDelta);
            break;
          }

          /// no need switch to next type
          break;
        }

        // Below is encoder_context_.current_delta !=
        // encoder_context_.prev_delta != 0 That can happen in two ways:
        // 1. Repeating consecutive sequences become non-repeating
        // 2. Temporary data is not repeated at the beginning

        if (encoder_context_.fixed_len >= ORC_MIN_REPEAT) {
          keep_push_status = true;
          SwitchStatusTo(kUntreatedDiscontinuous);
          break;
        }

        // Repetitions data numbers can less than 3 here
        // ex. 0 0 1, then current_delta != 0. and data will transfer to
        // non-repeating data
        if (encoder_context_.fixed_len > 0 &&
            encoder_context_.fixed_len < ORC_MIN_REPEAT &&
            encoder_context_.current_delta != 0) {
          encoder_context_.var_len = encoder_context_.fixed_len;
          encoder_context_.fixed_len = 0;
        }

        // No data remain
        if (data_buffer_->UnTreated() == 0) {
          keep_push_status = true;
          SwitchStatusTo(kInit);
          break;
        }

        // non-repeating data
        encoder_context_.prev_delta = encoder_context_.current_delta;
        AppendData(data);
        encoder_context_.var_len++;

        if (encoder_context_.var_len == ORC_MAX_LITERAL_SIZE) {
          keep_push_status = true;
          SwitchStatusTo(kDetermineFlushPrevBuffer);
          break;
        }

        /// no need switch to next type
        break;
      }
      case EncoderStatus::kUntreatedDiscontinuous: {
        already_append_before_delta_changed = true;
        keep_push_status = true;

        if (encoder_context_.fixed_len <= ORC_MAX_SHORT_REPEAT_LENGTH) {
          SwitchStatusTo(kTreatShortRepeat);
        } else {
          encoder_context_.delta_ctx->is_fixed_delta = true;
          SwitchStatusTo(kTreatDelta);
        }
        break;
      }
      case EncoderStatus::kTreatPrevBuffer: {
        Assert(data_buffer_->UnTreated());
        Assert(data_buffer_->UnTouched() == 0);

        // fixed_len must equal to ORC_MIN_REPEAT
        // the status kTreatPrevBuffer will only occur when
        // non-repeating elements change to ORC_MIN_REPEAT repeating elements
        // means there must be non-repeating elements before repeating elements
        Assert(encoder_context_.fixed_len == ORC_MIN_REPEAT);

        // will shift tail fixed in kTreatDone
        data_buffer_->BrushBackUnTreated(sizeof(int64) * ORC_MIN_REPEAT);
        encoder_context_.var_len -= (ORC_MIN_REPEAT - 1);

        keep_push_status = true;
        SwitchStatusTo(kDetermineFlushPrevBuffer);
        break;
      }
      case EncoderStatus::kFlush:
        if (data_buffer_->Used() == 0) {
          keep_push_status = true;
          SwitchStatusTo(kFinish);
          break;
        }

        Assert(data_buffer_->UnTreated() == data_buffer_->Used());

        // must check delta & short repeat can treat or not.
        if (encoder_context_.fixed_len != 0) {
          if (encoder_context_.fixed_len < ORC_MIN_REPEAT) {
            encoder_context_.var_len = encoder_context_.fixed_len;
            encoder_context_.fixed_len = 0;
          } else if (  // encoder_context_.fixed_len >= ORC_MIN_REPEAT &&
              encoder_context_.fixed_len <= ORC_MAX_SHORT_REPEAT_LENGTH) {
            keep_push_status = true;
            SwitchStatusTo(kTreatShortRepeat);
            break;
          } else {  // encoder_context_.fixed_len > ORC_MAX_SHORT_REPEAT_LENGTH
            keep_push_status = true;
            encoder_context_.delta_ctx->is_fixed_delta = true;
            SwitchStatusTo(kTreatDelta);
            break;
          }
        }

        keep_push_status = true;
        SwitchStatusTo(kDetermineFlushPrevBuffer);
        break;
      case EncoderStatus::kDetermineFlushPrevBuffer: {
        size_t data_lens = data_buffer_->UnTreated() / sizeof(int64);

        if (data_lens <= ORC_MIN_REPEAT) {
          keep_push_status = true;
          SwitchStatusTo(kTreatDirect);
          break;
        }

        bool increasing = true;
        bool decreasing = true;
        auto delta_ctx = encoder_context_.delta_ctx;
        auto direct_ctx = encoder_context_.direct_ctx;
        auto pb_ctx = encoder_context_.pb_ctx;
        int64 init_delta_val = (*data_buffer_)[1] - (*data_buffer_)[0];
        int64 curr_delta = 0;
        int64 max_delta = 0;
        uint32 zigzag_bits_90_p = 0;

        delta_ctx->is_fixed_delta = true;
        pb_ctx->min = (*data_buffer_)[0];
        pb_ctx->max = (*data_buffer_)[0];

        delta_ctx->adj_deltas[delta_ctx->adj_deltas_idx++] = init_delta_val;

        for (size_t i = 1; i < data_lens; i++) {
          const int64 l1 = (*data_buffer_)[i];
          const int64 l0 = (*data_buffer_)[i - 1];
          curr_delta = l1 - l0;
          pb_ctx->min = std::min(pb_ctx->min, l1);
          pb_ctx->max = std::max(pb_ctx->max, l1);

          increasing &= (l0 <= l1);
          decreasing &= (l0 >= l1);

          delta_ctx->is_fixed_delta &= (curr_delta == init_delta_val);
          if (i > 1) {
            delta_ctx->adj_deltas[delta_ctx->adj_deltas_idx++] =
                std::abs(curr_delta);
            max_delta = std::max(max_delta, delta_ctx->adj_deltas[i - 1]);
          }
        }

        // it's faster to exit under delta overflow condition without checking
        // for PATCHED_BASE condition as encoding using DIRECT is faster and has
        // less overhead than PATCHED_BASE
        auto is_safe_subtract = [](int64 left, int64 right) -> bool {
          return ((left ^ right) >= 0) || ((left ^ (left - right)) >= 0);
        };

        if (!is_safe_subtract(pb_ctx->max, pb_ctx->min)) {
          keep_push_status = true;
          SwitchStatusTo(kTreatDirect);
          break;
        }

        // invariant - subtracting any number from any other in the literals
        // after option point won't overflow

        // if min is equal to max then the delta is 0, option condition happens
        // for fixed values run >10 which cannot be encoded with SHORT_REPEAT
        if (pb_ctx->min == pb_ctx->max) {
          Assert(delta_ctx->is_fixed_delta);
          Assert(!curr_delta);

          delta_ctx->fixed_delta_val = 0;

          keep_push_status = true;
          SwitchStatusTo(kTreatDelta);
          break;
        }

        if (delta_ctx->is_fixed_delta) {
          Assert(curr_delta == init_delta_val);
          delta_ctx->fixed_delta_val = curr_delta;

          keep_push_status = true;
          SwitchStatusTo(kTreatDelta);
          break;
        }

        if (init_delta_val != 0) {
          delta_ctx->bits_delta_max = FindClosestBits(max_delta);

          // monotonic condition
          if (increasing || decreasing) {
            keep_push_status = true;
            SwitchStatusTo(kTreatDelta);
            break;
          }
        }

        // Without flush as delta then reset delta ctx
        encoder_context_.ResetDeltaCtx();

        pb_ctx->histogram_len = data_lens;
        if (encoder_context_.is_sign) {
          zigzag_buffer_->BrushBackAll();
          if (zigzag_buffer_->Capacity() < data_lens * sizeof(int64)) {
            zigzag_buffer_->ReSize(data_lens * sizeof(int64));
          }
          ZigZagBuffers(data_buffer_->StartT(), zigzag_buffer_->StartT(),
                        data_lens);
          zigzag_buffer_->Brush(data_lens * sizeof(int64));
        }

        // PATCHED_BASE encoding check

        // percentile values are computed for the zigzag encoded values. if the
        // number of bit requirement between 90th and 100th percentile varies
        // beyond a threshold then we need to patch the values. if the variation
        // is not significant then we can use direct encoding

        BuildHistogram(pb_ctx->histogram,
                       encoder_context_.is_sign ? zigzag_buffer_->StartT()
                                                : data_buffer_->StartT(),
                       data_lens);

        direct_ctx->zz_bits_100_p_inited = true;
        direct_ctx->zigzag_bits_100_p =
            GetPercentileBits(pb_ctx->histogram, pb_ctx->histogram_len, 1.0);
        zigzag_bits_90_p =
            GetPercentileBits(pb_ctx->histogram, pb_ctx->histogram_len, 0.9);

        // if the difference between 90th percentile and 100th percentile fixed
        // bits is > 1 then we need patch the values
        if (direct_ctx->zigzag_bits_100_p - zigzag_bits_90_p > 1) {
          for (size_t i = 0; i < data_lens; i++) {
            pb_ctx->base_patch_buffer[pb_ctx->base_patch_buffer_count++] =
                ((*data_buffer_)[i] - pb_ctx->min);
          }

          pb_ctx->histogram_len = data_lens;
          // rebuild histogram with literals[*] - min
          BuildHistogram(pb_ctx->histogram, pb_ctx->base_patch_buffer,
                         data_lens);

          // 95th percentile width is used to determine max allowed value
          // after which patching will be done
          pb_ctx->hist_bits_95_p =
              GetPercentileBits(pb_ctx->histogram, pb_ctx->histogram_len, 0.95);

          // 100th percentile is used to compute the max patch width
          pb_ctx->hist_bits_100_p =
              GetPercentileBits(pb_ctx->histogram, pb_ctx->histogram_len, 1.0);

          // after base reducing the values, if the difference in bits between
          // 95th percentile and 100th percentile value is zero then there
          // is no point in patching the values, in which case we will
          // fallback to DIRECT encoding.
          // The decision to use patched base was based on zigzag values, but
          // the actual patching is done on base reduced literals.
          if ((pb_ctx->hist_bits_100_p - pb_ctx->hist_bits_95_p) != 0) {
            keep_push_status = true;
            SwitchStatusTo(kTreatPatchedBase);
            break;
          }
        }

        encoder_context_.ResetPbCtx();

        keep_push_status = true;
        SwitchStatusTo(kTreatDirect);
        break;
      }
      case EncoderStatus::kTreatShortRepeat: {
        TreatShortRepeat();
        encoder_context_.fixed_len = 0;

        keep_push_status = true;
        SwitchStatusTo(kTreatDone);
        break;
      }
      case EncoderStatus::kTreatDirect: {
        TreatDirect();
        encoder_context_.var_len = 0;

        keep_push_status = true;
        SwitchStatusTo(kTreatDone);
        break;
      }
      case EncoderStatus::kTreatPatchedBase: {
        TreatPatchedBase();
        encoder_context_.var_len = 0;

        keep_push_status = true;
        SwitchStatusTo(kTreatDone);
        break;
      }
      case EncoderStatus::kTreatDelta: {
        bool reset_fix = TreatDelta();
        if (reset_fix) {
          encoder_context_.fixed_len = 0;
        } else {
          encoder_context_.var_len = 0;
        }

        keep_push_status = true;
        SwitchStatusTo(kTreatDone);

        break;
      }
      case EncoderStatus::kTreatDone: {
        Assert(data_buffer_->UnTreated() != 0);
        encoder_context_.ResetDeltaCtx();
        encoder_context_.ResetDirectCtx();
        encoder_context_.ResetPbCtx();

        // left shift
        data_buffer_->TreatedAll();
        data_buffer_->BrushUnTreatedAll();

        if (is_flush) {
          keep_push_status = true;
          SwitchStatusTo(
              (encoder_context_.fixed_len == 0 && encoder_context_.var_len == 0)
                  ? kFinish
                  : kFlush);
        } else {
          keep_push_status = already_append_before_delta_changed;
          SwitchStatusTo(kUntreated);
          already_append_before_delta_changed = false;
        }

        break;
      }
      case EncoderStatus::kFinish:
        keep_push_status = false;
        SwitchStatusTo(kInvalid);
        break;
      default:
        break;
    }
  }
}