in collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py [0:0]
def __init__(self, solr_url: str, dataset_id: str, signature_fun=None):
try:
self._url_prefix = f"{solr_url.strip('/')}/solr"
self._create_collection_if_needed()
self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}")
self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}")
self._dataset_id = dataset_id
self._signature_fun = signature_fun
self._latest_ingested_file_update = self._get_latest_file_update()
except requests.exceptions.RequestException:
raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")