in collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py [0:0]
def _create_collection_if_needed(self):
    """Create the granule and dataset Solr collections when they are missing.

    Queries the Solr Collections API (``CLUSTERSTATUS``) under
    ``self._url_prefix``. For each of ``self._granule_collection_name`` and
    ``self._dataset_collection_name`` that is not listed, the collection is
    created with one shard per live node and its schema fields are added via
    ``self._add_field``.

    Raises:
        requests.exceptions.RequestException: if Solr cannot be reached or the
            CLUSTERSTATUS request returns an HTTP error status. The failure is
            logged and re-raised unchanged.
    """
    try:
        if not self._req_session:
            self._req_session = requests.Session()
        collections_endpoint = f"{self._url_prefix}/admin/collections"
        result = self._req_session.get(collections_endpoint, params={'action': 'CLUSTERSTATUS'})
        # Surface HTTP errors as a RequestException (handled below and by callers)
        # instead of a KeyError on the 'cluster' lookup of an error payload.
        result.raise_for_status()
        cluster = result.json()['cluster']
        node_number = len(cluster['live_nodes'])
        existing_collections = cluster['collections']

        if self._granule_collection_name not in existing_collections:
            schema_endpoint = self._create_solr_collection(collections_endpoint,
                                                           self._granule_collection_name,
                                                           node_number)
            self._add_field(schema_endpoint, "dataset_s", "string")
            self._add_field(schema_endpoint, "granule_s", "string")
            self._add_field(schema_endpoint, "granule_signature_s", "string")

        if self._dataset_collection_name not in existing_collections:
            schema_endpoint = self._create_solr_collection(collections_endpoint,
                                                           self._dataset_collection_name,
                                                           node_number)
            self._add_field(schema_endpoint, "dataset_s", "string")
            self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
            self._add_field(schema_endpoint, "store_type_s", "string", True)
            self._add_field(schema_endpoint, "source_s", "string", True)
            self._add_field(schema_endpoint, "config", "text_general", True)
    except requests.exceptions.RequestException:
        # `_solr_url` is not otherwise used by this method; fall back to the URL
        # actually queried so a missing attribute cannot mask the real error.
        logger.error(f"solr instance unreachable {getattr(self, '_solr_url', self._url_prefix)}")
        raise


def _create_solr_collection(self, collections_endpoint, collection_name, num_shards):
    """Send a Collections API CREATE request and return the collection's schema endpoint.

    Args:
        collections_endpoint: URL of the Solr ``admin/collections`` API.
        collection_name: name of the collection to create.
        num_shards: value passed as ``numShards``.

    Returns:
        The ``<url_prefix>/<collection_name>/schema`` URL for subsequent field additions.
    """
    payload = {'action': 'CREATE',
               'name': collection_name,
               'numShards': num_shards
               }
    result = self._req_session.get(collections_endpoint, params=payload)
    response = result.json()
    if result.ok:
        logger.info(f"solr collection created {response}")
    else:
        # The caller still proceeds to the schema update, as before; this only
        # stops a failed CREATE from being reported as a success.
        # NOTE(review): presumably a concurrent creator can trigger this path — confirm.
        logger.error(f"solr collection creation failed {response}")
    return f"{self._url_prefix}/{collection_name}/schema"