This post comes from a place of frustration at not being able to create simple time series features with window functions, like the median or slope, in PySpark. This approach is by no means optimal, but it got the job done for my purposes.

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import sys
import numpy as np
import time


To keep our work organized, we use PySpark's ML Pipeline tooling.

from pyspark.ml.pipeline import Transformer
from pyspark.ml import Pipeline


Our code all fits into one class, where we specify which feature to use, what size window, and what statistic we want to compute with each window.

## Implementation

class HourWindowFeat(Transformer):
    """Append rolling time-window statistics as new columns on a DataFrame.

    For each feature column in ``feats``, computes one statistic over a
    time-based window of ``hours`` hours, partitioned by ``encounter_id``
    and ordered by ``time``. The new column is named
    ``{hours}_hour_{stat}_{feature}``.

    Parameters
    ----------
    hours : int
        Window length in hours. The window spans the preceding
        ``hours - 1`` hours (in seconds) up to and including the current row.
    feats : list of str
        Names of the numeric columns to aggregate.
    stat : str or callable
        Either the string 'median' or 'slope' (computed via Python UDFs),
        or a pyspark.sql.functions aggregate such as min/max/mean/stddev.
    """

    def __init__(self, hours, feats, stat):
        # Initialize the Transformer/Params machinery (uid, param map)
        # before setting our own state; the original skipped this.
        super(HourWindowFeat, self).__init__()
        self.hours = hours
        self.feats = feats
        self.stat = stat

    def copy(self, extra=None):
        # Standard Params.copy override: duplicate this instance,
        # applying any extra param overrides.
        return self._defaultCopy(extra)

    def slope(self, series):
        """Return the least-squares slope of ``series`` against its index.

        Values equal to -1 are sentinels for missing data (see the fillna
        in ``_transform``) and are excluded from the fit. Returns 0 when
        fewer than two valid points remain, since a line cannot be fit.
        """
        if series is None or len(series) < 2:
            return 0
        series = np.array(series)
        valid = series != -1.0
        if valid.sum() < 2:
            # All (or all but one) observations missing: slope undefined.
            return 0
        # np.where/np.nonzero return a tuple; take [0] to get the 1-D
        # index array that np.polyfit requires.
        x = np.nonzero(valid)[0]
        y = series[valid]
        coefficients, residuals, _, _, _ = np.polyfit(x, y, 1, full=True)
        # For degree 1, polyfit returns [slope, intercept]; return only the
        # slope so the caller's float(...) conversion succeeds.
        return coefficients[0]

    def _transform(self, df):
        """Add one windowed-statistic column per feature and return df."""
        hour_to_sec = lambda i: i * 3600
        # Time-range window: all rows of the same encounter within the
        # preceding (hours - 1) hours of the current row's timestamp.
        w = (Window
             .partitionBy(col("encounter_id"))
             .orderBy(col("time").cast('long'))
             .rangeBetween(-hour_to_sec(self.hours - 1), 0))

        if self.stat == 'median':
            # Spark has no native median window function: collect the
            # window's values into a list and reduce them in a UDF.
            median_udf = udf(lambda x: float(np.median(x)), FloatType())
            for f in self.feats:
                output = str(self.hours) + '_' + 'hour' + '_' + self.stat + '_' + f
                df = df.withColumn('list', collect_list(f).over(w))\
                       .withColumn(output, round(median_udf('list'), 2))
                df = df.drop('list')

        elif self.stat == 'slope':
            slope_udf = udf(lambda x: float(self.slope(x)), FloatType())
            for f in self.feats:
                output = str(self.hours) + '_' + 'hour' + '_' + self.stat + '_' + f
                # collect_list silently drops nulls, which would shift the
                # time axis; fill with the -1 sentinel so slope() can skip
                # missing values while positions stay aligned.
                filled_column = 'na_filled_' + f
                df = df.drop('list')
                df = df.withColumn(filled_column, df[f]).fillna({filled_column: -1})\
                       .withColumn('list', collect_list(filled_column).over(w))\
                       .withColumn(output, round(slope_udf('list'), 2))
                df = df.drop('list')
                df = df.drop(filled_column)
        else:
            # self.stat is a native pyspark aggregate (min/max/mean/stddev);
            # apply it directly over the window — no UDF needed.
            for f in self.feats:
                output = str(self.hours) + '_' + 'hour' + '_' + self.stat.__name__ + '_' + f
                df = df.withColumn(output, round(self.stat(f).over(w), 2))
        return df


## Real Use Case

Here is the code I used in my research, showing which features (vital signs, lab values) to use, which window sizes (24, 48, 72, and 120 hours), and which statistics to compute on each (min, max, mean, median, stddev, slope).

# Feature columns aggregated over 24-hour windows.
feats_24 = [
    'temperature', 'heart_rate', 'respiratory_rate', 'O2_saturation',
    'systolic_blood_pressure', 'shock_index', 'diastolic_blood_pressure',
    'pulse_pressure', 'mean_arterial_pressure', 'urine_output',
]

# Feature columns aggregated over 48-hour windows.
feats_48 = [
    'temperature', 'heart_rate', 'respiratory_rate', 'O2_saturation',
    'systolic_blood_pressure', 'shock_index', 'diastolic_blood_pressure',
    'pulse_pressure', 'mean_arterial_pressure', 'urine_output',
    'serum_glucose', 'serum_lactate',
    'arterial_blood_gas_lactate', 'arterial_blood_gas_PCO2',
    'arterial_blood_gas_PaO2', 'arterial_blood_gas_pH',
    'venous_blood_gas_lactate', 'venous_blood_gas_PCO2',
    'venous_blood_gas_PaO2', 'venous_blood_gas_pH',
]

# Feature columns aggregated over 72-hour windows.
feats_72 = [
    'temperature', 'heart_rate', 'respiratory_rate', 'O2_saturation',
    'systolic_blood_pressure', 'shock_index', 'diastolic_blood_pressure',
    'pulse_pressure', 'mean_arterial_pressure', 'urine_output',
    'serum_white_blood_count', 'serum_lymphocyte_count',
    'serum_immature_granulocytes', 'serum_eosinophil_count',
    'serum_monocyte_count', 'serum_neutrophil_count',
    'serum_hemoglobin', 'serum_hematocrit', 'serum_platelet_count',
    'serum_sodium', 'serum_chloride', 'serum_CO2', 'serum_BUN',
    'serum_creatinine', 'BUN_CR', 'serum_glucose', 'serum_anion_gap',
    'serum_bilirubin_total', 'serum_AST', 'serum_ALT', 'serum_ALP',
    'serum_protein', 'serum_albumin', 'serum_lactate',
    'arterial_blood_gas_lactate', 'arterial_blood_gas_PCO2',
    'arterial_blood_gas_PaO2', 'arterial_blood_gas_pH',
    'venous_blood_gas_lactate', 'venous_blood_gas_PCO2',
    'venous_blood_gas_PaO2', 'venous_blood_gas_pH',
]

# Feature columns aggregated over 120-hour windows.
feats_120 = [
    'serum_white_blood_count', 'serum_lymphocyte_count',
    'serum_immature_granulocytes', 'serum_eosinophil_count',
    'serum_monocyte_count', 'serum_neutrophil_count',
    'serum_hemoglobin', 'serum_hematocrit', 'serum_platelet_count',
    'serum_sodium', 'serum_chloride', 'serum_CO2', 'serum_BUN',
    'serum_creatinine', 'BUN_CR', 'serum_anion_gap',
    'serum_bilirubin_total', 'serum_AST', 'serum_ALT', 'serum_ALP',
    'serum_protein', 'serum_albumin',
]

# One transformer per (statistic, window-size) combination.
# NOTE: min/max/mean/stddev here resolve to pyspark.sql.functions
# aggregates (the star import above shadows the builtins), so they can
# be applied natively over a window; 'median' and 'slope' are strings
# and are handled by UDFs inside HourWindowFeat.
min_48 = HourWindowFeat(hours=48, feats=feats_48, stat=min)
min_72 = HourWindowFeat(hours=72, feats=feats_72, stat=min)
min_120 = HourWindowFeat(hours=120, feats=feats_120, stat=min)
max_48 = HourWindowFeat(hours=48, feats=feats_48, stat=max)
max_72 = HourWindowFeat(hours=72, feats=feats_72, stat=max)
max_120 = HourWindowFeat(hours=120, feats=feats_120, stat=max)
mean_48 = HourWindowFeat(hours=48, feats=feats_48, stat=mean)
mean_72 = HourWindowFeat(hours=72, feats=feats_72, stat=mean)
mean_120 = HourWindowFeat(hours=120, feats=feats_120, stat=mean)
median_48 = HourWindowFeat(hours=48, feats=feats_48, stat='median')
median_72 = HourWindowFeat(hours=72, feats=feats_72, stat='median')
median_120 = HourWindowFeat(hours=120, feats=feats_120, stat='median')
slope_72 = HourWindowFeat(hours=72, feats=feats_72, stat='slope')
slope_120 = HourWindowFeat(hours=120, feats=feats_120, stat='slope')
std_24 = HourWindowFeat(hours=24, feats=feats_24, stat=stddev)
std_48 = HourWindowFeat(hours=48, feats=feats_48, stat=stddev)
std_72 = HourWindowFeat(hours=72, feats=feats_72, stat=stddev)
std_120 = HourWindowFeat(hours=120, feats=feats_120, stat=stddev)

# Chain every transformer into a single pipeline. (Earlier debug
# reassignments that overwrote this with smaller stage subsets have
# been removed.)
FeaturesPipeline = Pipeline(stages=[
    min_48, min_72, min_120,
    max_48, max_72, max_120,
    mean_48, mean_72, mean_120,
    median_48, median_72, median_120,
    slope_72, slope_120,
    std_24, std_48, std_72, std_120,
])


Finally, we fit our pipeline to the data.

# Fit the full feature pipeline defined above and materialize the
# windowed features. (Removed a leftover debug reassignment that
# replaced the pipeline with only two stages right before fitting.)
Featpip = FeaturesPipeline.fit(df)
df_feats = Featpip.transform(df)


I'll be fleshing out this post when I have more time, but please feel free to send me questions/comments on anything related to this!