in iact3/generate_params.py [0:0]
def _get_template_from_oss(self, template_url, components):
    """Download a template body from an OSS object.

    Args:
        template_url: The original OSS url; used only in error messages.
        components: The parsed form of ``template_url``. Its ``netloc`` is
            taken as the bucket name, its ``path`` (with surrounding slashes
            stripped) as the object key, and its ``query`` may carry a
            ``RegionId`` parameter. Presumably a ``urllib.parse`` result --
            confirm against the caller.

    Returns:
        Whatever ``read()`` on the object returned by
        ``OssPlugin.get_object_content`` yields.

    Raises:
        Iact3Exception: If the url has no bucket or object key, if the OSS
            plugin raises or returns ``None`` for the object, or if the
            object is larger than ``self.template_max_size``.
    """
    bucket_name = components.netloc
    object_path = components.path.strip('/')
    if not bucket_name or not object_path:
        raise Iact3Exception(f'Invalid oss url {template_url}')

    # Default to the configured region; an explicit ``RegionId`` query
    # parameter (first value wins) overrides it.
    region_id = self.region
    if components.query:
        region_ids = parse_qs(components.query).get('RegionId')
        if region_ids:
            region_id = region_ids[0]

    oss_plugin = OssPlugin(region_id=region_id, bucket_name=bucket_name, credential=self.credential)

    # Check the object's size from its metadata before downloading it, so an
    # oversized template is rejected without fetching the content.
    try:
        object_meta = oss_plugin.get_object_meta(object_path)
    except Exception as ex:
        # Chain the cause so the underlying OSS traceback is preserved.
        raise Iact3Exception(f'Oss failed: {ex}') from ex
    if object_meta is None:
        raise Iact3Exception(f'Invalid oss url {template_url}')
    if object_meta.content_length > self.template_max_size:
        # Report the limit actually enforced rather than a hard-coded number.
        raise Iact3Exception(
            f'template from {template_url} exceeds maximum allowed size '
            f'({self.template_max_size} bytes)'
        )

    try:
        content = oss_plugin.get_object_content(object_path)
    except Exception as ex:
        raise Iact3Exception(f'Oss failed: {ex}') from ex
    if content is None:
        raise Iact3Exception(f'Invalid oss url {template_url}')
    return content.read()