in sapp/db_support.py [0:0]
def _merge_by_keys(cls, database: DB, items, hash_item, *attrs):
    """Merge items to be added with existing DB rows, based on their key(s).

    An object can have multiple attributes as its key. Items whose key is
    already present in the database, or already seen earlier in `items`,
    get their `id` resolved to the ID of that earlier/existing entry and are
    not yielded.

    database: DB object used to open sessions for querying the DB.
    items: Iterator of items to be added to the DB.
    hash_item: Function that takes as input an item to be added (or a
        queried row carrying the same key attributes) and returns a hash
        of its key.
    attrs: Attributes of the object/class that represent the object's key.

    Yields each item (in items) whose key is neither in the DB nor a
    duplicate of an earlier item in `items`.
    """
    # Note: items is an iterator, not an iterable, 'tee' is a must.
    items_iter1, items_iter2 = tee(items)

    keys = {}  # map of hash -> keys of the item
    for i in items_iter1:
        # An item's key is a map of 'attr -> item[attr]' where attr is
        # usually a column name.
        # For 'SharedText', its key would look like: {
        #   "kind": "feature",
        #   "contents": "via tito",
        # }
        item_hash = hash_item(i)
        keys[item_hash] = {attr.key: getattr(i, attr.key) for attr in attrs}

    # Resolve the key columns on the class once, rather than once per key of
    # every item while building the filters below.
    columns = {attr.key: getattr(cls, attr.key) for attr in attrs}
    cls_attrs = [columns[attr.key] for attr in attrs]

    # Find existing items.
    existing_ids = {}  # map of item_hash -> existing ID
    for fetch_keys in split_every(BATCH_SIZE, keys.values()):
        filters = []
        for fetch_key in fetch_keys:
            # Sub-filters for checking if item with fetch_key is in the DB
            # Example: [
            #   SharedText.kind == "feature",
            #   SharedText.contents == "via tito",
            # ]
            # Dict values (including dict subclasses) are compared through
            # a JSON cast -- presumably because they target JSON columns.
            subfilter = [
                columns[name] == cast(val, JSON)
                if isinstance(val, dict)
                else columns[name] == val
                for name, val in fetch_key.items()
            ]
            filters.append(and_(*subfilter))

        # One query per batch: match rows satisfying any of the item keys.
        with database.make_session() as session:
            existing_items = (
                # pyre-fixme[16]: `PrepareMixin` has no attribute `id`.
                session.query(cls.id, *cls_attrs)
                .filter(or_(*filters))
                .all()
            )
        for existing_item in existing_items:
            # Rows expose the key attributes, so they hash like items do.
            item_hash = hash_item(existing_item)
            existing_ids[item_hash] = existing_item.id

    # Now see if we can merge
    new_items = {}  # map of item_hash -> first new item seen with that key
    for i in items_iter2:
        item_hash = hash_item(i)
        if item_hash in existing_ids:
            # The key is already in the DB
            i.id.resolve(existing_ids[item_hash], is_new=False)
        elif item_hash in new_items:
            # The key is already in the list of new items
            i.id.resolve(new_items[item_hash].id, is_new=False)
        else:
            # The key is new
            new_items[item_hash] = i
            yield i