# initialize

In [None]:
# Notebook bootstrap: import the rich-display helpers and inject CSS overrides.
# NOTE(review): nested scopes are always on in Python 3, so this __future__
# import looks like a leftover no-op - confirm before removing.
from __future__ import nested_scopes
from IPython.display import display, HTML
# Force elements with class "container" to use the full page width.
display(HTML('<style>.container { width:100% !important; }</style>'))
# Set the font family and size used by CodeMirror editor elements.
display(HTML('<style>.CodeMirror{font-family: "Courier New";font-size: 12pt;}</style>'))

In [None]:
import logging
# Raise the root logger's threshold so that only ERROR and above are emitted.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

import warnings
# Silence every Python warning for the rest of the session.
warnings.filterwarnings('ignore')

In [None]:
import os
import datetime
from datetime import date
import time
import threading
import gzip
import json
import math
import re
import html
import builtins

import collections
import numpy
import pandas
pandas.options.display.max_rows=50
pandas.options.display.max_columns=200
pandas.options.display.float_format = '{:,}'.format

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.lines as mlines
from matplotlib import colors
from matplotlib import rcParams
rcParams['font.sans-serif'] =  'Courier New'
rcParams['font.family'] = 'Courier New'
rcParams['font.size'] = '12'
%matplotlib inline

from ipywidgets import IntProgress,Layout

import pyspark
import pyspark.sql
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, floor, lit, rank, col, lag, when, pandas_udf, PandasUDFType, avg, sum as _sum
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.storagelevel import StorageLevel

import seaborn as sns
from functools import reduce
from pandasql import sqldf
from itertools import chain

In [None]:
import pyhdfs
import socket
# Resolve this machine's hostname and the IP address that hostname maps to.
localhost=socket.gethostname()
local_ip=socket.gethostbyname(localhost)

# Module-wide HDFS client used by the helpers and classes below. It talks to
# port 9870 on this host's IP as user 'sparkuser'.
# NOTE(review): presumably the NameNode HTTP/WebHDFS endpoint - confirm.
fs = pyhdfs.HdfsClient(hosts=f'{local_ip}:9870', user_name='sparkuser')

# HDFS (`fs`) helper functions

In [None]:
def getexecutor_stat(pdir):
    """Aggregate I/O counters from every ``*.stat`` file below ``pdir``.

    Each sub-directory of ``pdir`` (except ``summary.parquet``) is scanned for
    files ending in ``.stat``. Within one file, every line starting with one
    of the counter names updates that counter as ``value - previous``; the
    value is the last space-separated token on the line. With two samples of
    a counter in a file this yields their difference.
    NOTE(review): the counter names match the fields of ``/proc/<pid>/io`` -
    confirm that is the source of these files.

    Args:
        pdir: HDFS directory path. It is concatenated directly with each
            child name, so it is expected to end with ``/``.

    Returns:
        Tuple ``(rchar, wchar, read_bytes, write_bytes,
        cancelled_write_bytes)``, each summed over all files after dividing
        by 1024*1024.
    """
    counter_names=("rchar","wchar","read_bytes","write_bytes","cancelled_write_bytes")
    totals=dict.fromkeys(counter_names,0)

    for entry in fs.list_status(pdir):
        # Only per-client directories are scanned; the summary parquet is skipped.
        if entry['type']!='DIRECTORY' or entry['pathSuffix']=="summary.parquet":
            continue
        cdir=pdir+entry['pathSuffix']
        for fname in fs.listdir(cdir):
            if not fname.endswith(".stat"):
                continue
            with fs.open(cdir+"/"+fname) as f:
                raw_lines=f.readlines()
                per_file=dict.fromkeys(counter_names,0)
                for raw in raw_lines:
                    text=raw.decode('ascii')
                    for key in counter_names:
                        if text.startswith(key):
                            # value - previous: a second sample turns into a delta.
                            per_file[key]=int(text.split(" ")[-1])-per_file[key]
                            break
                for key in counter_names:
                    totals[key]+=per_file[key]/1024/1024
    return tuple(totals[key] for key in counter_names)

In [None]:
def background_gradient(s, m, M, cmap='PuBu', low=0, high=0):
    """Return CSS background colours for the values of ``s`` on a fixed scale.

    The values are normalised against the range ``[m, M]``, optionally widened
    by ``low`` / ``high`` fractions of that range, and mapped through a
    Matplotlib colormap.

    Args:
        s: Object exposing ``.values`` (e.g. a pandas Series).
        m: Lower bound of the colour scale.
        M: Upper bound of the colour scale.
        cmap: Colormap name or ``Colormap`` instance; ``None`` selects
            Matplotlib's configured default.
        low: Fraction of ``M - m`` by which the lower bound is extended.
        high: Fraction of ``M - m`` by which the upper bound is extended.

    Returns:
        List of ``'background-color: #rrggbb'`` strings, one per value.
    """
    import matplotlib
    from matplotlib import colors
    rng = M - m
    norm = colors.Normalize(m - (rng * low),
                            M + (rng * high))
    normed = norm(s.values)
    # `matplotlib.cm.get_cmap` was deprecated in Matplotlib 3.7 and removed in
    # 3.9. Prefer the `matplotlib.colormaps` registry and fall back to the old
    # call only on releases that predate the registry.
    if isinstance(cmap, colors.Colormap):
        colormap = cmap
    elif hasattr(matplotlib, 'colormaps'):
        name = cmap if cmap is not None else matplotlib.rcParams['image.cmap']
        colormap = matplotlib.colormaps[name]
    else:
        colormap = plt.cm.get_cmap(cmap)
    c = [colors.rgb2hex(x) for x in colormap(normed)]
    return ['background-color: {:s}'.format(color) for color in c]

# base class

In [None]:
class SparkLog_Analysis:
    """Placeholder class whose constructor accepts its arguments and stores nothing."""

    def __init__(self, appid,jobids,clients):
        """Accept ``appid``, ``jobids`` and ``clients`` without retaining them."""
        return None

In [None]:
class Analysis:
    """Base class for analyses that turn one input file into trace-viewer events.

    Attributes set by the constructor:
        file: Path of the input file.
        starttime: Time origin, initialised to 0.
        df: Loaded data, ``None`` until ``load_data`` fills it.
    """
    def __init__(self,file):
        self.file=file
        self.starttime=0
        self.df=None

    def load_data(self):
        """Load ``self.file`` into ``self.df``; the base implementation does nothing."""
        pass

    def generate_trace_view_list(self,id=0, **kwargs):
        """Return the trace events for this analysis as a list of JSON strings.

        Triggers ``load_data`` when no data has been loaded yet. The base
        implementation only emits the ``process_name`` metadata event.

        Args:
            id: Value used as the ``pid`` of the emitted event.
            **kwargs: ``node`` (default ``"node"``) is the displayed process
                name, prefixed with a single space.

        Returns:
            List containing one JSON-encoded metadata event.
        """
        # Identity check: `== None` would invoke the data object's own __eq__,
        # which can return a non-boolean or raise for frame-like objects.
        if self.df is None:
            self.load_data()
        trace_events=[]
        node=kwargs.get('node',"node")
        trace_events.append(json.dumps({"name": "process_name","ph": "M","pid":id,"tid":0,"args":{"name":" "+node}}))
        return trace_events

    def generate_trace_view(self, trace_output, **kwargs):
        """Write the trace JSON file and display and return a viewer link.

        Args:
            trace_output: If it contains ``"home"`` it is used verbatim as the
                output path and its last path component is used in the link.
                Otherwise the file is written to
                ``/home/sparkuser/trace_result/<trace_output>.json``.
            **kwargs: Forwarded to ``generate_trace_view_list``.

        Returns:
            URL of the trace viewer page, built from the notebook-level
            ``local_ip`` and port 1088.
        """
        traces=[]
        traces.extend(self.generate_trace_view_list(0,**kwargs))
        
        # The surrounding JSON envelope is assembled by hand; the events are
        # already JSON strings and are simply joined with commas.
        output='''
        {
            "traceEvents": [
        
        ''' + \
        ",\n".join(traces)\
       + '''
            ],
            "displayTimeUnit": "ns"
        }'''

        if("home" in trace_output):
            outputfolder=trace_output
            appidx=trace_output.split("/")[-1]
        else:
            outputfolder='/home/sparkuser/trace_result/'+trace_output+'.json'
            appidx=trace_output
        with open(outputfolder, 'w') as outfile: 
            outfile.write(output)
        
        traceview_link=f'http://{local_ip}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{appidx}.json'
        display(HTML(f"<a href={traceview_link}>{traceview_link}</a>"))
        return traceview_link

# EMON process

In [None]:
def get_alias_name(metric,func):
    """Return ``<metric>_<func.__name__>``, the alias for an aggregated column."""
    return "_".join((metric, func.__name__))

In [None]:
def splits_fill0(x):
    """Tokenise text lines and right-pad each row with ``'0'`` to a fixed width.

    Every line is stripped, split on runs of whitespace and has all commas
    removed from its tokens. Rows shorter than ``192*4+5`` fields are padded
    with the string ``'0'``; longer rows are left untouched.

    Args:
        x: Iterable of strings (it is fully consumed before returning).

    Returns:
        Iterator over the resulting lists of strings.
    """
    width=192*4+5
    rows=[]
    for raw in x:
        fields=[token.replace(",","") for token in re.split(r'\s+',raw.strip())]
        # A negative repeat count yields an empty list, so wide rows are kept as-is.
        fields.extend(['0']*(width-len(fields)))
        rows.append(fields)
    return iter(rows)

In [None]:
def background_gradient(s, m, M, cmap='PuBu', low=0, high=0):
    """Return CSS background colours for the values of ``s`` on a fixed scale.

    NOTE(review): a function with this name and signature is also defined
    earlier in the notebook; this later definition is the one in effect once
    both cells have run.

    Args:
        s: Object exposing ``.values`` (e.g. a pandas Series).
        m: Lower bound of the colour scale.
        M: Upper bound of the colour scale.
        cmap: Colormap name or ``Colormap`` instance; ``None`` selects
            Matplotlib's configured default.
        low: Fraction of ``M - m`` by which the lower bound is extended.
        high: Fraction of ``M - m`` by which the upper bound is extended.

    Returns:
        List of ``'background-color: #rrggbb'`` strings, one per value.
    """
    import matplotlib
    from matplotlib import colors
    rng = M - m
    norm = colors.Normalize(m - (rng * low),
                            M + (rng * high))
    normed = norm(s.values)
    # `matplotlib.cm.get_cmap` was deprecated in Matplotlib 3.7 and removed in
    # 3.9. Prefer the `matplotlib.colormaps` registry and fall back to the old
    # call only on releases that predate the registry.
    if isinstance(cmap, colors.Colormap):
        colormap = cmap
    elif hasattr(matplotlib, 'colormaps'):
        name = cmap if cmap is not None else matplotlib.rcParams['image.cmap']
        colormap = matplotlib.colormaps[name]
    else:
        colormap = plt.cm.get_cmap(cmap)
    c = [colors.rgb2hex(x) for x in colormap(normed)]
    return ['background-color: {:s}'.format(color) for color in c]

In [None]:
class Emon_Analysis(Analysis):
    def __init__(self,emon_file):
        """Read platform facts from ``emonv.txt`` and define the metric table.

        ``emonv.txt`` is looked up on HDFS in the directory of ``emon_file``.
        Its lines populate ``totalcores``, ``numberofpackages``,
        ``coresperpackage``, ``threadsperpackage``, ``tsc`` and one
        ``unc_<unit>_cnt`` attribute per known uncore unit; all default to 0.

        Args:
            emon_file: Path of the EMON result file.

        Raises:
            Exception: If ``emonv.txt`` does not exist next to ``emon_file``.
        """
        Analysis.__init__(self,emon_file)

        paths=os.path.split(self.file)
        emonv_path=paths[0]+"/emonv.txt"
        if not fs.exists(emonv_path):
            raise Exception("Wrong, no emonv specified")

        self.totalcores=0
        self.numberofpackages=0
        self.coresperpackage=0
        self.threadsperpackage=0
        self.tsc=0
        for unit in self._UNCORE_UNITS:
            setattr(self,"unc_"+unit+"_cnt",0)

        with fs.open(emonv_path) as f:
            allcnt = f.read().decode('ascii')
        for l in allcnt.split("\n"):
            self._parse_emonv_line(l)

        self.begin_clk=0
        self.end_clk=0

        self.corecnt=self.totalcores

        # Metric table. Per metric:
        #   sum_func - bound method building the per-row sum expression
        #   events   - alias -> EMON event name
        #   formula  - displayed name -> SQL expression over the aliases
        #   fmt      - callable applied to the evaluated formula column
        self.emon_metrics=collections.OrderedDict({
            'emon_cpuutil':{
                'sum_func':self.cores_sum,
                'events':{
                    'a':'CPU_CLK_UNHALTED.REF_TSC'
                },
                'formula':{
                    'cpu%':'a/({:f}*{:d})'.format(self.tsc,self.corecnt)
                },
                'fmt':lambda l: F.round(l, 3)
            },
            'emon_cpufreq':{
                'sum_func':self.cores_sum,
                'events':{
                    'a':'CPU_CLK_UNHALTED.THREAD',
                    'b':'CPU_CLK_UNHALTED.REF_TSC'
                },
                'formula':{
                    'cpu freq':'a/b*{:f}'.format(self.tsc/1000000)
                },
                'fmt':lambda l: F.round(l, 3)
            },
            'emon_instr_retired':{
                'sum_func':self.cores_sum,
                'events':{
                    'a':'INST_RETIRED.ANY'
                },
                'formula':{
                    'pathlength':'a/1000000000'
                },
                'fmt':lambda l: F.round(l, 0)
            },
            'emon_ipc':{
                'sum_func':self.cores_sum,
                'events':{
                    'a':'CPU_CLK_UNHALTED.THREAD',
                    'b':'INST_RETIRED.ANY'
                },
                'formula':{
                    'ipc':'b/a'
                },
                'fmt':lambda l: F.round(l, 3)
            }
        })
        self.effective_metric=None
        self.appclients=[] # there is no appid and client column

    # Uncore unit names recognised in emonv.txt, in the order they are tested.
    # Each one is stored in the attribute ``unc_<unit>_cnt``.
    _UNCORE_UNITS=("cha","mdf","imc","cxlcm","cxldp","mchbm","m2hbm",
                   "pmem_fc","pmem_mc","m2m","qpi","r3qpi","iio","irp",
                   "pcu","ubox","m2pcie","rdt")

    # (attribute, pattern) pairs for integer platform facts, tested in this order.
    _EMONV_INT_FIELDS=(
        ("numberofpackages",r"Number of Packages: +(\d+)"),
        ("coresperpackage",r"Cores Per Package: +(\d+)"),
        ("threadsperpackage",r"Threads Per Package: +(\d+)"),
    )

    def _parse_emonv_line(self,l):
        """Apply the first rule matching one ``emonv.txt`` line; ignore other lines."""
        if l.startswith("number_of_online_processors"):
            self.totalcores=int(re.split(" +",l)[2])
            return
        for attr,pattern in self._EMONV_INT_FIELDS:
            found=re.search(pattern,l)
            if found:
                setattr(self,attr,int(found.group(1)))
                return
        found=re.search(r"TSC Freq +[.]+ +([0-9.]+)",l)
        if found:
            # The reported figure is scaled by 1e6.
            # NOTE(review): presumably MHz -> Hz - confirm.
            self.tsc=int(float(found.group(1))*1000000)
            return
        for unit in self._UNCORE_UNITS:
            if l.startswith("    "+unit):
                # The last field of the line is doubled.
                # NOTE(review): the reason for the factor 2 is not visible here.
                setattr(self,"unc_"+unit+"_cnt",int(re.split(" +",l)[-1])*2)
                return

    def count_sum(self,collected_cores):
        """Build the Spark expression summing the scaled counts of the given cores.

        For every core index ``c`` the term is column ``_<c+3>`` divided by
        column ``_2`` and multiplied by ``self.tsc``; the terms are joined
        with ``+``.

        Args:
            collected_cores: Iterable of integer core indices.

        Returns:
            Column produced by ``F.expr`` on the joined expression string.
        """
        terms=[]
        for core in collected_cores:
            terms.append('_{:d}/_2*{:d}'.format(core+3,self.tsc))
        return F.expr('+'.join(terms))

    def cores_sum(self,collected_cores):
        """Return the sum expression for the given cores; delegates to ``count_sum``."""
        summed=self.count_sum(collected_cores)
        return summed

    def mem_sum(self,collected_cores):
        """Return the sum expression for the given indices; delegates to ``count_sum``."""
        summed=self.count_sum(collected_cores)
        return summed

    def pcie_sum(self,collected_cores):
        """Return the sum expression over the fixed index set ``[2, 3, 7, 8]``.

        ``collected_cores`` is accepted for signature parity with the other
        ``*_sum`` methods but is not used.
        """
        fixed_indices=[2,3,7,8]
        return self.count_sum(fixed_indices)
        
    def list_metric(self):
        """Print each effective metric followed by its indented formula names.

        The effective metrics are computed first if they are not known yet.
        """
        if self.effective_metric is None:
            self.get_effective_metric()
        for name in self.effective_metric:
            definition=self.emon_metrics[name]
            print(name)
            for formula_name in definition['formula'].keys():
                print("    ",formula_name)
            
    def load_data(self):
        """Load the EMON data into ``self.df``, parsing the raw file if needed.

        If ``emon.parquet/_SUCCESS`` exists next to ``self.file`` the parquet
        data is read and cached. Otherwise the raw text file is parsed, each
        event row is tagged with the nearest preceding timestamp row
        (``r_id``) and separator row (``g_id``), the result is written to
        ``emon.parquet`` and kept in ``self.df``.

        Relies on the notebook-level ``spark`` session, ``sc`` context and
        ``fs`` HDFS client.
        """
        paths=os.path.split(self.file)
        if fs.exists(paths[0]+"/emon.parquet/_SUCCESS"):
            self.df=spark.read.parquet(paths[0]+"/emon.parquet")
            self.df.cache()
            return

        emondata=sc.textFile(self.file)
        emondf=emondata.mapPartitions(splits_fill0).toDF()
        # The id column carries the row order that the range joins below rely on.
        emondf=emondf.withColumn("id", F.monotonically_increasing_id())
        # Rows whose first field matches "======" delimit groups.
        giddf=emondf.where(emondf._1.rlike("======")).selectExpr("id as g_id")

        # Rows whose first field contains two digits and a slash carry the
        # date (_1) and time (_2) of a sample.
        iddf=emondf.where(emondf._1.rlike(r"\d\d/")).selectExpr("_1 as r_1","_2 as r_2","id as r_id")
        # Event rows have a second field starting with a non-zero digit followed
        # by at least two more digits. Each gets the largest r_id that is
        # smaller than its own id.
        jfid=emondf.where(emondf._2.rlike("^[1-9][0-9][0-9]+")).join(iddf,on=[emondf.id>iddf.r_id]).groupBy('id').agg(F.max('r_id').alias('r_id'))
        iddf=iddf.join(jfid,on='r_id',how='left')
        emondf=emondf.where(emondf._2.rlike("^[1-9][0-9][0-9]+")).join(iddf,on='id',how='left')

        # Same technique for the group separator: the largest g_id below the row's id.
        jfid=emondf.join(giddf,on=[emondf.id>giddf.g_id]).groupBy('id').agg(F.max('g_id').alias('g_id'))
        giddf=giddf.join(jfid,on='g_id',how='left')
        emondf=emondf.join(giddf,on='id',how='inner')

        df=emondf

        # Cast every column at position >= 2 whose name starts with '_' to long.
        select_list = []
        for idx, c in enumerate(df.columns):
            if idx >= 2 and c.startswith('_'):
                select_list.append(col(c).cast(LongType()).alias(c))
            else:
                select_list.append(col(c))
        df=df.select(select_list)

        # Timestamp = parsed "date time" in seconds * 1000 plus the integer
        # after the '.' in the time field.
        df=df.withColumn("timestamp",F.unix_timestamp(F.concat_ws(' ','r_1','r_2'),'MM/dd/yyyy HH:mm:ss')*F.lit(1000)+(F.split(F.col('r_2'),r'\.')[1]).astype(IntegerType()))
        df=df.drop("r_1")
        df=df.drop("r_2")

        cores=list(range(0,self.totalcores))
        # Rows for UNC_IIO* events use pcie_sum; every other row uses cores_sum.
        df=df.withColumn('sum',
                         F.when(F.col("_1").startswith("UNC_IIO"),self.pcie_sum(cores))
                         .otherwise(self.cores_sum(cores)))
        # 'valid' marks rows strictly inside (begin_clk, end_clk) when both are
        # set; otherwise every row is valid.
        if self.begin_clk>0 and self.end_clk>0:
            df=df.withColumn('valid',((F.col("timestamp")>F.lit(self.begin_clk)) & (F.col("timestamp")<F.lit(self.end_clk))))
        else:
            df=df.withColumn('valid',F.lit(True))

        df.repartition(3).write.mode("overwrite").parquet(paths[0]+"/emon.parquet")
        self.df=df
        df.cache()
        
    def get_effective_metric(self):
        """Determine which metrics in ``emon_metrics`` can be computed from the data.

        Only rows of the smallest ``g_id`` are inspected. A metric is
        effective when all of its events can be joined, first on ``r_id`` and,
        if that join is empty, on ``g_id``. The resulting list of metric names
        is stored in ``self.effective_metric``. A progress widget is displayed
        while the metrics are checked.
        """
        if self.df is None:
            self.load_data()

        emondf=self.df
        gid=emondf.agg(F.min('g_id')).collect()[0]['min(g_id)']
        emondf=emondf.where(F.col("g_id")==gid)
        emondf=emondf.cache()

        effective_metric=[]

        progress = IntProgress(layout=Layout(width='80%', height='40px'))
        progress.max = len(self.emon_metrics)
        progress.description = 'Calculate Effective Metrics'
        display(progress)
        progress.value=0

        for k,m in self.emon_metrics.items():
            join_df=None
            progress.value=progress.value+1
            for event in m['events'].values():
                tdf=emondf.where("_1='{:s}'".format(event)).select('r_id','g_id')
                if join_df is None:
                    join_df=tdf
                else:
                    join_dft=join_df.join(tdf.drop('g_id'),on='r_id',how='inner')
                    # Events not sampled together share no r_id; fall back to the group id.
                    if join_dft.count()==0:
                        join_df=join_df.join(tdf.drop('r_id'),on='g_id',how='inner')
                    else:
                        join_df=join_dft
            # A metric that declares no events has nothing to join and is not effective.
            if join_df is not None and join_df.count()>0:
                effective_metric.append(k)
        progress.value=progress.value+1
        self.effective_metric=effective_metric
        emondf.unpersist()
    
    def gen_metric(self,emondf, m):
        """Join the rows of all events of one metric into a single frame.

        The first event contributes ``timestamp``, ``_1``, ``_2``, ``r_id``,
        ``g_id``, the ``appclients`` columns and its ``sum`` under the event's
        alias. Every further event is inner-joined on ``r_id`` (plus
        ``appclients``); if that join is empty the join is done on ``g_id``
        instead.

        Args:
            emondf: Frame holding the EMON rows including the ``sum`` column.
            m: Metric definition with an ``events`` mapping of alias to event name.

        Returns:
            The joined frame, or ``None`` when the metric has no events.
        """
        joined=None
        for alias,event in m['events'].items():
            event_rows=emondf.where("_1='{:s}'".format(event))
            value_col=F.col('sum').alias(alias)
            if joined is None:
                joined=event_rows.select('timestamp','_1','_2','r_id','g_id',*self.appclients,value_col)
                continue
            tdf=event_rows.select('_1','_2','r_id','g_id',*self.appclients,value_col)
            by_sample=joined.join(tdf.drop('g_id'),on=['r_id',*self.appclients],how='inner')
            if by_sample.count()==0:
                joined=joined.join(tdf.drop('r_id'),on=['g_id',*self.appclients],how='inner')
            else:
                joined=by_sample
        return joined

    
    
    def generate_trace_view_list(self,id=0, **kwargs):
        """Return counter trace events for every selected metric.

        Args:
            id: Value used as the ``pid`` of all emitted events.
            **kwargs:
                node: Process name, handled by the base class.
                collected_cores: Optional list of core indices; when given,
                    the ``sum`` column is recomputed over these cores.
                show_metric: Optional list of metric names to emit. When
                    given it also replaces ``self.effective_metric``.

        Returns:
            List of JSON strings: the base-class metadata event, then for each
            metric its counter events (``ph`` = ``"C"``) ordered by ``ts`` and
            one ``thread_sort_index`` metadata event.
        """
        trace_events=Analysis.generate_trace_view_list(self,id, **kwargs)

        emondf=self.df
        if 'collected_cores' in kwargs:
            cores=kwargs.get("collected_cores",None)
            emondf=emondf.withColumn('sum',
                     F.when(F.col("_1").startswith("UNC_IIO"),self.pcie_sum(cores))
                     .otherwise(self.cores_sum(cores)))
        show_metric= kwargs.get('show_metric', None)

        if show_metric is None and self.effective_metric is None:
            self.get_effective_metric()

        self.effective_metric=show_metric if show_metric is not None else self.effective_metric

        # emondf is deliberately NOT reset to self.df here: doing so would
        # discard the 'sum' column recomputed for collected_cores above.

        tid=0
        for k in self.effective_metric:
            m=self.emon_metrics[k]
            join_df=self.gen_metric(emondf,m)
            rstdf=join_df.select(
                            F.lit(tid).alias('tid'),
                            F.lit(id).alias('pid'),
                            F.lit('C').alias('ph'),
                            F.lit(k).alias('name'),
                            (F.col('timestamp')-F.lit(self.starttime)).alias("ts"),
                            F.struct(*[m['fmt'](F.expr(formula)).alias(col_name) for col_name,formula in m['formula'].items() ]).alias('args')
            ).where(F.col("ts").isNotNull()).orderBy('ts')
            trace_events.extend(rstdf.toJSON().collect())
            # The trace-event argument is named "sort_index", with no trailing space.
            trace_events.append(json.dumps({"name": "thread_sort_index","ph": "M","pid":id,"tid":tid,"args":{"sort_index":tid}}))
            tid=tid+1

        return trace_events
    
    def show_emon_metric(self,metric,sub_metric,core,draw=True,metric_define=None, **kwargs):
        """Compute one formula of a metric over time, summarise it and optionally plot it.

        Args:
            metric: Metric name. If ``None`` or ``''``, the first effective
                metric whose formulas contain ``sub_metric`` is used.
            sub_metric: Formula name inside the metric.
            core: Core index or list of core indices used for the ``sum`` column.
            draw: When true, draw a violin plot, a time series with summary
                lines and a 15-bin histogram.
            metric_define: Optional metric table used instead of ``self.emon_metrics``.
            **kwargs: ``showalltime`` (default ``True``); when false only rows
                flagged ``valid`` are used.

        Returns:
            Spark frame with columns ``ts``, ``<sub_metric>`` and ``r_id``, or
            ``None`` when no metric provides ``sub_metric``.
        """
        if self.df is None:
            self.load_data()
        emondf=self.df

        showalltime=kwargs.get("showalltime",True)

        if not showalltime:
            emondf=emondf.filter(F.col("valid")==F.lit(True))

        if metric is None or metric=='':
            # The search below needs the effective metric list; compute it
            # lazily like list_metric and get_reduce_metrics do.
            if self.effective_metric is None:
                self.get_effective_metric()
            for k in self.effective_metric:
                m=self.emon_metrics[k]
                if sub_metric in m['formula']:
                    break
            else:
                print("can't find metric",sub_metric)
                return
        else:
            k=metric
        if metric_define is None:
            m= self.emon_metrics[k]
        else:
            m= metric_define[k]

        if isinstance(core,int):
            core=[core,]
        emondf=emondf.withColumn('sum',
                 F.when(F.col("_1").startswith("UNC_IIO"),self.pcie_sum(core))
                 .otherwise(self.count_sum(core)))

        join_df=self.gen_metric(emondf,m)

        rstdf=join_df.select(
                    F.col('timestamp').alias('ts'),
                    m['fmt'](F.expr(m['formula'][sub_metric])).alias(sub_metric),
                    'r_id'
        ).where(F.col("timestamp").isNotNull()).orderBy('timestamp')

        metric_sum=rstdf.select(sub_metric).summary().toPandas()
        display(metric_sum)

        if draw:
            pddf=rstdf.toPandas()
            # Time axis in seconds relative to the first sample.
            pddf['ts']=(pddf['ts']-pddf.loc[0,'ts'])/1000
            fig, axs = plt.subplots(nrows=1, ncols=2, sharey=True,figsize=(30,8),gridspec_kw = {'width_ratios':[1, 5]})
            plt.subplots_adjust(wspace=0.01)
            sns.violinplot(y=sub_metric, data=pddf, ax=axs[0],palette=['g'])
            axs[0].yaxis.grid(True, which='major')
            ax=axs[1]
            ax.stackplot(pddf['ts'], pddf[sub_metric],colors=['bisque'])
            #ymin, ymax = ax.get_ylim()
            ax2 = ax.twinx()
            ax2.set_ylim(ax.get_ylim())
            # Rows 4-7 of the summary frame; with Spark's default summary()
            # statistics these are 25%, 50%, 75% and max.
            ax2.axhline(y=float(metric_sum.loc[4,sub_metric]), linewidth=2, color='r')
            ax2.axhline(y=float(metric_sum.loc[5,sub_metric]), linewidth=2, color='r')
            ax2.axhline(y=float(metric_sum.loc[6,sub_metric]), linewidth=2, color='r')
            ax2.axhline(y=float(metric_sum.loc[7,sub_metric]), linewidth=2, color='r')
            ax.set_xlabel('time (s)')
            ax.yaxis.grid(True, which='major')
            plt.show()

            hist_elapsedtime=rstdf.select('`{:s}`'.format(sub_metric)).rdd.flatMap(lambda x: x).histogram(15)
            fig, axs = plt.subplots(figsize=(30, 5))
            ax=axs
            binSides, binCounts = hist_elapsedtime
            binSides=[builtins.round(l,2) for l in binSides]

            N = len(binCounts)
            ind = numpy.arange(N)
            width = 0.5

            rects1 = ax.bar(ind+0.5, binCounts, width, color='b')

            ax.set_ylabel('Frequencies')
            ax.set_title(sub_metric)
            ax.set_xticks(numpy.arange(N+1))
            ax.set_xticklabels(binSides)
        return rstdf
        

    def gen_reduce_metric(self,metric,core,sub_metric,agg_func):
        """Build the per-sample frame of one formula, restricted to valid rows.

        Args:
            metric: Metric name in ``self.emon_metrics``.
            core: Core index or list of core indices. The ``sum`` column is
                recomputed only when fewer than ``self.totalcores`` cores are given.
            sub_metric: Formula name inside the metric.
            agg_func: Not used by this method.

        Returns:
            Spark frame with the ``appclients`` columns and the formatted
            formula value, limited to rows with a non-null ``timestamp``.
        """
        if self.df is None:
            self.load_data()
        emondf=self.df

        emondf=emondf.where(F.col("valid")==F.lit(True))

        k=metric
        m= self.emon_metrics[k]

        if isinstance(core,int):
            core=[core,]

        if len(core)<self.totalcores:
            emondf=emondf.withColumn('sum',
                     F.when(F.col("_1").startswith("UNC_IIO"),self.pcie_sum(core))
                     .otherwise(self.count_sum(core)))

        join_df=self.gen_metric(emondf,m)

        rstdf=join_df.select(
                *self.appclients,
                m['fmt'](F.expr(m['formula'][sub_metric])).alias(sub_metric)
        ).where(F.col("timestamp").isNotNull())
        return rstdf
    
    def get_reduce_metric(self,metric,core,sub_metric,agg_func):
        """Aggregate one formula over all valid samples.

        Args:
            metric: Metric name.
            core: Core index or list of core indices.
            sub_metric: Formula name inside the metric.
            agg_func: Iterable of aggregate functions taking a column name.

        Returns:
            pandas DataFrame with one column per aggregate, named by
            ``get_alias_name``.
        """
        rstdf=self.gen_reduce_metric(metric,core,sub_metric,agg_func)
        quoted_column="`{:s}`".format(sub_metric)
        aggregates=[]
        for fn in agg_func:
            aggregates.append(fn(quoted_column).alias(get_alias_name(sub_metric,fn)))
        return rstdf.agg(*aggregates).toPandas()
   
    def get_reduce_metrics(self,core=None,agg_func=[F.max,F.mean,F.min,F.sum]):
        """Aggregate every formula of every effective metric into one table.

        Args:
            core: List of core indices; defaults to all ``self.totalcores`` cores.
            agg_func: Aggregate functions to apply. The default list is never
                mutated here.

        Returns:
            pandas DataFrame indexed by formula name with one column per
            aggregate function name, or ``None`` when there is nothing to
            aggregate.
        """
        if self.effective_metric is None:
            self.get_effective_metric()

        if core is None:
            core=list(range(0,self.totalcores))
        progress = IntProgress(layout=Layout(width='80%', height='40px'))
        progress.max = len(self.effective_metric)
        progress.description = 'Calculate Effective Metrics'
        display(progress)
        progress.value=0

        columns=[f.__name__ for f in agg_func]

        # Collect the one-row frames and concatenate once at the end:
        # DataFrame.append was removed in pandas 2.0 and copied the
        # accumulated frame on every call.
        frames=[]
        for k in self.effective_metric:
            progress.value=progress.value+1
            m=self.emon_metrics[k]
            for fk in m['formula']:
                df=self.get_reduce_metric(k,core,fk,agg_func)
                df.columns=columns
                df.index=[fk]
                frames.append(df)
        progress.value=progress.value+1
        if not frames:
            return None
        return pandas.concat(frames)
    

class Emon_Analysis_All(Emon_Analysis):
    def __init__(self,emon_files):
        Emon_Analysis.__init__(self
                               ,emon_files[0])
        self.emon_files=emon_files
        self.appclients=['appid','client']
        
    def load_data(self):
        """Load all parquet inputs and derive ``appid`` / ``client`` from the file path.

        The Spark cache is cleared first. The path depth is taken from one
        sample row; ``appid`` and ``client`` are the path components four and
        three positions from the end of each row's input file name. The
        cached result is stored in ``self.df``.
        """
        spark.clearCache()
        loaded=spark.read.format("parquet").load(self.emon_files)
        loaded=loaded.withColumn("file",F.input_file_name())
        sample_path=loaded.select(F.col("file")).limit(1).collect()[0]['file']
        depth=len(sample_path.split("/"))
        with_app=loaded.withColumn("appid",F.split("file","/")[depth-4])
        with_client=with_app.withColumn("client",F.split("file","/")[depth-3]).drop("file")
        self.df=with_client.cache()
        
    def get_reduce_metric(self,metric,core=None,sub_metric=None,agg_func=[F.max,F.mean,F.min,F.sum]):
        """Aggregate one formula per ``appid``.

        Args:
            metric: Metric name.
            core: List of core indices; defaults to all ``self.totalcores`` cores.
            sub_metric: Formula name; defaults to the metric's first formula.
            agg_func: Aggregate functions taking a column name.

        Returns:
            pandas DataFrame with one row per ``appid`` and one column per aggregate.
        """
        if core is None:
            core=list(range(0,self.totalcores))
        if sub_metric is None:
            formulas=self.emon_metrics[metric]['formula']
            sub_metric = list(formulas.keys())[0]

        rstdf=self.gen_reduce_metric(metric,core,sub_metric,agg_func)
        quoted_column="`{:s}`".format(sub_metric)
        aggregates=[]
        for fn in agg_func:
            aggregates.append(fn(quoted_column).alias(get_alias_name(sub_metric,fn)))
        return rstdf.groupBy("appid").agg(*aggregates).toPandas()
    
    def get_reduce_metrics(self,core=None,agg_func=[F.max,F.mean,F.min,F.sum]):
        """Override that performs no aggregation and always returns ``None``."""
        return None
    
    def generate_trace_view_list(self, id , **kwargs):
        """Return counter trace events for several clients, one process per client.

        Args:
            id: Not used for the emitted events; process ids come from ``pidmap``.
            **kwargs:
                pidmap: Required mapping of client name to pid. Without it a
                    hint is printed and an empty list is returned.
                collected_cores: Optional list of core indices; when given,
                    the ``sum`` column is recomputed over these cores.
                show_metric: Optional list of metric names to emit. When
                    given it also replaces ``self.effective_metric``.

        Returns:
            List of JSON strings: one ``process_name`` event per client, then
            for each metric its counter events ordered by ``ts`` and one
            ``thread_sort_index`` event per pid.
        """
        # Called for its side effect only: it loads the data when self.df is unset.
        Analysis.generate_trace_view_list(self,0)

        pidmap=kwargs.get("pidmap",None)
        if pidmap is None:
            print("multiple emon process needs pidmap in {'client':pid,} format")
            return []
        else:
            display(pidmap)

        emondf=self.df
        if 'collected_cores' in kwargs:
            cores=kwargs.get("collected_cores",None)
            emondf=emondf.withColumn('sum',
                     F.when(F.col("_1").startswith("UNC_IIO"),self.pcie_sum(cores))
                     .otherwise(self.cores_sum(cores)))
        show_metric= kwargs.get('show_metric', None)

        if show_metric is None and self.effective_metric is None:
            self.get_effective_metric()

        self.effective_metric=show_metric if show_metric is not None else self.effective_metric

        # Spark map literal client -> pid, built from the flattened (key, value) pairs.
        mapexpr=F.create_map([F.lit(x) for x in chain(*pidmap.items())])

        trace_events=[]
        for client_name,client_pid in pidmap.items():
            trace_events.append(json.dumps({"name": "process_name","ph": "M","pid":client_pid,"tid":0,"args":{"name":" "+client_name}}))
        tid=0
        for k in self.effective_metric:
            m=self.emon_metrics[k]
            join_df=self.gen_metric(emondf,m)
            join_df=join_df.withColumn('pid',mapexpr.getItem(F.col("client")))
            rstdf=join_df.select(
                            F.lit(tid).alias('tid'),
                            F.col('pid').alias('pid'),
                            F.lit('C').alias('ph'),
                            F.lit(k).alias('name'),
                            (F.col('timestamp')-F.lit(self.starttime)).alias("ts"),
                            F.struct(*[m['fmt'](F.expr(formula)).alias(col_name) for col_name,formula in m['formula'].items() ]).alias('args')
            ).where(F.col("ts").isNotNull()).orderBy('ts')
            trace_events.extend(rstdf.toJSON().collect())
            for client_pid in pidmap.values():
                # The trace-event argument is named "sort_index", with no trailing space.
                trace_events.append(json.dumps({"name": "thread_sort_index","ph": "M","pid":client_pid,"tid":tid,"args":{"sort_index":tid}}))
            tid=tid+1
        return trace_events
    
    
def get_emon_parquets(apps,basedir):
    """Return the ``emon.parquet`` path of every client of every application.

    For each application directory ``/<basedir>/<appid>`` all
    sub-directories except ``summary.parquet`` are treated as clients. When a
    client has no ``emon.parquet`` yet, it is produced by loading that
    client's ``emon.rst`` through ``Emon_Analysis``.

    Args:
        apps: Iterable of application ids.
        basedir: HDFS base directory name, without a leading slash.

    Returns:
        List of parquet paths, in application then client order.
    """
    parquet_paths=[]
    for appid in apps:
        entries=fs.list_status("/"+basedir+"/"+appid)
        clients=[]
        for entry in entries:
            if entry['type']=='DIRECTORY' and entry['pathSuffix']!="summary.parquet":
                clients.append(entry['pathSuffix'])
        for client in clients:
            parquet_path=f"/{basedir}/{appid}/{client}/emon.parquet"
            if not fs.exists(parquet_path):
                print(f"/{basedir}/{appid}/{client}/emon.parquet is not found, trying to load data ...")
                loader=Emon_Analysis(f"/{basedir}/{appid}/{client}/emon.rst")
                loader.load_data()
            parquet_paths.append(parquet_path)
    return parquet_paths

# app log analysis

In [None]:
def get_his_perf(namelike,currentdir):
    """Plot the elapsed time of recent runs whose Spark app name starts with ``namelike``.

    Scans ``/gluten`` on HDFS for directories whose name starts with ``"2"``
    and sorts after the ``%Y_%m_%d`` stamp of 60 days ago, collects every
    ``<dir>/<run>/app.log`` found there plus ``currentdir``, and plots
    ``elapsedtime`` (latest job end minus earliest job start, in seconds) per
    log against its start date.

    Args:
        namelike: Prefix of ``spark.app.name`` to select.
        currentdir: Path of the current run's ``app.log``; it is added if it
            is not already in the list.
    """
    # `timedelta` is not among the notebook's imports, so import it locally.
    from datetime import timedelta

    # Directory names are compared as strings against this date stamp.
    cutoff=(date.today() - timedelta(days=60)).strftime("%Y_%m_%d")
    dird=fs.listdir("/gluten")
    apps=[]
    for l in dird:
        if l.startswith("2") and l>cutoff:
            for r in fs.listdir("/gluten/"+l):
                if fs.exists("/gluten/"+l+"/"+r+"/app.log"):
                    apps.append("/gluten/"+l+"/"+r+"/app.log")
    if currentdir not in apps:
        apps.append(currentdir)
    appdf=spark.read.json(apps)
    appdf=appdf.withColumn("filename", F.input_file_name())
    starttime=appdf.where("Properties.`spark.app.name` like '"+namelike+"%' and Event='SparkListenerJobStart'").select("filename",F.col('Properties.`spark.app.name`').alias("appname"),F.col('Submission Time').alias("starttime"))
    finishtime=appdf.where("Event='SparkListenerJobEnd'").select("filename",F.col('Completion Time').alias("finishtime"))
    # One row per log file: earliest job submission and latest job completion.
    starttime=starttime.groupBy("filename").agg(F.max("appname").alias("appname"),F.min("starttime").alias("starttime"))
    finishtime=finishtime.groupBy("filename").agg(F.max("finishtime").alias("finishtime"))
    elapsedtime=starttime.join(finishtime,"filename").orderBy("starttime").select(F.date_format(F.from_unixtime(F.col('starttime')/1000),"yyyy_MM_dd").alias("test_date"),(F.col("finishtime")/1000-F.col("starttime")/1000).alias("elapsedtime"))
    epsdf=elapsedtime.toPandas()
    epsdf.plot(x='test_date',y=['elapsedtime'],style="-*",figsize=(30,8))

In [None]:
from pyspark.sql.functions import udf
@udf("long")
def isfinish_udf(s):
    """Return 1 if the plan tree encoded in JSON string ``s`` is final, else 0.

    ``s`` must decode to a list; only its first element is inspected. A node
    counts as not final when its ``simpleString`` contains
    ``"isFinalPlan=false"`` or its ``children`` entry is ``None``. A tree is
    final only if no node in it is not final. An empty list yields 0.
    """
    import json
    plan=json.loads(s)

    def is_final(node):
        if "isFinalPlan=false" in node['simpleString']:
            return 0
        if node['children'] is None:
            return 0
        # all() stops at the first non-final child, like an early return would.
        return 1 if all(is_final(child)!=0 for child in node["children"]) else 0

    return is_final(plan[0]) if len(plan)>0 else 0
    
@pandas_udf("taskid long, start long, dur long, name string", PandasUDFType.GROUPED_MAP)
def time_breakdown(pdf):
    """Lay out the ``Update`` values of one group as consecutive time slices.

    The slices start at the first row's ``Launch Time`` plus 2. Each row's
    ``Update`` is scaled by ``(Finish Time - Launch Time) / sum(Update)``
    taken from the first row, capped at 1. Rows whose scaled value is not
    greater than 1 are dropped.

    Args:
        pdf: pandas DataFrame with columns ``Launch Time``, ``Finish Time``,
            ``Update``, ``Task ID`` and ``mname``.

    Returns:
        pandas DataFrame whose columns are, by position, task id, start,
        duration and name; an empty typed frame when no row qualifies.
    """
    cursor=pdf['Launch Time'][0]+2
    pdf['start']=0
    pdf['dur']=0
    slices=[]
    ratio=(pdf["Finish Time"][0]-pdf["Launch Time"][0])/pdf["Update"].sum()
    if ratio>1:
        ratio=1
    for _,row in pdf.iterrows():
        scaled=row["Update"]*ratio
        if scaled>1:
            duration=int(scaled)
            slices.append([row["Task ID"],cursor,duration,row["mname"]])
            cursor=cursor+duration
    if len(slices)>0:
        return pandas.DataFrame(slices)
    return pandas.DataFrame({'taskid': pandas.Series([], dtype='long'),
               'start': pandas.Series([], dtype='long'),
               'dur': pandas.Series([], dtype='long'),
               'name': pandas.Series([], dtype='str'),
                            })
    
class App_Log_Analysis(Analysis):
    def __init__(self, file, jobids):
        """Bind the analysis to an event-log path and an optional job filter.

        ``file`` is forwarded to ``Analysis.__init__``. ``jobids`` may be None
        (no filter) or an iterable of job ids; the ids are stored as strings in
        ``self.jobids``. ``self.df`` starts as None until ``load_data`` fills it,
        and ``self.pids`` starts as an empty list.
        """
        Analysis.__init__(self,file)
        if jobids is None:
            self.jobids=[]
        else:
            self.jobids=list(map(str,jobids))
        self.df=None
        self.pids=[]
        
    def load_data(self):
        """Parse the Spark event log at ``self.file`` and build the per-task DataFrame.

        The log is read as JSON with the module-level ``spark`` session. The
        method sets, on ``self``:

        * ``appid`` - the first non-null ``App ID`` (a placeholder if absent);
        * ``dfacc`` - driver accumulator updates joined with metric metadata,
          or None when there are no such events or no metric metadata;
        * ``queryplans`` - finished SQL plans (None without ``sparkPlanInfo``);
        * ``allmetrics`` / ``metricscollect`` - metric descriptors harvested
          from the plans, and the timing subset of them;
        * ``parallelism``, ``executor_cores``, ``executor_instances``,
          ``taskcpus``, ``batchsize`` - read from ``Spark Properties``;
        * ``realexecutors``, ``df_job``, ``failed_stages``, the three
          speculative-task counters and ``query_num``;
        * ``df`` - cached task start/end events joined with job and query data;
        * ``criticaltasks`` - ``[task id, launch, finish]`` triples obtained by
          walking the finished tasks backwards from the latest finish time.
        """
        print("load data ", self.file)
        jobids=self.jobids
        df=spark.read.json(self.file)
        
        default_appid="Application-00000000"
        if 'App ID' in df.columns:
            # Fetch one row instead of collecting every event that carries an App ID.
            approw=df.where("`App ID` is not null").select("App ID").first()
            self.appid=approw["App ID"] if approw is not None else default_appid
        else:
            self.appid=default_appid
                
        accum_event="Event='org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates'"
        if df.where(accum_event).count()>0:
            self.dfacc=df.where(accum_event).select(F.col("executionId").alias("queryid"),F.explode("accumUpdates"))
        else:
            self.dfacc = None
            
        if "sparkPlanInfo" in df.columns:
            self.queryplans=df.where(
                "(Event='org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart' or Event='org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate') "
                "and (sparkPlanInfo.nodeName!='AdaptiveSparkPlan' or sparkPlanInfo.simpleString='AdaptiveSparkPlan isFinalPlan=true') "
            ).select(F.col("executionId").alias("queryid"),'physicalPlanDescription',"sparkPlanInfo.*")
        else:
            self.queryplans=None
        
        seen = set()
        # Always defined so later steps do not depend on whether plans were logged.
        self.allmetrics=[]
        amsdf=None
        
        if self.queryplans is not None:
            self.queryplans=self.queryplans.where(isfinish_udf(F.to_json("children"))==1)
        
            def get_metric(root):
                # Record each accumulator once, then descend into the child nodes.
                for l in (root["metrics"] or []):
                    if l['accumulatorId'] not in seen:
                        seen.add(l['accumulatorId'])
                        self.allmetrics.append([l['accumulatorId'],l["metricType"],l['name'],root["nodeName"]])
                if root['children'] is not None:
                    for c in root["children"]:
                        get_metric(c)

            # A single collect() is enough: an empty result simply skips the loop.
            for c in self.queryplans.collect():
                get_metric(c)
        
            # createDataFrame cannot infer a schema from an empty list.
            if self.allmetrics:
                amsdf=spark.createDataFrame(self.allmetrics)
                amsdf=amsdf.withColumnRenamed("_1","ID").withColumnRenamed("_2","type").withColumnRenamed("_3","Name").withColumnRenamed("_4","nodeName")
        
        
        if self.dfacc is not None:
            if amsdf is not None:
                self.dfacc=self.dfacc.select("queryid",(F.col("col")[0]).alias("ID"),(F.col("col")[1]).alias("Update")).join(amsdf,on=["ID"])
            else:
                # Without metric metadata there is nothing to join the updates to.
                self.dfacc=None
        
        timing_prefixes=("time to","time of","scan time","shuffle write time","time to spill","task commit time")
        self.metricscollect=[l for l in self.allmetrics if l[1] in ['nsTiming','timing'] and l[2].startswith(timing_prefixes)
                             and l[2] not in("time to collect batch", "time of scan") ]
        
        #config=df.where("event='SparkListenerJobStart' and Properties.`spark.executor.cores` is not null").select("Properties.*").limit(1).collect()
        config=df.select("`Spark Properties`.*").where("`spark.app.id` is not null").limit(1).collect()
    
        configdic=config[0].asDict() if config else {}

        def conf_int(key, default):
            # Missing or null properties fall back to the default.
            value=configdic.get(key)
            return int(value) if value is not None else default

        self.parallelism=conf_int('spark.sql.shuffle.partitions',1)
        self.executor_cores=conf_int('spark.executor.cores',1)
        self.executor_instances=conf_int('spark.executor.instances',1)
        self.taskcpus=conf_int('spark.task.cpus',1)
        self.batchsize=conf_int('spark.gluten.sql.columnar.maxBatchSize',4096)
        
        self.realexecutors = df.where(~F.isnull(F.col("Executor ID"))).select("Executor ID").distinct().count()
        
        execstart = df.where("Event='org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart'").select("executionId","time")
        execend = df.where("Event='org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd'").select("executionId","time")
        execstart=execstart.withColumnRenamed("time","query_starttime").withColumnRenamed("executionId","queryid")
        execend=execend.withColumnRenamed("time","query_endtime").withColumnRenamed("executionId","queryid")
        exectime = execstart.join(execend,on=["queryid"])

        if "spark.sql.execution.id" in df.where("Event='SparkListenerJobStart'").select("Properties.*").columns:
            df_jobstart=df.where("Event='SparkListenerJobStart'").select("Job ID","Submission Time",F.col("Properties.`spark.sql.execution.id`").alias("queryid"),"Stage IDs")
        else:
            df_jobstart=df.where("Event='SparkListenerJobStart'").select("Job ID","Submission Time",F.lit(0).alias("queryid"),"Stage IDs")
        
        df_jobend=df.where("Event='SparkListenerJobEnd'").select("`Job ID`","Completion Time")
        df_job=df_jobstart.join(df_jobend,"Job ID")
        df_job=df_job.withColumnRenamed("Submission Time","job_start_time")
        df_job=df_job.withColumnRenamed("Completion Time","job_stop_time")
        self.df_job=df_job
        
        jobstage=df_job.select("*",F.explode("Stage IDs").alias("Stage ID"))
        task=df.where("(Event='SparkListenerTaskEnd' or Event='SparkListenerTaskStart') ").select("Event","Stage ID","task info.*","task metrics.*")
        
        self.failed_stages = [str(l['Stage ID']) for l in task.where("Failed='true'").select("Stage ID").distinct().collect()]
        
        self.speculativetask = task.where("speculative = 'true'").count()
        self.speculativekilledtask = task.where("speculative = true and killed='true'").count()
        self.speculativestage = task.where("speculative = true and killed='true'").select("`Stage ID`").distinct().count()
        
        # NOTE(review): `Failed<>'true' or killed<>'true'` only rejects a task that is BOTH
        # failed and killed. If the intent is to drop every failed or killed task this
        # should be `and` - confirm before changing, because get_basic_state looks the
        # failed stages up in self.df.
        validtsk = task.where("Event = 'SparkListenerTaskEnd' and (Failed<>'true' or killed<>'true')").select("`Task ID`")
        task=task.join(validtsk,on='Task ID',how='inner')
        
        taskjob=task.\
            select("Host","`Event`","`Launch Time`","`Executor ID`","`Task ID`","`Finish Time`",
                    "`Stage ID`","`Input Metrics`.`Bytes Read`","`Disk Bytes Spilled`","`Memory Bytes Spilled`","`Shuffle Read Metrics`.`Local Bytes Read`","`Shuffle Read Metrics`.`Remote Bytes Read`",
                   "`Shuffle Write Metrics`.`Shuffle Bytes Written`","`Executor Deserialize Time`","`Shuffle Read Metrics`.`Fetch Wait Time`","`Executor Run Time`","`Shuffle Write Metrics`.`Shuffle Write Time`",
                   "`Result Serialization Time`","`Getting Result Time`","`JVM GC Time`","`Executor CPU Time`","Accumulables","Peak Execution Memory",
                    F.when(task['Finish Time']==0,task['Launch Time']).otherwise(task['Finish Time']).alias('eventtime')
        ).join(jobstage,"Stage ID").where("`Finish Time` is null or `Finish Time` <=job_stop_time+5")
        
        taskjob = taskjob.join(exectime,on=['queryid'],how='left')
        
        self.df=taskjob
        
        if len(jobids)>0:
            self.df=self.df.where('`Job ID` in ({:s})'.format(','.join(jobids)))
        
        queryids=self.df.select(F.col("queryid").astype(IntegerType())).distinct().where("queryid is not null").orderBy("queryid").toPandas()
        
        self.query_num=len(queryids)
        if self.query_num>0:
            queryidx=queryids.reset_index()
            queryidx['index']=queryidx['index']+1
            #tpcds query
            if self.query_num==103:
                queryidx['index']=queryidx['index'].map(tpcds_query_map)
            qidx=spark.createDataFrame(queryidx)
            qidx=qidx.withColumnRenamed("index","real_queryid")
            self.df=self.df.join(qidx,on="queryid",how="left")
            if self.dfacc is not None:
                self.dfacc=self.dfacc.join(qidx,on="queryid",how='left')

            if self.queryplans is not None:
                self.queryplans=self.queryplans.join(qidx,"queryid",how="right")
        
        self.df=self.df.fillna(0)
        self.df=self.df.withColumn('Executor ID',F.when(F.col("Executor ID")=="driver",1).otherwise(F.col("Executor ID")))
        self.df.cache()
        
        ##############################
        # Critical path: walk the finished tasks from the latest finish time backwards,
        # each time picking the next task that finished (within 2 units) before the
        # launch of the task picked previously.
        
        dfx=self.df.where("Event='SparkListenerTaskEnd'").select("Stage ID","Launch Time","Finish Time","Task ID")
        dfxpds=dfx.toPandas()
        dfxpds.columns=[l.replace(" ","_") for l in dfxpds.columns]
        # The query refers to the local frame `dfxpds` by name; keep the two in sync.
        dfxpds_ods=sqldf('''select * from dfxpds order by finish_time desc''')
        criticaltasks=[]
        total_row=len(dfxpds_ods)

        # With no finished task there is no critical path (and no row 0 to read).
        if total_row>0:
            launchtime=dfxpds_ods["Launch_Time"][0]
            criticaltasks.append([dfxpds_ods["Task_ID"][0],launchtime,dfxpds_ods["Finish_Time"][0]])
            idx=1
            while True:
                while idx<total_row:
                    if dfxpds_ods["Finish_Time"][idx]-2<launchtime:
                        break
                    idx=idx+1
                else:
                    break
                cur_finish=dfxpds_ods["Finish_Time"][idx]
                cur_finish=launchtime-1 if cur_finish>=launchtime else cur_finish
                launchtime=dfxpds_ods["Launch_Time"][idx]
                criticaltasks.append([dfxpds_ods["Task_ID"][idx],launchtime,cur_finish])
                # Always move past the row just taken: a task shorter than 2 units would
                # otherwise satisfy the test above again and the loop would never end.
                idx=idx+1
        self.criticaltasks=criticaltasks

    def get_physical_plan(appals,**kwargs):
        """Render the physical plan of one or all queries as HTML tables.

        Keyword arguments:
            queryid: ``real_queryid`` of the plan to show; None (default) shows all.
            shownops: operator names to include; None disables the filter. A plan
                node is shown when ``" <name> "`` occurs in its tree line, a row of
                the expression table when its operator name is in the list.
            desensitization: when True (default) table names, scan locations and
                column names are replaced by generic placeholders.

        For every plan two tables are displayed: the selected plan nodes with
        their descriptions, and the expressions (entries containing ``(``)
        collected from Project/Aggregate/Window nodes.

        Returns the collected plan rows, or an empty list when the log holds no
        plan information.
        """
        if appals.df is None:
            appals.load_data()
        queryid=kwargs.get('queryid',None)
        shownops=kwargs.get("shownops",['ArrowRowToColumnarExec','ColumnarToRow','RowToArrowColumnar',
                                        'VeloxNativeColumnarToRowExec','ArrowColumnarToRow','Filter','HashAggregate','Project','SortAggregate','SortMergeJoin','window'])
        
        desensitization=kwargs.get('desensitization',True)
        
        def get_fields(colss):
            # Split on top-level commas only; commas inside parentheses stay in the field.
            lvls=0
            colns=[]
            ks=""
            for c in colss:
                if c=="," and lvls==0:
                    colns.append(ks)
                    ks=""
                    continue
                if c==" " and ks=="":
                    continue
                if c=="(":
                    lvls+=1
                if c==")":
                    lvls-=1
                ks+=c
            if ks!="":
                colns.append(ks)
            return colns
        
        def get_column_names(s, opname, resultname, prefix, columns, funcs):
            # For nodes of operator `opname`, collect the bracketed field list of the
            # `resultname` line into `funcs` and register every "... AS alias" in `columns`.
            if not re.search(r" "+opname+" ",s[0]):
                return
            for v in s[1].split("\n"):
                if not v.startswith(resultname):
                    continue
                cols=re.search(r"\[([^0-9].+)\]",v)
                if not cols:
                    continue
                colns=get_fields(cols.group(1))
                funcs.setdefault(opname+str(len(columns)),[]).extend(colns)
                for c in colns:
                    if " AS " in c:
                        c=re.sub(r"#\d+L*","",c)
                        alias=re.search(r" AS (.+)",c)
                        if alias and alias.group(1) not in columns:
                            columns[alias.group(1)]=prefix

        def mask(text, needle, alias):
            # Literal (not regex) replacement: column names may contain regex metacharacters.
            if alias.startswith(("array","map","struct")):
                repl='<span style="color:red;background-color:yellow">'+html.escape(alias)+"</span>"
            else:
                repl="<font color=#33cc33>"+html.escape(alias)+"</font>"
            return text.replace(needle,repl)

        if appals.queryplans is None:
            return []
        
        plans=appals.queryplans.select('real_queryid','physicalPlanDescription').collect() if queryid is None else appals.queryplans.where(f"real_queryid='{queryid}'").select("physicalPlanDescription").collect()
        
        for pr in range(0,len(plans)):
            plan=plans[pr]['physicalPlanDescription']
            if plan is None:
                # Queries without a recorded plan have nothing to render.
                continue
            nodes={}
            lines=plan.split("\n")
            nline=len(lines)
            for start in range(0,nline):
                idx=start
                l=lines[idx]
                if l=='+- == Final Plan ==':
                    # Tree lines of the final plan end with their "(N)" node id.
                    while l!='+- == Initial Plan ==' and idx+1<nline:
                        idx+=1
                        l=lines[idx]
                        if not l.endswith(")"):
                            break
                        found=re.search(r"\(\d+\)$",l)
                        if found is None:
                            continue
                        nodes[found.group(0)]=[l]
                if l=="== Physical Plan ==":
                    while idx+1<nline and not lines[idx+1].startswith("("):
                        idx+=1
                        l=lines[idx]
                        if not l.endswith(")"):
                            break
                        found=re.search(r"\(\d+\)$",l)
                        if found is None:
                            continue
                        nodes[found.group(0)]=[l]
                        
                if l.startswith("("):
                    found=re.search(r"^\(\d+\)",l)
                    if found is not None and found.group(0) in nodes:
                        idv=found.group(0)
                        desc=""
                        # The description runs up to the next blank line or the end of the text.
                        while l.strip()!="":
                            desc+=l+"\n"
                            idx+=1
                            if idx>=nline:
                                break
                            l=lines[idx]
                        desc=re.sub(r"#\d+L*",r"",desc)
                        desc=re.sub(r"= [^)]+",r"=",desc)
                        # `*` so that value lists of any length are blanked, not just one character.
                        desc=re.sub(r"IN \([^)]*\)",r"IN ()",desc)
                        desc=re.sub(r"In\([^)]*\)",r"In()",desc)
                        desc=re.sub(r"EqualTo\(([^,]+),[^)]+\)",r"EqualTo(\1,)",desc)
                        desc=re.sub(r"搜索广告",r"xxx",desc)
                        ## add all keyword replace here
                        nodes[idv].append(desc)
            # A node listed in the tree but lacking a detail section gets an empty description.
            for s in nodes.values():
                if len(s)<2:
                    s.append("")
            tables={}
            columns={}
            functions={}
            for s in nodes.values():
                p=re.search(r"Scan arrow [^.]*\.([^ ]+)",s[0])
                if p:
                    tn=p.group(1)
                    if not tn in tables:
                        tables[tn]="table"
                    if desensitization:
                        s[0]=s[0].replace(tn,tables[tn])
                        s[1]=s[1].replace(tn,tables[tn])
                    for v in s[1].split("\n"):
                        if v.startswith("ReadSchema"):
                            cols=re.search(r"<(.*)>",v)
                            if cols:
                                for c in cols.group(1).split(","):
                                    cts=c.split(":")
                                    ct=cts[0]
                                    if not ct:
                                        # An empty name would match between every character when masking.
                                        continue
                                    if not ct in columns:
                                        if len(cts)==2:
                                            columns[ct]=cts[1]+"_"
                                        else:
                                            columns[ct]="c_"
                        if v.startswith("Location") and desensitization:
                            s[1]=s[1].replace(v+"\n","")
                            
                get_column_names(s, "Project", "Output", "proj_", columns, functions)
                get_column_names(s, "HashAggregate", "Results", "shagg_", columns, functions)
                get_column_names(s, "SortAggregate", "Results", "stagg_", columns, functions)
                get_column_names(s, "ColumnarConditionProject", "Arguments", "cproj_", columns, functions)
                get_column_names(s, "ColumnarHashAggregate", "Results", "cshagg_", columns, functions)
                get_column_names(s, "Window", "Arguments", "window_", columns, functions)

            # Order the names so that a name containing another one is replaced first.
            keys=[]
            for k1 in columns.keys():
                for k in range(0,len(keys)):
                    if keys[k] in k1:
                        keys.insert(k,k1)
                        break
                else:
                    keys.append(k1)
                
            for s in nodes.values():
                s[1]=html.escape(s[1])
                if desensitization:
                    for c in keys:
                        # The text is already escaped, so look for the escaped form of the name.
                        s[1]=mask(s[1],html.escape(c),columns[c])


            htmls=['''<table style="table-layout:fixed;max-width: 100%;">''']
            qid=pr+1 if queryid is None else queryid
            htmls.append(f"<tr><td colspan=2>{qid}</td></tr>")
            for l in nodes.values():
                if shownops is not None and not any(" "+k+" " in l[0] for k in shownops):
                    continue
                htmls.append("<tr>")
                htmls.append('<td width=33%><div align="left" style="font-family:Courier New;overflow-wrap: anywhere">')
                htmls.append(l[0].replace(" ","_")
                             .replace("ColumnarToRow","<font color=blue>ColumnarToRow</font>")
                             .replace("RowToArrowColumnar","<font color=blue>RowToArrowColumnar</font>")
                             .replace("ArrowColumnarToRow","<font color=blue>ArrowColumnarToRow</font>")
                             .replace("ArrowRowToColumnar","<font color=blue>ArrowRowToColumnar</font>")
                             .replace("VeloxNativeColumnarToRowExec","<font color=blue>VeloxNativeColumnarToRowExec</font>")
                            )
                htmls.append("</div></td>")
                htmls.append('<td width=66%><div align="left" style="font-family:Courier New;overflow-wrap: anywhere">')
                lsx=[]
                for t in l[1].split("\n"):
                    cols=re.search(r"\[([^0-9].+)\]",t)
                    if cols:
                        colns=get_fields(cols.group(1))
                        t=re.sub(r"\[([^0-9].+)\]","",t)
                        t+="["+'<span style="background-color:#ededed;">;</span>'.join(colns)+"]"
                    if ":" in t:
                        lsx.append(re.sub(r'^([^:]+:)',r'<font color=blue>\1</font>',t))
                    else:
                        lsx.append(t)
                htmls.append("<br>".join(lsx))
                htmls.append("</div></td>")
                htmls.append("</tr>")
            htmls.append("</table>")
            display(HTML("\n".join(htmls)))
            
            for k, v in functions.items():
                functions[k]=[l for l in v if "(" in l]
            if desensitization:
                for f in functions.values():
                    for idx in range(0,len(f)):
                        for c in keys:
                            f[idx]=mask(f[idx],c,columns[c])
            funchtml="<table>"
            for k,v in functions.items():
                # Keys are "<operator><counter>"; compare the operator part with shownops.
                if shownops is not None and re.sub(r"\d+$","",k) not in shownops:
                    continue
                funchtml+="<tr><td width=10%>"+k+'</td><td width=90%><table style="width:100%;table-layout:fixed">'
                for f in v:
                    funchtml+='<tr><td width=100% ><div align="left" style="font-family:Courier New">'+f+"</div></td></tr>"
                funchtml+="</table></td></tr>"
            funchtml+="</table>"    
            display(HTML(funchtml))
        
        return plans
        
    def get_physical_allnodes(appals,**kwargs):
        """Collect the final-plan nodes of every query plan.

        Returns a dict keyed by the position of the plan in the collected rows.
        Each value maps a node id such as ``"(12)"`` to a list whose first item
        is the node's tree line from the ``Final Plan`` section and whose
        following items are the description blocks found for that id. A plan
        without a description yields an empty mapping, and an empty dict is
        returned when the log holds no plan information.

        ``kwargs`` is accepted but not used; all plans are always processed.
        """
        if appals.df is None:
            appals.load_data()
        if appals.queryplans is None:
            return {}
        
        plans=appals.queryplans.select('real_queryid','physicalPlanDescription').collect()
        
        allnodes={}
        for pr in range(0,len(plans)):
            plan=plans[pr]['physicalPlanDescription']
            allnodes[pr]={}
            nodes=allnodes[pr]
            if plan is None:
                continue
            lines=plan.split("\n")
            nline=len(lines)
            for start in range(0,nline):
                idx=start
                l=lines[idx]
                if l=='+- == Final Plan ==':
                    # Tree lines of the final plan end with their "(N)" node id.
                    while l!='+- == Initial Plan ==' and idx+1<nline:
                        idx+=1
                        l=lines[idx]
                        if not l.endswith(")"):
                            break
                        found=re.search(r"\(\d+\)$",l)
                        if found is None:
                            continue
                        nodes[found.group(0)]=[l]
                if l.startswith("("):
                    found=re.search(r"^\(\d+\)",l)
                    if found is not None and found.group(0) in nodes:
                        desc=""
                        # The description runs up to the next empty line or the end of the text.
                        while l!="":
                            desc+=l+"\n"
                            idx+=1
                            if idx>=nline:
                                break
                            l=lines[idx]
                        nodes[found.group(0)].append(desc)
        return allnodes
        
        
    def get_basic_state(appals):
        """Display a summary table for the application and return it as a dict.

        Shows a link to the history-server page of the application, then one
        table row per statistic: configuration values, executor and
        speculative-task counters, the column sums of
        ``get_query_time(plot=False)`` and the totals reported by
        ``getexecutor_stat`` for the directory that contains the log file.

        The real-executor count is shown in red when it differs from
        ``executor.instances``; the failed-task list is shown in red whenever
        it is non-empty.

        Returns the ``stats`` dict whose keys are the row labels.
        """
        if appals.df is None:
            appals.load_data()
        display(HTML(f"<a href=http://{localhost}:18080/history/{appals.appid}>http://{localhost}:18080/history/{appals.appid}</a>"))
        
        okcolor="#000000"
        alertcolor="#c0392b"
        infocolor="#87b00c"
        errorcolor=okcolor if appals.executor_instances == appals.realexecutors else alertcolor
        
        qtime=appals.get_query_time(plot=False)
        sums=qtime.sum()
        
        total_rchar,total_wchar,total_read_bytes,total_write_bytes,total_cancelled_write_bytes = getexecutor_stat(appals.file[:-len("app.log")])
        
        if len(appals.failed_stages)>0:
            failure="<br>".join(["query: " + str(l["real_queryid"])+"|stage: " + str(l["Stage ID"]) for l in appals.df.where("`Stage ID` in ("+",".join(appals.failed_stages)+")").select("real_queryid","Stage ID").distinct().collect()])
        else:
            failure=""
        # Failures get their own highlight instead of borrowing the executor-mismatch colour.
        failurecolor=alertcolor if failure else okcolor
            
        stats={"appid":appals.appid,
            "executor.instances":appals.executor_instances,
            "executor.cores":appals.executor_cores,
            "shuffle.partitions":appals.parallelism,
            "batch size":appals.batchsize,
            "real executors":appals.realexecutors,
            "Failed Tasks":failure,
            "Speculative Tasks":appals.speculativetask,
            "Speculative Killed Tasks":appals.speculativekilledtask,
            "Speculative Stage":appals.speculativestage,
            }
        # builtins.round, as in generate_trace_view_list_exec, so that a module-level
        # name shadowing `round` cannot affect the result.
        for label,column in [("runtime","runtime"),
                             ("disk spilled","disk spilled"),
                             ("memspilled","memspilled"),
                             ("local_read","local_read"),
                             ("remote_read","remote_read"),
                             ("shuffle_write","shuffle_write"),
                             ("task run time","run_time"),
                             ("ser_time","ser_time"),
                             ("f_wait_time","f_wait_time"),
                             ("gc_time","gc_time"),
                             ("input read","input read"),
                             ("acc_task_time","acc_task_time")]:
            stats[label]=builtins.round(sums[column],2)
        for label,value in [("file read size",total_rchar),
                            ("file write size",total_wchar),
                            ("disk read size",total_read_bytes),
                            ("disk write size",total_write_bytes),
                            ("disk cancel size",total_cancelled_write_bytes)]:
            stats[label]=builtins.round(value,2)
        
        # Rows whose value is wrapped in a coloured span; all other rows are only bold.
        span_colors={"appid":okcolor,
                     "executor.instances":okcolor,
                     "executor.cores":okcolor,
                     "shuffle.partitions":okcolor,
                     "batch size":okcolor,
                     "real executors":errorcolor,
                     "Failed Tasks":failurecolor,
                     "Speculative Tasks":infocolor,
                     "Speculative Killed Tasks":infocolor,
                     "Speculative Stage":infocolor}
        # Rows printed as-is; every other value gets a thousands separator.
        ungrouped={"appid","executor.instances","executor.cores","shuffle.partitions","real executors",
                   "Failed Tasks","Speculative Tasks","Speculative Killed Tasks","Speculative Stage"}
        
        rows=[]
        for label,value in stats.items():
            text=f"{value}" if label in ungrouped else f"{value:,}"
            cell=f"<strong>{text}</strong>"
            if label in span_colors:
                cell=f'<span style="color:{span_colors[label]}">{cell}</span>'
            rows.append(f'<tr><td style="width:135px">{label}</td><td style="width:351px">{cell}</td></tr>')
        
        display(HTML('<table border="1" cellpadding="1" cellspacing="1" style="width:500px"><tbody>\n'
                     +"\n".join(rows)
                     +"\n</tbody></table>"))
        return stats
   
        
    def generate_trace_view_list_exec(self,id=0,**kwargs):
        """Build trace events for every task, including a per-phase time breakdown.

        ``Analysis.generate_trace_view_list`` is invoked first. The rows of
        ``self.df`` are then walked in their existing order:

        * a ``SparkListenerTaskStart`` row reserves a display slot (tid) inside the
          process (pid) of its executor, reusing the first free slot if any;
        * a ``SparkListenerTaskEnd`` row frees that slot and emits one complete
          ("X") event named ``stg<Stage ID>`` for the whole task, followed by one
          "X" event per non-empty phase (sched delay, deserialize, read, executor,
          write, serialize, result) laid end to end from the task start. A
          ``GC Time`` event, and optionally two ``cpu%`` counter ("C") events, are
          emitted alongside the executor-time phase.

        Args:
            id: unused.

        Keyword Args:
            showcpu (bool): emit the ``cpu% <tid>`` counter events. Default False.
            shownodes (list): when given, only rows whose ``Host`` is in this list
                are processed.

        Returns:
            list[str]: the trace events, each serialized with ``json.dumps``.

        Side effects:
            Appends the pid of every task-start row to ``self.pids`` and stores the
            time origin in ``self.starttime`` (set from the ``Launch Time`` of the
            first emitted task when it was 0 on entry).
        """
        Analysis.generate_trace_view_list(self,**kwargs)
        showcpu=kwargs.get('showcpu',False)
        shownodes=kwargs.get("shownodes",None)

        showdf=self.df.where(F.col("Host").isin(shownodes)) if shownodes else self.df

        events=showdf.toPandas()
        # pid -> {slot index: [task id, launch time]}; a free slot holds [-1,-1]
        coretrack={}
        trace_events=[]
        starttime=self.starttime
        exec_hosts={}
        hostsdf=showdf.select("Host").distinct().orderBy("Host")
        hostid=100000
        # tasks whose end row was seen without a tracked start (set: O(1) membership)
        ended_event=set()

        # every host gets its own block of 100000 ids; pid = executor id*100 + host block
        for i,l in hostsdf.toPandas().iterrows():
            exec_hosts[l['Host']]=hostid
            hostid=hostid+100000

        time_names=['sched delay','deserialize time','read time','executor time','write time','serialize time','result time']

        for idx,l in events.iterrows():
            if l['Event']=='SparkListenerTaskStart':
                hostid=exec_hosts[l['Host']]

                tsk=l['Task ID']
                pid=int(l['Executor ID'])*100+hostid
                self.pids.append(pid)
                stime=l['Launch Time']
                #the task's starttime and finishtime is the same, ignore it.
                if tsk in ended_event:
                    continue
                if not pid in coretrack:
                    tids={}
                    # first task seen on this executor: name the process "<host>.<executor id>"
                    trace_events.append({
                       "name": "process_name",
                       "ph": "M",
                       "pid":pid,
                       "tid":0,
                       "args":{"name":"{:s}.{:s}".format(l['Host'],l['Executor ID'])}
                      })

                else:
                    tids=coretrack[pid]
                # take the first free slot, or open a new one when all are busy
                for t in tids.keys():
                    if tids[t][0]==-1:
                        tids[t]=[tsk,stime]
                        break
                else:
                    t=len(tids)
                    tids[t]=[tsk,stime]
                coretrack[pid]=tids

            if l['Event']=='SparkListenerTaskEnd':
                hostid=exec_hosts[l['Host']]
                pid=int(l['Executor ID'])*100+hostid
                tsk=l['Task ID']
                fintime=l['Finish Time']

                tids=coretrack[pid]
                # release the slot this task occupied; `t` keeps that slot index
                for t in tids.keys():
                    if tids[t][0]==tsk:
                        tids[t]=[-1,-1]
                        break
                else:
                    # end row without a tracked start: remember it so a later start row is skipped
                    ended_event.add(tsk)
                    continue
                # a task on another slot that launched within 2 time units before this
                # finish is moved onto the freed slot, and this task is cut at that launch
                for ps in reversed([key for key in tids.keys()]) :
                    if tids[ps][1]-fintime<0 and tids[ps][1]-fintime>=-2:
                        fintime=tids[ps][1]
                        tids[t]=tids[ps]
                        tids[ps]=[-1,-1]
                        break
                if starttime==0:
                    starttime=l['Launch Time']
                    print(f'applog start time: {starttime}')

                sstime=l['Launch Time']-starttime

                trace_events.append({
                       'tid':pid+int(t),
                       'ts':sstime,
                       'dur':fintime-l['Launch Time'],
                       'pid':pid,
                       "ph":'X',
                       'name':"stg{:d}".format(l['Stage ID']),
                       # pandas row lookup is case-sensitive: use the same 'Job ID'
                       # label that generate_trace_view_list reads
                       'args':{"job id": l['Job ID'],
                               "stage id": l['Stage ID'],
                               "tskid":tsk,
                               "input":builtins.round(l["Bytes Read"]/1024/1024,2),
                               "spill":builtins.round(l["Memory Bytes Spilled"]/1024/1024,2),
                               "Shuffle Read Metrics": "",
                               "|---Local Read": builtins.round(l["Local Bytes Read"]/1024/1024,2),
                               "|---Remote Read":builtins.round(l["Remote Bytes Read"]/1024/1024,2),
                               "Shuffle Write Metrics": "",
                               "|---Write":builtins.round(l['Shuffle Bytes Written']/1024/1024,2)
                               }
                      })

                des_time=l['Executor Deserialize Time']
                read_time=l['Fetch Wait Time']
                exec_time=l['Executor Run Time']
                # presumably nanoseconds scaled to the unit of the other timings - TODO confirm
                write_time=math.floor(l['Shuffle Write Time']/1000000)
                ser_time=l['Result Serialization Time']
                getrst_time=l['Getting Result Time']
                # equals (possibly shortened) finish time minus launch time
                durtime=fintime-sstime-starttime

                times=[0,des_time,read_time,exec_time,write_time,ser_time,getrst_time]
                evttime=reduce((lambda x, y: x + y),times)
                if evttime>durtime:
                    # phases add up to more than the task lasted: shrink them proportionally
                    times=[math.floor(v*1.0*durtime/evttime) for v in times]
                else:
                    # the unexplained remainder is shown as scheduling delay
                    times[0]=durtime-evttime

                esstime=sstime
                for phase in range(0,len(times)):
                    if times[phase]>0:
                        trace_events.append({
                             'tid':pid+int(t),
                             'ts':esstime,
                             'dur':times[phase],
                             'pid':pid,
                             'ph':'X',
                             'name':time_names[phase]})
                        if phase==3:
                            # phase 3 is 'executor time': GC and cpu% start at the same offset
                            trace_events.append({
                                 'tid':pid+int(t),
                                 'ts':esstime,
                                 'dur':l['JVM GC Time'],
                                 'pid':pid,
                                 'ph':'X',
                                 'name':'GC Time'})
                            if showcpu:
                                trace_events.append({
                                     'tid':pid+int(t),
                                     'ts':esstime,
                                     'pid':pid,
                                     'ph':'C',
                                     'name':'cpu% {:d}'.format(pid+int(t)),
                                     'args':{'value':l['Executor CPU Time']/1000000.0/times[phase]}})
                                # drop the counter back to 0 where the phase ends
                                trace_events.append({
                                     'tid':pid+int(t),
                                     'ts':esstime+times[phase],
                                     'pid':pid,
                                     'ph':'C',
                                     'name':'cpu% {:d}'.format(pid+int(t)),
                                     'args':{'value':0}})
                        esstime=esstime+times[phase]
        self.starttime=starttime
        return [json.dumps(l) for l in trace_events]

    def generate_trace_view_list(self,id=0,**kwargs):
        """Build the trace events for all tasks, plus optional metric and critical-path rows.

        ``Analysis.generate_trace_view_list`` is invoked first. Rows of ``self.df``
        are ordered by ``eventtime`` ascending and ``Finish Time`` descending and
        walked once:

        * a ``SparkListenerTaskStart`` row reserves a display slot (tid) inside the
          process (pid) of its executor, reusing the first free slot if any;
        * a ``SparkListenerTaskEnd`` row frees the slot, emits one complete ("X")
          event named ``stg<Stage ID>`` and records the task's pid/tid in
          ``self.tskmap``.

        Args:
            id: unused.

        Keyword Args:
            showcpu (bool): when true and ``self.metricscollect`` is non-empty, also
                emit one "X" event per collected accumulator metric inside each task
                and an instant ("i") event per ``real_queryid``. Default False.
            shownodes (list): when given, only rows whose ``Host`` is in this list
                take part in the task walk (the metric section reads ``self.df``
                unfiltered).
            show_criticalshow_time_metric_path (bool): append the events returned by
                ``generate_critical_patch_traceview``. Default True.

        Returns:
            list[str]: JSON-encoded trace events.

        Side effects:
            Appends pids to ``self.pids``; sets ``self.starttime`` and ``self.tskmap``.
        """
        Analysis.generate_trace_view_list(self,**kwargs)
        showcpu=kwargs.get('showcpu',False)
        shownodes=kwargs.get("shownodes",None)

        showdf=self.df.where(F.col("Host").isin(shownodes)) if shownodes else self.df

        showdf=showdf.orderBy(["eventtime", "Finish Time"], ascending=[1, 0])

        # Accumulables is dropped before the rows are collected to pandas
        events=showdf.drop("Accumulables").toPandas()
        # pid -> {slot index: [task id, launch time]}; a free slot holds [-1,-1]
        coretrack={}
        trace_events=[]
        starttime=self.starttime
        exec_hosts={}
        hostsdf=showdf.select("Host").distinct().orderBy("Host")
        hostid=100000
        # tasks whose end row was seen without a tracked start (set: O(1) membership)
        ended_event=set()

        # every host gets its own block of 100000 ids; pid = executor id*100 + host block
        for i,l in hostsdf.toPandas().iterrows():
            exec_hosts[l['Host']]=hostid
            hostid=hostid+100000

        tskmap={}
        for idx,l in events.iterrows():
            if l['Event']=='SparkListenerTaskStart':
                hostid=exec_hosts[l['Host']]

                tsk=l['Task ID']
                pid=int(l['Executor ID'])*100+hostid
                self.pids.append(pid)
                stime=l['Launch Time']
                #the task's starttime and finishtime is the same, ignore it.
                if tsk in ended_event:
                    continue
                if not pid in coretrack:
                    tids={}
                    # first task seen on this executor: name the process "<host>.<executor id>"
                    trace_events.append({
                       "name": "process_name",
                       "ph": "M",
                       "pid":pid,
                       "tid":0,
                       "args":{"name":"{:s}.{:s}".format(l['Host'],l['Executor ID'])}
                      })

                else:
                    tids=coretrack[pid]
                # take the first free slot, or open a new one when all are busy
                for t in tids.keys():
                    if tids[t][0]==-1:
                        tids[t]=[tsk,stime]
                        break
                else:
                    t=len(tids)
                    tids[t]=[tsk,stime]
                coretrack[pid]=tids

            if l['Event']=='SparkListenerTaskEnd':
                hostid=exec_hosts[l['Host']]
                pid=int(l['Executor ID'])*100+hostid
                tsk=l['Task ID']
                fintime=l['Finish Time']

                tids=coretrack[pid]
                # release the slot this task occupied; `t` keeps that slot index
                for t in tids.keys():
                    if tids[t][0]==tsk:
                        tids[t]=[-1,-1]
                        break
                else:
                    # end row without a tracked start: remember it so a later start row is skipped
                    ended_event.add(tsk)
                    continue
                # a task on another slot that launched within 2 time units before this
                # finish is moved onto the freed slot, and this task is cut at that launch
                for ps in reversed([key for key in tids.keys()]) :
                    if tids[ps][1]-fintime<0 and tids[ps][1]-fintime>=-2:
                        fintime=tids[ps][1]
                        tids[t]=tids[ps]
                        tids[ps]=[-1,-1]
                        break
                if starttime==0:
                    starttime=l['Launch Time']
                    print(f'applog start time: {starttime}')

                sstime=l['Launch Time']-starttime

                trace_events.append({
                       'tid':pid+int(t),
                       'ts':sstime,
                       'dur':fintime-l['Launch Time'],
                       'pid':pid,
                       "ph":'X',
                       'name':"stg{:d}".format(l['Stage ID']),
                       'args':{"job id": l['Job ID'],
                               "stage id": l['Stage ID'],
                               "tskid":tsk,
                               "input":builtins.round(l["Bytes Read"]/1024/1024,2),
                               "spill":builtins.round(l["Memory Bytes Spilled"]/1024/1024,2),
                               "Shuffle Read Metrics": "",
                               "|---Local Read": builtins.round(l["Local Bytes Read"]/1024/1024,2),
                               "|---Remote Read":builtins.round(l["Remote Bytes Read"]/1024/1024,2),
                               "Shuffle Write Metrics": "",
                               "|---Write":builtins.round(l['Shuffle Bytes Written']/1024/1024,2)
                               }
                      })
                # remembered so metric events can be placed on the same pid/tid
                tskmap[tsk]={'pid':pid,'tid':pid+int(t)}

        self.starttime=starttime
        self.tskmap=tskmap
        output=[json.dumps(l) for l in trace_events]

        df=self.df

        if showcpu and len(self.metricscollect)>0:
            metricscollect=self.metricscollect
            # one row per (task, accumulator update), limited to the collected metric ids
            metrics_explode=df.where("Event='SparkListenerTaskEnd'").withColumn("metrics",F.explode("Accumulables"))
            m1092=metrics_explode.select(F.col("Executor ID"),F.col("`Stage ID`"),"`Task ID`",F.col("`Finish Time`"),F.col("`Launch Time`"),(F.col("`Finish Time`")-F.col("`Launch Time`")).alias("elapsedtime"),"metrics.*").where(F.col("ID").isin([l[0] for l in metricscollect]))
            metric_name_df = spark.createDataFrame(metricscollect)
            metric_name_df=metric_name_df.withColumnRenamed("_1","ID")
            metric_name_df=metric_name_df.withColumnRenamed("_2","unit")
            metric_name_df=metric_name_df.withColumnRenamed("_3","mname")

            met_df=m1092.join(metric_name_df,on="ID")
            # 'nsTiming' updates are divided by 1e6; "+0" leaves the others numerically unchanged
            met_df=met_df.withColumn("Update",F.when(F.col("unit")=='nsTiming',F.col("Update")/1000000).otherwise(F.col("Update")+0))
            met_df=met_df.where("Update>1")

            # when a task's metrics add up to more than its elapsed time, scale them down
            metdfx=met_df.groupBy("Task ID","elapsedtime").agg(F.sum("Update").alias("totalCnt"))
            taskratio=metdfx.withColumn("ratio",F.when(F.col("totalCnt")<F.col("elapsedtime"),1).otherwise(F.col("elapsedtime")/F.col("totalCnt"))).select("Task ID","ratio")
            met_df=met_df.join(taskratio,on="Task ID")
            met_df=met_df.withColumn("Update",F.col("Update")*F.col("ratio"))

            # lay the metrics of a task one after another, largest first, from its launch time
            w = (Window.partitionBy('Task ID').orderBy(F.desc("Update")).rangeBetween(Window.unboundedPreceding, 0))
            met_df=met_df.withColumn('cum_sum', F.sum('Update').over(w))

            met_df=met_df.withColumn("starttime",F.col("Launch Time")+F.col("cum_sum")-F.col("Update"))

            tskmapdf = spark.createDataFrame(pandas.DataFrame(self.tskmap).T.reset_index())
            met_df=met_df.join(tskmapdf,on=[met_df["Task ID"]==tskmapdf["index"]])

            rstdf=met_df.select(
                F.col("tid"),
                F.round(F.col("starttime")-self.starttime,0).alias("ts"),
                F.round(F.col("Update"),0).alias("dur"),
                F.col("pid"),
                F.lit("X").alias("ph"),
                F.col("mname").alias("name")
            ).where(F.col("ts").isNotNull()).orderBy('ts')

            output.extend(rstdf.toJSON().collect())

            # one instant event per query, at the earliest task finish of that query
            qtime=df.where("Event='SparkListenerTaskEnd'").groupBy("real_queryid").agg(F.min("Finish Time").alias("time"))
            output.extend(qtime.select(
                F.lit("i").alias("ph"),
                (F.col("time")-starttime).alias('ts'),
                F.lit(0).alias("pid"),
                F.lit(0).alias("tid"),
                F.lit("p").alias("s")
            ).toJSON().collect())

        # NOTE(review): the option key looks like a rename artifact; kept verbatim
        # because callers may already pass it.
        if kwargs.get("show_criticalshow_time_metric_path",True):
            # NOTE(review): hostid is whatever was assigned last - the host block of the
            # last processed task row, or the next unused block when there were none.
            output.extend(self.generate_critical_patch_traceview(hostid-1))

        return output

    def generate_critical_patch_traceview(self,pid):
        """Return JSON trace events that draw the critical path as one process row.

        All events use the given ``pid`` and the fixed tid 38. Four groups are
        produced, in this order:

        1. one "X" event per task listed in ``self.criticaltasks`` (named
           ``stg<Stage ID>``, with task id, executor id and host in ``args``);
        2. one "X" event per ``real_queryid`` spanning its earliest launch to its
           latest finish (named ``qry<real_queryid>``);
        3. one "X" event per collected accumulator metric of each critical task;
        4. the ``process_name`` metadata event labelling the row "critical path".

        Args:
            pid: process id to place the events on.

        Returns:
            list[str]: JSON-encoded trace events.
        """
        if self.df is None:
            self.load_data()

        finished=self.df.where("Event='SparkListenerTaskEnd' and real_queryid is not null")

        # critical tasks arrive as (task_id, launch, finish) records
        critical_pd=pandas.DataFrame(self.criticaltasks)
        critical_pd.columns=['task_id',"launch","finish"]
        critical_sdf=spark.createDataFrame(critical_pd)
        on_path=finished.join(critical_sdf,on=[F.col("task_id")==F.col("Task ID")],how="inner")

        # 1. the critical tasks themselves, shrunk by one unit at the start
        task_bars=on_path.select(
            F.lit(38).alias("tid"),
            (F.col("launch")-F.lit(self.starttime)+1).alias("ts"),
            (F.col("finish")-F.col("launch")-1).alias("dur"),
            F.lit(pid).alias("pid"),
            F.lit("X").alias("ph"),
            F.concat(F.lit("stg"),F.col("Stage ID")).alias("name"),
            F.struct(
                F.col("Task ID").alias('taskid'),
                F.col("Executor ID").astype(IntegerType()).alias('exec_id'),
                F.col("Host").alias("host"),
            ).alias("args"),
        )
        traces=list(task_bars.toJSON().collect())

        # 2. one bar per query
        query_span=finished.groupBy("real_queryid").agg(
            F.max("Finish Time").alias("finish"),
            F.min("Launch Time").alias("launch"),
        )
        query_bars=query_span.select(
            F.lit(38).alias("tid"),
            (F.col("launch")-F.lit(self.starttime)).alias("ts"),
            (F.col("finish")-F.col("launch")).alias("dur"),
            F.lit(pid).alias("pid"),
            F.lit("X").alias("ph"),
            F.concat(F.lit("qry"),F.col("real_queryid")).alias("name"),
        )
        traces.extend(query_bars.toJSON().collect())

        # 3. accumulator metrics of the critical tasks
        metricscollect=self.metricscollect
        wanted_ids=[m[0] for m in metricscollect]

        exploded=on_path.where("Event='SparkListenerTaskEnd'").withColumn("metrics",F.explode("Accumulables"))
        per_metric=exploded.select(
            F.col("Executor ID"),
            F.col("`Stage ID`"),
            "`Task ID`",
            F.col("`Finish Time`"),
            F.col("`Launch Time`"),
            (F.col("`Finish Time`")-F.col("`Launch Time`")).alias("elapsedtime"),
            "metrics.*",
        ).where(F.col("ID").isin(wanted_ids))

        metric_names=spark.createDataFrame(metricscollect)
        for positional,label in (("_1","ID"),("_2","unit"),("_3","mname"),("_4","node")):
            metric_names=metric_names.withColumnRenamed(positional,label)
        metric_names=metric_names.where("mname <> 'time to collect batch' and mname <> 'time of scan'")

        timed=per_metric.join(metric_names,on="ID")
        timed=timed.withColumn(
            "Update",
            F.when(F.col("unit")=='nsTiming',F.col("Update")/1000000).otherwise(F.col("Update")+0),
        )

        # Done with window functions: per the original note, a grouped pandas UDF
        # (time_breakdown) hung here.
        whole_task=Window.partitionBy('Task ID')
        running=(Window.partitionBy('Task ID')
                 .orderBy(F.desc("update_ratio"))
                 .rowsBetween(Window.unboundedPreceding, Window.currentRow))
        laid_out=(timed
                  .withColumn("sum_update",F.sum("Update").over(whole_task))
                  # shrink factor so the metrics fit in the task duration minus 2, never above 1
                  .withColumn("ratio",(F.col("Finish Time")-F.col("Launch Time")-2)/F.col("sum_update"))
                  .withColumn("ratio",F.when(F.col("ratio")>1,1).otherwise(F.col("ratio")))
                  .withColumn("update_ratio",F.floor(F.col("ratio")*F.col("Update")))
                  .where(F.col("update_ratio")>2)
                  # running total, largest metric first, gives each metric its end offset
                  .withColumn('ltime_dur', F.sum('update_ratio').over(running))
                  .withColumn("ltime",F.col("ltime_dur")+F.col("Launch Time")-F.col("update_ratio")))

        breakdown=(laid_out
                   .withColumn("taskid",F.col("Task ID"))
                   .withColumn("start",F.col("ltime")+F.lit(1))
                   .withColumn("dur",F.col("update_ratio")-F.lit(1))
                   .withColumn("name",F.col("mname")))

        metric_bars=breakdown.select(
            F.lit(38).alias("tid"),
            (F.col("start")-F.lit(self.starttime)).alias("ts"),
            (F.col("dur")).alias("dur"),
            F.lit(pid).alias("pid"),
            F.lit("X").alias("ph"),
            F.col("name").alias("name"),
        )
        traces.extend(metric_bars.toJSON().collect())

        # 4. label for the process row
        row_label={
            "name": "process_name",
            "ph": "M",
            "pid":pid,
            "tid":0,
            "args":{"name":"critical path"}
        }
        traces.append(json.dumps(row_label))
        return traces
    
    def show_Stage_histogram(apps,stageid,bincount):
        """Plot how task elapsed time and input size are distributed within one stage.

        Two figures are drawn from the stage's ``SparkListenerTaskEnd`` rows:

        * histograms of elapsed time (seconds) and of input (MB), each with
          ``bincount`` buckets;
        * a per-host scatter of input vs. elapsed time, next to per-host violin
          plots of elapsed time and of input.

        "input" is the sum of the task's 'input size in bytes' / 'size of files read'
        accumulator updates, ``Bytes Read``, ``Local Bytes Read`` and
        ``Remote Bytes Read``, divided by 1024*1024.

        Args:
            apps: the analysis object (this method's ``self``); its data is loaded on
                demand.
            stageid (int): stage to inspect; formatted into the filter with ``{:d}``.
            bincount (int): number of histogram buckets.
        """
        if apps.df is None:
            apps.load_data()

        inputsize = apps.df.where("`Stage ID`={:d}".format(stageid)).select("Stage ID","Executor ID", "Task ID", F.explode("Accumulables")) \
                      .select("Stage ID","Executor ID", "Task ID","col.*") \
                      .where("Name='input size in bytes' or Name='size of files read'") \
                      .groupBy("Task ID") \
                      .agg((F.sum("Update")).alias("input read"))


        stage37=apps.df.where("`Stage ID`={:d} and event='SparkListenerTaskEnd'".format(stageid) )\
                        .join(inputsize,on=["Task ID"],how="left")\
                        .fillna(0) \
                        .select(F.col('Host'), 
                                F.round((F.col('Finish Time')/1000-F.col('Launch Time')/1000),2).alias('elapsedtime'),
                                F.round((F.col('`input read`')+F.col('`Bytes Read`')+F.col('`Local Bytes Read`')+F.col('`Remote Bytes Read`'))/1024/1024,2).alias('input'))
        # cached because three separate jobs read it; released as soon as they are done
        stage37=stage37.cache()
        try:
            # honour the caller's bucket count instead of a fixed 15
            hist_elapsedtime=stage37.select('elapsedtime').rdd.flatMap(lambda x: x).histogram(bincount)
            hist_input=stage37.select('input').rdd.flatMap(lambda x: x).histogram(bincount)
            outpds=stage37.toPandas()
        finally:
            stage37.unpersist()

        def _draw_hist(ax,hist,title):
            # RDD.histogram returns (bucket edges, counts); bars sit between two edge ticks
            binSides, binCounts = hist
            binSides=[builtins.round(l,2) for l in binSides]
            N = len(binCounts)
            ax.bar(numpy.arange(N)+0.5, binCounts, 0.5, color='b')
            ax.set_ylabel('Frequencies')
            ax.set_title(title)
            ax.set_xticks(numpy.arange(N+1))
            ax.set_xticklabels(binSides)

        fig, axs = plt.subplots(figsize=(30, 5),nrows=1, ncols=2)
        _draw_hist(axs[0],hist_elapsedtime,'stage{:d} elapsed time breakdown'.format(stageid))
        _draw_hist(axs[1],hist_input,'stage{:d} input data breakdown'.format(stageid))

        fig, axs = plt.subplots(nrows=1, ncols=3, sharey=False,figsize=(30,8),gridspec_kw = {'width_ratios':[1, 1, 1]})
        plt.subplots_adjust(wspace=0.01)

        groups= outpds.groupby('Host')
        for name, group in groups:
            axs[0].plot(group.input, group.elapsedtime, marker='o', linestyle='', ms=5, label=name)
        axs[0].set_xlabel('input size (MB)')
        axs[0].set_ylabel('elapsed time (s)')

        axs[0].legend()

        # scatter and elapsed-time violin share one y axis; the Grouper.join call
        # used before is no longer available on current Matplotlib
        try:
            axs[1].sharey(axs[0])
        except AttributeError:
            # older Matplotlib without Axes.sharey
            axs[0].get_shared_y_axes().join(axs[0], axs[1])

        sns.violinplot(y='elapsedtime', x='Host', data=outpds,palette=['g'],ax=axs[1])

        sns.violinplot(y='input', x='Host', data=outpds,palette=['g'],ax=axs[2])

        plt.show()
        
    def show_Stages_hist(apps,**kwargs):
        """Plot cumulative task time per stage, then a histogram for each dominant stage.

        Stages are ranked by their summed task time (``Finish Time - Launch Time``
        over ``SparkListenerTaskEnd`` rows). The cumulative share is plotted, and
        ``show_Stage_histogram`` is called for every stage up to and including the
        first one whose cumulative share exceeds the threshold.

        Keyword Args:
            bincount (int): forwarded to ``show_Stage_histogram``. Default 15.
            threshold (float): cumulative share at which to stop. Default 0.9.
            queryid (int | list): restrict to these ``real_queryid`` values.
        """
        if apps.df is None:
            apps.load_data()

        bincount=kwargs.get("bincount",15)
        threshold=kwargs.get("threshold",0.9)

        query=kwargs.get("queryid",None)
        if query and type(query) is int:
            query=[query]
        if query:
            df=apps.df.where(F.col("real_queryid").isin(query))
        else:
            df=apps.df

        grand_total_rows=df.where("event='SparkListenerTaskEnd'" ) \
                           .agg(F.sum(F.col('Finish Time')-F.col('Launch Time')).alias('total_time')) \
                           .collect()
        totaltime=grand_total_rows[0]['total_time']

        per_stage=df.where("event='SparkListenerTaskEnd'" ) \
                    .groupBy('`Stage ID`') \
                    .agg(F.sum(F.col('Finish Time')-F.col('Launch Time')).alias('total_time')) \
                    .orderBy('total_time', ascending=False)
        stage_time=per_stage.toPandas()
        stage_time['acc_total'] = stage_time['total_time'].cumsum()/totaltime
        # expose the rank as an 'index' column to use as x positions
        stage_time=stage_time.reset_index()

        fig, ax = plt.subplots(figsize=(30, 5))
        ax.plot(stage_time['index'],stage_time['acc_total'],'b.-')
        ax.set_xticks(stage_time['index'])
        ax.set_xticklabels(stage_time['Stage ID'])
        ax.set_xlabel('stage')
        ax.grid(which='major', axis='x')
        plt.show()

        # keep stages while the cumulative share stays within the threshold; the
        # first stage past it is kept too
        shownstage=[]
        for pos in stage_time.index:
            shownstage.append(stage_time['Stage ID'][pos])
            if not stage_time['acc_total'][pos]<=threshold:
                break

        for stage in shownstage:
            apps.show_Stage_histogram(stage,bincount)
            
    def get_hottest_stages(apps,**kwargs):
        """Rank stages by estimated wall time and return the ranking.

        For each (Stage ID, Job ID, real_queryid) the summed task time in seconds is
        divided by the smaller of the task count and the number of task slots
        (``executor_instances * executor_cores / taskcpus``). The result is sorted
        descending and extended with each stage's share (``total``) and cumulative
        share (``acc_total``) of the summed estimate.

        Keyword Args:
            threshold (float): only stages with ``acc_total <= threshold`` are shown
                in the bar chart. Default 0.9.
            plot (bool): draw the bar chart and display the colour-coded table.
                Default True.
            queryid (int | list): restrict to these ``real_queryid`` values;
                otherwise all rows with a non-null ``queryid`` are used.

        Returns:
            pandas.DataFrame: one row per stage, hottest first.
        """
        if apps.df is None:
            apps.load_data()

        threshold=kwargs.get("threshold",0.9)
        plot=kwargs.get("plot",True)

        query=kwargs.get("queryid",None)
        if query and type(query)==int:
            query = [query,]
        df=apps.df.where(F.col("real_queryid").isin(query)) if query else apps.df.where("queryid is not NULL")

        # number of tasks that can run at the same time
        slots=apps.executor_instances*apps.executor_cores/apps.taskcpus

        stage_time=df.where("event='SparkListenerTaskEnd'" ).groupBy('`Stage ID`','Job ID','real_queryid').agg(
            F.sum(F.col('Finish Time')-F.col('Launch Time')).alias('total_time'),
            F.stddev(F.col('Finish Time')/1000-F.col('Launch Time')/1000).alias('stdev_time'),
            F.count("*").alias("cnt"),
            F.first('queryid').astype(IntegerType()).alias('queryid')
            )\
            .select('`Stage ID`','Job ID','real_queryid','queryid',
                    (F.col("total_time")/1000/(F.when(F.col("cnt")>F.lit(slots),F.lit(slots)).otherwise(F.col("cnt")))).alias("total_time"),
                    F.col("stdev_time")
                   ).orderBy('total_time', ascending=False).toPandas()

        totaltime=stage_time['total_time'].sum()
        stage_time['acc_total'] = stage_time['total_time'].cumsum()/totaltime
        stage_time['total'] = stage_time['total_time']/totaltime
        stage_time=stage_time.reset_index()

        # everything below is presentation only; skipped when there is nothing to show
        if plot and not stage_time.empty:
            # copy so that adding the label column does not write into a slice of stage_time
            shownstage=stage_time.loc[stage_time['acc_total'] <=threshold].copy()
            if not shownstage.empty:
                shownstage['stg']=shownstage['real_queryid'].astype(str)+'_'+shownstage['Job ID'].astype(str)+'_'+shownstage['Stage ID'].astype(str)
                shownstage.plot.bar(x="stg",y="total",figsize=(30,8))

            # one background colour per queryid
            norm = matplotlib.colors.Normalize(vmin=0, vmax=max(stage_time.queryid))
            cmap = plt.get_cmap('brg')
            def setbkcolor(x):
                rgba=cmap(norm(x['queryid']))
                # one style string per cell of the row
                return ['background-color:rgba({:d},{:d},{:d},1); color:white'.format(int(rgba[0]*255),int(rgba[1]*255),int(rgba[2]*255))]*len(x)

            display(stage_time.style.apply(setbkcolor,axis=1).format({"total_time":lambda x: '{:,.2f}'.format(x),"acc_total":lambda x: '{:,.2%}'.format(x),"total":lambda x: '{:,.2%}'.format(x)}))

        return stage_time

    def scatter_elapsetime_input(apps,stageid):
        """Scatter-plot task input size (MB) against elapsed time (s) for one stage.

        Input is ``Bytes Read + Local Bytes Read + Remote Bytes Read`` of each
        ``SparkListenerTaskEnd`` row, divided by 1024*1024.

        Args:
            apps: the analysis object (this method's ``self``).
            stageid (int): stage to plot; formatted into the filter with ``{:d}``.
        """
        if apps.df is None:
            apps.load_data()

        stage_filter="`Stage ID`={:d} and event='SparkListenerTaskEnd'".format(stageid)
        elapsed_col=F.round((F.col('Finish Time')/1000-F.col('Launch Time')/1000),2).alias('elapsedtime')
        bytes_in=F.col('`Bytes Read`')+F.col('`Local Bytes Read`')+F.col('`Remote Bytes Read`')
        input_col=F.round((bytes_in)/1024/1024,2).alias('input')

        stage37=apps.df.where(stage_filter).select(elapsed_col,input_col).toPandas()
        stage37.plot.scatter('input','elapsedtime',figsize=(30, 5))

    def get_critical_path_stages(self):
        """List the critical-path tasks that ran longer than 10 seconds.

        ``SparkListenerTaskEnd`` rows are joined with ``self.criticaltasks`` on the
        task id; tasks whose ``(Finish Time - Launch Time)/1000`` exceeds 10 are
        returned, longest first.

        Returns:
            pandas.DataFrame: columns real_queryid, elapsed, Host, executor ID,
            Stage ID, Task ID, "file read" (Bytes Read / 1e6) and "shuffle read"
            ((Local + Remote Bytes Read) / 1e6).
        """
        # load on demand, like the other accessors of this class
        if self.df is None:
            self.load_data()
        df=self.df.where("Event='SparkListenerTaskEnd'")
        # critical tasks arrive as (task_id, launch, finish) records
        criticaltasks=self.criticaltasks
        cripds=pandas.DataFrame(criticaltasks)
        cripds.columns=['task_id',"launch","finish"]
        cridf=spark.createDataFrame(cripds)
        df_ctsk=df.join(cridf,on=[F.col("task_id")==F.col("Task ID")],how="inner")
        df_ctsk=df_ctsk.withColumn("elapsed",(F.col("Finish Time")-F.col("Launch Time"))/1000)
        return df_ctsk.where("elapsed>10").orderBy(F.desc("elapsed")).select("real_queryid",F.round("elapsed",2).alias("elapsed"),"Host","executor ID","Stage ID","Task ID",F.round(F.col("Bytes Read")/1000000,0).alias("file read"),F.round((F.col("Local Bytes Read")+F.col("Remote Bytes Read"))/1000000,0).alias("shuffle read")).toPandas()
        
    def show_time_metric(self,**kwargs):
        """Break executor time down by collected accumulator metric.

        The accumulator updates listed in ``self.metricscollect`` are summed per
        executor and metric. Two synthetic entries are added per executor:
        ``idle`` (query wall time times cores minus task time) and ``not_counted``
        (task time not covered by any metric), both floored at 0. Values are then
        divided by 1000 and by the cores per executor, and averaged over executors.

        Keyword Args:
            shownodes (list): restrict to rows whose ``Host`` is in this list.
            queryid (int | list): restrict to these ``real_queryid`` values;
                otherwise rows with a non-null ``queryid`` are used.
            plot (bool): draw the charts. Default True.
            taskids (list): restrict to these task ids; when given, one executor
                with one core is assumed and the per-executor chart is suppressed.
            showexecutor (bool): draw the per-executor stacked bar chart.
                Default True (ignored when ``taskids`` is given).

        Returns:
            pandas.DataFrame: a single row whose columns are labelled
            ``"<percent>%_<metric name>"``, sorted by descending value.
        """
        if self.df is None:
            self.load_data()
        shownodes=kwargs.get("shownodes",None)
        query=kwargs.get("queryid",None)
        plot=kwargs.get("plot",True)
        taskids=kwargs.get("taskids",None)
        
        if query and type(query)==int:
            query = [query,]
        
        showexecutor=kwargs.get("showexecutor",True) if not taskids else False
        queryid = query[0] if query else 0
        
        df=self.df.where(F.col("Host").isin(shownodes)) if shownodes else self.df
        df=df.where(F.col("real_queryid").isin(query)) if query else df.where("queryid is not NULL")

        df=df.where(F.col("Task ID").isin(taskids)) if taskids else df

        exec_cores=1 if taskids else self.executor_cores
        execs=1 if taskids else self.executor_instances

        metricscollect=self.metricscollect

        # one row per (task, accumulator update), limited to the collected metric ids
        metrics_explode=df.where("Event='SparkListenerTaskEnd'").withColumn("metrics",F.explode("Accumulables"))
        m1092=metrics_explode.select(F.col("Executor ID"),F.col("`Stage ID`"),"`Task ID`",F.col("`Finish Time`"),F.col("`Launch Time`"),(F.col("`Finish Time`")-F.col("`Launch Time`")).alias("elapsedtime"),"metrics.*").where(F.col("ID").isin([l[0] for l in metricscollect]))
        metric_name_df = spark.createDataFrame(metricscollect)
        metric_name_df=metric_name_df.withColumnRenamed("_1","ID")
        metric_name_df=metric_name_df.withColumnRenamed("_2","unit")
        metric_name_df=metric_name_df.withColumnRenamed("_3","mname")
        metric_name_df=metric_name_df.withColumnRenamed("_4","node")

        runtime=metrics_explode.agg(F.round(F.max("Finish Time")/1000-F.min("Launch Time")/1000,2).alias("runtime")).collect()[0]["runtime"]

        met_df=m1092.join(metric_name_df,on="ID")
        # 'nsTiming' updates are divided by 1e6; "+0" leaves the others numerically unchanged
        met_df=met_df.withColumn("Update",F.when(F.col("unit")=='nsTiming',F.col("Update")/1000000).otherwise(F.col("Update")+0))
        outpdf=met_df.groupBy("`Executor ID`","mname").sum("Update").orderBy("Executor ID").toPandas()

        met_time_cnt=df.where("Event='SparkListenerTaskEnd'")
        exectime=met_time_cnt.groupBy("Executor ID").agg((F.max("Finish Time")-F.min("Launch Time")).alias("totaltime"),F.sum(F.col("`Finish Time`")-F.col("`Launch Time`")).alias("tasktime"))

        # wall time of each query (first launch to last finish), summed over queries
        totaltime_query=met_time_cnt.groupBy("real_queryid").agg((F.max("Finish Time")-F.min("Launch Time")).alias("totaltime")).agg(F.sum("totaltime").alias("totaltime")).collect()
        totaltime_query=totaltime_query[0]["totaltime"]
        
        pdf=exectime.toPandas()
        exeids=set(outpdf['Executor ID'])
        outpdfs=[outpdf[outpdf["Executor ID"]==l] for l in exeids]
        tasktime=pdf.set_index("Executor ID").to_dict()['tasktime']

        def comb(l,r):
            # r: metric sums of one executor; l: frame merged so far (None on the first call)
            execid=list(r['Executor ID'])[0]
            valcol="val_"+execid
            lp=r[['mname','sum(Update)']].rename(columns={'sum(Update)':valcol})
            idle=totaltime_query*exec_cores-tasktime[execid]
            nocount=tasktime[execid]-sum(lp[valcol])
            if idle<0:
                idle=0
            if nocount<0:
                nocount=0
            # DataFrame.append no longer exists in pandas 2.x; concat adds both rows at once
            extra=pandas.DataFrame([{"mname":"idle",valcol:idle},
                                    {"mname":"not_counted",valcol:nocount}])
            lp=pandas.concat([lp,extra],ignore_index=True)
            if l is not None:
                return pandas.merge(lp, l,on=["mname"],how='outer')
            else:
                return lp

        rstpdf=None
        for l in outpdfs[0:]:
            rstpdf=comb(rstpdf,l)
            
        for l in [l for l in rstpdf.columns if l!="mname"]:
            rstpdf[l]=rstpdf[l]/1000/exec_cores
    
        rstpdf=rstpdf.sort_values(by="val_"+list(exeids)[0],axis=0,ascending=False)
        if showexecutor and plot:
            rstpdf.set_index("mname").T.plot.bar(stacked=True,figsize=(30,8))
        pdf_sum=pandas.DataFrame(rstpdf.set_index("mname").T.sum())
        totaltime=totaltime_query/1000
        pdf_sum[0]=pdf_sum[0]/(execs)
        # written through .loc so the value lands in pdf_sum itself, not in a temporary
        pdf_sum.loc["idle",0]=(totaltime_query-sum(tasktime.values())/execs/exec_cores)/1000
        pdf_sum=pdf_sum.sort_values(by=0,axis=0,ascending=False)
        pdf_sum=pdf_sum.T
        # prefix every metric name with its share of the query wall time
        pdf_sum.columns=["{:>2.0f}%_{:s}".format(pdf_sum[l][0]/totaltime*100,l) for l in pdf_sum.columns]
        matplotlib.rcParams['font.sans-serif'] = "monospace"
        matplotlib.rcParams['font.family'] = "monospace"
        import matplotlib.font_manager as font_manager
        if plot:
            ax=pdf_sum.plot.bar(stacked=True,figsize=(30,8))
            font = font_manager.FontProperties(family='monospace',
                                               style='normal', size=14)
            ax.legend(prop=font,loc=4)
            plt.title("{:s} q{:d} executors={:d} cores_per_executor={:d} parallelism={:d} sumtime={:.0f} runtime={:.0f}".format(self.file.split("/")[2],queryid,self.executor_instances,self.executor_cores,self.parallelism,totaltime,runtime),fontdict={'fontsize':24})
        return pdf_sum

    def show_critical_path_time_breakdown(self,**kwargs):
        """Run ``show_time_metric`` restricted to the tasks on the critical path.

        Keyword Args:
            **kwargs: forwarded to ``show_time_metric`` (e.g. ``plot``, ``queryid``,
                ``shownodes``). ``taskids`` is always replaced by the ids taken from
                ``self.criticaltasks``.

        Returns:
            Whatever ``show_time_metric`` returns.
        """
        if self.df is None:
            self.load_data()
        # the first field of each critical-task record is the task id; numpy scalars
        # are unwrapped to plain Python values, plain values pass through unchanged
        taskids=[l[0].item() if hasattr(l[0],"item") else l[0] for l in self.criticaltasks]
        return self.show_time_metric(**dict(kwargs,taskids=taskids))
    
    def get_spark_config(self):
        """Read the event log and return the application's Spark properties.

        The JSON file at ``self.file`` is read, ``self.appid`` is set from the first
        row that has an ``App ID``, and the ``Properties`` struct of the first row
        with a ``spark.app.id`` is returned transposed (one property per row).

        Side effects:
            Sets ``self.appid`` and lifts the pandas display limits for rows and
            columns, with the column width set to 100000, so the table shows in full.

        Returns:
            pandas.DataFrame: property names as index, a single value column.
        """
        eventlog=spark.read.json(self.file)
        rows_with_appid=eventlog.where("`App ID` is not null").collect()
        self.appid=rows_with_appid[0]["App ID"]

        display_limits=(
            ('display.max_rows', None),
            ('display.max_columns', None),
            ('display.max_colwidth', 100000),
        )
        for option,value in display_limits:
            pandas.set_option(option, value)

        properties=eventlog.select("Properties.*").where("`spark.app.id` is not null").limit(1)
        return properties.toPandas().T
    
    def get_app_name(self):
        """Display the application's ``spark.app.name`` as large red text."""
        from html import escape
        cfg=self.get_spark_config()
        # cfg has the property names as index and a single column labelled 0;
        # select by label, then take the first match by position
        app_name=cfg.loc[cfg.index=='spark.app.name',0].iloc[0]
        # escaped so characters such as '<' or '&' in the name show up literally
        display(HTML("<font size=5 color=red>" + escape(str(app_name))+"</font>"))
        
        
    def get_query_time(self,**kwargs):
        """Build a per-query summary table of run time, I/O volume and task-time totals.

        Keyword Args:
            queryid: an int or a list of ``real_queryid`` values to restrict the
                report to.  When falsy, every row with a non-null ``queryid`` is used.
            showtable: when False the pandas frame is returned without any
                styling or display (default True).
            plot: when True (default) the styled table is displayed.

        Returns:
            pandas.DataFrame indexed by ``real_queryid`` with the aggregated
            metrics plus the executor layout columns (``executors``,
            ``core/exec``, ``task.cpus``, ``parallelism``).
        """
        if self.df is None:
            self.load_data()
        wanted = kwargs.get("queryid", None)
        showtable = kwargs.get("showtable", True)
        plot = kwargs.get("plot", True)

        if wanted and type(wanted) == int:
            wanted = [wanted,]

        if wanted:
            events = self.df.where(F.col("real_queryid").isin(wanted))
        else:
            events = self.df.where("queryid is not NULL")

        def gib(column, alias):
            # Sum of a byte counter, expressed in GiB and rounded to 2 digits.
            return F.round(F.sum(column)/1024/1024/1024, 2).alias(alias)

        def seconds_per_slot(column, alias):
            # Sum of a millisecond counter, in seconds, divided by self.parallelism.
            return F.round(F.sum(column)/1000/self.parallelism, 2).alias(alias)

        # Stage ids belonging to every query.
        stage_lists = events.select("real_queryid", "Stage ID").distinct().orderBy("Stage ID")
        stage_lists = stage_lists.groupBy("real_queryid").agg(F.collect_list("Stage ID").alias("stages"))
        stage_lists = stage_lists.orderBy("real_queryid")

        # Accumulated task wall time, normalised by the number of task slots.
        task_span = F.sum(F.col("Finish Time")-F.col("Launch Time"))
        acc_time = events.where("Event='SparkListenerTaskEnd'").groupBy("real_queryid").agg(
            F.round(task_span/1000/self.executor_instances/self.executor_cores*self.taskcpus, 2).alias("acc_task_time"))

        # Input volume taken from the task accumulables.
        exploded_in = events.select("real_queryid", "Stage ID", "Executor ID", "Task ID", F.explode("Accumulables"))
        exploded_in = exploded_in.select("real_queryid", "Stage ID", "Executor ID", "Task ID", "col.*")
        input_read = exploded_in.where("Name='input size in bytes' or Name='size of files read'") \
                                .groupBy("real_queryid") \
                                .agg(gib("Update", "input read")) \
                                .orderBy("real_queryid")
        if self.dfacc is not None:
            # Fall back to the values in self.dfacc for queries with no task-level figure.
            input_read_v1 = self.dfacc.where("Name='size of files read'") \
                                      .groupBy("real_queryid") \
                                      .agg(gib("Update", "input read v1")) \
                                      .orderBy("real_queryid")
            input_read = input_read.join(input_read_v1, on="real_queryid", how="outer")
            input_read = input_read.withColumn(
                "input read", F.coalesce(F.col("input read"), F.col("input read v1"))).drop("input read v1")

        # Output row count (in units of 1e9 rows) from the accumulables.
        # NOTE(review): "Stage ID" is selected twice below; looks redundant - confirm before removing.
        exploded_out = events.select("real_queryid", "Stage ID", "Stage ID", F.explode("Accumulables"))
        exploded_out = exploded_out.select("real_queryid", "Stage ID", "Stage ID", "col.*")
        output_rows = exploded_out.where("Name='number of output rows'") \
                                  .groupBy("real_queryid") \
                                  .agg(F.round(F.sum("Update")/1000000000, 2).alias("output rows"))

        per_query = acc_time.join(stage_lists, on="real_queryid", how="left")
        per_query = input_read.join(per_query, on="real_queryid", how="left")
        per_query = per_query.join(output_rows, on='real_queryid', how="left")

        summary = events.groupBy("real_queryid").agg(
            F.round(F.max("query_endtime")/1000-F.min("query_starttime")/1000, 2).alias("runtime"),
            gib("Disk Bytes Spilled", "disk spilled"),
            gib("Memory Bytes Spilled", "memspilled"),
            gib("Local Bytes Read", "local_read"),
            gib("Remote Bytes Read", "remote_read"),
            gib("Shuffle Bytes Written", "shuffle_write"),
            seconds_per_slot("Executor Deserialize Time", "deser_time"),
            seconds_per_slot("Executor Run Time", "run_time"),
            seconds_per_slot("Result Serialization Time", "ser_time"),
            seconds_per_slot("Fetch Wait Time", "f_wait_time"),
            seconds_per_slot("JVM GC Time", "gc_time"),
            F.round(F.max("Peak Execution Memory")/1000000000*self.executor_instances*self.executor_cores, 2).alias("peak_mem"),
            F.max("queryid").alias("queryid")
            )
        summary = summary.join(per_query, "real_queryid", how="left").orderBy("real_queryid")
        out = summary.toPandas().set_index("real_queryid")

        out["executors"] = self.executor_instances
        out["core/exec"] = self.executor_cores
        out["task.cpus"] = self.taskcpus
        out['parallelism'] = self.parallelism

        if not showtable:
            return out

        # One CSS template per highlighted column; the bar length is the column's
        # share of the query run time in percent.
        gradients = {
            'acc_task_time': 'background-image: linear-gradient(to right,#5fba7d {:f}%,white {:f}%)',
            'run_time': 'background-image: linear-gradient(to right,#5fba7d {:f}%,white {:f}%)',
            'f_wait_time': 'background-image: linear-gradient(to right,#d65f5f {:f}%,white {:f}%)',
        }

        def highlight_greater(frame):
            shares = {column: frame[column] / frame['runtime'] * 100 for column in gradients}
            styles = pandas.DataFrame('', index=frame.index, columns=frame.columns)
            for column, template in gradients.items():
                styles[column] = shares[column].apply(lambda pct, template=template: template.format(pct, pct))
            return styles

        cm = sns.light_palette("green", as_cmap=True)
        if plot:
            styled = out.style.apply(highlight_greater, axis=None)
            display(styled.background_gradient(cmap=cm, subset=['input read', 'shuffle_write']))

        return out
    
    def get_query_time_metric(self):
        """Call ``show_time_metric`` once for every distinct ``queryid`` in the task data.

        The event log is loaded on first use.  Nothing is returned.
        """
        if self.df is None:
            self.load_data()
        distinct_rows = self.df.select("queryid").distinct().collect()
        # NOTE(review): the keyword here is ``query`` while get_query_info passes
        # ``queryid`` to show_time_metric - confirm which one show_time_metric reads.
        for row in distinct_rows:
            self.show_time_metric(query=[row["queryid"],],showexecutor=False)
            
    def getOperatorCount(self):
        """Count how often each plan operator occurs in every query.

        Walks each collected query plan tree (``self.queryplans``) and tallies
        the ``nodeName`` of every node.  Nodes whose name starts with
        ``WholeStageCodegenTransformer`` and nodes without a name are not
        counted, but their children still are.

        Returns:
            pandas.DataFrame: one row per operator name (sorted), one column
            per ``real_queryid``; cells hold the occurrence count (0 when the
            operator does not appear in that query).
        """
        if self.df is None:
            self.load_data()
        # Only the already-loaded frames are needed here; re-reading the event log
        # would trigger another full scan for nothing.
        id_rows=self.df.select("real_queryid").distinct().orderBy("real_queryid").collect()
        queryplans=self.queryplans.collect()

        list_queryid=[]
        for row in id_rows:
            if row["real_queryid"] not in list_queryid:
                list_queryid.append(row["real_queryid"])
        # A plan may exist for a query that has no task rows; give it a column as well
        # so the tally below cannot hit a missing key.
        for plan in queryplans:
            if plan['real_queryid'] not in list_queryid:
                list_queryid.append(plan['real_queryid'])

        qps={}

        def get_child(execid,node):
            name=node['nodeName']
            # wholestagetransformer not counted
            if name is not None and not name.startswith("WholeStageCodegenTransformer"):
                per_query=qps.setdefault(name,{l:0 for l in list_queryid})
                per_query[execid]=per_query[execid]+1
            if node["children"] is not None:
                for c in node["children"]:
                    get_child(execid,c)

        for c in queryplans:
            get_child(c['real_queryid'],c)

        return pandas.DataFrame(qps).T.sort_index(axis=0)
    
    def get_query_plan(self,**kwargs):
        """Render query plans as an HTML table annotated with stage timing and SQL metrics.

        Every plan node is attributed to a stage through the accumulator id of
        its first metric (falling back to the parent's stage).  Stages that
        together account for more than 90% of the accumulated task time are
        highlighted with a bold, colour-coded font.

        Keyword Args:
            queryid: int, str or list of ``real_queryid`` values.  When neither
                ``queryid`` nor ``stageid`` is given, every plan is rendered.
            stageid: int or list of stage ids.  Only operators attributed to
                these stages are listed and the plan is taken from one query
                owning them (this overrides ``queryid``).
            outputstage: optional list; a dict with the keys ``queryid``,
                ``stageid``, ``stagetime``, ``stageParts``, ``nodename``,
                ``output_rowcnt`` and ``nodename_level`` is appended to it for
                every listed operator.
            show_plan_only: when True only stage id, time, partitions,
                operator and output rows are emitted (default False).
            show_simple_string: when True the node's ``simpleString`` is added
                below the operator name (default False).
            plot: when True (default) the HTML is displayed.

        Returns:
            str: the generated HTML, or ``""`` when the selected tasks add up
            to zero time (an error message is displayed in that case).
        """
        if self.df is None:
            self.load_data()

        queryid=kwargs.get("queryid",None)
        stageid=kwargs.get("stageid",None)

        outputstage=kwargs.get("outputstage",None)

        show_plan_only=kwargs.get("show_plan_only",False)
        show_simple_string=kwargs.get("show_simple_string",False)

        plot=kwargs.get("plot",True)

        # pyplot.get_cmap is used because matplotlib.cm.get_cmap no longer exists in
        # recent Matplotlib releases.
        colors=["#{:02x}{:02x}{:02x}".format(int(l[0]*255),int(l[1]*255),int(l[2]*255)) for l in plt.get_cmap('tab20').colors]

        # None means "no stage restriction": every operator of the selected plans is listed.
        shown_stageid=None
        if queryid is not None:
            if type(queryid)==int or type(queryid)==str:
                queryid = [queryid,]
            shown_stageid = [l["Stage ID"] for l in self.df.where(F.col("real_queryid").isin(queryid)).select("Stage ID").distinct().collect()]
        if stageid is not None:
            shown_stageid = list(stageid) if isinstance(stageid,(list,tuple,set)) else [stageid,]
            queryid = [l["real_queryid"] for l in self.df.where(F.col("`Stage ID`").isin(shown_stageid)).select("real_queryid").limit(1).collect()]

        queryplans = self.queryplans.where(F.col("real_queryid").isin(queryid)).orderBy("real_queryid").collect() if queryid else self.queryplans.orderBy("real_queryid").collect()

        # accumulator id -> (stage id, summed update, stddev of the updates)
        dfmetric=self.df.where("Event='SparkListenerTaskEnd'").select("queryid","real_queryid","Stage ID","Job ID",F.explode("Accumulables").alias("metric")).select("*","metric.*").select("Stage ID","ID","Update").groupBy("ID","Stage ID").agg(F.round(F.sum("Update"),1).alias("value"),F.round(F.stddev("Update"),1).alias("stdev")).collect()
        accid2stageid={l.ID:(l["Stage ID"],l["value"],l["stdev"]) for l in dfmetric}

        # Restrict the stage timing to the selected queries only when a selection exists;
        # filtering with isin(None) would silently drop every row.
        taskend=self.df
        if queryid is not None:
            taskend=taskend.where(F.col("real_queryid").isin(queryid))
        stagetime=taskend.where(F.col("Event")=='SparkListenerTaskEnd').groupBy("Stage ID").agg(
            F.round(F.sum(F.col("Finish Time")-F.col("Launch Time"))/1000/self.executor_instances/self.executor_cores*self.taskcpus,1).alias("elapsed time"),
            F.round(F.stddev(F.col("Finish Time")-F.col("Launch Time"))/1000,1).alias("time stdev"),
            F.count(F.col("Task ID")).alias("partitions")
            ).orderBy(F.desc("elapsed time")).collect()

        apptotaltime=reduce(lambda x,y: x+y['elapsed time'], stagetime,0)
        if apptotaltime==0:
            display(HTML("<font size=4 color=red>Error, totaltime is 0 </font>"))
            return ""

        stagemap={l["Stage ID"]:l["elapsed time"] for l in stagetime}
        stage_time_stdev_map={l["Stage ID"]:l["time stdev"] for l in stagetime}
        stagepartmap={l["Stage ID"]:l["partitions"] for l in stagetime}

        # Key stages: the longest stages (stagetime is sorted descending) that together
        # exceed 90% of the total; each gets a red/green mix scaled to the longest one.
        keystage=[]
        keystagetime=[]
        subtotal=0
        for s in stagetime:
            subtotal=subtotal+s['elapsed time']
            keystage.append(s['Stage ID'])
            keystagetime.append(s['elapsed time'])
            if subtotal/apptotaltime>0.9:
                break
        keystagetime=["{:02x}{:02x}".format(int(255*l/keystagetime[0]),255-int(255*l/keystagetime[0])) for l in keystagetime if keystagetime[0]>0]
        keystagemap=dict(zip(keystage,keystagetime))

        outstr=[]

        def print_plan(real_queryid,level,node,parent_stageid):
            # Attribute the node to the stage of its first metric's accumulator, if known.
            node_metrics=node["metrics"]
            if node_metrics is not None and len(node_metrics)>0 and node_metrics[0]["accumulatorId"] in accid2stageid:
                stageid=accid2stageid[node_metrics[0]["accumulatorId"]][0]
            else:
                stageid=parent_stageid

            if shown_stageid is None or stageid in shown_stageid:
                fontcolor=f"color:#{keystagemap[stageid]}00;font-weight:bold" if stageid in keystagemap else "color:#000000"
                stage_elapsed=0 if stageid not in stagemap else stagemap[stageid]
                stage_time_stdev=0 if stageid not in stage_time_stdev_map else stage_time_stdev_map[stageid]
                stageParts=0 if stageid not in stagepartmap else stagepartmap[stageid]

                input_rowcntstr=""
                output_rowcntstr=""
                timename={}
                input_columnarbatch=""
                output_columnarbatch=""
                output_row_batch=""
                other_metric_name={}

                outputrows=0
                outputbatches=0
                if node_metrics is not None:
                    for m in node_metrics:

                        if m["accumulatorId"] not in accid2stageid:
                            continue

                        if m["name"].endswith("block wall nanos") or m['name'].endswith("cpu nanos"):
                            continue

                        value=accid2stageid[m["accumulatorId"]][1]
                        stdev_value=accid2stageid[m["accumulatorId"]][2]
                        stdev_value=0 if stdev_value is None else stdev_value
                        if m["metricType"] in ['nsTiming','timing']:
                            # 'timing' values are milliseconds, 'nsTiming' nanoseconds; both become seconds.
                            totaltime=value/1000 if  m["metricType"] == 'timing' else value/1000000000
                            stdev_value=stdev_value/1000 if  m["metricType"] == 'timing' else stdev_value/1000000000

                            per_slot=totaltime/self.executor_instances/self.executor_cores*self.taskcpus
                            timeratio= 0  if stage_elapsed==0 else per_slot/stage_elapsed*100
                            timeratio_query = per_slot/apptotaltime*100
                            timetext="{:.2f}s ({:.1f}%, {:.1f}%, {:.2f})".format(totaltime,timeratio,timeratio_query,stdev_value)
                            if timeratio > 10 or timeratio_query>10:
                                # Highlight timers above 10% of the stage or of the whole selection.
                                timename[m["name"]]="<font style='background-color:#ffff42'>"+timetext+"</font>"
                            else:
                                timename[m["name"]]=timetext
                        elif m["name"] in ["number of output rows","number of final output rows"]:
                            output_rowcntstr="{:,.1f}".format(value/1000/1000)+" M"
                            outputrows=value
                        elif m["name"] in ["number of output columnar batches","number of output batches","output_batches", "number of output vectors","number of final output vectors", "records read"]:
                            # records reads is the output of shuffle
                            output_columnarbatch="{:,d}".format(int(value))
                            outputbatches=value
                        elif m["name"]=="number of input rows":
                            input_rowcntstr="{:,.1f}".format(value/1000/1000)+" M"
                        elif m["name"] in ["number of input batches","input_batches","number of input vectors"]:
                            input_columnarbatch="{:,d}".format(int(value))
                        else:
                            if value>1000000000:
                                other_metric_name[m["name"]]="{:,.1f} G ({:,.1f})".format(value/1000000000,stdev_value/1000000000)
                            elif value>1000000:
                                other_metric_name[m["name"]]="{:,.1f} M ({:,.1f})".format(value/1000000,stdev_value/1000000)
                            elif value>1000:
                                other_metric_name[m["name"]]="{:,.1f} K ({:,.1f})".format(value/1000,stdev_value/1000)
                            else:
                                other_metric_name[m["name"]]="{:,d} ({:,.1f})".format(int(value),stdev_value)

                if outputrows>0 and outputbatches>0:
                    output_row_batch="{:,d}".format(int(outputrows/outputbatches))

                nodenamestr=node["nodeName"]
                if nodenamestr is None:
                    nodenamestr=""
                # Row/columnar conversion operators are highlighted.
                if nodenamestr in ['ColumnarToRow','RowToArrowColumnar','ArrowColumnarToRow','ArrowRowToColumnarExec','GlutenColumnarToRowExec','GlutenRowToArrowColumnar']:
                    nodename='<span style="color: green; background-color: #ffff42">'+nodenamestr+'</span>'
                else:
                    nodename=nodenamestr
                indent=" ".join(["|_" for l in range(0,level)])
                if outputstage is not None:
                    outputstage.append({"queryid":real_queryid,"stageid":stageid,"stagetime":stage_elapsed,"stageParts":stageParts,"nodename":nodenamestr,"output_rowcnt":outputrows,"nodename_level":indent + " " + nodenamestr})
                if not show_plan_only:
                    nodestr= indent + " " + nodename
                    if show_simple_string :
                        simstr=node['simpleString']
                        nodestr = nodestr + "<br>\n" +  simstr

                    timenametable='<table  style="width:100%">\n'
                    for nameidx in sorted(timename):
                        timenametable+=f"<tr><td>{nameidx}</td><td>{timename[nameidx]}</td></tr>"
                    timenametable+="</table>\n"

                    othertable='<table style="width:100%">\n'
                    for nameidx in sorted(other_metric_name):
                        othertable+=f"<tr><td>{nameidx}</td><td>{other_metric_name[nameidx]}</td></tr>"
                    othertable+="</table>\n"

                    outstr.append(f"<tr><td style='{fontcolor}'>{stageid}</td>"+
                                  f"<td style='{fontcolor}'> {stage_elapsed}({stage_time_stdev}) </td>"+
                                  f"<td style='{fontcolor}'> {stageParts} </td>"+
                                  f"<td style='text-align:left; background-color:{colors[stageid % 20]}'>" + nodestr + f"</td>"+
                                  f"<td style='{fontcolor}'> {input_rowcntstr} </td>"+
                                  f"<td style='{fontcolor}'> {input_columnarbatch} </td>"+
                                  f"<td style='{fontcolor}'> {output_rowcntstr} </td>"+
                                  f"<td style='{fontcolor}'> {output_columnarbatch} </td>"+
                                  f"<td style='{fontcolor}'> {output_row_batch} </td>"+
                                  f"<td style='{fontcolor}' colspan=2> {timenametable} </td>"+
                                  f"<td style='{fontcolor}' colspan=2> {othertable} </td>"+
                                  "</tr>")
                else:
                    outstr.append(f"<tr><td style='{fontcolor}'>{stageid}</td>"+
                                  f"<td style='{fontcolor}'> {stage_elapsed} </td>"+
                                  f"<td style='{fontcolor}'> {stageParts} </td>"+
                                  f"<td style='text-align:left; background-color:{colors[stageid % 20]}'>" + indent + " " + nodename + f"</td>"+
                                  f"<td style='{fontcolor}'> {output_rowcntstr} </td></tr>")

            if node["children"] is not None:
                for c in node["children"]:
                    print_plan(real_queryid, level+1,c,stageid)

        for c in queryplans:
            outstr.append("<font color=red size=4>"+str(c['real_queryid'])+"</font><table>")
            if not show_plan_only:
                outstr.append('''<tr>
                                    <td>stage id</td>
                                    <td>stage time</td>
                                    <td>partions</td>
                                    <td>operator</td>
                                    <td>input rows</td>
                                    <td>input batches</td>
                                    <td>output rows</td>
                                    <td>output batches</td>
                                    <td>output rows/batch</td>
                                    <td width=150>time metric name</td>
                                    <td width=200>time(%stage,%total,stdev)</td>
                                    <td width=150>other metric name</td>
                                    <td width=130>value(stdev)</td>
                                </tr>''')
            else:
                outstr.append('''<tr>
                                    <td>stage id</td>
                                    <td>stage time</td>
                                    <td>partions</td>
                                    <td>operator</td>
                                    <td>output rows</td>
                                </tr>''')

            print_plan(c['real_queryid'],0,c,0)
            outstr.append("</table>")
        if plot:
            display(HTML(" ".join(outstr)))
        return " ".join(outstr)
    
    def get_metric_output_rowcnt(self, **kwargs):
        """Row-count report for the ``number of output rows`` metric.

        All keyword arguments are forwarded unchanged to ``get_metric_rowcnt``,
        whose result is returned.
        """
        metric_name = "number of output rows"
        return self.get_metric_rowcnt(metric_name, **kwargs)
        
    def get_metric_input_rowcnt(self, **kwargs):
        """Row-count report for the ``number of input rows`` metric.

        All keyword arguments are forwarded unchanged to ``get_metric_rowcnt``,
        whose result is returned.
        """
        metric_name = "number of input rows"
        return self.get_metric_rowcnt(metric_name, **kwargs)
        
    def get_metric_rowcnt(self,rowname, **kwargs):
        """Report the summed value of a plan metric per operator and stage.

        The plan trees are searched for nodes that expose one of the requested
        metric names; the accumulator ids found are then joined against the
        per-task accumulables.

        Args:
            rowname: a metric name or a list of metric names to look for.

        Keyword Args:
            queryid: int or list of ``real_queryid`` values to restrict to.
            stageid: int or list of stage ids; only applied together with ``queryid``.
            show_task: when True the joined rows are kept unaggregated
                (default False aggregates per stage and metric id).

        Returns:
            * ``None`` (after printing a message) when no plan node has the metric;
            * a Spark DataFrame ordered by ``Stage ID`` when ``queryid`` is given;
            * otherwise a pandas DataFrame with one row per operator name and
              one column per ``real_queryid`` holding ``total_row``
              (the summed updates divided by 1,000,000).
        """
        if self.df is None:
            self.load_data()

        queryid=kwargs.get("queryid",None)
        stageid=kwargs.get("stageid",None)
        show_task=kwargs.get("show_task",False)

        if queryid and type(queryid)==int:
            queryid = [queryid,]

        if stageid and type(stageid)==int:
            stageid = [stageid,]

        plans = self.queryplans
        if queryid:
            plans = plans.where(F.col("real_queryid").isin(queryid))
        plan_rows = plans.orderBy("real_queryid").collect()

        rownames = rowname if type(rowname)==list else [rowname,]
        found = []

        def collect_metric_ids(query, node):
            # Record [operator name, query, accumulator id] for the first matching metric of the node.
            if node['metrics'] is not None:
                for metric in node["metrics"]:
                    if "name" in metric and metric["name"] in rownames:
                        found.append([node["nodeName"], query, metric['accumulatorId']])
                        break
            if node["children"] is not None:
                for child in node["children"]:
                    collect_metric_ids(query, child)

        for plan in plan_rows:
            collect_metric_ids(plan['real_queryid'], plan)

        if not found:
            print("Metric ",rowname," is not found. ")
            return None

        taskend = self.df.where("Event='SparkListenerTaskEnd'")
        task_span = F.sum(F.col("Finish Time")-F.col("Launch Time"))
        stagetime = taskend.groupBy("Stage ID").agg(
            F.round(task_span/1000/self.executor_instances/self.executor_cores*self.taskcpus,2).alias("stage time"))

        task_metrics = taskend.select("queryid","real_queryid","Stage ID","Job ID",F.explode("Accumulables").alias("metric"))
        task_metrics = task_metrics.select("*","metric.*").drop("metric")

        wanted_ids = spark.createDataFrame(found)
        for old_name, new_name in (("_1","metric"),("_2","real_queryid"),("_3","metricid")):
            wanted_ids = wanted_ids.withColumnRenamed(old_name, new_name)

        matched = task_metrics.join(wanted_ids.drop("real_queryid"),on=[F.col("metricid")==F.col("ID")],how="right")
        if show_task:
            stagemetric = matched.join(stagetime,"Stage ID")
        else:
            stagemetric = matched.groupBy("queryid","real_queryid","Job ID","Stage ID","metricid").agg(
                F.round(F.sum("Update")/1000000,2).alias("total_row"),
                F.max("metric").alias("nodename"))
            stagemetric = stagemetric.join(stagetime,"Stage ID")

        if not queryid:
            # NOTE(review): with show_task=True this branch groups by "nodename"/"total_row",
            # columns that only the aggregated frame defines - confirm the intended behaviour.
            noderow = stagemetric.groupBy("real_queryid","nodename").agg(F.round(F.sum("total_row"),2).alias("total_row")).orderBy("nodename").collect()
            qids = set([r.real_queryid for r in noderow])
            out = {}
            for r in noderow:
                if r.nodename not in out:
                    out[r.nodename] = {c:0 for c in qids}
                out[r.nodename][r.real_queryid] = r.total_row
            return pandas.DataFrame(out).T.sort_index(axis=0)

        selection = F.col("real_queryid").isin(queryid)
        if stageid:
            selection = selection & F.col("Stage ID").isin(stageid)
        return stagemetric.where(selection).orderBy("Stage ID")
    
    def get_query_info(self,queryid):
        """Display the full set of reports for one query selection.

        In order: query time table, stage statistics, query plan, stage
        histogram, time-metric breakdown, and the input/output row counts per
        operator.  Each section is preceded by a large red heading.

        Args:
            queryid: passed as ``queryid=`` to every report method.
        """
        def heading(title):
            display(HTML("<font color=red size=7 face='Courier New'><b> {} </b></font>".format(title),))

        heading("time stat info")
        self.get_query_time(queryid=queryid)

        heading("stage stat info")
        display(self.get_stage_stat(queryid=queryid))

        heading("query plan")
        self.get_query_plan(queryid=queryid)

        heading("stage hist info")
        self.show_Stages_hist(queryid=queryid)

        heading("time info")
        display(self.show_time_metric(queryid=queryid))

        heading("operator and rowcount")
        display(self.get_metric_input_rowcnt(queryid=queryid))
        display(self.get_metric_output_rowcnt(queryid=queryid))
        
    def get_app_info(self,**kwargs):
        """Display the application-level overview.

        Shows the application id with a link to the history server on port
        18080 of ``localhost`` (the module-level host name), the query time
        table, the operator count table and the operator input/output row
        counts (each heat-mapped with ``background_gradient``), and finally the
        time-metric breakdown.

        ``kwargs`` is forwarded to ``get_query_time``,
        ``get_metric_input_rowcnt``, ``get_metric_output_rowcnt`` and
        ``show_time_metric``.
        """
        if self.df is None:
            self.load_data()

        def heading(title):
            display(HTML("<font color=red size=7 face='Courier New'><b> {} </b></font>".format(title),))

        def show_heatmap(frame):
            # Colour scale spans the smallest to the largest cell of the whole table.
            display(frame.style.apply(background_gradient,
                   cmap='OrRd',
                   m=frame.min().min(),
                   M=frame.max().max(),
                   low=0,
                   high=1))

        heading(self.appid)
        display(HTML(f"<a href=http://{localhost}:18080/history/{self.appid}>http://{localhost}:18080/history/{self.appid}</a>"))

        heading("query time")
        self.get_query_time(**kwargs)

        heading("operator count")
        show_heatmap(self.getOperatorCount())

        heading("operator input row count")
        input_rows = self.get_metric_input_rowcnt(**kwargs)
        if input_rows is not None:
            show_heatmap(input_rows)

        heading("operator output row count")
        output_rows = self.get_metric_output_rowcnt(**kwargs)
        if output_rows is not None:
            show_heatmap(output_rows)

        self.show_time_metric(**kwargs)
        
    def get_stage_stat(self,**kwargs):
        """Aggregate task-end metrics per job and stage.

        Keyword Args:
            queryid: a single ``real_queryid`` or a list of them.  When omitted,
                the stages of every query are reported (matching how
                ``get_query_time`` treats a missing id).

        Returns:
            pandas.DataFrame ordered by ``Stage ID`` with, per (``Job ID``,
            ``Stage ID``): slot-normalised elapsed time, spill / shuffle / read
            volumes in GiB, the summed timing counters in seconds and the
            ``input read`` volume in GiB taken from the accumulables.
        """
        if self.df is None:
            self.load_data()

        queryid=kwargs.get("queryid",None)

        if queryid is not None and not isinstance(queryid,(list,tuple,set)):
            queryid = [queryid,]

        df=self.df.where(F.col("Event")=='SparkListenerTaskEnd')
        # Only filter when a selection was given: isin(None) compares against NULL and
        # would drop every row.
        if queryid is not None:
            df=df.where(F.col("real_queryid").isin(list(queryid)))

        inputsize = df.select("real_queryid","Stage ID","Executor ID", "Task ID", F.explode("Accumulables")) \
                      .select("real_queryid","Stage ID","Executor ID", "Task ID","col.*") \
                      .where("Name='input size in bytes' or Name='size of files read'") \
                      .groupBy("Stage ID") \
                      .agg(F.round(F.sum("Update")/1024/1024/1024,2).alias("input read"))

        return df.groupBy("Job ID","Stage ID").agg(
            # Total task wall time divided by the number of task slots.
            F.round(F.sum(F.col("Finish Time")-F.col("Launch Time"))/1000/self.executor_instances/self.executor_cores*self.taskcpus,1).alias("elapsed time"),
            F.round(F.sum(F.col("Disk Bytes Spilled"))/1024/1024/1024,1).alias("disk spilled"),
            F.round(F.sum(F.col("Memory Bytes Spilled"))/1024/1024/1024,1).alias("mem spilled"),
            F.round(F.sum(F.col("Local Bytes Read"))/1024/1024/1024,1).alias("local read"),
            F.round(F.sum(F.col("Remote Bytes Read"))/1024/1024/1024,1).alias("remote read"),
            F.round(F.sum(F.col("Shuffle Bytes Written"))/1024/1024/1024,1).alias("shuffle write"),
            F.round(F.sum(F.col("Executor Deserialize Time"))/1000,1).alias("deseri time"),
            F.round(F.sum(F.col("Fetch Wait Time"))/1000,1).alias("fetch wait time"),
            F.round(F.sum(F.col("Shuffle Write Time"))/1000000000,1).alias("shuffle write time"),
            F.round(F.sum(F.col("Result Serialization Time"))/1000,1).alias("seri time"),
            F.round(F.sum(F.col("Getting Result Time"))/1000,1).alias("get result time"),
            F.round(F.sum(F.col("JVM GC Time"))/1000,1).alias("gc time"),
            F.round(F.sum(F.col("Executor CPU Time"))/1000000000,1).alias("exe cpu time")
            ).join(inputsize,on=["Stage ID"],how="left").orderBy("Stage ID").toPandas()
    
    def get_metrics_by_node(self,node_name):
        """Collect the per-stage metrics of every plan node with a given operator name.

        Each matching plan node gets a running ``nodeID``.  For every metric
        name exposed by those nodes the task-level updates are aggregated per
        (``real_queryid``, ``nodeID``, ``Stage ID``) into three columns:
        ``<name>_stddev``, ``<name>_mean`` and ``<name>`` (the mean for metrics
        whose name starts with ``avg``, the sum otherwise).

        Args:
            node_name: an operator name or a list of operator names.

        Returns:
            Spark DataFrame with the metric columns joined to ``partnum`` (task
            count) and ``ElapsedTime`` (summed task wall time in seconds) of
            the stage.  When no plan node matches, an empty DataFrame carrying
            only the key and stage columns is returned.
        """
        if self.df is None:
            self.load_data()

        if type(node_name)==str:
            node_name=[node_name]
        plans=self.queryplans.collect()
        metric_rows=[]
        node_seq=0

        def get_metric(root):
            nonlocal node_seq
            if root['nodeName'] in node_name:
                node_seq=node_seq+1
                # A node may carry no metrics at all; treat that as an empty list.
                for l in (root["metrics"] or []):
                    metric_rows.append([l['accumulatorId'],l["metricType"],l['name'],root["nodeName"],node_seq])
            if root["children"] is not None:
                for c in root["children"]:
                    get_metric(c)

        for c in plans:
            get_metric(c)

        stagetimedf=self.df.where("Event='SparkListenerTaskEnd'").groupBy("Stage ID").agg(F.count("*").alias("partnum"),F.round(F.sum(F.col("Finish Time")-F.col("Launch Time"))/1000,2).alias("ElapsedTime"))

        if not metric_rows:
            # Nothing matched: a schema cannot be inferred from an empty list and there is
            # nothing to join, so hand back an empty frame that callers can test with count().
            return self.df.select("Stage ID","real_queryid") \
                          .withColumn("nodeID",F.lit(None).cast("long")) \
                          .join(stagetimedf,on="Stage ID") \
                          .limit(0)

        df=self.df.select("queryid","real_queryid",'Stage ID','Task ID','Job ID',F.explode("Accumulables"))
        df=df.select("*","col.*")
        metricdf=spark.createDataFrame(metric_rows,["ID","Unit","metricName","nodeName","nodeID"])
        df=df.join(metricdf,on=["ID"],how="right")

        # Sorted so the resulting column order is the same on every run.
        metric_names=sorted(set(l[2] for l in metric_rows))
        metricdfs=[df.where(F.col("Name")==l).groupBy("real_queryid","nodeID","Stage ID").agg(F.stddev("Update").alias(l+"_stddev"),F.mean("Update").alias(l+"_mean"),F.mean("Update").alias(l) if l.startswith("avg") else F.sum("Update").alias(l)) for l in metric_names]

        nodemetric=reduce(lambda x,y: x.join(y, on=['nodeID',"Stage ID","real_queryid"],how="full"),metricdfs)
        return nodemetric.join(stagetimedf,on="Stage ID")
    
    
    def get_coalesce_batch_row_cnt(self,**kwargs):
        """Plot rows per input/output batch for the ``CoalesceBatches`` operators.

        Only node/stage combinations with more than 4000 output rows are kept.
        The rows-per-batch ratios are drawn as lines, the row count on a
        secondary axis, and query boundaries are marked via
        ``print_real_queryid``.  ``kwargs`` is accepted but not consulted.

        Side effect: sets the pandas float display format to ``'{:,}'``.

        Returns:
            pandas.DataFrame with the plotted data, including the derived
            ``row/input_batch``, ``row/out_batch`` and ``stage`` columns.
        """
        node_metrics = self.get_metrics_by_node("CoalesceBatches")

        pandas.options.display.float_format = '{:,}'.format

        renamed = node_metrics.withColumnRenamed("number of output rows","rows")
        coalescedf = renamed.orderBy("real_queryid",'Stage ID').where("rows>4000").toPandas()

        for ratio_column, batch_column in (("row/input_batch","input_batches"),
                                           ("row/out_batch","output_batches")):
            coalescedf[ratio_column] = coalescedf["rows"]/coalescedf[batch_column]
        # "<query>_<stage>" label for every row.
        coalescedf['stage'] = coalescedf["real_queryid"].astype(str)+"_"+coalescedf['Stage ID'].astype(str)

        ax = coalescedf.plot(y=["row/input_batch","row/out_batch"],figsize=(30,8),style="-*")
        coalescedf.plot(ax=ax,y=['rows'],secondary_y=['rows'],style="k_")
        self.print_real_queryid(ax,coalescedf)

        return coalescedf
    
    def print_real_queryid(self,ax,dataset):
        """Mark query boundaries on a plot whose x positions are the row positions of ``dataset``.

        The x tick marks are removed.  Wherever ``real_queryid`` changes from
        one row to the next a green vertical line is drawn between the two
        rows, and every run of equal ids, including the last one, is labelled
        with its id just below the lower y bound.

        Args:
            ax: the axes to annotate.
            dataset: mapping/frame whose ``'real_queryid'`` entry lists the
                query id of every plotted row, in plot order.  An empty list
                only clears the ticks.
        """
        ax.axes.get_xaxis().set_ticks([])

        real_queryid=list(dataset['real_queryid'])
        if not real_queryid:
            # Nothing plotted, so there is nothing to label.
            return

        ymin, ymax = ax.get_ybound()
        label_y=ymin-(ymax-ymin)/20

        s=real_queryid[0]
        lastx=0
        for idx,v in enumerate(real_queryid):
            if v!=s:
                # Boundary halfway between the last row of the previous query and this row.
                xmin = xmax = idx-1+0.5
                l = mlines.Line2D([xmin,xmax], [ymin,ymax],color="green")
                ax.add_line(l)
                ax.text(lastx+(xmin-lastx)/2-0.25,label_y,f"{s}",size=20)
                s=v
                lastx=xmin

        # The loop only labels a group when the next one starts; label the final group here.
        xend=len(real_queryid)-1+0.5
        ax.text(lastx+(xend-lastx)/2-0.25,label_y,f"{s}",size=20)

    def get_shuffle_stat(self,**kwargs):
        """Plot columnar-shuffle statistics and the per-stage input/elapsed-time spread.

        Keyword Args:
            shuffle_size: minimum ``shuffle bytes written`` (in bytes) an exchange
                node must exceed to be included in the plots. Defaults to 1000000.
            queryid: one ``real_queryid`` (str or int) or a list of them used to
                restrict the exchange rows; ``None`` keeps every query.

        Returns:
            * ``(None, None)`` when no ColumnarExchange metrics exist;
            * ``(shuffledf, None)`` (Spark DataFrame) when no exchange exceeds
              ``shuffle_size``;
            * ``(shuffle_pdf, dfx)`` otherwise: the pandas frame behind the
              shuffle plots and the pandas frame of per-stage mean/stddev/min/max
              of task elapsed time and input size.
        """
        if self.df is None:
            self.load_data()
            
        shufflesize=kwargs.get("shuffle_size",1000000)
        queryid=kwargs.get("queryid",None)
        if queryid is not None:
            if type(queryid) is str or type(queryid) is int:
                queryid=[queryid,]

        exchangedf=self.get_metrics_by_node(["ColumnarExchange","ColumnarExchangeAdaptor"])
        exchangedf.cache()
        if exchangedf.count() == 0:
            return (None, None)

        # Rows carrying "time to split" describe the map (write) side of an
        # exchange; rows without it describe the reducer (read) side.
        mapdf=exchangedf.where("`time to split` is not null").select("nodeID",F.col("Stage ID").alias("map_stageid"),"real_queryid",F.floor(F.col("time to split")/F.col("time to split_mean")).alias("map_partnum"),"time to compress","time to split","shuffle write time","time to spill",'shuffle records written','data size','shuffle bytes written','shuffle bytes written_mean','shuffle bytes written_stddev','shuffle bytes spilled','number of input rows','number of input batches')
        reducerdf=exchangedf.where("`time to split` is null").select("nodeID",F.col("Stage ID").alias("reducer_stageid"),"real_queryid",'local blocks read','local bytes read',F.floor(F.col("records read")/F.col("records read_mean")).alias("reducer_partnum"),(F.col('avg read batch num rows')/10).alias("avg read batch num rows"),'remote bytes read','records read','remote blocks read',(F.col("number of output rows")/F.col("records read")).alias("avg rows per split recordbatch"))
        shuffledf=mapdf.join(reducerdf,on=["nodeID","real_queryid"],how="full")
        if queryid is not None:
            shuffledf=shuffledf.where(F.col("real_queryid").isin(queryid))
        # Honour the caller supplied threshold (previously hard-coded to 1000000).
        shuffle_pdf=shuffledf.where(F.col("shuffle bytes written")>shufflesize).orderBy("real_queryid","map_stageid","nodeID").toPandas()
        if shuffle_pdf.shape[0] == 0:
            return (shuffledf, None)

        # Rescale byte counters for plotting (1e9 for totals, 1e6 for mean/stddev).
        shuffle_pdf["shuffle bytes written"]=shuffle_pdf["shuffle bytes written"]/1000000000
        shuffle_pdf["data size"]=shuffle_pdf["data size"]/1000000000
        shuffle_pdf["shuffle bytes written_mean"]=shuffle_pdf["shuffle bytes written_mean"]/1000000
        shuffle_pdf["shuffle bytes written_stddev"]=shuffle_pdf["shuffle bytes written_stddev"]/1000000
        ax=shuffle_pdf.plot(y=["avg read batch num rows",'avg rows per split recordbatch'],figsize=(30,8),style="-*",title="average batch size after split")
        self.print_real_queryid(ax,shuffle_pdf)
        shuffle_pdf["split_ratio"]=shuffle_pdf["records read"]/shuffle_pdf['number of input batches']
        ax=shuffle_pdf.plot(y=["split_ratio","records read"],secondary_y=["records read"],figsize=(30,8),style="-*",title="Split Ratio")
        self.print_real_queryid(ax,shuffle_pdf)
        shuffle_pdf["compress_ratio"]=shuffle_pdf["data size"]/shuffle_pdf['shuffle bytes written']
        ax=shuffle_pdf.plot(y=["shuffle bytes written","compress_ratio"],secondary_y=["compress_ratio"],figsize=(30,8),style="-*",title="compress ratio")
        self.print_real_queryid(ax,shuffle_pdf)
        shufflewritepdf=shuffle_pdf
        ax=shufflewritepdf.plot.bar(y=["shuffle write time","time to spill","time to compress","time to split"],stacked=True,figsize=(30,8),title="split time + shuffle write time vs. shuffle bytes written")
        ax=shufflewritepdf.plot(ax=ax,y=["shuffle bytes written"],secondary_y=["shuffle bytes written"],style="-*")
        self.print_real_queryid(ax,shufflewritepdf)
        shuffle_pdf['avg input batch size']=shuffle_pdf["number of input rows"]/shuffle_pdf["number of input batches"]
        ax=shuffle_pdf.plot(y=["avg input batch size"],figsize=(30,8),style="b-*",title="average input batch size")
        ax=shuffle_pdf.plot.bar(ax=ax,y=['number of input rows'],secondary_y=True)
        self.print_real_queryid(ax,shuffle_pdf)
        
        # Walk every query plan and collect the metrics of each exchange node
        # together with a running exchange counter and the node's simpleString.
        metrics=self.queryplans.collect()
        coalesce=[]
        metricsid=[0]  # one-element list so the nested function can mutate it
        def get_metric(root):
            if root['nodeName'] in ["ColumnarExchange","ColumnarExchangeAdaptor"]:
                metricsid[0]=metricsid[0]+1
                for l in root["metrics"]:
                    coalesce.append([l['accumulatorId'],l["metricType"],l['name'],root["nodeName"],metricsid[0],root["simpleString"]])
            if root["children"] is not None:
                for c in root["children"]:
                    get_metric(c)
        for c in metrics:
            get_metric(c)

        # Count, per exchange, how many output fields there are of each "...Type"
        # as listed in the "[OUTPUT] List(...)" part of the simpleString.
        tps={}
        for r in coalesce:
            rx=re.search(r"\[OUTPUT\] List\((.*)\)",r[5])
            if rx:
                if r[4] not in tps:
                    tps[r[4]]={}
                    fds=rx.group(1).split(", ")
                    for f in fds:
                        if f.endswith("Type"):
                            tp=re.search(r":(.+Type)",f).group(1)
                            if tp not in tps[r[4]]:
                                tps[r[4]][tp]=1
                            else:
                                tps[r[4]][tp]+=1
        if len(tps)>0:
            typedf=pandas.DataFrame(tps).T.reset_index()
            typedf=typedf.fillna(0)
            # NOTE(review): joins the exchange counter ("index") against nodeID;
            # confirm both use the same numbering.
            shuffle_pdf=pandas.merge(shuffle_pdf,typedf,left_on="nodeID",right_on="index")
            shufflewritepdf=shuffle_pdf
            ax=shufflewritepdf.plot.bar(y=["number of input rows"],stacked=True,figsize=(30,8),title="rows vs. shuffle data type")
            ax=shufflewritepdf.plot(ax=ax,y=list(typedf.columns[1:]),secondary_y=list(typedf.columns[1:]),style="-o")
            self.print_real_queryid(ax,shufflewritepdf)
            ax=shufflewritepdf.plot.bar(y=["time to split"],stacked=True,figsize=(30,8),title="split time vs. shuffle data type")
            ax=shufflewritepdf.plot(ax=ax,y=list(typedf.columns[1:]),secondary_y=list(typedf.columns[1:]),style="-o")
            self.print_real_queryid(ax,shufflewritepdf)

        
        
        shufflewritepdf.plot(x="shuffle bytes written",y=["shuffle write time","time to split"],figsize=(30,8),style="*")
        shufflewritepdf["avg shuffle batch size after split"]=shufflewritepdf["shuffle bytes written"]*1000000/shufflewritepdf['records read']
        shufflewritepdf["avg raw batch size after split"]=shufflewritepdf["data size"]*1000000/shufflewritepdf['records read']
        ax=shufflewritepdf.plot(y=["avg shuffle batch size after split","avg raw batch size after split","shuffle bytes written"],secondary_y=["shuffle bytes written"],figsize=(30,8),style="-*",title="avg batch KB after split")
        self.print_real_queryid(ax,shufflewritepdf)
        shufflewritepdf["avg batch# per splitted partition"]=shufflewritepdf['records read']/(shufflewritepdf['local blocks read']+shufflewritepdf['remote blocks read'])
        ax=shufflewritepdf.plot(y=["avg batch# per splitted partition",'records read'],secondary_y=['records read'],figsize=(30,8),style="-*",title="avg batch# per splitted partition")
        self.print_real_queryid(ax,shufflewritepdf)
        fig, ax = plt.subplots(figsize=(30,8))
        ax.set_title('shuffle wite bytes with stddev')
        ax.errorbar(x=shuffle_pdf.index,y=shuffle_pdf['shuffle bytes written_mean'], yerr=shuffle_pdf['shuffle bytes written_stddev'], linestyle='None', marker='o')
        self.print_real_queryid(ax,shuffle_pdf)
        shuffle_pdf['record batch per mapper per reducer']=shuffle_pdf['records read']/(shuffle_pdf["map_partnum"]*shuffle_pdf['reducer_partnum'])
        ax=shuffle_pdf.plot(y=["record batch per mapper per reducer"],figsize=(30,8),style="b-*",title="record batch per mapper per reducer")
        self.print_real_queryid(ax,shuffle_pdf)
        
        # Per-task input read from the accumulables, summed over both metric names.
        inputsize = self.df.select("Stage ID","Executor ID", "Task ID", F.explode("Accumulables")) \
              .select("Stage ID","Executor ID", "Task ID","col.*") \
              .where("Name='input size in bytes' or Name='size of files read'") \
              .groupBy("Task ID") \
              .agg((F.sum("Update")).alias("input read"))
        # Per-task elapsed seconds and total input in MiB (scan + shuffle reads).
        stageinput=self.df.where("event='SparkListenerTaskEnd'" )\
                                .join(inputsize,on=["Task ID"],how="left")\
                                .fillna(0) \
                                .select(F.col('Host'), F.col("real_queryid"),F.col('Stage ID'),F.col('Task ID'),
                                        F.round((F.col('Finish Time')/1000-F.col('Launch Time')/1000),2).alias('elapsedtime'),
                                        F.round((F.col('`input read`')+F.col('`Bytes Read`')+F.col('`Local Bytes Read`')+F.col('`Remote Bytes Read`'))/1024/1024,2).alias('input'))
        # Per-stage mean, stddev and distance from the mean to min/max; the
        # *_min/*_max columns feed the asymmetric error bars below.
        baisstage=stageinput.groupBy("real_queryid","Stage ID").agg(F.mean("elapsedtime").alias("elapsed"),F.mean("input").alias("input"),
                                                            (F.stddev("elapsedtime")).alias("elapsedtime_err"),
                                                            (F.stddev("input")).alias("input_err"),
                                                            (F.max("elapsedtime")-F.mean("elapsedtime")).alias("elapsed_max"),
                                                            (F.mean("elapsedtime")-F.min("elapsedtime")).alias("elapsed_min"),
                                                            (F.max("input")-F.mean("input")).alias("input_max"),
                                                            (F.mean("input")-F.min("input")).alias("input_min")).orderBy("real_queryid","Stage ID")
        dfx=baisstage.toPandas()
        fig, ax = plt.subplots(figsize=(30,8))
        ax.set_title('input size')
        ax.errorbar(x=dfx.index,y=dfx['input'], yerr=dfx['input_err'], fmt='ok', ecolor='red', lw=3)
        ax.errorbar(x=dfx.index,y=dfx['input'],yerr=[dfx['input_min'],dfx['input_max']],
                     fmt='.k', ecolor='gray', lw=1)
        self.print_real_queryid(ax,dfx)
        
        fig, ax = plt.subplots(figsize=(30,8))
        ax.set_title('stage time')

        ax.errorbar(x=dfx.index,y=dfx['elapsed'], yerr=dfx['elapsedtime_err'], fmt='ok', ecolor='red', lw=5)
        ax.errorbar(x=dfx.index,y=dfx['elapsed'],yerr=[dfx['elapsed_min'],dfx['elapsed_max']],
                     fmt='.k', ecolor='gray', lw=1)

        self.print_real_queryid(ax,dfx)
        return (shuffle_pdf,dfx)
    
    def get_stages_w_odd_partitions(appals,**kwargs):
        """List stages whose task count is not a multiple of the available task slots.

        The slot count is ``executor_cores*executor_instances/taskcpus``. For
        each (Stage ID, real_queryid) the finished tasks are counted and their
        run time is summed (``Finish Time - Launch Time``, divided by 1000).

        Returns:
            pandas.DataFrame with ``Stage ID``, ``real_queryid``, ``elapsed time``
            and ``partitions``, sorted by ``elapsed time`` descending.
        """
        if appals.df is None:
            appals.load_data()

        finished_tasks=appals.df.where("Event='SparkListenerTaskEnd'")
        task_runtime=F.col('Finish Time')-F.col('Launch Time')
        per_stage=finished_tasks.groupBy("Stage ID","real_queryid").agg(
            (F.sum(task_runtime)/1000).alias("elapsed time"),
            F.count('*').alias('partitions'))

        task_slots=appals.executor_cores*appals.executor_instances/appals.taskcpus
        uneven=per_stage.where(F.col("partitions")%(task_slots)!=0)
        return uneven.orderBy(F.desc("elapsed time")).toPandas()
   
    def get_scaned_column_v1(appals,queryids=range(1,23)):
        """Collect the columns read by every ``Scan arrow*`` node of each query plan.

        Args:
            queryids: iterable of ``real_queryid`` values to inspect. Defaults to
                1..22, the range that was previously hard-coded.

        Returns:
            list of ``[queryid, "col1,col2,..."]`` entries, one per scan node.
            Column names are taken from the node's ``metadata['ReadSchema']``
            (text between ``<`` and ``>``, name before ``:``). Query ids with
            no stored plan are skipped.
        """
        def get_scans(node,scans):
            # Depth-first walk; leaf nodes may carry children=None.
            if node['nodeName'].startswith("Scan arrow"):
                scans.append(node)
            for c in node['children'] or []:
                get_scans(c,scans)

        alltable=[]
        for qid in queryids:
            plans=appals.queryplans.where("real_queryid="+str(qid)).collect()
            if not plans:
                continue
            scans=[]
            get_scans(plans[0],scans)
            for s in scans:
                fields=re.split(r'[<>]',s['metadata']['ReadSchema'])[1].split(",")
                alltable.append([qid,",".join([l.split(":")[0] for l in fields])])
        return alltable
    
    def get_scaned_column_v2(appals,queryids=range(1,23)):
        """Collect the columns read by every ``ColumnarBatchScan*`` node of each query plan.

        Args:
            queryids: iterable of ``real_queryid`` values to inspect. Defaults to
                1..22, the range that was previously hard-coded.

        Returns:
            list of ``[queryid, "col1,col2,..."]`` entries, one per scan node.
            Column names are taken from the first ``[...]`` group of the node's
            ``simpleString`` (name before ``#``). Query ids with no stored plan
            are skipped.
        """
        def get_scans(node,scans):
            # Depth-first walk; leaf nodes may carry children=None.
            if node['nodeName'].startswith("ColumnarBatchScan"):
                scans.append(node)
            for c in node['children'] or []:
                get_scans(c,scans)

        alltable=[]
        for qid in queryids:
            plans=appals.queryplans.where("real_queryid="+str(qid)).collect()
            if not plans:
                continue
            scans=[]
            get_scans(plans[0],scans)
            for s in scans:
                fields=re.split(r"[\[\]]",s['simpleString'])[1].split(",")
                alltable.append([qid,",".join([l.split("#")[0] for l in fields])])
        return alltable
    
    def compare_query(appals,queryid,appbaseals):
        """Display a stage-by-stage comparison of one query between two applications.

        Stages of ``appals`` and ``appbaseals`` are paired by their position in
        the respective ``get_stage_stat`` results. After showing the summary
        tables, the stages of ``appals`` are walked from slowest to fastest,
        showing row-count metrics and the query plan of both sides, until the
        visited stages account for at least 90% of ``appals``' total stage time.

        Args:
            queryid: query id to compare.
            appbaseals: analysis object of the baseline application.

        Returns:
            None; all output goes through ``print``/``display``.
        """
        print(f"~~~~~~~~~~~~~~~~~~~~~~~~~~~~Query{queryid}~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
        # Use the requested query (this was hard-coded to 22).
        appals.show_critical_path_time_breakdown(queryid=queryid)
        s1=appals.get_stage_stat(queryid=queryid)
        s2=appbaseals.get_stage_stat(queryid=queryid)
        ls=s1[['Stage ID','elapsed time']]
        ls.columns=['l sid','l time']
        rs=s2[['Stage ID','elapsed time']]
        rs.columns=['r sid','r time']
        js=ls.join(rs)
        js['gap']=js['r time'] - js['l time']
        js['gap']=js['gap'].round(2)
        display(js)
        display(s1)
        display(s2)

        # Pair stages positionally; zip stops at the shorter side.
        stagesmap={int(l):int(r) for l,r in zip(s1['Stage ID'],s2['Stage ID'])}
        totaltime=sum(s1['elapsed time'])
        acctime=0
        s1time=s1.sort_values("elapsed time",ascending=False,ignore_index=True)
        ldfx=appals.get_metric_output_rowcnt(queryid=queryid)
        rdfx=appbaseals.get_metric_output_rowcnt(queryid=queryid)

        for stageid,elapsed in zip(s1time['Stage ID'],s1time['elapsed time']):
            sid1=int(stageid)
            # None when the baseline has fewer stages than appals.
            sid2=stagesmap.get(sid1)
            print(f"============================================================")
            display(ldfx[ldfx['Stage ID']==sid1])
            if sid2 is not None:
                # Filter the baseline frame with its own mask (it was ldfx's).
                display(rdfx[rdfx['Stage ID']==sid2])
            print(f" Gazelle  Query {queryid}  Stage {sid1}")
            appals.get_query_plan(stageid=sid1,show_simple_string=True)
            if sid2 is not None:
                print(f" Photon  Query {queryid}  Stage {sid2}")
                appbaseals.get_query_plan(stageid=sid2,show_simple_string=True)
            acctime+=elapsed
            if acctime/totaltime>=0.9:
                break

In [None]:
# Configuration keys that are expected to differ between two runs (ids, hosts,
# ports, paths, ...) and are therefore left out of comp_spark_conf's result.
notlist=['resource.executor.cores',
 'spark.app.id',
 'spark.app.initial.file.urls',
 'spark.app.name',
 'spark.app.startTime',
 'spark.driver.port',
 'spark.job.description',
 'spark.jobGroup.id',
 'spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
 'spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
 'spark.rdd.scope',
 'spark.sql.execution.id',
 '__fetch_continuous_blocks_in_batch_enabled',
 'spark.driver.appUIAddress',
 'spark.driver.host',
 'spark.driver.extraClassPath',
 'spark.eventLog.dir',
 'spark.executorEnv.CC',
 'spark.executorEnv.LD_LIBRARY_PATH',
 'spark.executorEnv.LD_PRELOAD',
 'spark.executorEnv.LIBARROW_DIR',
 'spark.files',
 'spark.history.fs.logDirectory',
 'spark.sql.warehouse.dir',
 'spark.yarn.appMasterEnv.LD_PRELOAD',
 'spark.yarn.dist.files'
]
def comp_spark_conf(app0,app1):
    """Return the Spark settings whose values differ between two applications.

    Both ``get_spark_config()`` frames are joined on their index (the
    configuration key); the value columns are disambiguated with the last 8
    characters of each ``appid``. Values are compared case-insensitively.

    Args:
        app0: left application; its keys drive the (left) join.
        app1: right application.

    Returns:
        pandas.DataFrame with one row per differing key that is not listed in
        ``notlist``, holding both lower-cased values and a ``comp`` column.
    """
    suffix0=app0.appid[-8:]
    suffix1=app1.appid[-8:]
    pdfc=app0.get_spark_config().join(app1.get_spark_config(),lsuffix=suffix0,rsuffix=suffix1)

    # The value column is named "0" in both inputs, hence "0"+suffix after the join.
    col0="0"+suffix0
    col1="0"+suffix1
    pdfc[col0]=pdfc[col0].str.lower()
    pdfc[col1]=pdfc[col1].str.lower()

    pdfc['comp']=(pdfc[col0]==pdfc[col1])
    return pdfc.loc[(~pdfc['comp']) & (~pdfc.index.isin(notlist))]

## Node log analysis

In [None]:
@pandas_udf("host string, id string,taskid int, time double", PandasUDFType.GROUPED_MAP)
def collect_udf_time(pdf):
    """Fetch executor stderr logs over HTTP and extract the UDF start/stop marks.

    Grouped-map pandas UDF. For every row of ``pdf`` (columns ``Host``,
    ``Executor ID``, ``appid``) the container's stderr page is downloaded from
    port 8042 of the node and every line beginning with ``start UDF`` or
    ``stop UDF`` is split on single spaces.

    Args:
        pdf: pandas.DataFrame holding the rows of one group.

    Returns:
        pandas.DataFrame with columns ``id`` (token 0 of the line), ``taskid``
        (token 4 as int), ``time`` (first decimal number in token 6) and
        ``host``. Empty when no UDF lines were found.
    """
    # Local import so the UDF carries its own dependency to the executors.
    from urllib import request

    # An empty ProxyHandler mapping disables any auto-detected proxy.
    opener = request.build_opener(request.ProxyHandler({}))
    time_re = re.compile(r'\d+\.\d+')

    rst=[]
    for idx,row in pdf.iterrows():
        # NOTE(review): the node IP is built from a fixed prefix plus the last
        # character of the host name; this only fits one specific cluster.
        ip="10.1.2.19"+row['Host'][-1:]
        execid="{:06d}".format(int(row['Executor ID'])+1)
        appid=row['appid']
        url = f'http://{ip}:8042/node/containerlogs/container_{appid}_01_{execid}/sparkuser/stderr/?start=0'
        # Close the HTTP response as soon as the body has been read.
        with opener.open(url) as resp:
            data = resp.read().decode('utf8')
        cnt_udf=[line.split(" ") for line in data.split("\n") if line.startswith('start UDF') or line.startswith('stop UDF')]
        if not cnt_udf:
            # No UDF marks in this container log; selecting columns would fail.
            continue
        unf_pdf=pandas.DataFrame(cnt_udf)
        srst=unf_pdf.loc[:,[0,4,6]].copy()
        srst.columns=['id','taskid','time']
        srst['host']=row['Host']
        srst['taskid']=srst['taskid'].astype(int)
        srst['time']=srst['time'].apply(lambda f: float(time_re.search(f).group(0)))
        rst.append(srst)
    if not rst:
        # pandas.concat raises on an empty list.
        return pandas.DataFrame(columns=['id','taskid','time','host'])
    return pandas.concat(rst)


class App_Log_Analysis_Node_log(App_Log_Analysis):
    """App_Log_Analysis variant that adds UDF intervals from node logs to the task trace.

    NOTE(review): a class with the same name and what appears to be the same
    body is defined again in the following cell; that later definition
    replaces this one once both cells have run.
    """
    def __init__(self, appid,jobids):
        App_Log_Analysis.__init__(self, appid,jobids)
    
    def generate_trace_view_list(self,id=0, **kwargs):
        """Build the trace events (as JSON strings) for tasks and UDF intervals.

        Tasks are laid out per executor ("pid") on reusable slots ("tid"); UDF
        start/stop marks collected by ``collect_udf_time`` are then added on
        the slot of the task they belong to.

        Side effects: sets ``self.starttime`` and ``self.tskmap``.

        Returns:
            list[str]: one ``json.dumps`` string per trace event.
        """
        if self.df is None:
            self.load_data()

        # NOTE(review): showcpu, taskend, trace, sevt and eevt are assigned in
        # this method but never read.
        showcpu=kwargs['showcpu'] if 'showcpu' in kwargs else False
        
        appid=self.appid
        events=self.df.toPandas()
        # pid -> {slot index -> [task id, launch time]}; [-1,-1] marks a free slot.
        coretrack={}
        trace_events=[]
        starttime=0
        taskend=[]
        trace={"traceEvents":[]}
        exec_hosts={}
        hostsdf=self.df.select("Host").distinct().orderBy("Host")
        hostid=100000
        # Task ids whose TaskEnd was seen without a matching slot.
        ended_event=[]

        # Give every host its own id block: 100000, 200000, ... in host-name order.
        for i,l in hostsdf.toPandas().iterrows():
            exec_hosts[l['Host']]=hostid
            hostid=hostid+100000

        # task id -> {'pid','tid'} of the emitted task event.
        tskmap={}
        for idx,l in events.iterrows():
            if l['Event']=='SparkListenerTaskStart':
                hostid=exec_hosts[l['Host']]

                tsk=l['Task ID']
                # Process id of the executor: host block + executor id * 100.
                pid=int(l['Executor ID'])*100+hostid
                stime=l['Launch Time']
                #the task's starttime and finishtime is the same, ignore it.
                if tsk in ended_event:
                    continue
                if not pid in coretrack:
                    tids={}
                    # First task on this executor: emit the process-name metadata event.
                    trace_events.append({
                       "name": "process_name",
                       "ph": "M",
                       "pid":pid,
                       "tid":0,
                       "args":{"name":"{:s}.{:s}".format(l['Host'],l['Executor ID'])}
                      })

                else:
                    tids=coretrack[pid]
                # Reuse the first free slot; the for-else opens a new slot when none is free.
                for t in tids.keys():
                    if tids[t][0]==-1:
                        tids[t]=[tsk,stime]
                        break
                else:
                    t=len(tids)
                    tids[t]=[tsk,stime]
                #print("task {:d} tid is {:s}.{:d}".format(tsk,pid,t))
                coretrack[pid]=tids

            if l['Event']=='SparkListenerTaskEnd':
                sevt={}
                eevt={}
                hostid=exec_hosts[l['Host']]
                pid=int(l['Executor ID'])*100+hostid
                tsk=l['Task ID']
                fintime=l['Finish Time']

                tids=coretrack[pid]
                # Free the slot that holds this task; if no slot holds it, remember
                # the task id so a later TaskStart for it is skipped.
                for t in tids.keys():
                    if tids[t][0]==tsk:
                        tids[t]=[-1,-1]
                        break
                else:
                    ended_event.append(tsk)
                    continue
                # If another slot's task was launched at most 2 time units before
                # this task finished, cut this task's end to that launch time and
                # move the other task onto the slot just freed.
                for ps in reversed([key for key in tids.keys()]) :
                    if tids[ps][1]-fintime<0 and tids[ps][1]-fintime>=-2:
                        fintime=tids[ps][1]
                        tids[t]=tids[ps]
                        tids[ps]=[-1,-1]
                        break
                # The launch time of the first finished task becomes the trace origin.
                if starttime==0:
                    starttime=l['Launch Time']

                sstime=l['Launch Time']-starttime

                # Complete ("X") event for the task, with sizes converted to MiB.
                trace_events.append({
                       'tid':pid+int(t),
                       'ts':sstime,
                       'dur':fintime-l['Launch Time'],
                       'pid':pid,
                       "ph":'X',
                       'name':"stg{:d}".format(l['Stage ID']),
                       'args':{"job id": l['job id'],
                               "stage id": l['Stage ID'],
                               "tskid":tsk,
                               "input":builtins.round(l["Bytes Read"]/1024/1024,2),
                               "spill":builtins.round(l["Memory Bytes Spilled"]/1024/1024,2),
                               "Shuffle Read Metrics": "",
                               "|---Local Read": builtins.round(l["Local Bytes Read"]/1024/1024,2),
                               "|---Remote Read":builtins.round(l["Remote Bytes Read"]/1024/1024,2),
                               "Shuffle Write Metrics": "",
                               "|---Write":builtins.round(l['Shuffle Bytes Written']/1024/1024,2)
                               }
                      })
                tskmap[tsk]={'pid':pid,'tid':pid+int(t)}

        self.starttime=starttime
        self.tskmap=tskmap

        # One row per (Host, Executor ID); appid is passed without its 'application_' prefix.
        hostdf=self.df.select('Host','Executor ID',F.lit(appid[len('application_'):]).alias('appid')).distinct().orderBy('Host')
        rst=hostdf.groupBy('Host').apply(collect_udf_time)
        rst.cache()
        start_df=rst.where("id='start'").select(F.col('taskid').alias('start_taskid'),F.col('time').alias("starttime"))
        stop_df=rst.where("id='stop'").select('taskid',F.col('time').alias("stop_time"))
        # Pair every start mark with the earliest stop mark of the same task that is not before it.
        df=start_df.join(stop_df, on=[start_df.start_taskid==stop_df.taskid,stop_df['stop_time']>=start_df['starttime']],how='left').groupBy('taskid','starttime').agg(F.min('stop_time').alias('stop_time'))
        pdf=df.toPandas() 
        # NOTE(review): with the left join, a start without a stop yields a null
        # taskid/stop_time, which this loop does not guard against.
        for idx,l in pdf.iterrows():
                trace_events.append({
                     'tid':self.tskmap[l['taskid']]['tid'],
                     'ts':l['starttime']*1000-self.starttime,
                     'dur':(l['stop_time']-l['starttime'])*1000,                
                     'pid':self.tskmap[l['taskid']]['pid'],
                     'ph':'X',
                     'name':'udf'})
        
        return [json.dumps(l) for l in trace_events]


In [None]:
# NOTE(review): this cell re-declares App_Log_Analysis_Node_log; the body appears
# to be a verbatim copy of the definition in the previous cell, which it
# shadows. One of the two cells can probably be removed.
class App_Log_Analysis_Node_log(App_Log_Analysis):
    """App_Log_Analysis variant that adds UDF intervals from node logs to the task trace."""
    def __init__(self, appid,jobids):
        App_Log_Analysis.__init__(self, appid,jobids)
    
    def generate_trace_view_list(self,id=0, **kwargs):
        """Build the trace events (as JSON strings) for tasks and UDF intervals.

        Each executor becomes a "pid" and its tasks are placed on reusable
        slots ("tid"). UDF start/stop marks returned by ``collect_udf_time``
        are appended as extra events on the owning task's slot.

        Side effects: sets ``self.starttime`` and ``self.tskmap``.

        Returns:
            list[str]: one ``json.dumps`` string per trace event.
        """
        if self.df is None:
            self.load_data()

        # NOTE(review): showcpu, taskend, trace, sevt and eevt are never read.
        showcpu=kwargs['showcpu'] if 'showcpu' in kwargs else False
        
        appid=self.appid
        events=self.df.toPandas()
        # pid -> {slot -> [task id, launch time]}; a free slot is [-1,-1].
        coretrack={}
        trace_events=[]
        starttime=0
        taskend=[]
        trace={"traceEvents":[]}
        exec_hosts={}
        hostsdf=self.df.select("Host").distinct().orderBy("Host")
        hostid=100000
        # Tasks whose end event arrived while no slot was holding them.
        ended_event=[]

        # Hosts are numbered 100000, 200000, ... in sorted order.
        for i,l in hostsdf.toPandas().iterrows():
            exec_hosts[l['Host']]=hostid
            hostid=hostid+100000

        # task id -> pid/tid under which its event was emitted.
        tskmap={}
        for idx,l in events.iterrows():
            if l['Event']=='SparkListenerTaskStart':
                hostid=exec_hosts[l['Host']]

                tsk=l['Task ID']
                # Executor process id = host block + executor id * 100.
                pid=int(l['Executor ID'])*100+hostid
                stime=l['Launch Time']
                #the task's starttime and finishtime is the same, ignore it.
                if tsk in ended_event:
                    continue
                if not pid in coretrack:
                    tids={}
                    # Metadata ("M") event naming the process "<host>.<executor id>".
                    trace_events.append({
                       "name": "process_name",
                       "ph": "M",
                       "pid":pid,
                       "tid":0,
                       "args":{"name":"{:s}.{:s}".format(l['Host'],l['Executor ID'])}
                      })

                else:
                    tids=coretrack[pid]
                # Take the first free slot, or append a new one (for-else).
                for t in tids.keys():
                    if tids[t][0]==-1:
                        tids[t]=[tsk,stime]
                        break
                else:
                    t=len(tids)
                    tids[t]=[tsk,stime]
                #print("task {:d} tid is {:s}.{:d}".format(tsk,pid,t))
                coretrack[pid]=tids

            if l['Event']=='SparkListenerTaskEnd':
                sevt={}
                eevt={}
                hostid=exec_hosts[l['Host']]
                pid=int(l['Executor ID'])*100+hostid
                tsk=l['Task ID']
                fintime=l['Finish Time']

                tids=coretrack[pid]
                # Release this task's slot; an unknown task is recorded and skipped.
                for t in tids.keys():
                    if tids[t][0]==tsk:
                        tids[t]=[-1,-1]
                        break
                else:
                    ended_event.append(tsk)
                    continue
                # A task launched within 2 time units before this finish is treated
                # as the successor: the end is clamped to its launch time and it
                # takes over the released slot.
                for ps in reversed([key for key in tids.keys()]) :
                    if tids[ps][1]-fintime<0 and tids[ps][1]-fintime>=-2:
                        fintime=tids[ps][1]
                        tids[t]=tids[ps]
                        tids[ps]=[-1,-1]
                        break
                # Trace origin: launch time of the first task end processed.
                if starttime==0:
                    starttime=l['Launch Time']

                sstime=l['Launch Time']-starttime

                # Complete ("X") task event; byte counters are reported in MiB.
                trace_events.append({
                       'tid':pid+int(t),
                       'ts':sstime,
                       'dur':fintime-l['Launch Time'],
                       'pid':pid,
                       "ph":'X',
                       'name':"stg{:d}".format(l['Stage ID']),
                       'args':{"job id": l['job id'],
                               "stage id": l['Stage ID'],
                               "tskid":tsk,
                               "input":builtins.round(l["Bytes Read"]/1024/1024,2),
                               "spill":builtins.round(l["Memory Bytes Spilled"]/1024/1024,2),
                               "Shuffle Read Metrics": "",
                               "|---Local Read": builtins.round(l["Local Bytes Read"]/1024/1024,2),
                               "|---Remote Read":builtins.round(l["Remote Bytes Read"]/1024/1024,2),
                               "Shuffle Write Metrics": "",
                               "|---Write":builtins.round(l['Shuffle Bytes Written']/1024/1024,2)
                               }
                      })
                tskmap[tsk]={'pid':pid,'tid':pid+int(t)}

        self.starttime=starttime
        self.tskmap=tskmap

        # Distinct (Host, Executor ID) pairs plus the appid stripped of 'application_'.
        hostdf=self.df.select('Host','Executor ID',F.lit(appid[len('application_'):]).alias('appid')).distinct().orderBy('Host')
        rst=hostdf.groupBy('Host').apply(collect_udf_time)
        rst.cache()
        start_df=rst.where("id='start'").select(F.col('taskid').alias('start_taskid'),F.col('time').alias("starttime"))
        stop_df=rst.where("id='stop'").select('taskid',F.col('time').alias("stop_time"))
        # For each start mark keep the smallest stop time of the same task that is >= the start.
        df=start_df.join(stop_df, on=[start_df.start_taskid==stop_df.taskid,stop_df['stop_time']>=start_df['starttime']],how='left').groupBy('taskid','starttime').agg(F.min('stop_time').alias('stop_time'))
        pdf=df.toPandas() 
        # NOTE(review): starts without a matching stop come back with null
        # taskid/stop_time from the left join and are not handled here.
        for idx,l in pdf.iterrows():
                trace_events.append({
                     'tid':self.tskmap[l['taskid']]['tid'],
                     'ts':l['starttime']*1000-self.starttime,
                     'dur':(l['stop_time']-l['starttime'])*1000,                
                     'pid':self.tskmap[l['taskid']]['pid'],
                     'ph':'X',
                     'name':'udf'})
        
        return [json.dumps(l) for l in trace_events]

In [None]:
class App_Log_Analysis_Node_Log_Uni(App_Log_Analysis):
    """App_Log_Analysis variant that merges ``xgbtck`` tick logs into the task trace."""
    def __init__(self, file,jobids):
        App_Log_Analysis.__init__(self, file,jobids)

    def _load_tick_log(self,shownodes):
        """Load and normalise the tick logs; return ``None`` when there are none.

        Looks for ``<event log without extension>.stdout`` and, for every name
        in ``shownodes``, ``<event log dir>/<name>/xgbtck.txt``. The files are
        parsed with ``splits``, unioned, stripped of the columns that precede
        the ``xgbtck`` marker, renamed to xgbtck/event/ts/elapsed/threadid/
        taskid and filtered to the load_library/data_load/data_convert events.
        """
        candidates=[os.path.splitext(self.file)[0]+".stdout"]
        parent=os.path.split(self.file)[0]
        # shownodes defaults to None: only the .stdout file is considered then.
        candidates.extend(parent+"/"+c+"/xgbtck.txt" for c in (shownodes or []))

        logdfs=[sc.textFile(f,84).mapPartitions(splits).toDF() for f in candidates if fs.exists(f)]
        if not logdfs:
            return None
        # Spark DataFrames are combined with union(); they have no concat().
        logdf=reduce(lambda l,r: l.union(r),logdfs)
        logdf=logdf.cache()
        logdf.count()

        firstrow=logdf.limit(1).collect()
        if not firstrow:
            return None

        # Drop leading columns until the one holding the "xgbtck" marker.
        for c in logdf.columns:
            if firstrow[0][c]!="xgbtck":
                logdf=logdf.drop(c)
            else:
                break

        usefulc=["xgbtck","event","ts","elapsed","threadid","taskid"]
        for i in range(0,len(usefulc)):
            logdf=logdf.withColumnRenamed(logdf.columns[i],usefulc[i])

        return logdf.where(F.col("event").isin(['load_library','data_load','data_convert']))

    def generate_trace_view_list(self,id=0, **kwargs):
        """Build the trace events (as JSON strings) for tasks and tick-log intervals.

        Keyword Args:
            shownodes: optional iterable of node directory names whose
                ``xgbtck.txt`` should be merged in addition to the ``.stdout``
                log. ``None`` (default) means no per-node logs.

        Side effects: sets ``self.starttime`` and ``self.tskmap``.

        Returns:
            list[str]: JSON strings, task events first, then tick-log events
            ordered by timestamp (only when a tick log was found).
        """
        if self.df is None:
            self.load_data()

        shownodes=kwargs.get("shownodes",None)

        showdf=self.df

        events=showdf.drop("Accumulables","Stage IDs").orderBy("Launch Time","Finish Time").toPandas()
        # pid -> {slot index -> [task id, launch time]}; [-1,-1] marks a free slot.
        coretrack={}
        trace_events=[]
        starttime=0
        exec_hosts={}
        hostsdf=showdf.select("Host").distinct().orderBy("Host")
        hostid=100000
        # Task ids whose TaskEnd was seen without a matching slot.
        ended_event=set()

        logdf=self._load_tick_log(shownodes)

        # task id -> {'threadid': ...} taken from the data_convert events.
        if logdf is None:
            task_thread={}
        else:
            task_thread=logdf.where("event='data_convert'").select(F.col("taskid").astype(IntegerType()),F.col("threadid").astype(IntegerType())).distinct().toPandas().set_index('taskid').to_dict('index')

        # Give every host its own id block: 100000, 200000, ... in host-name order.
        for i,l in hostsdf.toPandas().iterrows():
            exec_hosts[l['Host']]=hostid
            hostid=hostid+100000

        tskmap={}
        for idx,l in events.iterrows():
            if l['Event']=='SparkListenerTaskStart':
                hostid=exec_hosts[l['Host']]

                tsk=l['Task ID']
                pid=int(l['Executor ID'])*100+hostid
                stime=l['Launch Time']
                #the task's starttime and finishtime is the same, ignore it.
                if tsk in ended_event:
                    continue
                if not pid in coretrack:
                    tids={}
                    # First task on this executor: emit the process-name metadata event.
                    trace_events.append({
                       "name": "process_name",
                       "ph": "M",
                       "pid":pid,
                       "tid":0,
                       "args":{"name":"{:s}.{:s}".format(l['Host'],l['Executor ID'])}
                      })

                else:
                    tids=coretrack[pid]

                tidarr=[tsk,stime]

                # Reuse the first free slot; the for-else opens a new one otherwise.
                for t in tids.keys():
                    if tids[t][0]==-1:
                        tids[t]=tidarr
                        break
                else:
                    t=len(tids)
                    tids[t]=tidarr
                coretrack[pid]=tids

            if l['Event']=='SparkListenerTaskEnd':
                hostid=exec_hosts[l['Host']]
                pid=int(l['Executor ID'])*100+hostid
                tsk=l['Task ID']
                fintime=l['Finish Time']

                tids=coretrack[pid]
                # Free the slot holding this task; an unknown task is recorded and skipped.
                for t in tids.keys():
                    if tids[t][0]==tsk:
                        tids[t]=[-1,-1]
                        break
                else:
                    ended_event.add(tsk)
                    continue
                # A task is moved onto the freed slot when it was launched at most
                # 2 time units before this finish, or when the tick log says it ran
                # on the same thread as the finished task; the end time is clamped
                # to that task's launch time.
                for ps in reversed(list(tids.keys())):
                    if (tids[ps][1]-fintime<0 and tids[ps][1]-fintime>=-2) or \
                        (tsk in task_thread and tids[ps][0] in task_thread and task_thread[tsk]["threadid"]==task_thread[tids[ps][0]]["threadid"]):
                        fintime=tids[ps][1]
                        tids[t]=tids[ps]
                        tids[ps]=[-1,-1]
                        break
                # The launch time of the first finished task becomes the trace origin.
                if starttime==0:
                    starttime=l['Launch Time']

                sstime=l['Launch Time']-starttime

                # Complete ("X") event for the task, with sizes converted to MiB.
                trace_events.append({
                       'tid':pid+int(t),
                       'ts':sstime,
                       'dur':fintime-l['Launch Time'],
                       'pid':pid,
                       "ph":'X',
                       'name':"stg{:d}".format(l['Stage ID']),
                       'args':{"job id": l['Job ID'],
                               "stage id": l['Stage ID'],
                               "tskid":tsk,
                               "input":builtins.round(l["Bytes Read"]/1024/1024,2),
                               "spill":builtins.round(l["Memory Bytes Spilled"]/1024/1024,2),
                               "Shuffle Read Metrics": "",
                               "|---Local Read": builtins.round(l["Local Bytes Read"]/1024/1024,2),
                               "|---Remote Read":builtins.round(l["Remote Bytes Read"]/1024/1024,2),
                               "Shuffle Write Metrics": "",
                               "|---Write":builtins.round(l['Shuffle Bytes Written']/1024/1024,2)
                               }
                      })
                tskmap[tsk]={'pid':pid,'tid':pid+int(t)}

        self.starttime=starttime
        self.tskmap=tskmap

        output=[json.dumps(l) for l in trace_events]

        # Without a tick log (or without any placed task) only task events exist.
        if logdf is not None and tskmap:
            tskmapdf = spark.createDataFrame(pandas.DataFrame(self.tskmap).T.reset_index())
            logdf=logdf.withColumn("ts",F.col("ts").astype(LongType()))
            logdf=logdf.withColumn("taskid",F.col("taskid").astype(LongType()))
            logdf=logdf.withColumnRenamed("event",'type')
            # Right join keeps every placed task; tasks without log rows are
            # removed again by the ts-not-null filter.
            mgd=logdf.join(tskmapdf,on=(F.col('taskid')==F.col("index")),how="right")
            rstdf=mgd.select(F.col('tid').alias("tid"),
              (F.round(F.col('ts')-F.lit(self.starttime),3)).alias("ts"),
              F.round(F.col("elapsed"),3).alias("dur"),
               F.lit(F.col('pid')).alias("pid"),
               F.lit("X").alias("ph"),
               F.col("type").alias("name")
               ).where(F.col("ts").isNotNull()).orderBy('ts')
            output.extend(rstdf.toJSON().collect())

        return output

# perf trace analysis

In [None]:
def split_trace(x):
    """Parse the raw ``perf`` text lines of one partition into 5-field tuples.

    Intended for ``rdd.mapPartitions``: ``x`` is an iterable of text lines and
    the return value is an iterator over tuples.  The tuple layout depends on
    which pattern matched (first match wins, in this order):

    * sched_switch : (time, 'sched_switch', prev pid, prev state, next pid)
    * completed sendto/recvfrom : (time, syscall, pid, duration ms, size)
    * unfinished sendto/recvfrom: (time, syscall, pid, 0, size)
    * ``[continued]`` line      : (time, syscall, pid, duration ms, 0)
    * completed poll            : (time, 'poll', pid, duration ms, 0)
    * unfinished poll           : (time, 'poll', pid, 0, 0)
    * anything else that does not start with a space:
      (0, original line, '', '', '') so the caller can inspect it.

    Unrecognised lines that start with a space are dropped.
    """
    # Compile once per partition instead of once per line.
    sched_compact=re.compile(r"^(\d+\.\d+).*sched:(sched_switch):.+:(\d+) \[\d+\] (\S+) ==> .+:(\d+) ")
    # Fallback for the "prev_pid=... ==> next_pid=..." layout.  The timestamp
    # is optional so every line the previous pattern accepted still matches,
    # but it is now actually captured when it is separated from "sched:" by
    # the usual ": " (the old "(\d+\.\d+)*sched:" only captured a timestamp
    # glued directly to "sched:" and otherwise produced None).
    sched_verbose=re.compile(r"(?:(\d+\.\d+):?\s*)?sched:(sched_switch):.*prev_pid=(\d+).*prev_state=(\S+) ==> .*next_pid=(\d+)")
    io_done=re.compile(r"(\d+\.\d+) \( +(\d+\.\d+) +ms\):[^/]+/(\d+) (recvfrom|sendto)\(fd: \d+<\S+:\[\d+\]>, \S+: 0x[a-f0-9]+, \S+: (\d+)")
    io_pending=re.compile(r"(\d+\.\d+) \( +\): [^/]+/(\d+) (recvfrom|sendto)\(fd: \d+<\S+:\[\d+\]>, \S+: 0x[a-f0-9]+, \S+: (\d+)")
    continued=re.compile(r"(\d+\.\d+) \( *(\d+\.\d+) ms\): [^/]+/(\d+)  ... \[continued\]: (sendto|recvfrom|poll)")
    poll_done=re.compile(r"(\d+\.\d+) \( +(\d+\.\d+) +ms\): [^/]+/(\d+) (poll)")
    poll_pending=re.compile(r"(\d+\.\d+) \( +\): [^/]+/(\d+) (poll)")
    leading_space=re.compile(r"^ +?")

    fi=[]
    for l in x:
        m=sched_compact.search(l) or sched_verbose.search(l)
        if m:
            fi.append(m.group(1,2,3,4,5))                       #time, switch, src, status, dst
            continue
        m=io_done.search(l)
        if m:
            fi.append((m.group(1),m.group(4),m.group(3),m.group(2),m.group(5)))  #time, snd/rcv, pid, ms, size
            continue
        m=io_pending.search(l)
        if m:
            fi.append((m.group(1),m.group(3),m.group(2),0,m.group(4)))           #time, snd/rcv, pid, 0, size
            continue
        m=continued.search(l)
        if m:
            fi.append((m.group(1),m.group(4),m.group(3),m.group(2),0))           #time, syscall, pid, ms, 0
            continue
        m=poll_done.search(l)
        if m:
            fi.append((m.group(1),m.group(4),m.group(3),m.group(2),0))           #time, poll, pid, ms, 0
            continue
        m=poll_pending.search(l)
        if m:
            fi.append((m.group(1),m.group(3),m.group(2),0,0))                    #time, poll, pid, 0, 0
            continue
        if not leading_space.match(l):
            # Keep unparsed, non-indented lines with timestamp 0 for diagnostics.
            fi.append((0,l,'','',''))
    return iter(fi)
                  


class Perf_trace_analysis(Analysis):
    """Convert a ``perf`` text trace into trace-viewer JSON events.

    ``self.df`` is the 5-column frame produced by ``split_trace``: ``_1`` is
    the timestamp, ``_2`` the event name and ``_3``/``_4``/``_5`` are event
    specific (see ``split_trace``).
    """

    def __init__(self,sar_file):
        Analysis.__init__(self,sar_file)
        # None means "take the first timestamp of the trace" (see load_data).
        self.starttime=None

    def load_data(self):
        """Parse ``self.file`` into ``self.df`` and settle ``self.starttime``.

        When ``self.starttime`` was not set beforehand it becomes the smallest
        timestamp in the trace.  Otherwise the value read from ``uptime.txt``
        (next to the trace file), times 1000, is subtracted from it.

        Returns the parsed frame, or ``None`` (leaving ``self.df`` untouched)
        when a start time was supplied but ``uptime.txt`` does not exist.
        """
        sardata=sc.textFile(self.file)
        sardf=sardata.mapPartitions(split_trace).toDF()
        # Lines the parser could not classify carry timestamp 0; show a sample.
        display(sardf.where("_1=0").limit(5).collect())
        sardf=sardf.withColumn("_1",F.col("_1").astype(DoubleType()))
        sardf=sardf.where("_1>0")
        starttime=sardf.agg(F.min("_1")).collect()[0][0]
        if self.starttime is None:
            self.starttime=(float(starttime))
        else:
            paths=os.path.split(self.file)
            if fs.exists(paths[0]+"/uptime.txt"):
                with fs.open(paths[0]+"/uptime.txt") as f:
                    strf=f.read().decode('ascii')
                    print("input starttime:",self.starttime,"uptime:",float(strf)*1000,"record starttime:",starttime)
                    self.starttime=self.starttime-float(strf)*1000
            else:
                print("uptime.txt isn't found, wrong")
                return

        self.df=sardf
        return sardf

    def generate_sched_view_list(self,id=0,**kwargs):
        """Build 'X' (complete) events from sched_switch records.

        Keyword arguments read here:
          sched_time_offset: added to ``self.starttime`` (default 0).
          cpu_threshold: minimum accumulated-time / trace-span ratio a thread
              needs to be emitted (default 0.1).
          sched_cnt: a thread must be switched in more than this many times
              (default 10).
          pidstat_tids: list of thread ids, or a whitespace-separated text
              whose 4th field of each line is the thread id.
          pidstat_tids_txt: file (next to the trace) with one thread id per
              line, used when ``pidstat_tids`` is not given.

        Returns a list of JSON strings; an empty list when the thread list is
        unavailable or no sched_switch row involves the selected threads.
        """
        sardf=self.df
        starttime=self.starttime
        starttime=starttime+kwargs.get("sched_time_offset",0)
        print("offset time",starttime)

        swdf=sardf.where("_2='sched_switch'")

        cputhreshold=kwargs.get("cpu_threshold",0.1)
        sched_cnt = kwargs.get("sched_cnt",10)

        pidstat_tids=kwargs.get("pidstat_tids",None)
        pidstat_tids_txt=kwargs.get("pidstat_tids_txt","sched_threads.txt")

        if pidstat_tids:
            if type(pidstat_tids) is list:
                tids=pidstat_tids
            else:
                tids=[re.split(r'\s+',t) for t in pidstat_tids.split("\n")]
                tids=[t[3] for t in tids if len(t)>4]
        else:
            paths=os.path.split(self.file)
            if fs.exists(paths[0]+"/"+pidstat_tids_txt):
                with fs.open(paths[0]+"/"+pidstat_tids_txt) as f:
                    tids=[l.strip() for l in f.read().decode('ascii').split("\n") if len(l)>0]
            else:
                print("Wrong, no pidstat_tids args and no sched_threads.txt file")
                return []

        # Keep only threads that were scheduled in often enough.
        tidcnt=swdf.where(F.col("_5").isin(tids)).groupBy("_5").count()
        tidm10=tidcnt.where("count>{:d}".format(sched_cnt)).select("_5").collect()
        rtids=[t[0] for t in tidm10]
        rtiddf=swdf.where(F.col("_5").isin(rtids) | F.col("_3").isin(rtids))
        rtiddf=rtiddf.withColumn("_1",F.col("_1").astype(DoubleType())-starttime)
        rtiddf=rtiddf.withColumn("_3",F.col("_3").astype(IntegerType()))
        rtiddf=rtiddf.withColumn("_5",F.col("_5").astype(IntegerType()))
        rtiddf=rtiddf.withColumn("_1",F.round(F.col("_1"),3))
        rtidcol=rtiddf.collect()
        if not rtidcol:
            # Nothing to plot; previously this crashed on rtidcol[0].
            return []

        tidmap={}      # tid -> timestamp of the last switch seen for it
        tidtotal={}    # tid -> time accumulated between switches
        for t in rtids:
            tidmap[int(t)]=0
            tidtotal[int(t)]=0
        trace_events=[]
        mintime=rtidcol[0]["_1"]
        maxtime=0
        # Pass 1: accumulate per-thread time to decide which threads to keep.
        for r in rtidcol:
            if r["_3"] in tidtotal:
                tidtotal[r["_3"]]=tidtotal[r["_3"]]+r["_1"]-tidmap[r["_3"]]
                tidmap[r["_3"]]=r["_1"]
                maxtime=r["_1"]
            if r["_5"] in tidmap:
                tidmap[r["_5"]]=r["_1"]

        span=maxtime-mintime
        if span==0:
            # No measurable interval, so no CPU share can be computed.
            return []

        # Pass 2: emit one event per switch-out of a sufficiently busy thread.
        # NOTE(review): tidmap still holds the values left by pass 1, so a
        # thread whose first row here is a switch-out gets a start time taken
        # from the end of the trace -- confirm whether a reset is intended.
        for r in rtidcol:
            if r["_3"] in tidmap and tidtotal[r["_3"]]/span>cputhreshold:
                trace_events.append({
                    'tid':r["_3"],
                     'ts':tidmap[r["_3"]],
                     'pid':id,
                     'ph':'X',
                     # builtins.round, as elsewhere in this file, so a
                     # shadowed `round` cannot end up in the JSON payload.
                     'dur':builtins.round(r["_1"]-tidmap[r["_3"]],3),
                     'name':r["_4"]
                })

                tidmap[r["_3"]]=r["_1"]
            if r["_5"] in tidmap:
                tidmap[r["_5"]]=r["_1"]
        return [json.dumps(l) for l in trace_events]

    def generate_nic_view_list(self,id=0,**kwargs):
        """Build 'X' events for the non-sched rows (sendto/recvfrom/poll).

        ``dur`` comes from column ``_4`` (or from a matching 'continued' row
        with the same timestamp and pid), ``args.size`` from column ``_5``.
        Reads ``sched_time_offset`` from ``kwargs``.  Returns JSON strings.
        """
        sardf=self.df
        starttime=self.starttime
        starttime=starttime+kwargs.get("sched_time_offset",0)
        print("offset time",starttime)

        nicdf=sardf.where("_2<>'sched_switch'")
        # Attach the duration of a 'continued' row to its originating row.
        cntdf=nicdf.where("_2='continued'")
        cntdf=cntdf.select("_1","_3","_4").withColumnRenamed("_4","cnt_4")
        nicdf=nicdf.join(cntdf,on=["_1","_3"],how="leftouter")
        nicdf=nicdf.where("_2<>'continued'")
        nicdf=nicdf.select(F.col("_1"),F.col("_2"),F.col("_3"),F.when(F.col("cnt_4").isNull(), F.col("_4")).otherwise(F.col("cnt_4")).alias("_4"),F.col("_5"))
        nicdf=nicdf.withColumn("_1",F.col("_1").astype(DoubleType())-starttime)
        nicdf=nicdf.withColumn("_3",F.col("_3").astype(IntegerType()))
        nicdf=nicdf.withColumn("_5",F.col("_5").astype(IntegerType()))
        nicdf=nicdf.withColumn("_1",F.col("_1").astype(IntegerType()))
        nicdf=nicdf.withColumn("_4",F.col("_4").astype(DoubleType()))
        nicdf=nicdf.withColumn("_4",F.col("_4").astype(LongType()))
        return nicdf.select(
                F.col("_3").alias('tid'),
                (F.col("_1")).alias('ts'),
                # Use the caller's id like the sched view does (was a fixed 0;
                # identical for the default id=0).
                F.lit(id).alias('pid'),
                F.lit('X').alias('ph'),
                F.col("_4").alias('dur'),
                F.col("_2").alias('name'),
                F.struct(
                    F.col("_5").alias("size")
                ).alias('args')
            ).toJSON().collect()

    def generate_trace_view_list(self,id=0,**kwargs):
        """Return sched events, then nic events, then the base-class events."""
        trace_events=Analysis.generate_trace_view_list(self,id,**kwargs)

        events=self.generate_sched_view_list(id,**kwargs)
        events.extend(self.generate_nic_view_list(id,**kwargs))
        events.extend(trace_events)
        return events
                      

# Sar analysis

In [None]:
def splits(x):
    """Split every line of a partition on whitespace and pad it to 118 fields.

    Meant for ``rdd.mapPartitions``: takes an iterable of text lines and
    returns an iterator of lists.  Rows shorter than 118 fields are padded
    with empty strings; longer rows are returned unchanged.
    """
    padded_rows=[]
    for line in x:
        fields=re.split(r'\s+',line)
        missing=118-len(fields)
        if missing>0:
            fields.extend(['']*missing)
        padded_rows.append(fields)
    return iter(padded_rows)

class Sar_analysis(Analysis):
    """Common loader and counter-event builder for ``sar`` text reports."""

    def __init__(self,sar_file):
        Analysis.__init__(self,sar_file)

    def load_data(self):
        """Load ``self.file`` into ``self.df`` with columns named after the sar header.

        Also sets ``self.timeformat`` (12h with AM/PM or 24h) and
        ``self.sarversion`` (third space-separated word of the first line of
        ``sarv.txt`` next to the report, or "" when unavailable).
        Returns the cached DataFrame.
        """
        sardata=sc.textFile(self.file)
        sardf=sardata.mapPartitions(splits).toDF()
        sardf=sardf.where("_1<>'Average:'")

        ampm=sardf.where("_2='AM' or _2='PM'").count()
        if ampm==0:
            # 24h format: shift every column right by one and insert an empty
            # "_2" so both formats share the same layout from "_3" onwards.
            for i in range(len(sardf.columns),1,-1):
                sardf=sardf.withColumnRenamed(f'_{i}',f'_{i+1}')
            self.timeformat='yyyy-MM-dd HH:mm:ss '
            sardf=sardf.withColumn('_2',F.lit(''))
            colstart=1
        else:
            self.timeformat='yyyy-MM-dd hh:mm:ss a'
            colstart=2

        # Read the header: skip the first line, then any 1-byte (blank) lines.
        # The handle is closed as soon as the header has been read.
        with fs.open(self.file) as f:
            f.readline()
            t=f.readline()
            while len(t)==1:
                t=f.readline()
        cols=t.decode('ascii')
        li=re.split(r'\s+',cols)
        ci=3
        for c in li[colstart:]:
            sardf=sardf.withColumnRenamed(f"_{ci}",c)
            ci=ci+1

        # Drop repeated header rows (last header column equals its own name)
        # and rows whose first field is "Linux".
        sardf=sardf.where(F.col(li[-2])!=li[-2]).where(F.col("_1")!=F.lit("Linux"))

        sardf.cache()
        self.df=sardf

        self.sarversion=""
        paths=os.path.split(self.file)
        if fs.exists(paths[0]+"/sarv.txt"):
            with fs.open(paths[0]+"/sarv.txt") as f:
                allcnt = f.read().decode('ascii')
            words=allcnt.split("\n")[0].split(" ")
            # Leave the version empty instead of raising on unexpected content.
            if len(words)>2:
                self.sarversion=words[2]

        return sardf

    def col_df(self,cond,colname,args,slaver_id=0, thread_id=0):
        """Build counter ('C') events named ``colname`` from rows matching ``cond``.

        Rows are grouped by timestamp and summed.  ``args`` is a callable that
        receives the aggregated DataFrame and returns the struct column used as
        the event's ``args``.  ``slaver_id`` / ``thread_id`` become the event's
        ``pid`` / ``tid``.  Returns a list of JSON strings ordered by time.
        """
        sardf=self.df
        starttime=self.starttime
        cpudf=sardf.where(cond)

        # Combine the date of starttime with the row's clock time (_1, _2).
        cpudf=cpudf.withColumn('time',F.unix_timestamp(F.concat_ws(' ',F.date_format(F.from_unixtime(F.lit(starttime/1000)), 'yyyy-MM-dd'),F.col('_1'),F.col('_2')),self.timeformat))

        cols=cpudf.columns

        numeric=r'^\d+(\.\d+)*$'
        cpudf=cpudf.groupBy('time').agg(
            # The 2nd and 3rd columns are summed only where they look numeric.
            F.sum(F.when(F.col(cols[1]).rlike(numeric),F.col(cols[1]).astype(FloatType())).otherwise(0)).alias(cols[1]),
            F.sum(F.when(F.col(cols[2]).rlike(numeric),F.col(cols[2]).astype(FloatType())).otherwise(0)).alias(cols[2]),
            *[F.sum(F.col(c)).alias(c) for c in cols[3:] if not c.startswith("_") and c!="" and c!="time"]
        )

        traces=cpudf.orderBy(F.col("time")).select(
                F.lit(thread_id).alias('tid'),
                (F.expr("time*1000")-F.lit(self.starttime)).astype(IntegerType()).alias('ts'),
                F.lit(slaver_id).alias('pid'),
                F.lit('C').alias('ph'),
                F.lit(colname).alias('name'),
                args(cpudf).alias('args')
            ).toJSON().collect()
        return traces

    def generate_trace_view_list(self,id,**kwargs):
        """Return the base-class trace events; subclasses append their own."""
        trace_events=Analysis.generate_trace_view_list(self,id, **kwargs)
        return trace_events

    def get_stat(self,**kwargs):
        """Make sure the data is loaded; subclasses compute the statistics."""
        if self.df is None:
            self.load_data()
            
class Sar_cpu_analysis(Sar_analysis):
    """CPU utilisation view and statistics from a sar CPU report."""

    def __init__(self,sar_file):
        Sar_analysis.__init__(self,sar_file)

    def generate_trace_view_list(self,id,**kwargs):
        """Emit the "all cpu%" counter (user/system/iowait) on tid 0."""
        trace_events=Sar_analysis.generate_trace_view_list(self,id, **kwargs)

        # Clamp %iowait to 100.
        self.df=self.df.withColumn("%iowait",F.when(F.col("%iowait")>100,F.lit(100)).otherwise(F.col("%iowait")))

        trace_events.extend(self.col_df("CPU='all'",             "all cpu%",    lambda l: F.struct(
                                                                                                    F.floor(F.col('%user').astype(FloatType())).alias('user'),
                                                                                                    F.floor(F.col('%system').astype(FloatType())).alias('system'),
                                                                                                    F.floor(F.col('%iowait').astype(FloatType())).alias('iowait')
                                                                                                    ),                            id, 0))
        # The trace-event metadata key is "sort_index" (no trailing space).
        trace_events.append(json.dumps({"name": "thread_sort_index","ph": "M","pid":id,"tid":0,"args":{"sort_index":0}}))

        return trace_events

    def get_stat(sar_cpu,**kwargs):
        """Return a one-column pandas frame with CPU utilisation statistics.

        Rows: share of samples with %user>90, %system>10 and %iowait>10, and
        the average of each of the three columns.  The column is named after
        the parent directory of the report file.
        """
        Sar_analysis.get_stat(sar_cpu)

        metrics=["%user","%system","%iowait"]
        cpuutil=sar_cpu.df.where("CPU='all'").groupBy("_1").agg(*[F.mean(F.col(l).astype(FloatType())).alias(l) for l in metrics]).orderBy("_1")
        cnt=cpuutil.count()

        def share(condition):
            # Fraction of samples matching the condition; 0 for an empty report.
            return cpuutil.where(condition).count()/cnt if cnt else 0

        # sar reports percentages on a 0-100 scale, so the thresholds are
        # 90 and 10 (they used to be 0.9 / 0.1, contradicting the labels).
        out=[['%user>90%',share("`%user`>90")],
             ['%kernel>10%',share("`%system`>10")],
             ["%iowait>10%",share("`%iowait`>10")]]
        avgutil=cpuutil.agg(*[F.mean(l).alias(l) for l in metrics]).collect()
        out.extend([["avg " + l,avgutil[0][l]] for l in metrics])
        pdout=pandas.DataFrame(out).set_index(0)
        pdout.columns=[sar_cpu.file.split("/")[-2]]
        return pdout
    
class Sar_mem_analysis(Sar_analysis):
    """Memory usage view and statistics from a sar memory report."""

    def __init__(self,sar_file):
        Sar_analysis.__init__(self,sar_file)

    def load_data(self):
        """Load the report and add the derived ``kbrealused`` column.

        For sar >= 12.2 ``kbrealused`` is ``kbmemused``; otherwise it is
        ``kbmemused - kbcached - kbbuffers``.  Returns ``self.df``.
        """
        Sar_analysis.load_data(self)
        # Parse "major.minor" leniently: sarversion is "" when sarv.txt is
        # missing, which used to raise ValueError on int('').
        ver=re.match(r'(\d+)\.(\d+)',self.sarversion or "")
        if ver is None:
            print("sar version unknown, assuming the pre-12.2 kbmemused layout")
        # Compare as a tuple so e.g. 13.0 also counts as newer than 12.2.
        if ver is not None and (int(ver.group(1)),int(ver.group(2)))>=(12,2):
            self.df=self.df.withColumn("kbrealused",F.col("kbmemused"))
        else:
            # sar 10.1.5, sar 11.6.1
            self.df=self.df.withColumn("kbrealused",F.col("kbmemused")-F.col("kbcached")-F.col("kbbuffers"))
        return self.df

    def generate_trace_view_list(self,id,**kwargs):
        """Emit the "mem % " (tid 1) and "pagecache % " (tid 2) counters."""
        trace_events=Sar_analysis.generate_trace_view_list(self,id, **kwargs)

        data_rows=F.col('kbmemfree').rlike(r'^\d+$')   # skips non-numeric rows

        trace_events.extend(self.col_df(data_rows,"mem % ",      lambda l: F.struct(F.floor(l['kbcached']*l['%memused']/l['kbmemused']).alias('cached'),  # kbcached / (kbmemfree+kbmemused)
                                                                                                       F.floor(l['kbbuffers']*l['%memused']/l['kbmemused']).alias('buffered'),# kbbuffers / (kbmemfree+kbmemused)
                                                                                                       F.floor(l['kbrealused']*l['%memused']/l['kbmemused']).alias('used')), # (%memused- kbcached-kbbuffers )/  (kbmemfree+kbmemused)
                                          id,1))
        trace_events.extend(self.col_df(data_rows,"pagecache % ",      lambda l: F.struct(F.floor((l['kbcached']-l['kbdirty'])*l['%memused']/l['kbmemused']).alias('clean'),
                                                                                                       F.floor(l['kbdirty']*l['%memused']/l['kbmemused']).alias('dirty')),
                                          id,2))
        # The trace-event metadata key is "sort_index" (no trailing space).
        trace_events.append(json.dumps({"name": "thread_sort_index","ph": "M","pid":id,"tid":1,"args":{"sort_index":1}}))
        trace_events.append(json.dumps({"name": "thread_sort_index","ph": "M","pid":id,"tid":2,"args":{"sort_index":2}}))
        return trace_events

    def get_stat(sar_mem,**kwargs):
        """Return mean / 75% / max of the cached, used and dirty ratios.

        The result is a one-column pandas frame named after the parent
        directory of the report file.
        """
        Sar_analysis.get_stat(sar_mem)

        def scaled(column,alias):
            # column * 100 * %memused / kbmemused, floored.
            return F.floor(F.col(column).astype(FloatType())*F.lit(100)*F.col('%memused')/F.col('kbmemused')).alias(alias)

        memutil=sar_mem.df.where(F.col('kbmemfree').rlike(r'^\d+$')).select(
            scaled('kbcached','cached'),
            scaled('kbbuffers','buffered'),
            scaled('kbrealused','used'),
            scaled('kbdirty','dirty'))
        memsum=memutil.summary().toPandas()
        memsum=memsum.set_index("summary")
        # Flatten every metric; previously only the first two groups were
        # kept, silently dropping the "dirty" rows.
        out=[[l + suffix,float(memsum[l][key])]
             for l in ["cached","used","dirty"]
             for suffix,key in ((' mean',"mean"),(' 75%',"75%"),(' max',"max"))]
        pdout=pandas.DataFrame(out).set_index(0)
        pdout.columns=[sar_mem.file.split("/")[-2]]
        return pdout
    
class Sar_PageCache_analysis(Sar_analysis):
    """Paging activity view and statistics from a sar paging report."""

    def __init__(self,sar_file):
        Sar_analysis.__init__(self,sar_file)

    def load_data(self):
        """Load the report through the common sar loader; returns ``self.df``."""
        Sar_analysis.load_data(self)
        return self.df

    def generate_trace_view_list(self,id,**kwargs):
        """Emit the paging counters on tids 11-15 plus their sort-order metadata."""
        trace_events=Sar_analysis.generate_trace_view_list(self,id, **kwargs)

        data_rows=F.col('pgpgin/s').rlike(r'^\d')   # rows whose pgpgin/s starts with a digit

        trace_events.extend(self.col_df(data_rows,"page inout",      lambda l: F.struct(
                                                                                                       F.floor(l['pgpgin/s']/1024).alias('in'),
                                                                                                       F.floor(l['pgpgout/s']/1024).alias('out')),
                                          id,11))
        trace_events.extend(self.col_df(data_rows,"faults",      lambda l: F.struct(F.floor((l['majflt/s'])).alias('major'),
                                                                                                       F.floor(l['fault/s']-l['majflt/s']).alias('minor')),
                                          id,12))
        trace_events.extend(self.col_df(data_rows,"page free",      lambda l: F.struct(F.floor((l['pgfree/s']*4/1024)).alias('free')),
                                          id,13))
        trace_events.extend(self.col_df(data_rows,"scan",      lambda l: F.struct(F.floor((l['pgscank/s'])*4/1024).alias('kernel'),
                                                                                                       F.floor(l['pgscand/s']*4/1024).alias('app')),
                                          id,14))
        trace_events.extend(self.col_df(data_rows,"vmeff",      lambda l: F.struct(F.floor((l['%vmeff'])).alias('steal')),
                                          id,15))

        # The trace-event metadata key is "sort_index" (no trailing space).
        # tid 16 is kept although no counter is emitted on it here.
        for tid in range(11,17):
            trace_events.append(json.dumps({"name": "thread_sort_index","ph": "M","pid":id,"tid":tid,"args":{"sort_index":tid}}))
        return trace_events

    def get_stat(sar_mem,**kwargs):
        """Return mean / 75% / max of page-in, page-out and minor-fault rates.

        The result is a one-column pandas frame named after the parent
        directory of the report file.
        """
        Sar_analysis.get_stat(sar_mem)

        memutil=sar_mem.df.where(F.col('pgpgin/s').rlike(r'^\d')).select(F.floor(F.col('pgpgin/s').astype(FloatType())/1024).alias('pgin'),
                                                                                   F.floor(F.col('pgpgout/s').astype(FloatType())/1024).alias('pgout'),
                                                                                   F.floor(F.col('fault/s').astype(FloatType())-F.col('majflt/s').astype(FloatType())).alias('fault')
                                                                                   )
        memsum=memutil.summary().toPandas()
        memsum=memsum.set_index("summary")
        out=[[l + suffix,float(memsum[l][key])]
             for l in ["pgin","pgout","fault"]
             for suffix,key in ((' mean',"mean"),(' 75%',"75%"),(' max',"max"))]
        pdout=pandas.DataFrame(out).set_index(0)
        pdout.columns=[sar_mem.file.split("/")[-2]]
        return pdout
    
    
class Sar_disk_analysis(Sar_analysis):
    """Disk bandwidth/utilisation view and statistics from a sar disk report."""

    def __init__(self,sar_file):
        Sar_analysis.__init__(self,sar_file)

    @staticmethod
    def _disk_filter(disk_prefix):
        """Build the SQL filter on DEV from a prefix string or a list of quoted names."""
        if isinstance(disk_prefix,str):
            return "DEV like '"+disk_prefix+"%'"
        if isinstance(disk_prefix,list):
            return "DEV in ("+",".join(disk_prefix)+")"
        return "DEV like '%'"

    def load_data(self):
        """Load the report, keep devices whose max %util exceeds 10 and normalise columns.

        Sector-based columns are converted to kB, newer column names
        (areq-sz, aqu-sz) are renamed to the older ones, and ``rkB/s`` /
        ``wkB/s`` are finally divided by 1024.  Returns ``self.df``.
        """
        Sar_analysis.load_data(self)

        self.df=self.df.withColumn("%util",F.col("%util").astype(IntegerType()))
        used_disk=self.df.groupBy("DEV").agg(F.max(F.col("%util")).alias("max_util"),F.mean("%util").alias("avg_util")).where(F.col("max_util")>10).collect()
        self.df=self.df.where(F.col("DEV").isin([l['DEV'] for l in used_disk]))

        if "rd_sec/s" in self.df.columns:
            self.df=self.df.withColumn("rkB/s",F.expr('cast(`rd_sec/s` as float)*512/1024'))
        if "wr_sec/s" in self.df.columns:
            self.df=self.df.withColumn("wkB/s",F.expr('cast(`wr_sec/s` as float)*512/1024'))

        if "areq-sz" in self.df.columns:
            self.df=self.df.withColumnRenamed("areq-sz","avgrq-sz")
        if "aqu-sz" in self.df.columns:
            self.df=self.df.withColumnRenamed("aqu-sz","avgqu-sz")

        if "rkB/s" in self.df.columns:
            self.df=self.df.withColumn("rkB/s",F.expr('cast(`rkB/s` as float)/1024'))
        if "wkB/s" in self.df.columns:
            self.df=self.df.withColumn("wkB/s",F.expr('cast(`wkB/s` as float)/1024'))
        return self.df

    def generate_trace_view_list(self,id,**kwargs):
        """Emit disk counters on tids 3-7 for the devices selected by ``disk_prefix``.

        ``disk_prefix`` (kwarg) is either a device-name prefix or a list of
        already-quoted device names.
        """
        trace_events=Sar_analysis.generate_trace_view_list(self,id, **kwargs)

        diskfilter=Sar_disk_analysis._disk_filter(kwargs.get('disk_prefix',""))

        print(diskfilter)
        devcnt=self.df.where(diskfilter).select("DEV").distinct().count()

        # Column names are spelled exactly as load_data creates them
        # ("rkB/s"/"wkB/s") so this also works with case-sensitive resolution.
        trace_events.extend(self.col_df(diskfilter,      "disk b/w",       lambda l: F.struct(
                                                                                                            F.floor(F.col("rkB/s")).alias('read'),
                                                                                                            F.floor(F.col("wkB/s")).alias('write')),id, 3))
        trace_events.extend(self.col_df(diskfilter,      "disk%",       lambda l: F.struct(
                                                                                                            (F.col("%util")/F.lit(devcnt)).alias('%util')),id, 4))
        trace_events.extend(self.col_df(diskfilter,      "req size",       lambda l: F.struct(
                                                                                                            (F.col("avgrq-sz")/F.lit(devcnt)).alias('avgrq-sz')),id, 5))
        trace_events.extend(self.col_df(diskfilter,      "queue size",       lambda l: F.struct(
                                                                                                            (F.col("avgqu-sz")/F.lit(512*devcnt/1024)).alias('avgqu-sz')),id, 6))
        trace_events.extend(self.col_df(diskfilter,      "await",       lambda l: F.struct(
                                                                                                            (F.col("await")/F.lit(devcnt)).alias('await')),id,7))

        # The trace-event metadata key is "sort_index" (no trailing space).
        for tid in range(3,8):
            trace_events.append(json.dumps({"name": "thread_sort_index","ph": "M","pid":id,"tid":tid,"args":{"sort_index":tid}}))
        return trace_events

    def get_stat(sar_disk,**kwargs):
        """Return disk utilisation and bandwidth statistics as a one-column pandas frame.

        The column is named after the parent directory of the report file.
        """
        Sar_analysis.get_stat(sar_disk)
        diskfilter=Sar_disk_analysis._disk_filter(kwargs.get('disk_prefix',""))

        diskutil=sar_disk.df.where(diskfilter).groupBy("_1").agg(F.mean(F.col("%util").astype(FloatType())).alias("%util")).orderBy("_1")
        totalcnt=diskutil.count()
        time_morethan_90=diskutil.where(F.col("%util")>90).count()/totalcnt
        avgutil=diskutil.agg(F.mean("%util")).collect()
        out=[["avg disk util",avgutil[0]["avg(%util)"]],
            ["time more than 90%", time_morethan_90]]
        diskbw=sar_disk.df.where(diskfilter).groupBy("_1").agg(F.sum(F.col("rkB/s")).alias("rd_bw"),F.sum(F.col("wkB/s")).alias("wr_bw"))
        bw=diskbw.agg(F.sum("rd_bw").alias("total read"),F.sum("wr_bw").alias("total write"),F.mean("rd_bw").alias("read bw"),F.mean("wr_bw").alias("write bw"),F.max("rd_bw").alias("max read"),F.max("wr_bw").alias("max write")).collect()
        maxread=bw[0]["max read"]
        maxwrite=bw[0]["max write"]
        # Quantiles requested: 75th, 95th and 99th percentile.
        rdstat, wrstat = diskbw.stat.approxQuantile(['rd_bw','wr_bw'],[0.75,0.95,0.99],0.0)
        time_rd_morethan_95 = diskbw.where(F.col("rd_bw")>rdstat[1]).count()/totalcnt
        # Compare writes against the write quantile (was the read quantile).
        time_wr_morethan_95 = diskbw.where(F.col("wr_bw")>wrstat[1]).count()/totalcnt
        out.append(['total read (G)' , bw[0]["total read"]/1024])
        out.append(['total write (G)', bw[0]["total write"]/1024])
        out.append(['avg read bw (MB/s)', bw[0]["read bw"]])
        out.append(['avg write bw (MB/s)', bw[0]["write bw"]])
        out.append(['read bw %75', rdstat[0]])
        out.append(['read bw %95', rdstat[1]])
        # Report the real maximum; the 99th percentile used to be shown here.
        out.append(['read bw max', maxread])
        out.append(['time_rd_morethan_95', time_rd_morethan_95])
        out.append(['write bw %75', wrstat[0]])
        out.append(['write bw %95', wrstat[1]])
        out.append(['write bw max', maxwrite])
        out.append(['time_wr_morethan_95', time_wr_morethan_95])
        pdout=pandas.DataFrame(out).set_index(0)
        pdout.columns=[sar_disk.file.split("/")[-2]]
        return pdout
    
class Sar_nic_analysis(Sar_analysis):
    """Network throughput view and statistics from a sar network report."""

    def __init__(self,sar_file):
        Sar_analysis.__init__(self,sar_file)

    @staticmethod
    def _nic_filter(kwargs):
        """SQL filter on IFACE: the quoted names in ``nic_prefix`` if given, else everything but 'lo'."""
        if 'nic_prefix' in kwargs:
            return "IFACE in (" + ",".join(kwargs['nic_prefix']) + ")"
        return "IFACE != 'lo'"

    def generate_trace_view_list(self,id,**kwargs):
        """Emit rx/tx MB/s counters: "eth " (tid 8), "ib " (tid 9) and "lo " (tid 10)."""
        trace_events=Sar_analysis.generate_trace_view_list(self,id, **kwargs)

        nicfilter=Sar_nic_analysis._nic_filter(kwargs)

        def throughput(l):
            # rxkB/s and txkB/s converted to MB/s and floored.
            return F.struct(F.floor(F.expr('cast(`rxkB/s` as float)/1024')).alias('rxmb/s'),
                            F.floor(F.expr('cast(`txkB/s` as float)/1024')).alias('txmb/s'))

        trace_events.extend(self.col_df(nicfilter,            "eth ", throughput, id, 8))
        # The interface column is addressed by its header name: the positional
        # name "_3" is renamed away by Sar_analysis.load_data.
        trace_events.extend(self.col_df("IFACE like 'ib%'",   "ib ",  throughput, id, 9))
        trace_events.extend(self.col_df("IFACE = 'lo'",       "lo ",  throughput, id, 10))
        # The trace-event metadata key is "sort_index" (no trailing space).
        for tid in (8,9,10):
            trace_events.append(json.dumps({"name": "thread_sort_index","ph": "M","pid":id,"tid":tid,"args":{"sort_index":tid}}))
        return trace_events

    def get_stat(sar_nic,**kwargs):
        """Return the 75/95/99th percentile of summed receive MB/s (zeros when no rows match).

        The result is a one-column pandas frame named after the parent
        directory of the report file.
        """
        Sar_analysis.get_stat(sar_nic)
        nicfilter=Sar_nic_analysis._nic_filter(kwargs)

        nicbw=sar_nic.df.where(nicfilter).groupBy("_1").agg(F.sum(F.col("rxkB/s").astype(FloatType())/1024).alias("rx MB/s")).orderBy("_1")
        if nicbw.count()==0:
            out=[["rx MB/s 75%",0],["rx MB/s 95%",0],["rx MB/s 99%",0]]
        else:
            out=nicbw.stat.approxQuantile(['rx MB/s'],[0.75,0.95,0.99],0.0)[0]
            out=[["rx MB/s 75%",out[0]],["rx MB/s 95%",out[1]],["rx MB/s 99%",out[2]]]
        pdout=pandas.DataFrame(out).set_index(0)
        pdout.columns=[sar_nic.file.split("/")[-2]]
        return pdout

# PID State analysis

In [None]:
class Pidstat_analysis(Analysis):
    """Per-thread CPU usage view built from ``pidstat`` text output."""

    def __init__(self,sar_file):
        Analysis.__init__(self,sar_file)

    def load_data(self):
        """Load ``self.file`` and name the columns after the pidstat header row.

        The header is the first row with 'TID' in the 4th or 5th field.  When
        it has no "Time" field the first column is named "Time".  Only rows
        whose TGID is '0' or '-' are kept.  Returns the DataFrame, or ``None``
        (after printing a message) when no header row exists.
        """
        sardata=sc.textFile(self.file)
        sardf=sardata.mapPartitions(splits).toDF()
        sardf=sardf.where("_1<>'Average:'")

        headers=sardf.where("_4='TID' or _5='TID'").limit(1).collect()
        if not headers:
            # Used to fail with IndexError on headers[0].
            print("error, pidstat header row (TID column) not found in",self.file)
            return
        r=headers[0].asDict()
        if "Time" not in r.values():
            r["_1"]="Time"
        for i,v in r.items():
            if(v!=""):
                sardf=sardf.withColumnRenamed(i,v)
        sardf=sardf.where("TGID='0' or TGID='-'")

        self.df=sardf
        return sardf


    def generate_trace_view_list(self,id,**kwargs):
        """Emit a "pidstat" counter (tid 6) with %CPU of the hottest threads.

        A thread is "hot" when its count of samples above 30 %CPU is more than
        half of the largest such count.  All remaining threads are summed into
        "Other".  Returns the base-class events unchanged when no sample
        exceeds 30 %CPU.
        """
        trace_list=Analysis.generate_trace_view_list(self,id,**kwargs)
        sardf=self.df
        starttime=self.starttime

        sardf=sardf.withColumn("%CPU",F.col("%CPU").astype(FloatType()))
        sardf=sardf.withColumn("Time",F.col("Time").astype(LongType()))
        sardf=sardf.withColumn("TID",F.col("TID").astype(LongType()))
        hotthreads=sardf.where("`%CPU`>30").groupBy("TID").count().collect()
        if len(hotthreads)==0:
            return trace_list
        maxcnt=max(r[1] for r in hotthreads)
        hts=[r[0] for r in hotthreads if r[1]>maxcnt/2]

        def thread_frame(tid):
            # Filter on TID first, then expose it under per-thread column names.
            return sardf.where(F.col("TID")==tid).select(
                "Time",
                F.col("TID").alias("TID_"+str(tid)),
                F.col("%CPU").alias("CPU_"+str(tid)))

        finaldf=reduce(lambda x,y: x.join(y,on=["Time"]),[thread_frame(x) for x in hts])
        othersdf=sardf.where("TID not in ("+",".join(str(x) for x in hts)+")").groupBy("Time").agg(F.sum("%CPU").alias("CPU_Other"))
        finaldf=finaldf.join(othersdf,on=["Time"])
        finaldf=finaldf.orderBy("Time")
        hts.append("Other")
        args=F.struct(*[F.col("CPU_"+str(x)).alias(str(x)) for x in hts])

        trace_list.extend(finaldf.select(
                F.lit(6).alias('tid'),
                (F.expr("Time*1000")-F.lit(starttime)).astype(IntegerType()).alias('ts'),
                F.lit(id).alias('pid'),
                F.lit('C').alias('ph'),
                F.lit("pidstat").alias('name'),
                args.alias('args')
            ).toJSON().collect())
        return trace_list
        

# Perf stat Analysis

In [None]:
class Perfstat_analysis(Analysis):
    """Analysis of an interval `perf stat` text log.

    Besides the log itself, `load_data` reads three side files from the same
    directory: `perfstarttime`, `tsc_freq` and `totalcores`.
    """
    def __init__(self,sar_file):
        Analysis.__init__(self,sar_file)
    
    def load_data(self):
        """Parse the perf stat log into a cached Spark DataFrame.

        Returns:
            DataFrame with columns `ts` (second field * 1000 plus the perf start
            time, in ms), `_3` (third field with commas stripped, cast to long) and
            `_4` (fourth field). Returns None, after printing a message, when one of
            the side files is missing; `self.df` is left untouched in that case.
        """
        # Imported locally under a private alias: at module level the bare name
        # `datetime` is bound to the *module* by the main import cell and only
        # rebound to the *class* by a later scratch cell, so `datetime.strptime`
        # here used to work or fail depending on which cells had been executed.
        from datetime import datetime as _datetime

        sardata=sc.textFile(self.file)
        # `splits` is a line tokenizer defined elsewhere in the notebook.
        sardf=sardata.mapPartitions(splits).toDF()
        
        paths=os.path.split(self.file)
        if fs.exists(paths[0]+"/perfstarttime"):
            with fs.open(paths[0]+"/perfstarttime") as f:
                strf=f.read().decode('ascii')
        else:
            print("error, perfstarttime not found")
            return
        
        tsc_freq_file = os.path.join(paths[0], 'tsc_freq')
        if fs.exists(tsc_freq_file):
            self.tsc_freq = int(spark.read.text(tsc_freq_file).collect()[0][0])
        else:
            print(f'{tsc_freq_file} not exists')
            return
        
        totalcores_file = os.path.join(paths[0], 'totalcores')
        if fs.exists(totalcores_file):
            self.totalcores = int(spark.read.text(totalcores_file).collect()[0][0])
        else:
            print(f'{totalcores_file} not exists')
            return
        
        # perfstarttime holds a line of the form "# started on <date>".
        strf=strf[len("# started on "):].strip()
        starttime=_datetime.strptime(strf, "%a %b %d %H:%M:%S %Y").timestamp()*1000
        # Drop rows whose first token is '#', then turn the relative time in
        # seconds into an absolute timestamp in milliseconds.
        sardf=sardf.where("_1<>'#'")
        sardf=sardf.withColumn("ts",F.col("_2").astype(DoubleType())*1000+F.lit(starttime)).where("ts is not null").select("ts","_3","_4")
        sardf=sardf.withColumn('_3', F.regexp_replace('_3', ',', '').astype(LongType()))
        sardf=sardf.cache()
        self.df=sardf
        return sardf


    def generate_trace_view_list(self,id,**kwargs):
        """Append one counter event per perf stat sample to the base trace list.

        Each distinct name in `_4` is indexed with a StringIndexer and drawn on
        its own `tid` (100 + index); the counter value is `_3`.

        Args:
            id: value emitted as the `pid` of every generated event.
            **kwargs: forwarded to `Analysis.generate_trace_view_list`.

        Returns:
            The base class list extended with the JSON event strings.
        """
        trace_list=Analysis.generate_trace_view_list(self,id,**kwargs)
        sardf=self.df
        starttime=self.starttime
        
        stringIndexer = StringIndexer(inputCol="_4", outputCol="syscall_idx")
        model = stringIndexer.fit(sardf)
        sardf=model.transform(sardf)
        
        sardf=sardf.withColumn("syscall_idx",F.col("syscall_idx").astype(IntegerType()))
        
        trace_list.extend(sardf.select(
            (F.lit(100)+F.col("syscall_idx")).alias('tid'),
            (F.col("ts")-F.lit(starttime)).astype(LongType()).alias('ts'),
            F.lit(id).alias('pid'),
            F.lit('C').alias('ph'),
            F.col("_4").alias('name'),
            F.struct(F.col("_3").alias("cnt")).alias('args')
        ).toJSON().collect())
        return trace_list
    
    def get_stat(self, **kwargs):
        """Summarise the perf stat log into four averaged metrics.

        The raw file is re-read and tokenised on whitespace: token 0 is the
        interval time, token 1 the count, token 2 the event name and, for lines
        with more than three tokens, token 4 is kept as the `ipc` value.

        Returns:
            pandas DataFrame indexed by metric name (`ipc`, `instructions`,
            `cpu_freq`, `cpu%`) with a single value column:
              * ipc          - mean of the `ipc` column over rows that carry one
              * instructions - sum of counts over those same rows, divided by 1e9
              * cpu_freq     - mean of thread / ref_tsc * tsc_freq / 1e9
              * cpu%         - mean of ref_tsc / (interval * tsc_freq) / totalcores * 100
        """
        if self.df is None:
            self.load_data()

        raw_data = spark.read.text(self.file)

        # Filter out non-data lines and split the data into columns
        filtered_data = raw_data.filter(
            ~raw_data.value.startswith('#') & raw_data.value.rlike(r"^\s*\d")
        )

        split_data = filtered_data.rdd.map(lambda row: row[0].split()).map(
            lambda parts: (float(parts[0]), int(parts[1].replace(",", "")), parts[2], '' if len(parts) == 3 else parts[4])
        )

        schema = ["time", "counts", "events", "ipc"]
        df = split_data.toDF(schema)

        # Rows without an ipc annotation are plain event counts.
        events_df = df.filter(col('ipc') == '')
        ipc_df = df.filter(col('ipc') != '')

        instructions = ipc_df.select(_sum(col("counts"))).collect()[0][0] / 1e9
        avg_ipc = ipc_df.select(avg(col("ipc"))).collect()[0][0]

        df_ccu_ref_tsc = events_df.select(col('time'), col('counts')).filter(col('events') == 'cpu_clk_unhalted.ref_tsc').withColumnRenamed('counts', 'cpu_clk_unhalted_ref_tsc')
        df_ccu_thread = events_df.select(col('time'), col('counts')).filter(col('events') == 'cpu_clk_unhalted.thread').withColumnRenamed('counts', 'cpu_clk_unhalted_thread')

        # Interval length = time - previous time (0 for the first sample);
        # multiplied by tsc_freq it gives the TSC ticks elapsed per interval.
        window_spec = Window.orderBy("time")
        df_ccu_ref_tsc = df_ccu_ref_tsc.withColumn("prev_time", lag("time").over(window_spec))
        df_ccu_ref_tsc = df_ccu_ref_tsc.withColumn("prev_time", when(col("prev_time").isNull(), 0).otherwise(col("prev_time")))
        df_ccu_ref_tsc = df_ccu_ref_tsc.withColumn("tsc", (col("time") - col("prev_time")) * self.tsc_freq)

        joined_df = df_ccu_ref_tsc.join(df_ccu_thread, on=["time"], how="inner")
        cpu_freq_df = joined_df.withColumn("freq", joined_df.cpu_clk_unhalted_thread / joined_df.cpu_clk_unhalted_ref_tsc * self.tsc_freq / 1e9)
        cpu_freq = cpu_freq_df.select(avg(col('freq'))).collect()[0][0]

        cpu_util_df = df_ccu_ref_tsc.withColumn("cpu%", col("cpu_clk_unhalted_ref_tsc") / col("tsc") / self.totalcores * 100)
        cpu_util = cpu_util_df.select(avg(col('cpu%'))).collect()[0][0]

        out = [['ipc', avg_ipc], ['instructions', instructions], ['cpu_freq', cpu_freq], ['cpu%', cpu_util]]
        pdout=pandas.DataFrame(out).set_index(0)
        
        return pdout

# GPU analysis

In [None]:
class gpu_analysis(Analysis):
    """Analysis of a GPU utilisation CSV log.

    The columns read are `timestamp`, ` index`, ` utilization.gpu [%]`,
    ` utilization.memory [%]` and ` memory.used [MiB]` (header names carry a
    leading space).
    """

    # Divisor applied to `memory.used [MiB]` before plotting.
    # NOTE(review): presumably the total memory of one GPU in MiB (32 GiB) - confirm.
    MEM_TOTAL_MIB=32768

    def __init__(self,gpu_file):
        Analysis.__init__(self,gpu_file)
        
    def load_data(self):
        """Load the CSV and normalise its columns.

        Returns:
            DataFrame with `timestamp` as `unix_timestamp * 1000` plus the digits
            after the '.', and `gpu_util`, `mem_util`, `mem_used` as integers
            (second space-separated token of the raw value). Also stored in
            `self.df`.
        """
        df_pf=spark.read.format("com.databricks.spark.csv").option("header","true").option("mode", "DROPMALFORMED").option("delimiter", ",").load(self.file)
        # Raw string: '\.' in a normal literal is an invalid escape sequence.
        fraction=(F.split(F.col('timestamp'),r'\.')[1]).astype(IntegerType())
        df_pf2=df_pf.withColumn('timestamp',F.unix_timestamp(F.col('timestamp'),'yyyy/MM/dd HH:mm:ss')*1000+fraction)
        for raw_name,short_name in ((' utilization.gpu [%]','gpu_util'),
                                    (' utilization.memory [%]','mem_util'),
                                    (' memory.used [MiB]','mem_used'),
                                    (' index','index')):
            df_pf2=df_pf2.withColumnRenamed(raw_name,short_name)
        # Values split on ' ' have the number as the second element.
        for short_name in ('gpu_util','mem_util','mem_used'):
            df_pf2=df_pf2.withColumn(short_name, (F.split(short_name,' ')[1]).astype(IntegerType()))
        df_pf.cache()
        self.df=df_pf2
        return df_pf2

    def generate_trace_view_list(self,id,**kwargs):
        """Build counter events for GPU utilisation and GPU memory usage.

        Args:
            id: `pid` used for the `gpu_util_<index>` series; the
                `mem_util_<index>` series is emitted on `int(id)+1`.
            **kwargs: accepted for interface compatibility; not used here.

        Returns:
            List of JSON strings: all GPU utilisation events followed by all
            memory events, each ordered by timestamp. The GPU `index` column is
            used as `tid`.
        """
        Analysis.generate_trace_view_list(self,id)
            
        df_pf2=self.df
        starttime=self.starttime
        trace_events=[]

        series=(
            (F.lit(id),'gpu_util_',F.col('gpu_util').alias('gpu')),
            (F.lit(int(id)+1),'mem_util_',(F.col('mem_used')/F.lit(self.MEM_TOTAL_MIB)).alias('mem')),
        )
        for pid_col,prefix,value_col in series:
            trace_events.extend(df_pf2.orderBy(df_pf2['timestamp']).select(
                F.col('index').alias('tid'),
                (F.expr("timestamp")-F.lit(starttime)).astype(IntegerType()).alias('ts'),
                pid_col.alias('pid'),
                F.lit('C').alias('ph'),
                F.concat(F.lit(prefix),F.col('index')).alias('name'),
                F.struct(value_col).alias('args')
            ).toJSON().collect())

        return trace_events

In [None]:
def splits_dmon(x):
    """Tokenise the lines of one partition of a GPU dmon log.

    A line is kept only if, once stripped, it starts with '20' and splits on
    whitespace into exactly 11 fields.

    Args:
        x: iterable of text lines.

    Returns:
        Iterator over the kept lines, each as a list of 11 strings.
    """
    stripped=(raw.strip() for raw in x)
    tokenised=(re.split(r'\s+',line) for line in stripped if line.startswith('20'))
    return iter([fields for fields in tokenised if len(fields)==11])

class gpu_dmon_analysis(Analysis):
    """Analysis of a GPU dmon text log, parsed with `splits_dmon` into 11 columns."""

    def __init__(self,gpu_file):
        Analysis.__init__(self,gpu_file)
        
    def load_data(self):
        """Parse the log into a DataFrame.

        Returns:
            DataFrame where `_1` is the epoch time in ms built from `_1` and `_2`
            (pattern 'yyyyMMdd HH:mm:ss'), and `_3` to `_11` are cast to integer.
            Also stored in `self.df`.
        """
        raw=sc.textFile(self.file).mapPartitions(splits_dmon).toDF()

        epoch_ms=F.unix_timestamp(F.concat_ws(' ',F.col('_1'),F.col('_2')),'yyyyMMdd HH:mm:ss')*1000
        parsed=raw.withColumn('_1',epoch_ms)
        for name in ['_'+str(position) for position in range(3,12)]:
            parsed=parsed.withColumn(name,F.col(name).astype(IntegerType()))

        raw.cache()
        self.df=parsed
        return parsed

    def generate_trace_view_list(self,id,**kwargs):
        """Build four counter series per GPU from the dmon columns.

        Args:
            id: base `pid`; the series use `id`, `id+1`, `id+2` and `id+3`.
            **kwargs: accepted for interface compatibility; not used here.

        Returns:
            List of JSON strings: `gpu_util_` (`_4`), `mem_util_` (`_5`),
            `gpu_freq_` (`_9`) and `pcie_` (`_10` as tx, `_11` as rx), in that
            order, each ordered by time and with `_3` as `tid`.
        """
        Analysis.generate_trace_view_list(self,id)

        frame=self.df
        starttime=self.starttime

        # (pid offset, name prefix, builder for the args struct)
        series=(
            (0,'gpu_util_',lambda: F.struct(F.col('_4').alias('gpu'))),
            (1,'mem_util_',lambda: F.struct(F.col('_5').alias('mem'))),
            (2,'gpu_freq_',lambda: F.struct(F.col('_9').alias('gpu_freq'))),
            (3,'pcie_',lambda: F.struct(F.col('_10').alias('tx'),F.col('_11').alias('rx'))),
        )

        trace_events=[]
        for offset,prefix,make_args in series:
            pid=id if offset==0 else id+offset
            selected=frame.orderBy(frame['_1']).select(
                F.col('_3').alias('tid'),
                (F.expr("_1")-F.lit(starttime)).astype(IntegerType()).alias('ts'),
                F.lit(pid).alias('pid'),
                F.lit('C').alias('ph'),
                F.concat(F.lit(prefix),F.col('_3')).alias('name'),
                make_args().alias('args')
            )
            trace_events.extend(selected.toJSON().collect())

        return trace_events


# DASK analysis

In [None]:
def split_dask(x):
    """Tokenise the lines of one partition of a dask cluster log.

    Two line shapes are handled:

    * ``('<name>-<hex>', <n>),<f1>,<f2>,...`` becomes
      ``[<name>, <f1>, <f2>, ..., <n>]``
    * ``<name>-<id>,<f1>,<f2>,...`` becomes ``[<name>, <f1>, <f2>, ..., <id>]``,
      where ``<id>`` is either five dash-separated hex groups or a single
      trailing hex group.

    Lines matching neither shape are reported with `print` and skipped. The
    original code returned None for such a line (which `mapPartitions` cannot
    iterate) or raised AttributeError on the failed match.

    Args:
        x: iterable of text lines.

    Returns:
        Iterator over the token lists of the lines that could be parsed.
    """
    fi=[]
    for l in x:
        if l.startswith('('):
            lx=re.split(r'[()]',l)
            p=re.search(r"'(.*)-([0-9a-f]+)', *(\d+)",lx[1])
            if not p or len(lx)<3:
                print(r"dask log first field doesn't match (.*)-[0-9a-f]+', *(\d+)")
                continue
            li=[p.group(1)]
            # lx[2] starts with the ',' that follows the closing parenthesis.
            li.extend(lx[2].split(",")[1:])
            li.append(p.group(3))
        else:
            li=l.split(',')
            # Prefer the five-group id form; fall back to one trailing hex group.
            p=re.search(r"(.*)-([0-9a-f]+-[0-9a-f]+-[0-9a-f]+-[0-9a-f]+-[0-9a-f]+)$",li[0])
            if not p:
                p=re.search(r"(.*)-([0-9a-f]+)$",li[0])
            if not p:
                print("dask log first field has no trailing id:",li[0])
                continue
            li[0]=p.group(1)
            li.append(p.group(2))
        fi.append(li)
    return iter(fi)

class dask_analysis(Analysis):
    """Analysis of a dask cluster log tokenised by `split_dask`."""

    def __init__(self,dask_file):
        Analysis.__init__(self,dask_file)

    def load_data(self):
        """Load the log into a cached DataFrame and record the earliest `_c2`.

        The five positional columns are renamed to `_c0`, `_c1`, `_c2`, `_c3`
        and `_id`. `_c1` is reduced to its third ':'-separated part, and `_c2`
        and `_c3` are cast to double and multiplied by 1000.

        Returns:
            The DataFrame, also stored in `self.df`; `self.starttime` is set to
            the minimum of `_c2`.
        """
        frame=sc.textFile(self.file).mapPartitions(split_dask).toDF()
        for positional,named in (('_1','_c0'),('_2','_c1'),('_3','_c2'),('_4','_c3'),('_5','_id')):
            frame=frame.withColumnRenamed(positional,named)

        frame=frame.withColumn('_c1',F.split(F.col('_c1'),":")[2])
        for time_col in ('_c3','_c2'):
            frame=frame.withColumn(time_col,F.col(time_col).astype(DoubleType())*1000)

        frame.cache()
        self.df=frame
        earliest=frame.agg(F.min("_c2")).collect()[0]
        self.starttime=earliest['min(_c2)']
        return frame

    def generate_trace_view_list(self,id,**kwargs):
        """Emit one complete ('X') event per log row.

        Within each `_c1` group, ordered by `_c3`, an event starts at the later
        of the row's own `_c2` and the previous row's `_c3`, and lasts until the
        row's `_c3`.

        Args:
            id: value emitted as the `pid` of every event.
            **kwargs: accepted for interface compatibility; not used here.

        Returns:
            List of JSON strings with `tid` = `_c1`, `name` = `_c0` and the
            row's `_id` under `args.uuid`.
        """
        Analysis.generate_trace_view_list(self,id)

        per_worker=Window.partitionBy("_c1").orderBy("_c3")
        previous_done=F.lag('_c3', 1, None).over(per_worker)
        timeline=(self.df
            .withColumn("last_tsk_done", previous_done)
            .withColumn('last_tsk_done',F.coalesce('last_tsk_done','_c2'))
            .withColumn('last_tsk_done',F.when(F.col('_c2')>F.col('last_tsk_done'),F.col('_c2')).otherwise(F.col('last_tsk_done')) ))

        events=timeline.select(
            F.col('_c1').alias('tid'),
            (F.col('last_tsk_done')-F.lit(self.starttime)).astype(IntegerType()).alias('ts'),
            F.expr('_c3 - last_tsk_done  ').alias('dur'),
            F.lit(id).alias('pid'),
            F.lit('X').alias('ph'),
            F.col('_c0').alias('name'),
            F.struct(F.col('_id').alias('uuid')).alias('args')
        )

        trace_events=[]
        trace_events.extend(events.toJSON().collect())
        return trace_events

In [None]:
class dask_analysis_log(dask_analysis):
    """`dask_analysis` variant whose constructor also accepts a `logs` argument.

    `load_data` and `generate_trace_view_list` are inherited from
    `dask_analysis`: the overrides that used to be defined here were
    line-for-line copies of the parent's methods.
    """
    def __init__(self,dask_file,logs):
        # `logs` is accepted so existing callers keep working; it is not used.
        Analysis.__init__(self,dask_file)

# instantevent analysis

In [None]:
## format: _2 = Name; _3 = time

class InstantEvent_analysis(Analysis):
    """Analysis of an instant-event log: column `_2` is the event name, `_3` its time."""

    def __init__(self,sar_file):
        Analysis.__init__(self,sar_file)
    
    def load_data(self):
        """Tokenise the file with `splits` and keep the DataFrame in `self.df`."""
        events=sc.textFile(self.file).mapPartitions(splits).toDF()
        self.df=events
        return events


    def generate_trace_view_list(self,id=0,**kwargs):
        """Emit one instant event (`ph='i'`, `s='g'`) per row.

        Args:
            id: passed to the base class only; emitted events always use
                `pid` 0 and `tid` 0.
            **kwargs: accepted for interface compatibility; not used here.

        Returns:
            List of JSON strings; `ts` is `_3 * 1000 - self.starttime` cast to
            integer and `name` is `_2`.
        """
        Analysis.generate_trace_view_list(self,id)

        event_ms=F.col("_3").astype(DoubleType())*1000
        relative_ts=(event_ms-F.lit(self.starttime)).astype(IntegerType())
        fields=[
            F.lit(0).alias('tid'),
            relative_ts.alias('ts'),
            F.lit(0).alias('pid'),
            F.lit('i').alias('ph'),
            F.col("_2").alias('name'),
            F.lit("g").alias("s"),
        ]
        return self.df.select(*fields).toJSON().collect()

# HBM_Analysis

In [None]:
class HBM_analysis(Analysis):
    """Analysis of a ', '-delimited CSV with `timestamp`, `size` and `free` columns."""

    def __init__(self,file):
        Analysis.__init__(self,file)
    
    def load_data(self):
        """Read the CSV and add typed columns.

        Returns:
            DataFrame with `ts` (`unix_timestamp` of `timestamp`) and with `size`
            and `free` cast to long. Also stored in `self.df`.
        """
        raw=spark.read.option("delimiter", ", ").option("header", "true").csv(self.file)
        with_ts=raw.withColumn("ts", F.unix_timestamp(raw.timestamp))
        with_size=with_ts.withColumn("size", raw.size.cast(LongType()))
        self.df=with_size.withColumn("free", raw.free.cast(LongType()))
        return self.df

    def generate_trace_view_list(self,id,**kwargs):
        """Append two counter series to the base trace list.

        Args:
            id: value emitted as the `pid` of every event.
            **kwargs: forwarded to `Analysis.generate_trace_view_list`.

        Returns:
            The base list extended with an "hbm" series (`hbmused` = size - free,
            `hbmfree` = free) followed by an "hbm %" series
            (`%hbmused` = (1 - free / size) * 100).
        """
        trace_list=Analysis.generate_trace_view_list(self,id,**kwargs)
        hbmdf=self.df
        starttime=self.starttime

        absolute=F.struct((F.col("size")-F.col("free")).alias('hbmused'), F.col("free").alias('hbmfree'))
        percent=F.struct(((F.lit(1) - F.col("free") / F.col("size")) * F.lit(100)).alias('%hbmused'))

        for series_name,series_args in (("hbm",absolute),("hbm %",percent)):
            events=hbmdf.select(
                F.lit(0).alias('tid'),
                (F.col("ts") * F.lit(1000)-F.lit(starttime)).astype(LongType()).alias('ts'),
                F.lit(id).alias('pid'),
                F.lit('C').alias('ph'),
                F.lit(series_name).alias('name'),
                series_args.alias('args')
            )
            trace_list.extend(events.toJSON().collect())
        return trace_list

# Run base

In [None]:
class Run:
    """Combine several analysis objects into a single trace-viewer JSON file."""

    def __init__(self,samples):
        self.samples=samples
    
    def generate_trace_view(self,appid,**kwargs):
        """Write the combined trace of all samples and print its viewer URL.

        Each sample's `generate_trace_view_list` is called with its position in
        `self.samples` as the id.

        Args:
            appid: base name of the JSON file written under
                /home/sparkuser/trace_result/.
            **kwargs: forwarded to every sample.
        """
        traces=[]
        for position in range(len(self.samples)):
            traces.extend(self.samples[position].generate_trace_view_list(position,**kwargs))

        header='''
        {
            "traceEvents": [
        
        '''
        footer='''
            ]
        }'''
        output=header+",\n".join(traces)+footer

        target='/home/sparkuser/trace_result/'+appid+'.json'
        with open(target, 'w') as outfile:
            outfile.write(output)

        print(f"http://{localhost}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{appid}.json")

# Dask Application Run

In [None]:
class Dask_Application_Run:
    """Trace view of one dask application run stored under /tmp/dgx-2Log/<appid>/.

    `self.analysis` maps a short key to `{'als': <analysis object>, 'pid': <int>}`.
    """
    def __init__(self, appid):
        self.appid=appid
        self.filedir="/tmp/dgx-2Log/"+self.appid+"/"
        
        self.analysis={
            'dask':{'als':dask_analysis(self.filedir+"cluster.log"),'pid':8000},
            'sar_cpu':{'als':Sar_cpu_analysis(self.filedir + "/"+"sar_cpu.sar"),'pid':0},
            'sar_disk':{'als':Sar_disk_analysis(self.filedir + "/"+"sar_disk.sar"),'pid':1},
            'sar_mem':{'als':Sar_mem_analysis(self.filedir + "/"+"sar_mem.sar"),'pid':2},
            'sar_nic':{'als':Sar_nic_analysis(self.filedir  + "/"+"sar_nic.sar"),'pid':3},
            'emon':{'als':Emon_Analysis(self.filedir +  "/"+"emon.rst"),'pid':4},
            'gpu':{'als':gpu_analysis(self.filedir + "/gpu.txt"),'pid':5},
        }
        
    
    def generate_trace_view(self,showsar=True,showemon=False,showgpu=True,**kwargs):
        """Write the combined trace JSON and print the viewer URL.

        The dask trace is generated first; every other selected analysis is then
        given the dask analysis' `starttime` before its events are generated.

        Args:
            showsar: include the four sar analyses (cpu, disk, mem, nic).
            showemon: include the emon analysis.
            showgpu: include the gpu analysis.
            **kwargs: forwarded to every `generate_trace_view_list` call.
        """
        traces=[]
        daskals=self.analysis['dask']['als']
        traces.extend(daskals.generate_trace_view_list(self.analysis['dask']['pid'],**kwargs))

        selected=[]
        if showsar:
            selected.extend(['sar_cpu','sar_disk','sar_mem','sar_nic'])
        if showemon:
            selected.append('emon')
        if showgpu:
            selected.append('gpu')

        for key in selected:
            entry=self.analysis[key]
            entry['als'].starttime=daskals.starttime
            traces.extend(entry['als'].generate_trace_view_list(entry['pid'],**kwargs))
        
        output='''
        {
            "traceEvents": [
        
        ''' + \
        ",\n".join(traces)\
       + '''
            ]
        }'''

        with open('/home/sparkuser/trace_result/'+self.appid+'.json', 'w') as outfile:  
            outfile.write(output)

        # Use the notebook's `localhost` (as the sibling run classes do) instead of
        # the previously hard-coded host name "sr219".
        print(f"http://{localhost}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{self.appid}.json")

In [None]:
# NOTE: this rebinds the name `datetime` from the module (bound by `import datetime`
# in the main import cell) to the `datetime.datetime` class for every cell that is
# executed after this one.
from datetime import datetime
# Scratch check: show an epoch-seconds value as a local datetime.
datetime.fromtimestamp(1546439400)

In [None]:
class Dask_Application_Run2:
    """Trace view of a dask run using the combined sar log and the GPU dmon log.

    Files are read from /tmp/dgx-2Log/<appid>/.
    """
    def __init__(self, appid):
        self.appid=appid
        
        self.filedir="/tmp/dgx-2Log/"+self.appid+"/"
        self.dask=self.load_dask()
        self.sar=self.load_sar()
        self.gpu=self.load_gpu()
        # emon is optional; it is created on first use by generate_emon_trace_view.
        self.emon=None
        
    
    def load_dask(self):
        return dask_analysis(self.filedir+"cluster.log")
    
    def load_sar(self):
        return Sar_analysis(self.filedir+"sar_data.sar")
        
    def load_emon(self):
        return Emon_Analysis(self.filedir+"emon.rst")
    
    def load_gpu(self):
        return gpu_dmon_analysis(self.filedir+"gpu_dmon.txt")
    
    def generate_dask_trace_view(self):
        # dask_analysis only defines generate_trace_view_list.
        return self.dask.generate_trace_view_list(8000)
    
    def generate_sar_trace_view(self):
        # NOTE(review): Sar_analysis is defined outside this section; confirm it
        # still provides generate_sar_trace_view.
        return self.sar.generate_sar_trace_view(0)
    
    def generate_gpu_trace_view(self):
        # gpu_dmon_analysis only defines generate_trace_view_list.
        return self.gpu.generate_trace_view_list(1)

    def generate_emon_trace_view(self,collected_cores):
        if self.emon is None:
            self.emon=self.load_emon()
        # Align emon with the dask start time, as is done for sar and gpu.
        self.emon.starttime=self.dask.starttime
        # NOTE(review): Emon_Analysis is defined outside this section; confirm it
        # still provides generate_emon_trace_view.
        return self.emon.generate_emon_trace_view(5,collected_cores)
    
    def generate_trace_view(self,showsar=True,showemon=False,showgpu=True,collected_cores=None):
        """Write the combined trace JSON and print the viewer URL.

        Args:
            showsar: include the sar trace (pid 0).
            showemon: include the emon trace (pid 5).
            showgpu: include the GPU dmon trace (pids 1 to 4).
            collected_cores: passed to the emon trace generation; only used
                when `showemon` is true. Previously this was read from an
                undefined name.
        """
        traces=[]
        traces.extend(self.generate_dask_trace_view())
        if showsar:
            self.sar.starttime=self.dask.starttime
            traces.extend(self.generate_sar_trace_view())
        if showemon:
            traces.extend(self.generate_emon_trace_view(collected_cores))
        if showgpu:
            self.gpu.starttime=self.dask.starttime
            traces.extend(self.generate_gpu_trace_view())
        
        output='''
        {
            "traceEvents": [
        
        ''' + \
        ",\n".join(traces)\
       + '''
            ]
        }'''

        with open('/home/sparkuser/trace_result/'+self.appid+'.json', 'w') as outfile:  
            outfile.write(output)

        # `self.appid`, not the undefined bare name `appid`.
        print(f"http://{localhost}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{self.appid}.json")

# Application RUN STD

In [None]:
class Application_Run_STD:
    """Trace view of a run that has only sar, emon and gpu logs (no application log).

    Files are read from /tmp/dgx-2Log/<appid>/. `self.analysis` maps a short key
    to `{'als': <analysis object>, 'pid': <int>}`.
    """
    def __init__(self, appid):
        self.appid=appid
        self.filedir="/tmp/dgx-2Log/"+self.appid+"/"
        
        self.analysis={
            'sar':{'als':Sar_analysis(self.filedir+"sar_data.sar"),'pid':0},
            'emon':{'als':Emon_Analysis(self.filedir+"emon.rst"),'pid':1},
            'gpu':{'als':gpu_analysis(self.filedir+"gpu.txt"),'pid':100},
        }
        
    
    def generate_trace_view(self,showsar=True,showemon=False,showgpu=True,**kwargs):
        """Write the combined trace JSON and print the viewer URL.

        Every selected analysis gets the same `starttime`: the wall-clock time
        of this call, in milliseconds.

        Args:
            showsar: include the sar analysis.
            showemon: include the emon analysis.
            showgpu: include the gpu analysis.
            **kwargs: forwarded to every `generate_trace_view_list` call.
        """
        traces=[]
        starttime=time.time()*1000

        for key,enabled in (('sar',showsar),('emon',showemon),('gpu',showgpu)):
            if not enabled:
                continue
            entry=self.analysis[key]
            entry['als'].starttime=starttime
            traces.extend(entry['als'].generate_trace_view_list(entry['pid'],**kwargs))
        
        output='''
        {
            "traceEvents": [
        
        ''' + \
        ",\n".join(traces)\
       + '''
            ]
        }'''

        with open('/home/sparkuser/trace_result/'+self.appid+'.json', 'w') as outfile:  
            outfile.write(output)

        # `self.appid`, not the undefined bare name `appid`.
        print(f"http://{localhost}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{self.appid}.json")

# Application Run

In [None]:
class Application_Run:
    def __init__(self, appid,**kwargs):
        """Discover the per-node logs of one application run on HDFS.

        Every sub-directory of /<basedir>/<appid> (except "summary.parquet") is
        treated as one node. For each node the four sar analyses are always
        registered; page-cache, pidstat, perf trace, emon, perf stat and gpu
        analyses are registered only when their file exists.

        Args:
            appid: application id, used as the directory name.
            **kwargs:
                basedir: top-level HDFS directory (default "skylake").
                jobids: passed to `App_Log_Analysis` when app.log exists.
        """
        self.appid=appid
        
        basedir=kwargs.get("basedir","skylake")
        self.filedir="/"+basedir+"/"+self.appid+"/"
        self.basedir=basedir
        
        slaves=fs.list_status("/"+basedir+"/"+appid)
        slaves=[f['pathSuffix'] for f in slaves if f['type']=='DIRECTORY' and f['pathSuffix']!="summary.parquet"]
        
        jobids=kwargs.get("jobids",None)
        
        self.clients=slaves

        # Always define the flags: generate_trace_view reads self.show_emon
        # unconditionally, which raised AttributeError when no node had emon.rst.
        self.show_emon=False
        self.show_perfstat=False
        
        sarclnt={}
        for idx,l in enumerate(self.clients):
            nodedir=self.filedir + l
            sarclnt[l]={'sar_cpu':{'als':Sar_cpu_analysis(nodedir + "/"+"sar_cpu.sar"),'pid':idx},
                'sar_disk':{'als':Sar_disk_analysis(nodedir + "/"+"sar_disk.sar"),'pid':idx},
                'sar_mem':{'als':Sar_mem_analysis(nodedir + "/"+"sar_mem.sar"),'pid':idx},
                'sar_nic':{'als':Sar_nic_analysis(nodedir + "/"+"sar_nic.sar"),'pid':idx}
            }
            if fs.exists(nodedir + "/sar_page.sar"):
                sarclnt[l]['sar_page']={'als':Sar_PageCache_analysis(nodedir + "/"+"sar_page.sar"),'pid':idx}
            
            if fs.exists(nodedir + "/pidstat.out"):
                sarclnt[l]['sar_pid']={'als':Pidstat_analysis(nodedir + "/pidstat.out"),'pid':idx}
            if fs.exists(nodedir + "/sched.txt"):
                sarclnt[l]['sar_perf']={'als':Perf_trace_analysis(nodedir + "/sched.txt"),'pid':100+idx}
            if fs.exists(nodedir + "/emon.rst"):
                self.show_emon=True
                sarclnt[l]['emon']={'als':Emon_Analysis(nodedir + "/emon.rst"),'pid':200+idx}
            if fs.exists(nodedir + "/perfstat.txt"):
                self.show_perfstat=True
                sarclnt[l]['perfstat']={'als':Perfstat_analysis(nodedir + "/perfstat.txt"),'pid':300+idx}
            if fs.exists(nodedir + "/gpu.txt"):
                sarclnt[l]['gpu']={'als':gpu_analysis(nodedir + "/gpu.txt"),'pid':400+idx}
            
                
        self.analysis={
            "sar": sarclnt
        }
        
        if fs.exists(self.filedir+"app.log"):
            self.analysis['app']={'als':App_Log_Analysis(self.filedir+"app.log",jobids)}
        
        if fs.exists(self.filedir+"instevent.out"):
            self.analysis['instant']={'als':InstantEvent_analysis(self.filedir+"instevent.out")}
        
        # Start time defaults to 0 and is overridden by the "starttime" file.
        self.starttime=0
        if fs.exists(self.filedir+"starttime"):
            with fs.open(self.filedir+"starttime") as f:
                st = f.read().decode('ascii')
                self.starttime=int(st)
    
    def generate_trace_view(self,showsar=True,showgpu=True,showhbm=False,**kwargs):
        """Build the combined trace JSON for this run and display a link to it.

        Args:
            showsar: emit every per-node analysis registered under
                `self.analysis["sar"]`. Emon entries are only loaded here; their
                events come from `Emon_Analysis_All`.
            showgpu: accepted for compatibility; not read by this method.
            showhbm: also emit HBM counters from each node's numactl.csv.
            **kwargs: forwarded to the analyses. Keys read here:
                shownodes: nodes to include (default: all clients). If a node is
                    unknown, a message is printed and None is returned.
                xgbtcks: xgbtck.txt event names to show as instant events.
                counttime: print the time spent per analysis.

        Returns:
            The trace viewer URL, or None when `shownodes` has an unknown node.
            The JSON is written to /home/sparkuser/trace_result/<appid>.json and
            `self.clients` is replaced by `shownodes`.
        """
        traces=[]
        shownodes=kwargs.get("shownodes",self.clients)
        for l in shownodes:
            if l not in self.clients:
                print(l,"is not in clients",self.clients)
                return
        self.clients=shownodes
        
        xgbtcks=kwargs.get('xgbtcks',("calltrain",'enter','begin','end'))

        # Treat a missing `show_emon` attribute as False instead of raising.
        show_emon=getattr(self,"show_emon",False)
        
        if "app" in self.analysis:
            appals=self.analysis['app']['als']
            appals.starttime=self.starttime
            traces.extend(appals.generate_trace_view_list(self.analysis['app'],**kwargs))
            # The application log analysis may adjust the start time.
            self.starttime=appals.starttime
        
        if 'instant' in self.analysis:
            als=self.analysis['instant']['als']
            als.starttime=self.starttime
            traces.extend(als.generate_trace_view_list(**kwargs))
        
        counttime=kwargs.get("counttime",False)
        
        pidmap={}
        if showsar:
            for l in self.clients:
                for alskey, sarals in self.analysis["sar"][l].items():
                    t1 = time.time()
                    if alskey!="emon":
                        sarals['als'].starttime=self.starttime
                        traces.extend(sarals['als'].generate_trace_view_list(sarals['pid'],node=l, **kwargs))
                    elif show_emon:
                        sarals['als'].load_data()
                        pidmap[l]=sarals['pid']
                    if counttime:
                        print(l,alskey," spend time: ", time.time()-t1)
        if show_emon:
            t1 = time.time()
            emondfs=get_emon_parquets([self.appid,],self.basedir)
            emons=Emon_Analysis_All(emondfs)
            emons.starttime=self.starttime
            traces.extend(emons.generate_trace_view_list(0,pidmap=pidmap,**kwargs))
            if counttime:
                print("emon process spend time: ", time.time()-t1)
            self.emons=emons
        
        if showhbm:
            for l in self.clients:
                t1 = time.time()
                hbm_analysis=HBM_analysis(self.filedir + l + "/numactl.csv")
                hbm_analysis.starttime=self.starttime
                traces.extend(hbm_analysis.generate_trace_view_list(0,**kwargs))
                if counttime:
                    print(l, " hbm process spend time: ", time.time()-t1)

        def sort_index_event(pid):
            # The key must be exactly "sort_index"; it was written with a trailing
            # space before, which makes it a different key.
            return json.dumps({"name": "process_sort_index","ph": "M","pid":pid,"tid":0,"args":{"sort_index":pid}})

        for idx,l in enumerate(self.clients):
            for offset in (0,100,200):
                traces.append(sort_index_event(idx+offset))
        
        if "app" in self.analysis:
            for pid in self.analysis['app']['als'].pids:
                traces.append(sort_index_event(pid+200))

        # Optional xgbtck.txt markers: "<xgbtck> <name> <rank> <time>" per line.
        allcnt=""
        for c in self.clients:
            paths=self.filedir+c
            if fs.exists(paths+"/xgbtck.txt"):
                with fs.open(paths+"/xgbtck.txt") as f:
                    tmp = f.read().decode('ascii')
                    allcnt=allcnt+tmp
        allcnt=allcnt.strip().split("\n")
        if len(allcnt) > 1:
            allcnt=[l.split(" ") for l in allcnt]
            # Require all four fields; shorter lines raised IndexError on l[3].
            cnts=pandas.DataFrame([[l[0],l[1],l[2],l[3]] for l in allcnt if len(l)>3 and l[1] in xgbtcks])
            if len(cnts) > 0:
                cnts.columns=['xgbtck','name','rank','time']
                # Convert first so "min" is the numeric minimum, not the
                # lexicographic minimum of the time strings.
                cnts['time']=pandas.to_numeric(cnts['time'])
                cntgs=cnts.groupby("name").agg({"time":"min"})
                cntgs=cntgs.reset_index()
                cntgs.columns=['name','ts']
                cntgs['ph']="i"
                cntgs['ts']=cntgs['ts']-self.starttime
                cntgs['pid']=0
                cntgs['tid']=0
                cntgs['s']='g'
                traces.extend([json.dumps(l) for l in cntgs.to_dict(orient='records')])
        
        output='''
        {
            "traceEvents": [
        
        ''' + \
        ",\n".join(traces)\
       + '''
            ],
            "displayTimeUnit": "ns"
        }'''

        with open('/home/sparkuser/trace_result/'+self.appid+'.json', 'w') as outfile:  
            outfile.write(output)
        
        traceview_link=f'http://{local_ip}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{self.appid}.json'
        display(HTML(f"<a href={traceview_link}>{traceview_link}</a>"))
        return traceview_link

    def getemonmetric(app,**kwargs):
        """Aggregate the requested emon metrics per client.

        Each formula of each requested metric is reduced over all cores and then
        aggregated per client: `F.sum` for 'emon_instr_retired', `F.avg` for
        every other metric.

        Args:
            **kwargs:
                show_metric: iterable of metric names (keys of
                    `emons.emon_metrics`). Missing or None means no metrics.

        Returns:
            pandas DataFrame with one row per formula and one column per client,
            or None when no metric produced a row. As a side effect the pandas
            float display format is set to two decimals.
        """
        emondfs=get_emon_parquets([app.appid],app.basedir)
        emons=Emon_Analysis_All(emondfs)
        metric_msg_map={
            'emon_instr_retired':F.sum
        }
        
        # A missing/None `show_metric` used to raise TypeError in the loop.
        emonmetric=kwargs.get("show_metric",None) or []

        frames=[]
        for k in emonmetric:
            m=emons.emon_metrics[k]
            aggfn=metric_msg_map.get(k,F.avg)
            for fk,fm in m['formula'].items():
                df=emons.gen_reduce_metric(k,list(range(0,emons.totalcores)),fk,aggfn)
                tmpdf=df.groupBy("appid",'client').agg(aggfn("`{:s}`".format(fk)).alias(get_alias_name(fk,aggfn))).toPandas()
                frames.append(tmpdf.set_index("client").drop(columns=['appid']).T)
        pandas.options.display.float_format = '{:,.2f}'.format
        # DataFrame.append was removed in pandas 2.0; concatenate once instead.
        return pandas.concat(frames) if frames else None
    
    def get_sar_stat(app,**kwargs):
        """Stack the sar statistics (cpu, disk, mem, nic, page) of all clients.

        For every sar section the per-client ``get_stat`` frames are joined side by
        side, then the five joined frames are concatenated in that order.

        Keyword Args:
            disk_prefix: forwarded to the disk ``get_stat`` (default ``"dev259"``).
            nic_prefix: forwarded to the nic ``get_stat``
                (default ``["'eth3'","'enp24s0f1'"]``).

        Side effects:
            Sets the global ``pandas.options.display.float_format`` to two decimals.
        """
        disk_prefix=kwargs.get("disk_prefix","dev259")
        nic_prefix = kwargs.get("nic_prefix",["'eth3'","'enp24s0f1'"])

        # (key below app.analysis["sar"][client], keyword arguments for its get_stat)
        sections=(
            ('sar_cpu',{}),
            ('sar_disk',{'disk_prefix':disk_prefix}),
            ('sar_mem',{}),
            ('sar_nic',{'nic_prefix':nic_prefix}),
            ('sar_page',{}),
        )

        joined_sections=[]
        for section,stat_args in sections:
            per_client=[app.analysis["sar"][client][section]['als'].get_stat(**stat_args) for client in app.clients]
            joined_sections.append(reduce(lambda left,right:left.join(right),per_client))

        pandas.options.display.float_format = '{:,.2f}'.format
        return pandas.concat(joined_sections)
        
    def get_perf_stat(self, **kwargs):
        """Join the perf-stat statistics of every client side by side.

        The keyword arguments are accepted but not used.
        """
        per_client=[]
        for client in self.clients:
            per_client.append(self.analysis["sar"][client]['perfstat']['als'].get_stat())
        return reduce(lambda joined,following: joined.join(following), per_client)
        
    def get_summary(app, **kwargs):
        """Build the one-column summary of this run and persist it as parquet.

        The summary stacks the summed query-time columns, the aggregated sar
        statistics and then either the aggregated EMON metrics (``app.show_emon``)
        or the aggregated perf-stat metrics (``app.show_perfstat``). The single
        column is named after ``app.appid``.

        All keyword arguments are forwarded to ``getemonmetric``, ``get_sar_stat``
        and ``get_perf_stat``.

        Side effects:
            Prints progress messages and overwrites ``app.filedir+"summary.parquet"``
            with the transposed summary through the global ``spark`` session.

        Returns:
            The summary as a pandas DataFrame.
        """
        def add_emon_agg(frame):
            # Rows whose label ends in "avg" are averaged over the columns, the rest are summed.
            frame['agg']=[frame.loc[row].mean() if row.endswith("avg") else frame.loc[row].sum() for row in frame.index]
            return frame

        def add_sar_agg(frame):
            def collapse(row):
                # "total" rows are summed, "max" rows take the maximum, everything else is averaged.
                if "total" in row:
                    return frame.loc[row].sum()
                if "max" in row:
                    return frame.loc[row].max()
                return frame.loc[row].mean()
            frame['agg']=[collapse(row) for row in frame.index]
            return frame

        query_times=app.analysis["app"]["als"].get_query_time(plot=False)
        run_label=app.appid

        summed_columns=['runtime','disk spilled','shuffle_write','f_wait_time','input read','acc_task_time','output rows']
        time_summary=pandas.DataFrame(query_times[summed_columns].sum(),columns=[run_label])

        if app.show_emon:
            emon_summary=add_emon_agg(app.getemonmetric(**kwargs))[["agg"]]
            emon_summary.columns=[run_label]

        print("sar metric")
        sar_summary=add_sar_agg(app.get_sar_stat(**kwargs))[["agg"]]
        sar_summary.columns=[run_label]

        summary=pandas.concat([time_summary,sar_summary])
        if app.show_emon:
            summary=pandas.concat([summary,emon_summary])
        elif app.show_perfstat:
            print("perf stat metric")
            perf_summary=add_sar_agg(app.get_perf_stat(**kwargs))[['agg']]
            perf_summary.columns=[run_label]
            summary=pandas.concat([summary,perf_summary])

        # Column names are stripped of spaces and parentheses before the parquet write.
        spark_summary=spark.createDataFrame(summary.T.reset_index())
        for column in spark_summary.columns:
            spark_summary=spark_summary.withColumnRenamed(column,column.replace(" ","_").replace("(","").replace(")",""))
        spark_summary.write.mode("overwrite").parquet(app.filedir+"summary.parquet")

        return summary
    
    def compare_app(app2,**kwargs):
        """Compare this run against another run and collect an HTML report.

        ``app2`` (self) is reported as the right-hand run (``rrun``). The other run,
        taken from the ``rapp`` keyword or built as
        ``Application_Run(r_appid, basedir=basedir)``, is the left-hand run (``lrun``).

        Keyword Args:
            basedir: base directory of the other run (default: this run's basedir).
            r_appid: application id of the other run (default: this run's appid).
            rapp: an already constructed run object to compare against.
            show_queryplan_diff: when False, return right after the hot-stage
                tables (default True).
            queryids: optional collection; a query whose runtime ratio is outside
                0.95..1.05 only gets its plan diff when it is listed here.
            The complete keyword set is also forwarded to ``getemonmetric`` and
            ``get_sar_stat``.

        Returns:
            str: the collected HTML fragments joined with newlines.

        Side effects:
            Prints progress messages and draws matplotlib figures (a runtime bar
            chart and two side-by-side time-breakdown figures).
        """
        output=[]
        
        lbasedir=kwargs.get("basedir",app2.basedir)
        r_appid=kwargs.get("r_appid",app2.appid)
        
        # NOTE(review): the default Application_Run(...) is constructed even when "rapp"
        # is supplied, because the default argument of dict.get is evaluated eagerly.
        app=kwargs.get("rapp",Application_Run(r_appid,basedir=lbasedir))

        show_queryplan_diff=kwargs.get("show_queryplan_diff",True)
        
        queryids=kwargs.get("queryids",None)
        
        appals=app.analysis["app"]["als"]
        appals2=app2.analysis["app"]["als"]

        out=appals.get_query_time(plot=False)
        out2=appals2.get_query_time(plot=False)

        lrun=app.appid
        rrun=app2.appid
        cmpcolumns=['runtime','shuffle_write','f_wait_time','input read','acc_task_time','output rows']
        outcut=out[cmpcolumns]
        out2cut=out2[cmpcolumns]
        # Per-query comparison: every column appears twice, suffixed with the run id.
        cmp=outcut.join(out2cut,lsuffix='_'+lrun,rsuffix='_'+rrun)

        # Column totals of both runs, one column per run.
        pdsout=pandas.DataFrame(outcut.sum(),columns=[lrun])
        pdsout2=pandas.DataFrame(out2cut.sum(),columns=[rrun])
        pdstime=pdsout.join(pdsout2)

        # EMON metrics are only compared when both runs have them enabled.
        showemon=app.show_emon and app2.show_emon
        if showemon:
            print("emon metric")

            emondf=app.getemonmetric(**kwargs)
            emondf2=app2.getemonmetric(**kwargs)
            # in case we compare two clusters: reuse the right run's column names on the left
            emondf.columns=emondf2.columns
            def get_agg(emondf):
                # Rows ending in "avg" are averaged over the columns, all others are summed.
                aggs=[]
                for x in emondf.index:
                    if x.endswith("avg"):
                        aggs.append(emondf.loc[x].mean())
                    else:
                        aggs.append(emondf.loc[x].sum())

                emondf['agg']=aggs
                return emondf
            emondf=get_agg(emondf)
            emondf2=get_agg(emondf2)

            emoncolumns=emondf.columns
            emoncmp=emondf.join(emondf2,lsuffix='_'+lrun,rsuffix='_'+rrun)
            emonsum=emoncmp[["agg_"+lrun,"agg_"+rrun]]

            emonsum.columns=[lrun,rrun]

        print("sar metric")
        sardf=app.get_sar_stat(**kwargs)
        sardf2=app2.get_sar_stat(**kwargs)
        
        def get_sar_agg(sardf):
            # "total" rows are summed, "max" rows take the maximum, the rest are averaged.
            aggs=[]
            for x in sardf.index:
                if "total" in x:
                    aggs.append(sardf.loc[x].sum())
                elif "max" in x:
                    aggs.append(sardf.loc[x].max())
                else:
                    aggs.append(sardf.loc[x].mean())

            sardf['agg']=aggs
            return sardf
        sardf=get_sar_agg(sardf)
        sardf2=get_sar_agg(sardf2)
        # in case we compare two clusters: reuse the left run's column names on the right
        sardf2.columns=sardf.columns

        sarcolumns=sardf.columns
        sarcmp=sardf.join(sardf2,lsuffix='_'+lrun,rsuffix='_'+rrun)
        sarsum=sarcmp[["agg_"+lrun,"agg_"+rrun]]

        sarsum.columns=[lrun,rrun]
        
        summary=pandas.concat([pdstime,sarsum])
        if showemon:
            summary=pandas.concat([summary,emonsum])
            
        # Relative difference lrun/rrun-1; rows where rrun is not positive get 0.
        summary["diff"]=numpy.where(summary[rrun] > 0, summary[lrun]/summary[rrun]-1, 0)
        
        
        def highlight_diff(x):
            # Draw a bar from the cell centre: green to the right for positive values,
            # orange to the left otherwise, scaled by the largest magnitude (at least 0.2).
            styles=[]
            mx=x.max()
            mn=x.min()
            mx=max(mx,-mn,0.2)
            for j in x.index:
                m1=(x[j])/mx*100 if x[j]!=None else 0
                if m1>0:
                    styles.append(f'width: 400px ; background-image: linear-gradient(to right, transparent 50%, #5fba7d 50%, #5fba7d {50+m1/2}%, transparent {50+m1/2}%)')
                else:
                    styles.append(f'width: 400px ;background-image: linear-gradient(to left, transparent 50%, #f1a863 50%, #f1a863 {50-m1/2}%, transparent {50-m1/2}%)')
            return styles

        output.append(summary.style.apply(highlight_diff,subset=['diff']).format({lrun:"{:,.2f}",rrun:"{:,.2f}",'diff':"{:,.2%}"}).render())

        # cmp_plot is the same object as cmp, so the 'diff' column is added to cmp as well.
        cmp_plot=cmp
        cmp_plot['diff']=cmp_plot['runtime_'+lrun]-cmp_plot['runtime_'+rrun]

        pltx=cmp_plot.sort_values(by='diff',axis=0).plot.bar(y=['runtime_'+lrun,'runtime_'+rrun],figsize=(30,8))
        # diff>0 means lrun's runtime is larger than rrun's for that query.
        # NOTE(review): sqldf presumably resolves `cmp_plot` from the caller's scope - confirm.
        better_num=sqldf('''select count(*) from cmp_plot where diff>0''')['count(*)'][0]
        pltx.text(0.1, 0.8,'{:d} queries are better'.format(better_num), ha='center', va='center', transform=pltx.transAxes)

        # Every cell of df1 holds [lrun value, rrun value, lrun/rrun-1] for one query and column.
        df1 = pandas.DataFrame('', index=cmp.index, columns=cmpcolumns)
        for l in cmpcolumns:
            for j in cmp.index:
                df1[l][j]=[cmp[l+"_"+lrun][j],cmp[l+"_"+rrun][j],cmp[l+"_"+lrun][j]/cmp[l+"_"+rrun][j]-1]

        def highlight_greater(x,columns):
            # Returns a frame of CSS strings with the same shape as x; each bar is scaled by
            # the largest deviation of rrun/lrun from 100% within the column.
            df1 = pandas.DataFrame('', index=x.index, columns=x.columns)
            for l in columns:
                m={}
                for j in x.index:
                    m[j] = (x[l][j][1] / x[l][j][0])*100 if x[l][j][0]!=0 else 100
                mx=max(m.values())-100
                mn=100-min(m.values())
                mx=max(mx,mn)
                for j in x.index:
                    m1=-(100-m[j])/mx*100 if x[l][j][0]!=0 else 0
                    if m1>0:
                        df1[l][j] = f'background-image: linear-gradient(to right, transparent 50%, #5fba7d 50%, #5fba7d {50+m1/2}%, transparent {50+m1/2}%)'
                    else:
                        df1[l][j] = f'background-image: linear-gradient(to left, transparent 50%, #f1a863 50%, #f1a863 {50-m1/2}%, transparent {50-m1/2}%)'

            return df1

        def display_compare(df,columns):
            # Renders each [left, right, ratio] cell as three inline blocks and appends the HTML to output.
            output.append(df.style.set_properties(**{'width': '300px','border-style':'solid','border-width':'1px'}).apply(lambda x: highlight_greater(x,columns), axis=None).format(lambda x: '''
                                                                          <div style='max-width: 30%; min-width:30%;display:inline-block;'>{:,.2f}</div>
                                                                          <div style='max-width: 30%; min-width:30%; display:inline-block;'>{:,.2f}</div>
                                                                          <div style='max-width: 30%; min-width:30%; display:inline-block;color:blue'>{:,.2f}%</div>
                                                                       '''.format(x[0],x[1],x[2]*100)).render())
        display_compare(df1,cmpcolumns)

        df3 = pandas.DataFrame('', index=sarcmp.index, columns=sarcolumns)
        for l in sarcolumns:
            for j in df3.index:
                df3[l][j]=[sarcmp[l+"_"+lrun][j],sarcmp[l+"_"+rrun][j],sarcmp[l+"_"+lrun][j]/sarcmp[l+"_"+rrun][j]-1]
        display_compare(df3,sarcolumns)

        if showemon:
            df2 = pandas.DataFrame('', index=emoncmp.index, columns=emoncolumns)
            for l in emoncolumns:
                for j in df2.index:
                    df2[l][j]=[emoncmp[l+"_"+lrun][j],emoncmp[l+"_"+rrun][j],emoncmp[l+"_"+lrun][j]/emoncmp[l+"_"+rrun][j]-1]
            display_compare(df2,emoncolumns)

        print("time breakdown")
        ################################ time breakdown ##################################################################################################
        timel=appals.show_time_metric(plot=False)
        timer=appals2.show_time_metric(plot=False)
        timer.columns=[l.replace("scan time","time_batchscan") for l in timer.columns]
        timel.columns=[l.replace("scan time","time_batchscan") for l in timel.columns]
        # Reorder the left columns to follow the right frame's order (matched on the text after
        # the "%" in each right column name); unmatched left columns are appended at the end.
        rcols=timer.columns
        lcols=[]
        for c in [l.split("%")[1][1:] for l in rcols]:
            for t in timel.columns:
                if t.endswith(c):
                    lcols.append(t)
        for t in timel.columns:
            if t not in lcols:
                lcols.append(t)
        timel_adj=timel[lcols]

        fig, axs = plt.subplots(nrows=1, ncols=2, sharey=True,figsize=(30,8),gridspec_kw = {'width_ratios':[1, 1]})
        plt.subplots_adjust(wspace=0.01)
        ax=timel_adj.plot.bar(ax=axs[0],stacked=True)
        # Label every stacked segment with its value, centred in the bar patch.
        list_values=timel_adj.loc[0].values
        for rect, value in zip(ax.patches, list_values):
            h = rect.get_height() /2.
            w = rect.get_width() /2.
            x, y = rect.get_xy()
            ax.text(x+w, y+h,"{:,.2f}".format(value),horizontalalignment='center',verticalalignment='center',color="white")
        ax=timer.plot.bar(ax=axs[1],stacked=True)
        list_values=timer.loc[0].values
        for rect, value in zip(ax.patches, list_values):
            h = rect.get_height() /2.
            w = rect.get_width() /2.
            x, y = rect.get_xy()
            ax.text(x+w, y+h,"{:,.2f}".format(value),horizontalalignment='center',verticalalignment='center',color="white")

        ################################ critical time breakdown ##################################################################################################
        # Same breakdown as above, restricted to the task ids listed in each run's criticaltasks.
        timel=appals.show_time_metric(plot=False,taskids=[l[0].item() for l in appals.criticaltasks])
        timer=appals2.show_time_metric(plot=False,taskids=[l[0].item() for l in appals2.criticaltasks])
        timer.columns=[l.replace("scan time","time_batchscan") for l in timer.columns]
        timel.columns=[l.replace("scan time","time_batchscan") for l in timel.columns]
        rcols=timer.columns
        lcols=[]
        for c in [l.split("%")[1][1:] for l in rcols]:
            for t in timel.columns:
                if t.endswith(c):
                    lcols.append(t)
        for t in timel.columns:
            if t not in lcols:
                lcols.append(t)
        timel_adj=timel[lcols]

        fig, axs = plt.subplots(nrows=1, ncols=2, sharey=True,figsize=(30,8),gridspec_kw = {'width_ratios':[1, 1]})
        plt.subplots_adjust(wspace=0.01)
        ax=timel_adj.plot.bar(ax=axs[0],stacked=True)
        list_values=timel_adj.loc[0].values
        for rect, value in zip(ax.patches, list_values):
            h = rect.get_height() /2.
            w = rect.get_width() /2.
            x, y = rect.get_xy()
            ax.text(x+w, y+h,"{:,.2f}".format(value),horizontalalignment='center',verticalalignment='center',color="white")
        ax=timer.plot.bar(ax=axs[1],stacked=True)
        list_values=timer.loc[0].values
        for rect, value in zip(ax.patches, list_values):
            h = rect.get_height() /2.
            w = rect.get_width() /2.
            x, y = rect.get_xy()
            ax.text(x+w, y+h,"{:,.2f}".format(value),horizontalalignment='center',verticalalignment='center',color="white")


        ################################ hot stage ##########################################################################################################

        hotstagel=appals.get_hottest_stages(plot=False)
        hotstager=appals2.get_hottest_stages(plot=False)
        # NOTE(review): the Styler built on the next line is discarded; it has no effect on the report.
        hotstagel.style.format(lambda x: '''{:,.2f}'''.format(x))

        # Rows are coloured by query id; the colour scale is normalised on the right run's largest queryid.
        norm = matplotlib.colors.Normalize(vmin=0, vmax=max(hotstager.queryid))
        cmap = matplotlib.cm.get_cmap('brg')
        def setbkcolor(x):
            rgba=cmap(norm(x['queryid']))
            # assumes the hot-stage frames have exactly 9 columns - TODO confirm
            return ['background-color:rgba({:d},{:d},{:d},1); color:white'.format(int(rgba[0]*255),int(rgba[1]*255),int(rgba[2]*255))]*9

        output.append("<table><tr><td>" + hotstagel.style.apply(setbkcolor,axis=1).format({"total_time":lambda x: '{:,.2f}'.format(x),"stdev_time":lambda x: '{:,.2f}'.format(x),"acc_total":lambda x: '{:,.2%}'.format(x),"total":lambda x: '{:,.2%}'.format(x)}).render()+
             "</td><td>" +  hotstager.style.apply(setbkcolor,axis=1).format({"total_time":lambda x: '{:,.2f}'.format(x),"stdev_time":lambda x: '{:,.2f}'.format(x),"acc_total":lambda x: '{:,.2%}'.format(x),"total":lambda x: '{:,.2%}'.format(x)}).render()+             "</td></tr></table>")

        if not show_queryplan_diff:
            return "\n".join(output)
        
        # NOTE(review): this message is printed after the hot-stage tables were built,
        # right before the query-plan diff section.
        print("hot stage")

        loperators=appals.getOperatorCount()
        roperators=appals2.getOperatorCount()
        loperators_rowcnt=appals.get_metric_output_rowcnt()
        roperators_rowcnt=appals2.get_metric_output_rowcnt()
        
        def show_query_diff(queryid, always_show=True):
            # Builds an outer-joined table of the positive call and row counts of both runs for
            # one query. With always_show=False it is only emitted when the two sides differ.
            lops=pandas.DataFrame(loperators[queryid])
            lops.columns=['calls_l']
            lops=lops.loc[lops['calls_l'] >0]

            rops=pandas.DataFrame(roperators[queryid])
            rops.columns=["calls_r"]
            rops=rops.loc[rops['calls_r'] >0]
            lops_row=pandas.DataFrame(loperators_rowcnt[queryid])
            lops_row.columns=["rows_l"]
            lops_row=lops_row.loc[lops_row['rows_l'] >0]

            rops_row=pandas.DataFrame(roperators_rowcnt[queryid])
            rops_row.columns=["rows_r"]
            rops_row=rops_row.loc[rops_row['rows_r'] >0]

            opscmp=pandas.merge(pandas.merge(pandas.merge(lops,rops,how="outer",left_index=True,right_index=True),lops_row,how="outer",left_index=True,right_index=True),rops_row,how="outer",left_index=True,right_index=True)
            # Entries present on one side only become "" and are treated as 0 when colouring.
            opscmp=opscmp.fillna("")
            
            def set_bk_color_opscmp(x):
                calls_l= 0 if x['calls_l']=="" else x['calls_l']
                calls_r= 0 if x['calls_r']=="" else x['calls_r']
                rows_l= 0 if x['rows_l']=="" else x['rows_l']
                rows_r= 0 if x['rows_r']=="" else x['rows_r']

                # The "left is larger" check wins when the two comparisons disagree.
                if calls_l > calls_r or rows_l > rows_r:
                    return ['background-color:#eb6b34']*4
                if calls_l < calls_r or rows_l < rows_r:
                    return ['background-color:#8ad158']*4
                return ['color:#dbd4d0']*4

            if always_show or not (opscmp["rows_l"].equals(opscmp["rows_r"]) and opscmp["calls_l"].equals(opscmp["calls_r"])):
                print(f"query  {queryid}  queryplan diff ")
                if not always_show:
                    output.append(f"<p><font size=4 color=red>query{queryid} is different</font></p>")
                output.append(opscmp.style.apply(set_bk_color_opscmp,axis=1).render())

                planl=appals.get_query_plan(queryid=queryid,show_plan_only=True,plot=False)
                planr=appals2.get_query_plan(queryid=queryid,show_plan_only=True,plot=False)
                output.append("<table><tr><td>"+planl+"</td><td>"+planr+"</td></tr></table>")

        # NOTE(review): outputx is only used for its index below.
        outputx=df1['output rows']
        runtimex = df1['runtime']
        for x in outputx.index:
            # Queries whose runtime differs by more than 5% are always reported; the others
            # are only reported when their operator or row counts differ.
            if runtimex[x][0]/runtimex[x][1]<0.95 or runtimex[x][0]/runtimex[x][1]>1.05:
                output.append(f"<p><font size=4 color=red>query{x} is different,{lrun} time: {df1['runtime'][x][0]}, {rrun} time: {df1['runtime'][x][1]}</font></p>")
                if queryids is not None and x not in queryids:
                    print("query plan skipped")
                    continue
                try:
                    show_query_diff(x, True)
                # NOTE(review): bare except hides the cause of any failure, including KeyboardInterrupt.
                except:
                    print(" query diff error")
            else:
                try:
                    show_query_diff(x, False)
                # NOTE(review): bare except, same concern as above.
                except:
                    print(" query diff error")
                
        return "\n".join(output)
                              

                              
    def show_queryplan_diff(app2, queryid,**kwargs):
        """Display the operator-count table and the query plans of one query for two runs.

        ``app2`` (self) is the right-hand run. The left-hand run comes from the
        ``rapp`` keyword or is built as ``Application_Run(r_appid, basedir=basedir)``.

        Args:
            queryid: key of the query in the operator-count and row-count tables.

        Keyword Args:
            basedir: base directory of the other run (default: this run's basedir).
            r_appid: application id of the other run (default: this run's appid).
            rapp: an already constructed run object to compare against.

        Returns:
            None. The comparison is rendered with ``display(HTML(...))``.
        """
        # Build the comparison run only when the caller did not pass one; the previous
        # kwargs.get("rapp", Application_Run(...)) constructed it on every call.
        app=kwargs.get("rapp")
        if app is None:
            lbasedir=kwargs.get("basedir",app2.basedir)
            r_appid=kwargs.get("r_appid",app2.appid)
            app=Application_Run(r_appid,basedir=lbasedir)

        appals=app.analysis["app"]["als"]
        appals2=app2.analysis["app"]["als"]

        # NOTE(review): the results of these two calls are not used here. The calls are kept
        # because get_hottest_stages is defined outside this view and may populate state the
        # calls below rely on - confirm and drop if it is side-effect free.
        appals.get_hottest_stages(plot=False)
        appals2.get_hottest_stages(plot=False)

        loperators=appals.getOperatorCount()
        roperators=appals2.getOperatorCount()
        loperators_rowcnt=appals.get_metric_output_rowcnt()
        roperators_rowcnt=appals2.get_metric_output_rowcnt()

        def positive_only(values,column):
            # One-column frame named `column`, restricted to entries greater than zero.
            frame=pandas.DataFrame(values)
            frame.columns=[column]
            return frame.loc[frame[column] >0]

        def styler_html(styler):
            # Styler.render() was removed in pandas 2.0; to_html() exists since pandas 1.3.
            return styler.to_html() if hasattr(styler,"to_html") else styler.render()

        def set_bk_color_opscmp(x):
            # Entries missing on one side are "" after fillna and count as 0.
            calls_l= 0 if x['calls_l']=="" else x['calls_l']
            calls_r= 0 if x['calls_r']=="" else x['calls_r']
            rows_l= 0 if x['rows_l']=="" else x['rows_l']
            rows_r= 0 if x['rows_r']=="" else x['rows_r']

            # The "left is larger" check wins when the two comparisons disagree.
            if calls_l > calls_r or rows_l > rows_r:
                return ['background-color:#eb6b34']*4
            if calls_l < calls_r or rows_l < rows_r:
                return ['background-color:#8ad158']*4
            return ['color:#dbd4d0']*4

        print("query ",queryid," queryplan diff ")

        lops=positive_only(loperators[queryid],'calls_l')
        rops=positive_only(roperators[queryid],"calls_r")
        lops_row=positive_only(loperators_rowcnt[queryid],"rows_l")
        rops_row=positive_only(roperators_rowcnt[queryid],"rows_r")

        # Outer-join the four count columns on their shared index.
        opscmp=lops
        for other in (rops,lops_row,rops_row):
            opscmp=pandas.merge(opscmp,other,how="outer",left_index=True,right_index=True)
        opscmp=opscmp.fillna("")

        output=[styler_html(opscmp.style.apply(set_bk_color_opscmp,axis=1))]

        planl=appals.get_query_plan(queryid=queryid,show_plan_only=True,plot=False)
        planr=appals2.get_query_plan(queryid=queryid,show_plan_only=True,plot=False)
        output.append("<table><tr><td>"+planl+"</td><td>"+planr+"</td></tr></table>")

        display(HTML("\n".join(output)))
        return

# MISC

In [None]:
def reduce_metric(pdrst,slave_id,metric,core,agg_func):
    """Add one column per aggregate function with the reduced metric of every row.

    Each row's ``app_id`` object is asked for ``get_reduce_metric(slave_id, metric,
    core, agg_func)``; the first row of that result supplies the value of every
    alias column. ``pdrst`` is modified in place (it keeps the helper column
    ``rst``), while the returned frame has ``rst`` dropped.
    """
    def reduced_for_row(row):
        return row['app_id'].get_reduce_metric(slave_id,metric,core,agg_func)

    pdrst['rst']=pdrst.apply(reduced_for_row, axis=1)

    for func in agg_func:
        def first_row_value(row):
            return row['rst'].iloc[0][get_alias_name(metric,func)]
        pdrst[get_alias_name(metric,func)]=pdrst.apply(first_row_value,axis=1)

    return pdrst.drop(columns=['rst'])

In [None]:
def cvt_number(n):
    """Format a value with thousands separators for the HTML summary tables.

    Digit-only values (ints or strings such as ``"1234"``) are rendered as grouped
    integers. Anything else that converts to ``float`` is rounded to two decimals
    and grouped. Values that are not numeric at all (free text, ``None``, lists)
    are returned unchanged.
    """
    try:
        if str(n).isdigit():
            # int() so digit-only strings are grouped too: formatting a str with ','
            # raises ValueError, which used to return such strings unformatted.
            return f'{int(n):,}'
        return f'{round(float(n),2):,}'
    except (ValueError, TypeError):
        # TypeError covers None and other objects float() cannot take.
        return n

def parse_changelog(changelog):
    """Read a git changelog through ``fs`` and colourise it as HTML fragments.

    Lines starting with ``commit``, ``Author`` and ``Date`` get coloured labels;
    every other line is passed through unchanged.

    Args:
        changelog: path of the changelog file, opened through the module-level ``fs`` client.

    Returns:
        list[str]: one entry per line of the file, or a single
        ``"<basename> not found!"`` entry when the file does not exist.
    """
    if not fs.exists(changelog):
        return [f'{os.path.basename(changelog)} not found!']

    out=[]
    with fs.open(changelog) as f:
        for l in f.readlines():
            l = l.decode('utf-8')
            if l.startswith("commit"):
                out.append(re.sub(r"commit +(.+)",r"<font color=#BDCA57>commit </font><font color=#23C2BF>\1</font>",l))
            elif l.startswith("Author"):
                out.append(re.sub(r"Author: +([^<]+) <(.+)>",r"<font color=#BDCA57>Author: </font><font color=#C02866>\1</font> <<font color=#BC0DBD>\2</font>> ",l))
            elif l.startswith("Date"):
                # The label used to read "Author: " here (copy/paste slip from the branch above).
                out.append(re.sub(r"Date: +(\d\d\d\d-\d\d-\d\d)",r"<font color=#BDCA57>Date: </font>\1",l))
            else:
                out.append(l)
    return out

def generate_query_diff(name, comp_name, query_time_file, comp_query_time_file):
    """Build a per-query runtime comparison table from two ``query_time.json`` files.

    Args:
        name: label of the run whose times are in ``query_time_file``.
        comp_name: label of the run whose times are in ``comp_query_time_file``.
        query_time_file: JSON object mapping query id to time, read through ``fs``.
        comp_query_time_file: the same for the run compared against.

    Returns:
        list[list[str]]: a header row, one row per query and a final ``total`` row,
        each ``[query, time, comp time, difference, percentage]``. The percentage is
        ``comp / base * 100`` or ``'N/A'`` when the base time is zero. The list is
        empty when either file does not exist.

    Raises:
        Exception: when the two files do not describe the same set of queries.
    """
    result = []
    if not (fs.exists(query_time_file) and fs.exists(comp_query_time_file)):
        return result

    result.append(['query', name, comp_name, 'difference', 'percentage'])

    with fs.open(query_time_file) as f:
        qtimes = json.loads(f.read().decode('utf-8'))
    with fs.open(comp_query_time_file) as f:
        comp_qtimes = json.loads(f.read().decode('utf-8'))

    # Sort key is "<length><id>", where a trailing 'a'/'b' variant letter does not
    # count towards the length, so q14a sorts next to the other two-digit queries.
    query_ids = sorted(qtimes.keys(), key=lambda x: str(len(x))+x if x[-1] != 'a' and x[-1] != 'b' else str(len(x)-1) + x)

    if len(comp_qtimes) != len(qtimes):
        raise Exception('Number of queries mismatch!')
    # Equal sizes do not guarantee equal keys; fail clearly instead of with float(None).
    missing = [q for q in query_ids if q not in comp_qtimes]
    if missing:
        raise Exception('Queries missing from {}: {}'.format(comp_name, ', '.join(missing)))

    query_ids.append('total')
    qtimes['total'] = sum([float(i) for i in qtimes.values()])
    comp_qtimes['total'] = sum([float(i) for i in comp_qtimes.values()])

    for q in query_ids:
        t1 = qtimes.get(q)
        t2 = comp_qtimes.get(q)
        base = float(t1)
        comp = float(t2)
        delta = "{:.2f}".format(comp - base)
        if base != 0:
            perc = "{:.2f}".format((comp / base) * 100) + '%'
        else:
            # A zero base time has no meaningful ratio and used to raise ZeroDivisionError.
            perc = 'N/A'
        result.append([q, str(t1), str(t2), delta, perc])
    return result

def append_summary(appid, base_dir, name, comp_appid, comp_base_dir, comp_name, baseline_appid, baseline_base_dir, statsall, output):
    """Append the statistics table, changelogs and query-time comparisons to an HTML file.

    Args:
        appid, base_dir, name: application id, base directory and label of the run being reported.
        comp_appid, comp_base_dir, comp_name: run to compare against; skipped when ``comp_appid`` is falsy.
        baseline_appid, baseline_base_dir: baseline run (labelled 'Vanilla Spark'); skipped when
            ``baseline_appid`` is falsy.
        statsall: mapping of statistic name to value, written as a two-column table.
        output: path of the HTML file, opened in append mode.

    Returns:
        The last cell of the baseline comparison table (the percentage of its final row),
        or ``''`` when there is no baseline or no comparison could be generated.
    """
    def as_number(text):
        # Header cells hold run names: a name such as "3.5-rc1" passes the leading-digit
        # pattern but is not a number, which used to crash in float(). Treat it as text.
        if not re.match(r"[0-9.]+",text):
            return None
        try:
            return float(text)
        except ValueError:
            return None

    with open(output,"a") as linkfile:

        difftable=''' <table border="1" cellpadding="0" cellspacing="0">
                            <tbody>'''
        for k,v in statsall.items():
            difftable+=f'''
                <tr>
                <td>{k}</td>
                <td>{cvt_number(v)}</td>
                </tr>'''
        difftable+='''
            </tbody>
        </table>\n'''
        linkfile.write(difftable)
        linkfile.write("\n<br><hr/>\n")
        
        linkfile.write("\n<font color=blue> gluten gitlog in last 2 days</font><br>\n")
        out=parse_changelog(os.path.join('/', base_dir, appid, 'changelog_gluten'))
        linkfile.write("<br>".join(out))
        linkfile.write("\n<br><hr/>\n")
        
        linkfile.write("\n<font color=blue> velox gitlog in last 2 days</font><br>\n")
        out=parse_changelog(os.path.join('/', base_dir, appid, 'changelog_velox'))
        linkfile.write("<br>".join(out))
        linkfile.write("\n<br><hr/>\n")
        
        linkfile.write('''<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-OutputArea-output " data-mime-type="text/html">\n''')
        
        def append_query_diff(their_appid, their_base_dir, their_name):
            # Writes one comparison table and returns its final percentage cell ('' when no table).
            query_diff=generate_query_diff(name, their_name, os.path.join('/', base_dir, appid, 'query_time.json'), os.path.join('/', their_base_dir, their_appid, 'query_time.json'))
            if query_diff:
                difftable='''
                <table border="1" cellpadding="0" cellspacing="0">
                    <tbody>'''
                for l in query_diff:
                    difftable+='''
                        <tr>'''
                    base=0
                    pr=0
                    base_value=as_number(l[1])
                    if base_value is not None:
                        base=base_value
                        l[1]="{:.2f}".format(base)
                    pr_value=as_number(l[2])
                    if pr_value is not None:
                        pr=pr_value
                        l[2]="{:.2f}".format(pr)

                    # The whole row shares one colour: green when the compared time is lower
                    # than this run's, red when it is higher, black when they are equal.
                    color='#000000'
                    if base > pr:
                        color='#6F9915'
                    elif base < pr:
                        color='#F92663'
                    for d in l:
                        difftable += f'''
                        <td><font color={color}>{d}</font></td>'''

                    difftable+='''
                        </tr>'''

                difftable+='''
                    </tbody>
                </table>'''
                linkfile.write(difftable)
                linkfile.write("\n<br><hr/>\n")
                # return percentage
                return query_diff[-1][-1]
            return ''

        baseline_perc = ''
        if comp_appid:
            append_query_diff(comp_appid, comp_base_dir, comp_name)
        if baseline_appid:
            baseline_perc = append_query_diff(baseline_appid, baseline_base_dir, 'Vanilla Spark')

        linkfile.write("</div>")
        
        return baseline_perc

In [None]:
def generate_email_body_title(appid, base_dir, name, comp_appid, comp_base_dir, comp_name, baseline_appid, baseline_base_dir, notebook, notebook_html, traceview, stats, summary, pr=''):
    statsall=collections.OrderedDict()
    for k,v in stats.items():
        statsall[k]=v
    for k,v in summary.to_dict()[appals.appid].items():
        statsall[k]=v
    
    pr_link=''
    if pr:
        pr_link=f'https://github.com/apache/incubator-gluten/pull/{pr}'
        title=!wget --quiet -O - $pr_link | sed -n -e 's!.*<title>\(.*\)</title>.*!\1!p'
        if not title:
            raise Exception(f'Failed to fetch PR link: {pr_link}')
        pr_link=f'pr link: <a href="{pr_link}">{title[0]}</a><br>'
    
    output=f'/tmp/{appid}.html'
    with open(output, 'w+') as f:
        f.writelines(f'''
<font style="font-family: Courier New"">
history event: <a href="http://{local_ip}:18080/tmp/sparkEventLog/{appid}/jobs/">http://{local_ip}:18080/tmp/sparkEventLog/{appid}/jobs/</a><br>
notebook: <a href="http://{local_ip}:8889/notebooks/{base_dir}/{notebook}">http://{local_ip}:8889/notebooks/{base_dir}/{notebook}</a><br>
notebook html: <a href="http://{local_ip}:8889/view/{base_dir}/{notebook_html}">http://{local_ip}:8889/view/{base_dir}/{notebook_html}</a><br>
traceview: <a href="{traceview}">{traceview}</a><br>
{pr_link}
</font><hr/>''')
    baseline_perc = append_summary(appid, base_dir, name, comp_appid, comp_base_dir, comp_name, baseline_appid, baseline_base_dir, statsall, output)
    
    title_prefix = f"[ {datetime.now().strftime('%m_%d_%Y')} ]" if not pr else f"[ PR {pr} ]"
    title = f'{title_prefix} {name} {appid} {baseline_perc}'
    return output,title

# TPCDS query map

In [None]:
# Map the 1-based position of a query in the TPC-DS run order to its name.
# Queries 14, 23, 24 and 39 come as "a"/"b" variants, so q01..q99 yield 103 entries.
def _tpcds_query_names():
    """Return the 103 query names in run order (q01, ..., q14a, q14b, ..., q99)."""
    names=[]
    for number in range(1,100):
        if number in (14,23,24,39):
            names.extend(f'q{number:02d}{variant}' for variant in 'ab')
        else:
            names.append(f'q{number:02d}')
    return names

# `m` holds the raw "<position><TAB><name>" lines: the first one unindented,
# every following one indented by four spaces.
m=[('' if position==1 else '    ')+f'{position}\t{query}' for position,query in enumerate(_tpcds_query_names(),start=1)]
tpcds_query_map=[line.strip().split("\t") for line in m]
tpcds_query_map={int(pair[0]):pair[1] for pair in tpcds_query_map}