1_synthetic-qa-generation/glan-instruct/glan.py

######################################################################################################
# [Description]
# Open-source implementation of GLAN (Generalized Instruction Tuning) using the Azure OpenAI API.
#
# Author: Daekeun Kim (daekeun.kim@microsoft.com)
# Reference: Synthetic Data (Almost) from Scratch: Generalized Instruction Tuning for Language Models
#            https://arxiv.org/pdf/2402.13064
######################################################################################################

import os
import json
import time
import uuid
import random
import textwrap
import jsonlines
import logging
from tqdm import tqdm
from dotenv import load_dotenv
from openai import AzureOpenAI, RateLimitError

logger = logging.getLogger('GLAN_logger')

MAX_RETRIES = 3
DELAY_INCREMENT = 30  # seconds added to the backoff delay on each rate-limit retry

load_dotenv()  # take environment variables from .env

client = AzureOpenAI(
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION")
)


def format_timespan(seconds):
    hours = seconds // 3600
    minutes = (seconds - hours * 3600) // 60
    remaining_seconds = seconds - hours * 3600 - minutes * 60
    return f"{hours} hours {minutes} minutes {remaining_seconds:.4f} seconds."


def read_jsonl(filepath):
    data = []
    with jsonlines.open(filepath) as reader:
        for obj in reader:
            data.append(obj)
    return data


def read_text_to_list(file_path):
    data_list = []
    with open(file_path, 'r') as file:
        for line in file:
            cleaned_line = line.strip()
            if cleaned_line:
                data_list.append(cleaned_line)
    return data_list


def save_list_to_text(file_path, data_list):
    with open(file_path, 'w') as file:
        for item in data_list:
            file.write(f"{item}\n")


def generate_taxonomy(max_number_of_fields=10, model_name="gpt-4o", **kwargs):
    """
    Generate a taxonomy of human knowledge and capabilities.
    """
    prompt = f"""
    Create a taxonomy of human knowledge and capabilities. Break it down into fields, sub-fields, and disciplines.
    Limit the number of fields to a maximum of {max_number_of_fields}.
    Provide the result in JSON format with the following structure:
    {{
        "fields": [
            {{
                "field_name": "Field Name",
                "sub_fields": [
                    {{
                        "sub_field_name": "Sub-field Name",
                        "disciplines": ["Discipline 1", "Discipline 2", ...]
                    }},
                    ...
                ]
            }},
            ...
        ]
    }}
    Examples of `field_name` are Natural Sciences, Humanities or Service.
    Examples of `sub_field_name` are Chemistry, Sociology or Retailing.
    """
    response = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        **kwargs
    )
    taxonomy = response.choices[0].message.content

    try:
        taxonomy_json = json.loads(taxonomy)
    except json.JSONDecodeError:
        taxonomy_json = {"error": "Failed to parse JSON"}

    # Flatten fields -> sub_fields -> disciplines into a single list of discipline names.
    key = next(iter(taxonomy_json))
    disciplines = [
        discipline
        for field in taxonomy_json[key]
        for sub_field in field['sub_fields']
        for discipline in sub_field['disciplines']
    ]
    return taxonomy_json, disciplines
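
# Usage sketch (illustrative only; the commented lines are not executed and the
# file name is hypothetical). Extra keyword arguments such as `temperature` are
# forwarded straight to the Azure OpenAI chat-completions call:
#
#   taxonomy_json, disciplines = generate_taxonomy(max_number_of_fields=5, temperature=0.5)
#   print(f"Generated {len(disciplines)} disciplines")
#   save_list_to_text("disciplines.txt", disciplines)
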
""" # Check if the top-level key "subjects" exists and is a list if "subjects" not in data or not isinstance(data["subjects"], list): return False # Iterate through each subject to validate its structure for subject in data["subjects"]: # Check if each subject is a dictionary if not isinstance(subject, dict): return False # Check if required keys exist in each subject and have the correct types if "subject" not in subject or not isinstance(subject["subject"], str): return False if "level" not in subject or not isinstance(subject["level"], int): return False if "subtopics" not in subject or not isinstance(subject["subtopics"], list): return False # Check if each item in "subtopics" is a string if not all(isinstance(subtopic, str) for subtopic in subject["subtopics"]): return False return True def generate_subjects(discipline, max_number_of_subjects=2, max_number_of_subtopics=5, model_name="gpt-4o", **kwargs): """ Generate a list of subjects for a given discipline. Please refer to section 2.2 of the paper. """ prompt = f""" You are an expert in {discipline}. Create a comprehensive list of subjects a student should learn under this discipline. For each subject, provide the level (e.g., 100, 200, 300, 400, 500, 600, 700, 800, 900) and include key subtopics in JSON format. {{ "subjects": [ {{ 'subject': 'Introduction to Computer Science', 'level': 100, 'subtopics': [ 'Basic Programming', 'Software Development Fundamentals', 'Computer Organization' ] }}, ... ] }} Limit the number of `subjects` to a maximum of {max_number_of_subjects}. Limit the number of `subtopics` to a maximum of {max_number_of_subtopics} for each `subject`. """ t0 = time.time() response = client.chat.completions.create( model=model_name, messages=[{"role": "user", "content": prompt}], response_format = {'type': "json_object"}, **kwargs ) subjects = response.choices[0].message.content subjects_json = json.loads(subjects) if not validate_subjects_json_structure(subjects_json): logger.info("Failed to parse JSON. Trying again.") subjects_json = generate_subjects(discipline, max_number_of_subjects, max_number_of_subtopics, model_name, **kwargs) t1 = time.time() logger.info(f"Generating subjects took {t1 - t0:.4f} seconds.") return subjects_json def generate_syllabus(subject, level, subtopics, max_number_of_session_name=5, model_name="gpt-4o", **kwargs): """ Generate a syllabus for a given subject at a specific level. Please refer to section 2.3 of the paper. """ prompt = f""" You are an expert in creating educational syllabi. Create a detailed syllabus for the subject "{subject}" at the {level} level. The syllabus should be broken down into multiple class sessions, each covering different key concepts. The subtopics for this subject include: {subtopics}. Provide the syllabus in JSON format with the following structure in JSON format: {{ "syllabus": [ {{ "session_name": "Session 1 Name", "description": "Brief description of the session", "key_concepts": ["Key concept 1", "Key concept 2", ...] }}, ... ] }} Limit the number of `session_name` to a maximum of {max_number_of_session_name}. 
""" t0 = time.time() response = client.chat.completions.create( model=model_name, messages=[{"role": "user", "content": prompt}], response_format = {'type': "json_object"}, **kwargs ) output = response.choices[0].message.content.strip() #logger.info(textwrap.indent(output, '\t')) try: syllabus_json = json.loads(output) key = next(iter(syllabus_json)) syllabus = syllabus_json[key] except json.JSONDecodeError: logger.error("Failed to parse JSON") return None, None # Extract class details class_sessions = [session['session_name'] for session in syllabus] key_concepts = [session['key_concepts'] for session in syllabus] t1 = time.time() logger.info(f"\tGenerating syllabus took {t1 - t0:.4f} seconds.") return class_sessions, key_concepts def sample_class_sessions_and_key_concepts(class_sessions, key_concepts, single_session=True): """ Sample class sessions and key concepts to generate questions of varying difficulty. class_sessions: List of class sessions key_concepts: List of key concepts for each session. single_session: Whether to sample from a single session or multiple sessions. :return: Combination of sampled class sessions and core concepts """ if single_session: session_index = random.randint(0, len(class_sessions) - 1) selected_session = class_sessions[session_index] num_concepts = min(5, len(key_concepts[session_index])) selected_key_concepts = random.sample(key_concepts[session_index], k=random.randint(1, num_concepts)) else: if len(class_sessions) < 2: raise ValueError("Not enough sessions for multi-session sampling") session_indices = random.sample(range(len(class_sessions)), k=2) selected_sessions = [class_sessions[i] for i in session_indices] combined_key_concepts = key_concepts[session_indices[0]] + key_concepts[session_indices[1]] num_concepts = min(5, len(combined_key_concepts)) selected_key_concepts = random.sample(combined_key_concepts, k=random.randint(2, num_concepts)) return selected_session if single_session else selected_sessions, selected_key_concepts def generate_questions( class_sessions, key_concepts, subject, level, subtopics, model_name="gpt-4o", num_iterations=2, num_questions_per_iteration=5, max_tokens=2048, batch_size=4, language="Korean", **kwargs ): """ Generate questions based on class sessions and key concepts using LangChain pipeline. Please refer to section 2.4 of the paper. """ from langchain_openai import AzureChatOpenAI from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser, BaseOutputParser #from langchain_core.pydantic_v1 import BaseModel, Field from pydantic import BaseModel, Field llm = AzureChatOpenAI( max_tokens=max_tokens, openai_api_version="2024-09-01-preview", azure_deployment=model_name, **kwargs ) class CustomOutputParser(BaseOutputParser): def parse(self, text: str): cleaned_text = text.strip() return {"question": cleaned_text} prompt = PromptTemplate.from_template( """Based on the class session(s) {selected_class_sessions} and key concepts {selected_key_concepts}, generate a homework question. A question must be less than {max_tokens} tokens. Write in {language}. 
""" ) chain = prompt | llm | CustomOutputParser() questions = [] for idx in range(num_iterations): t0 = time.time() logger.info(f"\t\t===== Generating Questions: Iteration {idx}") selected_class_sessions, selected_key_concepts = sample_class_sessions_and_key_concepts(class_sessions, key_concepts, single_session=True) batch_inputs = [{ "selected_class_sessions": selected_class_sessions, "selected_key_concepts": selected_key_concepts, "max_tokens": max_tokens, "language": language } for _ in range(num_questions_per_iteration)] metadata = {"subject": subject, "level": level, "subtopics": subtopics} with tqdm(total=len(batch_inputs), desc="\t\tProcessing Questions") as pbar: for i in range(0, len(batch_inputs), batch_size): minibatch = batch_inputs[i:i+batch_size] retries = 0 while retries <= MAX_RETRIES: try: questions_ = chain.batch(minibatch, {"max_concurrency": batch_size}) break # Exit the retry loop once successful except RateLimitError as rate_limit_error: delay = (retries + 1) * DELAY_INCREMENT logger.warning(f"{rate_limit_error}. Retrying in {delay} seconds...") time.sleep(delay) retries += 1 if retries > MAX_RETRIES: logger.error(f"Max retries reached this batch. Skipping to next batch.") break except Exception as e: logger.error(f"Error in process_inputs: {e}") break for q in questions_: q.update(metadata) questions.extend(questions_) pbar.set_postfix({"current_batch": f"{i//batch_size + 1}/{(len(batch_inputs) + (batch_size-1))//batch_size}"}) pbar.update(len(minibatch)) t1 = time.time() logger.info(f"\t\tIteration {idx} took {t1 - t0:.4f} seconds.") return questions def generate_answers(all_questions, model_name="gpt-4o", max_tokens=1024, batch_size=5, **kwargs): """ Generate answers to the questions using LangChain pipeline. Please refer to section 2.4 of the paper. """ from langchain.schema.output_parser import StrOutputParser from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate from langchain_openai import AzureChatOpenAI llm = AzureChatOpenAI( temperature=0, max_tokens=max_tokens, openai_api_version="2024-09-01-preview", azure_deployment=model_name, ) system_prompt = """Answer the question. Keep the answer short and concise. The topic, level, and subtopic of this question are as follows. ## Subject: {subject} ## Level: {level} ## Subtopics: {subtopics} Respond "DO NOT KNOW" if not sure about the answer. """ system_prompt += f"Answer must be less than {max_tokens} token length." system_message_template = SystemMessagePromptTemplate.from_template(system_prompt) human_prompt = [ { "type": "text", "text": "{question}" }, ] human_message_template = HumanMessagePromptTemplate.from_template(human_prompt) prompt = ChatPromptTemplate.from_messages( [ system_message_template, human_message_template ] ) chain = prompt | llm | StrOutputParser() logger.info(f"===== Generating Answers") t0 = time.time() all_answers = [] with tqdm(total=len(all_questions), desc="Processing Answers") as pbar: for i in range(0, len(all_questions), batch_size): minibatch = all_questions[i:i+batch_size] retries = 0 while retries <= MAX_RETRIES: try: answers = chain.batch(minibatch, {"max_concurrency": batch_size}) break # Exit the retry loop once successful except RateLimitError as rate_limit_error: delay = (retries + 1) * DELAY_INCREMENT logger.warning(f"{rate_limit_error}. Retrying in {delay} seconds...") time.sleep(delay) retries += 1 if retries > MAX_RETRIES: logger.error(f"Max retries reached this batch. 
Skipping to next batch.") break except Exception as e: logger.error(f"Error in process_inputs: {e}") break all_answers.extend(answers) pbar.set_postfix({"current_batch": f"{i//batch_size + 1}/{(len(all_questions) + (batch_size-1))//batch_size}"}) pbar.update(len(minibatch)) t1 = time.time() timespan = format_timespan(t1 - t0) logger.info(f"Generating Answer dataset took {timespan}") return all_answers def set_logger(logfile_name="logfile.log"): logger.setLevel(logging.INFO) console_handler = logging.StreamHandler() file_handler = logging.FileHandler(logfile_name) console_handler.setLevel(logging.INFO) file_handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) logger.addHandler(console_handler) logger.addHandler(file_handler) def glan_instruction_generation(args): """ GLAN Pipeline """ GENERATE_DISCIPLINES = args.generate_disciplines GENERATE_QUESTION_ONLY = args.generate_question_only DISCIPLINES_FILEPATH = args.disciplines_filepath LANGUAGE = args.language MODEL_NAME = args.model_name MODEL_NAME_FOR_ANSWER = args.model_name_for_answer MAX_NUMBER_OF_FIELDS = args.max_number_of_fields MAX_NUMBER_OF_SUBJECTS = args.max_number_of_subjects MAX_NUMBER_OF_SUBTOPICS = args.max_number_of_subtopics MAX_NUMBER_OF_SESSION_NAME = args.max_number_of_session_name NUM_ITERATIONS = args.num_iterations NUM_QUESTIONS_PER_ITERATION = args.num_questions_per_iteration QUESTION_MAX_TOKENS = args.question_max_tokens QUESTION_BACTH_SIZE = args.question_batch_size ANSWER_MAX_TOKENS = args.answer_max_tokens ANSWER_BACTH_SIZE = args.answer_batch_size OUTPUT_DIR = args.output_dir UUID = str(uuid.uuid4())[:4] set_logger(args.logfile_name) logger.info(f"GENERATE_DISCIPLINES = {GENERATE_DISCIPLINES}") logger.info(f"GENERATE_QUESTION_ONLY = {GENERATE_QUESTION_ONLY}") logger.info(f"DISCIPLINES_FILEPATH = {DISCIPLINES_FILEPATH}") logger.info(f"LANGUAGE = {LANGUAGE}") logger.info(f"MODEL_NAME = {MODEL_NAME}") logger.info(f"MODEL_NAME_FOR_ANSWER = {MODEL_NAME_FOR_ANSWER}") logger.info(f"MAX_NUMBER_OF_FIELDS = {MAX_NUMBER_OF_FIELDS}") logger.info(f"MAX_NUMBER_OF_SUBJECTS = {MAX_NUMBER_OF_SUBJECTS}") logger.info(f"MAX_NUMBER_OF_SUBTOPICS = {MAX_NUMBER_OF_SUBTOPICS}") logger.info(f"MAX_NUMBER_OF_SESSION_NAME = {MAX_NUMBER_OF_SESSION_NAME}") logger.info(f"NUM_ITERATIONS = {NUM_ITERATIONS}") logger.info(f"NUM_QUESTIONS_PER_ITERATION = {NUM_QUESTIONS_PER_ITERATION}") logger.info(f"QUESTION_MAX_TOKENS = {QUESTION_MAX_TOKENS}") logger.info(f"QUESTION_BACTH_SIZE = {QUESTION_BACTH_SIZE}") logger.info(f"ANSWER_MAX_TOKENS = {ANSWER_MAX_TOKENS}") logger.info(f"ANSWER_BACTH_SIZE = {ANSWER_BACTH_SIZE}") logger.info(f"OUTPUT_DIR = {OUTPUT_DIR}") t0 = time.time() all_questions = [] if GENERATE_DISCIPLINES: logger.info(f"===== Generate a Taxonomy of human knowledge and capabilities") t0 = time.time() taxonomy_json, disciplines = generate_taxonomy(max_number_of_fields=MAX_NUMBER_OF_FIELDS, model_name="gpt-4o", temperature=0.5) t1 = time.time() logger.info(f"Generating taxonomy took {t1 - t0:.4f} seconds.") else: logger.info(f"===== Load pre-defined disciplines") disciplines = read_text_to_list(DISCIPLINES_FILEPATH) for idx1, discipline in enumerate(disciplines): logger.info("====================================================================================================") logger.info(f"===== [Discipline {idx1}] Generating Subjects for discipline: {discipline}") 
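
# Resumption sketch (illustrative; the file name is hypothetical but follows the
# pattern written by `glan_instruction_generation` below). Questions saved by an
# earlier run can be answered standalone, since `generate_answers` only needs the
# question/subject/level/subtopics keys of each record:
#
#   set_logger("answers_only.log")
#   questions = read_jsonl("outputs/GLAN_Questions_Korean_100_Samples_ab12.jsonl")
#   answers = generate_answers(questions, model_name="gpt-4o", batch_size=5)
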
logger.info("====================================================================================================") subjects_json = generate_subjects( discipline, max_number_of_subjects=MAX_NUMBER_OF_SUBJECTS, max_number_of_subtopics=MAX_NUMBER_OF_SUBTOPICS, model_name=MODEL_NAME, temperature=1.0, top_p=0.95 ) logger.info(f"Number of subjects is {len(subjects_json['subjects'])}") for idx2, s in enumerate(subjects_json["subjects"]): subject = s['subject'] level = s['level'] subtopics = ", ".join(s['subtopics']) logger.info("\t====================================================================================================") logger.info(f"\t===== [Subject {idx2}] Generating Syllabus: Discipline: {discipline} - Subject: {subject} - Level: {level}") logger.info("\t====================================================================================================") class_sessions, key_concepts = generate_syllabus( subject, level, subtopics, max_number_of_session_name=MAX_NUMBER_OF_SESSION_NAME, model_name=MODEL_NAME, temperature=1.0, top_p=0.95 ) logger.info(f"\tNumber of class sessions is {len(class_sessions)}") questions = generate_questions( class_sessions, key_concepts, subject, level, subtopics, model_name=MODEL_NAME, num_iterations=NUM_ITERATIONS, num_questions_per_iteration=NUM_QUESTIONS_PER_ITERATION, max_tokens=QUESTION_MAX_TOKENS, batch_size=QUESTION_BACTH_SIZE, language=LANGUAGE ) # logger.info(f"\t===== Waiting for 30 seconds to avoid rate limit error.") # time.sleep(30) all_questions.extend(questions) t1 = time.time() timespan = format_timespan(t1 - t0) logger.info(f"Generating Question dataset took {timespan}") num_questions = len(all_questions) os.makedirs(OUTPUT_DIR, exist_ok=True) filename = f"{OUTPUT_DIR}/GLAN_Questions_{LANGUAGE}_{num_questions}_Samples_{UUID}.jsonl" with jsonlines.open(filename, mode='w') as writer: for question in all_questions: writer.write(question) if not GENERATE_QUESTION_ONLY: all_answers = generate_answers( all_questions, model_name=MODEL_NAME_FOR_ANSWER, max_tokens=ANSWER_MAX_TOKENS, batch_size=ANSWER_BACTH_SIZE ) instructions = [] for q, a in zip(all_questions, all_answers): if a not in "DO NOT KNOW": q.update({"answer": a}) instructions.append(q) num_instructions = len(instructions) new_filename = filename.replace("Questions", "Instructions") with jsonlines.open(new_filename, mode='w') as writer: for instruction in instructions: writer.write(instruction)