# assets/large_language_models/rag/components/src/flow_creation.py
def main(args, ws, current_run, activity_logger: Logger):
    """Create a sample promptflow for a RAG MLIndex and link it to the parent pipeline run.

    Resolves completion/embedding connection, deployment, and model names from
    ``args``, loads the best prompt variants (tuned prompts from
    ``args.best_prompts`` or built-in defaults), fills in a flow template
    (code-first YAML or a JSON payload), submits it to the promptflow MT
    service, and records the returned ``flowResourceId`` as a property on the
    parent pipeline run.

    Args:
        args: Parsed component arguments (best_prompts, mlindex_name,
            mlindex_asset_id, mlindex_asset_uri, llm_connection_name,
            llm_config, embedding_connection, embeddings_model).
        ws: AzureML workspace; used to upload code files and fetch the parent run.
        current_run: Current AzureML run; used to locate the parent pipeline run.
        activity_logger: Logger for telemetry/progress messages.

    Raises:
        FileNotFoundError: If no MLIndex asset id file was passed in.
        Exception: If the promptflow service rejects the creation request or
            returns no ``flowResourceId``.
    """
    activity_logger.info(
        "[Promptflow Creation]: Received and parsed arguments for promptflow creation in RAG."
        + " Creating promptflow now..."
    )
    print("best_prompts: %s" % args.best_prompts)
    print("mlindex_name: %s" % args.mlindex_name)
    print("mlindex_asset_id: %s" % args.mlindex_asset_id)
    print("llm_connection_name: %s" % args.llm_connection_name)
    print("llm_config: %s" % args.llm_config)
    print("embedding_connection: %s" % args.embedding_connection)
    print("embeddings_model: %s" % args.embeddings_model)

    # Parse completion/chat settings: connection, deployment name, model name.
    completion_connection_name = get_connection_name(args.llm_connection_name)
    completion_config = json.loads(args.llm_config)
    if completion_config.get("type") == "open_ai":
        # Default when the key is absent OR present but None (basic pipelines).
        completion_model_name = completion_config.get("model_name")
        if completion_model_name is None:
            completion_model_name = "gpt-3.5-turbo"
        # For OpenAI the deployment name always mirrors the model name.
        completion_deployment_name = completion_model_name
        completion_provider = "OpenAI"
    else:
        completion_model_name = completion_config.get("model_name")
        # Set default if key exists but is set to None (as it is for basic pipelines)
        if completion_model_name is None:
            completion_model_name = "gpt-35-turbo"
        # If completion deployment not set, set as blank in PF rather than a placeholder
        completion_deployment_name = completion_config.get("deployment_name")
        if completion_deployment_name is None:
            completion_deployment_name = ""
        completion_provider = "AzureOpenAI"

    # Parse embedding settings: connection, deployment name, model name.
    embedding_connection_name = get_connection_name(args.embedding_connection)
    if (
        completion_connection_name == "azureml-rag-default-aoai"
        and embedding_connection_name != "azureml-rag-default-aoai"
    ):
        # default completion connection name to embedding ones if embedding connection is provided
        completion_connection_name = embedding_connection_name
    embedding_deployment_name_and_model_name = get_deployment_and_model_name(
        args.embeddings_model
    )
    embedding_deployment_name = embedding_deployment_name_and_model_name[0]
    embedding_model_name = embedding_deployment_name_and_model_name[1]
    print("completion_connection_name: %s" % completion_connection_name)
    print("completion_provider: %s" % completion_provider)
    print("completion_model_name: %s" % completion_model_name)
    print("completion_deployment_name: %s" % completion_deployment_name)
    print("embedding_connection_name: %s" % embedding_connection_name)
    print("embedding_deployment_name: %s" % embedding_deployment_name)
    print("embedding_model_name: %s" % embedding_model_name)

    # gpt-* completion models use the chat variants of the flow templates.
    if completion_model_name.startswith("gpt-") and USE_CHAT_FLOWS:
        print("Using chat flows")
        is_chat = True
        prefix = "chat_"
    else:
        print("Not using chat flows")
        is_chat = False
        prefix = ""

    if args.best_prompts is None:
        # No tuned prompts supplied: fall back to the built-in defaults.
        if is_chat:
            top_prompts = [
                "You are an AI assistant that helps users answer questions given a specific context and "
                + "conversation history. You will be given a context and chat history, and then asked a "
                + "question based on that context and history. Your answer should be as "
                + "precise as possible, and should only come from the context.",
                "You are an AI assistant that helps users answer questions given a specific context and "
                + "chat history. You will be given a context and history and asked a question based on "
                + "that context and chat history. Your answer should be as precise as possible and should "
                + "only come from the context.",
                "You are an chat assistant for helping users answering question given a specific context and "
                + "history. You are given a context and conversation history and you'll be asked a question "
                + "based on the context and history. Your answer should be as precise as possible and answer "
                + "should be only from the context.",
            ]
        else:
            top_prompts = [
                "You are an AI assistant that helps users answer questions given a specific context. You will be "
                + "given a context, and then asked a question based on that context. Your answer should be as "
                + "precise as possible, and should only come from the context.",
                "You are an AI assistant that helps users answer questions given a specific context. You will be "
                + "given a context and asked a question based on that context. Your answer should be as precise "
                + "as possible and should only come from the context.",
                "You are an chat assistant for helping users answering question given a specific context.You are "
                + "given a context and you'll be asked a question based on the context.Your answer should be "
                + "as precise as possible and answer should be only from the context.",
            ]
    else:
        with open(args.best_prompts, "r") as f:
            prompt_json = json.load(f)
        top_prompts = prompt_json.get("best_prompt")
        if top_prompts is None:
            # The output is in per-metric mode: take the best prompt for each
            # metric, in static priority order.
            top_prompts = []
            for metric_name in _STATIC_METRIC_PRIORITY_LIST:
                top_prompts_for_metric = prompt_json.get(
                    "best_prompt_" + metric_name
                )
                if top_prompts_for_metric is not None:
                    top_prompts.append(top_prompts_for_metric[0])

    if args.mlindex_asset_id is None:
        activity_logger.info("[Promptflow Creation]: No MlIndex asset Id passed in.")
        raise FileNotFoundError(
            errno.ENOENT, os.strerror(errno.ENOENT), "mlindex_asset_id_file"
        )
    with open(args.mlindex_asset_id, "r") as f:
        mlindex_asset_id = f.read()
    print(mlindex_asset_id)
    print(top_prompts)

    with open(
        os.path.join(args.mlindex_asset_uri, "MLIndex"), "r", encoding="utf-8"
    ) as src:
        mlindex_content = src.read()

    if isinstance(top_prompts, str):
        # A single prompt string is replicated across all three variants.
        top_prompts = [top_prompts, top_prompts, top_prompts]

    # Pick the flow template: code-first YAML dag or legacy JSON payload.
    if USE_CODE_FIRST:
        file_name = os.path.join(
            Path(__file__).parent.absolute(), "flow_yamls", prefix + "flow.dag.yaml"
        )
    else:
        file_name = os.path.join(
            Path(__file__).parent.absolute(),
            "flow_jsons",
            prefix + "flow_with_variants_mlindex.json",
        )
    with open(file_name, "r") as file:
        flow_with_variants = file.read()
    flow_name = args.mlindex_name + "-sample-flow"

    # Placeholders common to both the code-first YAML and the JSON payload.
    common_replacements = {
        "@@Embedding_Deployment_Name": embedding_deployment_name,
        "@@Embedding_Connection": embedding_connection_name,
        "@@Completion_Deployment_Name": completion_deployment_name,
        "@@Completion_Connection": completion_connection_name,
        "@@Completion_Provider": completion_provider,
        "@@MLIndex_Asset_Id": mlindex_asset_id,
        "@@MLIndex_Content": textwrap.indent(mlindex_content, " "),
        "@@API": "chat" if completion_model_name.startswith("gpt-") else "completion",
    }
    for placeholder, value in common_replacements.items():
        flow_with_variants = flow_with_variants.replace(placeholder, value)

    if USE_CODE_FIRST:
        # Write the filled-in flow dag yaml back to the code directory.
        with open(
            os.path.join(Path(__file__).parent.absolute(), CODE_DIR, "flow.dag.yaml"),
            "w",
        ) as file:
            file.write(flow_with_variants)
        import codecs

        # Write one prompt file per variant for the code-first flow.
        for idx, prompt in enumerate(top_prompts):
            prompt_str = post_processing_prompts(
                json_stringify(prompt),
                _CITATION_TEMPLATE,
                _USER_INPUT,
                is_chat,
            )
            with open(
                os.path.join(
                    Path(__file__).parent.absolute(),
                    CODE_DIR,
                    f"Prompt_variants__Variant_{idx}.jinja2",
                ),
                "w",
            ) as file:
                file.write(codecs.decode(prompt_str, "unicode_escape"))
        if USE_CHAT_FLOWS:
            # Write extra modify query prompt to folder
            with open(
                os.path.join(
                    Path(__file__).parent.absolute(),
                    CODE_DIR,
                    "modify_query_with_history.jinja2",
                ),
                "w",
            ) as file:
                file.write(codecs.decode(_MODIFY_PROMPT, "unicode_escape"))
        # Upload the flow code files, then point the submit payload at them.
        yaml_path = upload_code_files(ws) + "/flow.dag.yaml"
        activity_logger.info(
            "[Promptflow Creation]: Code first flow files successfully uploaded!"
        )
        json_name = os.path.join(
            "flow_jsons", prefix + "flow_with_variants_mlindex_code_first.json"
        )
        with open(
            os.path.join(Path(__file__).parent.absolute(), json_name), "r"
        ) as file:
            flow_submit_data = file.read()
        flow_submit_data = flow_submit_data.replace("@@Flow_Name", flow_name)
        flow_submit_data = flow_submit_data.replace("@@Flow_Definition_Path", yaml_path)
        activity_logger.info(
            "[Promptflow Creation]: Code first json payload successfully generated, trying to parse into json dict..."
        )
        json_payload = json.loads(flow_submit_data)
        activity_logger.info(
            "[Promptflow Creation]: Code first json payload successfully parsed, submit to Promptflow service now..."
        )
    else:
        flow_with_variants = flow_with_variants.replace("@@Flow_Name", flow_name)
        # Inject the three prompt variants into the JSON template.
        # NOTE(review): assumes at least three prompts were resolved above —
        # confirm for metric-mode best_prompts files with few metrics present.
        for idx in range(3):
            flow_with_variants = flow_with_variants.replace(
                f"@@prompt_variant_{idx}",
                post_processing_prompts(
                    json_stringify(top_prompts[idx]),
                    _CITATION_TEMPLATE,
                    _USER_INPUT,
                    is_chat,
                ),
            )
        if USE_CHAT_FLOWS:
            flow_with_variants = flow_with_variants.replace(
                "@@modify_prompt", _MODIFY_PROMPT
            )
        activity_logger.info(
            "[Promptflow Creation]: Json payload successfully generated, trying to parse into json dict..."
        )
        json_payload = json.loads(flow_with_variants)
        activity_logger.info(
            "[Promptflow Creation]: Json payload successfully parsed, submit to promptflow service now..."
        )

    ###########################################################################
    # ### construct PF MT service endpoints ### #
    promptflow_mt_url = SERVICE_ENDPOINT + "/flow/api" + WORKSPACE_SCOPE + "/flows"
    headers = get_default_headers(RUN_TOKEN, content_type="application/json")
    response = try_request(promptflow_mt_url, json_payload, headers, activity_logger)
    if not response.ok:
        activity_logger.error(
            f"[Promptflow Creation]: Flow creation failed with Response Code: {response.status_code},"
            + f"response:{response.text}."
        )
        raise Exception(
            f"Flow creation failed with Response Code: {response.status_code}, response:{response.text}."
        )
    pf_response_json = json.loads(response.text)
    if "flowResourceId" not in pf_response_json:
        activity_logger.error(
            f"[Promptflow Creation]: Flow creation failed with Response Code: {response.status_code},"
            + f" response:{response.text}."
        )
        raise Exception("flowResourceId is missing in the flow creation response.")
    flow_id = pf_response_json["flowResourceId"]
    activity_logger.info(
        f"[Promptflow Creation]: Flow creation Succeeded! Id is:{flow_id}",
        extra={"properties": {"flow_id": flow_id}},
    )
    # Surface the created flow on the parent pipeline run so callers/UI can find it.
    run_properties = current_run.get_properties()
    parent_run_id = run_properties["azureml.pipelinerunid"]
    parent_run = ws.get_run(parent_run_id)
    parent_run.add_properties({"azureml.promptFlowResourceId": flow_id})
    activity_logger.info(
        "[Promptflow Creation]: Add into run property Succeed! All operation is done!",
        extra={"properties": {"flow_id": flow_id}},
    )