in assets/training/distillation/src/generate_data.py [0:0]
def process_request(idx: str, data: dict, url: str, endpoint_key: str):
"""Process a single conversational request.
Args:
idx (str): Row index in Input data.
data (dict): Payload dict
url (str): Endpoint URL
endpoint_key (str): key to authenticate endpoint request
Returns:
dict: result dictionary
"""
try:
# Basic validation for the input data
messages = data.pop("messages", [])
if not messages: # empty messages
return {
"idx": idx,
"status_code": None,
"messages": [],
"exception": "Empty messages",
}
first_message = messages[0]
if first_message["role"] != "system":
logger.warning(
f"First message should be system, but got {first_message['role']}"
)
return {
"idx": idx,
"status_code": None,
"messages": [],
"exception": (
"Incorrect format.\n"
f"First message should be system, but got {first_message['role']}"
),
}
for message in messages[1:]:
role = message["role"]
if role not in ("assistant", "user"):
logger.warning(f"role should be system or user, but got {role}")
return {
"idx": idx,
"status_code": None,
"messages": [],
"exception": f"Incorrect format.\nRole should be assistant or user, but got {role}",
}
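        # normalize_messages is defined elsewhere in this module; judging by
        # its name, it standardizes the message dicts before generation.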
messages = normalize_messages(messages)
last_status_code = None
synthetic_responses = []
inference_data = []
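        # Walk the conversation turn by turn: system and user messages pass
        # through unchanged, while every assistant turn is regenerated by
        # calling the endpoint with the conversation accumulated so far.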
for turn_id, message in enumerate(messages):
role = message["role"]
if role == "system":
                # Data for the fine-tuning job should not include the CoT prompt
synthetic_responses.append(message)
inference_data.append(process_system_prompt(message))
elif role == "user":
synthetic_responses.append(message)
inference_data.append(message)
else:
                data_with_inference_parameters = {
                    "messages": inference_data,
                    **data,  # remaining payload keys are inference parameters
                }
                # Replace the assistant turn's content with the model's response
log_entry = {"idx": idx, "turn": turn_id}
response: Response = _invoke_endpoint(
url=url,
key=endpoint_key,
data=data_with_inference_parameters,
log_entry=log_entry,
)
last_status_code = response.status_code
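                # Stop generating further turns as soon as a request fails;
                # the partial conversation is reported as a failure below.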
if last_status_code != 200:
break
response_data = response.json()
                # A successful vLLM response is expected to have this structure
prediction_result = response_data["choices"][0]["message"][
"content"
].strip()
                # For CoT prompts, strip the reasoning and keep only the answer
if (
enable_cot
and data_generation_task_type
!= DataGenerationTaskType.CONVERSATION
):
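                    # The CoT response is JSON; SystemPrompt.get_response_key
                    # returns the field name that holds the final answer.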
key = SystemPrompt.get_response_key(data_generation_task_type)
prediction_result = json.loads(prediction_result)[key]
if (
enable_cod
and data_generation_task_type
== DataGenerationTaskType.SUMMARIZATION
):
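                    # Chain-of-density output is a JSON list of progressively
                    # denser summaries; keep the last (densest) one.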
result = json.loads(prediction_result)
prediction_result = result[-1]["Denser_Summary"]
synthetic_responses.append(
{"role": "assistant", "content": str(prediction_result)}
)
is_success = last_status_code == 200
logger.info(f"Processing idx: {idx} - {is_success}")
return {
"idx": idx,
"status_code": last_status_code,
"messages": synthetic_responses,
"exception": (
f"Not able to generate synthetic response for all turns for idx: {idx}"
if not is_success
else None
),
}
except Exception as e:
logger.error(f"idx: {idx}. exception: {e}")
return {
"idx": idx,
"status_code": None,
"messages": [],
"exception": e,
}
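
# Illustrative usage sketch for process_request. The payload layout mirrors
# the validation above; the endpoint URL and key are hypothetical
# placeholders, not values taken from this component.
#
#     payload = {
#         "messages": [
#             {"role": "system", "content": "You are a helpful assistant."},
#             {"role": "user", "content": "What is model distillation?"},
#             {"role": "assistant", "content": ""},  # regenerated per turn
#         ],
#         "temperature": 0.2,  # extra keys are forwarded as inference params
#         "max_tokens": 512,
#     }
#     result = process_request(
#         idx="0",
#         data=payload,
#         url="https://<teacher-endpoint>/v1/chat/completions",
#         endpoint_key="<endpoint-key>",
#     )
#     if result["exception"] is None:
#         synthetic_conversation = result["messages"]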