Datum pg_aod_sketch_build_agg()

in src/aod_sketch_pg_functions.c [79:159]


// Aggregate transition function that feeds one (key, values) pair into an
// array-of-doubles sketch held in the aggregate state.
//
// Arguments (read through the fmgr PG_GETARG_* macros):
//   0 - current state (struct aod_agg_state*); NULL until the first update
//   1 - key of a polymorphic type; its raw bytes are passed to aod_sketch_update
//   2 - array whose elements are read as float8 values for the key
//   3 - optional lg_k (int32), read only when a new state is created
//   4 - optional p (float4), read only when a new state is created
//
// Returns the (possibly newly created) state pointer, or SQL NULL when there is
// neither a state nor anything to update it with. Raises an ERROR when called
// outside of an aggregate context.
//
// NOTE(review): for an existing state, arr_len is not compared with
// stateptr->num_values, and aod_sketch_update receives no length. Presumably
// every row must supply the same number of values as the first one; confirm
// and consider rejecting mismatches.
Datum pg_aod_sketch_build_agg(PG_FUNCTION_ARGS) {
  struct aod_agg_state* stateptr;
  float p;

  // anyelement
  Oid   element_type;
  Datum element;
  int16 typlen;
  bool  typbyval;
  char  typalign;
  struct varlena* detoasted;

  // input array of doubles
  ArrayType* arr_in;
  Oid elmtype_in;
  int16 elmlen_in;
  bool elmbyval_in;
  char elmalign_in;
  Datum* data_in;
  bool* nulls_in;
  int arr_len;
  double* values;
  int i;

  MemoryContext oldcontext;
  MemoryContext aggcontext;

  // A NULL key or a NULL values array gives nothing to update with: pass the
  // state through unchanged (which is SQL NULL if no state exists yet).
  // Without the check on argument 2, a NULL array would be detoasted through
  // a null pointer below.
  if (PG_ARGISNULL(1) || PG_ARGISNULL(2)) {
    if (PG_ARGISNULL(0)) {
      PG_RETURN_NULL();
    }
    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
  }

  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
    elog(ERROR, "aod_sketch_build_agg called in non-aggregate context");
  }
  oldcontext = MemoryContextSwitchTo(aggcontext);

  // look at the array of values first to know the array length in case we need to create a new sketch
  arr_in = PG_GETARG_ARRAYTYPE_P(2);
  elmtype_in = ARR_ELEMTYPE(arr_in);
  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len);

  values = palloc(sizeof(double) * arr_len);
  for (i = 0; i < arr_len; i++) {
    // deconstruct_array stores a zero Datum for NULL elements; decoding that
    // dereferences a null pointer where float8 is pass-by-reference, so map
    // NULL elements to 0 explicitly (the value by-value builds already produce)
    values[i] = nulls_in[i] ? 0 : DatumGetFloat8(data_in[i]);
  }

  // The current context is the aggregate context, so anything left allocated
  // here would accumulate for every input row. Release the scratch arrays and
  // the detoasted array copy (if one was made) now that values[] holds the data.
  pfree(data_in);
  pfree(nulls_in);
  PG_FREE_IF_COPY(arr_in, 2);

  if (PG_ARGISNULL(0)) {
    // first update: create the state and the sketch in the aggregate context
    stateptr = palloc(sizeof(struct aod_agg_state));
    stateptr->type = MUTABLE_SKETCH;
    stateptr->lg_k = PG_NARGS() > 3 ? PG_GETARG_INT32(3) : 0;
    stateptr->num_values = arr_len;
    p = PG_NARGS() > 4 ? PG_GETARG_FLOAT4(4) : 0;
    if (stateptr->lg_k) {
      stateptr->ptr = p ? aod_sketch_new_lgk_p(arr_len, stateptr->lg_k, p) : aod_sketch_new_lgk(arr_len, stateptr->lg_k);
    } else {
      stateptr->ptr = aod_sketch_new(arr_len);
    }
  } else {
    stateptr = (struct aod_agg_state*) PG_GETARG_POINTER(0);
  }

  element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
  element = PG_GETARG_DATUM(1);
  get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
  if (typlen == -1) {
    // varlena: detoast first so the bytes given to the sketch are the value
    // itself and not a compressed image or an out-of-line TOAST pointer
    detoasted = PG_DETOAST_DATUM(element);
    aod_sketch_update(stateptr->ptr, VARDATA_ANY(detoasted), VARSIZE_ANY_EXHDR(detoasted), values);
    if ((char*) detoasted != (char*) DatumGetPointer(element)) {
      pfree(detoasted); // detoasting made a copy in the aggregate context
    }
  } else if (typbyval) {
    // fixed-length passed by value
    aod_sketch_update(stateptr->ptr, &element, typlen, values);
  } else {
    // fixed-length passed by reference
    aod_sketch_update(stateptr->ptr, (void*)element, typlen, values);
  }
  pfree(values);
  MemoryContextSwitchTo(oldcontext);

  PG_RETURN_POINTER(stateptr);
}