runtime/experimental/python/v3.11cuda/lib/launcher.py (76 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
from sys import stdin
from sys import stdout
from sys import stderr
from os import fdopen
import sys, os, json, traceback, warnings
import threading, collections
from pathlib import Path
import traceback
try:
# if the directory 'virtualenv' is extracted out of a zip file
path_to_virtualenv = os.path.abspath('./virtualenv')
if os.path.isdir(path_to_virtualenv):
# activate the virtualenv using activate_this.py contained in the virtualenv
activate_this_file = path_to_virtualenv + '/bin/activate_this.py'
if not os.path.exists(activate_this_file): # try windows path
activate_this_file = path_to_virtualenv + '/Scripts/activate_this.py'
if os.path.exists(activate_this_file):
with open(activate_this_file) as f:
code = compile(f.read(), activate_this_file, 'exec')
exec(code, dict(__file__=activate_this_file))
else:
sys.stderr.write("Invalid virtualenv. Zip file does not include 'activate_this.py'.\n")
sys.exit(1)
except Exception:
traceback.print_exc(file=sys.stderr, limit=0)
sys.exit(1)
# now import the action as process input/output
import main__
out = fdopen(3, "wb")
if os.getenv("__OW_WAIT_FOR_ACK", "") != "":
out.write(json.dumps({"ok": True}, ensure_ascii=False).encode('utf-8'))
out.write(b'\n')
out.flush()
env = os.environ
SETUP="_setup"
hash = env.get("__OW_CODE_HASH")
if hash:
SETUP="/tmp/"+hash
SETUP_DONE=SETUP+"_done"
# lanched as a thread to execute the setup
def setup_thread(setup, payload):
with open(SETUP, 'w', buffering=1) as file:
file.write("Setup thread started.\n")
try:
setup(payload, file)
Path(SETUP_DONE).touch(exist_ok=True)
file.write("Setup thread ended successfully.\n")
except:
traceback.print_exc(file=file)
while True:
line = stdin.readline()
if not line: break
args = json.loads(line)
payload = {}
for key in args:
if key == "value":
payload = args["value"]
else:
env["__OW_%s" % key.upper()]= args[key]
# if there is a setup
if hasattr(main__, 'setup'):
# if setup is not complete
if not os.path.exists(SETUP_DONE):
# if setup is running
if os.path.exists(SETUP):
payload['setup_status'] = Path(SETUP).read_text()
else:
payload['setup_status'] = "Setup thread started.\n"
thread = threading.Thread(target=setup_thread, args=(main__.setup, payload,))
thread.start()
print("started setup", file=sys.stderr)
res = {}
try:
res = main__.main(payload)
except Exception as ex:
print(traceback.format_exc(), file=stderr)
res = {"error": str(ex)}
out.write(json.dumps(res, ensure_ascii=False).encode('utf-8'))
out.write(b'\n')
stdout.flush()
stderr.flush()
out.flush()