Modules/_asynciomodule.c (6,766 lines of code) (raw):

/* Portions copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com) */ #include "Python.h" #include "structmember.h" #include "weakrefobject.h" #include "pycore_pyerrors.h" #include "internal/pycore_shadow_frame.h" /** facebook: USDT import */ #include "Include/folly/tracing/StaticTracepoint.h" /*[clinic input] module _asyncio [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=8fd17862aa989c69]*/ /* identifiers used from some functions */ _Py_IDENTIFIER(__asyncio_running_event_loop__); _Py_IDENTIFIER(_asyncio_future_blocking); _Py_IDENTIFIER(add_done_callback); _Py_IDENTIFIER(remove_done_callback); _Py_IDENTIFIER(call_soon); _Py_IDENTIFIER(cancel); _Py_IDENTIFIER(current_task); _Py_IDENTIFIER(get_event_loop); _Py_IDENTIFIER(throw); _Py_IDENTIFIER(done); /* facebook: method table */ _Py_IDENTIFIER(_step); _Py_IDENTIFIER(get_loop); _Py_IDENTIFIER(_loop); _Py_IDENTIFIER(result); _Py_IDENTIFIER(get_debug); _Py_IDENTIFIER(create_task); _Py_IDENTIFIER(_source_traceback); _Py_IDENTIFIER(create_future); _Py_IDENTIFIER(cancelled); _Py_IDENTIFIER(exception); _Py_IDENTIFIER(set_result); _Py_IDENTIFIER(_log_destroy_pending); _Py_IDENTIFIER(_set_fut_waiter); _Py_IDENTIFIER(_set_task_context); /* facebook: method table */ /* State of the _asyncio module */ static PyObject *asyncio_mod; static PyObject *traceback_extract_stack; static PyObject *asyncio_get_event_loop_policy; static PyObject *asyncio_future_repr_info_func; static PyObject *asyncio_iscoroutine_func; static PyObject *asyncio_task_get_stack_func; static PyObject *asyncio_task_print_stack_func; static PyObject *asyncio_task_repr_info_func; static PyObject *asyncio_InvalidStateError; static PyObject *asyncio_CancelledError; static PyObject *context_kwname; static PyObject *collections_abc_Awaitable; static int module_initialized; static PyObject *cached_running_holder; static volatile uint64_t cached_running_holder_tsid; static PyObject *known_coroutine_types; 
static PyObject *awaitable_types_cache; static PyObject *asyncio_coroutines__COROUTINE_TYPES; static PyObject *asyncio_tasks__wrap_awaitable; /* Private API for the LOAD_METHOD opcode. */ extern int _PyObject_GetMethod(PyObject *, PyObject *, PyObject **); /* A head of double-linked list that stores all tasks that derive from asyncio.Task */ static PyObject *all_asyncio_tasks; /* WeakSet containing all alive tasks that are not derived from asyncio.Task. */ static PyObject *all_non_asyncio_tasks; /* Counter for autogenerated Task names */ static uint64_t task_name_counter = 0; /* Dictionary containing tasks that are currently active in all running event loops. {EventLoop: Task} */ static PyObject *current_tasks; /* An isinstance type cache for the 'is_coroutine()' function. */ static PyObject *iscoroutine_typecache; static PyObject *get_loop_name; static PyObject *add_done_callback_name; static PyObject *remove_done_callback_name; static PyObject *result_name; static PyObject *cancel_name; static PyObject *done_name; static PyObject *create_task_name; static PyObject *minus_one; static PyObject *throw_name; static PyObject *create_future_name; /**********************Context aware helpers ******************************/ // returns current context typedef PyObject *(*get_current_context_f)(void); // sets context on context aware task typedef int (*context_aware_task_set_ctx_f)(PyObject *task, PyObject *ctx); // reset context to the provided value // and then resets context to the provided value typedef int (*reset_context_f)(PyObject *task, PyObject *ctx, context_aware_task_set_ctx_f set_ctx); static PyObject * get_current_context_default(void) { Py_RETURN_NONE; } static int reset_context_default(PyObject *Py_UNUSED(task), PyObject *Py_UNUSED(ctx), context_aware_task_set_ctx_f Py_UNUSED(set_ctx)) { return 0; } static get_current_context_f get_current_context = get_current_context_default; static reset_context_f reset_context = reset_context_default; static int 
call_reset_context(PyThreadState *tstate, PyObject *task, PyObject *ctx, context_aware_task_set_ctx_f setter, int error_occured) { if (_Py_LIKELY(error_occured == 0)) { return reset_context(task, ctx, setter); } // exception occurred but context reset hook should still be called PyObject *et, *ev, *tb; PyErr_Fetch(&et, &ev, &tb); _PyErr_StackItem *previous_exc_info = tstate->exc_info; _PyErr_StackItem exc_info = {.exc_type = et, .exc_value = ev, .exc_traceback = tb, .previous_item = previous_exc_info}; tstate->exc_info = &exc_info; int ok = reset_context(task, ctx, setter); tstate->exc_info = previous_exc_info; if (ok < 0) { // 'reset_context' itself has raised exception // release initial error, it was already captured // as cause Py_DECREF(et); Py_XDECREF(ev); Py_XDECREF(tb); } else { // restore original error and clear reset result // to indicate that error was re-raised PyErr_Restore(et, ev, tb); } return -1; } static int _set_context_helpers(PyObject *get_current_context_obj, PyObject *reset_context_obj) { #define FETCH_VALUE(obj, var) \ if (!PyCapsule_CheckExact(obj)) { \ PyErr_Format(PyExc_TypeError, "PyCapsule expected as %s", #obj); \ return -1; \ } \ void *var = PyCapsule_GetPointer(obj, NULL); \ if (var == NULL) { \ return -1; \ } FETCH_VALUE(get_current_context_obj, f0); FETCH_VALUE(reset_context_obj, f1); #undef FETCH_VALUE get_current_context = (get_current_context_f)f0; reset_context = (reset_context_f)f1; return 0; } static void _reset_context_helpers() { get_current_context = get_current_context_default; reset_context = reset_context_default; } /**************************************************************************/ typedef enum { STATE_PENDING, STATE_CANCELLED, STATE_FINISHED, STATE_FAULTED, } fut_state; #define FutureObj_HEAD(prefix) \ PyObject_HEAD \ PyObject *prefix##_loop; \ PyObject *prefix##_callback0; \ PyObject *prefix##_context0; \ PyObject *prefix##_callbacks; \ PyObject *prefix##_exception; \ PyObject *prefix##_result; \ PyObject 
*prefix##_source_tb; \ fut_state prefix##_state; \ int prefix##_log_tb; \ int prefix##_blocking; \ PyObject *dict; \ PyObject *prefix##_weakreflist; typedef struct { FutureObj_HEAD(fut) } FutureObj; static inline uintptr_t tag_counter(int c) { return ((uintptr_t)c << (uintptr_t)1) | (uintptr_t)1; } static inline int untag_counter(uintptr_t tagged) { return tagged >> (uintptr_t)1; } static inline int is_string_name(uintptr_t tagged) { return tagged != 0 && ((tagged & (uintptr_t)1) == 0); } static PyObject * counter_to_task_name(uintptr_t tagged) { return PyUnicode_FromFormat("Task-%" PRIu64, untag_counter(tagged)); } typedef struct { FutureObj_HEAD(task) PyObject *task_fut_waiter; PyObject *task_coro; // either pointer to task name string or tagged counter value that is used to produce name uintptr_t task_name_or_counter; PyObject *task_context; int task_must_cancel; int task_log_destroy_pending; PyObject *prev; // borrowed PyObject *next; // borrowed } TaskObj; typedef struct { TaskObj task; PyObject *context; } ContextAwareTaskObj; typedef struct _SuspendedCoroNode { PyObject *coro; struct _SuspendedCoroNode *prev; } _SuspendedCoroNode; // linked list of coroutines that were moved forward // prior to calling constructor of the Task. // In the __init__ method of the Task we need to know // if coroutine was already stepped into and suspended // or it was not yet started. For native coroutines this // is simple to check however for other implementations // it is not that trivial. In order to solve this we // store coroutine in the linked list before calling // 'create_task' and remove it afterwards. 
static _SuspendedCoroNode *suspended_coroutines = NULL; typedef struct { // used to avoid allocations for cases <=64 bit uint64_t b_data; uint64_t *b_ptr; size_t b_size; } _Bitset; // helper macro to iterate over set bits in bitset // it walks over individual non-zero elements in set // and for each element it: // - gets the index of trailing non-zero bit via __builtin_ctzl // - isolate last set bit via '__chunk & -__chunk' and then clear it // - compute actual bit index by adding index to base #define FOREACH_INDEX(b, i) \ size_t __n = (b).b_size; \ uint64_t *__buf = (b).b_ptr; \ for (size_t __i = 0; __i < __n; ++__i) { \ size_t __base = __i * UINT64_WIDTH; \ uint64_t __chunk = __buf[__i]; \ while (__chunk != 0) { \ int __bit = __builtin_ctzl(__chunk); \ __chunk ^= __chunk & -__chunk; \ size_t(i) = __base + __bit; #define FOREACH_INDEX_END() \ } \ } static inline int _Bitset_init(_Bitset *b, Py_ssize_t n) { // for most of cases we can use inplace storage if (n < UINT64_WIDTH) { b->b_data = 0; b->b_ptr = (uint64_t *)&b->b_data; b->b_size = 1; return 0; } b->b_size = n / UINT64_WIDTH + 1; if ((b->b_ptr = (uint64_t *)PyMem_Calloc(b->b_size, sizeof(uint64_t))) == NULL) { return -1; } return 0; } static inline void _Bitset_set(_Bitset *b, Py_ssize_t index) { b->b_ptr[index / UINT64_WIDTH] |= (uint64_t)1 << (index % UINT64_WIDTH); } static inline void _Bitset_dealloc(_Bitset *b) { if (b->b_ptr && b->b_ptr != (uint64_t *)&b->b_data) { PyMem_Free(b->b_ptr); } b->b_ptr = NULL; } typedef struct { FutureObj_HEAD(gf); // List that stores both results of finished child awaitables // and pending futures for non-finished ones. 
// Kind of value at index i can be determined with gf_datamap PyObject *gf_data; // bit at position i is 0 if gf_data[i] stores result // and 1 and gf_data[i] is future _Bitset gf_datamap; int gf_cancel_requested; int gf_return_exceptions; Py_ssize_t gf_pending; } _GatheringFutureObj; typedef struct { PyObject_HEAD TaskObj *sw_task; PyObject *sw_arg; } TaskStepMethWrapper; typedef struct { PyObject_HEAD PyObject *rl_loop; #if defined(HAVE_GETPID) && !defined(MS_WINDOWS) pid_t rl_pid; #endif } PyRunningLoopHolder; typedef enum { ALV_NOT_STARTED, ALV_RUNNING, ALV_DONE, } ASYNC_LAZY_VALUE_STATE; typedef struct { PyObject_HEAD PyObject *alv_args; PyObject *alv_kwargs; PyObject *alv_result; PyObject *alv_futures; ASYNC_LAZY_VALUE_STATE alv_state; } AsyncLazyValueObj; typedef struct { PyObject_HEAD AsyncLazyValueObj *alvc_target; _PyErr_StackItem alvc_exc_state; PyObject *alvc_coroobj; // actual coroutine object computing the value of // 'alvc_target' // This may be set if someone tries to set the awaiter before we've started // running the computation. This happens during non-eager execution because // we call _PyAwaitable_SetAwaiter in both the JIT/interpreter before // starting the compute object. We'll check for this when we start the computation // and call _PyAwaitable_SetAwaiter with the stored value on the awaitable // that is created. // // Stores a borrowed reference. 
PyObject *alvc_pending_awaiter; } AsyncLazyValueComputeObj; typedef struct { PyObject_HEAD; PyObject *av_value; } AwaitableValueObj; typedef struct { FutureObj_HEAD(af); // The future we're waiting on PyObject *af_awaited; // The callback that will be invoked when awaited is done PyObject *af_done_callback; } _AwaitingFutureObj; static PyTypeObject FutureType; static PyTypeObject TaskType; static PyTypeObject ContextAwareTaskType; static PyTypeObject _GatheringFutureType; static PyTypeObject PyRunningLoopHolder_Type; static PyTypeObject _AsyncLazyValue_Type; static PyTypeObject _AsyncLazyValueCompute_Type; static PyTypeObject AwaitableValue_Type; static PyTypeObject _AwaitingFuture_Type; #if defined(HAVE_GETPID) && !defined(MS_WINDOWS) static pid_t current_pid; #endif #define Future_CheckExact(obj) (Py_TYPE(obj) == &FutureType) #define Task_CheckExact(obj) (Py_TYPE(obj) == &TaskType) #define _AsyncLazyValue_CheckExact(obj) \ (Py_TYPE(obj) == (PyTypeObject *)&_AsyncLazyValue_Type) #define _AsyncLazyValueCompute_CheckExact(obj) \ (Py_TYPE(obj) == (PyTypeObject *)&_AsyncLazyValueCompute_Type) #define _GatheringFuture_CheckExact(obj) \ (Py_TYPE(obj) == &_GatheringFutureType) #define _AwaitingFuture_CheckExact(obj) \ (Py_TYPE(obj) == &_AwaitingFutureType) #define Future_Check(obj) PyObject_TypeCheck(obj, &FutureType) #define Task_Check(obj) PyObject_TypeCheck(obj, &TaskType) #include "clinic/_asynciomodule.c.h" /* facebook: PyAsyncioMethodTable */ // forward declarations /* Unboxed result of calling _asyncio_future_blocking Normally we'd use int with commit convention to indicate: true (>0), false (0), error(<0) However it is permitted for this property to return None we need to represent 4 states so enum fits quite nicely */ typedef enum { BLOCKING_TRUE, BLOCKING_FALSE, BLOCKING_ERROR, BLOCKING_NONE, } fut_blocking_state; static PyObject * FutureObj_get_blocking(FutureObj *fut, void *Py_UNUSED(ignored)); static fut_blocking_state future_get_blocking_impl(FutureObj 
*fut); static int future_cancel_impl(FutureObj *fut, PyObject *Py_UNUSED(ignored)); static int task_cancel_impl(TaskObj *self, PyObject *Py_UNUSED(ignored)); static int FutureObj_set_blocking(FutureObj *fut, PyObject *val, void *Py_UNUSED(ignored)); static int future_set_blocking_impl(FutureObj* fut, int val); static PyObject * _asyncio_Future_get_loop_impl(FutureObj *fut); static PyObject * _asyncio__GatheringFuture_cancel_impl(_GatheringFutureObj *self); static int GatheringFuture_cancel_impl(_GatheringFutureObj *self, PyObject *Py_UNUSED(ignored)); static fut_blocking_state FutureLike_get_blocking(PyObject *fut) { PyObject *res = _PyObject_GetAttrId(fut, &PyId__asyncio_future_blocking); if (res == NULL) { return BLOCKING_ERROR; } if (res == Py_None) { Py_DECREF(res); return BLOCKING_NONE; } int is_true = PyObject_IsTrue(res); Py_DECREF(res); if (is_true == -1) { return BLOCKING_ERROR; } if (is_true) { return BLOCKING_TRUE; } else { return BLOCKING_FALSE; } } static int FutureLike_set_blocking(PyObject *fut, int val) { PyObject *v = val ? 
Py_True : Py_False; return _PyObject_SetAttrId(fut, &PyId__asyncio_future_blocking, v); } static PyObject * FutureLike_get_loop(PyObject* fut) { PyObject *getloop = NULL; int meth_found = _PyObject_GetMethod(fut, get_loop_name, &getloop); if (getloop != NULL) { PyObject *res; if (meth_found == 1) { PyObject *args[1] = { fut }; res = _PyObject_FastCall(getloop, args, 1); } else { res = _PyObject_CallNoArg(getloop);; } Py_DECREF(getloop); return res; } if (PyErr_Occurred()) { if (PyErr_ExceptionMatches(PyExc_AttributeError)) { PyErr_Clear(); } else { return NULL; } } return _PyObject_GetAttrId(fut, &PyId__loop); } static PyObject * FutureLike_add_done_callback(PyObject *fut, PyObject *cb, PyObject *context) { PyObject *add_cb = NULL; int meth_found = _PyObject_GetMethod(fut, add_done_callback_name, &add_cb); if (add_cb == NULL) { return NULL; } int release_context = 0; if (context == NULL) { context = PyContext_CopyCurrent(); if (context == NULL) { Py_DECREF(add_cb); return NULL; } release_context = 1; } PyObject *stack[3]; stack[1] = cb; stack[2] = (PyObject *)context; PyObject *res; if (meth_found == 1) { stack[0] = fut; res = _PyObject_Vectorcall(add_cb, stack, 2, context_kwname); } else { res = _PyObject_Vectorcall(add_cb, stack + 1, 1, context_kwname); } Py_DECREF(add_cb); if (release_context) { Py_DECREF(context); } return res; } static PyObject * FutureLike_remove_done_callback(PyObject *fut, PyObject *cb) { PyObject *remove_cb = NULL; int meth_found = _PyObject_GetMethod(fut, remove_done_callback_name, &remove_cb); if (remove_cb == NULL) { return NULL; } PyObject *stack[2]; stack[1] = cb; PyObject *res = NULL; if (meth_found == 1) { stack[0] = fut; res = _PyObject_Vectorcall(remove_cb, stack, 2, NULL); } else { res = _PyObject_Vectorcall(remove_cb, stack + 1, 1, NULL); } Py_DECREF(remove_cb); return res; } static inline PyObject * call_method_id_noarg(PyObject *obj, PyObject *name) { PyObject *method = NULL; int meth_found = _PyObject_GetMethod(obj, name, 
&method); if (method == NULL) { return NULL; } PyObject *res; if (meth_found == 1) { PyObject *stack[1] = { obj }; res = _PyObject_FastCall(method, stack, 1); } else { res = _PyObject_CallNoArg(method); } Py_DECREF(method); return res; } static PyObject * FutureLike_result(PyObject *fut) { return call_method_id_noarg(fut, result_name); } static int FutureLike_cancel(PyObject *fut) { PyObject *res = call_method_id_noarg(fut, cancel_name); if (res == NULL) { return -1; } int is_true = PyObject_IsTrue(res); Py_DECREF(res); return is_true; } static PyObject * Task_step_thunk(PyObject *self, PyObject *exn, PyObject *Py_UNUSED(real_step)) { return _asyncio_Task__step_impl((TaskObj*)self, exn); } static PyObject * ContextAwareTask_step_thunk(PyObject *self, PyObject *exn, PyObject *Py_UNUSED(real_step)) { return _asyncio_ContextAwareTask__step_impl((ContextAwareTaskObj *)self, exn); } static PyObject * Callable_step(PyObject *self, PyObject *exn, PyObject *real_step) { if (exn == NULL) { exn = Py_None; } PyObject *args[] = {self, exn}; return _PyObject_Vectorcall(real_step, args, 2, NULL); } static PyObject * TaskLike_step(PyObject *self, PyObject *exn, PyObject *Py_UNUSED(real_step)) { return _PyObject_CallMethodIdObjArgs(self, &PyId__step, exn, NULL); } static PyObject *FutureObj_get_source_traceback(FutureObj *fut, void *Py_UNUSED(ignored)); static PyObject * FutureObj_get_traceback(PyObject *fut) { return FutureObj_get_source_traceback((FutureObj *)fut, NULL); } static PyObject * FutureLike_get_traceback(PyObject *fut) { return _PyObject_GetAttrId(fut, &PyId__source_traceback); } static PyObject *_asyncio_Future_cancelled(FutureObj *self, PyObject *Py_UNUSED(ignored)); static int FutureObj_cancelled(FutureObj *self); static int FutureLike_cancelled(PyObject *self) { PyObject *res = _PyObject_CallMethodIdObjArgs(self, &PyId_cancelled, NULL); if (res == NULL) { return -1; } int is_true = PyObject_IsTrue(res); Py_DECREF(res); return is_true; } static PyObject 
*_asyncio_Future_exception(FutureObj *self, PyObject *Py_UNUSED(ignored)); static PyObject *_asyncio_Future_exception_impl(FutureObj *self); static PyObject * FutureLike_exception(PyObject *self) { return _PyObject_CallMethodIdObjArgs(self, &PyId_exception, NULL); } static PyObject *_asyncio_Future_set_result(FutureObj *self, PyObject *result); static int future_set_result(FutureObj *fut, PyObject *res); static int FutureLike_set_result(PyObject *fut, PyObject *res) { PyObject *retval = _PyObject_CallMethodIdObjArgs(fut, &PyId_set_result, res, NULL); if (retval == NULL) { return -1; } Py_DECREF(retval); return 0; } static int TaskObj_set_log_destroy_pending(TaskObj *task, PyObject *val, void *Py_UNUSED(ignored)); static int TaskObj_set_log_destroy_pending_impl(TaskObj *task, int val) { task->task_log_destroy_pending = val; return 0; } static int FutureLike_set_log_destroy_pending(PyObject *self, int val) { return _PyObject_SetAttrId( self, &PyId__log_destroy_pending, val ? Py_True : Py_False); } static PyObject *task_set_fut_waiter(TaskObj *task, PyObject *result); static PyObject * FutureLike_set_fut_waiter(PyObject *fut, PyObject *result) { return _PyObject_CallMethodIdObjArgs( fut, &PyId__set_fut_waiter, result, NULL); } static PyObject *task_set_task_context(TaskObj *task, PyObject *context); static PyObject * FutureLike_set_task_context(PyObject *task, PyObject *context) { return _PyObject_CallMethodIdObjArgs( task, &PyId__set_task_context, context, NULL); } typedef PyObject* (*get_loop_f)(PyObject* obj); typedef fut_blocking_state (*get_is_blocking_f)(PyObject* obj); typedef int (*set_is_blocking_f)(PyObject* obj, int val); typedef PyObject* (*on_completed_f)(PyObject* obj, PyObject* cb, PyObject* ctx); typedef PyObject* (*result_f)(PyObject *); typedef int (*cancel_f)(PyObject *, PyObject *); typedef PyObject* (*step_thunk_f)(PyObject *fut, PyObject *exn, PyObject *real_step); typedef PyObject *(*get_source_traceback)(PyObject *fut); typedef int 
(*cancelled_f)(PyObject *); typedef PyObject *(*exception_f)(PyObject *); typedef int (*set_result_f)(PyObject *, PyObject *); typedef int (*set_log_destroy_pending_f)(PyObject *, int); typedef PyObject *(*set_fut_waiter_f)(PyObject *, PyObject *); typedef PyObject *(*set_task_context_f)(PyObject *, PyObject *); typedef struct { PyWeakReference weakref; // actual method table int is_future_like; get_loop_f get_loop; get_is_blocking_f get_is_blocking; set_is_blocking_f set_is_blocking; on_completed_f on_completed; result_f result; cancel_f cancel; step_thunk_f step_thunk; PyObject* step; get_source_traceback source_traceback; cancelled_f cancelled; exception_f exception; set_result_f set_result; set_log_destroy_pending_f set_log_destroy_pending; set_fut_waiter_f set_fut_waiter; set_task_context_f set_task_context; } PyMethodTableRef; static PyMethodTableRef *GatheringFuture_TableRef; static PyMethodTableRef *Future_TableRef; // Auxiliary macros for cases when method tables are used to // perform actions over awaitables stores in collections. // In this case they typically all have the same type so // we can save bit of time by not pulling the same method table // over and over again. #define DECLARE_METHODTABLE(var) \ PyMethodTableRef *__prevtable_##var = NULL, *var = NULL; \ PyTypeObject *__prevtype_##var = NULL; #define FETCH_METHOD_TABLE(var, type) \ var = ((type) == __prevtype_##var) \ ? 
__prevtable_##var \ : (__prevtable_##var = \ get_or_create_method_table(__prevtype_##var = (type))); \ assert((var) != NULL) static int methodtableref_traverse(PyMethodTableRef *self, visitproc visit, void *arg) { Py_VISIT(self->step); return _PyWeakref_RefType.tp_traverse((PyObject*)self, visit, arg); } static int methodtableref_clear(PyMethodTableRef *self) { Py_CLEAR(self->step); return _PyWeakref_RefType.tp_clear((PyObject*)self); } static void methodtable_dealloc(PyMethodTableRef *self) { methodtableref_clear(self); _PyWeakref_RefType.tp_dealloc((PyObject*)self); } static PyObject* methodtable_callback_impl(PyObject *Py_UNUSED(self), PyMethodTableRef *tableref) { // decref methodtable to make sure it is collected Py_DECREF(tableref); Py_RETURN_NONE; } static PyObject* clear_method_table(PyObject *Py_UNUSED(module), PyMethodTableRef *arg) { if (arg == Future_TableRef) { Py_RETURN_NONE; } _PyWeakref_ClearRef((PyWeakReference*)arg); return methodtable_callback_impl(NULL, arg); } PyMethodDef _MethodTableRefCallback = { "methodtableref_callback", (PyCFunction)methodtable_callback_impl, METH_O, NULL }; static PyTypeObject _MethodTable_RefType = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "future_method_table", .tp_doc = "future_method_table", .tp_basicsize = sizeof(PyMethodTableRef), .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC, .tp_dealloc = (destructor)methodtable_dealloc, .tp_base = &_PyWeakref_RefType, .tp_traverse = (traverseproc)methodtableref_traverse, .tp_clear = (inquiry)methodtableref_clear }; static PyObject *methodref_callback; #define MethodTable_Check(obj) (Py_TYPE(obj) == &_MethodTable_RefType) static PyObject* lookup_attr( PyTypeObject *type, struct _Py_Identifier *id, PyTypeObject *attr_type ) { PyObject *t = _PyType_LookupId(type, id); // t is borrowed if (t == NULL || Py_TYPE(t) != attr_type) { return NULL; } return t; } static int is_impl_method(PyMethodDescrObject *descr, PyCFunction f) { return descr != NULL && 
descr->d_method != NULL && descr->d_method->ml_meth == f; } static int is_get_impl_method(PyGetSetDescrObject *descr, getter f) { return descr != NULL && descr->d_getset != NULL && descr->d_getset->get == f; } static int is_set_impl_method(PyGetSetDescrObject *descr, setter f) { return descr != NULL && descr->d_getset != NULL && descr->d_getset->set == f; } static inline PyMethodTableRef* get_method_table(PyTypeObject *type) { PyWeakReference **weakrefs = (PyWeakReference**)PyObject_GET_WEAKREFS_LISTPTR(type); if (weakrefs != NULL) { PyWeakReference *p = *weakrefs; while (p != NULL) { if (MethodTable_Check(p)) { return (PyMethodTableRef*)p; } p = p->wr_next; } } return NULL; } static void populate_method_table(PyMethodTableRef *tableref, PyTypeObject *type) { PyGetSetDescrObject *get_set_is_blocking = NULL, *source_traceback = NULL, *log_destroy_pending = NULL; PyMethodDescrObject *add_done_callback = NULL, *get_loop = NULL, *cancel = NULL, *result = NULL, *cancelled = NULL, *exception = NULL, *set_result = NULL, *set_fut_waiter = NULL, *set_task_context = NULL; PyObject *step = NULL; PyObject *t = _PyType_LookupId(type, &PyId__asyncio_future_blocking); // t is borrowed if (t) { tableref->is_future_like = 1; } else { tableref->is_future_like = 0; } if (PyType_IsSubtype(type, &FutureType)) { if (Py_TYPE(t) == &PyGetSetDescr_Type) { get_set_is_blocking = (PyGetSetDescrObject*)t; } get_loop = (PyMethodDescrObject*)lookup_attr(type, &PyId_get_loop, &PyMethodDescr_Type); add_done_callback = (PyMethodDescrObject*)lookup_attr(type, &PyId_add_done_callback, &PyMethodDescr_Type); cancel = (PyMethodDescrObject*)lookup_attr(type, &PyId_cancel, &PyMethodDescr_Type); result = (PyMethodDescrObject*)lookup_attr(type, &PyId_result, &PyMethodDescr_Type); step = _PyType_LookupId(type, &PyId__step); source_traceback = (PyGetSetDescrObject *)lookup_attr( type, &PyId__source_traceback, &PyGetSetDescr_Type); log_destroy_pending = (PyGetSetDescrObject *)lookup_attr( type, 
&PyId__log_destroy_pending, &PyGetSetDescr_Type); cancelled = (PyMethodDescrObject *)lookup_attr( type, &PyId_cancelled, &PyMethodDescr_Type); exception = (PyMethodDescrObject *)lookup_attr( type, &PyId_exception, &PyMethodDescr_Type); set_result = (PyMethodDescrObject *)lookup_attr( type, &PyId_set_result, &PyMethodDescr_Type); set_fut_waiter = (PyMethodDescrObject *)lookup_attr( type, &PyId__set_fut_waiter, &PyMethodDescr_Type); set_task_context = (PyMethodDescrObject *)lookup_attr( type, &PyId__set_task_context, &PyMethodDescr_Type); } if (is_get_impl_method(get_set_is_blocking, (getter)FutureObj_get_blocking)) { tableref->get_is_blocking = (get_is_blocking_f)future_get_blocking_impl; } else { tableref->get_is_blocking = (get_is_blocking_f)FutureLike_get_blocking; } if (is_set_impl_method(get_set_is_blocking, (setter)FutureObj_set_blocking)) { tableref->set_is_blocking = (set_is_blocking_f)future_set_blocking_impl; } else { tableref->set_is_blocking = (set_is_blocking_f)FutureLike_set_blocking; } if (is_impl_method(get_loop, (PyCFunction)_asyncio_Future_get_loop)) { tableref->get_loop = (get_loop_f)_asyncio_Future_get_loop_impl; } else { tableref->get_loop = (get_loop_f)FutureLike_get_loop; } if (is_impl_method(add_done_callback, (PyCFunction)_asyncio_Future_add_done_callback)) { tableref->on_completed = (on_completed_f)_asyncio_Future_add_done_callback_impl; } else if (is_impl_method(add_done_callback, (PyCFunction)_asyncio_ContextAwareTask_add_done_callback)) { // for purposes of this module we can use Future.add_done_callback for context aware tasks // it is used either: // - for gather that does not need context information // - for wakeup which will do step and will restore the current context from task context anyways tableref->on_completed = (on_completed_f)_asyncio_Future_add_done_callback_impl; } else { tableref->on_completed = (on_completed_f)FutureLike_add_done_callback; } if (is_impl_method(result, (PyCFunction)_asyncio_Future_result)) { 
tableref->result = (result_f)_asyncio_Future_result_impl; } else { tableref->result = (result_f)FutureLike_result; } if (is_impl_method(cancel, (PyCFunction)_asyncio_Future_cancel)) { tableref->cancel = (cancel_f)future_cancel_impl; } else if (is_impl_method(cancel, (PyCFunction)_asyncio_Task_cancel)) { tableref->cancel = (cancel_f)task_cancel_impl; } else if (is_impl_method(cancel, (PyCFunction)_asyncio__GatheringFuture_cancel_impl)) { tableref->cancel = (cancel_f)GatheringFuture_cancel_impl; } else { tableref->cancel = (cancel_f)FutureLike_cancel; } if (is_impl_method(cancelled, (PyCFunction)_asyncio_Future_cancelled)) { tableref->cancelled = (cancelled_f)FutureObj_cancelled; } else { tableref->cancelled = (cancelled_f)FutureLike_cancelled; } if (is_impl_method(exception, (PyCFunction)_asyncio_Future_exception)) { tableref->exception = (exception_f)_asyncio_Future_exception_impl; } else { tableref->exception = (exception_f)FutureLike_exception; } if (is_impl_method(set_result, (PyCFunction)_asyncio_Future_set_result)) { tableref->set_result = (set_result_f)future_set_result; } else { tableref->set_result = (set_result_f)FutureLike_set_result; } if (is_get_impl_method(source_traceback, (getter)FutureObj_get_source_traceback)) { tableref->source_traceback = FutureObj_get_traceback; } else { tableref->source_traceback = FutureLike_get_traceback; } if (is_impl_method(set_fut_waiter, (PyCFunction)task_set_fut_waiter)) { tableref->set_fut_waiter = (set_fut_waiter_f)task_set_fut_waiter; } else { tableref->set_fut_waiter = (set_fut_waiter_f)FutureLike_set_fut_waiter; } if (is_impl_method(set_task_context, (PyCFunction)task_set_task_context)) { tableref->set_task_context = (set_task_context_f)task_set_task_context; } else { tableref->set_task_context = (set_task_context_f)FutureLike_set_task_context; } if (is_set_impl_method(log_destroy_pending, (setter)TaskObj_set_log_destroy_pending)) { tableref->set_log_destroy_pending = 
(set_log_destroy_pending_f)TaskObj_set_log_destroy_pending_impl; } else { tableref->set_log_destroy_pending = (set_log_destroy_pending_f)FutureLike_set_log_destroy_pending; } tableref->step_thunk = TaskLike_step; tableref->step = NULL; if (step != NULL) { if (Py_TYPE(step) == &PyMethodDescr_Type && is_impl_method((PyMethodDescrObject*)step, (PyCFunction)_asyncio_Task__step)) { tableref->step_thunk = Task_step_thunk; } if (Py_TYPE(step) == &PyMethodDescr_Type && is_impl_method((PyMethodDescrObject *)step, (PyCFunction)_asyncio_ContextAwareTask__step)) { tableref->step_thunk = ContextAwareTask_step_thunk; } else if (PyCallable_Check(step)) { tableref->step_thunk = Callable_step; tableref->step = step; Py_INCREF(step); } } } static PyMethodTableRef* create_method_table(PyTypeObject *type) { PyObject *args = PyTuple_New(2); if (args == NULL) { return NULL; } PyTuple_SET_ITEM(args, 0, (PyObject*)type); Py_INCREF(type); PyTuple_SET_ITEM(args, 1, methodref_callback); Py_INCREF(methodref_callback); PyMethodTableRef *tableref =(PyMethodTableRef *) _PyWeakref_RefType.tp_new(&_MethodTable_RefType, args, NULL); Py_DECREF(args); if (tableref == NULL) { return NULL; } populate_method_table(tableref, type); return tableref; } static PyMethodTableRef* get_or_create_method_table(PyTypeObject *type) { if (type == &FutureType) { return Future_TableRef; } PyMethodTableRef *tableref = get_method_table(type); if (tableref != NULL) { return tableref; } return create_method_table(type); } struct PyEventLoopDispatchTable; typedef PyObject* (*invoke_get_debug_f)( struct PyEventLoopDispatchTable*, PyObject *loop ); typedef PyObject* (*invoke_call_soon_f)( struct PyEventLoopDispatchTable*, PyObject *loop, PyObject *callback, PyObject *arg, PyObject *ctx ); typedef struct PyEventLoopDispatchTable { PyObject_HEAD unsigned int version_tag; PyObject *get_debug_method_object; PyCFunction get_debug_method; invoke_get_debug_f invoke_get_debug; PyObject *call_soon_method_object; 
PyCFunctionWithKeywords call_soon_method; invoke_call_soon_f invoke_call_soon; } PyEventLoopDispatchTable; static PyObject* invoke_get_debug_method(PyEventLoopDispatchTable *table, PyObject *loop) { // METH_NOARGS return table->get_debug_method(loop, NULL); } static PyObject* invoke_get_debug_vectorcall(PyEventLoopDispatchTable *table, PyObject *loop) { PyObject *args[] = {loop}; return _PyObject_Vectorcall( table->get_debug_method_object, args, 1, NULL); } static PyObject* invoke_get_debug(PyEventLoopDispatchTable *Py_UNUSED(table), PyObject *loop) { return _PyObject_CallMethodId(loop, &PyId_get_debug, NULL); } // pooled values for argument passing static PyObject *call_soon_args1; static PyObject *call_soon_args2; static PyObject *call_soon_kwargs; // "context" string static PyObject *context_name = NULL; // pre-computed hash for the context_name static Py_hash_t context_name_hash; #define GRAB_POOLED_VALUE(lhs, pooled) \ (lhs) = (pooled); \ (pooled) = NULL; \ static PyObject* prepare_args(PyObject *func, PyObject *arg) { PyObject *args; Py_ssize_t nargs = arg != NULL ? 
2 : 1; if (nargs == 1) { GRAB_POOLED_VALUE(args, call_soon_args1); } else { GRAB_POOLED_VALUE(args, call_soon_args2); } assert(args == NULL || Py_REFCNT(args) == 1); // if there were no pooled value - create new if (args == NULL) { if ((args = _PyTuple_NewNoTrack(nargs)) == NULL) { return NULL; } } PyTuple_SET_ITEM(args, 0, func); Py_INCREF(func); if (nargs == 2) { PyTuple_SET_ITEM(args, 1, arg); Py_INCREF(arg); } return args; } static void release_args(PyObject *args) { if (Py_REFCNT(args) > 1) { // arg was captured during the call - release our end PyObject_GC_Track(args); Py_DECREF(args); return; } if (PyTuple_GET_SIZE(args) == 1) { // if pooled value was already set - drop our reference and leave if (call_soon_args1 != NULL) { Py_DECREF(args); return; } // clear internal slot but keep the tuple itself Py_CLEAR(PyTuple_GET_ITEM(args, 0)); call_soon_args1 = args; } else { // if pooled value was already set - drop our reference and leave if (call_soon_args2 != NULL) { Py_DECREF(args); return; } // clear internal slots but keep the tuple itself Py_CLEAR(PyTuple_GET_ITEM(args, 0)); Py_CLEAR(PyTuple_GET_ITEM(args, 1)); call_soon_args2 = args; } } static PyObject* prepare_kwargs(PyObject *ctx) { assert(ctx != NULL); PyObject *kwargs; GRAB_POOLED_VALUE(kwargs, call_soon_kwargs); assert(kwargs == NULL || Py_REFCNT(kwargs) == 1); if (kwargs == NULL) { if ((kwargs = _PyDict_NewPresized(1)) == NULL) { return NULL; } } // set kwarg value if (_PyDict_SetItem_KnownHash(kwargs, context_name, ctx, context_name_hash) < 0) { Py_DECREF(kwargs); return NULL; } return kwargs; } static int release_kwargs(PyObject *kwargs) { if (kwargs == NULL) { return 0; } // if value has leaked during the call or pooled value is already set // drop our reference and leave if (Py_REFCNT(kwargs) > 1 || call_soon_kwargs != NULL) { Py_DECREF(kwargs); return 0; } // set the None to the slot if (_PyDict_SetItem_KnownHash(kwargs, context_name, Py_None, context_name_hash) < 0) { Py_DECREF(kwargs); return 
-1; } if (PyDict_GET_SIZE(kwargs) != 1) { // dict size is not equal to 1 value which meahs that it was mutated by user Py_DECREF(kwargs); return 0; } call_soon_kwargs = kwargs; return 0; } static PyObject* invoke_call_soon_method( PyEventLoopDispatchTable *table, PyObject *loop, PyObject *func, PyObject *arg, PyObject *ctx ) { PyObject *args = prepare_args(func, arg); if (args == NULL) { return NULL; } PyObject *kwargs = NULL; if (ctx != NULL) { kwargs = prepare_kwargs(ctx); if (kwargs == NULL) { Py_DECREF(args); return NULL; } } // METH_KEYWORDS | METH_VARARGS PyObject *res = table->call_soon_method(loop, args, kwargs); release_args(args); if (res == NULL) { Py_DECREF(kwargs); return NULL; } if (release_kwargs(kwargs) < 0) { Py_DECREF(res); return NULL; } return res; } static PyObject* invoke_call_soon_vectorcall( PyEventLoopDispatchTable *table, PyObject *loop, PyObject *func, PyObject *arg, PyObject *ctx ) { PyObject *args[4] = { loop, func, NULL, NULL }; Py_ssize_t nargs = 2; if (arg != NULL) { args[nargs] = arg; nargs++; } PyObject *kwnames = NULL; if (ctx != NULL) { args[nargs] = ctx; kwnames = context_kwname; } return _PyObject_Vectorcall( table->call_soon_method_object, args, nargs, kwnames); } static PyObject* invoke_call_soon( PyEventLoopDispatchTable *Py_UNUSED(table), PyObject *loop, PyObject *func, PyObject *arg, PyObject *ctx ) { PyObject *callable = _PyObject_GetAttrId(loop, &PyId_call_soon); if (callable == NULL) { return NULL; } PyObject *stack[3]; Py_ssize_t nargs = 1; PyObject *kwargs = NULL; stack[0] = func; if (arg != NULL) { stack[1] = arg; nargs++; } if (ctx != NULL){ stack[nargs] = ctx; kwargs = context_kwname; } PyObject *res = _PyObject_Vectorcall(callable, stack, nargs, kwargs); Py_DECREF(callable); return res; } static int event_loop_dispatch_table_traverse( PyEventLoopDispatchTable *obj, visitproc visit, void *arg ) { Py_VISIT(obj->get_debug_method_object); Py_VISIT(obj->call_soon_method_object); return 0; } static int 
event_loop_dispatch_table_clear(PyEventLoopDispatchTable *obj) { Py_CLEAR(obj->get_debug_method_object); Py_CLEAR(obj->call_soon_method_object); return 0; } static void event_loop_dispatch_table_dealloc(PyEventLoopDispatchTable *obj) { event_loop_dispatch_table_clear(obj); PyObject_GC_UnTrack(obj); PyObject_GC_Del(obj); } static PyTypeObject _PyEventLoopDispatchTable_Type = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "event_loop_dispatch_table", .tp_doc = "event_loop_dispatch_table", .tp_basicsize = sizeof(PyEventLoopDispatchTable), .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, .tp_traverse = (traverseproc)event_loop_dispatch_table_traverse, .tp_clear = (inquiry)event_loop_dispatch_table_clear, .tp_dealloc = (destructor)event_loop_dispatch_table_dealloc }; // type->dispatch table static PyObject* event_loop_dispatch_tables; // shortcut - last used event loop type/version tag static PyObject* last_used_eventloop_type; // shortcut - last used event loop dispatch table static PyEventLoopDispatchTable* last_used_eventloop_dispatch_table; // generic dispatch table that does dynamic calls static PyEventLoopDispatchTable* fallback_dispatch_table; static PyEventLoopDispatchTable* PyEventLoopDispatchTable_new() { PyEventLoopDispatchTable *table = PyObject_GC_New(PyEventLoopDispatchTable, &_PyEventLoopDispatchTable_Type); if (table == NULL) { return NULL; } table->invoke_call_soon = invoke_call_soon; table->invoke_get_debug = invoke_get_debug; table->call_soon_method_object = NULL; table->get_debug_method_object = NULL; PyObject_GC_Track(table); return table; } static PyEventLoopDispatchTable* get_dispatch_table(PyTypeObject *type) { assert(type != NULL); if ((PyObject*)type == last_used_eventloop_type) { if (PyType_HasFeature(type, Py_TPFLAGS_VALID_VERSION_TAG) && type->tp_version_tag == last_used_eventloop_dispatch_table->version_tag ) { // the same type that was seen last time with the same version // can return the same dispatch table that was returned last time 
return (PyEventLoopDispatchTable*)last_used_eventloop_dispatch_table; } else if (last_used_eventloop_type) { // type was modified - drop its dispatch table from the cache and shortcuts Py_CLEAR(last_used_eventloop_type); Py_CLEAR(last_used_eventloop_dispatch_table); if (PyDict_DelItem(event_loop_dispatch_tables, (PyObject*)type) < 0) { return NULL; } } } PyEventLoopDispatchTable *table = (PyEventLoopDispatchTable*)PyDict_GetItem(event_loop_dispatch_tables, (PyObject*)type); if (!PyType_HasFeature(type, Py_TPFLAGS_VALID_VERSION_TAG) || (table != NULL && table->version_tag != type->tp_version_tag)) { if (table) { if (PyDict_DelItem(event_loop_dispatch_tables, (PyObject*)type) < 0) { return NULL; } table = NULL; } if (!PyType_HasFeature(type, Py_TPFLAGS_VALID_VERSION_TAG)) { return fallback_dispatch_table; } } // stale or missing entry for the type but type has Py_TPFLAGS_VALID_VERSION_TAG // create a new dispatch table if (table == NULL) { table = PyEventLoopDispatchTable_new(); if (table == NULL) { return NULL; } table->version_tag = type->tp_version_tag; PyObject *call_soon = _PyType_LookupId(type, &PyId_call_soon); if (call_soon != NULL) { if (Py_TYPE(call_soon) == &PyMethodDescr_Type && ((PyMethodDescrObject*)call_soon)->d_method->ml_flags == (METH_VARARGS | METH_KEYWORDS)) { table->call_soon_method_object = call_soon; Py_INCREF(call_soon); table->invoke_call_soon = invoke_call_soon_method; table->call_soon_method = (PyCFunctionWithKeywords)((PyMethodDescrObject*)call_soon)->d_method->ml_meth; } else if (PyType_HasFeature(Py_TYPE(call_soon), Py_TPFLAGS_METHOD_DESCRIPTOR)) { table->call_soon_method_object = call_soon; Py_INCREF(call_soon); table->invoke_call_soon = invoke_call_soon_vectorcall; table->call_soon_method = NULL; } } PyObject *get_debug = _PyType_LookupId(type, &PyId_get_debug); if (get_debug != NULL) { if (Py_TYPE(get_debug) == &PyMethodDescr_Type && ((PyMethodDescrObject*)get_debug)->d_method->ml_flags == METH_NOARGS) { table->get_debug_method_object 
= get_debug; Py_INCREF(get_debug); table->invoke_get_debug = invoke_get_debug_method; table->get_debug_method = ((PyMethodDescrObject*)get_debug)->d_method->ml_meth; } else if (PyType_HasFeature(Py_TYPE(get_debug), Py_TPFLAGS_METHOD_DESCRIPTOR)) { table->get_debug_method_object = get_debug; Py_INCREF(get_debug); table->invoke_get_debug = invoke_get_debug_vectorcall; table->get_debug_method = NULL; } } if (PyDict_SetItem(event_loop_dispatch_tables, (PyObject*)type, (PyObject*)table) < 0) { Py_DECREF(table); return NULL; } // drop one ref so the only reference to table is in the cache Py_DECREF(table); } assert(Py_REFCNT(table) == 1); Py_XSETREF(last_used_eventloop_dispatch_table, (PyEventLoopDispatchTable*)table); Py_INCREF(table); Py_XSETREF(last_used_eventloop_type, (PyObject*)type); Py_INCREF(type); return table; } /* facebook: PyAsyncioMethodTable */ /*[clinic input] class _asyncio.Future "FutureObj *" "&Future_Type" [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=00d3e4abca711e0f]*/ /* Get FutureIter from Future */ static PyObject * future_new_iter(PyObject *); static PyRunningLoopHolder * new_running_loop_holder(PyObject *); static inline int try_add_task(PyObject *set, PyObject *task, PyObject *loop, int only_running) { PyMethodTableRef *tableref = get_or_create_method_table(Py_TYPE(task)); if (tableref == NULL) { return -1; } PyObject *task_loop = tableref->get_loop(task); if (task_loop == NULL) { return -1; } if (task_loop != loop) { Py_DECREF(task_loop); return 0; } Py_DECREF(task_loop); if (only_running) { PyObject *res = call_method_id_noarg(task, done_name); if (res == NULL) { return -1; } int is_done = PyObject_IsTrue(res); Py_DECREF(res); if (is_done < 0) { return -1; } if (is_done) { return 0; } } return PySet_Add(set, task); } static PyObject* _all_tasks(PyObject *loop, int only_running) { PyObject *s = PySet_New(NULL); if (s == NULL) { return NULL; } PyObject *t = all_asyncio_tasks; while (t) { // pin 
task Py_INCREF(t); if (try_add_task(s, t, loop, only_running) == -1) { Py_DECREF(t); Py_DECREF(s); return NULL; } // unpin task Py_DECREF(t); t = ((TaskObj*)t)->next; } PyObject *iter = PyObject_GetIter(all_non_asyncio_tasks); if (iter == NULL) { Py_DECREF(s); return NULL; } while ((t = PyIter_Next(iter))) { int ok = try_add_task(s, t, loop, only_running); Py_DECREF(t); if (ok == -1) { Py_DECREF(iter); Py_DECREF(s); return NULL; } } Py_DECREF(iter); if (PyErr_Occurred()) { Py_DECREF(s); return NULL; } return s; } static int _is_coroutine(PyObject *coro) { /* 'coro' is not a native coroutine, call asyncio.iscoroutine() to check if it's another coroutine flavour. Do this check after 'future_init()'; in case we need to raise an error, __del__ needs a properly initialized object. */ PyObject *res = PyObject_CallFunctionObjArgs( asyncio_iscoroutine_func, coro, NULL); if (res == NULL) { return -1; } int is_res_true = PyObject_IsTrue(res); Py_DECREF(res); if (is_res_true <= 0) { return is_res_true; } if (PySet_GET_SIZE(iscoroutine_typecache) < 100) { /* Just in case we don't want to cache more than 100 positive types. That shouldn't ever happen, unless someone stressing the system on purpose. 
*/
        if (PySet_Add(iscoroutine_typecache, (PyObject*) Py_TYPE(coro))) {
            return -1;
        }
    }

    return 1;
}

/* Duck-typed Future check.
   Returns 1 for exact Future/Task instances, 0 for exact generators and
   native coroutines; otherwise 1 only when type(obj) has an
   `_asyncio_future_blocking` attribute and obj's own value of it is not
   None.  Returns -1 with an exception set on lookup errors. */
static int
isfuture(PyObject *obj)
{
    if (Future_CheckExact(obj) || Task_CheckExact(obj)) {
        return 1;
    }
    if (PyGen_CheckExact(obj) || PyCoro_CheckExact(obj)) {
        return 0;
    }
    _Py_IDENTIFIER(__class__);
    PyObject* class = _PyObject_GetAttrId(obj, &PyId___class__);
    if (class == NULL) {
        return -1;
    }
    _Py_IDENTIFIER(_asyncio_future_blocking);
    int class_has_attr = _PyObject_HasAttrId(
        class, &PyId__asyncio_future_blocking);
    Py_DECREF(class);
    if (class_has_attr < 0) {
        return -1;
    }
    int is_future = 0;
    if (class_has_attr) {
        PyObject* obj_attr = _PyObject_GetAttrId(obj, &PyId__asyncio_future_blocking);
        if (obj_attr == NULL) {
            // not a future
            PyErr_Clear();
        }
        else {
            is_future = obj_attr != Py_None;
            Py_DECREF(obj_attr);
        }
    }
    return is_future;
}

/*[clinic input]
_asyncio.isfuture

    obj: object
    /

Return True if obj is a Future instance.

This returns True when obj is a Future instance or is advertising
itself as duck-type compatible by setting _asyncio_future_blocking.
See comment in Future for more details.
[clinic start generated code]*/

static PyObject *
_asyncio_isfuture(PyObject *module, PyObject *obj)
/*[clinic end generated code: output=3c79d083f507d4fa input=a71fdef4d9b354b4]*/
{
    /* Thin wrapper: map isfuture()'s -1/0/1 onto NULL/False/True. */
    int ok = isfuture(obj);
    if (ok == -1) {
        return NULL;
    }
    if (ok) {
        Py_RETURN_TRUE;
    }
    else {
        Py_RETURN_FALSE;
    }
}

/* Returns 1 if `coro` is a coroutine (native, or positively cached /
   confirmed through _is_coroutine()), 0 if not, -1 on error. */
static inline int
is_coroutine(PyObject *coro)
{
    if (PyCoro_CheckExact(coro)) {
        return 1;
    }

    /* Check if `type(coro)` is in the cache.
       Caching makes is_coroutine() function almost as fast as
       PyCoro_CheckExact() for non-native coroutine-like objects
       (like coroutines compiled with Cython).

       asyncio.iscoroutine() has its own type caching mechanism.
       This cache allows us to avoid the cost of even calling
       a pure-Python function in 99.9% cases.
    */
    int has_it = PySet_Contains(
        iscoroutine_typecache, (PyObject*) Py_TYPE(coro));
    if (has_it == 0) {
        /* type(coro) is not in iscoroutine_typecache */
        return _is_coroutine(coro);
    }

    /* either an error has occurred or
       type(coro) is in iscoroutine_typecache
    */
    return has_it;
}

/* Store the running event loop of the current thread in *loop.
   The holder object is looked up in the thread-state dict and cached in
   cached_running_holder together with the thread state id. */
static int
get_running_loop(PyObject **loop)
{
    PyObject *rl;

    PyThreadState *ts = PyThreadState_Get();
    if (ts->id == cached_running_holder_tsid && cached_running_holder != NULL) {
        // Fast path, check the cache.
        rl = cached_running_holder;  // borrowed
    }
    else {
        if (ts->dict == NULL) {
            goto not_found;
        }

        rl = _PyDict_GetItemIdWithError(
            ts->dict, &PyId___asyncio_running_event_loop__);  // borrowed
        if (rl == NULL) {
            if (PyErr_Occurred()) {
                goto error;
            }
            else {
                goto not_found;
            }
        }

        cached_running_holder = rl;  // borrowed
        cached_running_holder_tsid = ts->id;
    }

    assert(Py_TYPE(rl) == &PyRunningLoopHolder_Type);
    PyObject *running_loop = ((PyRunningLoopHolder *)rl)->rl_loop;

    if (running_loop == Py_None) {
        goto not_found;
    }

#if defined(HAVE_GETPID) && !defined(MS_WINDOWS)
    /* On Windows there is no getpid, but there is also no os.fork(),
       so there is no need for this check.
*/ if (current_pid != ((PyRunningLoopHolder *)rl)->rl_pid) { goto not_found; } #endif Py_INCREF(running_loop); *loop = running_loop; return 0; not_found: *loop = NULL; return 0; error: *loop = NULL; return -1; } static int set_running_loop(PyObject *loop) { PyObject *ts_dict = PyThreadState_GetDict(); // borrowed if (ts_dict == NULL) { PyErr_SetString( PyExc_RuntimeError, "thread-local storage is not available"); return -1; } PyRunningLoopHolder *rl = new_running_loop_holder(loop); if (rl == NULL) { return -1; } if (_PyDict_SetItemId( ts_dict, &PyId___asyncio_running_event_loop__, (PyObject *)rl) < 0) { Py_DECREF(rl); // will cleanup loop & current_pid return -1; } Py_DECREF(rl); cached_running_holder = (PyObject *)rl; /* safe to assume state is not NULL as the call to PyThreadState_GetDict() above already checks if state is NULL */ cached_running_holder_tsid = PyThreadState_Get()->id; return 0; } static PyObject * get_event_loop(void) { PyObject *loop; PyObject *policy; if (get_running_loop(&loop)) { return NULL; } if (loop != NULL) { return loop; } policy = _PyObject_CallNoArg(asyncio_get_event_loop_policy); if (policy == NULL) { return NULL; } loop = _PyObject_CallMethodId(policy, &PyId_get_event_loop, NULL); Py_DECREF(policy); return loop; } static int call_soon(PyObject *loop, PyObject *func, PyObject *arg, PyObject *ctx) { PyEventLoopDispatchTable *t = get_dispatch_table(Py_TYPE(loop)); if (t == NULL) { return -1; } PyObject *res = t->invoke_call_soon(t, loop, func, arg, ctx); if (res == NULL) { return -1; } Py_DECREF(res); return 0; } static inline int future_is_alive(FutureObj *fut) { return fut->fut_loop != NULL; } static inline int future_ensure_alive(FutureObj *fut) { if (!future_is_alive(fut)) { PyErr_SetString(PyExc_RuntimeError, "Future object is not initialized."); return -1; } return 0; } #define ENSURE_FUTURE_ALIVE(fut) \ do { \ assert(Future_Check(fut) || Task_Check(fut)); \ if (future_ensure_alive((FutureObj*)fut)) { \ return NULL; \ } \ } 
while(0); static int future_schedule_callbacks(FutureObj *fut) { Py_ssize_t len; Py_ssize_t i; if (fut->fut_callback0 != NULL) { /* There's a 1st callback */ int ret = call_soon( fut->fut_loop, fut->fut_callback0, (PyObject *)fut, fut->fut_context0); Py_CLEAR(fut->fut_callback0); Py_CLEAR(fut->fut_context0); if (ret) { /* If an error occurs in pure-Python implementation, all callbacks are cleared. */ Py_CLEAR(fut->fut_callbacks); return ret; } /* we called the first callback, now try calling callbacks from the 'fut_callbacks' list. */ } if (fut->fut_callbacks == NULL) { /* No more callbacks, return. */ return 0; } len = PyList_GET_SIZE(fut->fut_callbacks); if (len == 0) { /* The list of callbacks was empty; clear it and return. */ Py_CLEAR(fut->fut_callbacks); return 0; } for (i = 0; i < len; i++) { PyObject *cb_tup = PyList_GET_ITEM(fut->fut_callbacks, i); PyObject *cb = PyTuple_GET_ITEM(cb_tup, 0); PyObject *ctx = PyTuple_GET_ITEM(cb_tup, 1); if (call_soon(fut->fut_loop, cb, (PyObject *)fut, ctx)) { /* If an error occurs in pure-Python implementation, all callbacks are cleared. 
*/ Py_CLEAR(fut->fut_callbacks); return -1; } } Py_CLEAR(fut->fut_callbacks); return 0; } static int future_init(FutureObj *fut, PyObject *loop) { PyObject *res; int is_true; // Same to FutureObj_clear() but not clearing fut->dict Py_CLEAR(fut->fut_loop); Py_CLEAR(fut->fut_callback0); Py_CLEAR(fut->fut_context0); Py_CLEAR(fut->fut_callbacks); Py_CLEAR(fut->fut_result); Py_CLEAR(fut->fut_exception); Py_CLEAR(fut->fut_source_tb); fut->fut_state = STATE_PENDING; fut->fut_log_tb = 0; fut->fut_blocking = 0; if (loop == Py_None) { loop = get_event_loop(); if (loop == NULL) { return -1; } } else { Py_INCREF(loop); } fut->fut_loop = loop; PyEventLoopDispatchTable *t = get_dispatch_table(Py_TYPE(loop)); if (t == NULL) { return -1; } res = t->invoke_get_debug(t, loop); if (res == NULL) { return -1; } is_true = PyObject_IsTrue(res); Py_DECREF(res); if (is_true < 0) { return -1; } if (is_true && !_Py_IsFinalizing()) { /* Only try to capture the traceback if the interpreter is not being finalized. The original motivation to add a `_Py_IsFinalizing()` call was to prevent SIGSEGV when a Future is created in a __del__ method, which is called during the interpreter shutdown and the traceback module is already unloaded. 
*/ fut->fut_source_tb = _PyObject_CallNoArg(traceback_extract_stack); if (fut->fut_source_tb == NULL) { return -1; } } return 0; } static int future_set_result(FutureObj *fut, PyObject *res) { if (future_ensure_alive(fut)) { return -1; } if (fut->fut_state != STATE_PENDING) { PyErr_SetString(asyncio_InvalidStateError, "invalid state"); return -1; } assert(!fut->fut_result); Py_INCREF(res); fut->fut_result = res; fut->fut_state = STATE_FINISHED; if (future_schedule_callbacks(fut) == -1) { return -1; } return 0; } static int future_set_exception(FutureObj *fut, PyObject *exc) { PyObject *exc_val = NULL; if (fut->fut_state != STATE_PENDING) { PyErr_SetString(asyncio_InvalidStateError, "invalid state"); return -1; } if (PyExceptionClass_Check(exc)) { exc_val = _PyObject_CallNoArg(exc); if (exc_val == NULL) { return -1; } if (fut->fut_state != STATE_PENDING) { Py_DECREF(exc_val); PyErr_SetString(asyncio_InvalidStateError, "invalid state"); return -1; } } else { exc_val = exc; Py_INCREF(exc_val); } if (!PyExceptionInstance_Check(exc_val)) { Py_DECREF(exc_val); PyErr_SetString(PyExc_TypeError, "invalid exception object"); return -1; } if ((PyObject*)Py_TYPE(exc_val) == PyExc_StopIteration) { Py_DECREF(exc_val); PyErr_SetString(PyExc_TypeError, "StopIteration interacts badly with generators " "and cannot be raised into a Future"); return -1; } assert(!fut->fut_exception); fut->fut_exception = exc_val; fut->fut_state = STATE_FINISHED; if (future_schedule_callbacks(fut) == -1) { return -1; } fut->fut_log_tb = 1; return 0; } static int _future_raise_cancelled_error(FutureObj *fut) { PyObject *err; err = _PyObject_CallNoArg(asyncio_CancelledError); if (!err) { return -1; } PyErr_SetObject(asyncio_CancelledError, err); Py_DECREF(err); return 0; } static int future_get_result(FutureObj *fut, PyObject **result) { if (fut->fut_state == STATE_CANCELLED) { _future_raise_cancelled_error(fut); return -1; } if (fut->fut_state != STATE_FINISHED) { 
PyErr_SetString(asyncio_InvalidStateError, "Result is not set."); return -1; } fut->fut_log_tb = 0; if (fut->fut_exception != NULL) { Py_INCREF(fut->fut_exception); *result = fut->fut_exception; return 1; } Py_INCREF(fut->fut_result); *result = fut->fut_result; return 0; } static PyObject * future_add_done_callback(FutureObj *fut, PyObject *arg, PyObject *ctx) { if (!future_is_alive(fut)) { PyErr_SetString(PyExc_RuntimeError, "uninitialized Future object"); return NULL; } if (fut->fut_state != STATE_PENDING) { /* The future is done/cancelled, so schedule the callback right away. */ if (call_soon(fut->fut_loop, arg, (PyObject*) fut, ctx)) { return NULL; } } else { /* The future is pending, add a callback. Callbacks in the future object are stored as follows: callback0 -- a pointer to the first callback callbacks -- a list of 2nd, 3rd, ... callbacks Invariants: * callbacks != NULL: There are some callbacks in in the list. Just add the new callback to it. * callbacks == NULL and callback0 == NULL: This is the first callback. Set it to callback0. * callbacks == NULL and callback0 != NULL: This is a second callback. Initialize callbacks with a new list and add the new callback to it. 
*/ if (fut->fut_callbacks == NULL && fut->fut_callback0 == NULL) { Py_INCREF(arg); fut->fut_callback0 = arg; Py_INCREF(ctx); fut->fut_context0 = ctx; } else { PyObject *tup = PyTuple_New(2); if (tup == NULL) { return NULL; } Py_INCREF(arg); PyTuple_SET_ITEM(tup, 0, arg); Py_INCREF(ctx); PyTuple_SET_ITEM(tup, 1, (PyObject *)ctx); if (fut->fut_callbacks != NULL) { int err = PyList_Append(fut->fut_callbacks, tup); if (err) { Py_DECREF(tup); return NULL; } Py_DECREF(tup); } else { fut->fut_callbacks = PyList_New(1); if (fut->fut_callbacks == NULL) { return NULL; } PyList_SET_ITEM(fut->fut_callbacks, 0, tup); /* borrow */ } } } Py_RETURN_NONE; } static int future_cancel_impl(FutureObj *fut, PyObject *Py_UNUSED(ignored)) { fut->fut_log_tb = 0; if (fut->fut_state != STATE_PENDING) { return 0; } fut->fut_state = STATE_CANCELLED; if (future_schedule_callbacks(fut) == -1) { return -1; } return 1; } /*[clinic input] _asyncio.Future.__init__ * loop: object = None This class is *almost* compatible with concurrent.futures.Future. Differences: - result() and exception() do not take a timeout argument and raise an exception when the future isn't done yet. - Callbacks registered with add_done_callback() are always called via the event loop's call_soon_threadsafe(). - This class is not compatible with the wait() and as_completed() methods in the concurrent.futures package. 
[clinic start generated code]*/

static int
_asyncio_Future___init___impl(FutureObj *self, PyObject *loop)
/*[clinic end generated code: output=9ed75799eaccb5d6 input=89af317082bc0bf8]*/

{
    /* All initialization work is delegated to future_init(). */
    return future_init(self, loop);
}

/* tp_clear: release every object reference held by the future,
   including its instance dict. */
static int
FutureObj_clear(FutureObj *fut)
{
    Py_CLEAR(fut->fut_loop);
    Py_CLEAR(fut->fut_callback0);
    Py_CLEAR(fut->fut_context0);
    Py_CLEAR(fut->fut_callbacks);
    Py_CLEAR(fut->fut_result);
    Py_CLEAR(fut->fut_exception);
    Py_CLEAR(fut->fut_source_tb);
    Py_CLEAR(fut->dict);
    return 0;
}

/* tp_traverse: visit the same set of references FutureObj_clear() drops. */
static int
FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg)
{
    Py_VISIT(fut->fut_loop);
    Py_VISIT(fut->fut_callback0);
    Py_VISIT(fut->fut_context0);
    Py_VISIT(fut->fut_callbacks);
    Py_VISIT(fut->fut_result);
    Py_VISIT(fut->fut_exception);
    Py_VISIT(fut->fut_source_tb);
    Py_VISIT(fut->dict);
    return 0;
}

/*[clinic input]
_asyncio.Future.result

Return the result this future represents.

If the future has been cancelled, raises CancelledError.  If the
future's result isn't yet available, raises InvalidStateError.  If
the future is done and has an exception set, this exception is raised.
[clinic start generated code]*/

static PyObject *
_asyncio_Future_result_impl(FutureObj *self)
/*[clinic end generated code: output=f35f940936a4b1e5 input=49ecf9cf5ec50dc5]*/
{
    PyObject *result;

    if (!future_is_alive(self)) {
        PyErr_SetString(asyncio_InvalidStateError,
                        "Future object is not initialized.");
        return NULL;
    }

    /* res: -1 error already set, 0 `result` is the value,
       1 `result` is the stored exception instance */
    int res = future_get_result(self, &result);

    if (res == -1) {
        return NULL;
    }

    if (res == 0) {
        return result;
    }

    assert(res == 1);

    /* raise the stored exception */
    PyErr_SetObject(PyExceptionInstance_Class(result), result);
    Py_DECREF(result);
    return NULL;
}

/*[clinic input]
_asyncio.Future.exception

Return the exception that was set on this future.

The exception (or None if no exception was set) is returned only if
the future is done.  If the future has been cancelled, raises
CancelledError.  If the future isn't done yet, raises
InvalidStateError.
[clinic start generated code]*/

static PyObject *
_asyncio_Future_exception_impl(FutureObj *self)
/*[clinic end generated code: output=88b20d4f855e0710 input=733547a70c841c68]*/
{
    if (!future_is_alive(self)) {
        PyErr_SetString(asyncio_InvalidStateError,
                        "Future object is not initialized.");
        return NULL;
    }

    if (self->fut_state == STATE_CANCELLED) {
        PyErr_SetNone(asyncio_CancelledError);
        return NULL;
    }

    if (self->fut_state != STATE_FINISHED) {
        PyErr_SetString(asyncio_InvalidStateError, "Exception is not set.");
        return NULL;
    }

    if (self->fut_exception != NULL) {
        /* the exception has been retrieved - reset the log flag */
        self->fut_log_tb = 0;
        Py_INCREF(self->fut_exception);
        return self->fut_exception;
    }

    Py_RETURN_NONE;
}

/*[clinic input]
_asyncio.Future.set_result

    result: object
    /

Mark the future done and set its result.

If the future is already done when this method is called, raises
InvalidStateError.
[clinic start generated code]*/

static PyObject *
_asyncio_Future_set_result(FutureObj *self, PyObject *result)
/*[clinic end generated code: output=1ec2e6bcccd6f2ce input=8b75172c2a7b05f1]*/
{
    /* the macro returns NULL for an uninitialized future and supplies
       its own trailing ';' */
    ENSURE_FUTURE_ALIVE(self)
    if (future_set_result(self, result) < 0) {
        return NULL;
    }
    Py_RETURN_NONE;
}

/*[clinic input]
_asyncio.Future.set_exception

    exception: object
    /

Mark the future done and set an exception.

If the future is already done when this method is called, raises
InvalidStateError.
[clinic start generated code]*/

static PyObject *
_asyncio_Future_set_exception(FutureObj *self, PyObject *exception)
/*[clinic end generated code: output=f1c1b0cd321be360 input=e45b7d7aa71cc66d]*/
{
    /* the macro returns NULL for an uninitialized future and supplies
       its own trailing ';' */
    ENSURE_FUTURE_ALIVE(self)
    if (future_set_exception(self, exception) < 0) {
        return NULL;
    }
    Py_RETURN_NONE;
}

/*[clinic input]
_asyncio.Future.add_done_callback

    fn: object
    /
    *
    context: object = NULL

Add a callback to be run when the future becomes done.

The callback is called with a single argument - the future object. If
the future is already done when this is called, the callback is
scheduled with call_soon.
[clinic start generated code]*/ static PyObject * _asyncio_Future_add_done_callback_impl(FutureObj *self, PyObject *fn, PyObject *context) /*[clinic end generated code: output=7ce635bbc9554c1e input=15ab0693a96e9533]*/ { if (context == NULL) { context = PyContext_CopyCurrent(); if (context == NULL) { return NULL; } PyObject *res = future_add_done_callback(self, fn, context); Py_DECREF(context); return res; } return future_add_done_callback(self, fn, context); } /*[clinic input] _asyncio.Future.remove_done_callback fn: object / Remove all instances of a callback from the "call when done" list. Returns the number of callbacks removed. [clinic start generated code]*/ static PyObject * _asyncio_Future_remove_done_callback(FutureObj *self, PyObject *fn) /*[clinic end generated code: output=5ab1fb52b24ef31f input=0a43280a149d505b]*/ { PyObject *newlist; Py_ssize_t len, i, j=0; Py_ssize_t cleared_callback0 = 0; ENSURE_FUTURE_ALIVE(self) if (self->fut_callback0 != NULL) { int cmp = PyObject_RichCompareBool(fn, self->fut_callback0, Py_EQ); if (cmp == -1) { return NULL; } if (cmp == 1) { /* callback0 == fn */ Py_CLEAR(self->fut_callback0); Py_CLEAR(self->fut_context0); cleared_callback0 = 1; } } if (self->fut_callbacks == NULL) { return PyLong_FromSsize_t(cleared_callback0); } len = PyList_GET_SIZE(self->fut_callbacks); if (len == 0) { Py_CLEAR(self->fut_callbacks); return PyLong_FromSsize_t(cleared_callback0); } if (len == 1) { PyObject *cb_tup = PyList_GET_ITEM(self->fut_callbacks, 0); int cmp = PyObject_RichCompareBool( fn, PyTuple_GET_ITEM(cb_tup, 0), Py_EQ); if (cmp == -1) { return NULL; } if (cmp == 1) { /* callbacks[0] == fn */ Py_CLEAR(self->fut_callbacks); return PyLong_FromSsize_t(1 + cleared_callback0); } /* callbacks[0] != fn and len(callbacks) == 1 */ return PyLong_FromSsize_t(cleared_callback0); } newlist = PyList_New(len); if (newlist == NULL) { return NULL; } for (i = 0; i < PyList_GET_SIZE(self->fut_callbacks); i++) { int ret; PyObject *item = 
PyList_GET_ITEM(self->fut_callbacks, i); Py_INCREF(item); ret = PyObject_RichCompareBool(fn, PyTuple_GET_ITEM(item, 0), Py_EQ); if (ret == 0) { if (j < len) { PyList_SET_ITEM(newlist, j, item); j++; continue; } ret = PyList_Append(newlist, item); } Py_DECREF(item); if (ret < 0) { goto fail; } } if (j == 0) { Py_CLEAR(self->fut_callbacks); Py_DECREF(newlist); return PyLong_FromSsize_t(len + cleared_callback0); } if (j < len) { Py_SIZE(newlist) = j; } j = PyList_GET_SIZE(newlist); len = PyList_GET_SIZE(self->fut_callbacks); if (j != len) { if (PyList_SetSlice(self->fut_callbacks, 0, len, newlist) < 0) { goto fail; } } Py_DECREF(newlist); return PyLong_FromSsize_t(len - j + cleared_callback0); fail: Py_DECREF(newlist); return NULL; } /*[clinic input] _asyncio.Future.cancel Cancel the future and schedule callbacks. If the future is already done or cancelled, return False. Otherwise, change the future's state to cancelled, schedule the callbacks and return True. [clinic start generated code]*/ static PyObject * _asyncio_Future_cancel_impl(FutureObj *self) /*[clinic end generated code: output=e45b932ba8bd68a1 input=515709a127995109]*/ { ENSURE_FUTURE_ALIVE(self) int ok = future_cancel_impl(self, NULL); if (ok < 0) { return 0; } if (ok) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } static int FutureObj_cancelled(FutureObj *self) { return future_is_alive(self) && self->fut_state == STATE_CANCELLED; } /*[clinic input] _asyncio.Future.cancelled Return True if the future was cancelled. [clinic start generated code]*/ static PyObject * _asyncio_Future_cancelled_impl(FutureObj *self) /*[clinic end generated code: output=145197ced586357d input=943ab8b7b7b17e45]*/ { if (FutureObj_cancelled(self)) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } static inline int future_done(FutureObj *self) { if (!future_is_alive(self) || self->fut_state == STATE_PENDING) { return 0; } return 1; } /*[clinic input] _asyncio.Future.done Return True if the future is done. 
Done means either that a result / exception are available, or that the
future was cancelled.
[clinic start generated code]*/

static PyObject *
_asyncio_Future_done_impl(FutureObj *self)
/*[clinic end generated code: output=244c5ac351145096 input=28d7b23fdb65d2ac]*/
{
    if (future_done(self)) {
        Py_RETURN_TRUE;
    }
    Py_RETURN_FALSE;
}

/*[clinic input]
_asyncio.Future.get_loop

Return the event loop the Future is bound to.
[clinic start generated code]*/

static PyObject *
_asyncio_Future_get_loop_impl(FutureObj *self)
/*[clinic end generated code: output=119b6ea0c9816c3f input=cba48c2136c79d1f]*/
{
    ENSURE_FUTURE_ALIVE(self)
    Py_INCREF(self->fut_loop);
    return self->fut_loop;
}

/* BLOCKING_TRUE when the future is initialized and its fut_blocking flag
   is set, BLOCKING_FALSE otherwise. */
static fut_blocking_state
future_get_blocking_impl(FutureObj *fut)
{
    if (future_is_alive(fut) && fut->fut_blocking) {
        return BLOCKING_TRUE;
    }
    else {
        return BLOCKING_FALSE;
    }
}

/* Getter: future_get_blocking_impl() mapped onto True/False. */
static PyObject *
FutureObj_get_blocking(FutureObj *fut, void *Py_UNUSED(ignored))
{
    if (future_get_blocking_impl(fut) == BLOCKING_TRUE) {
        Py_RETURN_TRUE;
    }
    else {
        Py_RETURN_FALSE;
    }
}

/* Set the fut_blocking flag; returns -1 with RuntimeError set when the
   future is not initialized. */
static int
future_set_blocking_impl(FutureObj* fut, int val)
{
    if (future_ensure_alive(fut)) {
        return -1;
    }
    fut->fut_blocking = val;
    return 0;
}

/* Setter: stores the truth value of `val`; deleting the attribute
   (val == NULL) raises AttributeError. */
static int
FutureObj_set_blocking(FutureObj *fut, PyObject *val, void *Py_UNUSED(ignored))
{
    if (val == NULL) {
        PyErr_SetString(PyExc_AttributeError, "cannot delete attribute");
        return -1;
    }

    int is_true = PyObject_IsTrue(val);
    if (is_true < 0) {
        return -1;
    }
    return future_set_blocking_impl(fut, is_true);
}

/* Getter: the fut_log_tb flag as True/False. */
static PyObject *
FutureObj_get_log_traceback(FutureObj *fut, void *Py_UNUSED(ignored))
{
    ENSURE_FUTURE_ALIVE(fut)
    if (fut->fut_log_tb) {
        Py_RETURN_TRUE;
    }
    else {
        Py_RETURN_FALSE;
    }
}

/* Setter: only a false value is accepted (ValueError otherwise);
   deleting the attribute raises AttributeError. */
static int
FutureObj_set_log_traceback(FutureObj *fut, PyObject *val, void *Py_UNUSED(ignored))
{
    if (val == NULL) {
        PyErr_SetString(PyExc_AttributeError, "cannot delete attribute");
        return -1;
    }
    int is_true = PyObject_IsTrue(val);
    if (is_true < 0) {
        return -1;
    }
    if (is_true) {
        PyErr_SetString(PyExc_ValueError,
                        "_log_traceback can only be set to False");
        return -1;
    }
    fut->fut_log_tb = is_true;
    return 0;
}

/* Getter: the loop (new reference), or None for an uninitialized future. */
static PyObject *
FutureObj_get_loop(FutureObj *fut, void *Py_UNUSED(ignored))
{
    if (!future_is_alive(fut)) {
        Py_RETURN_NONE;
    }
    Py_INCREF(fut->fut_loop);
    return fut->fut_loop;
}

/* Getter: the registered callbacks as a list of (callback, context)
   tuples, or None when there are none.  When callback0 is unset the
   internal fut_callbacks list itself is returned (not a copy). */
static PyObject *
FutureObj_get_callbacks(FutureObj *fut, void *Py_UNUSED(ignored))
{
    Py_ssize_t i;

    ENSURE_FUTURE_ALIVE(fut)

    if (fut->fut_callback0 == NULL) {
        if (fut->fut_callbacks == NULL) {
            Py_RETURN_NONE;
        }

        Py_INCREF(fut->fut_callbacks);
        return fut->fut_callbacks;
    }

    /* one slot for callback0 plus one per entry of fut_callbacks */
    Py_ssize_t len = 1;
    if (fut->fut_callbacks != NULL) {
        len += PyList_GET_SIZE(fut->fut_callbacks);
    }


    PyObject *new_list = PyList_New(len);
    if (new_list == NULL) {
        return NULL;
    }

    PyObject *tup0 = PyTuple_New(2);
    if (tup0 == NULL) {
        Py_DECREF(new_list);
        return NULL;
    }

    Py_INCREF(fut->fut_callback0);
    PyTuple_SET_ITEM(tup0, 0, fut->fut_callback0);
    assert(fut->fut_context0 != NULL);
    Py_INCREF(fut->fut_context0);
    PyTuple_SET_ITEM(tup0, 1, (PyObject *)fut->fut_context0);

    PyList_SET_ITEM(new_list, 0, tup0);

    if (fut->fut_callbacks != NULL) {
        for (i = 0; i < PyList_GET_SIZE(fut->fut_callbacks); i++) {
            PyObject *cb = PyList_GET_ITEM(fut->fut_callbacks, i);
            Py_INCREF(cb);
            PyList_SET_ITEM(new_list, i + 1, cb);
        }
    }

    return new_list;
}

/* Getter: the stored result (new reference) or None if unset. */
static PyObject *
FutureObj_get_result(FutureObj *fut, void *Py_UNUSED(ignored))
{
    ENSURE_FUTURE_ALIVE(fut)
    if (fut->fut_result == NULL) {
        Py_RETURN_NONE;
    }
    Py_INCREF(fut->fut_result);
    return fut->fut_result;
}

/* Getter: the stored exception (new reference) or None if unset. */
static PyObject *
FutureObj_get_exception(FutureObj *fut, void *Py_UNUSED(ignored))
{
    ENSURE_FUTURE_ALIVE(fut)
    if (fut->fut_exception == NULL) {
        Py_RETURN_NONE;
    }
    Py_INCREF(fut->fut_exception);
    return fut->fut_exception;
}

/* Getter: the captured source traceback, or None when the future is
   uninitialized or none was captured. */
static PyObject *
FutureObj_get_source_traceback(FutureObj *fut, void *Py_UNUSED(ignored))
{
    if (!future_is_alive(fut) || fut->fut_source_tb == NULL) {
        Py_RETURN_NONE;
    }
    Py_INCREF(fut->fut_source_tb);
    return fut->fut_source_tb;
}

/* Getter: the state as one of the strings "PENDING", "CANCELLED" or
   "FINISHED". */
static PyObject *
FutureObj_get_state(FutureObj *fut, void *Py_UNUSED(ignored))
{
    _Py_IDENTIFIER(PENDING);
    _Py_IDENTIFIER(CANCELLED);
    _Py_IDENTIFIER(FINISHED);
    PyObject *ret = NULL;

    ENSURE_FUTURE_ALIVE(fut)

    switch (fut->fut_state) {
    case STATE_PENDING:
        ret = _PyUnicode_FromId(&PyId_PENDING);
        break;
    case STATE_CANCELLED:
        ret = _PyUnicode_FromId(&PyId_CANCELLED);
        break;
    case STATE_FINISHED:
        ret = _PyUnicode_FromId(&PyId_FINISHED);
        break;
    default:
        assert (0);
    }
    /* the value is INCREF'ed here before being returned;
       ret stays NULL if the lookup failed */
    Py_XINCREF(ret);
    return ret;
}

/*[clinic input]
_asyncio.Future._repr_info
[clinic start generated code]*/

static PyObject *
_asyncio_Future__repr_info_impl(FutureObj *self)
/*[clinic end generated code: output=fa69e901bd176cfb input=f21504d8e2ae1ca2]*/
{
    return PyObject_CallFunctionObjArgs(
        asyncio_future_repr_info_func, self, NULL);
}

/* tp_repr: "<TypeName info>" where info is the sequence returned by
   self._repr_info() joined with the default separator of
   PyUnicode_Join(NULL, ...). */
static PyObject *
FutureObj_repr(FutureObj *fut)
{
    _Py_IDENTIFIER(_repr_info);

    ENSURE_FUTURE_ALIVE(fut)

    PyObject *rinfo = _PyObject_CallMethodIdObjArgs((PyObject*)fut,
                                                    &PyId__repr_info,
                                                    NULL);
    if (rinfo == NULL) {
        return NULL;
    }

    PyObject *rinfo_s = PyUnicode_Join(NULL, rinfo);
    Py_DECREF(rinfo);
    if (rinfo_s == NULL) {
        return NULL;
    }

    PyObject *rstr = PyUnicode_FromFormat("<%s %U>",
                                          _PyType_Name(Py_TYPE(fut)), rinfo_s);
    Py_DECREF(rinfo_s);
    return rstr;
}

static void
FutureObj_finalize(FutureObj *fut)
{
    _Py_IDENTIFIER(call_exception_handler);
    _Py_IDENTIFIER(message);
    _Py_IDENTIFIER(exception);
    _Py_IDENTIFIER(future);
    _Py_IDENTIFIER(source_traceback);

    PyObject *error_type, *error_value, *error_traceback;
    PyObject *context;
    PyObject *message = NULL;
    PyObject *func;

    if (!fut->fut_log_tb) {
        return;
    }
    assert(fut->fut_exception != NULL);
    fut->fut_log_tb = 0;

    /* Save the current exception, if any.
*/ PyErr_Fetch(&error_type, &error_value, &error_traceback); context = PyDict_New(); if (context == NULL) { goto finally; } message = PyUnicode_FromFormat( "%s exception was never retrieved", _PyType_Name(Py_TYPE(fut))); if (message == NULL) { goto finally; } if (_PyDict_SetItemId(context, &PyId_message, message) < 0 || _PyDict_SetItemId(context, &PyId_exception, fut->fut_exception) < 0 || _PyDict_SetItemId(context, &PyId_future, (PyObject*)fut) < 0) { goto finally; } if (fut->fut_source_tb != NULL) { if (_PyDict_SetItemId(context, &PyId_source_traceback, fut->fut_source_tb) < 0) { goto finally; } } func = _PyObject_GetAttrId(fut->fut_loop, &PyId_call_exception_handler); if (func != NULL) { PyObject *res = PyObject_CallFunctionObjArgs(func, context, NULL); if (res == NULL) { PyErr_WriteUnraisable(func); } else { Py_DECREF(res); } Py_DECREF(func); } finally: Py_XDECREF(context); Py_XDECREF(message); /* Restore the saved exception. */ PyErr_Restore(error_type, error_value, error_traceback); } static PyAsyncMethods FutureType_as_async = { (unaryfunc)future_new_iter, /* am_await */ 0, /* am_aiter */ 0 /* am_anext */ }; static PyMethodDef FutureType_methods[] = { _ASYNCIO_FUTURE_RESULT_METHODDEF _ASYNCIO_FUTURE_EXCEPTION_METHODDEF _ASYNCIO_FUTURE_SET_RESULT_METHODDEF _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF _ASYNCIO_FUTURE_CANCEL_METHODDEF _ASYNCIO_FUTURE_CANCELLED_METHODDEF _ASYNCIO_FUTURE_DONE_METHODDEF _ASYNCIO_FUTURE_GET_LOOP_METHODDEF _ASYNCIO_FUTURE__REPR_INFO_METHODDEF {NULL, NULL} /* Sentinel */ }; // clang-format off #define FUTURE_COMMON_GETSETLIST \ {"_state", (getter)FutureObj_get_state, NULL, NULL}, \ {"_asyncio_future_blocking", (getter)FutureObj_get_blocking, \ (setter)FutureObj_set_blocking, NULL}, \ {"_loop", (getter)FutureObj_get_loop, NULL, NULL}, \ {"_callbacks", (getter)FutureObj_get_callbacks, NULL, NULL}, \ {"_result", (getter)FutureObj_get_result, NULL, 
NULL}, \ {"_exception", (getter)FutureObj_get_exception, NULL, NULL}, \ {"_log_traceback", (getter)FutureObj_get_log_traceback, \ (setter)FutureObj_set_log_traceback, NULL}, \ {"_source_traceback", (getter)FutureObj_get_source_traceback, NULL, NULL}, // clang-format on static PyGetSetDef FutureType_getsetlist[] = { FUTURE_COMMON_GETSETLIST {NULL} /* Sentinel */ }; static void FutureObj_dealloc(PyObject *self); static PyTypeObject FutureType = { PyVarObject_HEAD_INIT(NULL, 0) "_asyncio.Future", sizeof(FutureObj), /* tp_basicsize */ .tp_dealloc = FutureObj_dealloc, .tp_as_async = &FutureType_as_async, .tp_repr = (reprfunc)FutureObj_repr, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE, .tp_doc = _asyncio_Future___init____doc__, .tp_traverse = (traverseproc)FutureObj_traverse, .tp_clear = (inquiry)FutureObj_clear, .tp_weaklistoffset = offsetof(FutureObj, fut_weakreflist), .tp_iter = (getiterfunc)future_new_iter, .tp_methods = FutureType_methods, .tp_getset = FutureType_getsetlist, .tp_dictoffset = offsetof(FutureObj, dict), .tp_init = (initproc)_asyncio_Future___init__, .tp_new = PyType_GenericNew, .tp_finalize = (destructor)FutureObj_finalize, }; static void FutureObj_dealloc(PyObject *self) { FutureObj *fut = (FutureObj *)self; if (Future_CheckExact(fut)) { /* When fut is subclass of Future, finalizer is called from * subtype_dealloc. */ if (PyObject_CallFinalizerFromDealloc(self) < 0) { // resurrected. 
return; } } PyObject_GC_UnTrack(self); if (fut->fut_weakreflist != NULL) { PyObject_ClearWeakRefs(self); } (void)FutureObj_clear(fut); Py_TYPE(fut)->tp_free(fut); } /*********************** Future Iterator **************************/ typedef struct { PyObject_HEAD FutureObj *future; } futureiterobject; #define FI_FREELIST_MAXLEN 255 static futureiterobject *fi_freelist = NULL; static Py_ssize_t fi_freelist_len = 0; static void FutureIter_dealloc(futureiterobject *it) { PyObject_GC_UnTrack(it); Py_CLEAR(it->future); if (fi_freelist_len < FI_FREELIST_MAXLEN) { fi_freelist_len++; it->future = (FutureObj*) fi_freelist; fi_freelist = it; } else { PyObject_GC_Del(it); } } static PySendResult FutureIter_itersend(PyThreadState* Py_UNUSED(tstate), futureiterobject *it, PyObject *Py_UNUSED(sentValue), PyObject **pResult) { *pResult = NULL; PyObject *res; FutureObj *fut = it->future; if (fut == NULL) { return PYGEN_ERROR; } if (fut->fut_state == STATE_PENDING) { if (!fut->fut_blocking) { fut->fut_blocking = 1; Py_INCREF(fut); *pResult = (PyObject*)fut; return PYGEN_NEXT; } PyErr_SetString(PyExc_RuntimeError, "await wasn't used with future"); return PYGEN_ERROR; } it->future = NULL; res = _asyncio_Future_result_impl(fut); PySendResult gen_status = PYGEN_ERROR; if (res != NULL) { /* facebook: USDT for latency profiler */ FOLLY_SDT(python, future_iter_resume, PyThreadState_GET()->frame); *pResult = res; gen_status = PYGEN_RETURN; } Py_DECREF(fut); return gen_status; } static void FutureIter_setawaiter(futureiterobject *it, PyObject *awaiter) { if (it->future != NULL) { _PyAwaitable_SetAwaiter((PyObject *)it->future, awaiter); } } static inline PyObject* gen_status_to_iter(PySendResult gen_status, PyObject *result) { if (gen_status == PYGEN_ERROR || gen_status == PYGEN_NEXT) { return result; } assert(gen_status == PYGEN_RETURN); _PyGen_SetStopIterationValue(result); Py_DECREF(result); return NULL; } static PyObject * FutureIter_iternext(futureiterobject *it) { PyObject *result = 
NULL;
    /* tp_iternext: run one send step and convert its status. */
    PySendResult gen_status = FutureIter_itersend(NULL, it, NULL, &result);
    return gen_status_to_iter(gen_status, result);
}

static PyObject *
FutureIter_send(futureiterobject *self, PyObject *unused)
{
    /* Future.__iter__ doesn't care about values that are pushed to the
     * generator, it just returns self.result().
     */
    return FutureIter_iternext(self);
}

/* throw(type[, value[, traceback]]): validate/normalize the arguments,
 * detach the future from the iterator and raise the exception.
 * Always returns NULL. */
static PyObject *
FutureIter_throw(futureiterobject *self, PyObject *args)
{
    PyObject *type, *val = NULL, *tb = NULL;
    if (!PyArg_ParseTuple(args, "O|OO", &type, &val, &tb))
        return NULL;

    /* None is treated like an omitted argument. */
    if (val == Py_None) {
        val = NULL;
    }
    if (tb == Py_None) {
        tb = NULL;
    } else if (tb != NULL && !PyTraceBack_Check(tb)) {
        PyErr_SetString(PyExc_TypeError, "throw() third argument must be a traceback");
        return NULL;
    }

    /* From here on we own a reference to each non-NULL of type/val/tb;
     * they are either handed to PyErr_Restore or released at `fail`. */
    Py_INCREF(type);
    Py_XINCREF(val);
    Py_XINCREF(tb);

    if (PyExceptionClass_Check(type)) {
        PyErr_NormalizeException(&type, &val, &tb);
        /* No need to call PyException_SetTraceback since we'll be calling
           PyErr_Restore for `type`, `val`, and `tb`. */
    } else if (PyExceptionInstance_Check(type)) {
        if (val) {
            PyErr_SetString(PyExc_TypeError,
                            "instance exception may not have a separate value");
            goto fail;
        }
        /* The instance becomes the value; its class becomes the type. */
        val = type;
        type = PyExceptionInstance_Class(type);
        Py_INCREF(type);
        if (tb == NULL)
            tb = PyException_GetTraceback(val);
    } else {
        PyErr_SetString(PyExc_TypeError,
                        "exceptions must be classes deriving BaseException or "
                        "instances of such a class");
        goto fail;
    }

    Py_CLEAR(self->future);

    PyErr_Restore(type, val, tb);

    return NULL;

  fail:
    Py_DECREF(type);
    Py_XDECREF(val);
    Py_XDECREF(tb);
    return NULL;
}

/* close(): drop the reference to the future and return None. */
static PyObject *
FutureIter_close(futureiterobject *self, PyObject *arg)
{
    Py_CLEAR(self->future);
    Py_RETURN_NONE;
}

/* tp_traverse: the wrapped future is the only object reference held. */
static int
FutureIter_traverse(futureiterobject *it, visitproc visit, void *arg)
{
    Py_VISIT(it->future);
    return 0;
}

static PyMethodDef FutureIter_methods[] = {
    {"send",  (PyCFunction)FutureIter_send, METH_O, NULL},
    {"throw", (PyCFunction)FutureIter_throw, METH_VARARGS, NULL},
    {"close", (PyCFunction)FutureIter_close,
METH_NOARGS, NULL},
    {NULL, NULL}        /* Sentinel */
};

/* Extended async slots of FutureIter: only send and set-awaiter are set. */
static PyAsyncMethodsWithExtra FutureIterType_as_async = {
    .ame_async_methods = {
        0,                                  /* am_await */
        0,                                  /* am_aiter */
        0,                                  /* am_anext */
    },
    .ame_send = (sendfunc)FutureIter_itersend,
    .ame_setawaiter = (setawaiterfunc)FutureIter_setawaiter,
};

/* Type object for _asyncio.FutureIter. */
static PyTypeObject FutureIterType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "_asyncio.FutureIter",
    .tp_basicsize = sizeof(futureiterobject),
    .tp_itemsize = 0,
    .tp_dealloc = (destructor)FutureIter_dealloc,
    .tp_getattro = PyObject_GenericGetAttr,
    .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC |
                Py_TPFLAGS_HAVE_AM_EXTRA,
    .tp_traverse = (traverseproc)FutureIter_traverse,
    .tp_iter = PyObject_SelfIter,
    .tp_iternext = (iternextfunc)FutureIter_iternext,
    .tp_methods = FutureIter_methods,
    .tp_as_async = (PyAsyncMethods*)&FutureIterType_as_async,
};

/* Create a FutureIter bound to `fut`.
 *
 * `fut` must be an instance of FutureType (otherwise a bad-internal-call
 * error is raised) and must be alive.  An object is taken from the
 * freelist when one is available, else freshly allocated.
 * Returns a new reference, or NULL on error. */
static PyObject *
future_new_iter(PyObject *fut)
{
    futureiterobject *it;

    if (!PyObject_TypeCheck(fut, &FutureType)) {
        PyErr_BadInternalCall();
        return NULL;
    }

    ENSURE_FUTURE_ALIVE(fut)

    if (fi_freelist_len) {
        /* Pop the head of the freelist; its `future` slot holds the link
         * to the next free object. */
        fi_freelist_len--;
        it = fi_freelist;
        fi_freelist = (futureiterobject*) it->future;
        it->future = NULL;
        _Py_NewReference((PyObject*) it);
    }
    else {
        it = PyObject_GC_New(futureiterobject, (PyTypeObject *)&FutureIterType);
        if (it == NULL) {
            return NULL;
        }
    }

    Py_INCREF(fut);
    it->future = (FutureObj*)fut;
    PyObject_GC_Track(it);
    return (PyObject*)it;
}


/*********************** Task **************************/


/*[clinic input]
class _asyncio.Task "TaskObj *" "&Task_Type"
[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=719dcef0fcc03b37]*/

static int task_call_step_soon(TaskObj *, PyObject *);
static PyObject * task_wakeup(TaskObj *, PyObject *);
static PyObject * task_step(TaskObj *, PyObject *);

/* ----- Task._step wrapper */

/* tp_clear: release the task and the argument held by the wrapper. */
static int
TaskStepMethWrapper_clear(TaskStepMethWrapper *o)
{
    Py_CLEAR(o->sw_task);
    Py_CLEAR(o->sw_arg);
    return 0;
}

static void
TaskStepMethWrapper_dealloc(TaskStepMethWrapper *o)
{
PyObject_GC_UnTrack(o); (void)TaskStepMethWrapper_clear(o); Py_TYPE(o)->tp_free(o); } static inline PyObject * task_call_step(TaskObj *task, PyObject *arg) { if (Task_CheckExact(task)) { return task_step(task, arg); } else { PyMethodTableRef *tableref = get_or_create_method_table(Py_TYPE(task)); if (tableref == NULL) { return NULL; } return tableref->step_thunk((PyObject*)task, arg, tableref->step); } } static PyObject * TaskStepMethWrapper_call(TaskStepMethWrapper *o, PyObject *args, PyObject *kwds) { if (kwds != NULL && PyDict_GET_SIZE(kwds) != 0) { PyErr_SetString(PyExc_TypeError, "function takes no keyword arguments"); return NULL; } if (args != NULL && PyTuple_GET_SIZE(args) != 0) { PyErr_SetString(PyExc_TypeError, "function takes no positional arguments"); return NULL; } return task_call_step(o->sw_task, o->sw_arg); } static int TaskStepMethWrapper_traverse(TaskStepMethWrapper *o, visitproc visit, void *arg) { Py_VISIT(o->sw_task); Py_VISIT(o->sw_arg); return 0; } static PyObject * TaskStepMethWrapper_get___self__(TaskStepMethWrapper *o, void *Py_UNUSED(ignored)) { if (o->sw_task) { Py_INCREF(o->sw_task); return (PyObject*)o->sw_task; } Py_RETURN_NONE; } static PyGetSetDef TaskStepMethWrapper_getsetlist[] = { {"__self__", (getter)TaskStepMethWrapper_get___self__, NULL, NULL}, {NULL} /* Sentinel */ }; static PyTypeObject TaskStepMethWrapper_Type = { PyVarObject_HEAD_INIT(NULL, 0) "TaskStepMethWrapper", .tp_basicsize = sizeof(TaskStepMethWrapper), .tp_itemsize = 0, .tp_getset = TaskStepMethWrapper_getsetlist, .tp_dealloc = (destructor)TaskStepMethWrapper_dealloc, .tp_call = (ternaryfunc)TaskStepMethWrapper_call, .tp_getattro = PyObject_GenericGetAttr, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, .tp_traverse = (traverseproc)TaskStepMethWrapper_traverse, .tp_clear = (inquiry)TaskStepMethWrapper_clear, }; static PyObject * TaskStepMethWrapper_new(TaskObj *task, PyObject *arg) { TaskStepMethWrapper *o; o = PyObject_GC_New(TaskStepMethWrapper, 
&TaskStepMethWrapper_Type); if (o == NULL) { return NULL; } Py_INCREF(task); o->sw_task = task; Py_XINCREF(arg); o->sw_arg = arg; PyObject_GC_Track(o); return (PyObject*) o; } /* ----- Task._wakeup wrapper */ static PyObject * wakeup_meth_call(TaskObj *task, PyObject *fut) { return task_wakeup(task, fut); } PyMethodDef _TaskWakeupMethod = { "task_wakeup_method", (PyCFunction)wakeup_meth_call, METH_O, NULL }; /* ----- Task introspection helpers */ static int register_asyncio_task(TaskObj *task) { if ((PyObject*)task == all_asyncio_tasks || task->prev != NULL || task->next != NULL) { // register is idempotent return 0; } task->prev = task->next = NULL; if (all_asyncio_tasks == NULL) { all_asyncio_tasks = (PyObject*) task; } else { task->next = all_asyncio_tasks; ((TaskObj*)all_asyncio_tasks)->prev = (PyObject*)task; all_asyncio_tasks = (PyObject*)task; } return 0; } static int unregister_asyncio_task(TaskObj *task) { if ((PyObject*)task != all_asyncio_tasks && task->prev == NULL && task->next == NULL) { // unregister is idempotent return 0; } if (task->prev) { TaskObj* prev = (TaskObj*)task->prev; prev->next = task->next; if (task->next) { ((TaskObj*)task->next)->prev = (PyObject*)prev; } } else { all_asyncio_tasks = task->next; if (all_asyncio_tasks) { ((TaskObj*)all_asyncio_tasks)->prev = NULL; } } task->prev = task->next = NULL; return 0; } static int register_task(PyObject *task) { _Py_IDENTIFIER(add); int is_true = PyObject_IsInstance(task, (PyObject*)&TaskType); if (is_true < 0) { return -1; } if (is_true) { return register_asyncio_task((TaskObj*)task); } else { PyObject *res = _PyObject_CallMethodIdObjArgs( all_non_asyncio_tasks, &PyId_add, task, NULL); if (res == NULL) { return -1; } Py_DECREF(res); return 0; } } static int unregister_task(PyObject *task) { _Py_IDENTIFIER(discard); int is_true = PyObject_IsInstance(task, (PyObject*)&TaskType); if (is_true < 0) { return -1; } if (is_true) { return unregister_asyncio_task((TaskObj*)task); } else { PyObject *res 
= _PyObject_CallMethodIdObjArgs( all_non_asyncio_tasks, &PyId_discard, task, NULL); if (res == NULL) { return -1; } Py_DECREF(res); return 0; } } static int enter_task(PyObject *loop, PyObject *task) { PyObject *item; Py_hash_t hash; hash = PyObject_Hash(loop); if (hash == -1) { return -1; } item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash); if (item != NULL) { Py_INCREF(item); PyErr_Format( PyExc_RuntimeError, "Cannot enter into task %R while another " \ "task %R is being executed.", task, item, NULL); Py_DECREF(item); return -1; } if (PyErr_Occurred()) { return -1; } return _PyDict_SetItem_KnownHash(current_tasks, loop, task, hash); } static int leave_task(PyObject *loop, PyObject *task) /*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ { PyObject *item; Py_hash_t hash; hash = PyObject_Hash(loop); if (hash == -1) { return -1; } item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash); if (item != task) { if (item == NULL) { /* Not entered, replace with None */ item = Py_None; } PyErr_Format( PyExc_RuntimeError, "Leaving task %R does not match the current task %R.", task, item, NULL); return -1; } return _PyDict_DelItem_KnownHash(current_tasks, loop, hash); } static inline int _is_coro_suspended(PyObject *coro) { if ((PyCoro_CheckExact(coro) || PyGen_CheckExact(coro)) && _PyGen_IsSuspended((PyGenObject *)coro)) { // do not automatically schedule suspended coroutines return 1; } _SuspendedCoroNode *n = suspended_coroutines; while (n != NULL) { if (coro == n->coro) { return 1; } n = n->prev; } return 0; } /* ----- Task */ /*[clinic input] _asyncio.Task.__init__ coro: object * loop: object = None name: object = None A coroutine wrapped in a Future. 
[clinic start generated code]*/ static int _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, PyObject *name) /*[clinic end generated code: output=88b12b83d570df50 input=352a3137fe60091d]*/ { if (future_init((FutureObj*)self, loop)) { return -1; } int is_coro = is_coroutine(coro); if (is_coro == -1) { return -1; } if (is_coro == 0) { self->task_log_destroy_pending = 0; PyErr_Format(PyExc_TypeError, "a coroutine was expected, got %R", coro, NULL); return -1; } Py_XSETREF(self->task_context, PyContext_CopyCurrent()); if (self->task_context == NULL) { return -1; } Py_CLEAR(self->task_fut_waiter); self->task_must_cancel = 0; self->task_log_destroy_pending = 1; Py_INCREF(coro); Py_XSETREF(self->task_coro, coro); uintptr_t name_or_counter; if (name == Py_None) { // name is not specified - save counter value name_or_counter = tag_counter(++task_name_counter); } else if (!PyUnicode_CheckExact(name)) { name = PyObject_Str(name); if (name == NULL) { return -1; } name_or_counter = (uintptr_t)name; } else { if (name == NULL) { return -1; } Py_INCREF(name); name_or_counter = (uintptr_t)name; } uintptr_t prev = self->task_name_or_counter; self->task_name_or_counter = name_or_counter; if (is_string_name(prev)) { Py_DECREF((PyObject*)prev); } if (_is_coro_suspended(coro) == 0) { if (task_call_step_soon(self, NULL)) { return -1; } } return register_asyncio_task(self); } static int TaskObj_clear(TaskObj *task) { (void)FutureObj_clear((FutureObj*) task); Py_CLEAR(task->task_context); Py_CLEAR(task->task_coro); if (is_string_name(task->task_name_or_counter)) { Py_DECREF((PyObject *)task->task_name_or_counter); task->task_name_or_counter = 0; } Py_CLEAR(task->task_fut_waiter); return 0; } static int TaskObj_traverse(TaskObj *task, visitproc visit, void *arg) { Py_VISIT(task->task_context); Py_VISIT(task->task_coro); Py_VISIT(task->task_fut_waiter); (void)FutureObj_traverse((FutureObj*) task, visit, arg); return 0; } static PyObject * 
TaskObj_get_log_destroy_pending(TaskObj *task, void *Py_UNUSED(ignored)) { if (task->task_log_destroy_pending) { Py_RETURN_TRUE; } else { Py_RETURN_FALSE; } } static int TaskObj_set_log_destroy_pending(TaskObj *task, PyObject *val, void *Py_UNUSED(ignored)) { if (val == NULL) { PyErr_SetString(PyExc_AttributeError, "cannot delete attribute"); return -1; } int is_true = PyObject_IsTrue(val); if (is_true < 0) { return -1; } task->task_log_destroy_pending = is_true; return 0; } static PyObject * TaskObj_get_must_cancel(TaskObj *task, void *Py_UNUSED(ignored)) { if (task->task_must_cancel) { Py_RETURN_TRUE; } else { Py_RETURN_FALSE; } } static PyObject * TaskObj_get_coro(TaskObj *task, void *Py_UNUSED(ignored)) { if (task->task_coro) { Py_INCREF(task->task_coro); return task->task_coro; } Py_RETURN_NONE; } static PyObject * TaskObj_get_fut_waiter(TaskObj *task, void *Py_UNUSED(ignored)) { if (task->task_fut_waiter) { Py_INCREF(task->task_fut_waiter); return task->task_fut_waiter; } Py_RETURN_NONE; } /*[clinic input] @classmethod _asyncio.Task.current_task loop: object = None Return the currently running task in an event loop or None. By default the current task for the current event loop is returned. None is returned when called not in the context of a Task. 
[clinic start generated code]*/

static PyObject *
_asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop)
/*[clinic end generated code: output=99fbe7332c516e03 input=cd14770c5b79c7eb]*/
{
    /* Deprecated shim: warn, then forward to the module-level
     * `current_task` attribute of asyncio_mod. */
    PyObject *result;
    PyObject *func;
    PyObject *fetched_loop = NULL;   /* owned only when we looked it up */

    if (PyErr_WarnEx(PyExc_DeprecationWarning,
                     "Task.current_task() is deprecated, "
                     "use asyncio.current_task() instead",
                     1) < 0) {
        return NULL;
    }

    func = _PyObject_GetAttrId(asyncio_mod, &PyId_current_task);
    if (func == NULL) {
        return NULL;
    }

    if (loop == Py_None) {
        fetched_loop = get_event_loop();
        if (fetched_loop == NULL) {
            Py_DECREF(func);
            return NULL;
        }
        loop = fetched_loop;
    }

    result = PyObject_CallFunctionObjArgs(func, loop, NULL);
    Py_DECREF(func);
    Py_XDECREF(fetched_loop);
    return result;
}

/*[clinic input]
@classmethod
_asyncio.Task.all_tasks

    loop: object = None

Return a set of all tasks for an event loop.

By default all tasks for the current event loop are returned.
[clinic start generated code]*/ static PyObject * _asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop) /*[clinic end generated code: output=11f9b20749ccca5d input=497f80bc9ce726b5]*/ { if (PyErr_WarnEx(PyExc_DeprecationWarning, "Task.all_tasks() is deprecated, " \ "use asyncio.all_tasks() instead", 1) < 0) { return NULL; } if (loop != Py_None) { return _all_tasks(loop, 0); } else { PyObject *current_loop = get_event_loop(); if (current_loop == NULL) { return NULL; } PyObject *res = _all_tasks(current_loop, 0); Py_DECREF(current_loop); return res; } } /*[clinic input] _asyncio.Task._repr_info [clinic start generated code]*/ static PyObject * _asyncio_Task__repr_info_impl(TaskObj *self) /*[clinic end generated code: output=6a490eb66d5ba34b input=3c6d051ed3ddec8b]*/ { return PyObject_CallFunctionObjArgs( asyncio_task_repr_info_func, self, NULL); } static int task_cancel_impl(TaskObj *self, PyObject *Py_UNUSED(ignored)) { self->task_log_tb = 0; if (self->task_state != STATE_PENDING) { return 0; } if (self->task_fut_waiter) { int is_true; PyMethodTableRef *tableref = get_or_create_method_table(Py_TYPE(self->task_fut_waiter)); if (tableref == NULL) { return -1; } is_true = tableref->cancel(self->task_fut_waiter, NULL); if (is_true < 0) { return -1; } if (is_true) { return 1; } } self->task_must_cancel = 1; return 1; } /*[clinic input] _asyncio.Task.cancel Request that this task cancel itself. This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop. The coroutine then has a chance to clean up or even deny the request using try/except/finally. Unlike Future.cancel, this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception. 
Immediately after this method is called, Task.cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called). [clinic start generated code]*/ static PyObject * _asyncio_Task_cancel_impl(TaskObj *self) /*[clinic end generated code: output=6bfc0479da9d5757 input=13f9bf496695cb52]*/ { int ok = task_cancel_impl(self, NULL); if (ok == -1) { return NULL; } if (ok) { Py_RETURN_TRUE; } else { Py_RETURN_FALSE; } } /*[clinic input] _asyncio.Task.get_stack * limit: object = None Return the list of stack frames for this task's coroutine. If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback frames. The frames are always ordered from oldest to newest. The optional limit gives the maximum number of frames to return; by default all available frames are returned. Its meaning differs depending on whether a stack or a traceback is returned: the newest frames of a stack are returned, but the oldest frames of a traceback are returned. (This matches the behavior of the traceback module.) For reasons beyond our control, only one stack frame is returned for a suspended coroutine. [clinic start generated code]*/ static PyObject * _asyncio_Task_get_stack_impl(TaskObj *self, PyObject *limit) /*[clinic end generated code: output=c9aeeeebd1e18118 input=05b323d42b809b90]*/ { return PyObject_CallFunctionObjArgs( asyncio_task_get_stack_func, self, limit, NULL); } /*[clinic input] _asyncio.Task.print_stack * limit: object = None file: object = None Print the stack or traceback for this task's coroutine. This produces output similar to that of the traceback module, for the frames retrieved by get_stack(). The limit argument is passed to get_stack(). 
The file argument is an I/O stream
to which the output is written; by default output is written
to sys.stderr.
[clinic start generated code]*/

static PyObject *
_asyncio_Task_print_stack_impl(TaskObj *self, PyObject *limit,
                               PyObject *file)
/*[clinic end generated code: output=7339e10314cd3f4d input=1a0352913b7fcd92]*/
{
    /* Delegates to the callable held in asyncio_task_print_stack_func. */
    return PyObject_CallFunctionObjArgs(
        asyncio_task_print_stack_func, self, limit, file, NULL);
}

/*[clinic input]
_asyncio.Task.set_result

    result: object
    /
[clinic start generated code]*/

static PyObject *
_asyncio_Task_set_result(TaskObj *self, PyObject *result)
/*[clinic end generated code: output=1dcae308bfcba318 input=9d1a00c07be41bab]*/
{
    /* Unconditionally rejected for tasks: always raises RuntimeError. */
    PyErr_SetString(PyExc_RuntimeError,
                    "Task does not support set_result operation");
    return NULL;
}

/*[clinic input]
_asyncio.Task.set_exception

    exception: object
    /
[clinic start generated code]*/

static PyObject *
_asyncio_Task_set_exception(TaskObj *self, PyObject *exception)
/*[clinic end generated code: output=bc377fc28067303d input=9a8f65c83dcf893a]*/
{
    /* Unconditionally rejected for tasks: always raises RuntimeError. */
    PyErr_SetString(PyExc_RuntimeError,
                    "Task does not support set_exception operation");
    return NULL;
}

/*[clinic input]
_asyncio.Task._step

    exc: object = None
[clinic start generated code]*/

static PyObject *
_asyncio_Task__step_impl(TaskObj *self, PyObject *exc)
/*[clinic end generated code: output=7ed23f0cefd5ae42 input=1e19a985ace87ca4]*/
{
    /* A Python-level None is passed to task_step() as a NULL argument. */
    return task_step(self, exc == Py_None ?
NULL : exc);
}

/*[clinic input]
_asyncio.Task.get_coro
[clinic start generated code]*/

static PyObject *
_asyncio_Task_get_coro_impl(TaskObj *self)
/*[clinic end generated code: output=bcac27c8cc6c8073 input=d2e8606c42a7b403]*/
{
    /* Returns a new reference to the coroutine stored on the task. */
    Py_INCREF(self->task_coro);
    return self->task_coro;
}

/*[clinic input]
_asyncio.Task.get_name
[clinic start generated code]*/

static PyObject *
_asyncio_Task_get_name_impl(TaskObj *self)
/*[clinic end generated code: output=0ecf1570c3b37a8f input=a4a6595d12f4f0f8]*/
{
    /* task_name_or_counter is a tagged word: 0 means "no name" (returns
     * None); otherwise is_string_name() tells whether it already holds a
     * PyObject* or a value that counter_to_task_name() must convert. */
    if (self->task_name_or_counter == 0) {
        Py_RETURN_NONE;
    }

    if (!is_string_name(self->task_name_or_counter)) {
        /* Materialize the name lazily and cache it in the same slot. */
        PyObject *name = counter_to_task_name(self->task_name_or_counter);
        if (name == NULL) {
            return NULL;
        }
        self->task_name_or_counter = (uintptr_t)name;
    }
    PyObject *name = (PyObject *)self->task_name_or_counter;
    Py_INCREF(name);
    return name;
}

/*[clinic input]
_asyncio.Task.set_name

    value: object
    /
[clinic start generated code]*/

static PyObject *
_asyncio_Task_set_name(TaskObj *self, PyObject *value)
/*[clinic end generated code: output=138a8d51e32057d6 input=a8359b6e65f8fd31]*/
{
    /* Anything that is not exactly a str is converted with str(); either
     * way `value` ends up as an owned reference stored in the task. */
    if (!PyUnicode_CheckExact(value)) {
        value = PyObject_Str(value);
        if (value == NULL) {
            return NULL;
        }
    } else {
        Py_INCREF(value);
    }
    /* Install the new name first, then release the previous one (only if
     * the previous slot content was an object rather than a counter). */
    uintptr_t prev = self->task_name_or_counter;
    self->task_name_or_counter = (uintptr_t)value;
    if (is_string_name(prev)) {
        Py_DECREF((PyObject *)prev);
    }
    Py_RETURN_NONE;
}

/* tp_finalize for Task: unregisters the task and, if it is still pending
 * and task_log_destroy_pending is set, reports "Task was destroyed but it
 * is pending!" through loop.call_exception_handler(context).  Any exception
 * that was current on entry is saved and restored.  Always ends by running
 * FutureObj_finalize. */
static void
TaskObj_finalize(TaskObj *task)
{
    _Py_IDENTIFIER(call_exception_handler);
    _Py_IDENTIFIER(task);
    _Py_IDENTIFIER(message);
    _Py_IDENTIFIER(source_traceback);

    PyObject *context;
    PyObject *message = NULL;
    PyObject *func;
    PyObject *error_type, *error_value, *error_traceback;

    unregister_asyncio_task(task);

    if (task->task_state != STATE_PENDING || !task->task_log_destroy_pending) {
        goto done;
    }

    /* Save the current exception, if any. */
    PyErr_Fetch(&error_type, &error_value, &error_traceback);

    context = PyDict_New();
    if (context == NULL) {
        goto finally;
    }

    message = PyUnicode_FromString("Task was destroyed but it is pending!");
    if (message == NULL) {
        goto finally;
    }

    if (_PyDict_SetItemId(context, &PyId_message, message) < 0 ||
        _PyDict_SetItemId(context, &PyId_task, (PyObject*)task) < 0)
    {
        goto finally;
    }

    if (task->task_source_tb != NULL) {
        if (_PyDict_SetItemId(context, &PyId_source_traceback,
                              task->task_source_tb) < 0)
        {
            goto finally;
        }
    }

    func = _PyObject_GetAttrId(task->task_loop, &PyId_call_exception_handler);
    if (func != NULL) {
        PyObject *res = PyObject_CallFunctionObjArgs(func, context, NULL);
        if (res == NULL) {
            /* A finalizer cannot propagate errors; report and move on. */
            PyErr_WriteUnraisable(func);
        }
        else {
            Py_DECREF(res);
        }
        Py_DECREF(func);
    }

finally:
    Py_XDECREF(context);
    Py_XDECREF(message);

    /* Restore the saved exception. */
    PyErr_Restore(error_type, error_value, error_traceback);

done:
    FutureObj_finalize((FutureObj*)task);
}

static void TaskObj_dealloc(PyObject *);  /* Needs Task_CheckExact */

/* ame_setawaiter slot: forwards the awaiter to the task's coroutine. */
static void
TaskObj_set_awaiter(TaskObj *self, PyObject *awaiter) {
    _PyAwaitable_SetAwaiter(self->task_coro, awaiter);
}

// clang-format off
/* Method entries shared by the Task method table below. */
#define TASK_COMMON_METHODS \
    _ASYNCIO_FUTURE_RESULT_METHODDEF \
    _ASYNCIO_FUTURE_EXCEPTION_METHODDEF \
    _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF \
    _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF \
    _ASYNCIO_FUTURE_CANCELLED_METHODDEF \
    _ASYNCIO_FUTURE_DONE_METHODDEF \
    _ASYNCIO_TASK_SET_RESULT_METHODDEF \
    _ASYNCIO_TASK_SET_EXCEPTION_METHODDEF \
    _ASYNCIO_TASK_CURRENT_TASK_METHODDEF \
    _ASYNCIO_TASK_ALL_TASKS_METHODDEF \
    _ASYNCIO_TASK_CANCEL_METHODDEF \
    _ASYNCIO_TASK_GET_STACK_METHODDEF \
    _ASYNCIO_TASK_PRINT_STACK_METHODDEF \
    _ASYNCIO_TASK__REPR_INFO_METHODDEF \
    _ASYNCIO_TASK_GET_NAME_METHODDEF \
    _ASYNCIO_TASK_SET_NAME_METHODDEF \
    _ASYNCIO_TASK_GET_CORO_METHODDEF \
    {"_set_fut_waiter", (PyCFunction)task_set_fut_waiter, METH_O, NULL}, \
    {"_set_task_context", (PyCFunction)task_set_task_context, METH_O, NULL},

static PyMethodDef TaskType_methods[] = {
    TASK_COMMON_METHODS
    _ASYNCIO_TASK__STEP_METHODDEF
    {NULL, NULL}        /* Sentinel */
};

/* Getset entries shared by Task and its built-in subclass.  The macro ends
 * with a line continuation, so the empty line that follows it is required. */
#define TASK_COMMON_GETSETLIST \
    FUTURE_COMMON_GETSETLIST \
    {"_log_destroy_pending", (getter)TaskObj_get_log_destroy_pending, \
                             (setter)TaskObj_set_log_destroy_pending, NULL},\
    {"_must_cancel", (getter)TaskObj_get_must_cancel, NULL, NULL}, \
    {"_coro", (getter)TaskObj_get_coro, NULL, NULL}, \
    {"_fut_waiter", (getter)TaskObj_get_fut_waiter, NULL, NULL}, \

static PyGetSetDef TaskType_getsetlist[] = {
    TASK_COMMON_GETSETLIST
    {NULL} /* Sentinel */
};

static PyAsyncMethodsWithExtra TaskType_as_async = {
    .ame_async_methods = {
        .am_await = (unaryfunc)future_new_iter,
    },
    .ame_setawaiter = (setawaiterfunc)TaskObj_set_awaiter,
};
// clang-format on

static PyTypeObject TaskType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "_asyncio.Task",
    sizeof(TaskObj),                       /* tp_basicsize */
    .tp_base = &FutureType,
    .tp_dealloc = TaskObj_dealloc,
    .tp_as_async = (PyAsyncMethods *)&TaskType_as_async,
    .tp_repr = (reprfunc)FutureObj_repr,
    .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE |
                Py_TPFLAGS_HAVE_AM_EXTRA,
    .tp_doc = _asyncio_Task___init____doc__,
    .tp_traverse = (traverseproc)TaskObj_traverse,
    .tp_clear = (inquiry)TaskObj_clear,
    .tp_weaklistoffset = offsetof(TaskObj, task_weakreflist),
    .tp_iter = (getiterfunc)future_new_iter,
    .tp_methods = TaskType_methods,
    .tp_getset = TaskType_getsetlist,
    .tp_dictoffset = offsetof(TaskObj, dict),
    .tp_init = (initproc)_asyncio_Task___init__,
    .tp_new = PyType_GenericNew,
    .tp_finalize = (destructor)TaskObj_finalize,
};

/* tp_dealloc for Task.  For exact Task instances the finalizer is invoked
 * here (and deallocation is abandoned if it resurrects the object); then the
 * object is untracked, weak references are cleared, fields are cleared and
 * the memory is released through tp_free. */
static void
TaskObj_dealloc(PyObject *self)
{
    TaskObj *task = (TaskObj *)self;

    if (Task_CheckExact(self)) {
        /* When fut is subclass of Task, finalizer is called from
         * subtype_dealloc.
         */
        if (PyObject_CallFinalizerFromDealloc(self) < 0) {
            // resurrected.
            return;
        }
    }

    PyObject_GC_UnTrack(self);

    if (task->task_weakreflist != NULL) {
        PyObject_ClearWeakRefs(self);
    }

    (void)TaskObj_clear(task);
    Py_TYPE(task)->tp_free(task);
}

/* Schedules a step of `task` on its loop via call_soon(), passing `arg`
 * (may be NULL) to the step wrapper.  Returns call_soon()'s result, or -1
 * if the wrapper could not be created. */
static int
task_call_step_soon(TaskObj *task, PyObject *arg)
{
    PyObject *cb = TaskStepMethWrapper_new(task, arg);
    if (cb == NULL) {
        return -1;
    }

    int ret = call_soon(task->task_loop, cb, NULL, task->task_context);
    Py_DECREF(cb);
    return ret;
}

/* Builds an exception `et(<formatted message>)` and schedules a task step
 * that receives it.  `format` and the variadic arguments follow
 * PyUnicode_FromFormatV.  Returns None on success, NULL with an exception
 * set on failure. */
static PyObject *
task_set_error_soon(TaskObj *task, PyObject *et, const char *format, ...)
{
    PyObject* msg;

    va_list vargs;
#ifdef HAVE_STDARG_PROTOTYPES
    va_start(vargs, format);
#else
    va_start(vargs);
#endif
    msg = PyUnicode_FromFormatV(format, vargs);
    va_end(vargs);

    if (msg == NULL) {
        return NULL;
    }

    PyObject *e = PyObject_CallFunctionObjArgs(et, msg, NULL);
    Py_DECREF(msg);
    if (e == NULL) {
        return NULL;
    }

    if (task_call_step_soon(task, e) == -1) {
        Py_DECREF(e);
        return NULL;
    }

    Py_DECREF(e);
    Py_RETURN_NONE;
}

static PyObject *
_asyncio_AsyncLazyValueCompute_throw_impl(AsyncLazyValueComputeObj *self,
                                          PyObject *type, PyObject *val,
                                          PyObject *tb);

/** Raises exception in coroutine.
*/ static PyObject * coro_throw(PyObject *coro, PyObject *etype, PyObject *eval, PyObject *tb) { if (_AsyncLazyValueCompute_CheckExact(coro)) { return _asyncio_AsyncLazyValueCompute_throw_impl( (AsyncLazyValueComputeObj *)coro, etype, eval, tb); } PyObject *throw_ = NULL; int meth_found = _PyObject_GetMethod(coro, throw_name, &throw_); if (throw_ == NULL) { return NULL; } int start = 0; int nargs; PyObject *stack[4] = {coro, etype, eval, tb}; if (meth_found) { start = 0; nargs = 2; } else { start = 1; nargs = 1; } if (eval != NULL) { nargs++; if (tb != NULL) { nargs++; } } PyObject *res = _PyObject_FastCall(throw_, stack + start, nargs); Py_DECREF(throw_); return res; } // steals ref to result // this is intentional to keep the code as much as possible to original version static PyObject * task_set_fut_waiter_impl(TaskObj *task, PyObject *result) { assert(result != NULL); if (result == (PyObject*)task) { /* We have a task that wants to await on itself */ goto self_await; } /* Check if `result` is FutureObj or TaskObj (and not a subclass) */ if (Future_CheckExact(result) || Task_CheckExact(result)) { PyObject *wrapper; PyObject *res; FutureObj *fut = (FutureObj*)result; /* Check if `result` future is attached to a different loop */ if (fut->fut_loop != task->task_loop) { goto different_loop; } if (!fut->fut_blocking) { goto yield_insteadof_yf; } fut->fut_blocking = 0; /* result.add_done_callback(task._wakeup) */ wrapper = PyCFunction_New(&_TaskWakeupMethod, (PyObject*)task); if (wrapper == NULL) { goto fail; } res = future_add_done_callback( (FutureObj*)result, wrapper, task->task_context); Py_DECREF(wrapper); if (res == NULL) { goto fail; } Py_DECREF(res); /* task._fut_waiter = result */ task->task_fut_waiter = result; /* no incref is necessary */ if (task->task_must_cancel) { int is_true = Future_CheckExact(result) ? 
future_cancel_impl((FutureObj*)result, NULL) : task_cancel_impl((TaskObj *)result, NULL); if (is_true < 0) { return NULL; } else if (is_true) { task->task_must_cancel = 0; } } Py_RETURN_NONE; } /* Check if `result` is None */ if (result == Py_None) { /* Bare yield relinquishes control for one event loop iteration. */ if (task_call_step_soon(task, NULL)) { goto fail; } return result; } PyMethodTableRef *result_tableref = get_or_create_method_table(Py_TYPE(result)); if (result_tableref == NULL) { return NULL; } /* Check if `result` is a Future-compatible object */ fut_blocking_state blocking_state = result_tableref->get_is_blocking(result); if (blocking_state == BLOCKING_ERROR) { if (PyErr_ExceptionMatches(PyExc_AttributeError)) { PyErr_Clear(); } else { goto fail; } } PyObject *o = NULL; if (_PyObject_LookupAttrId(result, &PyId__asyncio_future_blocking, &o) < 0) { goto fail; } Py_XDECREF(o); if (blocking_state == BLOCKING_TRUE || blocking_state == BLOCKING_FALSE) { /* `result` is a Future-compatible object */ PyObject *wrapper; PyObject *res; /* Check if `result` future is attached to a different loop */ PyObject *oloop = result_tableref->get_loop(result); if (oloop == NULL) { goto fail; } if (oloop != task->task_loop) { Py_DECREF(oloop); goto different_loop; } Py_DECREF(oloop); if (blocking_state == BLOCKING_FALSE) { goto yield_insteadof_yf; } /* result._asyncio_future_blocking = False */ if (result_tableref->set_is_blocking(result, 0) == -1) { goto fail; } wrapper = PyCFunction_New(&_TaskWakeupMethod, (PyObject*)task); if (wrapper == NULL) { goto fail; } /* result.add_done_callback(task._wakeup) */ res = result_tableref->on_completed( result, wrapper, (PyObject *)task->task_context); Py_DECREF(wrapper); if (res == NULL) { goto fail; } Py_DECREF(res); /* task._fut_waiter = result */ task->task_fut_waiter = result; /* no incref is necessary */ if (task->task_must_cancel) { int is_true = result_tableref->cancel(result, NULL); if (is_true < 0) { return NULL; } else if 
(is_true) { task->task_must_cancel = 0; } } Py_RETURN_NONE; } /* Check if `result` is a generator */ int res = PyObject_IsInstance(result, (PyObject*)&PyGen_Type); if (res < 0) { goto fail; } if (res) { /* `result` is a generator */ o = task_set_error_soon( task, PyExc_RuntimeError, "yield was used instead of yield from for " "generator in task %R with %R", task, result); Py_DECREF(result); return o; } /* The `result` is none of the above */ o = task_set_error_soon( task, PyExc_RuntimeError, "Task got bad yield: %R", result); Py_DECREF(result); return o; self_await: o = task_set_error_soon( task, PyExc_RuntimeError, "Task cannot await on itself: %R", task); Py_DECREF(result); return o; yield_insteadof_yf: o = task_set_error_soon( task, PyExc_RuntimeError, "yield was used instead of yield from " "in task %R with %R", task, result); Py_DECREF(result); return o; different_loop: o = task_set_error_soon( task, PyExc_RuntimeError, "Task %R got Future %R attached to a different loop", task, result); Py_DECREF(result); return o; fail: Py_DECREF(result); return NULL; } static inline int gen_status_from_result(PyObject **result) { if (*result != NULL) { return PYGEN_NEXT; } if (_PyGen_FetchStopIterationValue(result) == 0) { return PYGEN_RETURN; } assert(PyErr_Occurred()); return PYGEN_ERROR; } static PyObject * task_step_impl(TaskObj *task, PyObject *exc) { int res; int clear_exc = 0; PyObject *result = NULL; PyObject *coro; if (task->task_state != STATE_PENDING) { PyErr_Format(asyncio_InvalidStateError, "_step(): already done: %R %R", task, exc ? 
exc : Py_None); goto fail; } if (task->task_must_cancel) { assert(exc != Py_None); if (exc) { /* Check if exc is a CancelledError */ res = PyObject_IsInstance(exc, asyncio_CancelledError); if (res == -1) { /* An error occurred, abort */ goto fail; } if (res == 0) { /* exc is not CancelledError; reset it to NULL */ exc = NULL; } } if (!exc) { /* exc was not a CancelledError */ exc = _PyObject_CallNoArg(asyncio_CancelledError); if (!exc) { goto fail; } clear_exc = 1; } task->task_must_cancel = 0; } Py_CLEAR(task->task_fut_waiter); coro = task->task_coro; if (coro == NULL) { PyErr_SetString(PyExc_RuntimeError, "uninitialized Task object"); if (clear_exc) { /* We created 'exc' during this call */ Py_DECREF(exc); } return NULL; } PySendResult gen_status; if (exc == NULL) { gen_status = PyIter_Send(PyThreadState_GET(), coro, Py_None, &result); } else { result = coro_throw(coro, exc, NULL, NULL); gen_status = gen_status_from_result(&result); if (clear_exc) { /* We created 'exc' during this call */ Py_DECREF(exc); } } if (gen_status == PYGEN_RETURN || gen_status == PYGEN_ERROR) { PyObject *et, *ev, *tb; if (result != NULL) { /* The error is StopIteration and that means that the underlying coroutine has resolved */ int res; if (task->task_must_cancel) { // Task is cancelled right before coro stops. 
task->task_must_cancel = 0; res = future_cancel_impl((FutureObj*)task, NULL); } else { res = future_set_result((FutureObj*)task, result); } Py_DECREF(result); if (res < 0) { return NULL; } Py_RETURN_NONE; } if (PyErr_ExceptionMatches(asyncio_CancelledError)) { /* CancelledError */ PyErr_Clear(); int ok; ok = future_cancel_impl((FutureObj*)task, NULL); if (ok < 0) { return NULL; } if (ok) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } /* Some other exception; pop it and call Task.set_exception() */ PyErr_Fetch(&et, &ev, &tb); assert(et); if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) { PyErr_NormalizeException(&et, &ev, &tb); } if (tb != NULL) { PyException_SetTraceback(ev, tb); } int ok = future_set_exception((FutureObj*)task, ev); if (ok < 0) { /* An exception in Task.set_exception() */ Py_DECREF(et); Py_XDECREF(tb); Py_XDECREF(ev); goto fail; } if (PyErr_GivenExceptionMatches(et, PyExc_KeyboardInterrupt) || PyErr_GivenExceptionMatches(et, PyExc_SystemExit)) { /* We've got a KeyboardInterrupt or a SystemError; re-raise it */ PyErr_Restore(et, ev, tb); goto fail; } Py_DECREF(et); Py_XDECREF(tb); Py_XDECREF(ev); Py_RETURN_NONE; } assert (gen_status == PYGEN_NEXT); assert (result); return task_set_fut_waiter_impl(task, result); fail: Py_XDECREF(result); return NULL; } static PyObject * task_set_fut_waiter(TaskObj *task, PyObject *result) { // task_set_fut_waiter_impl assumes ownership over result // so add extra ref to result to prevent early deallocation Py_INCREF(result); return task_set_fut_waiter_impl(task, result); } static PyObject * task_set_task_context(TaskObj *task, PyObject *context) { Py_XSETREF(task->task_context, context); Py_INCREF(context); Py_RETURN_NONE; } static PyObject * task_step(TaskObj *task, PyObject *exc) { PyObject *res; if (enter_task(task->task_loop, (PyObject*)task) < 0) { return NULL; } res = task_step_impl(task, exc); if (res == NULL) { PyObject *et, *ev, *tb; PyErr_Fetch(&et, &ev, &tb); leave_task(task->task_loop, (PyObject*)task); 
_PyErr_ChainExceptions(et, ev, tb); return NULL; } else { if (leave_task(task->task_loop, (PyObject*)task) < 0) { Py_DECREF(res); return NULL; } else { return res; } } } static PyObject * task_wakeup(TaskObj *task, PyObject *o) { PyObject *et, *ev, *tb; PyObject *result; assert(o); if (Future_CheckExact(o) || Task_CheckExact(o)) { PyObject *fut_result = NULL; int res = future_get_result((FutureObj*)o, &fut_result); switch(res) { case -1: assert(fut_result == NULL); break; /* exception raised */ case 0: Py_DECREF(fut_result); return task_call_step(task, NULL); default: assert(res == 1); result = task_call_step(task, fut_result); Py_DECREF(fut_result); return result; } } else { PyMethodTableRef *waiter_tableref = get_or_create_method_table(Py_TYPE(o)); if(waiter_tableref == NULL) { return NULL; } PyObject *fut_result = waiter_tableref->result(o); if (fut_result != NULL) { Py_DECREF(fut_result); return task_call_step(task, NULL); } /* exception raised */ } PyErr_Fetch(&et, &ev, &tb); if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) { PyErr_NormalizeException(&et, &ev, &tb); } result = task_call_step(task, ev); Py_DECREF(et); Py_XDECREF(tb); Py_XDECREF(ev); return result; } /********************** ContextAwareTask hooks **************/ // acquire context hook typedef PyObject *(*acquire_context_hook)(void); // execute base step typedef PyObject *(*execute_base_step)(PyObject *self, PyObject *exc); // execute step typedef PyObject *(*execute_step_hook)(PyObject *self, PyObject *exc, PyObject *context, execute_base_step); // clang-format off typedef struct { PyObject_HEAD PyTypeObject *owner; acquire_context_hook acquire_context; execute_step_hook execute_step; } ContextAwareTaskHooksObj; // clang-format on static int ContextAwareTaskHooksObj_clear(ContextAwareTaskHooksObj *self) { Py_CLEAR(self->owner); return 0; } static void ContextAwareTaskHooksObj_dealloc(ContextAwareTaskHooksObj *self) { ContextAwareTaskHooksObj_clear(self); Py_TYPE(self)->tp_free(self); } 
static int ContextAwareTaskHooksObj_traverse(ContextAwareTaskHooksObj *self, visitproc visit, void *arg) { Py_VISIT(self->owner); return 0; } static PyTypeObject ContextAwareTaskHooksType = { PyVarObject_HEAD_INIT(NULL, 0) "_asyncio.ContextAwareTaskHooks", sizeof(ContextAwareTaskHooksObj), /* tp_basicsize */ .tp_dealloc = (destructor)ContextAwareTaskHooksObj_dealloc, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, .tp_traverse = (traverseproc)ContextAwareTaskHooksObj_traverse, .tp_clear = (inquiry)ContextAwareTaskHooksObj_clear, .tp_new = PyType_GenericNew, }; static PyObject *context_aware_task_hooks; static ContextAwareTaskHooksObj * find_hooks_for_type(PyTypeObject *type) { Py_ssize_t len = PyList_GET_SIZE(context_aware_task_hooks); for (Py_ssize_t i = 0; i < len; ++i) { ContextAwareTaskHooksObj *hooks = (ContextAwareTaskHooksObj *)PyList_GET_ITEM( context_aware_task_hooks, i); if (hooks->owner == type) { return hooks; // returns borrowed ref } } PyErr_Format( PyExc_RuntimeError, "Cannot find hooks for type %s", type->tp_name); return NULL; } /********************** ContextAwareTask ********************/ // clang-format off /*[clinic input] class _asyncio.ContextAwareTask "ContextAwareTaskObj *" "&ContextAwareTask_Type" [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=1c675f1bf1e1dc36]*/ // clang-format on static void ContextAwareTaskObj_dealloc(ContextAwareTaskObj *self) { Py_CLEAR(self->context); TaskObj_dealloc((PyObject *)self); } // clang-format off /*[clinic input] _asyncio.ContextAwareTask.__init__ coro: object * loop: object = None name: object = None Extension for _asyncio.Task that tracks external context. 
[clinic start generated code]*/ static int _asyncio_ContextAwareTask___init___impl(ContextAwareTaskObj *self, PyObject *coro, PyObject *loop, PyObject *name) /*[clinic end generated code: output=e4d0fb6bda46abdc input=6d950794c7873db4]*/ // clang-format on { if (_asyncio_Task___init___impl((TaskObj *)self, coro, loop, name) < 0) { return -1; } ContextAwareTaskHooksObj *hooks = find_hooks_for_type(Py_TYPE(self)); if (hooks == NULL) { return -1; } PyObject *context = hooks->acquire_context(); if (context == NULL) { return -1; } self->context = context; return 0; } static int ContextAwareTaskObj_traverse(ContextAwareTaskObj *self, visitproc visit, void *arg) { Py_VISIT(self->context); return TaskObj_traverse((TaskObj *)self, visit, arg); } static void ContextAwareTaskObj_clear(ContextAwareTaskObj *self) { Py_CLEAR(self->context); TaskObj_clear((TaskObj *)self); } static PyObject * _asyncio_ContextAwareTask_base_step(TaskObj *self, PyObject *exc) { #ifdef EXTENDED_USDTS // Ideally, we would gate this with a semaphore based probe, but bpftrace // has some issues with high memory usage when using those (these are being // fixed in https://github.com/iovisor/bpftrace/pull/1374). // TODO (aniketpanse): Consider using semaphore based tracing probes _Py_IDENTIFIER(view_name); _Py_IDENTIFIER(getPythonContext); int res; PyObject *view_name, *py_ctx, *folly_ctx; folly_ctx = ((ContextAwareTaskObj *)self)->context; if (folly_ctx != NULL) { // Get a reference to the python context, which has the view names py_ctx = _PyObject_CallMethodId(folly_ctx, &PyId_getPythonContext, NULL); if (!PyErr_Occurred()) { // Get the view name res = _PyObject_LookupAttrId(py_ctx, &PyId_view_name, &view_name); if (res != -1 && view_name != Py_None && view_name != NULL) { // FOLLY_SDT(python, asyncio_contextawaretask_setctx, PyUnicode_DATA(view_name), py_ctx); } else { FOLLY_SDT(python, asyncio_contextawaretask_unsetctx); } Py_XDECREF(view_name); } else { // no py context, move on... 
PyErr_Clear(); } } #endif return _asyncio_Task__step_impl(self, exc); } typedef struct { PyObject_HEAD PyObject *cb_f; PyObject *cb_ctx; vectorcallfunc cb_vectorcall; } ContextAwareTaskCallbackObj; static int ContextAwareTaskCallbackObj_traverse(ContextAwareTaskCallbackObj *self, visitproc visit, void *arg) { Py_VISIT(self->cb_f); Py_VISIT(self->cb_ctx); return 0; } static int ContextAwareTaskCallbackObj_clear(ContextAwareTaskCallbackObj *self) { Py_CLEAR(self->cb_f); Py_CLEAR(self->cb_ctx); return 0; } static PyObject * ContextAwareTaskCallbackObj_repr(ContextAwareTaskCallbackObj *self) { return PyObject_Repr(self->cb_f); } static PyObject * ContextAwareTaskCallbackObj_vectorcall(ContextAwareTaskCallbackObj *self, PyObject *const*args, size_t nargs, PyObject *kwnames) { PyObject *prev_ctx = get_current_context(); if (prev_ctx == NULL) { return NULL; } int ok = reset_context(Py_None, self->cb_ctx, NULL); if (ok < 0) { Py_DECREF(prev_ctx); return NULL; } PyObject *res = _PyObject_Vectorcall(self->cb_f, args, nargs, kwnames); ok = call_reset_context(PyThreadState_GET(), Py_None, prev_ctx, NULL, res == NULL); Py_DECREF(prev_ctx); if (ok < 0) { Py_XDECREF(res); return NULL; } return res; } static void ContextAwareTaskCallbackObj_dealloc(ContextAwareTaskCallbackObj *o) { PyObject_GC_UnTrack(o); (void)ContextAwareTaskCallbackObj_clear(o); Py_TYPE(o)->tp_free(o); } static PyTypeObject _ContextAwareTaskCallback_Type = { PyVarObject_HEAD_INIT(NULL, 0) "ContextAwareTaskCallbackObj", .tp_basicsize = sizeof(ContextAwareTaskCallbackObj), .tp_itemsize = 0, .tp_dealloc = (destructor)ContextAwareTaskCallbackObj_dealloc, .tp_call = PyVectorcall_Call, .tp_vectorcall_offset = offsetof(ContextAwareTaskCallbackObj, cb_vectorcall), .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | _Py_TPFLAGS_HAVE_VECTORCALL, .tp_traverse = (traverseproc)ContextAwareTaskCallbackObj_traverse, .tp_clear = (inquiry)ContextAwareTaskCallbackObj_clear, .tp_repr = 
(reprfunc)ContextAwareTaskCallbackObj_repr, }; static PyObject * ContextAwareTaskObj_get_ctx(ContextAwareTaskObj *self, void *Py_UNUSED(ignored)); /*[clinic input] _asyncio.ContextAwareTask.add_done_callback fn: object / * context: object = NULL Add a callback to be run when the future becomes done. [clinic start generated code]*/ static PyObject * _asyncio_ContextAwareTask_add_done_callback_impl(ContextAwareTaskObj *self, PyObject *fn, PyObject *context) /*[clinic end generated code: output=4e0df9fa1206f23c input=458ed3b5d3ab8a80]*/ { PyObject *ctx = ContextAwareTaskObj_get_ctx(self, NULL); if (ctx == NULL) { return NULL; } ContextAwareTaskCallbackObj *callback = PyObject_GC_New( ContextAwareTaskCallbackObj, &_ContextAwareTaskCallback_Type); if(callback == NULL) { Py_DECREF(ctx); return NULL; } callback->cb_vectorcall = (vectorcallfunc)ContextAwareTaskCallbackObj_vectorcall; callback->cb_ctx = ctx; callback->cb_f = fn; Py_INCREF(fn); PyObject *res = _asyncio_Future_add_done_callback_impl((FutureObj *)self, (PyObject *)callback, context); Py_DECREF(callback); return res; } // clang-format off /*[clinic input] _asyncio.ContextAwareTask._step exc: object = None [clinic start generated code]*/ static PyObject * _asyncio_ContextAwareTask__step_impl(ContextAwareTaskObj *self, PyObject *exc) /*[clinic end generated code: output=4f35969f9237e015 input=fc56fc42d239840b]*/ // clang-format on { ContextAwareTaskHooksObj *hooks = find_hooks_for_type(Py_TYPE(self)); if (hooks == NULL) { return NULL; } if (self->context == NULL) { PyErr_SetString(PyExc_AttributeError, "ctx_"); return NULL; } return hooks->execute_step((PyObject *)self, exc, self->context, (execute_base_step)_asyncio_ContextAwareTask_base_step); } static PyObject * get_capsule_from_attr(PyTypeObject *cls, struct _Py_Identifier *id) { PyObject *value = _PyObject_GetAttrId((PyObject *)cls, id); if (value == NULL) { PyErr_Format(PyExc_TypeError, "Expected attribute '%s' on type '%s'", id->string, cls->tp_name); 
return NULL; } return value; } static void * get_pointer_from_attr(PyTypeObject *cls, struct _Py_Identifier *id) { PyObject *value = get_capsule_from_attr(cls, id); if (value == NULL) { return NULL; } void *ptr = PyCapsule_GetPointer(value, NULL); Py_DECREF(value); return ptr; } static PyObject * ContextAwareTaskObj_get_ctx(ContextAwareTaskObj *self, void *Py_UNUSED(ignored)) { if (self->context == NULL) { PyErr_SetString(PyExc_AttributeError, "ctx_"); return NULL; } Py_INCREF(self->context); return self->context; } static PyObject * get_context_thunk(ContextAwareTaskObj *self) { return ContextAwareTaskObj_get_ctx(self, NULL); } static int ContextAwareTaskObj_set_ctx(ContextAwareTaskObj *self, PyObject *val, void *Py_UNUSED(ignored)) { Py_XSETREF(self->context, val); Py_XINCREF(self->context); return 0; } static PyObject * ContextAwareTask___init_subclass(PyTypeObject *cls, PyObject *args, PyObject *kwargs) { _Py_IDENTIFIER(_acquire_context); _Py_IDENTIFIER(_execute_step); _Py_IDENTIFIER(_get_context); // call super method _Py_IDENTIFIER(__init_subclass__); PyObject *super_args[2] = {(PyObject *)&ContextAwareTaskType, (PyObject *)cls}; PyObject *super = _PyObject_FastCall((PyObject *)&PySuper_Type, super_args, 2); if (super == NULL) { return NULL; } PyObject *super_init = _PyObject_GetAttrId(super, &PyId___init_subclass__); Py_DECREF(super); if (super_init == NULL) { // should at least get object_init_subtype return NULL; } PyObject *result = PyObject_Call(super_init, args, kwargs); Py_DECREF(super_init); if (result == NULL) { return NULL; } Py_DECREF(result); // fetch capsules with hooks from subclass void *acquire_ptr = get_pointer_from_attr(cls, &PyId__acquire_context); if (acquire_ptr == NULL) { return NULL; } void *execute_step = get_pointer_from_attr(cls, &PyId__execute_step); if (execute_step == NULL) { return NULL; } PyObject *get_context = get_capsule_from_attr(cls, &PyId__get_context); if (get_context == NULL) { return NULL; } if 
(!PyCapsule_IsValid(get_context, NULL)) { PyErr_SetString( PyExc_TypeError, "Expected PyCapsule as value of '_get_context' attribute"); return NULL; } // save hooks ContextAwareTaskHooksObj *hooks = PyObject_GC_New(ContextAwareTaskHooksObj, &ContextAwareTaskHooksType); if (hooks == NULL) { return NULL; } hooks->acquire_context = acquire_ptr; hooks->execute_step = execute_step; hooks->owner = cls; Py_INCREF(cls); int ok = PyList_Append(context_aware_task_hooks, (PyObject *)hooks); Py_DECREF(hooks); if (ok < 0) { return NULL; } // return getter for context via pycapsule if (PyCapsule_SetPointer(get_context, (void *)get_context_thunk) != 0) { return NULL; } Py_RETURN_NONE; } // clang-format off static PyMethodDef ContextAwareTaskType_methods[] = { _ASYNCIO_FUTURE_RESULT_METHODDEF \ _ASYNCIO_FUTURE_EXCEPTION_METHODDEF \ _ASYNCIO_CONTEXTAWARETASK_ADD_DONE_CALLBACK_METHODDEF \ _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF \ _ASYNCIO_FUTURE_CANCELLED_METHODDEF \ _ASYNCIO_FUTURE_DONE_METHODDEF \ _ASYNCIO_TASK_SET_RESULT_METHODDEF \ _ASYNCIO_TASK_SET_EXCEPTION_METHODDEF \ _ASYNCIO_TASK_CURRENT_TASK_METHODDEF \ _ASYNCIO_TASK_ALL_TASKS_METHODDEF \ _ASYNCIO_TASK_CANCEL_METHODDEF \ _ASYNCIO_TASK_GET_STACK_METHODDEF \ _ASYNCIO_TASK_PRINT_STACK_METHODDEF \ _ASYNCIO_TASK__REPR_INFO_METHODDEF \ _ASYNCIO_TASK_GET_NAME_METHODDEF \ _ASYNCIO_TASK_SET_NAME_METHODDEF \ _ASYNCIO_TASK_GET_CORO_METHODDEF \ {"_set_fut_waiter", (PyCFunction)task_set_fut_waiter, METH_O, NULL}, _ASYNCIO_CONTEXTAWARETASK__STEP_METHODDEF {"__init_subclass__", (PyCFunction)ContextAwareTask___init_subclass, METH_VARARGS | METH_KEYWORDS | METH_CLASS}, {"_set_task_context", (PyCFunction)task_set_task_context, METH_O, NULL}, {NULL, NULL} /* Sentinel */ }; static PyGetSetDef ContextAwareTaskType_getsetlist[] = { TASK_COMMON_GETSETLIST {"ctx_", (getter)ContextAwareTaskObj_get_ctx, (setter)ContextAwareTaskObj_set_ctx, NULL}, {NULL} /* Sentinel */ }; // clang-format on static PyTypeObject ContextAwareTaskType = { 
PyVarObject_HEAD_INIT(NULL, 0) "_asyncio.ContextAwareTask", sizeof(ContextAwareTaskObj), /* tp_basicsize */ .tp_base = &TaskType, .tp_init = (initproc)_asyncio_ContextAwareTask___init__, .tp_doc = _asyncio_ContextAwareTask___init____doc__, .tp_new = PyType_GenericNew, .tp_dealloc = (destructor)ContextAwareTaskObj_dealloc, .tp_as_async = (PyAsyncMethods *)&TaskType_as_async, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_FINALIZE | Py_TPFLAGS_HAVE_AM_EXTRA, .tp_traverse = (traverseproc)ContextAwareTaskObj_traverse, .tp_clear = (inquiry)ContextAwareTaskObj_clear, .tp_iter = (getiterfunc)future_new_iter, .tp_methods = ContextAwareTaskType_methods, .tp_getset = ContextAwareTaskType_getsetlist, .tp_finalize = (destructor)TaskObj_finalize, }; /******************** AsyncLazyValue ************************/ static inline int notify_futures(PyObject *futures, int (*cb)(FutureObj *, PyObject *), PyObject *arg) { if (futures == NULL) { return 0; } Py_ssize_t len = PyList_GET_SIZE(futures); for (Py_ssize_t i = 0; i < len; ++i) { FutureObj *fut = (FutureObj *)PyList_GET_ITEM(futures, i); PyObject *res = _asyncio_Future_done_impl(fut); if (res == NULL) { return -1; } int is_done = PyObject_IsTrue(res); Py_DECREF(res); if (is_done == -1) { return -1; } if (is_done) { continue; } int ok = cb(fut, arg); if (ok < 0) { return -1; } } return 0; } static inline PyObject * AsyncLazyValue_get_result(AsyncLazyValueObj *self) { assert(self->alv_state == ALV_DONE); Py_INCREF(self->alv_result); return self->alv_result; } static int AsyncLazyValue_set_result(AsyncLazyValueObj *self, PyObject *res) { self->alv_result = res; Py_INCREF(res); self->alv_state = ALV_DONE; if (notify_futures(self->alv_futures, future_set_result, res) < 0) { return -1; } Py_CLEAR(self->alv_args); Py_CLEAR(self->alv_kwargs); return 0; } static int AsyncLazyValue_set_error(AsyncLazyValueObj *self, PyObject *exc) { int ok; if (PyObject_IsInstance(exc, asyncio_CancelledError)) 
{ ok = notify_futures(self->alv_futures, future_cancel_impl, exc); } else { ok = notify_futures(self->alv_futures, future_set_exception, exc); } Py_CLEAR(self->alv_futures); self->alv_state = ALV_NOT_STARTED; return ok; } static PyObject * AsyncLazyValue_new_computeobj(AsyncLazyValueObj *self) { AsyncLazyValueComputeObj *obj = PyObject_GC_New(AsyncLazyValueComputeObj, (PyTypeObject *)&_AsyncLazyValueCompute_Type); if (obj == NULL) { return NULL; } obj->alvc_target = self; Py_INCREF(self); obj->alvc_coroobj = NULL; obj->alvc_exc_state.exc_type = NULL; obj->alvc_exc_state.exc_value = NULL; obj->alvc_exc_state.exc_traceback = NULL; obj->alvc_pending_awaiter = NULL; PyObject_GC_Track(obj); return (PyObject *)obj; } static int AsyncLazyValue_init(AsyncLazyValueObj *self, PyObject *args, PyObject *kwargs) { if (PyTuple_GET_SIZE(args) == 0) { PyErr_SetString(PyExc_TypeError, "'coro' argument expected"); return -1; } self->alv_args = args; Py_INCREF(args); self->alv_kwargs = kwargs; Py_XINCREF(kwargs); self->alv_futures = NULL; self->alv_result = NULL; self->alv_state = ALV_NOT_STARTED; return 0; } static PyObject * AsyncLazyValue_new_future(AsyncLazyValueObj *self, PyObject *loop) { FutureObj *fut = (FutureObj *)PyType_GenericNew(&FutureType, NULL, NULL); if (fut == NULL) { return NULL; } if (future_init(fut, loop) < 0) { Py_DECREF(fut); return NULL; } if (self->alv_futures == NULL) { self->alv_futures = PyList_New(0); if (self->alv_futures == NULL) { Py_DECREF(fut); return NULL; } } if (PyList_Append(self->alv_futures, (PyObject *)fut) < 0) { Py_DECREF(fut); return NULL; } return (PyObject *)fut; } static PyObject * AsyncLazyValue_await(AsyncLazyValueObj *self) { switch (self->alv_state) { case ALV_NOT_STARTED: { PyObject *compute = AsyncLazyValue_new_computeobj(self); if (compute == NULL) { return NULL; } self->alv_state = ALV_RUNNING; return compute; } case ALV_RUNNING: { PyObject *fut = AsyncLazyValue_new_future(self, Py_None); if (fut == NULL) { return NULL; } 
PyObject *res = future_new_iter(fut); Py_DECREF(fut); return res; } case ALV_DONE: { Py_INCREF(self); return (PyObject *)self; } default: Py_UNREACHABLE(); } } static PyObject * create_task(PyObject *coro, PyObject *loop) { PyObject *create_task = NULL; int meth_found = _PyObject_GetMethod(loop, create_task_name, &create_task); if (create_task == NULL) { return NULL; } PyObject *task; if (meth_found) { PyObject *stack[2] = {loop, coro}; task = _PyObject_FastCall(create_task, stack, 2); } else { PyObject *stack[1] = {coro}; task = _PyObject_FastCall(create_task, stack, 1); } Py_DECREF(create_task); if (task == NULL) { return NULL; } PyMethodTableRef *t = get_or_create_method_table(Py_TYPE(task)); if (t == NULL) { Py_DECREF(task); return NULL; } PyObject *tb = t->source_traceback(task); if (tb == NULL) { Py_DECREF(task); return NULL; } int ok = 0; if (tb != Py_None) { ok = PyObject_DelItem(tb, minus_one); } Py_DECREF(tb); if (ok < 0) { Py_CLEAR(task); } return task; } static PyObject * AsyncLazyValue_new_task(AsyncLazyValueObj *self, PyObject *loop) { assert(loop != Py_None); PyObject *computeobj = AsyncLazyValue_new_computeobj(self); if (computeobj == NULL) { return NULL; } PyObject *task = create_task(computeobj, loop); Py_DECREF(computeobj); return task; } static PyObject * AsyncLazyValue_ensure_future(AsyncLazyValueObj *self, PyObject *loop) { switch (self->alv_state) { case ALV_DONE: { FutureObj *fut = (FutureObj *)PyType_GenericNew(&FutureType, NULL, NULL); if (fut == NULL) { return NULL; } if (future_init(fut, loop) < 0) { Py_DECREF(fut); return NULL; } int ok = future_set_result((FutureObj *)fut, self->alv_result); if (ok < 0) { Py_DECREF(fut); return NULL; } return (PyObject *)fut; } case ALV_RUNNING: { return AsyncLazyValue_new_future(self, loop); } case ALV_NOT_STARTED: { int release_loop = 0; if (loop == Py_None) { loop = get_event_loop(); if (loop == NULL) { return NULL; } release_loop = 1; } PyObject *result = AsyncLazyValue_new_task(self, loop); if 
(release_loop) { Py_DECREF(loop); } if (result) { self->alv_state = ALV_RUNNING; } return result; } default: Py_UNREACHABLE(); } } static int AsyncLazyValue_traverse(AsyncLazyValueObj *self, visitproc visit, void *arg) { Py_VISIT(self->alv_args); Py_VISIT(self->alv_kwargs); Py_VISIT(self->alv_futures); Py_VISIT(self->alv_result); return 0; } static int AsyncLazyValue_clear(AsyncLazyValueObj *self) { Py_CLEAR(self->alv_args); Py_CLEAR(self->alv_kwargs); Py_CLEAR(self->alv_futures); Py_CLEAR(self->alv_result); return 0; } static void AsyncLazyValue_dealloc(AsyncLazyValueObj *self) { AsyncLazyValue_clear(self); PyObject_GC_UnTrack(self); Py_TYPE(self)->tp_free(self); } static PySendResult AsyncLazyValue_itersend(PyThreadState *Py_UNUSED(tstate), AsyncLazyValueObj *self, PyObject *Py_UNUSED(sentValue), PyObject **pResult) { switch (self->alv_state) { case ALV_NOT_STARTED: case ALV_RUNNING: { PyErr_SetString(PyExc_TypeError, "AsyncLazyValue needs to be awaited"); return PYGEN_ERROR; } case ALV_DONE: { Py_INCREF(self->alv_result); *pResult = self->alv_result; return PYGEN_RETURN; } default: Py_UNREACHABLE(); } } static PyObject * AsyncLazyValue_iternext(AsyncLazyValueObj *self) { PyObject *result; PySendResult gen_status = AsyncLazyValue_itersend(NULL, self, NULL, &result); return gen_status_to_iter(gen_status, result); } static PyObject * AsyncLazyValue_get_awaiting_tasks(AsyncLazyValueObj *self, PyObject *Py_UNUSED(ignored)) { Py_ssize_t n = self->alv_futures != NULL ? 
PyList_GET_SIZE(self->alv_futures) : 0; return PyLong_FromSsize_t(n); } static PyAsyncMethodsWithExtra _AsyncLazyValue_Type_as_async = { .ame_async_methods = { (unaryfunc)AsyncLazyValue_await, /* am_await */ 0, /* am_aiter */ 0, /* am_anext */ }, .ame_send = (sendfunc)AsyncLazyValue_itersend }; static PyMethodDef AsyncLazyValue_methods[] = { {"ensure_future", (PyCFunction)AsyncLazyValue_ensure_future, METH_O, NULL}, {NULL, NULL} /* Sentinel */ }; static PyGetSetDef AsyncLazyValue_getsetlist[] = { {"_awaiting_tasks", (getter)AsyncLazyValue_get_awaiting_tasks, NULL, NULL}, {NULL} /* Sentinel */ }; static PyTypeObject _AsyncLazyValue_Type = { PyVarObject_HEAD_INIT(NULL, 0) "_asyncio.AsyncLazyValue", .tp_basicsize = sizeof(AsyncLazyValueObj), .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_HAVE_AM_EXTRA, .tp_traverse = (traverseproc)AsyncLazyValue_traverse, .tp_clear = (inquiry)AsyncLazyValue_clear, .tp_iter = PyObject_SelfIter, .tp_iternext = (iternextfunc)AsyncLazyValue_iternext, .tp_methods = AsyncLazyValue_methods, .tp_getset = AsyncLazyValue_getsetlist, .tp_as_async = (PyAsyncMethods*)&_AsyncLazyValue_Type_as_async, .tp_init = (initproc)AsyncLazyValue_init, .tp_new = PyType_GenericNew, .tp_dealloc = (destructor)AsyncLazyValue_dealloc, }; /*[clinic input] class _asyncio.AsyncLazyValueCompute "AsyncLazyValueComputeObj *" "&_AsyncLazyValueCompute_Type" [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=a22f5c3d11e2aa0f]*/ static int AsyncLazyValueCompute_traverse(AsyncLazyValueComputeObj *self, visitproc visit, void *arg) { Py_VISIT(self->alvc_target); Py_VISIT(self->alvc_coroobj); Py_VISIT(self->alvc_exc_state.exc_type); Py_VISIT(self->alvc_exc_state.exc_value); Py_VISIT(self->alvc_exc_state.exc_traceback); return 0; } static int AsyncLazyValueCompute_clear(AsyncLazyValueComputeObj *self) { Py_CLEAR(self->alvc_target); Py_CLEAR(self->alvc_coroobj); Py_CLEAR(self->alvc_exc_state.exc_type); 
Py_CLEAR(self->alvc_exc_state.exc_value); Py_CLEAR(self->alvc_exc_state.exc_traceback); // awaiter is borrowed self->alvc_pending_awaiter = NULL; return 0; } static void AsyncLazyValueCompute_dealloc(AsyncLazyValueComputeObj *self) { AsyncLazyValueCompute_clear(self); PyObject_GC_UnTrack(self); Py_TYPE(self)->tp_free(self); } static PyObject * do_awaited_call(PyObject *func, PyObject **args, Py_ssize_t nargs, PyObject *kwargs) { if (PyMethod_Check(func)) { PyObject *self = PyMethod_GET_SELF(func); PyObject *meth_func = PyMethod_GET_FUNCTION(func); return _PyObject_Call_Prepend_FastCallDict( meth_func, self, args, nargs | _Py_AWAITED_CALL_MARKER, kwargs); } return _PyObject_FastCallDict( func, args, nargs | _Py_AWAITED_CALL_MARKER, kwargs); } static void forward_and_clear_pending_awaiter(AsyncLazyValueComputeObj *self) { assert(self->alvc_coroobj != NULL); if (self->alvc_pending_awaiter == NULL) { return; } _PyAwaitable_SetAwaiter(self->alvc_coroobj, self->alvc_pending_awaiter); self->alvc_pending_awaiter = NULL; } /** Runs a function that was provided to AsyncLazyValue. 
- if function was not a coroutine - calls _PyCoro_GetAwaitableIter on a result and stores it for subsequent 'send' calls - if function was a coroutine and it was completed eagerly - sets 'did_step' indicator and returns the result - if function was a coroutine but it was not completed eagerly sets 'did_step' indicator, stores coroutine object for subsequent 'send' calls and return result of the step (typically it is future) */ static PyObject * AsyncLazyValueCompute_create_and_set_subcoro(AsyncLazyValueComputeObj *self, int *did_step) { Py_ssize_t nargs = PyTuple_GET_SIZE(self->alvc_target->alv_args); PyObject **args = &PyTuple_GET_ITEM(self->alvc_target->alv_args, 0); PyObject *result = do_awaited_call( args[0], args + 1, nargs - 1, self->alvc_target->alv_kwargs); if (result == NULL) { return NULL; } if (!_PyWaitHandle_CheckExact(result)) { // function being called is not a coroutine PyObject *iter = _PyCoro_GetAwaitableIter(result); Py_DECREF(result); if (iter == NULL) { return NULL; } self->alvc_coroobj = iter; forward_and_clear_pending_awaiter(self); Py_RETURN_NONE; } *did_step = 1; if (((PyWaitHandleObject *)result)->wh_waiter == NULL) { PyObject *retval = ((PyWaitHandleObject *)result)->wh_coro_or_result; _PyWaitHandle_Release(result); return retval; } self->alvc_coroobj = ((PyWaitHandleObject *)result)->wh_coro_or_result; forward_and_clear_pending_awaiter(self); PyObject *waiter = ((PyWaitHandleObject *)result)->wh_waiter; _PyWaitHandle_Release(result); return waiter; } /** Handle error being raised, effectively working as a catch block try: ... 
except Exception as e: notify-futures if isinstance(e, (GeneratorExit, StopIteration)): pass else: raise */ static PyObject * AsyncLazyValueCompute_handle_error(AsyncLazyValueComputeObj *self, PyThreadState *tstate, int reraise, int closing) { assert(PyErr_Occurred()); PyObject *et, *ev, *tb; PyErr_Fetch(&et, &ev, &tb); PyErr_NormalizeException(&et, &ev, &tb); if (tb != NULL) { PyException_SetTraceback(ev, tb); } else { PyException_SetTraceback(ev, Py_None); } // push information about current exception _PyErr_StackItem *previous_exc_info = tstate->exc_info; _PyErr_StackItem exc_info = {.exc_type = et, .exc_value = ev, .exc_traceback = tb, .previous_item = previous_exc_info}; tstate->exc_info = &exc_info; // set the error in async lazy value int err = AsyncLazyValue_set_error(self->alvc_target, ev); // pop current exception info tstate->exc_info = previous_exc_info; // 1. if exception was raised when setting the result - it will // shadow original exception so we can release it. // 2. also release it if we are not supposed to re-raise it if (err < 0 || !reraise) { Py_DECREF(et); Py_XDECREF(ev); Py_XDECREF(tb); } if (err < 0) { // for closing case ignore StopIteration and GeneratorExit if (closing && (PyErr_ExceptionMatches(PyExc_StopIteration) || PyErr_ExceptionMatches(PyExc_GeneratorExit))) { PyErr_Clear(); Py_RETURN_NONE; } return NULL; } if (reraise) { // reraise previous error PyErr_Restore(et, ev, tb); return NULL; } Py_RETURN_NONE; } static void AsyncLazyValueCompute_set_awaiter(AsyncLazyValueComputeObj *self, PyObject *awaiter) { if (self->alvc_coroobj != NULL) { _PyAwaitable_SetAwaiter(self->alvc_coroobj, awaiter); } else { self->alvc_pending_awaiter = awaiter; } } /** Implementation of a 'send' for AsyncLazyValueCompute */ static PyObject * AsyncLazyValueCompute_itersend_(AsyncLazyValueComputeObj *self, PyThreadState *tstate, PyObject *sentValue, int *pReturn) { if (self->alvc_coroobj == NULL) { int did_step = 0; // here alvc_coroobj coroutine object was 
not created yet - // call coroutine and set coroutine object for subsequent sends PyObject *retval = AsyncLazyValueCompute_create_and_set_subcoro(self, &did_step); if (retval == NULL) { // failed - handle error return AsyncLazyValueCompute_handle_error(self, tstate, 1, 0); } if (did_step) { // if we did step when calling coroutine - we attempted to run // coroutine eagerly which might have two outcomes if (self->alvc_coroobj == NULL) { // 1. coroutine has finished eagerly // set the successful result to owning AsyncLazyValue int ok = AsyncLazyValue_set_result(self->alvc_target, retval); if (ok < 0) { Py_DECREF(retval); return AsyncLazyValueCompute_handle_error( self, tstate, 1, 0); } // ..and set return indicator *pReturn = 1; } // 2. coroutine was not finished eagerly but we did some work // return without setting return indicator - meaning we yielded return retval; } Py_DECREF(retval); } assert(self->alvc_coroobj != NULL); PyObject *res; PySendResult gen_status = PyIter_Send(tstate, self->alvc_coroobj, sentValue, &res); if (gen_status == PYGEN_RETURN) { // RETURN int ok = AsyncLazyValue_set_result(self->alvc_target, res); if (ok < 0) { Py_DECREF(res); return NULL; } *pReturn = 1; return res; } if (gen_status == PYGEN_NEXT) { // YIELD return res; } assert (gen_status == PYGEN_ERROR); // ERROR return AsyncLazyValueCompute_handle_error(self, tstate, 1, 0); } /** Entrypoint for gennext used by Py_TPFLAGS_HAVE_AM_SEND supported types */ static PySendResult AsyncLazyValueCompute_itersend(PyThreadState *tstate, AsyncLazyValueComputeObj *self, PyObject *sentValue, PyObject **pResult) { _PyErr_StackItem *previous_exc_info = tstate->exc_info; self->alvc_exc_state.previous_item = previous_exc_info; tstate->exc_info = &self->alvc_exc_state; int is_return = 0; PyObject *result = AsyncLazyValueCompute_itersend_(self, tstate, sentValue, &is_return); *pResult = result; tstate->exc_info = previous_exc_info; if (result == NULL) { AsyncLazyValueCompute_clear(self); return 
PYGEN_ERROR; } if (is_return) { assert(result); AsyncLazyValueCompute_clear(self); return PYGEN_RETURN; } return PYGEN_NEXT; } static PyObject * AsyncLazyValueCompute_send(AsyncLazyValueComputeObj *self, PyObject *val) { PyObject *res; PySendResult gen_status = AsyncLazyValueCompute_itersend(PyThreadState_GET(), self, val, &res); return gen_status_to_iter(gen_status, res); } static PyObject * AsyncLazyValueCompute_next(AsyncLazyValueComputeObj *self) { return AsyncLazyValueCompute_send(self, Py_None); } static PyObject * AsyncLazyValueCompute_close_(AsyncLazyValueComputeObj *self) { if (self->alvc_coroobj == NULL) { // coroutine is not started - just return Py_RETURN_NONE; } // close subgenerator int err = _PyGen_close_yf(self->alvc_coroobj); Py_CLEAR(self->alvc_coroobj); if (err == 0) { PyErr_SetNone(PyExc_GeneratorExit); } // run the error handler with either error from subgenerator // or PyExc_GeneratorExit return AsyncLazyValueCompute_handle_error( self, PyThreadState_GET(), err < 0, 1); } static PyObject * AsyncLazyValueCompute_close(AsyncLazyValueComputeObj *self, PyObject *Py_UNUSED(arg)) { PyObject *res = AsyncLazyValueCompute_close_(self); if (res == NULL) { AsyncLazyValueCompute_clear(self); } return res; } static PyObject * AsyncLazyValueCompute_throw(AsyncLazyValueComputeObj *self, PyObject *type, PyObject *val, PyObject *tb) { if (self->alvc_coroobj == NULL) { _PyGen_restore_error(type, val, tb); return NULL; } if (PyErr_GivenExceptionMatches(type, PyExc_GeneratorExit)) { int err = _PyGen_close_yf(self->alvc_coroobj); Py_CLEAR(self->alvc_coroobj); if (err < 0) { return AsyncLazyValueCompute_handle_error( self, PyThreadState_GET(), 1, 0); } if (_PyGen_restore_error(type, val, tb) == -1) { return NULL; } return AsyncLazyValueCompute_handle_error( self, PyThreadState_GET(), 1, 0); } PyObject *res = coro_throw(self->alvc_coroobj, type, val, tb); if (res != NULL) { return res; } if (_PyGen_FetchStopIterationValue(&res) == 0) { int ok = 
AsyncLazyValue_set_result(self->alvc_target, res); AsyncLazyValueCompute_clear(self); if (ok < 0) { Py_DECREF(res); return NULL; } _PyGen_SetStopIterationValue(res); Py_DECREF(res); return NULL; } else { return AsyncLazyValueCompute_handle_error( self, PyThreadState_GET(), 1, 0); } } /*[clinic input] _asyncio.AsyncLazyValueCompute.throw type: object val: object = NULL tb: object = NULL / [clinic start generated code]*/ static PyObject * _asyncio_AsyncLazyValueCompute_throw_impl(AsyncLazyValueComputeObj *self, PyObject *type, PyObject *val, PyObject *tb) /*[clinic end generated code: output=df7cdebbf7ef7a8d input=98866d3712aa0539]*/ { PyObject *res = AsyncLazyValueCompute_throw(self, type, val, tb); if (res == NULL) { AsyncLazyValueCompute_clear(self); } return res; } static PyMethodDef AsyncLazyValueCompute_methods[] = { {"send", (PyCFunction)AsyncLazyValueCompute_send, METH_O, NULL}, _ASYNCIO_ASYNCLAZYVALUECOMPUTE_THROW_METHODDEF {"close", (PyCFunction)AsyncLazyValueCompute_close, METH_NOARGS, NULL}, {NULL, NULL} /* Sentinel */ }; static PyAsyncMethodsWithExtra _AsyncLazyValueCompute_Type_as_async = { .ame_async_methods = { (unaryfunc)PyObject_SelfIter, /* am_await */ 0, /* am_aiter */ 0, /* am_anext */ }, .ame_send = (sendfunc)AsyncLazyValueCompute_itersend, .ame_setawaiter = (setawaiterfunc)AsyncLazyValueCompute_set_awaiter, }; static PyTypeObject _AsyncLazyValueCompute_Type = { PyVarObject_HEAD_INIT(NULL, 0) "_asyncio.AsyncLazyValueCompute", .tp_basicsize = sizeof(AsyncLazyValueComputeObj), .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_HAVE_AM_EXTRA, .tp_traverse = (traverseproc)AsyncLazyValueCompute_traverse, .tp_clear = (inquiry)AsyncLazyValueCompute_clear, .tp_methods = AsyncLazyValueCompute_methods, .tp_as_async = (PyAsyncMethods*)&_AsyncLazyValueCompute_Type_as_async, .tp_iternext = (iternextfunc)AsyncLazyValueCompute_next, .tp_iter = PyObject_SelfIter, .tp_dealloc = (destructor)AsyncLazyValueCompute_dealloc, }; /*********************** 
AwaitableValue *********************/ // clang-format off /*[clinic input] class _asyncio.AwaitableValue "AwaitableValueObj *" "(PyTypeObject *)&AwaitableValue_Type" [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=b8a411a4f4675926]*/ // clang-format on /*[clinic input] _asyncio.AwaitableValue.__init__ value: object / [clinic start generated code]*/ static int _asyncio_AwaitableValue___init___impl(AwaitableValueObj *self, PyObject *value) /*[clinic end generated code: output=9f83da2bdc60bfa7 input=b7af7ac0864a082c]*/ { Py_INCREF(value); Py_XDECREF(self->av_value); self->av_value = value; return 0; } static int AwaitableValueObj_traverse(AwaitableValueObj *obj, visitproc visit, void *arg) { Py_VISIT(obj->av_value); return 0; } static int AwaitableValueObj_clear(AwaitableValueObj *self) { Py_CLEAR(self->av_value); return 0; } static PySendResult AwaitableValueObj_itersend(PyThreadState *Py_UNUSED(tstate), AwaitableValueObj *self, PyObject *Py_UNUSED(sentValue), PyObject **pResult); static PyAsyncMethodsWithExtra AwaitableValue_Type_as_async = { .ame_async_methods = { (unaryfunc)PyObject_SelfIter, /* am_await */ 0, /* am_aiter */ 0, /* am_anext */ }, .ame_send = (sendfunc)AwaitableValueObj_itersend }; static PyObject * AwaitableValueObj_next(AwaitableValueObj *self) { _PyGen_SetStopIterationValue(self->av_value); return NULL; } static void AwaitableValueObj_dealloc(AwaitableValueObj *self) { AwaitableValueObj_clear(self); PyObject_GC_UnTrack(self); Py_TYPE(self)->tp_free(self); } static PySendResult AwaitableValueObj_itersend(PyThreadState *Py_UNUSED(tstate), AwaitableValueObj *self, PyObject *Py_UNUSED(sentValue), PyObject **pResult) { Py_INCREF(self->av_value); *pResult = self->av_value; return PYGEN_RETURN; } static PyObject * AwaitableValueObj_get_value(AwaitableValueObj *self, void *Py_UNUSED(ignored)) { Py_INCREF(self->av_value); return self->av_value; } static PyObject * AwaitableValueObj_new(PyObject *value) { 
AwaitableValueObj *self = PyObject_GC_New( AwaitableValueObj, (PyTypeObject *)&AwaitableValue_Type); if (self == NULL) { return NULL; } self->av_value = NULL; if (_asyncio_AwaitableValue___init___impl(self, value) < 0) { Py_DECREF(self); return NULL; } PyObject_GC_Track(self); return (PyObject *)self; } static PyGetSetDef AwaitableValue_Type_getsetlist[] = { {"value", (getter)AwaitableValueObj_get_value, NULL, NULL}, {NULL} /* Sentinel */ }; static PyTypeObject AwaitableValue_Type = { PyVarObject_HEAD_INIT(NULL, 0) "_asyncio.AwaitableValue", .tp_basicsize = sizeof(AwaitableValueObj), .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_HAVE_AM_EXTRA, .tp_new = PyType_GenericNew, .tp_traverse = (traverseproc)AwaitableValueObj_traverse, .tp_clear = (inquiry)AwaitableValueObj_clear, .tp_as_async = (PyAsyncMethods*)&AwaitableValue_Type_as_async, .tp_getset = AwaitableValue_Type_getsetlist, .tp_iternext = (iternextfunc)AwaitableValueObj_next, .tp_iter = PyObject_SelfIter, .tp_dealloc = (destructor)AwaitableValueObj_dealloc, .tp_init = _asyncio_AwaitableValue___init__, }; /*********************** Functions **************************/ static PyObject * coroutine_to_future(PyObject *coro, PyObject *loop) { int release_loop = 0; if (loop == Py_None) { loop = get_event_loop(); if (loop == NULL) { return NULL; } release_loop = 1; } PyObject *task = create_task(coro, loop); if (release_loop) { Py_DECREF(loop); } return task; } static inline PyObject * suspended_coroutine_to_future(PyObject *coro, PyObject *loop) { int is_native_coro = PyCoro_CheckExact(coro) || PyGen_CheckExact(coro); _SuspendedCoroNode node; // for non-native coroutines add entry to the list of // suspended_coroutines so in Task.__init__ we can make // proper decision whether to schedule task or not. 
if (!is_native_coro) { node.coro = coro; node.prev = suspended_coroutines; suspended_coroutines = &node; } PyObject *fut = coroutine_to_future(coro, loop); if (!is_native_coro) { assert(suspended_coroutines == &node); suspended_coroutines = node.prev; } return fut; } static inline PyObject * verify_future(PyObject *fut, PyObject *loop) { if (loop != Py_None && ((FutureObj *)fut)->fut_loop != loop) { PyErr_SetString(PyExc_ValueError, "The future belongs to a different loop than the one " "specified as the loop argument"); return NULL; } Py_INCREF(fut); return fut; } static inline PyObject * verify_futurelike(PyObject *fut, PyObject *loop) { PyMethodTableRef *t = get_or_create_method_table(Py_TYPE(fut)); if (t == NULL) { return NULL; } if (loop != Py_None) { PyObject *fut_loop = t->get_loop(fut); int error = fut_loop != loop; Py_DECREF(fut_loop); if (error) { PyErr_SetString(PyExc_ValueError, "The future belongs to a different loop than the one specified as the loop argument"); return NULL; } } Py_INCREF(fut); return fut; } static PyObject * awaitable_to_future(PyObject *awaitable, PyObject *loop) { _Py_IDENTIFIER(_wrap_awaitable); if (asyncio_tasks__wrap_awaitable == NULL) { PyObject *tasks = PyImport_ImportModule("asyncio.tasks"); if (tasks == NULL) { return NULL; } asyncio_tasks__wrap_awaitable = _PyObject_GetAttrId(tasks, &PyId__wrap_awaitable); Py_DECREF(tasks); if (asyncio_tasks__wrap_awaitable == NULL) { return NULL; } } PyObject *stack[1] = {awaitable}; PyObject *wrapped = _PyObject_FastCall(asyncio_tasks__wrap_awaitable, stack, 1); if (wrapped == NULL) { return NULL; } PyObject *res = coroutine_to_future(wrapped, loop); Py_DECREF(wrapped); return res; } static inline int is_iterable_coroutine(PyGenObject *gen) { return ((PyCodeObject *)gen->gi_code)->co_flags & CO_ITERABLE_COROUTINE; } typedef enum { KIND_ASYNC_LAZY_VALUE, KIND_COROUTINE, KIND_FUTURE, KIND_FUTURELIKE, KIND_AWAITABLE, KIND_INVALID, KIND_ERROR } CORO_OR_FUTURE_KIND; static CORO_OR_FUTURE_KIND 
_get_coro_or_future_kind(PyObject *coro_or_future) { // push fast checks first if (_AsyncLazyValue_CheckExact(coro_or_future)) { return KIND_ASYNC_LAZY_VALUE; } PyTypeObject *t = Py_TYPE(coro_or_future); if (PyCoro_CheckExact(coro_or_future) || (PyGen_CheckExact(coro_or_future) && is_iterable_coroutine((PyGenObject *)coro_or_future)) || t == &_PyAsyncGenASend_Type) { return KIND_COROUTINE; } // check explicitly registered coroutine types first for (Py_ssize_t i = 0; i < PyList_GET_SIZE(known_coroutine_types); ++i) { if ((PyObject *)t == PyList_GET_ITEM(known_coroutine_types, i)) { return KIND_COROUTINE; } } // check subclasses of native Future (Future, Task, ContextAwareTask etc..) if (PyType_IsSubtype(t, &FutureType)) { return KIND_FUTURE; } // check seen awaitable types if (PySet_Contains(awaitable_types_cache, (PyObject *)t)) { return KIND_AWAITABLE; } // check seen non-standard coroutine types if (PySet_Contains(iscoroutine_typecache, (PyObject *)t)) { return KIND_COROUTINE; } // continue with more heavyweight checks that might involve attribute // lookups and __subclass_check__ calls if (isfuture(coro_or_future)) { return KIND_FUTURELIKE; } if (is_coroutine(coro_or_future)) { return KIND_COROUTINE; } int ok = PyObject_IsInstance(coro_or_future, collections_abc_Awaitable); if (ok < 0) { return KIND_ERROR; } if (ok) { if (PySet_Add(awaitable_types_cache, (PyObject *)t) < 0) { return KIND_ERROR; } return KIND_AWAITABLE; } return KIND_INVALID; } static PyObject * _coro_or_future_to_future(CORO_OR_FUTURE_KIND kind, PyObject *coro_or_future, PyObject *loop) { switch (kind) { case KIND_ASYNC_LAZY_VALUE: return AsyncLazyValue_ensure_future( (AsyncLazyValueObj *)coro_or_future, loop); case KIND_COROUTINE: return coroutine_to_future(coro_or_future, loop); case KIND_FUTURE: return verify_future(coro_or_future, loop); case KIND_FUTURELIKE: return verify_futurelike(coro_or_future, loop); case KIND_AWAITABLE: return awaitable_to_future(coro_or_future, loop); case 
KIND_INVALID: PyErr_SetString( PyExc_TypeError, "An asyncio.Future, a coroutine or an awaitable is required"); return NULL; case KIND_ERROR: return NULL; default: Py_UNREACHABLE(); } } /*[clinic input] _asyncio.ensure_future coro_or_future: object loop: object = None Converts coro_or_future argument to a future [clinic start generated code]*/ static PyObject * _asyncio_ensure_future_impl(PyObject *module, PyObject *coro_or_future, PyObject *loop) /*[clinic end generated code: output=0c3c3c67b20d4f72 input=ac7d0d89f9a713c1]*/ { return _coro_or_future_to_future( _get_coro_or_future_kind(coro_or_future), coro_or_future, loop); } /********************* _GatheringFuture *************************************/ static inline int _cancel_futures(_GatheringFutureObj *gfut) { int ret = 0; PyObject *list = gfut->gf_data; DECLARE_METHODTABLE(t) FOREACH_INDEX(gfut->gf_datamap, i) { PyObject *child = PyList_GET_ITEM(list, i); assert(child != NULL); FETCH_METHOD_TABLE(t, Py_TYPE(child)); int is_true = t->cancel(child, NULL); if (is_true < 0) { return -1; } if (is_true) { ret = 1; } } FOREACH_INDEX_END() return ret; } static int GatheringFuture_cancel_impl(_GatheringFutureObj *self, PyObject *Py_UNUSED(ignored)) { if (future_done((FutureObj *)self)) { return 0; } int ret = _cancel_futures(self); if (ret < 0) { return -1; } if (ret) { self->gf_cancel_requested = 1; } return ret; } /*[clinic input] class _asyncio._GatheringFuture "_GatheringFutureObj *" "&_GatheringFutureType" [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=4d860324340f71a0]*/ /*[clinic input] _asyncio._GatheringFuture.cancel Cancel the future and schedule callbacks. 
[clinic start generated code]*/ static PyObject * _asyncio__GatheringFuture_cancel_impl(_GatheringFutureObj *self) /*[clinic end generated code: output=c3be9bd8f57fb631 input=fcdd2dad96c0022c]*/ { int is_true = GatheringFuture_cancel_impl(self, NULL); if (is_true < 0) { return NULL; } if (is_true) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } // clang-format off static PyMethodDef _GatherinFutureType_methods[] = { _ASYNCIO_FUTURE_RESULT_METHODDEF _ASYNCIO_FUTURE_EXCEPTION_METHODDEF _ASYNCIO_FUTURE_SET_RESULT_METHODDEF _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF _ASYNCIO__GATHERINGFUTURE_CANCEL_METHODDEF _ASYNCIO_FUTURE_CANCELLED_METHODDEF _ASYNCIO_FUTURE_DONE_METHODDEF _ASYNCIO_FUTURE_GET_LOOP_METHODDEF _ASYNCIO_FUTURE__REPR_INFO_METHODDEF {NULL, NULL} /* Sentinel */ }; // clang-format on static int _GatheringFutureObj_traverse(_GatheringFutureObj *fut, visitproc visit, void *arg) { FutureObj_traverse((FutureObj *)fut, visit, arg); Py_VISIT(fut->gf_data); return 0; } static int _GatheringFutureObj_clear(_GatheringFutureObj *fut) { (void)FutureObj_clear((FutureObj *)fut); Py_CLEAR(fut->gf_data); return 0; } static void _GatheringFutureObj_dealloc(PyObject *self) { _GatheringFutureObj *gfut = (_GatheringFutureObj *)self; if (_GatheringFuture_CheckExact(self)) { /* When fut is subclass of _GatheringFuture, finalizer is called from * subtype_dealloc. */ if (PyObject_CallFinalizerFromDealloc(self) < 0) { // resurrected. 
return; } } PyObject_GC_UnTrack(self); if (gfut->gf_weakreflist != NULL) { PyObject_ClearWeakRefs(self); } (void)_GatheringFutureObj_clear(gfut); _Bitset_dealloc(&gfut->gf_datamap); Py_TYPE(gfut)->tp_free(gfut); } static _GatheringFutureObj * _GatheringFutureObj_new(PyObject *data, PyObject *loop, int return_exceptions) { _GatheringFutureObj *gfut = (_GatheringFutureObj *)PyType_GenericNew( &_GatheringFutureType, NULL, NULL); if (gfut == NULL) { return NULL; } if (future_init((FutureObj *)gfut, loop) < 0) { Py_DECREF(gfut); return NULL; } if (_Bitset_init(&gfut->gf_datamap, PyList_GET_SIZE(data)) < 0) { Py_DECREF(gfut); return NULL; } gfut->gf_data = data; Py_INCREF(data); gfut->gf_cancel_requested = 0; gfut->gf_return_exceptions = return_exceptions; gfut->gf_pending = 0; return gfut; } static void _GatheringFutureObj_set_awaiter(_GatheringFutureObj *self, PyObject *awaiter) { PyObject *list = self->gf_data; if (list == NULL) { return; } FOREACH_INDEX(self->gf_datamap, i) { PyObject* fut = PyList_GET_ITEM(list, i); _PyAwaitable_SetAwaiter(fut, awaiter); } FOREACH_INDEX_END() } static PyAsyncMethodsWithExtra _GatheringFutureType_as_async = { .ame_async_methods = { .am_await = (unaryfunc)future_new_iter, }, .ame_setawaiter = (setawaiterfunc)_GatheringFutureObj_set_awaiter, }; static PyTypeObject _GatheringFutureType = { PyVarObject_HEAD_INIT(NULL, 0) "_asyncio._GatheringFuture", sizeof(_GatheringFutureObj), /* tp_basicsize */ .tp_base = &FutureType, .tp_dealloc = _GatheringFutureObj_dealloc, .tp_as_async = (PyAsyncMethods*)&_GatheringFutureType_as_async, .tp_repr = (reprfunc)FutureObj_repr, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_FINALIZE | Py_TPFLAGS_HAVE_AM_EXTRA, .tp_doc = _asyncio_Future___init____doc__, .tp_traverse = (traverseproc)_GatheringFutureObj_traverse, .tp_clear = (inquiry)_GatheringFutureObj_clear, .tp_weaklistoffset = offsetof(_GatheringFutureObj, gf_weakreflist), .tp_iter = 
(getiterfunc)future_new_iter, .tp_methods = _GatherinFutureType_methods, .tp_getset = FutureType_getsetlist, .tp_dictoffset = offsetof(_GatheringFutureObj, dict), .tp_new = PyType_GenericNew, .tp_finalize = (destructor)FutureObj_finalize, }; static PyObject * _GatheringFutureObj_mark_fut_error_as_retrieved(_GatheringFutureObj *gfut, PyObject *fut) { // outer future is done - PyMethodTableRef *t = get_or_create_method_table(Py_TYPE(fut)); int cancelled = t->cancelled(fut); if (cancelled < 0) { return NULL; } if (!cancelled) { // Mark exception retrieved. PyObject *exc = t->exception(fut); if (exc == NULL) { return NULL; } Py_DECREF(exc); } Py_RETURN_NONE; } static PyObject * _GatheringFutureObj_set_cancelled_error(_GatheringFutureObj *gfut, PyObject *fut, PyMethodTableRef *t) { PyObject *exc = _PyObject_CallNoArg(asyncio_CancelledError); if (exc == NULL) { return NULL; } int ok = future_set_exception((FutureObj *)gfut, exc); Py_DECREF(exc); if (ok < 0) { return NULL; } Py_RETURN_NONE; } static PyObject * _GatheringFutureObj_handle_all_done(_GatheringFutureObj *gfut) { DECLARE_METHODTABLE(t) FOREACH_INDEX(gfut->gf_datamap, i) { PyObject *fut = PyList_GET_ITEM(gfut->gf_data, i); PyObject *res; FETCH_METHOD_TABLE(t, Py_TYPE(fut)); if (t == NULL) { return NULL; } int is_cancelled = t->cancelled(fut); if (is_cancelled < 0) { return NULL; } if (is_cancelled) { res = _PyObject_CallNoArg(asyncio_CancelledError); } else { PyObject *exc = t->exception(fut); if (exc == NULL) { return NULL; } if (exc != Py_None) { // steal ref res = exc; } else { Py_DECREF(exc); res = t->result(fut); } } if (res == NULL) { return NULL; } // replace fut with res in gf_data PyList_SET_ITEM(gfut->gf_data, i, res); // decref fut Py_DECREF(fut); } FOREACH_INDEX_END() // drop datamap - it is not necessary anymore _Bitset_dealloc(&gfut->gf_datamap); if (gfut->gf_cancel_requested) { // drop results Py_CLEAR(gfut->gf_data); PyObject *exc = _PyObject_CallNoArg(asyncio_CancelledError); if (exc == NULL) { 
return NULL; } int ok = future_set_exception((FutureObj *)gfut, exc); Py_DECREF(exc); if (ok < 0) { return NULL; } } else { int ok = future_set_result((FutureObj *)gfut, gfut->gf_data); if (ok < 0) { return NULL; } Py_CLEAR(gfut->gf_data); } Py_RETURN_NONE; } static PyObject * _GatheringFutureObj_child_done(PyObject *self, PyObject **args, Py_ssize_t nargs) { assert(_GatheringFuture_CheckExact(self)); assert(nargs == 1); _GatheringFutureObj *gfut = (_GatheringFutureObj *)self; PyObject *fut = args[0]; // decrease pending futures counter gfut->gf_pending--; if (future_done((FutureObj *)gfut)) { return _GatheringFutureObj_mark_fut_error_as_retrieved(gfut, fut); } if (!gfut->gf_return_exceptions) { PyMethodTableRef *t = get_or_create_method_table(Py_TYPE(fut)); int cancelled = t->cancelled(fut); if (cancelled < 0) { return NULL; } if (cancelled) { // Check if 'fut' is cancelled first, as //'fut.exception()' will *raise* a CancelledError // instead of returning it. return _GatheringFutureObj_set_cancelled_error(gfut, fut, t); } else { PyObject *exc = t->exception(fut); if (exc == NULL) { return NULL; } if (exc != Py_None) { int ok = future_set_exception((FutureObj *)gfut, exc); Py_DECREF(exc); if (ok < 0) { return NULL; } Py_RETURN_NONE; } else { Py_DECREF(exc); } } } if (gfut->gf_pending != 0) { // still has more pending work Py_RETURN_NONE; } return _GatheringFutureObj_handle_all_done(gfut); } static PyMethodDef _GatheringFutureObj_child_done_def = { "_on_done_callback", (PyCFunction)_GatheringFutureObj_child_done, METH_FASTCALL, NULL, }; // calls 'loop.create_future' // if loop is None - uses 'get_event_loop' to obtain the loop static PyObject * _create_future(PyObject *loop) { int release_loop = 0; if (loop == Py_None) { loop = get_event_loop(); if (loop == NULL) { return NULL; } release_loop = 1; } PyObject *create_future = NULL; int meth_found = _PyObject_GetMethod(loop, create_future_name, &create_future); if (create_future == NULL) { if (release_loop) { 
Py_DECREF(loop); } return NULL; } PyObject *fut; if (meth_found) { PyObject *stack[1] = {loop}; fut = _PyObject_FastCall(create_future, stack, 1); } else { fut = _PyObject_FastCall(create_future, NULL, 0); } Py_DECREF(create_future); if (release_loop) { Py_DECREF(loop); } return fut; } // Takes result value and wraps it either in WaitHandle // or in future depending of whether gather call is awaited static PyObject * _wrap_result_value(PyObject *result, PyObject *loop, int awaited) { if (awaited) { Py_INCREF(result); return _PyWaitHandle_New(result, NULL); } PyObject *fut = _create_future(loop); if (fut == NULL) { return NULL; } PyMethodTableRef *t = get_or_create_method_table(Py_TYPE(fut)); if (t == NULL) { Py_DECREF(fut); return NULL; } if (t->set_result(fut, result) < 0) { Py_DECREF(fut); return NULL; } return fut; } #define USE_RESULT_ERROR -1 #define USE_RESULT_FOUND 0 #define USE_RESULT_NOT_FOUND 1 // Checks if coroutine 'coro' was already processed // and if yes - tries to reuse result of previous processing // Returns: // USE_RESULT_ERROR in case of error // USE_RESULT_FOUND in case if future for 'coro' was found // USE_RESULT_NOT_FOUND if future for 'coro' was not found static int _try_use_result_for_existing_coro(PyThreadState *tstate, PyObject *coro, Py_ssize_t i, PyObject *arg_to_fut, Py_hash_t coro_hash, PyObject *data, _Bitset *datamap) { PyObject *val = _PyDict_GetItem_KnownHash(arg_to_fut, coro, coro_hash); if (val == NULL) { if (_PyErr_Occurred(tstate)) { return USE_RESULT_ERROR; } return USE_RESULT_NOT_FOUND; } // if val is PyLong - this means that duplicate entry // evaluated and result is stored in 'results' list // at index 'val'. // In this case put the value at index 'i' as well. 
// Otherwise val is a future that needs to be resolved // put it in list of pending futures 'children' if (PyLong_CheckExact(val)) { Py_ssize_t index = PyLong_AsSsize_t(val); if (index == -1) { return USE_RESULT_ERROR; } val = PyList_GET_ITEM(data, i); } else { assert(datamap != NULL); _Bitset_set(datamap, i); } Py_INCREF(val); PyList_SET_ITEM(data, i, val); return USE_RESULT_FOUND; } static PyObject * _create_done_callback(PyObject *data, PyObject *loop, int return_exceptions) { PyObject *gfut = (PyObject *)_GatheringFutureObj_new(data, loop, return_exceptions); if (gfut == NULL) { return NULL; } PyObject *done_callback = PyCFunction_New(&_GatheringFutureObj_child_done_def, gfut); Py_DECREF(gfut); return done_callback; } static int _context_aware_task_set_ctx(PyObject *task, PyObject *ctx) { assert(PyObject_IsInstance(task, (PyObject *)&ContextAwareTaskType) > 0); return ContextAwareTaskObj_set_ctx((ContextAwareTaskObj *)task, ctx, NULL); } // current_context - returns owned ref // current_task - returns borrowed ref static int get_current_context_and_task(PyThreadState *tstate, PyObject **current_context, PyObject **current_task, int *context_aware_task) { PyObject *current_loop = NULL, *t = NULL; if (get_running_loop(&current_loop) < 0) { return -1; } if (current_loop != NULL) { t = PyDict_GetItemWithError(current_tasks, current_loop); Py_DECREF(current_loop); if (t == NULL && _PyErr_Occurred(tstate)) { return -1; } } if (t != NULL) { int ok = PyObject_IsInstance(t, (PyObject *)&ContextAwareTaskType); if (ok < 0) { return -1; } if (ok) { // if current task is ContextAwareTask - pull the context from it PyObject *ctx = ContextAwareTaskObj_get_ctx((ContextAwareTaskObj *)t, NULL); if (ctx == NULL) { return -1; } *current_task = t; *current_context = ctx; *context_aware_task = 1; return 0; } } // otherwise get context using user supplied hook PyObject *ctx = get_current_context(); if (ctx == NULL) { return -1; } if (t == NULL) { t = Py_None; } *current_task = t; 
*current_context = ctx; *context_aware_task = 0; return 0; } static PyObject * _start_coroutine_helper(PyThreadState *tstate, PyObject *coro, PyObject *loop, int *finished) { PyObject *context = PyContext_CopyCurrent(); if (context == NULL) { return NULL; } if (PyContext_Enter(context)) { Py_DECREF(context); return NULL; } PyObject *res; PySendResult gen_status = PyIter_Send(tstate, coro, Py_None, &res); if (gen_status == PYGEN_RETURN || gen_status == PYGEN_ERROR) { int failed = PyContext_Exit(context); Py_DECREF(context); if (failed) { return NULL; } *finished = 1; return res; } *finished = 0; PyObject *fut = suspended_coroutine_to_future(coro, loop); if (fut == NULL) { Py_DECREF(res); PyContext_Exit(context); Py_DECREF(context); return NULL; } PyMethodTableRef *t = get_or_create_method_table(Py_TYPE(fut)); // substitute context that was captured by the task // with the context that was used to start the coroutine PyObject *ok = t->set_task_context(fut, context); if (ok == NULL) { Py_DECREF(fut); Py_DECREF(res); PyContext_Exit(context); Py_DECREF(context); return NULL; } Py_DECREF(ok); int failed = PyContext_Exit(context); Py_DECREF(context); if (failed) { Py_DECREF(res); Py_XDECREF(fut); return NULL; } if (fut == NULL) { Py_DECREF(res); return NULL; } // link task to yielded future so it will be woken up // once future is fulfilled PyObject *retval = t->set_fut_waiter(fut, res); Py_DECREF(res); if (retval == NULL) { Py_DECREF(fut); return NULL; } Py_DECREF(retval); return fut; } static PyObject * _gather_multiple(PyObject *const*items, Py_ssize_t nitems, PyObject *arg_to_fut, PyObject *loop, int return_exceptions, int awaited) { int release_loop = 0; // Defer creation of the callback. 
// After creation done_callback will own refs to gathering future PyObject *done_callback = NULL; // Helpers: // borrowed ref to gathering future _GatheringFutureObj *gfut = NULL; // borrowed ref to bitset gf_datamap _Bitset *datamap = NULL; // create list to store results/pending futures PyObject *data = PyList_New(nitems); if (data == NULL) { return NULL; } PyThreadState *tstate = PyThreadState_GET(); DECLARE_METHODTABLE(t) PyObject *current_task = NULL, *current_context = NULL, *awaiter = NULL; context_aware_task_set_ctx_f context_setter = NULL; if (awaited) { _PyShadowFrame* sf = tstate->shadow_frame; // If our caller is executing eagerly, it won't have a coroutine to set // as our awaiter yet. This will be fixed up if and when the caller // does suspend. if (_PyShadowFrame_HasGen(sf)) { awaiter = (PyObject *)_PyShadowFrame_GetGen(sf); } } for (Py_ssize_t i = 0; i < nitems; ++i) { PyObject *arg = items[i]; if (awaiter) { _PyAwaitable_SetAwaiter(arg, awaiter); } Py_hash_t arg_hash = -1; if (arg_to_fut != NULL) { // try to find result for a previous case when this coroutine // appear in the argument list arg_hash = PyObject_Hash(arg); if (arg_hash == -1) { goto failed; } int ok = _try_use_result_for_existing_coro( tstate, arg, i, arg_to_fut, arg_hash, data, datamap); if (ok == USE_RESULT_FOUND) { continue; } if (ok == USE_RESULT_ERROR) { goto failed; } assert(ok == USE_RESULT_NOT_FOUND); } // determine kind of input awaitable const CORO_OR_FUTURE_KIND kind = _get_coro_or_future_kind(arg); // fast path checks // 1. completed AsyncLazyValue if (kind == KIND_ASYNC_LAZY_VALUE && ((AsyncLazyValueObj *)arg)->alv_state == ALV_DONE) { // completed asynclazyvalue - get the value directly // we can do this for both awaited and non-awaited cases PyList_SET_ITEM( data, i, AsyncLazyValue_get_result((AsyncLazyValueObj *)arg)); continue; } // 2. 
completed AwaitableValue if (Py_TYPE(arg) == (PyTypeObject *)&AwaitableValue_Type) { PyList_SET_ITEM( data, i, AwaitableValueObj_get_value((AwaitableValueObj *)arg, NULL)); continue; } // 3. completed future/task if ((Future_CheckExact(arg) || Task_CheckExact(arg)) && ((FutureObj *)arg)->fut_state == STATE_FINISHED) { PyList_SET_ITEM( data, i, FutureObj_get_result((FutureObj *)arg, NULL)); continue; } PyObject *fut; if (awaited && kind == KIND_COROUTINE) { // when awaited - try to execute coroutine eagerly if (current_context == NULL) { int context_aware_task; if (get_current_context_and_task(tstate, &current_context, &current_task, &context_aware_task) < 0) { goto failed; } if (context_aware_task) { context_setter = _context_aware_task_set_ctx; } } int finished = 0; PyObject *res = _start_coroutine_helper(tstate, arg, loop, &finished); if (call_reset_context(tstate, current_task, current_context, context_setter, res == NULL) < 0) { Py_CLEAR(res); } if (res == NULL) { if (finished && return_exceptions) { // error during eager execution // and exceptions should be returned as results PyObject *et, *ev, *tb; PyErr_Fetch(&et, &ev, &tb); if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) { PyErr_NormalizeException(&et, &ev, &tb); } if (tb != NULL) { PyException_SetTraceback(ev, tb); } Py_DECREF(et); Py_DECREF(tb); // grab exception value as result res = ev; } else { // mark remaining coroutines in the list as completed for (Py_ssize_t j = i + 1; j < nitems; ++j) { if (PyCoro_CheckExact(items[j])) { _PyGen_MarkJustStartedGenAsCompleted((PyGenObject *)items[j]); } } goto failed; } } if (finished) { // at this point res is either original result or exception // in any case we own the reference assert(res != NULL); // pass the ownership to list PyList_SET_ITEM(data, i, res); if (arg_to_fut != NULL) { assert(arg_hash != -1); // record index in case there is a duplicate // of arg coroutine in the list PyObject *index = PyLong_FromSsize_t(i); if (index == NULL) { // 
don't need to decref res since it is owned by // 'results' list goto failed; } int ok = _PyDict_SetItem_KnownHash( arg_to_fut, arg, index, arg_hash); Py_DECREF(index); if (ok < 0) { // don't need to decref res since it is owned by 'data' // list goto failed; } } continue; } else { // res is future fut = res; } } else { // generic awaitable -> future case fut = _coro_or_future_to_future(kind, arg, loop); if (fut == NULL) { goto failed; } } assert(fut != NULL); FETCH_METHOD_TABLE(t, Py_TYPE(fut)); if (loop == Py_None) { loop = t->get_loop(fut); if (loop == NULL) { Py_DECREF(fut); goto failed; } release_loop = 1; } // now we know that coroutine was suspended and loop is fixed up - we // need to wait until coroutinw is resumed - create done_callback if it // was not done before and initialize borrowed refs to gfut and bitset if (done_callback == NULL) { done_callback = _create_done_callback(data, loop, return_exceptions); if (done_callback == NULL) { Py_DECREF(fut); goto failed; } // get borrowed refs to gathering future and datamap bitmap gfut = (_GatheringFutureObj *)PyCFunction_GET_SELF(done_callback); datamap = &gfut->gf_datamap; } if (fut != arg) { // 'arg' was not a Future, therefore, 'fut' is a new // Future created specifically for 'arg'. Since the caller // can't control it, disable the "destroy pending task" // warning. 
if (t->set_log_destroy_pending(fut, 0) < 0) { Py_DECREF(fut); goto failed; } } if (arg_to_fut != NULL) { assert(arg_hash != -1); if (_PyDict_SetItem_KnownHash(arg_to_fut, arg, fut, arg_hash) < 0) { Py_DECREF(fut); goto failed; } } gfut->gf_pending++; PyObject *res = t->on_completed(fut, done_callback, NULL); if (res == NULL) { Py_DECREF(fut); goto failed; } Py_DECREF(res); // pass the 'fut' ownership to the 'data' list PyList_SET_ITEM(data, i, fut); // set the bit to indicate that entry is a future _Bitset_set(datamap, i); } Py_XDECREF(current_context); if (gfut == NULL) { // no tasks were scheduled - can just return accumulated results PyObject *retval = _wrap_result_value(data, loop, awaited); Py_DECREF(data); return retval; } assert(done_callback != NULL); // if gfut was created - we should have pending tasks in the queue assert(gfut->gf_pending > 0); // create new reference to gfut Py_INCREF(gfut); // drop refs to data and done_callback // they are already captured in gfut or pending tasks Py_DECREF(data); Py_DECREF(done_callback); if (release_loop) { Py_DECREF(loop); } return (PyObject *)gfut; failed: Py_XDECREF(current_context); Py_DECREF(data); if (gfut) { // capture and reset error to make sure we can run cancellation code PyObject *et, *ev, *tb; PyErr_Fetch(&et, &ev, &tb); // if we've already created gfut and exception was raised // we need to cancel all pending work _PyErr_StackItem *previous_exc_info = tstate->exc_info; _PyErr_StackItem exc_info = {.exc_type = et, .exc_value = ev, .exc_traceback = tb, .previous_item = previous_exc_info}; tstate->exc_info = &exc_info; int ok = _cancel_futures(gfut); tstate->exc_info = previous_exc_info; if (ok < 0) { // _cancel_futures itself has raised exception // release initial error, it was already captured // as cause Py_DECREF(et); Py_XDECREF(ev); Py_XDECREF(tb); } else { // restore original error PyErr_Restore(et, ev, tb); } gfut->gf_state = STATE_FAULTED; } // decref done callback - it will release gfut 
Py_XDECREF(done_callback); if (release_loop) { Py_DECREF(loop); } return NULL; } static PyObject * _gather_worker(PyObject *const*items, Py_ssize_t nitems, PyObject *loop, int return_exceptions, int assume_no_duplicates, int awaited) { if (nitems == 0) { PyObject *result = PyList_New(0); if (result == NULL) { return NULL; } PyObject *retval = _wrap_result_value(result, loop, awaited); Py_DECREF(result); return retval; } PyObject *arg_to_fut = NULL; if (!assume_no_duplicates) { arg_to_fut = PyDict_New(); if (arg_to_fut == NULL) { return NULL; } } PyObject *result = _gather_multiple( items, nitems, arg_to_fut, loop, return_exceptions, awaited); Py_XDECREF(arg_to_fut); return result; } static PyObject * _asyncio_gather_impl(PyObject *const*args, size_t nargsf, PyObject *kwnames, int assume_no_duplicates) { static const char * const _keywords[] = {"loop", "return_exceptions", NULL}; static _PyArg_Parser _parser = {NULL, _keywords, "gather", 0}; Py_ssize_t nargs = PyVectorcall_NARGS(nargsf); PyObject *argsbuf[2]; PyObject *loop = Py_None; int return_exceptions = 0; PyObject *const*kwargs = _PyArg_UnpackKeywords(args + nargs, 0, NULL, kwnames, &_parser, 0, 0, 0, argsbuf); if (kwargs == NULL) { return NULL; } switch (kwnames != NULL ? 
PyTuple_GET_SIZE(kwnames) : 0) { case 0: break; case 1: { if (kwargs[0]) { loop = kwargs[0]; } else { return_exceptions = PyObject_IsTrue(kwargs[1]); if (return_exceptions < 0) { return NULL; } } } break; case 2: { loop = kwargs[0]; return_exceptions = PyObject_IsTrue(kwargs[1]); if (return_exceptions < 0) { return NULL; } } break; default: Py_UNREACHABLE(); } return _gather_worker(args, nargs, loop, return_exceptions, assume_no_duplicates, _Py_AWAITED_CALL(nargsf) != 0);} static PyObject * _asyncio_gather(PyObject *Py_UNUSED(module), PyObject **args, size_t nargsf, PyObject *kwnames) { return _asyncio_gather_impl(args, nargsf, kwnames, 0); } static PyObject * _asyncio_ig_gather(PyObject *Py_UNUSED(module), PyObject **args, size_t nargsf, PyObject *kwnames) { return _asyncio_gather_impl(args, nargsf, kwnames, 1); } static PyObject * _asyncio_ig_gather_raise(PyObject *module, PyObject **args, size_t nargsf) { return _gather_worker(args, PyVectorcall_NARGS(nargsf), Py_None, 0, 1, _Py_AWAITED_CALL(nargsf) != 0); } static PyObject * _asyncio_ig_gather_iterable_impl(PyObject **args, size_t nargsf, const char *name, int return_exceptions) { Py_ssize_t nargs = PyVectorcall_NARGS(nargsf); if (nargs != 1) { PyErr_Format(PyExc_TypeError, "%s expects 1 positional argument, got %d", name, nargs); return NULL; } PyObject *iterable = args[0]; PyObject **items; Py_ssize_t nitems; if (PyList_CheckExact(iterable)) { items = _PyList_ITEMS(iterable); nitems = PyList_GET_SIZE(iterable); } else if (PyTuple_CheckExact(iterable)) { items = &PyTuple_GET_ITEM(iterable, 0); nitems = PyTuple_GET_SIZE(iterable); } else { iterable = PySequence_Tuple(iterable); if (iterable == NULL) { return NULL; } items = &PyTuple_GET_ITEM(iterable, 0); nitems = PyTuple_GET_SIZE(iterable); } PyObject *result = _gather_worker(items, nitems, Py_None, return_exceptions, 1, _Py_AWAITED_CALL(nargsf) != 0); if (iterable != args[0]) { Py_DECREF(iterable); } return result; } static PyObject * 
_asyncio_ig_gather_iterable(PyObject *module, PyObject **args, size_t nargsf) { return _asyncio_ig_gather_iterable_impl(args, nargsf, "gather_iterable", 0); } static PyObject * _asyncio_ig_gather_iterable_return_exceptions(PyObject *module, PyObject **args, size_t nargsf) { return _asyncio_ig_gather_iterable_impl( args, nargsf, "gather_iterable_return_exceptions", 1); } static PyObject * _asyncio_ig_gather_iterable_raise(PyObject *module, PyObject **args, size_t nargsf) { return _asyncio_ig_gather_iterable_impl( args, nargsf, "gather_iterable_raise", 0); } static PyObject * _asyncio_ig_gather_iterable_no_raise(PyObject *module, PyObject **args, size_t nargsf) { return _asyncio_ig_gather_iterable_impl( args, nargsf, "gather_iterable_no_raise", 1); } static PyObject * _asyncio_ig_gather_no_raise(PyObject *module, PyObject **args, size_t nargsf) { return _gather_worker(args, PyVectorcall_NARGS(nargsf), Py_None, 1, 1, _Py_AWAITED_CALL(nargsf) != 0); } /*[clinic input] _asyncio._get_running_loop Return the running event loop or None. This is a low-level function intended to be used by event loops. This function is thread-specific. [clinic start generated code]*/ static PyObject * _asyncio__get_running_loop_impl(PyObject *module) /*[clinic end generated code: output=b4390af721411a0a input=0a21627e25a4bd43]*/ { PyObject *loop; if (get_running_loop(&loop)) { return NULL; } if (loop == NULL) { /* There's no currently running event loop */ Py_RETURN_NONE; } return loop; } /*[clinic input] _asyncio._set_running_loop loop: 'O' / Set the running event loop. This is a low-level function intended to be used by event loops. This function is thread-specific. [clinic start generated code]*/ static PyObject * _asyncio__set_running_loop(PyObject *module, PyObject *loop) /*[clinic end generated code: output=ae56bf7a28ca189a input=4c9720233d606604]*/ { if (set_running_loop(loop)) { return NULL; } Py_RETURN_NONE; } /*[clinic input] _asyncio.get_event_loop Return an asyncio event loop. 
When called from a coroutine or a callback (e.g. scheduled with call_soon or similar API), this function will always return the running event loop. If there is no running event loop set, the function will return the result of `get_event_loop_policy().get_event_loop()` call. [clinic start generated code]*/ static PyObject * _asyncio_get_event_loop_impl(PyObject *module) /*[clinic end generated code: output=2a2d8b2f824c648b input=9364bf2916c8655d]*/ { return get_event_loop(); } /*[clinic input] _asyncio.get_running_loop Return the running event loop. Raise a RuntimeError if there is none. This function is thread-specific. [clinic start generated code]*/ static PyObject * _asyncio_get_running_loop_impl(PyObject *module) /*[clinic end generated code: output=c247b5f9e529530e input=2a3bf02ba39f173d]*/ { PyObject *loop; if (get_running_loop(&loop)) { return NULL; } if (loop == NULL) { /* There's no currently running event loop */ PyErr_SetString( PyExc_RuntimeError, "no running event loop"); } return loop; } /*[clinic input] _asyncio._register_task task: object Register a new task in asyncio as executed by loop. Returns None. [clinic start generated code]*/ static PyObject * _asyncio__register_task_impl(PyObject *module, PyObject *task) /*[clinic end generated code: output=8672dadd69a7d4e2 input=21075aaea14dfbad]*/ { if (register_task(task) < 0) { return NULL; } Py_RETURN_NONE; } /*[clinic input] _asyncio._unregister_task task: object Unregister a task. Returns None. [clinic start generated code]*/ static PyObject * _asyncio__unregister_task_impl(PyObject *module, PyObject *task) /*[clinic end generated code: output=6e5585706d568a46 input=28fb98c3975f7bdc]*/ { if (unregister_task(task) < 0) { return NULL; } Py_RETURN_NONE; } /*[clinic input] _asyncio._enter_task loop: object task: object Enter into task execution or resume suspended task. Task belongs to loop. Returns None. 
[clinic start generated code]*/ static PyObject * _asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task) /*[clinic end generated code: output=a22611c858035b73 input=de1b06dca70d8737]*/ { if (enter_task(loop, task) < 0) { return NULL; } Py_RETURN_NONE; } /*[clinic input] _asyncio._leave_task loop: object task: object Leave task execution or suspend a task. Task belongs to loop. Returns None. [clinic start generated code]*/ static PyObject * _asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task) /*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ { if (leave_task(loop, task) < 0) { return NULL; } Py_RETURN_NONE; } /*[clinic input] _asyncio.all_tasks loop: object = None Return a set of all tasks for the loop. [clinic start generated code]*/ static PyObject * _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) /*[clinic end generated code: output=0e107cbb7f72aa7b input=43a1b423c2d95bfa]*/ { if (loop != Py_None) { return _all_tasks(loop, 1); } else { PyObject *current_loop = get_event_loop(); if (current_loop == NULL) { return NULL; } PyObject *res = _all_tasks(current_loop, 1); Py_DECREF(current_loop); return res; } } /*[clinic input] _asyncio._is_coro_suspended coro: object / Returns true if coroutine being passed is in suspended state [clinic start generated code]*/ static PyObject * _asyncio__is_coro_suspended(PyObject *module, PyObject *coro) /*[clinic end generated code: output=2dc85abc60ad2517 input=f83d85ec96c84d05]*/ { if (_is_coro_suspended(coro)) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } /*[clinic input] _asyncio._register_known_coroutine_type coro_type: object / Records type as known coroutine type [clinic start generated code]*/ static PyObject * _asyncio__register_known_coroutine_type(PyObject *module, PyObject *coro_type) /*[clinic end generated code: output=f3171e157e46e8dd input=cf1bda10ff448a40]*/ { if (!PyType_Check(coro_type)) { PyErr_Format(PyExc_TypeError, "Type expected, got 
'%s'", Py_TYPE(coro_type)->tp_name); return NULL; } int ok = PyObject_IsSubclass(coro_type, asyncio_coroutines__COROUTINE_TYPES); if (ok < 0) { return NULL; } if (ok == 0) { PyErr_Format(PyExc_TypeError, "Type '%s' is not a coroutine type", ((PyTypeObject *)coro_type)->tp_name); return NULL; } if (PyList_Append(known_coroutine_types, coro_type) < 0) { return NULL; } Py_RETURN_NONE; } /*[clinic input] _asyncio._set_context_helpers get_current_context_obj: object reset_context_obj: object / Internal function used to supply custom context management hooks. Hooks are provided as function pointers wrapped in PyCapsule objects [clinic start generated code]*/ static PyObject * _asyncio__set_context_helpers_impl(PyObject *module, PyObject *get_current_context_obj, PyObject *reset_context_obj) /*[clinic end generated code: output=1431c0c0b69eaf7d input=727ad3656f8570bc]*/ { if (_set_context_helpers(get_current_context_obj, reset_context_obj) < 0) { return NULL; } Py_RETURN_NONE; } // clang-format off /*[clinic input] _asyncio._reset_context_helpers Internal function used to reset custom context management hooks to default values. [clinic start generated code]*/ static PyObject * _asyncio__reset_context_helpers_impl(PyObject *module) /*[clinic end generated code: output=4397eef9ef07f99b input=11e31191807d2f94]*/ // clang-format off { _reset_context_helpers(); Py_RETURN_NONE; } static PyObject * _asyncio_create_awaitable_value(PyObject *Py_UNUSED(module), PyObject *value) { return AwaitableValueObj_new(value); } static PyObject * _asyncio_clear_caches(PyObject *Py_UNUSED(module), PyObject *Py_UNUSED(arg)) { PySet_Clear(awaitable_types_cache); PySet_Clear(iscoroutine_typecache); Py_RETURN_NONE; } static PyObject * _start_immediate_impl(PyObject *coro, PyObject *loop); /*[clinic input] _asyncio._start_immediate coro: object loop: object = None / Executes provided coroutine eagerly. If coroutine is completed - returns AwaitableValue that contain the result. 
If coroutine is not completed - returns a task that wraps the coroutine [clinic start generated code]*/ static PyObject * _asyncio__start_immediate_impl(PyObject *module, PyObject *coro, PyObject *loop) /*[clinic end generated code: output=7fa6139388ccfbe7 input=b69cd8f4808f7382]*/ { int release_loop = 0; if (loop == Py_None) { if (get_running_loop(&loop) < 0) { return NULL; } if (loop == NULL) { PyErr_SetString(PyExc_RuntimeError, "no running event loop"); return NULL; } release_loop = 1; } PyObject *res = _start_immediate_impl(coro, loop); if (release_loop) { Py_DECREF(loop); } return res; } static PyObject * _start_immediate_impl(PyObject *coro, PyObject *loop) { assert(_get_coro_or_future_kind(coro) == KIND_COROUTINE); PyThreadState *tstate = PyThreadState_GET(); PyObject *ctx, *task; int context_aware_task; if (get_current_context_and_task(tstate, &ctx, &task, &context_aware_task) < 0) { return NULL; } context_aware_task_set_ctx_f context_setter = context_aware_task ? _context_aware_task_set_ctx : NULL; int finished; PyObject *res = _start_coroutine_helper(tstate, coro, loop, &finished); if (call_reset_context(tstate, task, ctx, context_setter, res == NULL) < 0) { Py_XDECREF(res); Py_DECREF(ctx); return NULL; } Py_DECREF(ctx); if (finished && res != NULL) { PyObject *retval = AwaitableValueObj_new(res); Py_DECREF(res); return retval; } return res; } /*********************** PyRunningLoopHolder ********************/ static PyRunningLoopHolder * new_running_loop_holder(PyObject *loop) { PyRunningLoopHolder *rl = PyObject_New( PyRunningLoopHolder, &PyRunningLoopHolder_Type); if (rl == NULL) { return NULL; } #if defined(HAVE_GETPID) && !defined(MS_WINDOWS) rl->rl_pid = current_pid; #endif Py_INCREF(loop); rl->rl_loop = loop; return rl; } static void PyRunningLoopHolder_tp_dealloc(PyRunningLoopHolder *rl) { Py_CLEAR(rl->rl_loop); PyObject_Free(rl); } static PyTypeObject PyRunningLoopHolder_Type = { PyVarObject_HEAD_INIT(NULL, 0) "_RunningLoopHolder", 
sizeof(PyRunningLoopHolder), .tp_getattro = PyObject_GenericGetAttr, .tp_flags = Py_TPFLAGS_DEFAULT, .tp_dealloc = (destructor)PyRunningLoopHolder_tp_dealloc, }; /********************* _AwaitingFuture *************************************/ static PyObject * awaiting_future_complete(FutureObj *self, PyObject *Py_UNUSED(arg)) { if (future_done(self)) { Py_RETURN_NONE; } if (future_set_result(self, Py_None) < 0) { return NULL; } Py_RETURN_NONE; } PyMethodDef _AwaitingFuture_complete = { "awaiting_future_complete", (PyCFunction)awaiting_future_complete, METH_O, NULL }; /*[clinic input] class _asyncio._AwaitingFuture "_AwaitingFutureObj *" "&_AwaitingFuture_Type" [clinic start generated code]*/ /*[clinic end generated code: output=da39a3ee5e6b4b0d input=35d731a9ef8b07e2]*/ /*[clinic input] _asyncio._AwaitingFuture.__init__ awaited: object * loop: object = None A subclass of Future that completes when awaited completes. An _AwaitingFuture is primarily useful if you want to wait for another future to complete but be able to reliably cancel the wait. An _AwaitingFuture completes either with a None result when awaited completes or with a CancelledError if it is cancelled. It does not support set_result or set_exception. It propagates its awaiter onto awaited when it is awaited. [clinic start generated code]*/ static int _asyncio__AwaitingFuture___init___impl(_AwaitingFutureObj *self, PyObject *awaited, PyObject *loop) /*[clinic end generated code: output=3c01cd528bbb6dc3 input=79939f5e053d0945]*/ { if (!isfuture(awaited)) { PyErr_Format(PyExc_TypeError, "First argument must be a future. 
Got %R.", awaited); return -1; } int st = future_init((FutureObj *) self, loop); if (st != 0) { return st; } Py_INCREF(awaited); self->af_awaited = awaited; self->af_done_callback = PyCFunction_New(&_AwaitingFuture_complete, (PyObject *) self); if (self->af_done_callback == NULL) { return -1; } PyMethodTableRef *tableref = get_or_create_method_table(Py_TYPE(awaited)); if (tableref == NULL) { return -1; } PyObject *res = tableref->on_completed(awaited, self->af_done_callback, NULL); if (res == NULL) { return -1; } Py_DECREF(res); return 0; } /*[clinic input] _asyncio._AwaitingFuture.cancel Cancel the future and schedule callbacks. If the future is already done or cancelled, return False. Otherwise, change the future's state to cancelled, schedule the callbacks and return True. This does not propagate cancellation onto the future that we're waiting on. [clinic start generated code]*/ static PyObject * _asyncio__AwaitingFuture_cancel_impl(_AwaitingFutureObj *self) /*[clinic end generated code: output=4bdc008d3b05902a input=3318fa0518e207a4]*/ { ENSURE_FUTURE_ALIVE((FutureObj *) self); if (self->af_state != STATE_PENDING) { Py_RETURN_FALSE; } // Awaited may continue running while whatever awaited us may finish. // Clear awaiter in case awaited outlives us. _PyAwaitable_SetAwaiter(self->af_awaited, NULL); PyObject *res = FutureLike_remove_done_callback(self->af_awaited, self->af_done_callback); if (res == NULL) { return NULL; } Py_DECREF(res); int st = future_cancel_impl((FutureObj *) self, NULL); if (st < 0) { return NULL; } if (st) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } /*[clinic input] _asyncio._AwaitingFuture.set_result result: object Unsupported by _AwaitingFuture. 
[clinic start generated code]*/ static PyObject * _asyncio__AwaitingFuture_set_result_impl(_AwaitingFutureObj *self, PyObject *result) /*[clinic end generated code: output=5a84708fa644a9d1 input=0cd4092a28e5a40e]*/ { PyErr_SetString(PyExc_RuntimeError, "_AwaitingFuture does not support set_result operation"); return NULL; } /*[clinic input] _asyncio._AwaitingFuture.set_exception exception: object Unsupported by _AwaitingFuture. [clinic start generated code]*/ static PyObject * _asyncio__AwaitingFuture_set_exception_impl(_AwaitingFutureObj *self, PyObject *exception) /*[clinic end generated code: output=bc1654087cb1db67 input=105ca64ce20606fd]*/ { PyErr_SetString(PyExc_RuntimeError, "_AwaitingFuture does not support set_exception operation"); return NULL; } // clang-format off static PyMethodDef _AwaitingFutureType_methods[] = { _ASYNCIO_FUTURE_RESULT_METHODDEF _ASYNCIO_FUTURE_EXCEPTION_METHODDEF _ASYNCIO__AWAITINGFUTURE_SET_RESULT_METHODDEF _ASYNCIO__AWAITINGFUTURE_SET_EXCEPTION_METHODDEF _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF _ASYNCIO__AWAITINGFUTURE_CANCEL_METHODDEF _ASYNCIO_FUTURE_CANCELLED_METHODDEF _ASYNCIO_FUTURE_DONE_METHODDEF _ASYNCIO_FUTURE_GET_LOOP_METHODDEF _ASYNCIO_FUTURE__REPR_INFO_METHODDEF {NULL, NULL} /* Sentinel */ }; // clang-format on static int _AwaitingFutureObj_traverse(_AwaitingFutureObj *fut, visitproc visit, void *arg) { FutureObj_traverse((FutureObj *)fut, visit, arg); Py_VISIT(fut->af_awaited); Py_VISIT(fut->af_done_callback); return 0; } static int _AwaitingFutureObj_clear(_AwaitingFutureObj *fut) { (void)FutureObj_clear((FutureObj *)fut); Py_CLEAR(fut->af_awaited); Py_CLEAR(fut->af_done_callback); return 0; } static void _AwaitingFutureObj_dealloc(PyObject *self) { _AwaitingFutureObj *fut = (_AwaitingFutureObj *)self; /* _AwaitingFutureObj cannot be subclassed */ if (PyObject_CallFinalizerFromDealloc(self) < 0) { // resurrected. 
return; } PyObject_GC_UnTrack(self); if (fut->af_weakreflist != NULL) { PyObject_ClearWeakRefs(self); } (void)_AwaitingFutureObj_clear(fut); Py_TYPE(fut)->tp_free(fut); } static void _AwaitingFutureObj_set_awaiter(_AwaitingFutureObj *self, PyObject *awaiter) { _PyAwaitable_SetAwaiter(self->af_awaited, awaiter); } static PyAsyncMethodsWithExtra _AwaitingFutureType_as_async = { .ame_async_methods = { .am_await = (unaryfunc)future_new_iter, }, .ame_setawaiter = (setawaiterfunc)_AwaitingFutureObj_set_awaiter, }; static PyTypeObject _AwaitingFuture_Type = { PyVarObject_HEAD_INIT(NULL, 0) "_asyncio._AwaitingFuture", sizeof(_AwaitingFutureObj), /* tp_basicsize */ .tp_base = &FutureType, .tp_dealloc = _AwaitingFutureObj_dealloc, .tp_as_async = (PyAsyncMethods*)&_AwaitingFutureType_as_async, .tp_repr = (reprfunc)FutureObj_repr, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_HAVE_FINALIZE | Py_TPFLAGS_HAVE_AM_EXTRA, .tp_doc = _asyncio__AwaitingFuture___init____doc__, .tp_traverse = (traverseproc)_AwaitingFutureObj_traverse, .tp_clear = (inquiry)_AwaitingFutureObj_clear, .tp_weaklistoffset = offsetof(_AwaitingFutureObj, af_weakreflist), .tp_iter = (getiterfunc)future_new_iter, .tp_methods = _AwaitingFutureType_methods, .tp_getset = FutureType_getsetlist, .tp_dictoffset = offsetof(_AwaitingFutureObj, dict), .tp_init = (initproc)_asyncio__AwaitingFuture___init__, .tp_new = PyType_GenericNew, .tp_finalize = (destructor)FutureObj_finalize, }; /*********************** Module **************************/ static void module_free_freelists(void) { PyObject *next; PyObject *current; next = (PyObject*) fi_freelist; while (next != NULL) { assert(fi_freelist_len > 0); fi_freelist_len--; current = next; next = (PyObject*) ((futureiterobject*) current)->future; PyObject_GC_Del(current); } assert(fi_freelist_len == 0); fi_freelist = NULL; } static void module_free(void *m) { Py_CLEAR(asyncio_mod); Py_CLEAR(traceback_extract_stack); Py_CLEAR(asyncio_future_repr_info_func); 
Py_CLEAR(asyncio_get_event_loop_policy); Py_CLEAR(asyncio_iscoroutine_func); Py_CLEAR(asyncio_task_get_stack_func); Py_CLEAR(asyncio_task_print_stack_func); Py_CLEAR(asyncio_task_repr_info_func); Py_CLEAR(asyncio_InvalidStateError); Py_CLEAR(asyncio_CancelledError); Py_CLEAR(collections_abc_Awaitable); Py_CLEAR(asyncio_coroutines__COROUTINE_TYPES); Py_CLEAR(asyncio_tasks__wrap_awaitable); Py_CLEAR(minus_one); Py_CLEAR(all_non_asyncio_tasks); Py_CLEAR(current_tasks); Py_CLEAR(iscoroutine_typecache); Py_CLEAR(context_kwname); Py_CLEAR(context_name); Py_CLEAR(call_soon_args1); Py_CLEAR(call_soon_args2); Py_CLEAR(call_soon_kwargs); Py_CLEAR(event_loop_dispatch_tables); Py_CLEAR(last_used_eventloop_type); Py_CLEAR(last_used_eventloop_dispatch_table); Py_CLEAR(fallback_dispatch_table); Py_CLEAR(context_aware_task_hooks); Py_CLEAR(known_coroutine_types); Py_CLEAR(awaitable_types_cache); all_asyncio_tasks = NULL; module_free_freelists(); module_initialized = 0; } static int module_init(void) { PyObject *module = NULL; asyncio_mod = PyImport_ImportModule("asyncio"); if (asyncio_mod == NULL) { goto fail; } if (module_initialized != 0) { return 0; } else { module_initialized = 1; } current_tasks = PyDict_New(); if (current_tasks == NULL) { goto fail; } iscoroutine_typecache = PySet_New(NULL); if (iscoroutine_typecache == NULL) { goto fail; } context_kwname = Py_BuildValue("(s)", "context"); if (context_kwname == NULL) { goto fail; } context_name = PyUnicode_FromString("context"); if (context_name == NULL) { goto fail; } context_name_hash = PyObject_Hash(context_name); if (context_name_hash == -1) { goto fail; } event_loop_dispatch_tables = PyDict_New(); if (event_loop_dispatch_tables == NULL) { goto fail; } fallback_dispatch_table = PyEventLoopDispatchTable_new(); if (fallback_dispatch_table == NULL) { goto fail; } known_coroutine_types = PyList_New(0); if (known_coroutine_types == NULL) { goto fail; } awaitable_types_cache = PySet_New(NULL); if (awaitable_types_cache == 
NULL) { goto fail; } #define WITH_MOD(NAME) \ Py_CLEAR(module); \ module = PyImport_ImportModule(NAME); \ if (module == NULL) { \ goto fail; \ } #define GET_MOD_ATTR(VAR, NAME) \ VAR = PyObject_GetAttrString(module, NAME); \ if (VAR == NULL) { \ goto fail; \ } WITH_MOD("asyncio.events") GET_MOD_ATTR(asyncio_get_event_loop_policy, "get_event_loop_policy") WITH_MOD("asyncio.base_futures") GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info") WITH_MOD("asyncio.exceptions") GET_MOD_ATTR(asyncio_InvalidStateError, "InvalidStateError") GET_MOD_ATTR(asyncio_CancelledError, "CancelledError") WITH_MOD("asyncio.base_tasks") GET_MOD_ATTR(asyncio_task_repr_info_func, "_task_repr_info") GET_MOD_ATTR(asyncio_task_get_stack_func, "_task_get_stack") GET_MOD_ATTR(asyncio_task_print_stack_func, "_task_print_stack") WITH_MOD("asyncio.coroutines") GET_MOD_ATTR(asyncio_iscoroutine_func, "iscoroutine") GET_MOD_ATTR(asyncio_coroutines__COROUTINE_TYPES, "_COROUTINE_TYPES") WITH_MOD("collections.abc") GET_MOD_ATTR(collections_abc_Awaitable, "Awaitable") WITH_MOD("traceback") GET_MOD_ATTR(traceback_extract_stack, "extract_stack") PyObject *weak_set; WITH_MOD("weakref") GET_MOD_ATTR(weak_set, "WeakSet"); all_non_asyncio_tasks = _PyObject_CallNoArg(weak_set); Py_CLEAR(weak_set); if (all_non_asyncio_tasks == NULL) { goto fail; } Py_DECREF(module); return 0; fail: Py_CLEAR(module); module_free(NULL); return -1; #undef WITH_MOD #undef GET_MOD_ATTR } PyDoc_STRVAR(module_doc, "Accelerator module for asyncio"); static PyMethodDef asyncio_methods[] = { _ASYNCIO_GET_EVENT_LOOP_METHODDEF _ASYNCIO_GET_RUNNING_LOOP_METHODDEF _ASYNCIO__GET_RUNNING_LOOP_METHODDEF _ASYNCIO__SET_RUNNING_LOOP_METHODDEF _ASYNCIO__REGISTER_TASK_METHODDEF _ASYNCIO__UNREGISTER_TASK_METHODDEF _ASYNCIO__ENTER_TASK_METHODDEF _ASYNCIO__LEAVE_TASK_METHODDEF _ASYNCIO_ISFUTURE_METHODDEF _ASYNCIO_ALL_TASKS_METHODDEF _ASYNCIO_ENSURE_FUTURE_METHODDEF _ASYNCIO__IS_CORO_SUSPENDED_METHODDEF 
_ASYNCIO__REGISTER_KNOWN_COROUTINE_TYPE_METHODDEF _ASYNCIO__SET_CONTEXT_HELPERS_METHODDEF _ASYNCIO__RESET_CONTEXT_HELPERS_METHODDEF { "gather", (PyCFunction)_asyncio_gather, METH_FASTCALL | METH_KEYWORDS, NULL }, // IG specific bits { "ig_gather", (PyCFunction)_asyncio_ig_gather, METH_FASTCALL | METH_KEYWORDS, NULL }, { "ig_gather_raise", (PyCFunction)_asyncio_ig_gather_raise, METH_FASTCALL, NULL }, { "ig_gather_no_raise", (PyCFunction)_asyncio_ig_gather_no_raise, METH_FASTCALL, NULL }, { "ig_gather_iterable", (PyCFunction)_asyncio_ig_gather_iterable, METH_FASTCALL, NULL }, { "ig_gather_iterable_return_exceptions", (PyCFunction)_asyncio_ig_gather_iterable_return_exceptions, METH_FASTCALL, NULL }, { "ig_gather_iterable_raise", (PyCFunction)_asyncio_ig_gather_iterable_raise, METH_FASTCALL, NULL }, { "ig_gather_iterable_no_raise", (PyCFunction)_asyncio_ig_gather_iterable_no_raise, METH_FASTCALL, NULL }, { "create_awaitable_value", (PyCFunction)_asyncio_create_awaitable_value, METH_O, NULL }, _ASYNCIO__START_IMMEDIATE_METHODDEF { "_clear_caches", (PyCFunction)_asyncio_clear_caches, METH_NOARGS, NULL }, { "_clear_method_table", (PyCFunction)clear_method_table, METH_O, NULL}, {NULL, NULL} }; static struct PyModuleDef _asynciomodule = { PyModuleDef_HEAD_INIT, /* m_base */ "_asyncio", /* m_name */ module_doc, /* m_doc */ -1, /* m_size */ asyncio_methods, /* m_methods */ NULL, /* m_slots */ NULL, /* m_traverse */ NULL, /* m_clear */ (freefunc)module_free /* m_free */ }; #if defined(HAVE_GETPID) && !defined(MS_WINDOWS) void reset_pid(void) { current_pid = getpid(); } #endif static const char * get_name(PyObject *func) { assert(PyCFunction_Check(func)); PyMethodDef *method = ((PyCFunctionObject *)func)->m_ml; return method->ml_name; } PyObject * call_gather_fastcall(PyObject *func, PyObject *const *args, size_t nargsf, PyObject *kwnames) { if (kwnames != NULL && PyTuple_GET_SIZE(kwnames) != 0) { PyErr_Format(PyExc_TypeError, "%.200s() takes no keyword arguments", 
get_name(func)); } _PyCFunctionFast meth = (_PyCFunctionFast)PyCFunction_GET_FUNCTION(func); return meth(PyCFunction_GET_SELF(func), args, nargsf); } PyObject * call_gather_fastcall_kw(PyObject *func, PyObject *const *args, size_t nargsf, PyObject *kwnames) { _PyCFunctionFastWithKeywords meth = (_PyCFunctionFastWithKeywords)PyCFunction_GET_FUNCTION(func); return meth(PyCFunction_GET_SELF(func), args, nargsf, kwnames); } static int set_gather_entrypoint(PyObject *mod, struct _Py_Identifier *id, vectorcallfunc entry) { PyObject *f = _PyObject_GetAttrId(mod, id); if (f == NULL) { return -1; } assert(PyCFunction_Check(f)); ((PyCFunctionObject *)f)->vectorcall = entry; Py_DECREF(f); return 0; } PyMODINIT_FUNC PyInit__asyncio(void) { if (module_init() < 0) { return NULL; } if (PyType_Ready(&FutureType) < 0) { return NULL; } if (PyType_Ready((PyTypeObject *)&FutureIterType) < 0) { return NULL; } if (PyType_Ready(&TaskStepMethWrapper_Type) < 0) { return NULL; } if (PyType_Ready(&TaskType) < 0) { return NULL; } if (PyType_Ready(&PyRunningLoopHolder_Type) < 0) { return NULL; } if (PyType_Ready(&_MethodTable_RefType) < 0) { return NULL; } if (PyType_Ready(&_PyEventLoopDispatchTable_Type) < 0) { return NULL; } if (PyType_Ready(&ContextAwareTaskHooksType) < 0) { return NULL; } if (PyType_Ready(&ContextAwareTaskType) < 0) { return NULL; } if (PyType_Ready((PyTypeObject *)&_AsyncLazyValue_Type) < 0) { return NULL; } if (PyType_Ready((PyTypeObject *)&_AsyncLazyValueCompute_Type) < 0) { return NULL; } if (PyType_Ready(&_GatheringFutureType) < 0) { return NULL; } if (PyType_Ready((PyTypeObject *)&AwaitableValue_Type) < 0) { return NULL; } if (PyType_Ready((PyTypeObject *)&_AwaitingFuture_Type) < 0) { return NULL; } if (PyType_Ready(&_ContextAwareTaskCallback_Type) < 0) { return NULL; } methodref_callback = PyCFunction_New(&_MethodTableRefCallback, NULL); if (methodref_callback == NULL) { return NULL; } get_loop_name = _PyUnicode_FromId(&PyId_get_loop); if (get_loop_name == NULL) { 
return NULL; } add_done_callback_name = _PyUnicode_FromId(&PyId_add_done_callback); if (add_done_callback_name == NULL) { return NULL; } remove_done_callback_name = _PyUnicode_FromId(&PyId_remove_done_callback); if (remove_done_callback_name == NULL) { return NULL; } result_name = _PyUnicode_FromId(&PyId_result); if (result_name == NULL) { return NULL; } cancel_name = _PyUnicode_FromId(&PyId_cancel); if (cancel_name == NULL) { return NULL; } done_name = _PyUnicode_FromId(&PyId_done); if (done_name == NULL) { return NULL; } create_task_name = _PyUnicode_FromId(&PyId_create_task); if (create_task_name == NULL) { return NULL; } minus_one = PyLong_FromLong(-1); if (minus_one == NULL) { return NULL; } throw_name = _PyUnicode_FromId(&PyId_throw); if (throw_name == NULL) { return NULL; } create_future_name = _PyUnicode_FromId(&PyId_create_future); if (create_future_name == NULL) { return NULL; } context_aware_task_hooks = PyList_New(0); if (context_aware_task_hooks == NULL) { return NULL; } Future_TableRef = create_method_table(&FutureType); if (Future_TableRef == NULL) { return NULL; } GatheringFuture_TableRef = create_method_table(&_GatheringFutureType); if (GatheringFuture_TableRef == NULL) { return NULL; } PyObject *m = PyModule_Create(&_asynciomodule); if (m == NULL) { return NULL; } Py_INCREF(&FutureType); if (PyModule_AddObject(m, "Future", (PyObject *)&FutureType) < 0) { Py_DECREF(&FutureType); Py_DECREF(m); return NULL; } Py_INCREF(&TaskType); if (PyModule_AddObject(m, "Task", (PyObject *)&TaskType) < 0) { Py_DECREF(&TaskType); Py_DECREF(m); return NULL; } Py_INCREF(&ContextAwareTaskType); if (PyModule_AddObject( m, "ContextAwareTask", (PyObject *)&ContextAwareTaskType) < 0) { Py_DECREF(&ContextAwareTaskType); return NULL; } Py_INCREF(&_AsyncLazyValue_Type); if (PyModule_AddObject( m, "AsyncLazyValue", (PyObject *)&_AsyncLazyValue_Type) < 0) { Py_DECREF(&_AsyncLazyValue_Type); return NULL; } Py_INCREF(&AwaitableValue_Type); if (PyModule_AddObject( m, 
"AwaitableValue", (PyObject *)&AwaitableValue_Type) < 0) { Py_DECREF(&AwaitableValue_Type); return NULL; } Py_INCREF(&_AwaitingFuture_Type); if (PyModule_AddObject( m, "_AwaitingFuture", (PyObject *)&_AwaitingFuture_Type) < 0) { Py_DECREF(&_AwaitingFuture_Type); return NULL; } PyObject *async_lazy_value_as_future = PyCapsule_New(AsyncLazyValue_ensure_future, NULL, NULL); if (async_lazy_value_as_future == NULL) { return NULL; } if (PyModule_AddObject(m, "_async_lazy_value_as_future", async_lazy_value_as_future) < 0) { Py_DECREF(async_lazy_value_as_future); return NULL; } Py_INCREF(current_tasks); if (PyModule_AddObject(m, "_current_tasks", current_tasks) < 0) { Py_DECREF(current_tasks); Py_DECREF(m); return NULL; } #if defined(HAVE_GETPID) && !defined(MS_WINDOWS) reset_pid(); pthread_atfork(NULL, NULL, reset_pid); #endif _Py_IDENTIFIER(gather); _Py_IDENTIFIER(ig_gather); _Py_IDENTIFIER(ig_gather_raise); _Py_IDENTIFIER(ig_gather_no_raise); _Py_IDENTIFIER(ig_gather_iterable); _Py_IDENTIFIER(ig_gather_iterable_return_exceptions); _Py_IDENTIFIER(ig_gather_iterable_raise); _Py_IDENTIFIER(ig_gather_iterable_no_raise); if (set_gather_entrypoint(m, &PyId_gather, call_gather_fastcall_kw) < 0) { return NULL; } if (set_gather_entrypoint(m, &PyId_ig_gather, call_gather_fastcall_kw) < 0) { return NULL; } if (set_gather_entrypoint(m, &PyId_ig_gather_raise, call_gather_fastcall) < 0) { return NULL; } if (set_gather_entrypoint( m, &PyId_ig_gather_no_raise, call_gather_fastcall) < 0) { return NULL; } if (set_gather_entrypoint( m, &PyId_ig_gather_iterable, call_gather_fastcall) < 0) { return NULL; } if (set_gather_entrypoint(m, &PyId_ig_gather_iterable_return_exceptions, call_gather_fastcall) < 0) { return NULL; } if (set_gather_entrypoint( m, &PyId_ig_gather_iterable_raise, call_gather_fastcall) < 0) { return NULL; } if (set_gather_entrypoint( m, &PyId_ig_gather_iterable_no_raise, call_gather_fastcall) < 0) { return NULL; } return m; }