in odps/sqlalchemy_odps.py [0:0]
def create_connect_args(self, url):
url_string = str(url)
project = url.host
if project is None and options.default_project:
project = options.default_project
access_id = url.username
secret_access_key = url.password
logview_host = options.logview_host
endpoint = None
session_name = None
sqa_type = False
quota_name = None
reuse_odps = False
project_as_schema = False
fallback_policy = ""
cache_names = False
cache_seconds = 24 * 3600
hints = {}
if url.query:
query = dict(url.query)
if endpoint is None:
endpoint = query.pop("endpoint", None)
if logview_host is None:
logview_host = query.pop("logview_host", query.pop("logview", None))
if session_name is None:
session_name = query.pop("session", None)
if quota_name is None:
quota_name = query.pop("quota_name", None)
if sqa_type is False:
sqa_type = query.pop("interactive_mode", "false").lower()
if sqa_type == "true":
sqa_type = "v1"
elif sqa_type == "false":
sqa_type = False
if reuse_odps is False:
reuse_odps = query.pop("reuse_odps", "false").lower() != "false"
if query.get("project_as_schema", None) is not None:
project_as_schema = (
query.pop("project_as_schema", "false").lower() != "false"
)
if fallback_policy == "":
fallback_policy = query.pop("fallback_policy", "default")
if cache_names is False:
cache_names = query.pop("cache_names", "false").lower() != "false"
cache_seconds = int(query.pop("cache_seconds", cache_seconds))
hints = query
if endpoint is None:
endpoint = options.endpoint or DEFAULT_ENDPOINT
if session_name is None:
session_name = PUBLIC_SESSION_NAME
kwargs = {
"access_id": access_id,
"secret_access_key": secret_access_key,
"project": project,
"endpoint": endpoint,
"session_name": session_name,
"use_sqa": sqa_type,
"fallback_policy": fallback_policy,
"project_as_schema": project_as_schema,
"hints": hints,
}
if quota_name is not None:
kwargs["quota_name"] = quota_name
if access_id is None:
kwargs.pop("access_id", None)
kwargs.pop("secret_access_key", None)
kwargs["account"] = options.account
for k, v in six.iteritems(kwargs):
if v is None:
raise ValueError(
"{} should be provided to create connection, "
"you can either specify in connection string as format: "
'"odps://<access_id>:<access_key>@<project_name>", '
"or create an ODPS object and call `.to_global()` "
"to set it to global".format(k)
)
if logview_host is not None:
kwargs["logview_host"] = logview_host
if cache_names:
_sqlalchemy_obj_list_cache[url_string] = ObjectCache(expire=cache_seconds)
if reuse_odps:
# the odps object can only be reused only if it will be identical
if (
url_string in _sqlalchemy_global_reusable_odps
and _sqlalchemy_global_reusable_odps.get(url_string) is not None
):
kwargs["odps"] = _sqlalchemy_global_reusable_odps.get(url_string)
kwargs["access_id"] = None
kwargs["secret_access_key"] = None
else:
_sqlalchemy_global_reusable_odps[url_string] = ODPS(
access_id=access_id,
secret_access_key=secret_access_key,
project=project,
endpoint=endpoint,
logview_host=logview_host,
)
return [], kwargs