#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import os, sys
#, urllib2, urllib

def cleanup(args): 
	cmd = "hadoop fs -rm -r /tmp/%s" % args["htable_name"]
	print(cmd)
	ret = os.system(cmd)
	print(cmd, "return", ret)
	return ret

def hfile(args):
	cmd = """spark-submit --class "org.apache.s2graph.loader.subscriber.TransferToHFile" \
--name "TransferToHFile@shon" \
--conf "spark.task.maxFailures=20" \
--master yarn-cluster \
--num-executors %s \
--driver-memory 1g \
--executor-memory 2g \
--executor-cores 1 \
%s \
--input %s \
--tmpPath /tmp/%s \
--zkQuorum %s \
--table %s \
--dbUrl %s \
--dbUser %s \
--dbPassword %s \
--maxHFilePerRegionServer %s \
--labelMapping %s \
--autoEdgeCreate %s""" % (args["num_executors"],
						   JAR,
						   args["input"],
						   args["htable_name"],
						   args["hbase_zk"],
						   args["htable_name"],
						   args["db_url"],
						   args["db_user"],
						   args["db_password"],
						   args["max_file_per_region"],
						   args["label_mapping"],
						   args["auto_create_edge"])
	print(cmd)
	ret = os.system(cmd)
	print(cmd, "return", ret)
	return ret

def distcp(args): 
	cmd = "hadoop distcp -overwrite -m %s -bandwidth %s /tmp/%s %s/tmp/%s" % (args["-m"], args["-bandwidth"], args["htable_name"], args["hbase_namenode"], args["htable_name"])
	print(cmd)
	ret = os.system(cmd)
	print(cmd, "return", ret)
	return ret

def chmod(args):
	cmd = "export HADOOP_CONF_DIR=%s; export HADOOP_USER_NAME=hdfs; hadoop fs -chmod -R 777 /tmp/%s" % (args["HADOOP_CONF_DIR"], args["htable_name"])
	print(cmd)
	ret = os.system(cmd)
	print(cmd, "return", ret)
	return ret

def load(args):
	cmd = "export HADOOP_CONF_DIR=%s; export HBASE_CONF_DIR=%s; hbase %s /tmp/%s %s" % \
		  (args["HADOOP_CONF_DIR"], args["HBASE_CONF_DIR"], LOADER_CLASS, args["htable_name"], args["htable_name"])
	print(cmd)
	ret = os.system(cmd)
	print(cmd, "return", ret)
	return ret

def send(msg):
	print(msg)

def run(args):
	cleanup(args)
	send("[Start]: bulk loader")
	ret = hfile(args)
	
	if ret != 0: return send("[Failed]: loader build hfile failed %s" % ret)
	else: send("[Success]: loader build hfile")
	
	# ret = distcp(args)
    #
	# if ret != 0: return send("[Failed]: loader distcp failed %s" % ret)
	# else: send("[Success]: loader distcp")
    #
	# ret = chmod(args)
	#
	# if ret != 0: return send("[Failed]: loader chmod failed %s" % ret)
	# else: send("[Success]: loader chmod")

	ret = load(args)

	if ret != 0: return send("[Failed]: loader complete bulkload failed %s" % ret)
	else: send("[Success]: loader complete bulkload")


LOADER_CLASS = "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles"
JAR="loader/target/scala-2.11/s2loader-assembly-0.2.1-SNAPSHOT.jar"

args = {
"HADOOP_CONF_DIR": "hdfs_conf_gasan", 
"HBASE_CONF_DIR": "hbase_conf_gasan", 
"htable_name": "test", 
"hbase_namenode": "hdfs://localhost:8020",
"hbase_zk": "localhost",
"db_url": "jdbc:mysql://localhost:3306/graph_dev",
"db_user": "sa",
"db_password": "sa",
"max_file_per_region": 1,
"label_mapping": "none",
"auto_create_edge": "false",
"-m": 1, 
"-bandwidth": 10,
"num_executors": 2,
"input": "/tmp/test.txt"
}

run(args)
