utils/scripts/insert_enrollment_data_in_bq.py (44 lines of code) (raw):
"""
Script to insert enrollment records in BQ
"""
# disabling for linting to pass
# pylint: disable = broad-exception-raised, broad-except
import datetime
import os
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
from common.models import CourseEnrollmentMapping, Section
from common.utils.bq_helper import insert_rows_to_bq
DATABASE_PREFIX = os.getenv("DATABASE_PREFIX", "")
BQ_DATASET = DATABASE_PREFIX + "lms_analytics"
cred = credentials.Certificate("service_creds.json")
app = firebase_admin.initialize_app(cred)
db = firestore.client()
def main():
print("script started")
enrollment_ref = db.collection(CourseEnrollmentMapping.collection_name)
docs = enrollment_ref.stream()
for doc in docs:
try:
doc_dict = doc.to_dict()
section = Section.find_by_id(doc_dict["section"].id)
enrollment_record = CourseEnrollmentMapping.find_by_id(doc.id)
rows = [{
"enrollment_id": enrollment_record.id,
"email": enrollment_record.user.email,
"user_id": enrollment_record.user.user_id,
"role": enrollment_record.role,
"status": enrollment_record.status,
"invitation_id": enrollment_record.invitation_id,
"section_id": section.id,
"cohort_id": section.cohort.id,
"course_id": section.classroom_id,
"timestamp": datetime.datetime.utcnow(),
}]
if insert_rows_to_bq(
rows=rows, dataset=BQ_DATASET, table_name="sectionEnrollmentRecord"):
print("Successfully Enrollment record " +
f"inserted in bq by id: {enrollment_record.id}")
else:
print("Failed Enrollment record inserted" +
f" in bq by id: {enrollment_record.id}")
except Exception as e:
print(f"failed insertion due to:{e}")
if __name__ == "__main__":
main()