in pontoon/teams/views.py [0:0]
def ajax_translation_memory_upload(request, locale):
    """Upload Translation Memory entries from a .TMX file.

    Validates the upload (presence, size <= 20 MB, ``.tmx`` extension),
    parses it as XML, extracts ``<tu>`` pairs whose source language is
    English and whose target language matches the requested locale, and
    bulk-creates the TranslationMemoryEntry rows that do not already exist
    for that locale.

    Args:
        request: Django request; expects the file under ``request.FILES["tmx_file"]``.
        locale: Locale code string; resolved via ``get_object_or_404``.

    Returns:
        JsonResponse with ``status``/``message`` plus ``parsed``,
        ``skipped_on_parse``, ``imported`` and ``duplicates`` counters,
        or a 400 response on validation/parse failure.
    """
    try:
        upload = request.FILES["tmx_file"]
    except MultiValueDictKeyError:
        return JsonResponse(
            {"status": False, "message": "No file uploaded."},
            status=400,
        )

    if upload.size > 20 * 1024 * 1024:
        return JsonResponse(
            {
                "status": False,
                "message": "File size limit exceeded. The maximum allowed size is 20 MB.",
            },
            status=400,
        )

    if not upload.name.endswith(".tmx"):
        return JsonResponse(
            {
                "status": False,
                "message": "Invalid file format. Only .TMX files are supported.",
            },
            status=400,
        )

    locale = get_object_or_404(Locale, code=locale)
    code = locale.code

    # Parse the TMX file
    try:
        tree = ET.parse(upload)
        root = tree.getroot()
    except ET.ParseError as e:
        return JsonResponse(
            {"status": False, "message": f"Invalid XML file: {e}"}, status=400
        )

    # Extract TM entries
    file_entries = []
    # Accept "en", "en-US", "en_us", etc. as the source language.
    srclang_pattern = re.compile(r"^en(?:[-_](us))?$", re.IGNORECASE)
    ns = {"xml": "http://www.w3.org/XML/1998/namespace"}

    header = root.find("header")
    # BUGFIX: an ElementTree Element with no children is falsy, so a plain
    # `if header` would wrongly discard the srclang of a childless
    # <header srclang="..."/>. Compare against None explicitly.
    header_srclang = header.attrib.get("srclang", "") if header is not None else ""

    def get_seg_text(tu, lang, ns):
        """Return the stripped <seg> text for *lang* within *tu*, or None."""
        # Try to find <tuv> with the xml:lang attribute
        seg = tu.find(f"./tuv[@xml:lang='{lang}']/seg", namespaces=ns)
        # If not found, try the plain lang attribute
        if seg is None:
            seg = tu.find(f"./tuv[@lang='{lang}']/seg")
        return seg.text.strip() if seg is not None and seg.text else None

    tu_elements = root.findall(".//tu")
    for tu in tu_elements:
        try:
            # A per-<tu> srclang overrides the header-level default.
            srclang = tu.attrib.get("srclang", header_srclang)
            tu_str = ET.tostring(tu, encoding="unicode")
            if not srclang_pattern.match(srclang):
                log.info(f"Skipping <tu> with unsupported srclang: {tu_str}")
                continue
            source = get_seg_text(tu, srclang, ns)
            target = get_seg_text(tu, code, ns)

            if source and target:
                file_entries.append({"source": source, "target": target})
            else:
                log.info(f"Skipping <tu> with missing or empty segment: {tu_str}")
        except Exception as e:
            # Best-effort import: one malformed <tu> must not abort the upload.
            log.info(f"Error processing <tu>: {e}")

    if not file_entries:
        return JsonResponse(
            {"status": False, "message": "No valid translation entries found."},
            status=400,
        )

    # Create TranslationMemoryEntry objects
    tm_entries = [
        TranslationMemoryEntry(
            source=entry["source"],
            target=entry["target"],
            locale=locale,
        )
        for entry in file_entries
    ]

    # Filter out entries that already exist in the database
    existing_combinations = set(
        TranslationMemoryEntry.objects.filter(locale=locale).values_list(
            "source", "target"
        )
    )
    tm_entries_to_create = [
        entry
        for entry in tm_entries
        if (entry.source, entry.target) not in existing_combinations
    ]

    created_entries = TranslationMemoryEntry.objects.bulk_create(
        tm_entries_to_create, batch_size=1000
    )
    log_action(
        ActionLog.ActionType.TM_ENTRIES_UPLOADED,
        request.user,
        tm_entries=created_entries,
    )

    parsed = len(file_entries)
    skipped_on_parse = len(tu_elements) - parsed
    imported = len(created_entries)
    duplicates = parsed - len(tm_entries_to_create)

    message = f"Importing TM entries complete. Imported: {imported}."
    if imported == 0:
        message = "No TM entries imported."
    if duplicates:
        message += f" Skipped duplicates: {duplicates}."

    return JsonResponse(
        {
            "status": True,
            "message": message,
            "parsed": parsed,
            "skipped_on_parse": skipped_on_parse,
            "imported": imported,
            "duplicates": duplicates,
        }
    )