in contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.cc [287:709]
void PaxOrcEncoder::AppendInternal(const int64 data, bool is_flush) {
bool already_append_before_delta_changed = false;
bool keep_push_status = true;
if (is_flush) {
SwitchStatusTo(kFlush);
}
while (keep_push_status) {
keep_push_status = false;
switch (status_) {
case EncoderStatus::kInvalid: {
Assert(false);
break;
}
case EncoderStatus::kInit: {
AppendData(data);
encoder_context_.fixed_len = 1;
encoder_context_.var_len = 1;
SwitchStatusTo(kTwoElements);
break;
}
case EncoderStatus::kTwoElements: {
encoder_context_.prev_delta =
data - ((*data_buffer_)[data_buffer_->GetSize() - 1]);
AppendData(data);
if (encoder_context_.prev_delta == 0) {
encoder_context_.fixed_len = 2;
encoder_context_.var_len = 0;
} else {
encoder_context_.fixed_len = 0;
encoder_context_.var_len = 2;
}
SwitchStatusTo(kUntreated);
break;
}
case kUntreated: {
if (data_buffer_->UnTreated() == 0) {
keep_push_status = true;
SwitchStatusTo(kInit);
break;
}
encoder_context_.current_delta =
data - ((*data_buffer_)[data_buffer_->GetSize() - 1]);
// Temporary data is duplicated(minimum of 3 repetitions, eq.
// ORC_MIN_REPEAT) But it is not confirmed whether it is already
// duplicated at the beginning
if (encoder_context_.current_delta == encoder_context_.prev_delta &&
encoder_context_.current_delta == 0) {
AppendData(data);
if (encoder_context_.var_len > 0) {
// If variable run is non-zero then we are seeing repeating
// values at the end of variable run in which case fixed Run
// length is 2
encoder_context_.fixed_len = 2;
}
encoder_context_.fixed_len++;
if (encoder_context_.fixed_len >= ORC_MIN_REPEAT &&
encoder_context_.var_len > 0) {
// Ok, got at lease 3 repetitions
// Encode and flush the non-repeating data
keep_push_status = true;
SwitchStatusTo(kTreatPrevBuffer);
break;
}
// The data becomes repeated at the beginning
if (encoder_context_.fixed_len == ORC_MAX_LITERAL_SIZE) {
encoder_context_.delta_ctx->is_fixed_delta = true;
keep_push_status = true;
SwitchStatusTo(kTreatDelta);
break;
}
/// no need switch to next type
break;
}
// Below is encoder_context_.current_delta !=
// encoder_context_.prev_delta != 0 That can happen in two ways:
// 1. Repeating consecutive sequences become non-repeating
// 2. Temporary data is not repeated at the beginning
if (encoder_context_.fixed_len >= ORC_MIN_REPEAT) {
keep_push_status = true;
SwitchStatusTo(kUntreatedDiscontinuous);
break;
}
// Repetitions data numbers can less than 3 here
// ex. 0 0 1, then current_delta != 0. and data will transfer to
// non-repeating data
if (encoder_context_.fixed_len > 0 &&
encoder_context_.fixed_len < ORC_MIN_REPEAT &&
encoder_context_.current_delta != 0) {
encoder_context_.var_len = encoder_context_.fixed_len;
encoder_context_.fixed_len = 0;
}
// No data remain
if (data_buffer_->UnTreated() == 0) {
keep_push_status = true;
SwitchStatusTo(kInit);
break;
}
// non-repeating data
encoder_context_.prev_delta = encoder_context_.current_delta;
AppendData(data);
encoder_context_.var_len++;
if (encoder_context_.var_len == ORC_MAX_LITERAL_SIZE) {
keep_push_status = true;
SwitchStatusTo(kDetermineFlushPrevBuffer);
break;
}
/// no need switch to next type
break;
}
case EncoderStatus::kUntreatedDiscontinuous: {
already_append_before_delta_changed = true;
keep_push_status = true;
if (encoder_context_.fixed_len <= ORC_MAX_SHORT_REPEAT_LENGTH) {
SwitchStatusTo(kTreatShortRepeat);
} else {
encoder_context_.delta_ctx->is_fixed_delta = true;
SwitchStatusTo(kTreatDelta);
}
break;
}
case EncoderStatus::kTreatPrevBuffer: {
Assert(data_buffer_->UnTreated());
Assert(data_buffer_->UnTouched() == 0);
// fixed_len must equal to ORC_MIN_REPEAT
// the status kTreatPrevBuffer will only occur when
// non-repeating elements change to ORC_MIN_REPEAT repeating elements
// means there must be non-repeating elements before repeating elements
Assert(encoder_context_.fixed_len == ORC_MIN_REPEAT);
// will shift tail fixed in kTreatDone
data_buffer_->BrushBackUnTreated(sizeof(int64) * ORC_MIN_REPEAT);
encoder_context_.var_len -= (ORC_MIN_REPEAT - 1);
keep_push_status = true;
SwitchStatusTo(kDetermineFlushPrevBuffer);
break;
}
case EncoderStatus::kFlush:
if (data_buffer_->Used() == 0) {
keep_push_status = true;
SwitchStatusTo(kFinish);
break;
}
Assert(data_buffer_->UnTreated() == data_buffer_->Used());
// must check delta & short repeat can treat or not.
if (encoder_context_.fixed_len != 0) {
if (encoder_context_.fixed_len < ORC_MIN_REPEAT) {
encoder_context_.var_len = encoder_context_.fixed_len;
encoder_context_.fixed_len = 0;
} else if ( // encoder_context_.fixed_len >= ORC_MIN_REPEAT &&
encoder_context_.fixed_len <= ORC_MAX_SHORT_REPEAT_LENGTH) {
keep_push_status = true;
SwitchStatusTo(kTreatShortRepeat);
break;
} else { // encoder_context_.fixed_len > ORC_MAX_SHORT_REPEAT_LENGTH
keep_push_status = true;
encoder_context_.delta_ctx->is_fixed_delta = true;
SwitchStatusTo(kTreatDelta);
break;
}
}
keep_push_status = true;
SwitchStatusTo(kDetermineFlushPrevBuffer);
break;
case EncoderStatus::kDetermineFlushPrevBuffer: {
size_t data_lens = data_buffer_->UnTreated() / sizeof(int64);
if (data_lens <= ORC_MIN_REPEAT) {
keep_push_status = true;
SwitchStatusTo(kTreatDirect);
break;
}
bool increasing = true;
bool decreasing = true;
auto delta_ctx = encoder_context_.delta_ctx;
auto direct_ctx = encoder_context_.direct_ctx;
auto pb_ctx = encoder_context_.pb_ctx;
int64 init_delta_val = (*data_buffer_)[1] - (*data_buffer_)[0];
int64 curr_delta = 0;
int64 max_delta = 0;
uint32 zigzag_bits_90_p = 0;
delta_ctx->is_fixed_delta = true;
pb_ctx->min = (*data_buffer_)[0];
pb_ctx->max = (*data_buffer_)[0];
delta_ctx->adj_deltas[delta_ctx->adj_deltas_idx++] = init_delta_val;
for (size_t i = 1; i < data_lens; i++) {
const int64 l1 = (*data_buffer_)[i];
const int64 l0 = (*data_buffer_)[i - 1];
curr_delta = l1 - l0;
pb_ctx->min = std::min(pb_ctx->min, l1);
pb_ctx->max = std::max(pb_ctx->max, l1);
increasing &= (l0 <= l1);
decreasing &= (l0 >= l1);
delta_ctx->is_fixed_delta &= (curr_delta == init_delta_val);
if (i > 1) {
delta_ctx->adj_deltas[delta_ctx->adj_deltas_idx++] =
std::abs(curr_delta);
max_delta = std::max(max_delta, delta_ctx->adj_deltas[i - 1]);
}
}
// it's faster to exit under delta overflow condition without checking
// for PATCHED_BASE condition as encoding using DIRECT is faster and has
// less overhead than PATCHED_BASE
auto is_safe_subtract = [](int64 left, int64 right) -> bool {
return ((left ^ right) >= 0) || ((left ^ (left - right)) >= 0);
};
if (!is_safe_subtract(pb_ctx->max, pb_ctx->min)) {
keep_push_status = true;
SwitchStatusTo(kTreatDirect);
break;
}
// invariant - subtracting any number from any other in the literals
// after option point won't overflow
// if min is equal to max then the delta is 0, option condition happens
// for fixed values run >10 which cannot be encoded with SHORT_REPEAT
if (pb_ctx->min == pb_ctx->max) {
Assert(delta_ctx->is_fixed_delta);
Assert(!curr_delta);
delta_ctx->fixed_delta_val = 0;
keep_push_status = true;
SwitchStatusTo(kTreatDelta);
break;
}
if (delta_ctx->is_fixed_delta) {
Assert(curr_delta == init_delta_val);
delta_ctx->fixed_delta_val = curr_delta;
keep_push_status = true;
SwitchStatusTo(kTreatDelta);
break;
}
if (init_delta_val != 0) {
delta_ctx->bits_delta_max = FindClosestBits(max_delta);
// monotonic condition
if (increasing || decreasing) {
keep_push_status = true;
SwitchStatusTo(kTreatDelta);
break;
}
}
// Without flush as delta then reset delta ctx
encoder_context_.ResetDeltaCtx();
pb_ctx->histogram_len = data_lens;
if (encoder_context_.is_sign) {
zigzag_buffer_->BrushBackAll();
if (zigzag_buffer_->Capacity() < data_lens * sizeof(int64)) {
zigzag_buffer_->ReSize(data_lens * sizeof(int64));
}
ZigZagBuffers(data_buffer_->StartT(), zigzag_buffer_->StartT(),
data_lens);
zigzag_buffer_->Brush(data_lens * sizeof(int64));
}
// PATCHED_BASE encoding check
// percentile values are computed for the zigzag encoded values. if the
// number of bit requirement between 90th and 100th percentile varies
// beyond a threshold then we need to patch the values. if the variation
// is not significant then we can use direct encoding
BuildHistogram(pb_ctx->histogram,
encoder_context_.is_sign ? zigzag_buffer_->StartT()
: data_buffer_->StartT(),
data_lens);
direct_ctx->zz_bits_100_p_inited = true;
direct_ctx->zigzag_bits_100_p =
GetPercentileBits(pb_ctx->histogram, pb_ctx->histogram_len, 1.0);
zigzag_bits_90_p =
GetPercentileBits(pb_ctx->histogram, pb_ctx->histogram_len, 0.9);
// if the difference between 90th percentile and 100th percentile fixed
// bits is > 1 then we need patch the values
if (direct_ctx->zigzag_bits_100_p - zigzag_bits_90_p > 1) {
for (size_t i = 0; i < data_lens; i++) {
pb_ctx->base_patch_buffer[pb_ctx->base_patch_buffer_count++] =
((*data_buffer_)[i] - pb_ctx->min);
}
pb_ctx->histogram_len = data_lens;
// rebuild histogram with literals[*] - min
BuildHistogram(pb_ctx->histogram, pb_ctx->base_patch_buffer,
data_lens);
// 95th percentile width is used to determine max allowed value
// after which patching will be done
pb_ctx->hist_bits_95_p =
GetPercentileBits(pb_ctx->histogram, pb_ctx->histogram_len, 0.95);
// 100th percentile is used to compute the max patch width
pb_ctx->hist_bits_100_p =
GetPercentileBits(pb_ctx->histogram, pb_ctx->histogram_len, 1.0);
// after base reducing the values, if the difference in bits between
// 95th percentile and 100th percentile value is zero then there
// is no point in patching the values, in which case we will
// fallback to DIRECT encoding.
// The decision to use patched base was based on zigzag values, but
// the actual patching is done on base reduced literals.
if ((pb_ctx->hist_bits_100_p - pb_ctx->hist_bits_95_p) != 0) {
keep_push_status = true;
SwitchStatusTo(kTreatPatchedBase);
break;
}
}
encoder_context_.ResetPbCtx();
keep_push_status = true;
SwitchStatusTo(kTreatDirect);
break;
}
case EncoderStatus::kTreatShortRepeat: {
TreatShortRepeat();
encoder_context_.fixed_len = 0;
keep_push_status = true;
SwitchStatusTo(kTreatDone);
break;
}
case EncoderStatus::kTreatDirect: {
TreatDirect();
encoder_context_.var_len = 0;
keep_push_status = true;
SwitchStatusTo(kTreatDone);
break;
}
case EncoderStatus::kTreatPatchedBase: {
TreatPatchedBase();
encoder_context_.var_len = 0;
keep_push_status = true;
SwitchStatusTo(kTreatDone);
break;
}
case EncoderStatus::kTreatDelta: {
bool reset_fix = TreatDelta();
if (reset_fix) {
encoder_context_.fixed_len = 0;
} else {
encoder_context_.var_len = 0;
}
keep_push_status = true;
SwitchStatusTo(kTreatDone);
break;
}
case EncoderStatus::kTreatDone: {
Assert(data_buffer_->UnTreated() != 0);
encoder_context_.ResetDeltaCtx();
encoder_context_.ResetDirectCtx();
encoder_context_.ResetPbCtx();
// left shift
data_buffer_->TreatedAll();
data_buffer_->BrushUnTreatedAll();
if (is_flush) {
keep_push_status = true;
SwitchStatusTo(
(encoder_context_.fixed_len == 0 && encoder_context_.var_len == 0)
? kFinish
: kFlush);
} else {
keep_push_status = already_append_before_delta_changed;
SwitchStatusTo(kUntreated);
already_append_before_delta_changed = false;
}
break;
}
case EncoderStatus::kFinish:
keep_push_status = false;
SwitchStatusTo(kInvalid);
break;
default:
break;
}
}
}