scripts/mahout/add_mahout_interpreters.py (189 lines of code) (raw):

# /**
#  * Licensed to the Apache Software Foundation (ASF) under one
#  * or more contributor license agreements.  See the NOTICE file
#  * distributed with this work for additional information
#  * regarding copyright ownership.  The ASF licenses this file
#  * to you under the Apache License, Version 2.0 (the
#  * "License"); you may not use this file except in compliance
#  * with the License.  You may obtain a copy of the License at
#  *
#  *     http://www.apache.org/licenses/LICENSE-2.0
#  *
#  * Unless required by applicable law or agreed to in writing, software
#  * distributed under the License is distributed on an "AS IS" BASIS,
#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  * See the License for the specific language governing permissions and
#  * limitations under the License.
#  */
"""Add Apache Mahout interpreters to an Apache Zeppelin installation.

The script clones the existing ``spark`` and ``flink`` interpreter settings in
``conf/interpreter.json`` into ``sparkMahout`` and ``flinkMahout``, attaches the
Mahout jars and properties to them, exports ``MAHOUT_HOME`` from
``conf/zeppelin-env.sh`` and (unless told otherwise) restarts Zeppelin.

Every ``print`` call takes exactly one argument so the file runs unchanged on
both Python 2.7 and Python 3.
"""

import argparse
import json
import sys
from copy import deepcopy
from os import getcwd
from os import remove
from os.path import isfile
from subprocess import call, check_call
from time import sleep

#######################################################################################################################
# These live here so it is (hopefully) easy to bump versions / maintain.
# If there is demand, parts or all of them could easily become command line arguments as well.
#######################################################################################################################

# Single source of truth for the version; the tarball name and URL are derived from it.
mahout_version = "0.12.2"
tar_name = "apache-mahout-distribution-%s.tar.gz" % mahout_version
# NOTE(review): plain-HTTP mirror with no checksum verification - confirm the mirror still serves this release.
mahout_bin_url = "http://apache.osuosl.org/mahout/%s/%s" % (mahout_version, tar_name)

parser = argparse.ArgumentParser()
parser.add_argument("--force_download", help="force download Apache Mahout", action="store_true")
parser.add_argument("--restart_later",
                    help="do not restart Apache Zeppelin after installing the interpreters "
                         "(restart it yourself later)",
                    action="store_true")
parser.add_argument("--zeppelin_home", help="path to ZEPPELIN_HOME")
parser.add_argument("--mahout_home",
                    help="path to MAHOUT_HOME, use this if you have already installed Apache Mahout")
# argparse %-formats help strings, so literal percent signs must be written as '%%'.
parser.add_argument("--overwrite_existing",
                    help="if %%sparkMahout or %%flinkMahout exist, delete them and create new ones. "
                         "Otherwise Fail.",
                    action="store_true")


class ZeppelinTerpWrangler(object):
    """Read, edit and write the interpreter settings in Zeppelin's ``interpreter.json``.

    The parsed JSON document is held in ``self.interpreter_json``; it is empty
    until ``_readTerpJson`` has been called.
    """

    def __init__(self, interpreter_json_path):
        """Remember where ``interpreter.json`` lives; nothing is read yet."""
        self.interpreter_json_path = interpreter_json_path
        # Initialised here so the "nothing loaded yet" check in _addTerpDep works.
        self.interpreter_json = {}

    def _getTerpID(self, terpName):
        """Return the settings id of the interpreter named ``terpName``, or None."""
        for terp_id, setting in self.interpreter_json['interpreterSettings'].items():
            if setting['name'] == terpName:
                return terp_id
        return None

    def _terpExists(self, terpName):
        """Return True when an interpreter named ``terpName`` is present."""
        return self._getTerpID(terpName) is not None

    def _settingsFor(self, terpName):
        """Return the settings dict for ``terpName``.

        Raises:
            KeyError: no interpreter with that name exists.
        """
        terp_id = self._getTerpID(terpName)
        if terp_id is None:
            raise KeyError("interpreter '%s' not found in %s" % (terpName, self.interpreter_json_path))
        return self.interpreter_json['interpreterSettings'][terp_id]

    def createTerp(self, original_terp_name, new_terp_name, overwrite_existing=True):
        """Create ``new_terp_name`` as a deep copy of ``original_terp_name``.

        The new interpreter uses its name as its id. If it already exists it is
        deleted first when ``overwrite_existing`` is true; otherwise the program
        exits with status 1.

        Raises:
            KeyError: ``original_terp_name`` does not exist.
        """
        settings = self.interpreter_json['interpreterSettings']
        new_terp_id = new_terp_name
        if self._terpExists(new_terp_name):
            print("Found existing '%s' interpreter..." % new_terp_name)
            if not overwrite_existing:
                print("exiting program.")
                sys.exit(1)
            print("deleting %s from interpreter.json" % new_terp_name)
            del settings[self._getTerpID(new_terp_name)]

        clone = deepcopy(self._settingsFor(original_terp_name))
        clone['name'] = new_terp_name
        clone['id'] = new_terp_id
        settings[new_terp_id] = clone
        print("created new interpreter '%s' from interpreter '%s'" % (new_terp_name, original_terp_name))

    def _readTerpJson(self):
        """Load ``interpreter.json`` from disk into ``self.interpreter_json``."""
        with open(self.interpreter_json_path) as f:
            self.interpreter_json = json.load(f)

    def _writeTerpJson(self):
        """Write ``self.interpreter_json`` back to disk, sorted and indented."""
        # Text mode: json.dump emits str, which a binary handle rejects on Python 3.
        with open(self.interpreter_json_path, 'w') as f:
            json.dump(self.interpreter_json, f, sort_keys=True, indent=4)

    def _updateTerpProp(self, terpName, property, value):
        """Set one entry in the ``properties`` of interpreter ``terpName``.

        Raises:
            KeyError: ``terpName`` does not exist.
        """
        self._settingsFor(terpName)['properties'][property] = value

    def _addTerpDep(self, terpName="", dep="", exclusions=None):
        """Append dependency ``dep`` to interpreter ``terpName`` and drop duplicates.

        Duplicates are detected by their ``groupArtifactVersion`` value; the
        first occurrence wins. ``exclusions`` is stored on the new entry when
        it is not None.

        Raises:
            KeyError: ``terpName`` does not exist.
        """
        if not self.interpreter_json:
            print("no interpreter.json loaded, reading last one downloaded")
            self._readTerpJson()

        settings = self._settingsFor(terpName)
        dep_dict = {'groupArtifactVersion': dep, 'local': False}
        if exclusions is not None:
            dep_dict["exclusions"] = exclusions

        deps = settings.setdefault('dependencies', [])
        deps.append(dep_dict)

        # Keying on the artifact coordinate (not on whichever dict item happens
        # to come first) makes the result independent of dict ordering.
        seen = set()
        unique_deps = []
        for candidate in deps:
            coordinate = candidate.get('groupArtifactVersion')
            if coordinate is not None and coordinate in seen:
                continue
            seen.add(coordinate)
            unique_deps.append(candidate)
        settings['dependencies'] = unique_deps

    def addMahoutConfig(self, terpName, mahout_home, mahout_version="0.12.2"):
        """Attach the Mahout jars and properties to interpreter ``terpName``.

        Spark- or Flink-specific jars and properties are chosen by looking for
        "spark" / "flink" (case-insensitively) in ``terpName``; any other name
        only receives the two core mahout-math jars.

        Raises:
            KeyError: ``terpName`` does not exist.
        """
        print("updating '%s' with Apache Mahout dependencies and settings" % terpName)
        name = terpName.lower()
        configs = {}
        terpDeps = ["%s/mahout-math-%s.jar" % (mahout_home, mahout_version),
                    "%s/mahout-math-scala_2.10-%s.jar" % (mahout_home, mahout_version)]

        if "spark" in name:
            configs.update({
                "spark.kryo.referenceTracking": "false",
                "spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
                "spark.kryoserializer.buffer": "32k",
                "spark.kryoserializer.buffer.max": "600m",
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            })
            terpDeps.extend([
                '%s/mahout-spark_2.10-%s-dependency-reduced.jar' % (mahout_home, mahout_version),
                "%s/mahout-spark_2.10-%s.jar" % (mahout_home, mahout_version),
                "%s/mahout-spark-shell_2.10-%s.jar" % (mahout_home, mahout_version),
            ])

        if "flink" in name:
            configs.update({"taskmanager.numberOfTaskSlots": "12"})
            terpDeps.extend([
                "%s/mahout-flink_2.10-%s.jar" % (mahout_home, mahout_version),
                "%s/mahout-hdfs-%s.jar" % (mahout_home, mahout_version),
                # guava is required in the lib dir if running against a cluster
                "com.google.guava:guava:14.0.1",
            ])

        for key, value in configs.items():
            self._updateTerpProp(terpName, key, value)
        for dep in terpDeps:
            self._addTerpDep(terpName, dep)


#######################################################################################################################
# Need to be sure we know where the Zeppelin top directory is so we can edit conf files
#######################################################################################################################
def valid_zeppelin_home(path):
    """Return True when ``path`` contains ``bin/zeppelin-daemon.sh``."""
    return isfile(path + "/bin/zeppelin-daemon.sh")


def _resolve_zeppelin_home(requested):
    """Return a validated ZEPPELIN_HOME, or exit with status 1.

    When ``requested`` is None the current directory is used, stepping up one
    level if we are sitting inside Zeppelin's ``bin`` directory.
    """
    if requested is None:
        zeppelin_home = getcwd()
        if zeppelin_home.split("/")[-1] == "bin" and isfile("zeppelin-daemon.sh"):
            print("we're in the zeppelin/bin")
            zeppelin_home = "/".join(zeppelin_home.split("/")[:-1])
        print("--zeppelin_home not specified, using %s" % zeppelin_home)
    else:
        zeppelin_home = requested

    if not valid_zeppelin_home(zeppelin_home):
        print("%s does not appear to be a valid ZEPPELIN_HOME - e.g. the top level directory of the ZEPPELIN install"
              % zeppelin_home)
        sys.exit(1)
    print("ZEPPELIN_HOME validated")
    return zeppelin_home


def _ensure_interpreter_json(zeppelin_home):
    """Return the path of ``conf/interpreter.json``, or exit with status 1.

    If the file is missing and the daemon's ``status`` command returns 1,
    Zeppelin is started once and given three seconds to create the file.
    """
    interpreter_json_path = zeppelin_home + "/conf/interpreter.json"
    if not isfile(interpreter_json_path):
        print("interpreter.json doesn't exist. Checking weather Zeppelin is running.")
        status = call(["bin/zeppelin-daemon.sh", 'status'], cwd=zeppelin_home)
        if status == 1:
            print("Zeppelin doesn't appear to be running- it is possible that Zeppelin has never been run "
                  "(interpreter.json is created when Zeppelin is run)")
            print("I'm going to try to start Zeppelin to create interpreter.json")
            call(["bin/zeppelin-daemon.sh", 'start'], cwd=zeppelin_home)
            sleep(3)
        else:
            print("We're in the correct top-level directory, Zeppelin appears to be running, "
                  "but there is no 'interpreter.json'. "
                  "\nThis is a confusing case. Please try restarting Zeppelin, "
                  "but if that doesn't work reach out on the mailing list.")

    if not isfile(interpreter_json_path):
        # Report the resolved directory; the --zeppelin_home argument may be None.
        print("'interpreter.json' not found in %s/conf" % zeppelin_home)
        sys.exit(1)
    return interpreter_json_path


#######################################################################################################################
# If --mahout_home is not set, download and untar Mahout into ZEPPELIN_HOME
# Set MAHOUT_HOME to ZEPPELIN_HOME/<mahout_untar_dir>
#######################################################################################################################
def _should_download_mahout(force_download, mahout_home_arg, zeppelin_home):
    """Decide whether the Mahout tarball has to be fetched.

    With ``force_download`` an existing tarball is deleted and True is
    returned. Otherwise an existing tarball or an explicit ``mahout_home_arg``
    means no download is needed.
    """
    tar_path = "%s/%s" % (zeppelin_home, tar_name)
    if force_download:
        print("--force_download: OK, deleting existing tar if it exists.")
        if isfile(tar_path):
            remove(tar_path)
        return True
    if isfile(tar_path):
        print("%s found, skipping download" % tar_name)
        return False
    if mahout_home_arg:
        print("--mahout_home set, skipping download")
        return False
    return True


def _resolve_mahout_home(args, zeppelin_home):
    """Download and unpack Mahout if required and return MAHOUT_HOME."""
    if _should_download_mahout(args.force_download, args.mahout_home, zeppelin_home):
        check_call(['wget', mahout_bin_url], cwd=zeppelin_home)
        check_call(['tar', 'xzf', tar_name], cwd=zeppelin_home)

    if args.mahout_home:
        return args.mahout_home
    # Strip the trailing ".tar.gz" to get the directory the tarball unpacks to.
    return zeppelin_home + "/" + ".".join(tar_name.split(".")[:-2])


#######################################################################################################################
# Add "export MAHOUT_HOME=..." to conf/zeppelin-env.sh
# Create the file if it doesn't exist.
#######################################################################################################################
def _export_mahout_home(zeppelin_env_sh_path, mahout_home):
    """Append ``export MAHOUT_HOME=...`` to zeppelin-env.sh unless already present.

    The file is created when missing. Returns True when the line was written.
    """
    mahout_home_str = '\nexport MAHOUT_HOME=%s\n' % mahout_home
    if isfile(zeppelin_env_sh_path):
        with open(zeppelin_env_sh_path, 'r') as f:
            already_exported = any("export MAHOUT_HOME=" in line for line in f)
        if already_exported:
            print("'export MAHOUT_HOME=...' already exists in zeppelin_env.sh, not appending")
            return False

    print("appending '%s' to conf/zeppelin-env.sh" % mahout_home_str)
    # Append mode also creates the file when it does not exist yet.
    with open(zeppelin_env_sh_path, 'a') as f:
        f.write(mahout_home_str)
    return True


def main(argv=None):
    """Run the installer; ``argv`` defaults to the process command line."""
    args = parser.parse_args(argv)

    zeppelin_home = _resolve_zeppelin_home(args.zeppelin_home)
    z = ZeppelinTerpWrangler(_ensure_interpreter_json(zeppelin_home))
    mahout_home = _resolve_mahout_home(args, zeppelin_home)

    ###################################################################################################################
    # Create new interpreters
    ###################################################################################################################
    z._readTerpJson()
    z.createTerp("spark", "sparkMahout", args.overwrite_existing)
    z.createTerp("flink", "flinkMahout", args.overwrite_existing)
    z.addMahoutConfig("sparkMahout", mahout_home, mahout_version)
    z.addMahoutConfig("flinkMahout", mahout_home, mahout_version)
    z._writeTerpJson()

    _export_mahout_home('%s/conf/zeppelin-env.sh' % zeppelin_home, mahout_home)

    ###################################################################################################################
    # Zeppelin has to be restarted for new interpreters to show up; do this for the user unless told otherwise
    ###################################################################################################################
    if args.restart_later:
        print("--restart_later flag detected: remember to restart Zeppelin to see new Mahout interpreters!!")
    else:
        print("restarting Apache Zeppelin to load new interpreters...")
        check_call(["bin/zeppelin-daemon.sh", 'restart'], cwd=zeppelin_home)

    ###################################################################################################################
    # Good bye
    ###################################################################################################################
    print("---------------------------------------------------------------------------------------------------------------")
    print("all done! Thanks for using Apache Mahout")
    print("bye")


if __name__ == "__main__":
    main()