def _materialize_data()

in databao/core/thread.py [0:0]


    def _materialize_data(self, rows_limit: int | None) -> "ExecutionResult":
        """Materialize the latest data state by executing pending OPAs if needed."""
        new_opas = self._opas[self._opas_processed_count :]
        if len(new_opas) > 0:
            rows_limit = rows_limit if rows_limit else self._default_rows_limit
            stream = self._stream_ask if self._stream_ask is not None else self._default_stream_ask
            for opa in new_opas:
                self._data_result = self._agent.executor.execute(
                    opa,
                    cache=self._agent.cache.scoped(self._cache_scope),
                    llm_config=self._agent.llm_config,
                    sources=self._agent.sources,
                    rows_limit=rows_limit,
                    stream=stream,
                )
                self._meta.update(self._data_result.meta)
            self._opas_processed_count += len(new_opas)
            self._data_materialized_rows = rows_limit
        if self._data_result is None:
            raise RuntimeError("_data_result is None after materialization")
        return self._data_result