def poke()

in extras/airflow/aws_operators_plugin.py [0:0]


    def poke(self, context):
        """Check the Step Function execution once and report whether it is done.

        Fetches the execution description through the hook, decodes the JSON
        ``output`` field when the description has one, and then:

        * raises ``AirflowException`` if the status is in ``FAILURE_STATES``;
        * returns ``False`` if the status is in ``INTERMEDIATE_STATES``;
        * otherwise pushes the decoded output to XCom under the key
          ``"output"`` and returns ``True``.

        :param context: passed through unchanged to ``xcom_push``.
        :return: ``True`` once the output has been pushed, ``False`` while the
            status is still an intermediate one.
        :raises AirflowException: when the status is one of ``FAILURE_STATES``.
        """
        description = self.get_hook().describe_execution(self.execution_arn)
        status = description["status"]

        # Decode up front: the value is needed by both the failure message and
        # the XCom push. It stays None when the description has no "output".
        if "output" in description:
            output = json.loads(description["output"])
        else:
            output = None

        if status in self.FAILURE_STATES:
            raise AirflowException(f"Step Function sensor failed. State Machine Output: {output}")

        # Anything that is neither a failure nor intermediate counts as done.
        finished = status not in self.INTERMEDIATE_STATES
        if finished:
            self.log.info("Doing xcom_push of output")
            self.xcom_push(context, "output", output)
        return finished