# HTML Render

In [None]:
%%html
<!-- Hide every stderr output area (div.output_stderr), so warnings and errors written to stderr are not shown in the notebook. -->
<!-- NOTE(review): this also hides genuine error output; the background rule has no visible effect while display is none. -->
<style>
div.output_stderr {
background: #ffdd;
display: none;
}
</style>

In [None]:
%%html
<!-- Let the notebook's .container element span the full window width. -->
<style>
.container { width:100% !important; }
</style>

In [None]:
%%html
<!-- Render CodeMirror code editors in 12pt Courier New. -->
<style>
.CodeMirror{font-family: "Courier New";font-size: 12pt;}
</style>

In [None]:
%%html
<!-- Set the font size of rendered HTML tables (e.g. DataFrame output) to 20px. -->
<style>
.rendered_html table, .rendered_html td, .rendered_html th {font-size: 20px;}
</style>

# System Settings

In [None]:
import os
from pathlib import Path

# Canonical (symlink-resolved) home directory; profiles and helper scripts live under it.
home = os.path.realpath(os.fspath(Path.home()))
# Working directory at notebook start-up; records files are written relative to it.
cwd = os.getcwd()

print(f'home: {home}')
print(f'cwd: {cwd}')

In [None]:
import os

import pandas as pd
from IPython.display import display

# Show every row of the table below instead of a truncated view.
pd.set_option('display.max_rows', None)

# One row per environment variable, in os.environ order.
# NOTE(review): values may contain credentials/tokens — clear this output before sharing.
env_df = pd.DataFrame(
    list(os.environ.items()),
    columns=['Environment Variable', 'Value'],
)

display(env_df)

In [None]:
import socket

# Name of this machine and the IPv4 address it resolves to.
localhost = socket.gethostname()
local_ip = socket.gethostbyname(localhost)

print(f'localhost: {localhost}')
print(f'ip: {local_ip}')

In [None]:
import os
from pathlib import Path

# Spark version = second whitespace-separated field of the first line of $SPARK_HOME/RELEASE
# (e.g. "Spark 3.3.1 built for Hadoop 3.3.2" -> "3.3.1"). Read in Python so that a missing
# SPARK_HOME or RELEASE file stops here with a clear message instead of yielding a garbage version.
if 'SPARK_HOME' not in os.environ:
    raise SystemExit("SPARK_HOME is not set; cannot determine the Spark version.")
release_file = Path(os.environ['SPARK_HOME']) / 'RELEASE'
try:
    spark_version = release_file.read_text().splitlines()[0].split()[1]
except (OSError, IndexError):
    raise SystemExit(f"Cannot read the Spark version from {release_file}")

print(f"Spark version from SPARK_HOME: {spark_version}")
# e.g. "3.3.1" -> "331"; used in the per-version records file names.
spark_version_short = spark_version.replace('.', '')

In [None]:
import logging
import sys

# Root logger: ERROR and above only, printed to stdout so it shows up in normal cell output.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.ERROR,
    format='%(levelname)s : %(message)s',
)
logger = logging.getLogger()

In [None]:
import os

# Spark event-log location, filled in below: at most one of these becomes non-empty.
hdfs_event_dir = local_event_dir = ''

def get_spark_eventlog_dir(path):
    """Read ``spark.eventLog.dir`` from a ``spark-defaults.conf`` file.

    Accepts the separators allowed between key and value in Java-properties style files
    (whitespace, ``=`` or ``:``). Blank lines and ``#``/``!`` comment lines are ignored. Keys
    are matched exactly, and when a key repeats the last occurrence wins.

    Args:
        path: path of the ``spark-defaults.conf`` file.

    Returns:
        The configured event log directory, or ``None`` if the key is absent.

    Raises:
        SystemExit: if the file does not exist, or ``spark.eventLog.enabled`` is not ``true``.
    """
    import re

    # key, optional blanks, one separator (=, : or blank), optional blanks, value
    entry = re.compile(r'([^\s=:]+)\s*[=:\s]\s*(.*)')
    eventlog_dir = None
    eventlog_enabled = False
    try:
        with open(path, 'r') as f:
            for line in f:
                stripped = line.strip()
                if not stripped or stripped[0] in '#!':
                    continue
                match = entry.fullmatch(stripped)
                if match is None:
                    continue
                key, value = match.group(1), match.group(2).strip()
                if key == 'spark.eventLog.dir':
                    eventlog_dir = value
                elif key == 'spark.eventLog.enabled':
                    eventlog_enabled = value.lower() == 'true'
    except FileNotFoundError:
        raise SystemExit(f"'spark-defaults.conf' not found: {path}")
    if not eventlog_enabled:
        raise SystemExit("'spark.eventLog.enabled' must be enabled.")
    return eventlog_dir

from urllib.parse import urlparse

# Locate spark-defaults.conf: SPARK_CONF_DIR takes precedence over $SPARK_HOME/conf.
spark_defaults_conf = None

if 'SPARK_CONF_DIR' in os.environ:
    spark_defaults_conf = os.path.join(os.environ['SPARK_CONF_DIR'], 'spark-defaults.conf')
elif 'SPARK_HOME' in os.environ:
    spark_defaults_conf = os.path.join(os.environ['SPARK_HOME'], 'conf', 'spark-defaults.conf')

if spark_defaults_conf:
    event_log_dir = get_spark_eventlog_dir(spark_defaults_conf)
    if event_log_dir:
        print(f"spark.eventLog.dir: {event_log_dir}")
        if event_log_dir.startswith('hdfs://'):
            hdfs_event_dir = event_log_dir
        elif event_log_dir.startswith('file:/'):
            # urlparse yields an absolute path for both file:/path and file:///path; slicing off
            # 'file:/' would turn file:/tmp/x into the relative path tmp/x.
            local_event_dir = urlparse(event_log_dir).path
        else:
            print(f"NOTE: unsupported spark.eventLog.dir scheme; event logs will not be copied: {event_log_dir}")
    else:
        raise SystemExit(f"'spark.eventLog.dir' is not configured in {spark_defaults_conf}")
else:
    raise SystemExit("Cannot get `spark.eventLog.dir`. Neither SPARK_CONF_DIR nor SPARK_HOME defined in environment variables.")
    

# Monitor

In [None]:
import os

import findspark

# Make the pyspark distribution under $SPARK_HOME importable from this kernel.
findspark.init(os.environ['SPARK_HOME'])
# Only applied when SPARK_SUBMIT_OPTS is not already set in the environment.
os.environ.setdefault('SPARK_SUBMIT_OPTS', '-Dscala.usejavacp=true')

In [None]:
import warnings
# NOTE(review): suppresses *all* Python warnings for the rest of the session (including pandas
# SettingWithCopy/FutureWarning and deprecation notices); consider narrowing to specific categories.
warnings.filterwarnings('ignore')

import atexit
import collections
import gzip
import importlib
import json
import logging
import math
import os
import pathlib
import shutil
import signal
import subprocess
import tempfile
import threading
import time
import timeit
import traceback

import matplotlib
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd
import platform
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
import spylon_kernel
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime
from functools import reduce
from IPython.display import display, HTML
from matplotlib import rcParams
from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession, SQLContext, Window
from pyspark.sql.functions import col, floor, lit, rank, to_date
from pyspark.sql.types import (DoubleType, FloatType, IntegerType,
                               StringType, StructField, StructType,
                               TimestampType)

from spylon_kernel import register_ipython_magics
from spylon.spark.utils import SparkJVMHelpers

register_ipython_magics()

rcParams['font.sans-serif'] = 'Courier New'
rcParams['font.family'] = 'Courier New'
rcParams['font.size'] = '12'

%matplotlib inline


In [None]:
import socket
import os
import sys
import json

def upload_profile(server, base_dir, appid):
    """Archive the local profile of ``appid`` and publish it on ``server`` (local disk and HDFS).

    Tars ``{home}/profile/{appid}`` into ``{appid}.tar.gz`` and copies it to
    ``PAUS/{base_dir}/profile`` on ``server``. That is a relative path, i.e. under the ssh login
    directory. It unpacks the archive there, replaces ``/{base_dir}/{appid}`` in HDFS with the
    unpacked copy, and finally removes the remote tarball and unpacked copy.

    NOTE(review): ``'PAUS'`` is a hard-coded remote directory name — confirm it is intended.
    """
    local_profile_dir = os.path.join(home, 'profile')
    !mkdir -p {local_profile_dir}
    # Fresh archive of the profile directory; tar's verbose listing is discarded.
    !(cd {local_profile_dir}; rm -f {appid}.tar.gz; tar zcvf {appid}.tar.gz {appid}) >/dev/null 2>&1
    
    server_local_dir=os.path.join('PAUS', base_dir)
    server_local_profile_dir=os.path.join(server_local_dir, 'profile')
    server_hdfs_dir=f'/{base_dir}/'

    # Clear any previous upload of the same appid on the server before copying.
    !ssh {server} "mkdir -p {server_local_profile_dir}"
    !ssh {server} "cd {server_local_profile_dir} && rm {appid}.tar.gz >/dev/null 2>&1"
    !ssh {server} "cd {server_local_profile_dir} && rm -r {appid} >/dev/null 2>&1"
    !scp {local_profile_dir}/{appid}.tar.gz {server}:{server_local_profile_dir}/
    !ssh {server} "cd {server_local_profile_dir} && tar zxf {appid}.tar.gz"
    # Replace the HDFS copy with the freshly unpacked profile.
    !ssh {server} "hdfs dfs -mkdir -p {server_hdfs_dir}"
    !ssh {server} "hdfs dfs -rm -r {server_hdfs_dir}{appid} >/dev/null 2>&1"
    !ssh {server} "hdfs dfs -put {server_local_profile_dir}/{appid} {server_hdfs_dir}"
    # Remove the remote staging copy; HDFS now holds the profile.
    !ssh {server} "cd {server_local_profile_dir}; rm {appid}.tar.gz; rm -r {appid}"

def killsar(clients):
    for l in clients:
        out=!ssh $l "ps aux | grep -w sar | grep -v grep | tr -s ' ' | cut -d' ' -f2"
        for x in out:
            !ssh $l "kill $x > /dev/null 2>&1"
    for l in clients:
        out=!ssh $l "ps aux | grep -w pidstat | grep -v grep | tr -s ' ' | cut -d' ' -f2"
        for x in out:
            !ssh $l "kill $x > /dev/null 2>&1"
    for l in clients:
        out=!ssh $l "ps aux | grep -w perf | grep -v grep | tr -s ' ' | cut -d' ' -f2"
        for x in out:
            !ssh root@$l "kill $x > /dev/null 2>&1"
    for l in clients:
        !ssh $l "emon -stop > /dev/null 2>&1"

def killnumactl(clients):
    for l in clients:
        out =!ssh $l "ps aux | grep numactl | grep bash | tr -s ' ' | cut -d' ' -f2"
        for x in out:
            !ssh $l "kill $x > /dev/null 2>&1"

def startmonitor(clients, appid, collect_emon, **kwargs):
    local_profile_dir=os.path.join(home, 'profile')
    prof=os.path.join(local_profile_dir, appid)
    !mkdir -p {prof}
    
    for l in clients:
        !ssh root@{l} date
    
    killsar(clients)
    
    if collect_emon:
        !cp -f {emon_list} {home}/emon.list
        for l in clients:
            !scp {home}/emon.list {l}:{home}/emon.list  > /dev/null 2>&1
    
    perfsyscalls=kwargs.get("collect_perf_syscall",None)
    
    for l in clients:
        prof_client=os.path.join(prof, l)
        !mkdir -p {prof_client}
        !ssh {l} mkdir -p {prof_client}
        !ssh {l} "sar -o {prof_client}/sar.bin -r -u -d -B -n DEV 1 >/dev/null 2>&1 &"
        !ssh root@{l} "jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f 1 | xargs -I % bash -c '(cat /proc/%/status >> {prof_client}/%.stat; cat /proc/%/io >> {prof_client}/%.stat)'"
        if collect_emon:
            !ssh {l} "emon -i {home}/emon.list -f {prof_client}/emon.rst >/dev/null 2>&1 & "
        else:
            !ssh root@{l} "perf stat -e 'instructions,cycles,cpu_clk_unhalted.thread,cpu_clk_unhalted.ref_tsc' -a -I 500 -o {prof_client}/perfstat.txt  >/dev/null 2>&1 & "
            !ssh {l} "cat /sys/devices/system/cpu/cpu0/tsc_freq_khz | xargs -I% echo %000 > {prof_client}/tsc_freq 2>/dev/null &"
            !ssh {l} "lscpu | grep '^CPU(s):' | cut -d ':' -f 2 | tr -d ' ' > {prof_client}/totalcores 2>/dev/null &"
        if kwargs.get("collect_pid",False):
            !ssh {l} "jps | grep CoarseGrainedExecutorBackend | head -n 1 | cut -d' ' -f 1 | xargs  -I % pidstat -h -t -p % 1  > {prof_client}/pidstat.out  2>/dev/null &"
        !ssh root@{l} 'cat /proc/uptime  | cut -d" " -f 1 | xargs -I ^ date -d "- ^ seconds"  +%s.%N' > $prof/$l/uptime.txt
        if kwargs.get("collect_sched",False):
            !ssh root@{l} 'perf trace -e "sched:sched_switch" -C 8-15 -o {prof_client}/sched.txt -T -- sleep 10000 >/dev/null 2>/dev/null &'
        if perfsyscalls is not None:
            !ssh root@{l} "perf stat -e 'syscalls:sys_exit_poll,syscalls:sys_exit_epoll_wait' -a -I 1000 -o {prof_client}/perfsyscalls.txt  >/dev/null 2>&1 & "
        if kwargs.get("collect_hbm",False):
            hbm_nodes = kwargs.get("hbm_nodes")
            if hbm_nodes is not None:
                print("collect_hbm")
                hbm_nodes = '\|'.join(["node " + str(i) for i in hbm_nodes])
                %env hbm_numa_nodes={hbm_nodes}
                %env hbm_l = {l}
                %env hbm_prof = {prof}
                !ssh $hbm_l "echo timestamp, size, free > $hbm_prof/$hbm_l/numactl.csv"
                !ssh $hbm_l "while :; do echo \$(numactl -H | grep '$hbm_numa_nodes' | grep 'size' | awk '{ print \$4 }' | awk '{ s += \$1 } END { print s }'), \$(numactl -H | grep '$hbm_numa_nodes' | grep 'free' | awk '{ print \$4 }' | awk '{ s += \$1 } END { print s }') | ts '%Y-%m-%d %H:%M:%S,' >> $hbm_prof/$hbm_l/numactl.csv; sleep 1; done >/dev/null 2>&1 &"
            else:
                print("Missing argument: hbm_nodes. e.g. hbm_nodes = list(range(8,16))")

def stopmonitor(clients, sc, appid, result, collect_emon, **kwargs):
    """Stop the monitors started by ``startmonitor`` and gather the results into the local profile dir.

    In order, it:

    1. Changes the kernel's working directory to ``~``.
    2. Kills sar/pidstat/perf/emon (``killsar``) and the numactl sampling loops (``killnumactl``).
    3. On each client: converts ``sar.bin`` into per-subsystem text dumps
       (``sar_mem/cpu/disk/nic/page.sar``), appends a final ``/proc`` status/io snapshot of the
       executor JVMs, and records tool versions.
    4. Copies each remote client directory into ``{home}/profile/{appid}``.
    5. Stops ``sc`` (if given).
    6. Writes recent Gluten/Velox git logs, a ``starttime`` stamp and ``query_time.json``.
    7. Copies the Spark event log to ``app.log``.

    Args:
        clients: hostnames that were monitored.
        sc: SparkContext to stop, or ``None``.
        appid: application id; names the profile directory and the event log file.
        result: JSON-serializable per-query timings written to ``query_time.json``.
        collect_emon: whether emon was running (then ``emon -v`` is recorded).
        kwargs: accepted for call compatibility; not used here.
    """
    # NOTE(review): side effect — the kernel's working directory stays at ~ after this call.
    %cd ~
    
    local_profile_dir=os.path.join(home, 'profile')
    prof=os.path.join(local_profile_dir, appid)
    !mkdir -p {prof}

    killsar(clients)
    killnumactl(clients)
    
    for l in clients:
        prof_client=os.path.join(prof, l)
        # Turn the binary sar log into one text report per subsystem (read later by draw_sar).
        !ssh {l} "sar -f {prof_client}/sar.bin -r > {prof_client}/sar_mem.sar;sar -f {prof_client}/sar.bin -u > {prof_client}/sar_cpu.sar;sar -f {prof_client}/sar.bin -d -p > {prof_client}/sar_disk.sar;sar -f {prof_client}/sar.bin -n DEV > {prof_client}/sar_nic.sar;sar -f {prof_client}/sar.bin -B > {prof_client}/sar_page.sar;" 
        # Second /proc snapshot; get_io_stats diffs it against the one taken at start.
        !ssh root@{l} "jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f 1 | xargs -I % bash -c '(cat /proc/%/status >> {prof_client}/%.stat; cat /proc/%/io >> {prof_client}/%.stat)'"
        if collect_emon:
            !ssh {l} "source ~/sep_install/sep_vars.sh>/dev/null 2>&1; emon -v " > {prof}/{l}/emonv.txt
        !ssh {l} "sar -V " > {prof_client}/sarv.txt
        !ssh {l} "test -f {prof_client}/perfstat.txt && head -n 1 {prof_client}/perfstat.txt > {prof_client}/perfstarttime"
        # The local host already has its files in place; only remote clients need copying.
        if l!= socket.gethostname():
            !scp -r {l}:{prof_client} {prof}/ > /dev/null 2>&1
    
    if sc is not None:
        sc.stop()
        
    # Last two days of commits; uses the notebook-global gluten_home.
    !git --git-dir="{gluten_home}/.git" log --format="commit %H%nAuthor: %an <%ae>%nDate:  %cs%n %n %s %n" --since=`date --date='2 days ago'  +'%m/%d/%Y'` > {prof}/changelog_gluten
    !git --git-dir="{gluten_home}/ep/build-velox/build/velox_ep/.git" log --format="commit %H%nAuthor: %an <%ae>%nDate:  %cs%n %n %s %n" --since=`date --date='2 days ago'  +'%m/%d/%Y'` > {prof}/changelog_velox
    
    # NOTE(review): despite the file name, this is the wall-clock time (ms) when stopmonitor runs — confirm intended.
    with open(f"{prof}/starttime","w") as f:
        f.write("{:d}".format(int(time.time()*1000)))
    
    with open(f'{prof}/query_time.json', 'w') as f:
        json.dump(result, f)

    # Copy the Spark event log from wherever spark.eventLog.dir was resolved to earlier (if at all).
    if hdfs_event_dir != '':
        !hadoop fs -copyToLocal {hdfs_event_dir}/{appid} {prof}/app.log
    elif local_event_dir != '':
        !cp {local_event_dir}/{appid}  {prof}/app.log

In [None]:
def pinexecutor_numa(clients):
    cpunum = !ssh {clients[0]} "grep 'processor' /proc/cpuinfo | wc -l"
    cpunum = int(cpunum[0])
        
    numanodes=!ssh {clients[0]} "cat /sys/devices/system/node/node*/cpulist"
    numanodes = list(filter(lambda x: x != '', numanodes))
    print(numanodes)
    for client in clients:
        pids=!ssh {client} "jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f1"
        print(client,":",len(pids)," ","\t".join(map(str,pids)))
        
        cpunum_c = !ssh {client} "grep 'processor' /proc/cpuinfo | wc -l"
        cpunum_c = int(cpunum_c[0])
        if cpunum_c != cpunum:
            print(f"client {client} cpunum not match!")
            return
        numanodes_c=!ssh {client} "cat /sys/devices/system/node/node*/cpulist"
        numanodes_c = list(filter(lambda x: x != '', numanodes))
        time.sleep(1)
        print(numanodes_c)
        if numanodes_c != numanodes:
            print(f"client {client} numanodes not match!")
            return
        
        idx = 0
        nodes=len(numanodes)
        for i in range(nodes):
            cpus = numanodes[i]
            for l in pids[idx:idx+int(len(pids)/nodes)]: #  executors on 1 numanode
                print(f" {cpus} {l}")
                !ssh {client} "taskset -a -p -c $cpus $l > /dev/null 2>&1 "
            idx += int(len(pids)/nodes)

In [None]:
def config_pagecache(clients, run_gluten=True):
    for l in clients:
        if run_gluten:
            !ssh root@$l "echo 80 > /proc/sys/vm/dirty_ratio"
            !ssh root@$l "echo 50 > /proc/sys/vm/dirty_background_ratio"
            !ssh root@$l "echo 360000 > /proc/sys/vm/dirty_expire_centisecs"
            !ssh root@$l "echo 3000 > /proc/sys/vm/dirty_writeback_centisecs"

        else:
            !ssh root@$l "echo 10 > /proc/sys/vm/dirty_ratio"
            !ssh root@$l "echo 20 > /proc/sys/vm/dirty_background_ratio"
            !ssh root@$l "echo 3000 > /proc/sys/vm/dirty_expire_centisecs"
            !ssh root@$l "echo 500 > /proc/sys/vm/dirty_writeback_centisecs"

In [None]:
def print_kernel_params(clietns):
    params = {
        'transparent hugepage': '/sys/kernel/mm/transparent_hugepage/enabled',
        'auto numa balancing': '/proc/sys/kernel/numa_balancing',
        'scaling governor': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor',
        'scaling max freq': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_max_freq',
        'scaling cur freq': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_cur_freq',
        'power & perf policy': '/sys/devices/system/cpu/cpu*/power/energy_perf_bias',
        'dirty_ratio': '/proc/sys/vm/dirty_ratio',
        'dirty_background_ratio': '/proc/sys/vm/dirty_background_ratio',
        'dirty_expire_centisecs': '/proc/sys/vm/dirty_expire_centisecs',
        'dirty_writeback_centisecs': '/proc/sys/vm/dirty_writeback_centisecs'
    }
    for k, param in params.items():
        print()
        print(f'{k} ({param})')
        for l in clients:
            print(l + ": ", end='')
            res = !ssh root@$l "cat {param}"
            print(*res)
    # print numactl
    print()
    print("numactl -H")
    for l in clients:
        print(l + ":")
        res = !ssh $l "numactl -H"
        print('\n'.join(res))
    # print memory freq
    print()
    print("Memory Frequency")
    for l in clients:
        print(l + ":")
        res= !ssh root@$l "dmidecode -t memory | grep Speed"
        print('\n'.join(res))

In [None]:
def dropcache(clients):
    for l in clients:
        !ssh root@$l "sync && echo 3 > /proc/sys/vm/drop_caches; echo 1 >/proc/sys/vm/compact_memory; free -h"

In [None]:
def config_mem_cgroup(clients):
    mem_cgroup = """
CGROUP_ROOT=/sys/fs/cgroup/gluten

if [ ! -d $CGROUP_ROOT ] ; then
        sudo mkdir $CGROUP_ROOT
        # enable memory for subtree
        sudo bash -c "echo '+memory' >> $CGROUP_ROOT/cgroup.subtree_control"
fi

# move each process to sub memory group
index=0
for pid in `jps | grep Coarse | awk '{print $1}'` ; do
        target_cgroup=$CGROUP_ROOT/mem-${index}
        if [ ! -d $target_cgroup ] ; then
                sudo mkdir $target_cgroup
        fi
        proc_file=$target_cgroup/cgroup.procs
        sudo bash -c "echo $pid >> $proc_file"
        index=`expr $index + 1`
done
    """
    with open(f'{home}/mem-cgroup.sh', 'w+') as f:
        f.writelines(mem_cgroup)
    for l in clients:
        !scp {home}/mem-cgroup.sh {l}:{home}/ >/dev/null 2>&1
        !ssh {l} "bash {home}/mem-cgroup.sh >/dev/null 2>&1 &"

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import os


from IPython.display import display, HTML

def get_io_stats(appid,  client):
    """Sum the ``/proc/<pid>/io`` counters over all executor ``.stat`` snapshots of one client.

    Each ``<pid>.stat`` file under ``{home}/profile/{appid}/{client}`` may hold several
    snapshots. The first value of a counter is stored raw. Each later value replaces it with
    ``(value - stored) / 1024**3``, so with exactly two snapshots the result is the delta in GiB.

    Returns:
        A one-column DataFrame (``sum``) indexed by counter name.
    """
    io_fields = ['rchar', 'wchar', 'syscr', 'syscw', 'read_bytes', 'write_bytes', 'cancelled_write_bytes']
    stat_dir = os.path.join(home, 'profile', appid, client)
    records = []
    for name in os.listdir(stat_dir):
        if not name.endswith('.stat'):
            continue
        record = {'pid': name[:-len(".stat")]}
        records.append(record)
        with open(os.path.join(stat_dir, name), "r") as handle:
            lines = handle.readlines()
        for line in lines:
            for field in io_fields:
                if not line.startswith(field):
                    continue
                value = int(line.split(" ")[-1].strip())
                if field in record:
                    record[field] = (value - record[field]) / 1024 / 1024 / 1024
                else:
                    record[field] = value

    summary = pd.DataFrame(records).drop('pid', axis=1).sum().to_frame()
    summary.columns = ['sum']
    return summary

def process_time(dataframes):
    """Convert the leading timestamp column of sar text dumps to integer seconds.

    For each frame, this generator:

    * renames the first column to ``time`` (in place on the input frame);
    * drops sar's ``Average:`` summary rows;
    * converts ``HH:MM:SS`` to seconds since midnight;
    * adds 12 hours for every backwards jump, so a 12-hour clock wrap (AM->PM / PM->AM)
      keeps the series increasing.

    Yields:
        One new DataFrame per input, with an ``int`` ``time`` column.
    """
    half_day = 12 * 3600
    for df in dataframes:
        df.columns = ['time'] + list(df.columns[1:])
        # .copy() detaches the filtered rows so the assignment below is not a chained write.
        df = df[df.time != 'Average:'].copy()
        clock = pd.to_datetime(df['time'], format='%H:%M:%S')
        seconds = clock.dt.hour * 3600 + clock.dt.minute * 60 + clock.dt.second
        # Each backwards step starts a new half-day; the running count of such steps is the
        # number of 12h offsets every later row needs. This is the vectorized equivalent of
        # shifting all following rows at each wrap, and avoids chained .iloc writes, which
        # pandas Copy-on-Write does not propagate back.
        wraps = (seconds.diff() < 0).cumsum()
        df['time'] = (seconds + wraps * half_day).astype(int)
        yield df

def draw_sar(appid, qtime=None, disk_dev=None, nic_dev=None, client=None):
    """Plot the sar memory / disk / NIC / page-fault / CPU time series of one client.

    Reads the ``sar_*.sar`` text dumps under ``{home}/profile/{appid}/{client}`` and draws five
    stacked panels that share the x axis. The x axis is in seconds relative to one second before
    the first sample where user+system+iowait CPU exceeds 50%, or the first sample if that never
    happens.

    Args:
        appid: Spark application id whose profile directory is read.
        qtime: optional ordered mapping ``{query_name: seconds}``. Query boundaries are drawn at
            the cumulative end times, and per-query average CPU% is displayed.
        disk_dev: optional list of block device names to keep (``DEV`` column); default all.
        nic_dev: optional list of interface names to keep (``IFACE`` column); default all.
        client: hostname; defaults to the notebook-global ``clients[0]``.
    """
    if client is None:
        client = clients[0]

    display(HTML('<font size=6pt color=red>{:s}</font>'.format(client)))

    display(get_io_stats(appid, client))

    # Read data. header=1 presumably skips sar's banner line; sep=r'\s+' is the documented
    # replacement for the deprecated delim_whitespace=True.
    profile_dir = os.path.join(home,'profile',appid,client)
    datafiles = [os.path.join(profile_dir, datafile) for datafile in ['sar_cpu.sar', 'sar_mem.sar', 'sar_disk.sar', 'sar_nic.sar', 'sar_page.sar']]
    dataframes = [pd.read_csv(datafile, header=1, sep=r'\s+', parse_dates=True) for datafile in datafiles]

    num_figs=5
    fig, axs=plt.subplots(num_figs,1,sharex=True,figsize=(30,num_figs*4))

    [cpu_df, mem_df, disk_df, nic_df, page_df] = process_time(dataframes)

    # CPU usage (bottom panel)
    cpu_df['total'] = cpu_df['%user'] + cpu_df['%system'] + cpu_df['%iowait']

    # min() of an empty selection is NaN, which would blank every panel; fall back to the first sample.
    busy_times = cpu_df.loc[cpu_df['total'] > 50, 'time']
    starttime = (busy_times.min() if not busy_times.empty else cpu_df['time'].min()) - 1
    cpu_df['time'] -= starttime

    axs[4].stackplot(cpu_df['time'], cpu_df['%user'], cpu_df['%system'], cpu_df['%iowait'], labels=['user','system','iowait'])
    axs[4].legend(loc='upper left')

    # Memory usage, converted from kB to percent of memory via the %memused / kbmemused ratio.
    mem_df['dirty_cached'] = mem_df['kbdirty'] * mem_df['%memused'] / mem_df['kbmemused']
    mem_df['clean_cached'] = (mem_df['kbcached'] - mem_df['kbdirty']) * mem_df['%memused'] / mem_df['kbmemused']
    mem_df['used'] = mem_df['kbmemused'] * mem_df['%memused'] / mem_df['kbmemused']

    mem_df['time'] -= starttime

    axs[0].stackplot(mem_df['time'], mem_df['used'], mem_df['clean_cached'], mem_df['dirty_cached'], labels=['used','clean cached','dirty cached'])
    axs[0].legend(loc='upper left')
    axs[0].grid(axis = 'y')

    # Disk usage: MB/s summed over the selected devices, mean %util on a secondary axis.
    if disk_dev is not None:
        disk_df = disk_df[disk_df['DEV'].isin(disk_dev)].copy()
    disk_df['rkB/s'] = disk_df['rkB/s'].astype(float)
    disk_df['wkB/s'] = disk_df['wkB/s'].astype(float)
    disk_df['%util'] = disk_df['%util'].astype(float)

    disk_df = disk_df.groupby('time').agg({'rkB/s': 'sum', 'wkB/s': 'sum', '%util':'mean'}).reset_index()
    disk_df['read'] = disk_df['rkB/s'] / 1024
    disk_df['write'] = disk_df['wkB/s'] / 1024

    disk_df['time'] -= starttime

    axs[1].stackplot(disk_df['time'], disk_df['read'], disk_df['write'], labels=['read MB/s','write MB/s'])
    axs[1].grid(axis = 'y')

    ax2 = axs[1].twinx()

    ax2.plot(disk_df['time'], disk_df['%util'],'g-')
    axs[1].legend(loc='upper left')

    # Nic usage: MB/s summed over the selected interfaces.
    if nic_dev is not None:
        nic_df = nic_df[nic_df['IFACE'].isin(nic_dev)].copy()
    nic_df['rxkB/s'] = nic_df['rxkB/s'].astype(float)
    nic_df['txkB/s'] = nic_df['txkB/s'].astype(float)

    nic_df = nic_df.groupby('time').agg({'rxkB/s': 'sum', 'txkB/s': "sum"}).reset_index()
    nic_df['rx'] = nic_df['rxkB/s'] / 1024
    nic_df['tx'] = nic_df['txkB/s'] / 1024

    nic_df['time'] -= starttime

    axs[2].stackplot(nic_df['time'], nic_df['rx'], nic_df['tx'], labels=['rx MB/s','tx MB/s'])
    axs[2].legend(loc='upper left')
    axs[2].grid(axis = 'y')

    # Pagefaults: minor = total faults - major faults.
    page_df['minflt/s'] = page_df['fault/s'] - page_df['majflt/s']

    page_df['time'] -= starttime

    axs[3].stackplot(page_df['time'], page_df['minflt/s'], page_df['majflt/s'], labels=['minor_fault/s','major_fault/s'])
    axs[3].legend(loc='upper left')
    axs[3].grid(axis = 'y')

    # Add vertical lines and text for qtime, and calculate per query cpu%
    if qtime is not None:
        for ax in axs:
            x = 0
            ax.axvline(x = x, color = 'b')
            for k, v in qtime.items():
                x += v
                ax.axvline(x = x, color = 'b')

            tx = 0
            for k, v in qtime.items():
                # NOTE(review): 15/772 and 6/772 look tuned for one plot width; they set the
                # minimum span (relative to the total x) that gets a label and the label's offset.
                if v / x > 15 / 772:
                    ax.text(tx + v / 2 - 6 * x / 772, ax.get_ylim()[1] * 1.05, k)
                tx += v

        x = 0
        qtime_se = {}
        cols = ['%user','%system','%iowait']
        for k, v in qtime.items():
            filtered_df = cpu_df[(cpu_df['time'] >= x) & (cpu_df['time'] <= x+v)]
            averages = filtered_df[cols].mean()
            qtime_se[k] = averages.tolist()
            x += v
        if qtime_se:
            perqcpu = pd.DataFrame(qtime_se).T
            perqcpu.columns = cols
            display(perqcpu)

    plt.show()


In [None]:
def convert_to_etc_gmt(tz_offset=None):
    """Map a UTC offset string such as ``+0800`` to an IANA ``Etc/GMT±N`` zone name.

    ``Etc/GMT`` names use the inverted POSIX sign, so UTC+8 becomes ``Etc/GMT-8``. Only whole
    hours can be expressed; the minutes part is truncated toward zero.

    Args:
        tz_offset: ``[+-]HHMM`` offset. Defaults to this machine's local offset, i.e. the same
            value ``date +%z`` prints, computed here without spawning a shell.

    Returns:
        The ``Etc/GMT`` zone name, e.g. ``"Etc/GMT-8"`` or ``"Etc/GMT+0"``.
    """
    if not tz_offset:
        from datetime import datetime
        tz_offset = datetime.now().astimezone().strftime('%z')

    sign = tz_offset[0]
    hours = int(tz_offset[1:3])
    minutes = int(tz_offset[3:])

    gmt_offset = hours + (minutes / 60)
    if sign == '+':
        # POSIX-style Etc/GMT names invert the sign: east of UTC is negative.
        gmt_offset = -gmt_offset

    return f"Etc/GMT{int(gmt_offset):+d}"

In [None]:
def get_last_run(records_file, appid=''):
    """Look up a previous run in a tab-separated records file.

    Record fields used: ``[1]`` appid, ``[2]`` base_dir, ``[3]`` app name (the layout appended by
    ``TestTPC.stop_monitor``).

    Args:
        records_file: path of the records file; a missing file yields ``(None, None, None)``.
        appid: when given, the last record must belong to this appid (the run just recorded);
            otherwise a mismatch message is printed. The newest earlier record within the last
            100 lines whose app name contains ``'main'`` is returned. When empty, the last record
            itself is returned.

    Returns:
        ``(appid, base_dir, app_name)`` of the matching record, or ``(None, None, None)``.
    """
    import collections

    if not os.path.exists(records_file):
        return None, None, None
    # Keep only the tail (last 100 records, or just the last one) in pure Python: no shell
    # involved, so paths containing spaces or shell metacharacters are safe.
    with open(records_file) as f:
        lines = [line.rstrip('\n') for line in collections.deque(f, maxlen=100 if appid else 1)]

    if appid:
        if len(lines) > 1:
            # Check appid match
            last_appid = lines[-1].split('\t')[1]
            if last_appid != appid:
                print(f'appid not match. Required {appid}. Got {last_appid}')
            else:
                for line in reversed(lines[:-1]):
                    fields = line.split('\t')
                    if 'main' in fields[3]:
                        return fields[1], fields[2], fields[3]
    elif len(lines) == 1:
        fields = lines[0].split('\t')
        return fields[1], fields[2], fields[3]
    return None, None, None

# TestTPC

In [None]:
import os
import socket
from dataclasses import dataclass
from functools import wraps
from pathlib import Path
from typing import List 

class TestTPC:
    @dataclass
    class query_info:
        tables: List[str]
        sql: List[str]

    query_infos = {}
    query_ids =[]

    tpctables=[]
    tpc_query_path = ''
    
    RECORDS_SPARK_TPCH = f"records_spark_tpch_spark{spark_version_short}.csv"
    RECORDS_SPARK_TPCDS = f"records_spark_tpcds_spark{spark_version_short}.csv"
    RECORDS_GLUTEN_TPCH = f"records_gluten_tpch_spark{spark_version_short}.csv"
    RECORDS_GLUTEN_TPCDS = f"records_gluten_tpcds_spark{spark_version_short}.csv"
    
    def __init__(self, spark, table_dir, run_gluten, workload, server, base_dir, nb_name, data_source = 'parquet'):
        self.spark = spark
        self.sc = spark.sparkSession.sparkContext
        self.appid = self.sc.applicationId
        self.app_name = '_'.join(self.sc.appName.split(' '))
        self.run_gluten = run_gluten
        self.workload = workload
        self.table_dir = table_dir
        self.server = server
        self.base_dir = base_dir
        self.nb_name = nb_name
        self.data_source = data_source
        self.table_loaded = False
        self.result = {}
        self.duration = 0
        self.stopped = False
        self.collect_emon = False
        self.perf_html = ''
        self.finished_nb = ''
        for l in os.listdir(self.tpc_query_path):
            if (l[-3:] == 'sql'):
                with open(self.tpc_query_path+l,"r") as f:
                    self.query_infos[l.split(".")[0]]=self.query_info(self.tpctables,["\n".join(f.readlines())])
        self.query_ids = sorted(self.query_infos.keys(), key=lambda x: str(len(x))+x if x[-1] != 'a' and x[-1] != 'b' else str(len(x)-1) + x)
        print("http://{}:18080/history/{}/jobs/".format(local_ip, self.sc.applicationId))
    
    def start_monitor(self, clients, emon_list='', **kw):
        if emon_list:
            self.collect_emon = True
        startmonitor(clients, self.appid, self.collect_emon, **kw)
    
    def stop_monitor(self, clients, **kw):
        if self.stopped:
            return
        stopmonitor(clients, self.sc, self.appid, self.result, self.collect_emon, **kw)

        output_nb = f'{self.nb_name[:-6]}-{self.appid}.ipynb'
        
        record_file = ''
        if self.workload == 'tpch':
            if self.run_gluten:
                record_file = self.RECORDS_GLUTEN_TPCH
            else:
                record_file = self.RECORDS_SPARK_TPCH
        else:
            if self.run_gluten:
                record_file = self.RECORDS_GLUTEN_TPCDS
            else:
                record_file = self.RECORDS_SPARK_TPCDS
        record_file = os.path.join(cwd, record_file)
        with open(record_file, 'a+') as f:
            f.write(f'{datetime.now()}\t{self.appid}\t{self.base_dir}\t{self.app_name}\t{output_nb}\t{self.duration}\n')

        if self.server:
            if output_nb.startswith(cwd):
                output_nb = os.path.relpath(output_nb, cwd)
                self.finished_nb = f"http://{localhost}:8888/tree/{output_nb}"
            upload_profile(self.server, self.base_dir, self.appid)
        
        self.stopped = True

    def run_perf_analysis(self, server_gluten_home, disk_dev, nic_dev, proxy='', emails=[], pr=''):
        if not self.server:
            return

        run_script=f'{server_gluten_home}/tools/workload/benchmark_velox/analysis/run_perf_analysis.sh'

        disk=','.join(disk_dev)
        nic=','.join(nic_dev)

        command =' '.join(['bash', run_script, '--base-dir', self.base_dir, '--name', self.app_name, '--appid', self.appid, '--disk', disk, '--nic', nic, '--tz', convert_to_etc_gmt(), '--proxy', proxy if proxy != '' else "''", '--emails', ','.join(emails) if emails else "''", '--pr', pr if pr != '' and pr.isdigit() else "''"])
        
        if self.run_gluten:
            if self.workload == 'tpch':
                comp_file = os.path.join(cwd, self.RECORDS_GLUTEN_TPCH)
                baseline_file = os.path.join(cwd, self.RECORDS_SPARK_TPCH)
            else:
                comp_file = os.path.join(cwd, self.RECORDS_GLUTEN_TPCDS)
                baseline_file = os.path.join(cwd, self.RECORDS_SPARK_TPCDS)
            comp_appid, comp_base_dir, comp_name = get_last_run(comp_file, self.appid)
            if comp_appid:
                command += ' '.join(['', '--comp-appid', comp_appid, '--comp-base-dir', comp_base_dir, '--comp-name', comp_name])
            baseline_appid, baseline_base_dir, _ = get_last_run(baseline_file, '')
            if baseline_appid:
                command += ' '.join(['', '--baseline-appid', baseline_appid, '--baseline-base-dir', baseline_base_dir])
        print(command)

        # Block if running on local cluster.
        if self.server == localhost:
            !ssh {self.server} "{command} > /dev/null 2>&1"
        else:
            !ssh {self.server} "{command} > /dev/null 2>&1 &"

        self.perf_html=f'http://{self.server}:8889/view/{self.base_dir}/html/{self.app_name}_{self.appid}.html'
        display(HTML(f'<a href="{self.perf_html}">{self.perf_html}</a>'))
        
    def load_table(self, table):
        if type(self.table_dir)==list:
            return self.spark.read.format(self.data_source).load([os.path.join(t, table) for t in self.table_dir])
        else:
            return self.spark.read.format(self.data_source).load(os.path.join(self.table_dir, table))
    
    def load_tables_as_tempview(self, tables):
        for table in tables:
            df = self.load_table(table)
            df.createOrReplaceTempView(table)
        
    def load_all_tables_as_tempview(self):
        print(f"Loading all tables: {self.tpctables}")
        self.load_tables_as_tempview(self.tpctables)
    
    def load_query(self, query):
        info = self.query_infos[query]
        return [self.spark.sql(q) for q in info.sql]
    
    def run_query(self, query, explain = False, print_result=False, load_table=True):
        if load_table:
            self.load_all_tables_as_tempview()
        start_time = timeit.default_timer()
        print("start query " + query + ", application id " + self.sc.applicationId)
        print("{} : {}".format("Start time", start_time))
        self.sc.setJobDescription(query)

        queries = self.load_query(query)
        for q in queries:
            if explain: q.explain()
            collect=q.collect()
        end_time = timeit.default_timer()
        duration = end_time - start_time
        display(HTML(('Completed Query. Time(sec): <font size=6pt color=red>{:f}</font>'.format(duration))))
        
        self.result[query] = duration
        self.duration += float(duration)
        if print_result:
            print(collect)

    def power_run(self, explain=False, print_result=False, load_table=True):
        if load_table:
            self.load_all_tables_as_tempview()
        for l in self.query_ids:
            self.run_query(l, explain=explain, print_result=print_result, load_table=False)

    def print_result(self):
        print(self.result)
        print()
        print(f"total duration:\n{self.duration}\n")
        if self.server:
            print(self.finished_nb)
            print(f"http://{self.server}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{self.appid}.json")
            print(f"http://{self.server}:18080/history/{self.appid}")
            print(self.perf_html)
        print(self.appid)
        for t in self.result.values():
            print(t)
    
class TestTPCH(TestTPC):
    """TPC-H workload: the eight TPC-H tables and the gluten-it TPC-H query files."""
    tpctables = ['customer', 'lineitem', 'nation', 'orders', 'part', 'partsupp', 'region', 'supplier']
    tpc_query_path = f'{gluten_home}/tools/gluten-it/common/src/main/resources/tpch-queries/'

    def __init__(self, spark, table_dir, run_gluten, server, base_dir, nb_name, data_source = 'parquet'):
        super().__init__(spark, table_dir, run_gluten, 'tpch', server, base_dir, nb_name, data_source)
                
class TestTPCDS(TestTPC):
    """TPC-DS driver: the 24 TPC-DS tables plus the gluten-it TPC-DS query directory."""
    tpctables = [
        'call_center', 'catalog_page', 'catalog_returns', 'catalog_sales',
        'customer', 'customer_address', 'customer_demographics', 'date_dim',
        'household_demographics', 'income_band', 'inventory', 'item',
        'promotion', 'reason', 'ship_mode', 'store',
        'store_returns', 'store_sales', 'time_dim', 'warehouse',
        'web_page', 'web_returns', 'web_sales', 'web_site',
    ]
    tpc_query_path = f'{gluten_home}/tools/gluten-it/common/src/main/resources/tpcds-queries/'

    def __init__(self, spark, table_dir, run_gluten, server, base_dir, nb_name, data_source = 'parquet'):
        # Same arguments as TestTPC, with the workload name pinned to 'tpcds'.
        super().__init__(spark, table_dir, run_gluten, 'tpcds', server, base_dir, nb_name, data_source)

# Create SparkContext

## Default Spark configuration

In [None]:
import os

def findjemalloc():
    l = clients[0]
    jemallocDir = !ssh $l "whereis libjemalloc.so.2"
    libjemalloc = jemallocDir[0].split(' ')
    return libjemalloc[1]

def get_py4jzip():
    """Return the path of the py4j zip shipped in ``$SPARK_HOME/python/lib``.

    Returns:
        The first (sorted) file matching ``py4j*.zip``.

    Raises:
        KeyError: if ``SPARK_HOME`` is not set.
        FileNotFoundError: if no ``py4j*.zip`` exists. Previously, ``!ls``
            returned its own error message, which was then used as the path.
    """
    import glob

    spark_home = os.environ['SPARK_HOME']
    pattern = os.path.join(spark_home, 'python', 'lib', 'py4j*.zip')
    # sorted() mirrors the alphabetical ordering `ls` used to provide.
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise FileNotFoundError(f"no py4j zip matching {pattern}")
    return matches[0]

def default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars='', app_name='', master='yarn', run_gluten=False):
    """Build the baseline SparkConf shared by vanilla-Spark and Gluten runs.

    Executor heap, off-heap and overhead sizes are derived from several inputs:
    the per-node memory budget, the number of executors per node, whether the
    driver also runs on this host (``localhost in clients``), and the off-heap
    ratio (``gluten_offheap_ratio`` or ``spark_offheap_ratio``). The function
    also creates a temporary ``spark.repl.class.outputDir``, which is removed
    at interpreter exit or on SIGTERM.

    Args:
        executors_per_node: Executors launched on each host in ``clients``.
        cores_per_executor: Value for ``spark.executor.cores``.
        task_per_core: Multiplier used for ``spark.sql.shuffle.partitions``
            (``len(clients) * executors_per_node * cores_per_executor * task_per_core``).
        memory_per_node: Memory budget per host, e.g. ``'256g'``. Accepts a
            ``k``/``m``/``g`` suffix or a plain byte count.
        extra_jars: Classpath for driver and executor ``extraClassPath``.
        app_name: Value for ``spark.app.name``.
        master: Value for ``spark.master``.
        run_gluten: Use ``gluten_offheap_ratio`` instead of ``spark_offheap_ratio``.

    Returns:
        A populated ``SparkConf``.
    """
    # Create a temp directory for REPL-generated classes that gets cleaned up on exit.
    output_dir = os.path.abspath(tempfile.mkdtemp())

    def cleanup():
        shutil.rmtree(output_dir, True)

    previous_sigterm = signal.getsignal(signal.SIGTERM)

    def on_sigterm(signum, frame):
        # Signal handlers are called with (signum, frame). Registering the
        # zero-argument cleanup() directly (as before) raised TypeError on
        # SIGTERM instead of removing the directory.
        cleanup()
        if callable(previous_sigterm):
            # Chain handlers installed by earlier calls so their directories are removed too.
            previous_sigterm(signum, frame)
        elif previous_sigterm != signal.SIG_IGN:
            # Restore the default action so SIGTERM still terminates the process.
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            os.kill(os.getpid(), signal.SIGTERM)

    atexit.register(cleanup)
    signal.signal(signal.SIGTERM, on_sigterm)

##################################################
    def convert_to_bytes(size):
        # '20g' -> 20 * 1024**3; strings without a k/m/g suffix are taken as bytes.
        units = {'k': 1, 'm': 2, 'g': 3}
        size = size.lower()
        if size[-1] in units:
            return int(size[:-1]) * 1024 ** units[size[-1]]
        else:
            return int(size)

    def yarn_padding(size):
        # Difference between ``size`` and the smallest multiple of 1 GiB (in bytes)
        # that is >= size (never less than 1 GiB).
        min_size =  convert_to_bytes('1g')
        step = min_size
        while size > min_size:
            min_size += step
        return min_size - size

    num_nodes = len(clients)
    num_executors = num_nodes*executors_per_node
    parallelism = num_executors*cores_per_executor*task_per_core

    if run_gluten:
        offheap_ratio = gluten_offheap_ratio
    else:
        offheap_ratio = spark_offheap_ratio
    driver_memory = convert_to_bytes('20g')
    executor_memory_overhead = convert_to_bytes('1g')

    # Minimum executor memory
    min_memory = convert_to_bytes('1g')

    # Executor on-heap memory: reserve (overhead + 1 GiB) per executor and
    # (driver memory + 1 GiB) for a co-located driver. Split the rest so each
    # executor gets heap + offheap_ratio*heap, and a co-located driver is
    # budgeted another offheap_ratio*heap.
    num_driver = 1 if localhost in clients else 0
    executor_memory = math.floor((convert_to_bytes(memory_per_node) - (executor_memory_overhead + min_memory)*executors_per_node - (driver_memory + min_memory)*num_driver)/(offheap_ratio*num_driver + (1+offheap_ratio)*executors_per_node))
    executor_memory = max(executor_memory, min_memory)
    # Off-heap memory is a fixed fraction of the executor heap.
    if offheap_ratio > 0:
        enable_offheap = True
        offheap_memory = math.floor(executor_memory*offheap_ratio)
    else:
        enable_offheap = False
        offheap_memory = 0

    byte_to_mb = lambda x: int(x/(1024 ** 2))
    driver_memory_mb = byte_to_mb(driver_memory)
    executor_memory_overhead_mb = byte_to_mb(executor_memory_overhead)
    executor_memory_mb = byte_to_mb(executor_memory)
    offheap_memory_mb = byte_to_mb(offheap_memory)

    executor_totalmem_mb = executor_memory_overhead_mb + executor_memory_mb + offheap_memory_mb
    # NOTE(review): yarn_padding works in bytes but is given a value in MB, and
    # its result (a padding amount, not a padded total) replaces
    # executor_totalmem_mb. As written, the check below is effectively never
    # true, so the extra 1 GiB of overhead is never added. Behavior is kept
    # as-is; confirm the intended formula before changing it.
    executor_totalmem_mb = yarn_padding(executor_totalmem_mb)
    if byte_to_mb(convert_to_bytes(memory_per_node)) - executor_totalmem_mb*executors_per_node > executor_totalmem_mb:
        executor_memory_overhead_mb += 1024

    print('''
        executors per node: {:d}
        parallelism: {:d}
        executor memory: {:d}m
        offheap memory: {:d}m
    '''.format(executors_per_node, parallelism, executor_memory_mb, offheap_memory_mb))

    # Join with a path separator. The old f"{SPARK_HOME}python" produced e.g. "/opt/sparkpython".
    spark_python_dir = os.path.join(os.environ['SPARK_HOME'], 'python')

    conf = SparkConf() \
        .set('spark.app.name', app_name)\
        .set('spark.master',master)\
        .set('spark.executor.memory', '{:d}m'.format(executor_memory_mb))\
        .set('spark.memory.offHeap.enabled', enable_offheap)\
        .set('spark.memory.offHeap.size','{:d}m'.format(offheap_memory_mb))\
        .set('spark.sql.shuffle.partitions', parallelism)\
        .set('spark.executor.instances', '{:d}'.format(num_executors))\
        .set('spark.executor.cores','{:d}'.format(cores_per_executor))\
        .set('spark.task.cpus','{:d}'.format(1))\
        .set('spark.driver.memory', '{:d}m'.format(driver_memory_mb))\
        .set('spark.executor.memoryOverhead', '{:d}m'.format(executor_memory_overhead_mb))\
        .set('spark.driver.maxResultSize', '4g')\
        .set('spark.executor.extraJavaOptions',\
            f'-XX:+UseParallelOldGC -XX:ParallelGCThreads=2 -XX:NewRatio=1 -XX:SurvivorRatio=1 -XX:+UseCompressedOops -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:ErrorFile={home}/logs/java/hs_err_pid%p.log')\
        .set('spark.driver.extraClassPath', extra_jars) \
        .set('spark.executor.extraClassPath', extra_jars) \
        .set('spark.executorEnv.PYTHONPATH',f"{spark_python_dir}:{get_py4jzip()}") \
        .set("spark.repl.class.outputDir", output_dir) \
        .set("spark.sql.broadcastTimeout", "4800") \
        .set('spark.serializer','org.apache.spark.serializer.KryoSerializer')\
        .set('spark.kryoserializer.buffer.max','512m')\
        .set('spark.kryo.unsafe',False)\
        .set('spark.sql.adaptive.enabled',True)\
        .set('spark.sql.autoBroadcastJoinThreshold',"10m")\
        .set('spark.sql.catalogImplementation','hive')\
        .set('spark.sql.optimizer.dynamicPartitionPruning.enabled',True)\
        .set('spark.cleaner.periodicGC.interval', '10s')

    return conf


def create_cntx_with_config(conf,conf_overwrite=None):
    """Start a SparkContext from ``conf`` and wire it into the spylon Scala kernel.

    Steps, in order (the order matters):
      1. Reload ``pyspark.java_gateway`` and replace its ``Popen`` so the JVM
         launch is captured in the global ``spark_jvm_proc``, with unbuffered,
         piped stdout.
      2. Apply ``conf_overwrite(conf)`` if given, then create the SparkContext
         and an SQLContext.
      3. Sleep 30 s, then list ``CoarseGrainedExecutorBackend`` PIDs on each
         host in ``clients`` via ``ssh``/``jps``.
      4. Build a SparkSession and install a spylon ``SparkState`` that uses it
         and ``spark_jvm_proc``.
      5. Display the effective SparkConf as a DataFrame.

    Args:
        conf: SparkConf to start from.
        conf_overwrite: Optional callable ``conf -> conf`` applied before the
            context is created.

    Returns:
        ``(sc, spark)``: the SparkContext and the SQLContext wrapping it.
    """

    # Reload so the Popen override below applies to a fresh module object.
    importlib.reload(pyspark.java_gateway)

    def Popen(*args, **kwargs):
        """Wrap subprocess.Popen so the child's stdout is piped to this process without buffering.

        stderr is not redirected here. The process handle is stored in the
        global ``spark_jvm_proc``.
        """
        global spark_jvm_proc
        # Override these in kwargs to avoid duplicate value errors
        # Set streams to unbuffered so that we read whatever bytes are available
        # when ready, https://docs.python.org/3.6/library/subprocess.html#popen-constructor
        kwargs['bufsize'] = 0
        # Capture everything from stdout for display in the notebook
        kwargs['stdout'] = subprocess.PIPE
        print("java proc gateway popen")
        spark_jvm_proc = subprocess.Popen(*args, **kwargs)
        return spark_jvm_proc
    pyspark.java_gateway.Popen = Popen

    # Drop any cached Scala interpreter so spylon creates one for the new context.
    spylon_kernel.scala_interpreter.scala_intp=None
    
    if conf_overwrite is not None:
        conf=conf_overwrite(conf)
    print("spark.serializer: ",conf.get("spark.serializer"))
    print("master: ",conf.get("spark.master"))
    
    # NOTE(review): spark_jvm_proc is only assigned if this call actually launches
    # a new JVM through the patched Popen; if a gateway already exists, the
    # SparkState below would see a stale (or undefined) spark_jvm_proc.
    sc = SparkContext(conf = conf,master=conf.get("spark.master"))
    sc.setLogLevel('ERROR')
    
    # Ship pyspark and py4j to the executors' Python workers.
    sc.addPyFile(f"{os.environ['SPARK_HOME']}/python/lib/pyspark.zip")
    sc.addPyFile(get_py4jzip())
    
    spark = SQLContext(sc)
    
    # Fixed wait, presumably to let executors register before the check below.
    time.sleep(30)
    
    # Report how many executor JVMs are running on each client host.
    for client in clients:
        pids=!ssh $client "jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f1"
        print(client,":",len(pids)," ","\t".join(map(str,pids)))
        
    spark_session = SparkSession(sc)
    spark_jvm_helpers = SparkJVMHelpers(spark_session._sc)
    spylon_kernel.scala_interpreter.spark_state = spylon_kernel.scala_interpreter.SparkState(spark_session, spark_jvm_helpers, spark_jvm_proc)
    
    print("appid: ",sc.applicationId)
    print("SparkConf:")

    df = pd.DataFrame(sc.getConf().getAll(), columns=['key', 'value'])
    display(df)

    return sc, spark

## Spark

In [None]:
def spark_tpch_conf_overwrite(conf):
    """Hook for TPC-H specific settings on vanilla-Spark runs.

    No TPC-H overrides are currently applied; ``conf`` is returned untouched.
    """
    unchanged = conf
    return unchanged

def spark_tpcds_conf_overwrite(conf):
    """Apply TPC-DS specific settings for vanilla-Spark runs.

    Enables the runtime bloom filter with an application-side scan size
    threshold of 0.

    Args:
        conf: SparkConf-like object whose ``set`` returns itself; modified in place.

    Returns:
        The same ``conf``.
    """
    overrides = (
        ('spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold', '0'),
        ('spark.sql.optimizer.runtime.bloomFilter.enabled', 'true'),
    )
    for key, value in overrides:
        conf.set(key, value)
    return conf

In [None]:
def create_cntx_spark(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name='', master='yarn', conf_overwrite=None):
    """Create a vanilla-Spark (non-Gluten) context.

    Starts from ``default_conf(..., run_gluten=False)`` and sets the Arrow,
    Parquet-reader and in-memory columnar batch sizes to 20480 rows. It then
    delegates to ``create_cntx_with_config``, which applies ``conf_overwrite``.

    Returns:
        ``(sc, spark)`` as returned by ``create_cntx_with_config``.
    """
    batch_rows = 20480
    spark_conf = default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, run_gluten=False)
    for batch_key in ("spark.sql.execution.arrow.maxRecordsPerBatch",
                      "spark.sql.parquet.columnarReaderBatchSize",
                      "spark.sql.inMemoryColumnarStorage.batchSize"):
        spark_conf.set(batch_key, batch_rows)
    return create_cntx_with_config(spark_conf, conf_overwrite)

## Gluten

In [None]:
def gluten_tpch_conf_overwrite(conf):
    """Hook for TPC-H specific settings on Gluten runs.

    No TPC-H overrides are currently applied; ``conf`` is returned untouched.
    """
    unchanged = conf
    return unchanged

def gluten_tpcds_conf_overwrite(conf):
    """Apply TPC-DS specific settings for Gluten runs.

    Enables the runtime bloom filter (application-side scan size threshold 0)
    and sets Gluten's logical/physical join-optimization options, with both
    optimization levels set to 18.

    Args:
        conf: SparkConf-like object whose ``set`` returns itself; modified in place.

    Returns:
        The same ``conf``.
    """
    # Parenthesized chain instead of backslash continuations. Previously, a
    # trailing `\` after the last .set() glued `return conf` onto the
    # expression, which was a SyntaxError that broke the whole cell.
    (conf.set('spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold', '0')
         .set('spark.sql.optimizer.runtime.bloomFilter.enabled', 'true')
         .set('spark.gluten.sql.columnar.joinOptimizationLevel', '18')
         .set('spark.gluten.sql.columnar.physicalJoinOptimizeEnable', 'true')
         .set('spark.gluten.sql.columnar.physicalJoinOptimizationLevel', '18')
         .set('spark.gluten.sql.columnar.logicalJoinOptimizeEnable', 'true'))
    return conf

In [None]:
def create_cntx_gluten(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name='', master='yarn', conf_overwrite=None):
    """Create a Gluten (Velox backend) context.

    Starts from ``default_conf(..., run_gluten=True)``, enables the Gluten
    plugin and the columnar shuffle manager, and sets ``LD_PRELOAD`` on the
    executors to the jemalloc path found by ``findjemalloc()``. It then
    delegates to ``create_cntx_with_config``, which applies ``conf_overwrite``.

    Returns:
        ``(sc, spark)`` as returned by ``create_cntx_with_config``.
    """
    gluten_conf = default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, run_gluten=True)
    gluten_settings = [
        ('spark.sql.files.maxPartitionBytes', '4g'),
        ('spark.plugins', 'org.apache.gluten.GlutenPlugin'),
        ('spark.shuffle.manager', 'org.apache.spark.shuffle.sort.ColumnarShuffleManager'),
        ('spark.gluten.sql.columnar.backend.lib', 'velox'),
        ('spark.gluten.sql.columnar.maxBatchSize', 4096),
        ('spark.gluten.sql.columnar.forceShuffledHashJoin', True),
        ('spark.executorEnv.LD_PRELOAD', findjemalloc()),
        ('spark.gluten.sql.columnar.coalesce.batches', 'true'),
    ]
    for setting_key, setting_value in gluten_settings:
        gluten_conf.set(setting_key, setting_value)

    return create_cntx_with_config(gluten_conf, conf_overwrite)

## Create Context

In [None]:
def create_cntx(run_gluten=False, workload='tpch', app_conf_overwrite=None, server='', base_dir='', nb_name='tpc_workload.ipynb', app_name=''):
    table_dir=''
    extra_jars = ''
    is_tpch_workload=False
    is_tpcds_workload=False
    app_name_suffix=''
    workload_conf_overwrite=None
    create_cntx_func=None
    test_tpc=None

    if workload.lower() == 'tpch':
        app_name_suffix = f"tpch_spark{spark_version_short}"
        tabledir = tpch_tabledir
        is_tpch_workload=True
    elif workload.lower() == 'tpcds':
        app_name_suffix = f"tpcds_spark{spark_version_short}"
        tabledir = tpcds_tabledir
        is_tpcds_workload=True
    else:
        raise ValueError(f"Unknown workload: {workload}")

    lastgit=!git --git-dir {gluten_home}/.git log --format="%H" -n 1
    lastgit = lastgit[0]
    print(f'lastgit: {lastgit}')

    nodes=len(clients)

    if run_gluten:
        jars_base=f"{home}/jars/"+lastgit
        
        for target_jar in gluten_target_jar.split(","):
            !ls -l {target_jar}
            !mkdir -p {jars_base}
            !rm -rf {jars_base}/*
            !cp {target_jar} {jars_base}/
            if target_jar[-4:] != '.jar':
                !cp -f {target_jar} {jars_base}/gluten-{lastgit}.jar

        jars=!ls -d {jars_base}/*.jar
        extra_jars=":".join(["file://"+j for j in jars])
        print(f'extra_jars: {extra_jars}')

        for c in clients:
            if c!=localhost:
                !ssh {c} "rm -rf {jars_base}"
                !ssh {c} "mkdir -p {jars_base}"
                !scp {jars_base}/*.jar {c}:{jars_base} >/dev/null 2>&1

        app_name_suffix = '_'.join(['gluten', app_name_suffix, lastgit[:6]])
        create_cntx_func=create_cntx_gluten
        if is_tpch_workload:
            task_per_core = gluten_tpch_task_per_core
            workload_conf_overwrite = gluten_tpch_conf_overwrite
        elif is_tpcds_workload:
            task_per_core = gluten_tpcds_task_per_core
            workload_conf_overwrite = gluten_tpcds_conf_overwrite
    else:
        app_name_suffix = '_'.join(['spark', app_name_suffix, lastgit[:6]])
        create_cntx_func=create_cntx_spark
        if is_tpch_workload:
            task_per_core = spark_tpch_task_per_core
            workload_conf_overwrite = spark_tpch_conf_overwrite
        elif is_tpcds_workload:
            task_per_core = spark_tpcds_task_per_core
            workload_conf_overwrite = spark_tpcds_conf_overwrite
    
    if app_name:
        app_name = app_name + ' ' + app_name_suffix
    else:
        app_name = app_name_suffix

    conf_overwrite = lambda conf: app_conf_overwrite(workload_conf_overwrite(conf))
    
    sc, spark = create_cntx_func(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, conf_overwrite)
    
    # Pin executors to numa nodes for Gluten
    if run_gluten:
        pinexecutor_numa(clients)

    appid = sc.applicationId
    print("start run: ", appid)
    
    if is_tpch_workload:
        test_tpc = TestTPCH(spark, tabledir, run_gluten, server, base_dir, nb_name)
    elif is_tpcds_workload:
        test_tpc = TestTPCDS(spark, tabledir, run_gluten, server, base_dir, nb_name)
    
    return sc, spark, appid, test_tpc