# sequence.py — utilities for detecting contiguous runs of 'sequencable'
# values and for stacking lagged feature rows out of a pandas DataFrame.
import pandas as pd

3
def contiguous_group_indices(df_or_series, sequence_index_level=None, sequence_col=None, sequence_function=None):
    """
    Produce a series of contiguous group labels for a given sequence column and sequencing function.

    Suppose a column or index level contains a 'sequencable value'. This may be any
    value with a countable number of discrete elements which may be ordered 'by 1'.

    This function produces a series (with index matching the input) containing integers
    indicating contiguous groups of values. This is useful for proper column shift logic.

    Arguments:
      df_or_series: A pandas DataFrame, Series, or Index holding the sequence values.
      sequence_index_level: Name of the index level holding the sequence values
        (DataFrame input only).
      sequence_col: Name of the column holding the sequence values (DataFrame input
        only; takes precedence over sequence_index_level).
      sequence_function: Optional callable f(value, reference) -> int giving the
        integer distance between two sequence values. Defaults to plain subtraction.

    Returns:
      An integer pandas Series, aligned with the input, whose value increments each
      time consecutive sequence values are not exactly one step apart.

    Raises:
      ValueError: If sequence_index_level is not an index level of the DataFrame.
      TypeError: If df_or_series is not a DataFrame/Series/Index, or the computed
        sequence distances are not of an integer dtype.

    Ex:
    Consider the following DataFrame of Security prices. It has a multi-level
    index whose second level is a sequencable value 'Quarter'.

    Security Quarter Price
    Sec-1    2019Q1  10.
             2019Q2  11.
             2019Q4  10.5
             2020Q1  10.6
             2020Q2  10.7
    Sec-2    2018Q1  25
             2018Q2  24
             2018Q3  25
             2018Q4  26
             2019Q2  20

    Passing sequence_index_level='Quarter' and a sequence function like yq_diff:

    def yr(quarter):
        return int(quarter[:4])
    def mon(quarter):
        return int(quarter[5:])
    def yq_diff(yq, yq_ref):
        return ((yr(yq)*4+mon(yq))-((yr(yq_ref)*4)+mon(yq_ref)))

    grp_ids = contiguous_group_indices(df, sequence_index_level='Quarter', sequence_function=yq_diff)

    Yields grp_ids as:

    Security Quarter
    Sec-1    2019Q1  1
             2019Q2  1
             2019Q4  2
             2020Q1  2
             2020Q2  2
    Sec-2    2018Q1  3
             2018Q2  3
             2018Q3  3
             2018Q4  3
             2019Q2  4

    The series indicates groups of contiguous values. We can find differences in price to
    previous quarters properly respecting gaps when they pop up.

    df['Price-diff'] = df['Price']-df['Price'].groupby(grp_ids).shift(1)
    """

    # Fetch the series of sequence values from whichever container was passed.
    if isinstance(df_or_series, pd.DataFrame):
        if sequence_col is not None:
            sequence_series = df_or_series.loc[:, sequence_col]
        elif sequence_index_level is not None:
            if sequence_index_level not in df_or_series.index.names:
                raise ValueError(f"sequence_index_level {sequence_index_level} not in the data frame. available levels: {df_or_series.index.names}")
            # Pull the requested level out of the (multi-)index while keeping
            # the full index for alignment.
            level_idx = df_or_series.index.names.index(sequence_index_level)
            sequence_series = df_or_series.index.to_series().apply(lambda t: t[level_idx])
        else:
            # No column/level specified: sequence over the dataframe index itself.
            sequence_series = df_or_series.index.to_series()
    elif isinstance(df_or_series, pd.Series):
        sequence_series = df_or_series
    else:
        try:
            # Fallback assuming we might have an index. If we do this will succeed.
            sequence_series = df_or_series.to_series()
        except AttributeError:
            raise TypeError(f"You must pass a pandas series, index, or dataframe as df_or_series. Got type {type(df_or_series)}")

    # Empty input: no rows means no groups. (The reference-value lookup below
    # would otherwise raise an IndexError.)
    if len(sequence_series) == 0:
        return pd.Series(dtype=int, index=sequence_series.index)

    # Bind the reference value *before* building the difference lambdas, since
    # they close over it. (Previously it was defined after the lambdas and only
    # worked because of Python's late-binding closures.)
    ref_val = sequence_series.iloc[0]

    # If no sequence function is given, fall back to plain subtraction.
    if sequence_function is None:
        sequence_function_ = lambda s: s - ref_val
    else:
        sequence_function_ = lambda s: sequence_function(s, ref_val)

    # Compute each row's integer distance from the reference value.
    sequence_values = sequence_series.apply(sequence_function_)

    # Distances must be integers for the 'by 1' contiguity test to make sense.
    if not pd.api.types.is_integer_dtype(sequence_values.dtype):
        raise TypeError(f"Sequence value type: {sequence_values.dtype} is not an integer type!")

    # Step sizes between consecutive rows; the first row gets step 0 so it
    # always opens group 1.
    steps = sequence_values.diff().fillna(0).astype(int)

    # Every step that is not exactly 1 starts a new contiguous group.
    # This procedure may fail if the selected index level isn't the 'lowest'.
    grp_ids = (steps != 1).cumsum()

    return grp_ids

108
def _resolve_group_specs(df, group_specs):
    """Translate group_specs into groupby inputs for sequence_df.

    Returns a tuple (by, level, remove_columns):
      by: list of pandas Series to group by (explicit groupings and the
        contiguous-group ids derived from 'sequence' specs).
      level: list of index level names to group by.
      remove_columns: column names consumed by specs, to be dropped from the
        feature data before stacking.

    Raises ValueError for malformed or unsupported specs.
    """
    by = []
    level = []
    remove_columns = []
    for spec in group_specs:
        if len(spec) < 2:
            # NOTE: the check is two elements; the old message claimed three.
            raise ValueError("Group specs must contain at least two elements")
        if spec[0] == 'group':
            # Pre-defined groupings: an index level or an explicit column/series.
            if spec[1] == 'level':
                if spec[2] not in df.index.names:
                    raise ValueError(f"Level name {spec[2]} not found in index!")
                level.append(spec[2])
            elif spec[1] == 'column':
                if isinstance(spec[2], pd.Series):
                    by.append(spec[2])
                else:
                    if spec[2] not in df.columns:
                        raise ValueError(f"Column name {spec[2]} not found!")
                    by.append(df[spec[2]])
                    remove_columns.append(spec[2])
            else:
                raise ValueError(f"Group subtype {spec[1]} not supported")
        elif spec[0] == 'sequence':
            # Groupings discovered from a 'sequencable' value via
            # contiguous_group_indices. An optional trailing callable defines
            # the ordering; otherwise plain subtraction is used.
            if spec[1] == 'level':
                if spec[2] not in df.index.names:
                    raise ValueError(f"Level name {spec[2]} not found in index!")
                seq_fn = None
                if len(spec) == 4:
                    if not callable(spec[3]):
                        raise ValueError(f"The fourth element of a group spec must be a callable!")
                    seq_fn = spec[3]
                by.append(contiguous_group_indices(df, sequence_index_level=spec[2], sequence_function=seq_fn))
            elif spec[1] == 'column':
                seq_fn = None
                if len(spec) == 4:
                    if not callable(spec[3]):
                        raise ValueError(f"The fourth element of a group spec must be a callable!")
                    seq_fn = spec[3]
                if isinstance(spec[2], pd.Series):
                    by.append(contiguous_group_indices(spec[2], sequence_function=seq_fn))
                else:
                    if spec[2] not in df.columns:
                        raise ValueError(f"Column name {spec[2]} not found!")
                    remove_columns.append(spec[2])
                    by.append(contiguous_group_indices(df, sequence_col=spec[2], sequence_function=seq_fn))
            elif spec[1] == 'index':
                seq_fn = None
                if len(spec) == 3:
                    if not callable(spec[2]):
                        raise ValueError(f"The third element of an index group spec must be a callable!")
                    seq_fn = spec[2]
                by.append(contiguous_group_indices(df, sequence_function=seq_fn))
            else:
                raise ValueError(f"Group subtype {spec[1]} not supported")
        else:
            raise ValueError(f"Group spec of type {spec[0]} not supported")
    return by, level, remove_columns


def sequence_df(df, lags, group_specs):
    """
    Sequence feature data into multi-component rows.

    This function takes a dataframe containing various features over a sequence. These
    are then stacked so neighboring values can be easily accessed by a specific sequence value.

    The input dataframe should have the following structure:

    Sequence | 'Feat 1' | 'Feat 2' |
    s1       | f1(s1)   | f2(s1)   |
    s2       | f1(s2)   | f2(s2)   |
    ...

    The function then returns for a set of lags:

    Sequence | ('Feat 1' , -2) | ('Feat 2', -2) | ('Feat 1', -1) | ('Feat 2', -1) | ('Feat 1', 0) | ('Feat 2', 0) |
    s3       | f1(s1)          | f2(s1)         | f1(s2)         | f2(s2)         | f1(s3)        | f2(s3)        |
    s4       | f1(s2)          | f2(s2)         | f1(s3)         | f2(s3)         | f1(s4)        | f2(s4)        |
    ...

    if res_df is the result dataframe, then for many models, the X matrix is simply:
    res_df.values[:,:num_days]

    Named Arguments
      df: A Pandas dataframe containing a set of features for each day
      lags: A list of lags to include
      group_specs: A list of tuples defining how groups are discovered.
        'group' type specs - Group type specs specify columns, or index levels where the groups are already defined.
        'sequence' type specs - Sequence type specs specify 'sequencable' columns. These columns have a 'by-one' well ordering defined.
            This well ordering can either be implicit if you use integers, or you can pass a function which defines it.
        'level' subtype specs - These specs indicate that the data passed indicate a specific level of the data frame's index should be used.
        'column' subtype specs - These specs indicate that the data passed with the spec is a column of some type. Either a name or a column type.
        'index' subtype specs - sequence specs also support the index subtype. This indicates to just use the index of the data frame.
        A spec is specified like so: (<type>, <subtype>, data, [<sequence_function>])

        A few examples:
        ('group', 'level', 'Security') - Use the 'Security' index level as a pre-defined grouping
        ('sequence', 'level', 'Date', days_diff) - Use the 'Date' index level as a sequencable column to define a grouping. Use the days_diff function to define the order
        ('sequence', 'index') - Use the index of the dataframe as a sequencable column. Since no function is specified, it will just use arithmetic.
        ('group', 'column', groups) - Use the groups series to define the groups to use. This is a column passed in.
        ('group', 'column', 'Group') - Use the 'Group' column of the dataframe to define the groups to use. This is a column passed in.

    returns
      A pandas dataframe containing rows of prediction and/or label data.

    raises
      ValueError: for malformed/unsupported group specs, or when a lag leaves no data.
    """

    # Work out how to group the data from the declarative specs.
    by, level, remove_columns = _resolve_group_specs(df, group_specs)

    # Drop columns that were consumed by the specs.
    temp_df = df.loc[:, [c for c in df.columns if c not in remove_columns]]

    if len(level) == 0:
        level = None
    if len(by) == 0:
        by = None

    # There's a bug where if both by and level are passed to groupby, it throws an error:
    # TypeError: 'numpy.ndarray' object is not callable
    # We need to mitigate this by detecting if both by and level are non-zero and if so,
    # transition the level values to columns and add them to by.
    if (level is not None) and (by is not None):
        for lvl in level:
            by.append(df.index.get_level_values(lvl).to_series(index=df.index))
        level = None

    # Cast integer columns to the nullable Int64 dtype so shifting can
    # introduce missing values without silently converting to float.
    integer_columns = temp_df.dtypes[temp_df.dtypes.apply(pd.api.types.is_integer_dtype)]
    temp_df = temp_df.astype({name: pd.Int64Dtype() for name in integer_columns.index})

    if (by is None) and (level is None):
        # No grouping requested: shift the frame as a whole.
        temp_gbydf = temp_df
    else:
        # Produce Groupby DataFrame (axis=0 is the default and the keyword is
        # deprecated, so it is omitted).
        temp_gbydf = temp_df.groupby(by=by, level=level)

    dfs = []
    for lag in lags:
        # Negative lags look backwards, positive lags look forwards.
        slice_df = temp_gbydf.shift(-lag)
        if slice_df is None:
            continue
        if len(slice_df) == 0:
            raise ValueError(f"No data at lag {lag}")
        # Tag every feature column with its lag: ('Feat', lag).
        slice_df.columns = pd.MultiIndex.from_product([slice_df.columns, [lag]])
        dfs.append(slice_df)

    # Join segments into full dataframe; rows missing any lag are dropped.
    DF = pd.concat(dfs, axis=1, join='outer').dropna()

    # Restore the original types of the integer columns.
    for col in DF.columns:
        if col[0] in integer_columns.index:
            DF[col] = DF[col].astype(integer_columns.loc[col[0]])

    return DF