in databao/core/thread.py [0:0]
def _materialize_data(self, rows_limit: int | None) -> "ExecutionResult":
"""Materialize the latest data state by executing pending OPAs if needed."""
new_opas = self._opas[self._opas_processed_count :]
if len(new_opas) > 0:
rows_limit = rows_limit if rows_limit else self._default_rows_limit
stream = self._stream_ask if self._stream_ask is not None else self._default_stream_ask
for opa in new_opas:
self._data_result = self._agent.executor.execute(
opa,
cache=self._agent.cache.scoped(self._cache_scope),
llm_config=self._agent.llm_config,
sources=self._agent.sources,
rows_limit=rows_limit,
stream=stream,
)
self._meta.update(self._data_result.meta)
self._opas_processed_count += len(new_opas)
self._data_materialized_rows = rows_limit
if self._data_result is None:
raise RuntimeError("_data_result is None after materialization")
return self._data_result