def configure()

in bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py [0:0]


    def configure(self, available_hosts, zk_units, peers, extra_libs):
        """
        This is the core logic of setting up spark.

        Records the related Zookeeper and Spark peer units in the unit KV
        store, derives the hosts/roles/overrides for site.yaml from the charm
        config and the connected hosts, exports ``MASTER`` and ``SPARK_HOME``
        in /etc/environment, renders site.yaml, and triggers puppet unless an
        upgrade is pending. Examples and Spark-Bench are (re)configured on
        every call.

        :param dict available_hosts: Hosts that Spark should know about.
            Must contain 'spark-master'; 'namenode' and 'resourcemanager'
            are forwarded to site.yaml when present.
        :param list zk_units: List of Zookeeper dicts with host/port info.
        :param list peers: List of Spark peer tuples (unit name, IP).
        :param list extra_libs: List of extra lib paths for driver/executors.
        """
        # Set KV based on connected applications
        kv = unitdata.kv()
        kv.set('zookeeper.units', zk_units)
        kv.set('sparkpeer.units', peers)
        kv.flush(True)

        # Get our config ready
        dc = self.dist_config
        config = hookenv.config()
        mode = config['spark_execution_mode']
        master_ip = utils.resolve_private_address(
            available_hosts['spark-master'])
        master_url = self.get_master_url(master_ip)
        req_driver_mem = config['driver_memory']
        req_executor_mem = config['executor_memory']

        # Event logs live in HDFS for yarn modes, on local disk otherwise.
        events_scheme = 'hdfs' if mode.startswith('yarn') else 'file'
        spark_events = '{}://{}'.format(events_scheme,
                                        dc.path('spark_events'))

        def resolve_memory(option, requested):
            # Turn a memory config value into the string handed to puppet.
            # Anything not ending in '%' is passed through untouched. A
            # percentage is only honoured in standalone/local modes, where
            # it is applied to this unit's total RAM; in any other mode we
            # warn and fall back to the 1g default.
            if not requested.endswith('%'):
                return requested
            if mode == 'standalone' or mode.startswith('local'):
                # presumably get_total_ram() reports bytes, hence the two
                # divisions to reach megabytes -- confirm against
                # charmhelpers.
                mem_mb = host.get_total_ram() / 1024 / 1024
                req_percentage = float(requested.strip('%')) / 100
                return str(int(mem_mb * req_percentage)) + 'm'
            hookenv.log("{} percentage in non-local mode. "
                        "Using 1g default.".format(option),
                        level=hookenv.WARNING)
            return '1g'

        # handle tuning options that may be set as percentages
        driver_mem = resolve_memory('driver_memory', req_driver_mem)
        executor_mem = resolve_memory('executor_memory', req_executor_mem)

        # Some spark applications look for envars in /etc/environment
        with utils.environment_edit_in_place('/etc/environment') as env:
            env['MASTER'] = master_url
            env['SPARK_HOME'] = dc.path('spark_home')

        # Setup hosts dict
        hosts = {
            'spark': master_ip,
        }
        for optional_host in ('namenode', 'resourcemanager'):
            if optional_host in available_hosts:
                hosts[optional_host] = available_hosts[optional_host]

        # Setup roles dict. We always include the history server and client.
        # Determine other roles based on our execution mode.
        roles = ['spark-history-server', 'spark-client']
        if mode == 'standalone':
            roles.append('spark-master')
            roles.append('spark-worker')
        elif mode.startswith('yarn'):
            roles.append('spark-on-yarn')
            roles.append('spark-yarn-slave')

        # Build the "ip:port,ip:port" Zookeeper connection string; None
        # when no Zookeeper units are related.
        if zk_units:
            zk_connect = ",".join(
                "{}:{}".format(utils.resolve_private_address(unit['host']),
                               unit['port'])
                for unit in zk_units)
        else:
            zk_connect = None

        # Setup overrides dict
        override = {
            'spark::common::master_url': master_url,
            'spark::common::event_log_dir': spark_events,
            'spark::common::history_log_dir': spark_events,
            'spark::common::extra_lib_dirs':
                ':'.join(extra_libs) if extra_libs else None,
            'spark::common::driver_mem': driver_mem,
            'spark::common::executor_mem': executor_mem,
            'spark::common::zookeeper_connection_string': zk_connect,
        }

        # Create our site.yaml and trigger puppet.
        # NB: during an upgrade, we configure the site.yaml, but do not
        # trigger puppet. The user must do that with the 'reinstall' action.
        bigtop = Bigtop()
        bigtop.render_site_yaml(hosts, roles, override)
        if unitdata.kv().get('spark.version.repo', False):
            hookenv.log("An upgrade is available and the site.yaml has been "
                        "configured. Run the 'reinstall' action to continue.",
                        level=hookenv.INFO)
        else:
            bigtop.trigger_puppet()
            self.patch_worker_master_url(master_ip, master_url)

            # Packages don't create the event dir by default. Do it each time
            # spark is (re)installed to ensure location/perms are correct.
            self.configure_events_dir(mode)

        # Handle examples and Spark-Bench. Do this each time this method is
        # called in case we need to act on a new resource or user config.
        self.configure_examples()
        self.configure_sparkbench()