in chunking/chunkers/json_chunker.py [0:0]
def get_chunks(self):
"""
Splits the JSON content into chunks while ensuring each chunk is valid JSON.
The method:
1. Decodes document bytes to text.
2. Parses the JSON.
3. Uses a recursive partitioning algorithm to split the parsed JSON into valid pieces
whose pretty-printed form is within the token limit.
4. Creates chunk dictionaries from the resulting pieces.
"""
if not self.document_bytes:
logging.error(f"[json_chunker][{self.filename}] No document bytes provided.")
return []
text = self.decode_to_utf8(self.document_bytes)
try:
parsed_json = json.loads(text)
except json.JSONDecodeError as e:
logging.error(f"[json_chunker][{self.filename}] Error parsing JSON: {e}")
return []
# Recursively partition the parsed JSON
partitioned = self._recursive_chunk_json(parsed_json)
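# Each element of `partitioned` is itself JSON-serializable, so it can be dumped independently below.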
# Pretty-print each partition and drop pieces below the minimum chunk size
chunk_texts = []
for part in partitioned:
dumped = json.dumps(part, indent=2, ensure_ascii=False)
token_count = self.token_estimator.estimate_tokens(dumped)
if token_count >= self.minimum_chunk_size:
chunk_texts.append(dumped)
chunk_dicts = []
chunk_id = 0
for chunk_text in chunk_texts:
token_count = self.token_estimator.estimate_tokens(chunk_text)
if token_count > self.max_chunk_size:
logging.warning(
f"[json_chunker][{self.filename}] A chunk still exceeds max tokens ({token_count} > {self.max_chunk_size})."
" This may happen if a single element is very large."
)
# Oversized chunks are kept as-is here; alternatively, they could be
# passed through a plain string splitter for further subdivision.
chunk_id += 1
chunk_dict = self._create_chunk(chunk_id, chunk_text)
chunk_dicts.append(chunk_dict)
logging.info(f"[json_chunker][{self.filename}] Created {len(chunk_dicts)} chunk(s).")
return chunk_dicts
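
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original file): get_chunks() relies on
# self._recursive_chunk_json, whose implementation is not shown in this
# excerpt. The standalone function below is one plausible way to implement
# such a recursive partitioner: split dicts key by key and lists item by item
# until each pretty-printed piece fits a token budget. The `estimate_tokens`
# callable and `max_tokens` parameter are assumptions made for the sketch,
# not the project's actual API.
import json


def recursive_chunk_json_sketch(data, estimate_tokens, max_tokens):
    """Split `data` into JSON-serializable pieces whose pretty-printed
    form fits within `max_tokens` wherever possible."""
    dumped = json.dumps(data, indent=2, ensure_ascii=False)
    if estimate_tokens(dumped) <= max_tokens:
        # Already small enough: keep as a single piece.
        return [data]
    if isinstance(data, dict):
        pieces = []
        for key, value in data.items():
            # Recurse into each value and re-wrap the parts under their key,
            # so every piece stays valid JSON and keeps its key as context.
            for part in recursive_chunk_json_sketch(value, estimate_tokens, max_tokens):
                pieces.append({key: part})
        return pieces
    if isinstance(data, list):
        pieces = []
        for item in data:
            pieces.extend(recursive_chunk_json_sketch(item, estimate_tokens, max_tokens))
        return pieces
    # Scalars (e.g. a very long string) cannot be split without breaking JSON
    # validity; they are returned whole, which is the case the oversized-chunk
    # warning in get_chunks() accounts for.
    return [data]


# Example usage with a crude character-based token estimate (~4 chars/token):
#   pieces = recursive_chunk_json_sketch(parsed_json, lambda s: len(s) // 4, 512)
#   texts = [json.dumps(p, indent=2, ensure_ascii=False) for p in pieces]
# Note this sketch splits lists one item per piece; a production version would
# likely pack sibling items together so chunks stay close to the size limit.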