in use-cases/inferencing/batch-inference/src/run_batch_predictions.py [0:0]
def predict(self):
    logger.info("Start predictions")
    # Send the Request
    headers = {"Content-Type": "application/json"}
    for i in range(len(self.df)):
        user_message = self.df["Question"][i]
        # Request Data
        request_data = {
            "model": self.model_name,
            "messages": [{"role": "user", "content": user_message}],
            "temperature": 0.5,
            "top_k": 1.0,
            "top_p": 1.0,
            "max_tokens": 256,
        }
        # print(f"API Endpoint {self.api_endpoint}")
        response = requests.post(
            self.api_endpoint, headers=headers, data=json.dumps(request_data)
        )
        # Check for Successful Response
        if response.status_code == 200:
            response_data = response.json()
            # Assuming the response structure matches OpenAI's format
            ai_response = response_data["choices"][0]["message"]["content"]
            logger.info(
                f"HTTP {response.status_code} received",
                extra={
                    "ai_response": ai_response,
                    "user_message": user_message,
                },
            )
            with open(self.output_file, "a") as f:
                f.write(ai_response + "\n")  # Append with newline
                f.write("----------\n")
        else:
            logger.error(f"Error: {response.status_code} - {response.text}")
    logger.info("Finish predictions")

    logger.info("Start write predictions to GCS")
    # Save the file to GCS after completion
    model_iteration_tag = self.model_name.rsplit("-", 1)[1]
    client = storage.Client()
    bucket = client.get_bucket(self.gcs_bucket)
    with open(self.output_file, "r") as local_file:
        blob = bucket.blob(f"predictions/{self.output_file}-{model_iteration_tag}")
        blob.upload_from_file(local_file)
    logger.info("Finish write predictions to GCS")