bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
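The method below references several module-level names; a minimal sketch of the imports it assumes, reconstructed from the names used (not copied verbatim from the file header):

# Assumed imports for the names referenced in configure():
from charmhelpers.core import hookenv, host, unitdata
from charms.layer.apache_bigtop_base import Bigtop
from jujubigdata import utils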
def configure(self, available_hosts, zk_units, peers, extra_libs):
    """
    This is the core logic of setting up spark.

    :param dict available_hosts: Hosts that Spark should know about.
    :param list zk_units: List of Zookeeper dicts with host/port info.
    :param list peers: List of Spark peer tuples (unit name, IP).
    :param list extra_libs: List of extra lib paths for driver/executors.
    """
    # Set KV based on connected applications
    unitdata.kv().set('zookeeper.units', zk_units)
    unitdata.kv().set('sparkpeer.units', peers)
    unitdata.kv().flush(True)
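    # flush(True) persists the kv store to disk immediately rather than
    # relying on the automatic flush when the hook exits.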

    # Get our config ready
    dc = self.dist_config
    mode = hookenv.config()['spark_execution_mode']
    master_ip = utils.resolve_private_address(available_hosts['spark-master'])
    master_url = self.get_master_url(master_ip)
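    # master_url is e.g. 'spark://10.0.0.5:7077' in standalone mode, or a
    # mode string such as 'yarn-client' or 'local[*]' otherwise.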
    req_driver_mem = hookenv.config()['driver_memory']
    req_executor_mem = hookenv.config()['executor_memory']
    if mode.startswith('yarn'):
        spark_events = 'hdfs://{}'.format(dc.path('spark_events'))
    else:
        spark_events = 'file://{}'.format(dc.path('spark_events'))

    # Handle tuning options that may be set as percentages
    driver_mem = '1g'
    executor_mem = '1g'
    if req_driver_mem.endswith('%'):
        if mode == 'standalone' or mode.startswith('local'):
            mem_mb = host.get_total_ram() / 1024 / 1024
            req_percentage = float(req_driver_mem.strip('%')) / 100
            driver_mem = str(int(mem_mb * req_percentage)) + 'm'
        else:
            hookenv.log("driver_memory percentage is only supported in "
                        "standalone and local modes. Using 1g default.",
                        level=hookenv.WARNING)
    else:
        driver_mem = req_driver_mem

    if req_executor_mem.endswith('%'):
        if mode == 'standalone' or mode.startswith('local'):
            mem_mb = host.get_total_ram() / 1024 / 1024
            req_percentage = float(req_executor_mem.strip('%')) / 100
            executor_mem = str(int(mem_mb * req_percentage)) + 'm'
        else:
            hookenv.log("executor_memory percentage is only supported in "
                        "standalone and local modes. Using 1g default.",
                        level=hookenv.WARNING)
    else:
        executor_mem = req_executor_mem
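    # Worked example (hypothetical machine): assuming host.get_total_ram()
    # reports bytes, driver_memory='50%' on 8 GiB of RAM gives
    # mem_mb = 8192.0, so driver_mem becomes '4096m'.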

    # Some spark applications look for envars in /etc/environment
    with utils.environment_edit_in_place('/etc/environment') as env:
        env['MASTER'] = master_url
        env['SPARK_HOME'] = dc.path('spark_home')

    # Setup hosts dict
    hosts = {
        'spark': master_ip,
    }
    if 'namenode' in available_hosts:
        hosts['namenode'] = available_hosts['namenode']
    if 'resourcemanager' in available_hosts:
        hosts['resourcemanager'] = available_hosts['resourcemanager']
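    # namenode/resourcemanager are only present when the corresponding
    # hadoop relations have populated available_hosts.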

    # Setup roles list. We always include the history server and client.
    # Determine other roles based on our execution mode.
    roles = ['spark-history-server', 'spark-client']
    if mode == 'standalone':
        roles.append('spark-master')
        roles.append('spark-worker')
    elif mode.startswith('yarn'):
        roles.append('spark-on-yarn')
        roles.append('spark-yarn-slave')
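    # These role names select the corresponding bigtop puppet roles when
    # site.yaml is rendered below.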

    # Setup overrides dict
    override = {
        'spark::common::master_url': master_url,
        'spark::common::event_log_dir': spark_events,
        'spark::common::history_log_dir': spark_events,
        'spark::common::extra_lib_dirs':
            ':'.join(extra_libs) if extra_libs else None,
        'spark::common::driver_mem': driver_mem,
        'spark::common::executor_mem': executor_mem,
    }
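    # The keys are hiera parameters of bigtop's puppet spark module; they
    # are rendered into site.yaml below.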
    if zk_units:
        zks = []
        for unit in zk_units:
            ip = utils.resolve_private_address(unit['host'])
            zks.append("%s:%s" % (ip, unit['port']))
        zk_connect = ",".join(zks)
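        # e.g. '10.0.0.7:2181,10.0.0.8:2181' (hypothetical addresses); this
        # lets standalone masters coordinate HA recovery through Zookeeper.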
        override['spark::common::zookeeper_connection_string'] = zk_connect
    else:
        override['spark::common::zookeeper_connection_string'] = None

    # Create our site.yaml and trigger puppet.
    # NB: during an upgrade, we configure the site.yaml, but do not
    # trigger puppet. The user must do that with the 'reinstall' action.
    bigtop = Bigtop()
    bigtop.render_site_yaml(hosts, roles, override)
    if unitdata.kv().get('spark.version.repo', False):
        hookenv.log("An upgrade is available and the site.yaml has been "
                    "configured. Run the 'reinstall' action to continue.",
                    level=hookenv.INFO)
    else:
        bigtop.trigger_puppet()
        self.patch_worker_master_url(master_ip, master_url)

        # Packages don't create the event dir by default. Do it each time
        # spark is (re)installed to ensure location/perms are correct.
        self.configure_events_dir(mode)

    # Handle examples and Spark-Bench. Do this each time this method is
    # called in case we need to act on a new resource or user config.
    self.configure_examples()
    self.configure_sparkbench()
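
A minimal sketch of how a reactive handler might drive this method; the handler name, trigger state, and value wiring are illustrative assumptions, not code from the charm:

# Hypothetical handler in the charm's reactive layer (illustrative only).
from charmhelpers.core import hookenv, unitdata
from charms.reactive import when
from charms.layer.bigtop_spark import Spark

@when('bigtop.available')
def reinstall_spark():
    # The real handler assembles these from its hadoop, zookeeper, and
    # peer relations; hardcoded fallbacks here are placeholders.
    hosts = {'spark-master': hookenv.leader_get('master-fqdn')}
    zk_units = unitdata.kv().get('zookeeper.units', []) or []
    peers = unitdata.kv().get('sparkpeer.units', []) or []
    Spark().configure(hosts, zk_units, peers, extra_libs=[])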