def get_pipeline_workflows()

in analytics/circleci_analyze.py [0:0]


    def get_pipeline_workflows(self, pipeline) -> List:
        c = self.db.cursor()
        c.execute("SELECT json FROM pipeline_workflows WHERE id=?", (pipeline,))
        rc = c.fetchone()
        if rc is not None:
            rc = json.loads(rc[0])
            if not any(is_workflow_in_progress(w) for w in rc) or self.is_offline():
                return rc
        if self.is_offline():
            return []
        rc = self._get_paged_items_list(f'{self.url_prefix}/pipeline/{pipeline}/workflow')
        self.db.execute("INSERT OR REPLACE INTO pipeline_workflows(id, json) VALUES (?, ?)", (pipeline, json.dumps(rc)))
        self.db.commit()
        return rc