analysis/webservice/algorithms/doms/DomsInitialization.py (110 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
import pkg_resources
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.cluster import NoHostAvailable
from cassandra.policies import (DCAwareRoundRobinPolicy, TokenAwarePolicy,
WhiteListRoundRobinPolicy)
from webservice.NexusHandler import nexus_initializer
@nexus_initializer
class DomsInitializer:
    """Nexus initializer that prepares the Cassandra keyspace and tables used by DOMS.

    Connection settings are read from the packaged ``domsconfig.ini`` (plus its
    ``.default`` variant) and then overridden by matching options from the
    configuration passed to :meth:`init`.
    """

    def __init__(self):
        self.log = logging.getLogger(__name__)

    def init(self, config):
        """Read the DOMS configuration, connect to Cassandra and ensure the schema exists.

        :param config: configuration object exposing ``sections()``, ``options()``
            and ``get()`` (ConfigParser-like). Its options override options of the
            same name in sections already present in ``domsconfig.ini``. May be None.
        :raises configparser.NoSectionError, configparser.NoOptionError: if a
            required ``cassandra`` option is missing.
        :raises ValueError: if ``cassandra.dc_policy`` names an unsupported policy
            or ``port``/``protocol_version`` is not an integer.

        ``NoHostAvailable`` is logged rather than raised, so Nexus can still start
        without access to local data.
        """
        self.log.info("*** STARTING DOMS INITIALIZATION ***")

        # SafeConfigParser was only a deprecated alias of ConfigParser and was
        # removed in Python 3.12; ConfigParser has the same behavior.
        domsconfig = configparser.ConfigParser()
        domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini'))
        domsconfig = self.override_config(domsconfig, config)

        cassHost = domsconfig.get("cassandra", "host")
        cassPort = domsconfig.getint("cassandra", "port")
        cassUsername = domsconfig.get("cassandra", "username")
        cassPassword = domsconfig.get("cassandra", "password")
        cassKeyspace = domsconfig.get("cassandra", "keyspace")
        cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
        cassVersion = domsconfig.getint("cassandra", "protocol_version")
        cassPolicy = domsconfig.get("cassandra", "dc_policy")
        # Optional: when absent, the keyspace is created if it does not exist.
        cassCreateKeyspaceGranted = domsconfig.get(
            "cassandra", "create_keyspace_granted", fallback="True")

        self.log.info("Cassandra Host(s): %s", cassHost)
        self.log.info("Cassandra Keyspace: %s", cassKeyspace)
        self.log.info("Cassandra Datacenter: %s", cassDatacenter)
        self.log.info("Cassandra Protocol Version: %s", cassVersion)
        self.log.info("Cassandra DC Policy: %s", cassPolicy)

        # "host" may list several contact points separated by commas.
        cassHosts = [host.strip() for host in cassHost.split(',') if host.strip()]

        token_policy = self._build_load_balancing_policy(cassPolicy, cassDatacenter, cassHosts)

        if cassUsername and cassPassword:
            auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
        else:
            auth_provider = None

        try:
            with Cluster(cassHosts,
                         port=cassPort,
                         load_balancing_policy=token_policy,
                         protocol_version=cassVersion,
                         auth_provider=auth_provider) as cluster:
                session = cluster.connect()

                if cassCreateKeyspaceGranted in ["True", "true"]:
                    self.createKeyspace(session, cassKeyspace)
                else:
                    # Without CREATE permission the keyspace must already exist.
                    session.set_keyspace(cassKeyspace)

                self.createTables(session)

        except NoHostAvailable as e:
            self.log.error("Unable to connect to Cassandra, Nexus will not be able to access local data: %s", e)

    @staticmethod
    def _build_load_balancing_policy(policy_name, datacenter, hosts):
        """Return the Cassandra load-balancing policy selected by ``dc_policy``.

        :param policy_name: ``'DCAwareRoundRobinPolicy'`` or ``'WhiteListRoundRobinPolicy'``.
        :param datacenter: local datacenter name, used by the DC-aware policy.
        :param hosts: list of individual contact points, used by the whitelist policy.
        :raises ValueError: for any other policy name.
        """
        if policy_name == 'DCAwareRoundRobinPolicy':
            return TokenAwarePolicy(DCAwareRoundRobinPolicy(datacenter))
        if policy_name == 'WhiteListRoundRobinPolicy':
            # Whitelist each contact point separately; a comma-joined string is
            # not itself a valid host.
            return WhiteListRoundRobinPolicy(hosts)
        raise ValueError(
            "Unsupported Cassandra dc_policy %r; expected 'DCAwareRoundRobinPolicy' "
            "or 'WhiteListRoundRobinPolicy'" % (policy_name,))

    def override_config(self, first, second):
        """Copy option values from ``second`` into ``first``, in place.

        Only sections that already exist in ``first`` are overridden; other
        sections of ``second`` are ignored, as are options whose value is None.
        A ``second`` of None means "no overrides".

        :returns: ``first``, after mutation.
        """
        if second is None:
            return first
        for section in second.sections():
            if not first.has_section(section):
                continue  # only override preexisting sections
            for option in second.options(section):
                value = second.get(section, option)
                if value is not None:
                    first.set(section, option, value)
        return first

    def createKeyspace(self, session, cassKeyspace):
        """Create ``cassKeyspace`` if missing (SimpleStrategy, RF 1) and make it current."""
        self.log.info("Verifying DOMS keyspace '%s'", cassKeyspace)
        # The keyspace name is interpolated, not bound (CQL cannot bind
        # identifiers); it comes from operator configuration.
        session.execute(
            "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" % cassKeyspace)
        session.set_keyspace(cassKeyspace)

    def createTables(self, session):
        """Create every DOMS table that does not exist yet in the session's keyspace."""
        self.log.info("Verifying DOMS tables")
        self.createDomsExecutionsTable(session)
        self.createDomsParamsTable(session)
        self.createDomsDataTable(session)
        self.createDomsExecutionStatsTable(session)

    def createDomsExecutionsTable(self, session):
        """Create the ``doms_executions`` table (one row per execution id) if missing."""
        self.log.info("Verifying doms_executions table")
        cql = """
            CREATE TABLE IF NOT EXISTS doms_executions (
              id uuid PRIMARY KEY,
              time_started timestamp,
              time_completed timestamp,
              user_email text,
              status text,
              message text
            );
        """
        session.execute(cql)

    def createDomsParamsTable(self, session):
        """Create the ``doms_params`` table (request parameters keyed by execution id) if missing."""
        self.log.info("Verifying doms_params table")
        cql = """
            CREATE TABLE IF NOT EXISTS doms_params (
              execution_id uuid PRIMARY KEY,
              primary_dataset text,
              matchup_datasets text,
              depth_tolerance decimal,
              depth_min decimal,
              depth_max decimal,
              time_tolerance int,
              radius_tolerance decimal,
              start_time timestamp,
              end_time timestamp,
              platforms text,
              bounding_box text,
              parameter text
            );
        """
        session.execute(cql)

    def createDomsDataTable(self, session):
        """Create the ``doms_data`` table if missing.

        Rows are partitioned by (execution_id, is_primary) and clustered by
        primary_value_id, then id.
        """
        self.log.info("Verifying doms_data table")
        cql = """
            CREATE TABLE IF NOT EXISTS doms_data (
              id uuid,
              execution_id uuid,
              value_id text,
              primary_value_id text,
              is_primary boolean,
              x decimal,
              y decimal,
              source_dataset text,
              measurement_time timestamp,
              platform text,
              device text,
              measurement_values_json text,
              depth decimal,
              file_url text,
              PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
            );
        """
        session.execute(cql)

    def createDomsExecutionStatsTable(self, session):
        """Create the ``doms_execution_stats`` table (counters keyed by execution id) if missing."""
        self.log.info("Verifying doms_execution_stats table")
        cql = """
            CREATE TABLE IF NOT EXISTS doms_execution_stats (
              execution_id uuid PRIMARY KEY,
              num_gridded_matched int,
              num_gridded_checked int,
              num_insitu_matched int,
              num_insitu_checked int,
              time_to_complete int,
              num_unique_secondaries int
            );
        """
        session.execute(cql)

    @staticmethod
    def _get_config_files(filename):
        """Return packaged paths for ``filename + '.default'`` then ``filename``.

        The ``.default`` file comes first so that ``ConfigParser.read`` lets the
        plain file override it. Names that ``pkg_resources`` cannot resolve
        (KeyError) are logged and skipped.
        """
        log = logging.getLogger(__name__)
        candidates = []
        for extension in ('.default', ''):
            try:
                candidates.append(pkg_resources.resource_filename(__name__, filename + extension))
            except KeyError:
                log.warning('configuration file {} not found'.format(filename + extension))
        return candidates