analysis/webservice/Filtering.py (99 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import traceback
import numpy as np
from scipy.signal import filtfilt, butter
log = logging.getLogger('Filtering')
def __fieldToList(results, field):
a = np.zeros(len(results))
for n in range(0, len(results)):
a[n] = results[n][field]
return a
def __listToField(results, l, field):
if results is None or l is None:
raise Exception("Cannot transpose values if they're null")
if not len(results) == len(l):
raise Exception("Cannot transpose values between lists of inequal length")
for n in range(0, len(results)):
results[n][field] = l[n]
def applySeasonalCycleFilter1d(l):
if len(l) <= 12:
return l
for a in range(0, 12):
values = []
for b in range(a, len(l), 12):
values.append(l[b])
avg = np.average(values)
for b in range(a, len(l), 12):
l[b] -= avg
return l
def applySeasonalCycleFilter2d(l):
return l
'''
Implements monthly filtering of seasonal cycles.
'''
def applySeasonalCycleFilter(l):
if len(np.shape(l)) == 1:
return applySeasonalCycleFilter1d(l)
elif len(np.shape(l)) == 2:
return applySeasonalCycleFilter2d(l)
else:
raise Exception("Cannot apply seasonal cycle filter: Unsupported array shape")
def applySeasonalCycleFilterOnResultsField(results, field):
l = __fieldToList(results, field)
applySeasonalCycleFilter(l)
__listToField(results, l, field)
def applySeasonalCycleFilterOnResults(results):
[applySeasonalCycleFilterOnResultsField(results, field) for field in ['mean', 'max', 'min']]
'''
http://www.nehalemlabs.net/prototype/blog/2013/04/05/an-introduction-to-smoothing-time-series-in-python-part-i-filtering-theory/
'''
def applyLowPassFilter(y, lowcut=12.0, order=9.0):
if len(y) - 12 <= lowcut:
lowcut = 3
nyq = 0.5 * len(y)
low = lowcut / nyq
# high = highcut / nyq
b, a = butter(order, low)
m = min([len(y), len(a), len(b)])
padlen = 30 if m >= 30 else m
fl = filtfilt(b, a, y, padlen=padlen)
return fl
def applyFiltersOnField(results, field, applySeasonal=False, applyLowPass=False, append=""):
x = __fieldToList(results, field)
if applySeasonal:
x = applySeasonalCycleFilter(x)
if applyLowPass:
x = applyLowPassFilter(x)
__listToField(results, x, "%s%s" % (field, append))
def applyAllFiltersOnField(results, field, applySeasonal=True, applyLowPass=True):
try:
if applySeasonal:
applyFiltersOnField(results, field, applySeasonal=True, applyLowPass=False, append="Seasonal")
except Exception as e:
# If it doesn't work log the error but ignore it
tb = traceback.format_exc()
log.warn("Error calculating Seasonal filter:\n%s" % tb)
try:
if applyLowPass:
applyFiltersOnField(results, field, applySeasonal=False, applyLowPass=True, append="LowPass")
except Exception as e:
# If it doesn't work log the error but ignore it
tb = traceback.format_exc()
log.warn("Error calculating LowPass filter:\n%s" % tb)
try:
if applySeasonal and applyLowPass:
applyFiltersOnField(results, field, applySeasonal=True, applyLowPass=True, append="SeasonalLowPass")
except Exception as e:
# If it doesn't work log the error but ignore it
tb = traceback.format_exc()
log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
'''
class ResultsFilter(object):
def __init__(self):
pass
def filter(self, results, append, **kwargs):
pass
class SeasonalCycleFilter(ResultsFilter):
def filter(self, results, append, **kwargs):
[applySeasonalCycleFilterOnResultsField(results, field) for field in ['mean', 'max', 'min']]
if __name__ == "__main__":
foo = "bar"
f = ResultsFilter()
f.test("Tester", blah=foo)
'''