in mapillary_tools/upload_api_v4.py [0:0]
def finish(self, file_handle: str) -> int:
    """Finalize an upload on the server and return the resulting cluster id.

    Sends a JSON POST to the ``finish_upload`` endpoint carrying
    ``file_handle`` and ``self.file_type``, plus ``self.organization_id``
    when that attribute is not ``None``. The request is authorized with
    ``self.user_access_token``.

    Args:
        file_handle: Handle identifying the upload to finish.

    Returns:
        The ``cluster_id`` value found in the JSON response.

    Raises:
        RuntimeError: The response JSON has no ``cluster_id`` (or it is null).

    Any exception raised by ``raise_for_status()`` on an HTTP error status
    propagates to the caller unchanged.
    """
    auth_headers = {"Authorization": f"OAuth {self.user_access_token}"}

    payload: T.Dict[str, T.Union[str, int]] = {
        "file_handle": file_handle,
        "file_type": self.file_type,
    }
    # The organization is optional; only include it when one is configured.
    if self.organization_id is not None:
        payload["organization_id"] = self.organization_id

    endpoint = f"{MAPILLARY_GRAPH_API_ENDPOINT}/finish_upload"
    response = requests.post(
        endpoint,
        headers=auth_headers,
        json=payload,
        timeout=REQUESTS_TIMEOUT,
    )
    response.raise_for_status()

    cluster_id = response.json().get("cluster_id")
    if cluster_id is not None:
        # T.cast only informs the type checker; the value is returned as-is.
        return T.cast(int, cluster_id)

    # A successful HTTP status without a cluster id is still a failure;
    # include the raw response body to aid debugging.
    raise RuntimeError(
        f"Upload server error: failed to create the cluster {response.text}"
    )