Time Series Features in PySpark
Finally, how to get the median and slope for window features in PySpark
This post comes from a place of frustration at not being able to create simple time series features with window functions, like the median or slope, in PySpark. This approach is by no means optimal, but it got the job done for my purposes.
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import sys
import numpy as np
import time
To make our work more organized, we use pyspark's ML pipeline tool.
from pyspark.ml.pipeline import Transformer
from pyspark.ml import Pipeline
Our code all fits into one class, where we specify which feature to use, what size window, and what statistic we want to compute with each window.
class HourWindowFeat(Transformer):
    """Pipeline stage that adds rolling time-window features per encounter.

    For each column in ``feats``, appends a new column named
    ``'<hours>_hour_<stat>_<feature>'`` containing the chosen statistic
    computed over a trailing window of ``hours`` hours (partitioned by
    ``encounter_id``, ordered by ``time``).

    Parameters
    ----------
    hours : int
        Window size in hours.
    feats : list of str
        Names of the columns to compute the statistic over.
    stat : str or callable
        Either the string ``'median'`` or ``'slope'`` (handled via a UDF on
        a collected list), or a pyspark aggregate function such as
        ``min``, ``max``, ``mean``, ``stddev``.
    """

    def __init__(self, hours, feats, stat):
        # Initialize the parent Transformer so this stage gets a valid uid
        # and can participate in a Pipeline. (The original version skipped
        # this and instead carried a broken Scala-style `this()` method.)
        super().__init__()
        self.hours = hours
        self.feats = feats
        self.stat = stat

    def copy(self, extra=None):
        # Transformer's copy contract is copy(self, extra=None). The original
        # omitted `self` and called an undefined `defaultCopy`; the inherited
        # helper is `_defaultCopy`.
        return self._defaultCopy(extra)

    def slope(self, series):
        """Least-squares slope of the non-missing points in `series`.

        Missing values are encoded as -1 (see _transform); they are excluded
        from the fit. Returns 0 when fewer than two valid points exist, since
        a slope is undefined in that case (the original raised/warned here).
        """
        if not series or len(series) < 2:
            return 0
        arr = np.array(series, dtype=float)
        valid = np.where(arr != -1.0)[0]
        # np.polyfit with degree 1 needs at least two observations.
        if valid.size < 2:
            return 0
        coefficients = np.polyfit(valid, arr[valid], 1)
        return coefficients[0]

    def _transform(self, df):
        """Return `df` with one new rolling-window column per feature."""
        hour_to_sec = lambda i: i * 3600
        # Range-based window: rows of the same encounter whose `time` lies
        # within the preceding (hours - 1) hours up to the current row,
        # compared on epoch seconds.
        w = (Window()
             .partitionBy(col("encounter_id"))
             .orderBy(col("time").cast('long'))
             .rangeBetween(-hour_to_sec(self.hours - 1), 0))
        if self.stat == 'median':
            median_udf = udf(lambda x: float(np.median(x)), FloatType())
            for f in self.feats:
                output = str(self.hours) + '_hour_' + self.stat + '_' + f
                df = (df.withColumn('list', collect_list(f).over(w))
                        .withColumn(output, round(median_udf('list'), 2))
                        .drop('list'))
        elif self.stat == 'slope':
            slope_udf = udf(lambda x: float(self.slope(x)), FloatType())
            for f in self.feats:
                output = str(self.hours) + '_hour_' + self.stat + '_' + f
                filled_column = 'na_filled_' + f
                # collect_list silently skips nulls, which would misalign the
                # implied time axis of the collected series; fill nulls with a
                # -1 sentinel that slope() then ignores.
                df = (df.withColumn(filled_column, df[f])
                        .fillna({filled_column: -1})
                        .withColumn('list', collect_list(filled_column).over(w))
                        .withColumn(output, round(slope_udf('list'), 2))
                        .drop('list')
                        .drop(filled_column))
        else:
            # self.stat is a pyspark aggregate function (min/max/mean/stddev);
            # use its __name__ in the output column name.
            for f in self.feats:
                output = (str(self.hours) + '_hour_'
                          + self.stat.__name__ + '_' + f)
                df = df.withColumn(output, round(self.stat(f).over(w), 2))
        return df
Here is the code I used in my research to specify which features (vital signs, lab values) to use, which window sizes (24, 48, 72, and 120 hours), and which statistics were computed on each (min, max, mean, median, stddev, slope).
# Feature groups shared by the per-window lists below.
_vitals = [
    'temperature', 'heart_rate', 'respiratory_rate', 'O2_saturation',
    'systolic_blood_pressure', 'shock_index', 'diastolic_blood_pressure',
    'pulse_pressure', 'mean_arterial_pressure', 'urine_output',
]
_blood_gases = [
    'arterial_blood_gas_lactate', 'arterial_blood_gas_PCO2',
    'arterial_blood_gas_PaO2', 'arterial_blood_gas_pH',
    'venous_blood_gas_lactate', 'venous_blood_gas_PCO2',
    'venous_blood_gas_PaO2', 'venous_blood_gas_pH',
]
_chem_labs = [
    'serum_white_blood_count', 'serum_lymphocyte_count',
    'serum_immature_granulocytes', 'serum_eosinophil_count',
    'serum_monocyte_count', 'serum_neutrophil_count', 'serum_hemoglobin',
    'serum_hematocrit', 'serum_platelet_count', 'serum_sodium',
    'serum_chloride', 'serum_CO2', 'serum_BUN', 'serum_creatinine',
    'BUN_CR', 'serum_anion_gap', 'serum_bilirubin_total', 'serum_AST',
    'serum_ALT', 'serum_ALP', 'serum_protein', 'serum_albumin',
]

# 24-hour windows: vitals only.
feats_24 = list(_vitals)
# 48-hour windows: vitals plus glucose, lactate, and blood gases.
feats_48 = _vitals + ['serum_glucose', 'serum_lactate'] + _blood_gases
# 72-hour windows: vitals plus the full lab panel (glucose inserted after
# BUN_CR and lactate appended, matching the original ordering) and gases.
feats_72 = (_vitals
            + _chem_labs[:15] + ['serum_glucose'] + _chem_labs[15:]
            + ['serum_lactate'] + _blood_gases)
# 120-hour windows: chemistry/CBC labs only.
feats_120 = list(_chem_labs)
# Instantiate one pipeline stage per (statistic, window size) pair.
# min/max/mean/stddev are the pyspark aggregate functions brought in by the
# wildcard import; 'median' and 'slope' are handled via UDFs in the class.

# Rolling minimum.
min_48 = HourWindowFeat(48, feats_48, min)
min_72 = HourWindowFeat(72, feats_72, min)
min_120 = HourWindowFeat(120, feats_120, min)

# Rolling maximum.
max_48 = HourWindowFeat(48, feats_48, max)
max_72 = HourWindowFeat(72, feats_72, max)
max_120 = HourWindowFeat(120, feats_120, max)

# Rolling mean.
mean_48 = HourWindowFeat(48, feats_48, mean)
mean_72 = HourWindowFeat(72, feats_72, mean)
mean_120 = HourWindowFeat(120, feats_120, mean)

# Rolling median (UDF-based).
median_48 = HourWindowFeat(48, feats_48, 'median')
median_72 = HourWindowFeat(72, feats_72, 'median')
median_120 = HourWindowFeat(120, feats_120, 'median')

# Rolling slope (UDF-based, longer windows only).
slope_72 = HourWindowFeat(72, feats_72, 'slope')
slope_120 = HourWindowFeat(120, feats_120, 'slope')

# Rolling standard deviation.
std_24 = HourWindowFeat(24, feats_24, stddev)
std_48 = HourWindowFeat(48, feats_48, stddev)
std_72 = HourWindowFeat(72, feats_72, stddev)
std_120 = HourWindowFeat(120, feats_120, stddev)
# Assemble every window-feature stage into a single pipeline.
# NOTE(review): the original rebound FeaturesPipeline three times in a row,
# so only the last, truncated stage list took effect — the first two
# assignments were dead code. Keep one authoritative definition.
FeaturesPipeline = Pipeline(stages=[
    min_48, min_72, min_120,
    max_48, max_72, max_120,
    mean_48, mean_72, mean_120,
    median_48, median_72, median_120,
    slope_72, slope_120,
    std_24, std_48, std_72, std_120,
])
Finally, we fit our pipeline to the data.
# Fit the configured pipeline to the data and append the window features.
# NOTE(review): removed a leftover debug line that rebound FeaturesPipeline
# to a two-stage (min_48, min_72) pipeline right before fitting, which
# silently discarded all other configured stages.
Featpip = FeaturesPipeline.fit(df)
df_feats = Featpip.transform(df)
I'll be fleshing out this post when I have more time, but please feel free to send me questions/comments on anything related to this!