This post comes from my frustration at not being able to create simple time-series features, such as a rolling median or slope, with window functions in PySpark. This approach is by no means optimal, but it got the job done for my purposes.

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import sys
import numpy as np
import time

To keep our work organized, we use PySpark's ML Pipeline API.

from pyspark.ml.pipeline import Transformer
from pyspark.ml import Pipeline

Our code all fits into one class, where we specify which feature to use, what size window, and what statistic we want to compute with each window.

Implementation

class HourWindowFeat(Transformer):
    """Pipeline stage that appends trailing-window statistics per encounter.

    For every column named in ``feats`` the stage adds one column called
    ``<hours>_hour_<stat>_<feature>``. Its value is ``stat`` evaluated over
    the rows that share the current row's ``encounter_id`` and whose ``time``
    (cast to a long, i.e. seconds) lies in the look-back range ending at the
    current row. Every result is rounded to two decimals.

    Parameters
    ----------
    hours : int
        Window size. The range reaches back ``(hours - 1) * 3600`` seconds
        from the current row and is inclusive at both ends.
        NOTE(review): the ``- 1`` presumably makes hourly rows yield exactly
        ``hours`` samples -- confirm against the data's sampling rate.
    feats : list of str
        Names of the input columns to summarise.
    stat : str or callable
        ``'median'`` or ``'slope'`` select the NumPy-backed UDFs defined in
        this class. Any other value must be a column aggregate that can be
        used as a window function (e.g. ``pyspark.sql.functions.min``); its
        ``__name__`` becomes part of the output column name.

    Notes
    -----
    ``round``, ``col``, ``udf`` and ``collect_list`` are the Spark SQL
    functions brought in by the module's star imports, so ``round`` here is
    NOT the Python builtin.
    """

    # Placeholder written into null readings before the slope is computed.
    # collect_list() skips nulls, which would shift the positions used as the
    # x axis; filling keeps every row's position in the window.
    # NOTE(review): a genuine reading equal to -1 is treated as missing too.
    _MISSING = -1

    def __init__(self, hours, feats, stat):
        # Initialise the inherited Params/Identifiable state (uid, param
        # maps) so that the inherited copy() works on this stage.
        super(HourWindowFeat, self).__init__()
        self.hours = hours
        self.feats = feats
        self.stat = stat

    def slope(self, series):
        """Return the least-squares slope of the valid readings in *series*.

        The x coordinate of a reading is its position in *series* (its index
        in the collected window), not its timestamp. Entries equal to the
        missing-value placeholder (-1) are excluded from the fit but still
        occupy a position.

        Parameters
        ----------
        series : sequence of numbers
            Window contents in window order; may be empty.

        Returns
        -------
        float
            Slope of the degree-1 fit, or ``0.0`` when fewer than two valid
            readings are available (a line is not determined by less).
        """
        values = np.asarray(series, dtype=float)
        valid = values != self._MISSING
        # With zero or one usable point polyfit is rank deficient and would
        # return an arbitrary minimum-norm slope, so report "no trend".
        if np.count_nonzero(valid) < 2:
            return 0.0
        positions = np.flatnonzero(valid)
        return float(np.polyfit(positions, values[valid], 1)[0])

    def _output_name(self, stat_name, feat):
        """Build the output column name ``<hours>_hour_<stat>_<feature>``."""
        return '_'.join([str(self.hours), 'hour', stat_name, feat])

    def _transform(self, df):
        """Return *df* with one windowed-statistic column added per feature.

        Parameters
        ----------
        df : pyspark.sql.DataFrame
            Must contain ``encounter_id``, ``time`` and every column listed
            in ``self.feats``.

        Returns
        -------
        pyspark.sql.DataFrame
            The input plus the new statistic columns.

        Notes
        -----
        The median and slope branches use a scratch column named ``'list'``
        (and ``'na_filled_<feature>'`` for slope); an input column with one
        of those names is overwritten and then dropped.
        """
        seconds_back = (self.hours - 1) * 3600
        w = (Window
             .partitionBy(col("encounter_id"))
             .orderBy(col("time").cast('long'))
             .rangeBetween(-seconds_back, 0))

        if self.stat == 'median':
            # collect_list() omits nulls, so the median is taken over the
            # non-null readings in the window.
            median_udf = udf(lambda values: float(np.median(values)),
                             FloatType())
            for f in self.feats:
                output = self._output_name(self.stat, f)
                df = (df.withColumn('list', collect_list(f).over(w))
                        .withColumn(output, round(median_udf('list'), 2))
                        .drop('list'))

        elif self.stat == 'slope':
            slope_udf = udf(lambda values: float(self.slope(values)),
                            FloatType())
            for f in self.feats:
                output = self._output_name(self.stat, f)
                filled_column = 'na_filled_' + f
                # Fill nulls with the placeholder first so that every row
                # keeps its position in the collected window.
                df = (df.withColumn(filled_column, df[f])
                        .fillna({filled_column: self._MISSING})
                        .withColumn('list',
                                    collect_list(filled_column).over(w))
                        .withColumn(output, round(slope_udf('list'), 2))
                        .drop('list')
                        .drop(filled_column))

        else:
            # Any other stat is applied directly as a window aggregate.
            for f in self.feats:
                output = self._output_name(self.stat.__name__, f)
                df = df.withColumn(output, round(self.stat(f).over(w), 2))
        return df

Real Use Case

Here is the code I used in my research to specify which features (vital signs, lab values) to use, which window sizes (24, 48, 72, and 120 hours) to apply, and which statistics (min, max, mean, median, stddev, slope) to compute on each.

# Shared building blocks for the per-window feature lists. Each feats_*
# list below is assembled from these, in a fixed order, and is its own list
# object.
_vital_signs = [
    'temperature',
    'heart_rate',
    'respiratory_rate',
    'O2_saturation',
    'systolic_blood_pressure',
    'shock_index',
    'diastolic_blood_pressure',
    'pulse_pressure',
    'mean_arterial_pressure',
    'urine_output',
]
_blood_gases = [
    'arterial_blood_gas_lactate',
    'arterial_blood_gas_PCO2',
    'arterial_blood_gas_PaO2',
    'arterial_blood_gas_pH',
    'venous_blood_gas_lactate',
    'venous_blood_gas_PCO2',
    'venous_blood_gas_PaO2',
    'venous_blood_gas_pH',
]
_labs_counts_to_bun_cr = [
    'serum_white_blood_count',
    'serum_lymphocyte_count',
    'serum_immature_granulocytes',
    'serum_eosinophil_count',
    'serum_monocyte_count',
    'serum_neutrophil_count',
    'serum_hemoglobin',
    'serum_hematocrit',
    'serum_platelet_count',
    'serum_sodium',
    'serum_chloride',
    'serum_CO2',
    'serum_BUN',
    'serum_creatinine',
    'BUN_CR',
]
_labs_anion_gap_to_albumin = [
    'serum_anion_gap',
    'serum_bilirubin_total',
    'serum_AST',
    'serum_ALT',
    'serum_ALP',
    'serum_protein',
    'serum_albumin',
]

# Features summarised over 24-hour windows: vital signs only.
feats_24 = list(_vital_signs)

# 48-hour windows: vital signs, glucose, lactate and the blood gases.
feats_48 = _vital_signs + ['serum_glucose', 'serum_lactate'] + _blood_gases

# 72-hour windows: everything, with glucose and lactate slotted in at the
# same positions as in the original hand-written list.
feats_72 = (
    _vital_signs
    + _labs_counts_to_bun_cr
    + ['serum_glucose']
    + _labs_anion_gap_to_albumin
    + ['serum_lactate']
    + _blood_gases
)

# 120-hour windows: serum lab values only (without glucose and lactate).
feats_120 = _labs_counts_to_bun_cr + _labs_anion_gap_to_albumin
# (window size in hours, feature list) pairs shared by most statistics.
_windows_48_to_120 = ((48, feats_48), (72, feats_72), (120, feats_120))

# One HourWindowFeat stage per statistic and window size. min, max, mean and
# stddev are whatever those names are bound to at module level; the class
# requires them to be usable as Spark window aggregates.
min_48, min_72, min_120 = (
    HourWindowFeat(hours=size, feats=columns, stat=min)
    for size, columns in _windows_48_to_120
)
max_48, max_72, max_120 = (
    HourWindowFeat(hours=size, feats=columns, stat=max)
    for size, columns in _windows_48_to_120
)
mean_48, mean_72, mean_120 = (
    HourWindowFeat(hours=size, feats=columns, stat=mean)
    for size, columns in _windows_48_to_120
)
median_48, median_72, median_120 = (
    HourWindowFeat(hours=size, feats=columns, stat='median')
    for size, columns in _windows_48_to_120
)

# Slope is only computed for the 72- and 120-hour windows.
slope_72, slope_120 = (
    HourWindowFeat(hours=size, feats=columns, stat='slope')
    for size, columns in _windows_48_to_120[1:]
)

# Standard deviation additionally covers the 24-hour window.
std_24, std_48, std_72, std_120 = (
    HourWindowFeat(hours=size, feats=columns, stat=stddev)
    for size, columns in ((24, feats_24),) + _windows_48_to_120
)

# Full feature pipeline: every statistic over every window size it is
# defined for. Stages run in the order listed.
FeaturesPipeline = Pipeline(stages=[
    min_48, min_72, min_120,
    max_48, max_72, max_120,
    mean_48, mean_72, mean_120,
    median_48, median_72, median_120,
    slope_72, slope_120,
    std_24, std_48, std_72, std_120,
])

Finally, we fit our pipeline to the data.

# Pipeline used for this run: just the 48- and 72-hour minimum stages.
demo_stages = [min_48, min_72]
FeaturesPipeline = Pipeline(stages=demo_stages)

# `df` is assumed to be an already-loaded DataFrame; it is not defined in
# this snippet. fit() yields the fitted pipeline, whose transform() applies
# the stages in order.
Featpip = FeaturesPipeline.fit(df)
df_feats = Featpip.transform(df)

I'll be fleshing out this post when I have more time, but please feel free to send me questions/comments on anything related to this!