docker/solr/cloud-init/create-collection.py:
#!/usr/local/bin/python -u
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import requests
import requests.exceptions
import json
import json.decoder
import time
import sys
import logging
from kazoo.client import KazooClient
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
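
# Configuration comes entirely from environment variables; every lookup below
# uses os.environ[...] directly, so the script fails fast with a KeyError if
# any of them is missing.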
MAX_RETRIES = int(os.environ["MAX_RETRIES"])
SDAP_ZK_SOLR = os.environ["SDAP_ZK_SOLR"]
SDAP_SOLR_URL = os.environ["SDAP_SOLR_URL"]
ZK_LOCK_GUID = os.environ["ZK_LOCK_GUID"]
MINIMUM_NODES = int(os.environ["MINIMUM_NODES"])
CREATE_COLLECTION_PARAMS = os.environ["CREATE_COLLECTION_PARAMS"]


def get_cluster_status():
    # Ask the Collections API for the cluster state. Returns the parsed JSON
    # response, or False if Solr is not reachable yet or returns a non-JSON body.
    try:
        return requests.get("{}admin/collections?action=CLUSTERSTATUS".format(SDAP_SOLR_URL)).json()
    except (requests.exceptions.ConnectionError, json.decoder.JSONDecodeError):
        return False

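
# Coordinate through a ZooKeeper lock so that, if several Solr bootstrap
# containers run this script concurrently, only one of them creates the
# collection and schema.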
logging.info("Attempting to aquire lock from {}".format(SDAP_ZK_SOLR))
zk_host, zk_chroot = SDAP_ZK_SOLR.split('/')
zk = KazooClient(hosts=zk_host)
zk.start()
zk.ensure_path(zk_chroot)
zk.chroot = zk_chroot
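
# The lock node lives under the same ZooKeeper chroot that SolrCloud uses.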
lock = zk.Lock("/collection-creator", ZK_LOCK_GUID)

try:
    with lock:  # blocks waiting for lock acquisition
        logging.info("Lock acquired. Checking for SolrCloud at {}".format(SDAP_SOLR_URL))

        # Wait up to MAX_RETRIES for the entire Solr cluster to be available.
        attempts = 0
        status = None
        collection_exists = False
        while attempts <= MAX_RETRIES:
            status = get_cluster_status()
            if not status:
                # If we can't get the cluster status, this Solr node is not running yet
                attempts += 1
                logging.info("Waiting for Solr at {}".format(SDAP_SOLR_URL))
                time.sleep(1)
                continue
            else:
                # If we can get the cluster status, at least this Solr node is running,
                # so we can check whether the collection already exists.
                if 'collections' in status['cluster'] and 'nexustiles' in status['cluster']['collections']:
                    # Collection already exists. Break out of the while loop.
                    collection_exists = True
                    logging.info("nexustiles collection already exists.")
                    break
                else:
                    # Collection does not exist, but make sure the expected number of
                    # nodes is live before creating it.
                    live_nodes = status['cluster']['live_nodes']
                    if len(live_nodes) < MINIMUM_NODES:
                        # Not enough live nodes yet
                        logging.info("Found {} live node(s). Expected at least {}. Live nodes: {}".format(len(live_nodes), MINIMUM_NODES, live_nodes))
                        attempts += 1
                        time.sleep(1)
                        continue
                    else:
                        # We have a full cluster, ready to create the collection.
                        logging.info("Detected full cluster of at least {} nodes. Checking for nexustiles collection".format(MINIMUM_NODES))
                        break

        # Make sure we didn't exhaust our retries
        if attempts > MAX_RETRIES:
            raise RuntimeError("Exceeded {} retries while waiting for at least {} nodes to become live for {}".format(MAX_RETRIES, MINIMUM_NODES, SDAP_SOLR_URL))

        # Full cluster and retries not exceeded. Create the collection if it does not exist yet.
        if not collection_exists:
            # Collection does not exist, create it.
            create_command = "{}admin/collections?action=CREATE&{}".format(SDAP_SOLR_URL, CREATE_COLLECTION_PARAMS)
            logging.info("Creating collection with command {}".format(create_command))
            create_response = requests.get(create_command).json()
            if 'failure' not in create_response:
                # Collection created.
                logging.info("Collection created. {}".format(create_response))
            else:
                # Some error occurred while creating the collection
                raise RuntimeError("Could not create collection. Received response: {}".format(create_response))
            schema_api = "{}nexustiles/schema".format(SDAP_SOLR_URL)

            field_type_payload = json.dumps({
                "add-field-type": {
                    "name": "geo",
                    "class": "solr.SpatialRecursivePrefixTreeFieldType",
                    "geo": "true",
                    "precisionModel": "fixed",
                    "maxDistErr": "0.000009",
                    "spatialContextFactory": "com.spatial4j.core.context.jts.JtsSpatialContextFactory",
                    "precisionScale": "1000",
                    "distErrPct": "0.025",
                    "distanceUnits": "degrees"}})
            logging.info("Creating field-type 'geo'...")
            field_type_response = requests.post(url=schema_api, data=field_type_payload)
            if field_type_response.status_code < 400:
                logging.info("Success.")
            else:
                logging.error("Error creating field type 'geo': {}".format(field_type_response.text))

            def add_field(schema_api, name, field_type):
                # Add a single field of the given type to the nexustiles schema and
                # log either success or the error body returned by Solr.
                field_payload = json.dumps({
                    "add-field": {
                        "name": name,
                        "type": field_type}})
                logging.info(f"Creating {field_type} field '{name}'...")
                field_response = requests.post(url=schema_api, data=field_payload)
                if field_response.status_code < 400:
                    logging.info("Success.")
                else:
                    logging.error(f"Error creating field '{name}': {field_response.text}")

            add_field(schema_api, 'geo', 'geo')
            add_field(schema_api, 'tile_max_lat', 'pdouble')
            add_field(schema_api, 'tile_min_lat', 'pdouble')
            add_field(schema_api, 'tile_max_lon', 'pdouble')
            add_field(schema_api, 'tile_min_lon', 'pdouble')
            add_field(schema_api, 'tile_min_elevation_d', 'pdouble')
            add_field(schema_api, 'tile_max_elevation_d', 'pdouble')
finally:
    zk.stop()
    zk.close()

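# Keeping the process alive here (rather than exiting) is presumably what keeps
# the bootstrap container from being restarted over and over by the orchestrator.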
# We're done, do nothing forever.
logging.info("Done.")
while True:
    time.sleep(987654321)