scripts/replay-from-filestobackup/replay.py (94 lines of code) (raw):

#!/usr/bin/env python3 from argparse import ArgumentParser from elasticsearch import Elasticsearch from elasticsearch_dsl import Search from elasticsearch_dsl.response.hit import Hit import dateutil.parser as dateutil import mimetypes import os import pika import logging import uuid from pprint import pprint import json logging.basicConfig(level=logging.ERROR) logger = logging.getLogger(__name__) logger.level = logging.DEBUG ## The purpose of this script is to read un-backed-up files from Elasticsearch and replay them onto an exchange as ## "new" files. def should_drop(filepath: str) -> bool: """ returns a boolean True if the file should be ignored for backup :param filepath: :return: """ return ".Trashes" in filepath \ or "Adobe Premiere Preview Files" in filepath \ or filepath.endswith(".pkf") \ or not filepath.startswith("/srv/Multimedia2/") \ or filepath.startswith("/srv/Multimedia2/Scratch") def convert_to_assetsweeper_record(h: Hit): """ converts the data from the given search hit into a (partial) asset sweeper record for storagetier :param h: search hit to convert :return: """ if isinstance(h.wholepath, tuple): wholepath = "".join(list(h.wholepath)) else: wholepath = h.wholepath parsed_time = dateutil.parse(h.timestamp) epoch_time = int(parsed_time.timestamp()) file_dir = os.path.dirname(wholepath) file_name = os.path.basename(wholepath) return ({ "imported_id": None, "mime_type": mimetypes.types_map.get("."+h.extension, "application/octet-stream"), "mtime": epoch_time, "ctime": epoch_time, "atime": epoch_time, "size": h.size, "ignore": False, "owner": 0, "group": 0, "parent_dir": file_dir, "filename": file_name }, wholepath) def scan_for_files(client: Elasticsearch, index_name: str, filter_type: str): """ generator that scans for files from the index and yields them :param client: elasticsearch client object :param index_name: name of the index to scan :param filter_type: either 'nobackup' or 'resync' :return: a generator which yields the records as dictionaries """ if filter_type=="nobackup": filter_str = "Entry had no matches" elif filter_type=="resync": filter_str = "Matched none out of 1 potential entries" else: logger.error("Invalid filter option. You must specify 'nobackup' or 'resync'") raise ValueError("Invalid filter parameter") s = Search(using=client, index=index_name)\ .query("term", **{"notes.keyword": filter_str}) logger.info("Got a total of {0} records to upload".format(s.count())) return s.scan() def send_to_rabbit(conn, exchg: str, routing_key: str, rec: dict): """ Sends the given message to a rabbitmq exchange :param conn: active RabbitMQ Channel object :param exchg: exchange name to send to (string) :param routing_key: routing key to send with it (string) :param rec: the record to send, as a dictionary :return: None """ content = json.dumps(rec, separators=(',', ':')) msg_id = uuid.uuid4() props = pika.BasicProperties(content_type="application/json",content_encoding="utf8",message_id=msg_id.hex) conn.basic_publish(exchg, routing_key, content, props) # START MAIN parser = ArgumentParser(description="Read un-backed-up files from elasticsearch and replay them for backup") parser.add_argument("--es",dest='esurl',default='localhost',help="URL(s) to access elasticsearch on") parser.add_argument("--index",dest='indexname',default='files-to-back-up',help="Index name to scan") parser.add_argument("--rabbitmq",dest="rmq",default="localhost", help="rabbitmq instance to send results to") parser.add_argument("--port",dest="port",default="5672", help="port on which rabbitmq is running") parser.add_argument("--user", dest="user", default="rabbit", help="rabbitmq username") parser.add_argument("--passwd", dest="passwd", default="rabbit", help="rabbitmq password") parser.add_argument("--vhost", dest="vhost", default="/", help="virtualhost on the rabbitmq broker") parser.add_argument("--exchange", dest="exchange", help="rabbitmq exchange to send to") parser.add_argument("--routing-key", dest="routingkey", default="test.message", help="routing key for sent messages") parser.add_argument("--limit", dest="limit", help="limit the returned results to this amount") parser.add_argument("--filter-type", dest="filtertype", default="nobackup", help="defaults to 'nobackup' which means only select files logged as having no backup at all. You can also specify 'resync', which means to only select files which have a backup that does not match.") args = parser.parse_args() mimetypes.init() credentials = pika.PlainCredentials(args.user, args.passwd) rmq_conn = pika.BlockingConnection(pika.ConnectionParameters(host=args.rmq, port=args.port, virtual_host=args.vhost, credentials=credentials)) rmq_chan = rmq_conn.channel() esclient = Elasticsearch(args.esurl.split(","), verify_certs=True) i = 0 limit = None if args.limit is not None: limit = int(args.limit) for f in scan_for_files(esclient, args.indexname, args.filtertype): rec, wholepath = convert_to_assetsweeper_record(f) if should_drop(wholepath): logger.debug("dropping {0}".format(wholepath)) continue pprint(rec) send_to_rabbit(rmq_chan, args.exchange, args.routingkey, rec) i+=1 if limit and i>=limit: logger.info("Exiting due to user-supplied limit") break logger.info("Selected {0} records".format(i))