sequence.py 11.9 KB
```python
import pandas as pd


def contiguous_group_indices(df_or_series, sequence_index_level=None, sequence_col=None, sequence_function=None):
    """
    Produce a series of contiguous group labels for a given sequence column and sequencing function

    Suppose a column or index level contains a 'sequencable value'. This may be any value with
    a countable number of discrete elements which may be ordered 'by 1'.

    This function produces a series (with index matching the df) containing integers
    indicating contiguous groups of values.

    This is useful for proper column shift logic.

    Ex:

    Consider the following DataFrame of Security prices. It has a multi-level index whose second level is
    a sequencable value 'Quarter'.

    Security Quarter Price
    Sec-1    2019Q1  10.
             2019Q2  11.
             2019Q4  10.5
             2020Q1  10.6
             2020Q2  10.7
    Sec-2    2018Q1  25
             2018Q2  24
             2018Q3  25
             2018Q4  26
             2019Q2  20

    Passing sequence_index_level='Quarter' and a sequence function like yq_diff:

    def yr(quarter):
        return int(quarter[:4])
    def mon(quarter):
        return int(quarter[5:])
    def yq_diff(yq, yq_ref):
        return ((yr(yq)*4+mon(yq))-((yr(yq_ref)*4)+mon(yq_ref)))

    grp_ids = contiguous_group_indices(df, sequence_index_level='Quarter', sequence_function=yq_diff)

    Yields grp_ids as:

    Security Quarter
    Sec-1    2019Q1  1
             2019Q2  1
             2019Q4  2
             2020Q1  2
             2020Q2  2
    Sec-2    2018Q1  3
             2018Q2  3
             2018Q3  3
             2018Q4  3
             2019Q2  4

    The series indicates groups of contiguous values.
    We can then take price differences against the previous quarter while respecting any gaps:

    df['Price-diff'] = df['Price']-df['Price'].groupby(grp_ids).shift(1)
    """

    # Fetch sequence series
    sequence_series = None
    if isinstance(df_or_series, pd.DataFrame):
        if sequence_col is not None:
            sequence_series = df_or_series.loc[:, sequence_col]
        elif sequence_index_level is not None:
            if sequence_index_level not in df_or_series.index.names:
                raise ValueError(f"sequence_index_level {sequence_index_level} not in the data frame. available levels: {df_or_series.index.names}")
            level_idx = df_or_series.index.names.index(sequence_index_level)
            sequence_series = df_or_series.index.to_series().apply(lambda t: t[level_idx])
        else:
            # Get the dataframe index
            sequence_series = df_or_series.index.to_series()
    elif isinstance(df_or_series, pd.Series):
        sequence_series = df_or_series
    else:
        try:
            # Fallback assuming we might have an index. If we do this will succeed.
            sequence_series = df_or_series.to_series()
        except AttributeError:
            raise TypeError(f"You must pass a pandas series, index, or dataframe as df_or_series. Got type {type(df_or_series)}")

    # All differences are computed against this 'reference' value
    ref_val = sequence_series.iloc[0]

    # If the sequence function is None, set it as the simple difference formula
    if sequence_function is None:
        sequence_function_ = lambda s: s-ref_val
    else:
        sequence_function_ = lambda s: sequence_function(s, ref_val)

    try:
        sequence_values = sequence_series.apply(sequence_function_)
    except Exception:
        print("Tried to subtract sequence values but ran into an error!")
        raise

    # Check that sequence is an integer type
    if not pd.api.types.is_integer_dtype(sequence_values.dtype):
        raise TypeError(f"Sequence value type: {sequence_values.dtype} is not an integer type!")

    # Detect sequential groups: a step of exactly 1 continues the current group
    S = (sequence_values-sequence_values.shift(1)).fillna(0.0).astype(int)

    # Group ids
    # This procedure may fail if the selected index level isn't the 'lowest'.
    grp_ids = (S != 1).cumsum()

    return grp_ids


def sequence_df(df, lags, group_specs):
    """
    Sequence feature data into multi-component rows.

    This function takes a dataframe containing various features over a sequence.
    These are then stacked so neighboring values can be easily accessed
    by a specific sequence value.

    The input dataframe should have the following structure:

    Sequence | 'Feat 1' | 'Feat 2' |
    s1       | f1(s1)   | f2(s1)   |
    s2       | f1(s2)   | f2(s2)   |
    ...

    The function then returns for a set of lags:

    Sequence | ('Feat 1', -2) | ('Feat 2', -2) | ('Feat 1', -1) | ('Feat 2', -1) | ('Feat 1', 0) | ('Feat 2', 0) |
    s3       | f1(s1)         | f2(s1)         | f1(s2)         | f2(s2)         | f1(s3)        | f2(s3)        |
    s4       | f1(s2)         | f2(s2)         | f1(s3)         | f2(s3)         | f1(s4)        | f2(s4)        |
    ...

    if res_df is the result dataframe, then for many models, the X matrix is simply:
    res_df.values[:,:num_days]

    Named Arguments

    df: A Pandas dataframe containing a set of features for each day
    lags: A list of lags to include
    group_specs: A list of tuples defining how groups are discovered.
                 'group' type specs - Group type specs specify columns, or index levels where the groups are already defined.
                 'sequence' type specs - Sequence type specs specify 'sequencable' columns. These columns have a 'by-one' well ordering
                     defined. This well ordering can either be implicit if you use integers, or you can pass a function which defines it.
                 'level' subtype specs - These specs indicate that the data passed names a specific level of the data frame's index to use.
                 'column' subtype specs - These specs indicate that the data passed with the spec is a column of some type.
                     Either a name or a column type.
                 'index' subtype specs - sequence specs also support the index subtype. This indicates to just use the index of the data frame.
                 A spec is specified like so:
                     (<type>, <subtype>, data, [<function>])
                 A few examples:
                     ('group', 'level', 'Security') - Use the 'Security' index level as a pre-defined grouping
                     ('sequence', 'level', 'Date', days_diff) - Use the 'Date' index level as a sequencable column to define a grouping.
                         Use the days_diff function to define the order
                     ('sequence', 'index') - Use the index of the dataframe as a sequencable column. Since no function is specified,
                         it will just use arithmetic.
                     ('group', 'column', groups) - Use the groups series to define the groups to use. This is a column passed in.
                     ('group', 'column', 'Group') - Use the 'Group' column of the dataframe to define the groups to use.
                         This column is looked up by name in the dataframe.

    returns

    A pandas dataframe containing rows of prediction and/or label data.
    """

    by = []
    level = []
    remove_columns = []
    for spec in group_specs:
        # ('sequence', 'index') is the shortest valid spec, so two elements is the minimum
        if len(spec) < 2:
            raise ValueError("Group specs must contain at least two elements")
        if spec[0] == 'group':
            # These are group type specs
            if spec[1] == 'level':
                if spec[2] not in df.index.names:
                    raise ValueError(f"Level name {spec[2]} not found in index!")
                level.append(spec[2])
            elif spec[1] == 'column':
                if isinstance(spec[2], pd.Series):
                    by.append(spec[2])
                else:
                    if spec[2] not in df.columns:
                        raise ValueError(f"Column name {spec[2]} not found!")
                    by.append(df[spec[2]])
                    remove_columns.append(spec[2])
            else:
                raise ValueError(f"Group subtype {spec[1]} not supported")
        elif spec[0] == 'sequence':
            # These are the sequence type specs
            if spec[1] == 'level':
                if spec[2] not in df.index.names:
                    raise ValueError(f"Level name {spec[2]} not found in index!")
                if len(spec) == 4:
                    if not callable(spec[3]):
                        raise ValueError("The fourth element of a group spec must be a callable!")
                    g_ids = contiguous_group_indices(df, sequence_index_level=spec[2], sequence_function=spec[3])
                else:
                    g_ids = contiguous_group_indices(df, sequence_index_level=spec[2])
                by.append(g_ids)
            elif spec[1] == 'column':
                if isinstance(spec[2], pd.Series):
                    if len(spec) == 4:
                        if not callable(spec[3]):
                            raise ValueError("The fourth element of a group spec must be a callable!")
                        g_ids = contiguous_group_indices(spec[2], sequence_function=spec[3])
                    else:
                        g_ids = contiguous_group_indices(spec[2])
                else:
                    if spec[2] not in df.columns:
                        raise ValueError(f"Column name {spec[2]} not found!")
                    remove_columns.append(spec[2])
                    if len(spec) == 4:
                        if not callable(spec[3]):
                            raise ValueError("The fourth element of a group spec must be a callable!")
                        g_ids = contiguous_group_indices(df, sequence_col=spec[2], sequence_function=spec[3])
                    else:
                        g_ids = contiguous_group_indices(df, sequence_col=spec[2])
                by.append(g_ids)
            elif spec[1] == 'index':
                if len(spec) == 3:
                    if not callable(spec[2]):
                        raise ValueError("The third element of an index group spec must be a callable!")
                    g_ids = contiguous_group_indices(df, sequence_function=spec[2])
                else:
                    g_ids = contiguous_group_indices(df)
                by.append(g_ids)
            else:
                raise ValueError(f"Group subtype {spec[1]} not supported")
        else:
            raise ValueError(f"Group spec of type {spec[0]} not supported")

    # Remove columns (copy so the dtype changes below never touch the caller's dataframe)
    temp_df = df.loc[:, [c for c in df.columns if c not in remove_columns]].copy()

    if len(level) == 0:
        level = None
    if len(by) == 0:
        by = None

    # There's a bug where if both by and level are passed to groupby, it throws an error:
    # TypeError: 'numpy.ndarray' object is not callable
    # We need to mitigate this by detecting if both by and level are non-zero and if so, transition the level values
    # to columns and add them to by.
    if (level is not None) and (by is not None):
        for lvl in level:
            lvl_idx = df.index.names.index(lvl)
            lvl_vals = df.index.to_series().apply(lambda t: t[lvl_idx])
            by.append(lvl_vals)
        level = None

    # Change column types to support nans
    integer_columns = temp_df.dtypes[temp_df.dtypes.apply(pd.api.types.is_integer_dtype)]
    for col_name in integer_columns.index:
        # Assign the whole column so the nullable integer dtype is actually adopted
        temp_df[col_name] = temp_df[col_name].astype(pd.Int64Dtype())

    if (by is None) and (level is None):
        # No need to group the DataFrame here.
        temp_gbydf = temp_df
    else:
        # Produce Groupby DataFrame
        temp_gbydf = temp_df.groupby(by=by, level=level)

    dfs = []

    for lag in lags:
        slice_df = temp_gbydf.shift(-lag)

        if slice_df is not None:
            if len(slice_df) == 0:
                raise ValueError(f"No data at lag {lag}")
            slice_df.columns = pd.MultiIndex.from_product([slice_df.columns, [lag]])
            dfs.append(slice_df)

    # Join segments into full dataframe.
    DF = pd.concat(dfs, axis=1, join='outer').dropna()

    # Restore the original types of the integer columns
    for col in DF.columns:
        if col[0] in integer_columns.index:
            DF[col] = DF[col].astype(integer_columns.loc[col[0]])

    return DF
```
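The two ideas this module relies on, labelling contiguous runs with `(step != 1).cumsum()` and stacking lagged copies with a grouped `shift`, can be tried in isolation. The sketch below is an illustration only and does not call the module: it rebuilds the quarterly price table from the `contiguous_group_indices` docstring, and the helper `quarter_ordinal` plus the variable names are assumptions introduced for this example.

```python
import pandas as pd

# Data mirroring the docstring example: two securities with gaps in their quarters.
index = pd.MultiIndex.from_tuples(
    [
        ("Sec-1", "2019Q1"), ("Sec-1", "2019Q2"), ("Sec-1", "2019Q4"),
        ("Sec-1", "2020Q1"), ("Sec-1", "2020Q2"),
        ("Sec-2", "2018Q1"), ("Sec-2", "2018Q2"), ("Sec-2", "2018Q3"),
        ("Sec-2", "2018Q4"), ("Sec-2", "2019Q2"),
    ],
    names=["Security", "Quarter"],
)
prices = [10.0, 11.0, 10.5, 10.6, 10.7, 25.0, 24.0, 25.0, 26.0, 20.0]
df = pd.DataFrame({"Price": prices}, index=index)


def quarter_ordinal(quarter):
    """Hypothetical helper: map 'YYYYQn' to an integer that grows by 1 per quarter."""
    return int(quarter[:4]) * 4 + int(quarter[5:])


# Integer position of every row along the quarter axis.
ordinals = pd.Series(
    [quarter_ordinal(q) for q in df.index.get_level_values("Quarter")],
    index=df.index,
)

# A row opens a new group unless it sits exactly one step after the previous row.
steps = ordinals.diff().fillna(0).astype(int)
grp_ids = (steps != 1).cumsum()

# Gap-aware difference: the shift never crosses a group boundary.
price_diff = df["Price"] - df["Price"].groupby(grp_ids).shift(1)

# Stack lagged copies side by side, in the spirit of sequence_df with lags [-1, 0].
frames = []
for lag in (-1, 0):
    shifted = df[["Price"]].groupby(grp_ids).shift(-lag)
    shifted.columns = pd.MultiIndex.from_product([shifted.columns, [lag]])
    frames.append(shifted)
stacked = pd.concat(frames, axis=1).dropna()

print(grp_ids.tolist())
print(price_diff)
print(stacked)
```

Rows that open a group have no predecessor inside that group, so they get a missing `price_diff` and are dropped from `stacked`. Note that this labelling looks only at consecutive rows, so two different securities whose quarters happen to line up would be merged into one run, which is presumably the failure mode the source comment about the 'lowest' index level warns about.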