aipodcast-20250228/alibabacloud_aipodcast20250228/client.py (237 lines of code) (raw):
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
from typing import Dict
from Tea.core import TeaCore
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util.client import Client as UtilClient
from alibabacloud_endpoint_util.client import Client as EndpointUtilClient
from alibabacloud_aipodcast20250228 import models as aipodcast_20250228_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
class Client(OpenApiClient):
"""
*\
"""
def __init__(
self,
config: open_api_models.Config,
):
super().__init__(config)
self._endpoint_rule = ''
self.check_config(config)
self._endpoint = self.get_endpoint('aipodcast', self._region_id, self._endpoint_rule, self._network, self._suffix, self._endpoint_map, self._endpoint)
def get_endpoint(
self,
product_id: str,
region_id: str,
endpoint_rule: str,
network: str,
suffix: str,
endpoint_map: Dict[str, str],
endpoint: str,
) -> str:
if not UtilClient.empty(endpoint):
return endpoint
if not UtilClient.is_unset(endpoint_map) and not UtilClient.empty(endpoint_map.get(region_id)):
return endpoint_map.get(region_id)
return EndpointUtilClient.get_endpoint_rules(product_id, region_id, endpoint_rule, network, suffix)
def podcast_task_result_query_with_options(
self,
request: aipodcast_20250228_models.PodcastTaskResultQueryRequest,
headers: Dict[str, str],
runtime: util_models.RuntimeOptions,
) -> aipodcast_20250228_models.PodcastTaskResultQueryResponse:
"""
@summary ai播客生成任务结果查询
@param request: PodcastTaskResultQueryRequest
@param headers: map
@param runtime: runtime options for this request RuntimeOptions
@return: PodcastTaskResultQueryResponse
"""
UtilClient.validate_model(request)
body = {}
if not UtilClient.is_unset(request.task_id):
body['taskId'] = request.task_id
if not UtilClient.is_unset(request.workspace_id):
body['workspaceId'] = request.workspace_id
req = open_api_models.OpenApiRequest(
headers=headers,
body=OpenApiUtilClient.parse_to_map(body)
)
params = open_api_models.Params(
action='PodcastTaskResultQuery',
version='2025-02-28',
protocol='HTTPS',
pathname=f'/podcast/task',
method='POST',
auth_type='AK',
style='ROA',
req_body_type='formData',
body_type='json'
)
if UtilClient.is_unset(self._signature_version) or not UtilClient.equal_string(self._signature_version, 'v4'):
return TeaCore.from_map(
aipodcast_20250228_models.PodcastTaskResultQueryResponse(),
self.call_api(params, req, runtime)
)
else:
return TeaCore.from_map(
aipodcast_20250228_models.PodcastTaskResultQueryResponse(),
self.execute(params, req, runtime)
)
async def podcast_task_result_query_with_options_async(
self,
request: aipodcast_20250228_models.PodcastTaskResultQueryRequest,
headers: Dict[str, str],
runtime: util_models.RuntimeOptions,
) -> aipodcast_20250228_models.PodcastTaskResultQueryResponse:
"""
@summary ai播客生成任务结果查询
@param request: PodcastTaskResultQueryRequest
@param headers: map
@param runtime: runtime options for this request RuntimeOptions
@return: PodcastTaskResultQueryResponse
"""
UtilClient.validate_model(request)
body = {}
if not UtilClient.is_unset(request.task_id):
body['taskId'] = request.task_id
if not UtilClient.is_unset(request.workspace_id):
body['workspaceId'] = request.workspace_id
req = open_api_models.OpenApiRequest(
headers=headers,
body=OpenApiUtilClient.parse_to_map(body)
)
params = open_api_models.Params(
action='PodcastTaskResultQuery',
version='2025-02-28',
protocol='HTTPS',
pathname=f'/podcast/task',
method='POST',
auth_type='AK',
style='ROA',
req_body_type='formData',
body_type='json'
)
if UtilClient.is_unset(self._signature_version) or not UtilClient.equal_string(self._signature_version, 'v4'):
return TeaCore.from_map(
aipodcast_20250228_models.PodcastTaskResultQueryResponse(),
await self.call_api_async(params, req, runtime)
)
else:
return TeaCore.from_map(
aipodcast_20250228_models.PodcastTaskResultQueryResponse(),
await self.execute_async(params, req, runtime)
)
def podcast_task_result_query(
self,
request: aipodcast_20250228_models.PodcastTaskResultQueryRequest,
) -> aipodcast_20250228_models.PodcastTaskResultQueryResponse:
"""
@summary ai播客生成任务结果查询
@param request: PodcastTaskResultQueryRequest
@return: PodcastTaskResultQueryResponse
"""
runtime = util_models.RuntimeOptions()
headers = {}
return self.podcast_task_result_query_with_options(request, headers, runtime)
async def podcast_task_result_query_async(
self,
request: aipodcast_20250228_models.PodcastTaskResultQueryRequest,
) -> aipodcast_20250228_models.PodcastTaskResultQueryResponse:
"""
@summary ai播客生成任务结果查询
@param request: PodcastTaskResultQueryRequest
@return: PodcastTaskResultQueryResponse
"""
runtime = util_models.RuntimeOptions()
headers = {}
return await self.podcast_task_result_query_with_options_async(request, headers, runtime)
def podcast_task_submit_with_options(
self,
tmp_req: aipodcast_20250228_models.PodcastTaskSubmitRequest,
headers: Dict[str, str],
runtime: util_models.RuntimeOptions,
) -> aipodcast_20250228_models.PodcastTaskSubmitResponse:
"""
@summary ai播客生成任务提交
@param tmp_req: PodcastTaskSubmitRequest
@param headers: map
@param runtime: runtime options for this request RuntimeOptions
@return: PodcastTaskSubmitResponse
"""
UtilClient.validate_model(tmp_req)
request = aipodcast_20250228_models.PodcastTaskSubmitShrinkRequest()
OpenApiUtilClient.convert(tmp_req, request)
if not UtilClient.is_unset(tmp_req.file_urls):
request.file_urls_shrink = OpenApiUtilClient.array_to_string_with_specified_style(tmp_req.file_urls, 'fileUrls', 'json')
if not UtilClient.is_unset(tmp_req.voices):
request.voices_shrink = OpenApiUtilClient.array_to_string_with_specified_style(tmp_req.voices, 'voices', 'json')
body = {}
if not UtilClient.is_unset(request.counts):
body['counts'] = request.counts
if not UtilClient.is_unset(request.file_urls_shrink):
body['fileUrls'] = request.file_urls_shrink
if not UtilClient.is_unset(request.text):
body['text'] = request.text
if not UtilClient.is_unset(request.topic):
body['topic'] = request.topic
if not UtilClient.is_unset(request.voices_shrink):
body['voices'] = request.voices_shrink
if not UtilClient.is_unset(request.workspace_id):
body['workspaceId'] = request.workspace_id
req = open_api_models.OpenApiRequest(
headers=headers,
body=OpenApiUtilClient.parse_to_map(body)
)
params = open_api_models.Params(
action='PodcastTaskSubmit',
version='2025-02-28',
protocol='HTTPS',
pathname=f'/podcast/task/submit',
method='POST',
auth_type='AK',
style='ROA',
req_body_type='formData',
body_type='json'
)
if UtilClient.is_unset(self._signature_version) or not UtilClient.equal_string(self._signature_version, 'v4'):
return TeaCore.from_map(
aipodcast_20250228_models.PodcastTaskSubmitResponse(),
self.call_api(params, req, runtime)
)
else:
return TeaCore.from_map(
aipodcast_20250228_models.PodcastTaskSubmitResponse(),
self.execute(params, req, runtime)
)
async def podcast_task_submit_with_options_async(
self,
tmp_req: aipodcast_20250228_models.PodcastTaskSubmitRequest,
headers: Dict[str, str],
runtime: util_models.RuntimeOptions,
) -> aipodcast_20250228_models.PodcastTaskSubmitResponse:
"""
@summary ai播客生成任务提交
@param tmp_req: PodcastTaskSubmitRequest
@param headers: map
@param runtime: runtime options for this request RuntimeOptions
@return: PodcastTaskSubmitResponse
"""
UtilClient.validate_model(tmp_req)
request = aipodcast_20250228_models.PodcastTaskSubmitShrinkRequest()
OpenApiUtilClient.convert(tmp_req, request)
if not UtilClient.is_unset(tmp_req.file_urls):
request.file_urls_shrink = OpenApiUtilClient.array_to_string_with_specified_style(tmp_req.file_urls, 'fileUrls', 'json')
if not UtilClient.is_unset(tmp_req.voices):
request.voices_shrink = OpenApiUtilClient.array_to_string_with_specified_style(tmp_req.voices, 'voices', 'json')
body = {}
if not UtilClient.is_unset(request.counts):
body['counts'] = request.counts
if not UtilClient.is_unset(request.file_urls_shrink):
body['fileUrls'] = request.file_urls_shrink
if not UtilClient.is_unset(request.text):
body['text'] = request.text
if not UtilClient.is_unset(request.topic):
body['topic'] = request.topic
if not UtilClient.is_unset(request.voices_shrink):
body['voices'] = request.voices_shrink
if not UtilClient.is_unset(request.workspace_id):
body['workspaceId'] = request.workspace_id
req = open_api_models.OpenApiRequest(
headers=headers,
body=OpenApiUtilClient.parse_to_map(body)
)
params = open_api_models.Params(
action='PodcastTaskSubmit',
version='2025-02-28',
protocol='HTTPS',
pathname=f'/podcast/task/submit',
method='POST',
auth_type='AK',
style='ROA',
req_body_type='formData',
body_type='json'
)
if UtilClient.is_unset(self._signature_version) or not UtilClient.equal_string(self._signature_version, 'v4'):
return TeaCore.from_map(
aipodcast_20250228_models.PodcastTaskSubmitResponse(),
await self.call_api_async(params, req, runtime)
)
else:
return TeaCore.from_map(
aipodcast_20250228_models.PodcastTaskSubmitResponse(),
await self.execute_async(params, req, runtime)
)
def podcast_task_submit(
self,
request: aipodcast_20250228_models.PodcastTaskSubmitRequest,
) -> aipodcast_20250228_models.PodcastTaskSubmitResponse:
"""
@summary ai播客生成任务提交
@param request: PodcastTaskSubmitRequest
@return: PodcastTaskSubmitResponse
"""
runtime = util_models.RuntimeOptions()
headers = {}
return self.podcast_task_submit_with_options(request, headers, runtime)
async def podcast_task_submit_async(
self,
request: aipodcast_20250228_models.PodcastTaskSubmitRequest,
) -> aipodcast_20250228_models.PodcastTaskSubmitResponse:
"""
@summary ai播客生成任务提交
@param request: PodcastTaskSubmitRequest
@return: PodcastTaskSubmitResponse
"""
runtime = util_models.RuntimeOptions()
headers = {}
return await self.podcast_task_submit_with_options_async(request, headers, runtime)