in tfx_bsl/cc/sketches/sketches_submodule.cc [226:354]
// Registers the Python class "QuantilesSketch" on `sketch_module`.
//
// The binding exposes a constructor, pickle support, and the methods Merge,
// AddValues (weighted and unit-weight overloads), Compact and GetQuantiles.
// Every wrapped call that returns a non-OK absl::Status is reported by
// throwing std::runtime_error carrying the status string.
void DefineQuantilesSketchClass(py::module sketch_module) {
  py::class_<QuantilesSketch>(sketch_module, "QuantilesSketch")
      // Constructor: delegates to the QuantilesSketch::Make factory.
      .def(py::init(
               [](double eps, int64_t max_num_elements, int64_t num_streams) {
                 std::unique_ptr<QuantilesSketch> result;
                 absl::Status s = QuantilesSketch::Make(eps, max_num_elements,
                                                        num_streams, &result);
                 if (!s.ok()) {
                   throw std::runtime_error(s.ToString());
                 }
                 return result;
               }),
           py::arg("eps"), py::arg("max_num_elements"), py::arg("num_streams"),
           py::doc("A sketch to estimate quantiles of streams of numbers.\n\n"
                   "eps: Controls the approximation error. Must be >0.\n"
                   "max_num_elements: An estimate of maximum number of input "
                   "values. If not known at the time of construction, a "
                   "large-enough number (e.g. 2^32) may be specified at the "
                   "cost of extra memory usage. Must be >= 1.\n"
                   "num_streams: Number of quantile streams being processed at "
                   "the same time. Must be >=1."))
      .def(py::pickle(
          // Get-state: serializes the sketch into a Python bytes object.
          [](QuantilesSketch& sketch) {
            std::string serialized;
            {
              // The GIL is released only around Serialize(); it is held again
              // by the time the py::bytes object is created below.
              py::gil_scoped_release release_gil;
              absl::Status s = sketch.Serialize(serialized);
              if (!s.ok()) {
                throw std::runtime_error(s.ToString());
              }
            }
            return py::bytes(serialized);
          },
          // Set-state: rebuilds a sketch from the bytes produced above.
          [](py::bytes byte_string) {
            char* data = nullptr;
            Py_ssize_t size = 0;
            // PyBytes_AsStringAndSize reports failure with -1 and a pending
            // Python exception; `data`/`size` must not be read in that case.
            if (PyBytes_AsStringAndSize(byte_string.ptr(), &data, &size) != 0) {
              throw py::error_already_set();
            }
            std::unique_ptr<QuantilesSketch> result;
            // The view is non-owning; `byte_string` (held by value for the
            // whole call) keeps the underlying buffer alive.
            absl::Status s = QuantilesSketch::Deserialize(
                absl::string_view(data, size), &result);
            if (!s.ok()) {
              throw std::runtime_error(s.ToString());
            }
            return result;
          }))
      // The methods below run with the GIL released (py::call_guard).
      .def(
          "Merge",
          [](QuantilesSketch& sketch, const QuantilesSketch& other) {
            absl::Status s = sketch.Merge(other);
            if (!s.ok()) {
              throw std::runtime_error(s.ToString());
            }
          },
          py::call_guard<py::gil_scoped_release>(),
          py::doc("Merges the sketch with `other`."))
      // Weighted overload: forwards to QuantilesSketch::AddWeightedValues.
      .def(
          "AddValues",
          [](QuantilesSketch& sketch,
             const std::shared_ptr<arrow::Array>& values,
             const std::shared_ptr<arrow::Array>& weights) {
            absl::Status s = sketch.AddWeightedValues(values, weights);
            if (!s.ok()) {
              throw std::runtime_error(s.ToString());
            }
          },
          py::call_guard<py::gil_scoped_release>(),
          py::doc(
              "Add values with weights to the sketch. If we consider that "
              "values are given by rows and streams are given by columns, then "
              "values array must have C-contiguous order (stream index varies "
              "the fastest). Weights are considered to be the same for all "
              "streams. Any numerical arrow array type is accepted. But they "
              "will be converted to float64 if they are not of the type. Float "
              "truncation may happen (for large int64 values). Values with "
              "negative or zero weights will be ignored. Nulls in the array "
              "will be skipped."))
      // Unit-weight overload: forwards to QuantilesSketch::AddValues.
      .def(
          "AddValues",
          [](QuantilesSketch& sketch,
             const std::shared_ptr<arrow::Array>& values) {
            absl::Status s = sketch.AddValues(values);
            if (!s.ok()) {
              throw std::runtime_error(s.ToString());
            }
          },
          py::call_guard<py::gil_scoped_release>(),
          py::doc(
              "Add values with unit weights to the sketch. If we consider that "
              "values are given by rows and streams are given by columns, then "
              "values array must have C-contiguous order (stream index varies "
              "the fastest). Any numerical arrow array type is accepted. But "
              "they will be converted to float64 if they are not of the type. "
              "Float truncation may happen (for large int64 values). Values "
              "with negative or zero weights will be ignored. Nulls in the "
              "array will be skipped."))
      .def(
          "Compact",
          [](QuantilesSketch& sketch) {
            absl::Status s = sketch.Compact();
            if (!s.ok()) {
              throw std::runtime_error(s.ToString());
            }
          },
          py::call_guard<py::gil_scoped_release>(),
          py::doc("Compacts state of the sketch if it wasn't done before and "
                  "no compact sketches were merged to this, otherwise it is a "
                  "no-op. Compact() before Serialize() will reduce the size of "
                  "the sketch. For instance, if eps=0.0001, "
                  "max_num_elements=2^32, num_streams=1 and the sketch is "
                  "full, then its size will be reduced by ~16x. "
                  "Note that the error bound of the sketch is adjusted for use "
                  "of Compact on the implementation level. It is therefore "
                  "recommended (but not mandatory) to use Compact() once. "))
      // Returns the arrow array filled in by QuantilesSketch::GetQuantiles.
      .def(
          "GetQuantiles",
          [](QuantilesSketch& sketch, int64_t num_quantiles) {
            std::shared_ptr<arrow::Array> result;
            absl::Status s = sketch.GetQuantiles(num_quantiles, &result);
            if (!s.ok()) {
              throw std::runtime_error(s.ToString());
            }
            return result;
          },
          py::call_guard<py::gil_scoped_release>(),
          py::doc("Finalize the sketch and get quantiles of the numbers added "
                  "so far. The result will be a FixedSizeListArray<float64> "
                  "where lists represent output for each stream. num_quantiles "
                  "must be >= 2."));
}