Python/switchboard.c (346 lines of code) (raw):

/* Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com) */ #include <stddef.h> #include "switchboard.h" /* A subscription for changes to an object */ typedef struct { PyObject_HEAD Switchboard_Callback callback; /* An argument to callback, may be NULL */ PyObject *arg; /* A weak reference to the object we've subscribed to */ PyObject *watched; } ObjSubscr; static int obj_subscr_traverse(ObjSubscr *self, visitproc visit, void *arg); static void obj_subscr_dealloc(ObjSubscr *subscr); PyTypeObject ObjSubscr_Type = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "ObjSubscr", .tp_basicsize = sizeof(ObjSubscr), .tp_itemsize = 0, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, .tp_traverse = (traverseproc) obj_subscr_traverse, .tp_dealloc = (destructor) obj_subscr_dealloc, }; static ObjSubscr * obj_subscr_new(PyObject *obj, Switchboard_Callback callback, PyObject *arg) { ObjSubscr *subscr = PyObject_GC_New(ObjSubscr, &ObjSubscr_Type); if (subscr == NULL) { return NULL; } /* This only creates a new weak reference if one did not already exist. Otherwise * the pre-existing weakref is returned. */ subscr->watched = PyWeakref_NewRef(obj, NULL); if (subscr->watched == NULL) { Py_DECREF(subscr); return NULL; } subscr->callback = callback; Py_XINCREF(arg); subscr->arg = arg; PyObject_GC_Track((PyObject *) subscr); return subscr; } static int obj_subscr_traverse(ObjSubscr *self, visitproc visit, void *arg) { Py_VISIT(self->arg); Py_VISIT(self->watched); return 0; } static void obj_subscr_dealloc(ObjSubscr *subscr) { PyObject_GC_UnTrack((PyObject *) subscr); Py_XDECREF(subscr->arg); Py_XDECREF(subscr->watched); PyObject_GC_Del((PyObject *) subscr); } /* * These callbacks are used to notify the switchboard that an object it has * subscribers for has been reclaimed. * * It handles notifying any registered subscriptions and then removes them * from the switchboard. */ typedef struct { PyObject_HEAD /* A weak reference to the switchboard to notify */ PyObject *switchboard_ref; } ObjGoneCallback; static PyObject *obj_gone_callback_call(PyObject *callable, PyObject *args, PyObject *kwargs); static int obj_gone_callback_traverse(ObjGoneCallback *self, visitproc visit, void *arg); static void obj_gone_callback_dealloc(ObjGoneCallback *self); static void switchboard_notify_gone(Switchboard *switchboard, PyObject *ref); PyTypeObject ObjGoneCallback_Type = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "ObjGoneCallback", .tp_basicsize = sizeof(ObjGoneCallback), .tp_itemsize = 0, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, .tp_call = obj_gone_callback_call, .tp_traverse = (traverseproc) obj_gone_callback_traverse, .tp_dealloc = (destructor) obj_gone_callback_dealloc, }; #define ObjGoneCallback_Check(obj) (Py_TYPE(obj) == &ObjGoneCallback_Type) static ObjGoneCallback * obj_gone_callback_new(Switchboard *switchboard) { ObjGoneCallback *cb = PyObject_GC_New(ObjGoneCallback, &ObjGoneCallback_Type); if (cb == NULL) { return NULL; } cb->switchboard_ref = PyWeakref_NewRef((PyObject *) switchboard, NULL); if (cb->switchboard_ref == NULL) { Py_DECREF(cb); return NULL; } PyObject_GC_Track((PyObject *) cb); return cb; } static PyObject * obj_gone_callback_call(PyObject *callable, PyObject *args, PyObject *kwargs) { assert(ObjGoneCallback_Check(callable)); ObjGoneCallback *cb = (ObjGoneCallback *) callable; PyObject *ref = PyTuple_GetItem(args, 0); PyObject *switchboard = PyWeakref_GetObject(cb->switchboard_ref); if (switchboard != Py_None) { switchboard_notify_gone((Switchboard *) switchboard, ref); } Py_RETURN_NONE; } static int obj_gone_callback_traverse(ObjGoneCallback *self, visitproc visit, void *arg) { Py_VISIT(self->switchboard_ref); return 0; } static void obj_gone_callback_dealloc(ObjGoneCallback *self) { PyObject_GC_UnTrack((PyObject *) self); Py_XDECREF(self->switchboard_ref); PyObject_GC_Del(self); } static int switchboard_traverse(Switchboard *self, visitproc visit, void *arg); static int switchboard_clear(Switchboard *switchboard); static void switchboard_dealloc(Switchboard *switchboard); PyTypeObject Switchboard_Type = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "Switchboard", .tp_basicsize = sizeof(Switchboard), .tp_itemsize = 0, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, .tp_traverse = (traverseproc) switchboard_traverse, .tp_clear = (inquiry) switchboard_clear, .tp_dealloc = (destructor) switchboard_dealloc, .tp_weaklistoffset = offsetof(Switchboard, weaklist), }; int Switchboard_Init(void) { if (PyType_Ready(&ObjSubscr_Type) < 0) { return -1; } if (PyType_Ready(&ObjGoneCallback_Type) < 0) { return -1; } return PyType_Ready(&Switchboard_Type); } Switchboard * Switchboard_New(void) { if (Switchboard_Init() < 0) { return NULL; } Switchboard *switchboard = PyObject_GC_New(Switchboard, &Switchboard_Type); if (switchboard == NULL) { return NULL; } switchboard->subscrs = PyDict_New(); if (switchboard->subscrs == NULL) { Py_DECREF(switchboard); return NULL; } switchboard->weaklist = NULL; switchboard->obj_gone_callback = (PyObject *)obj_gone_callback_new(switchboard); if (switchboard->obj_gone_callback == NULL) { Py_DECREF(switchboard); return NULL; } PyObject_GC_Track((PyObject *) switchboard); return switchboard; } static int switchboard_traverse(Switchboard *self, visitproc visit, void *arg) { Py_VISIT(self->subscrs); return 0; } static int switchboard_clear(Switchboard *switchboard) { Py_CLEAR(switchboard->subscrs); return 0; } static void switchboard_dealloc(Switchboard *switchboard) { PyObject_GC_UnTrack((PyObject *) switchboard); Py_DECREF(switchboard->subscrs); Py_DECREF(switchboard->obj_gone_callback); PyObject_GC_Del((PyObject *) switchboard); } PyObject * Switchboard_Subscribe(Switchboard *switchboard, PyObject *obj, Switchboard_Callback cb, PyObject *cb_arg) { ObjSubscr *subscr = obj_subscr_new(obj, cb, cb_arg); if (subscr == NULL) { return NULL; } PyObject *subscrs = PyDict_GetItem(switchboard->subscrs, subscr->watched); if (subscrs == NULL) { /* No subscriptions for obj yet */ subscrs = PySet_New(NULL); if (subscrs == NULL) { Py_DECREF(subscr); return NULL; } PyObject *key = PyWeakref_NewRef(obj, switchboard->obj_gone_callback); if (key == NULL) { Py_DECREF(subscr); Py_DECREF(subscrs); return NULL; } if (PyDict_SetItem(switchboard->subscrs, key, subscrs) != 0) { Py_DECREF(subscrs); Py_DECREF(subscr); Py_DECREF(key); return NULL; } Py_DECREF(key); } else { Py_INCREF(subscrs); } if (PySet_Add(subscrs, (PyObject *) subscr) < 0) { Py_DECREF(subscr); subscr = NULL; } Py_DECREF(subscrs); return (PyObject *) subscr; } static PyObject * duplicate(PyObject *sequence) { Py_ssize_t size = PyObject_Size(sequence); if (size < 0) { return NULL; } PyObject *result = PyTuple_New(size); if (result == NULL) { return NULL; } PyObject *iter = PyObject_GetIter(sequence); if (iter == NULL) { Py_DECREF(result); return NULL; } PyObject *item = NULL; Py_ssize_t i = 0; while ((item = PyIter_Next(iter))) { /* PyTuple_SetItem steals a reference */ Py_INCREF(item); PyTuple_SetItem(result, i, item); Py_DECREF(item); i++; } Py_DECREF(iter); if (PyErr_Occurred()) { Py_DECREF(result); result = NULL; } return result; } int Switchboard_Notify(Switchboard *switchboard, PyObject *obj) { PyObject *ref = PyWeakref_NewRef(obj, NULL); if (ref == NULL) { return -1; } PyObject *subscrs = PyDict_GetItem(switchboard->subscrs, ref); Py_DECREF(ref); if (subscrs == NULL) { /* No subscriptions for obj; nothing to do */ return 0; } /* Copy subscriptions in case any of the callbacks modify the original set */ PyObject *subscrs_copy = duplicate(subscrs); if (subscrs_copy == NULL) { return -1; } /* Invoke the callbacks */ Py_ssize_t num_subscrs = PyTuple_Size(subscrs_copy); for (Py_ssize_t i = 0; i < num_subscrs; i++) { ObjSubscr *subscr = (ObjSubscr *) PyTuple_GetItem(subscrs_copy, i); subscr->callback((PyObject *) subscr, subscr->arg, subscr->watched); } Py_DECREF(subscrs_copy); return 0; } static void switchboard_notify_gone(Switchboard *switchboard, PyObject *ref) { assert(PyWeakref_Check(ref)); PyObject *subscrs = PyDict_GetItem(switchboard->subscrs, ref); if (subscrs == NULL) { return; } Py_INCREF(subscrs); PyDict_DelItem(switchboard->subscrs, ref); PyObject *iter = PyObject_GetIter(subscrs); if (iter == NULL) { Py_DECREF(subscrs); return; } /* Notify all subscribers */ PyObject *item = NULL; while ((item = PyIter_Next(iter))) { ObjSubscr *subscr = (ObjSubscr *) item; subscr->callback(item, subscr->arg, subscr->watched); Py_DECREF(item); } Py_DECREF(iter); Py_DECREF(subscrs); if (PyErr_Occurred()) { /* An error ocurred while iterating through the subscriptions. There isn't anything we can do at this point, since the subscribed object is gone. Clear the error and move on. */ PyErr_Clear(); } } Py_ssize_t Switchboard_GetNumSubscriptions(Switchboard *switchboard, PyObject *object) { PyObject *ref = PyWeakref_NewRef(object, NULL); if (ref == NULL) { return -1; } PyObject *subscrs = PyDict_GetItem(switchboard->subscrs, ref); Py_DECREF(ref); if (subscrs == NULL) { /* No subscriptions for obj; nothing to do */ return 0; } return PyObject_Size(subscrs); } int Switchboard_Unsubscribe(Switchboard *switchboard, PyObject *subscr) { PyObject *watched = ((ObjSubscr *) subscr)->watched; PyObject *subscrs = PyDict_GetItem(switchboard->subscrs, watched); if (subscrs == NULL) { return 0; } if (PySet_Discard(subscrs, subscr) < 0) { return -1; } if (PyObject_Size(subscrs) == 0) { PyDict_DelItem(switchboard->subscrs, watched); } return 1; } int Switchboard_UnsubscribeAll(Switchboard *switchboard, PyObject *handles) { PyObject *iter = PyObject_GetIter(handles); if (iter == NULL) { return -1; } PyObject *subscr = NULL; while ((subscr = PyIter_Next(iter))) { Switchboard_Unsubscribe(switchboard, subscr); Py_DECREF(subscr); } Py_DECREF(iter); if (PyErr_Occurred()) { return -1; } return 0; }