Skip to content

API Reference

This page documents the public API of safefeat.

build_features

safefeat.core.build_features

build_features(
    spine,
    tables,
    spec,
    *,
    entity_col="entity_id",
    cutoff_col="cutoff_time",
    event_time_cols=None,
    allowed_lag="0s",
    return_report=False,
)

Build leakage-safe features from event tables.

Parameters:

Name Type Description Default
spine DataFrame

DataFrame containing entity identifiers and cutoff times.

required
tables dict[str, DataFrame]

Mapping of table name to event DataFrame.

required
spec FeatureSpec or list[WindowAgg]

Feature specification describing windows and aggregations.

required
entity_col str

Name of entity identifier column.

"entity_id"
cutoff_col str

Name of cutoff timestamp column.

"cutoff_time"
event_time_cols dict[str, str]

Mapping of table name to event timestamp column.

None
allowed_lag str

Allowed tolerance for future timestamps (pandas timedelta string).

"0s"
return_report bool

If True, return a tuple (features_df, AuditReport) with audit information about dropped/kept event pairs.

False

Returns:

Type Description
DataFrame or (DataFrame, AuditReport)

Feature matrix aligned to the spine. If return_report is True a second return value contains the audit report.

Source code in src/safefeat/core.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def build_features(spine, tables, spec, *, entity_col="entity_id", cutoff_col="cutoff_time",
                   event_time_cols=None, allowed_lag="0s", return_report=False):
    """Build leakage-safe features from event tables.

    Parameters
    ----------
    spine : pandas.DataFrame
        DataFrame containing entity identifiers and cutoff times.
    tables : dict[str, pandas.DataFrame]
        Mapping of table name to event DataFrame.
    spec : FeatureSpec or list[WindowAgg]
        Feature specification describing windows and aggregations.
    entity_col : str, default="entity_id"
        Name of entity identifier column.
    cutoff_col : str, default="cutoff_time"
        Name of cutoff timestamp column.
    event_time_cols : dict[str, str]
        Mapping of table name to event timestamp column.
    allowed_lag : str, default="0s"
        Allowed tolerance for future timestamps (pandas timedelta string).
    return_report : bool, default=False
        If True, return a tuple ``(features_df, AuditReport)`` with audit
        information about dropped/kept event pairs.

    Returns
    -------
    pandas.DataFrame or (pandas.DataFrame, AuditReport)
        Feature matrix aligned to the spine. If ``return_report`` is True a
        second return value contains the audit report.

    Raises
    ------
    ValueError
        If ``event_time_cols`` is missing, the spine lacks the required
        columns, a block references an unknown table or column, or a block
        type is not recognized.
    """

    if event_time_cols is None:
        raise ValueError("event_time_cols must be provided, e.g. {'events': 'event_time'}")

    # validate spine
    if entity_col not in spine.columns or cutoff_col not in spine.columns:
        raise ValueError(f"Required columns {entity_col} and/or {cutoff_col} not found in spine DataFrame")

    out = spine.copy()
    out[cutoff_col] = pd.to_datetime(out[cutoff_col], errors="raise")
    spine_subset = out[[entity_col, cutoff_col]]

    report = AuditReport() if return_report else None

    if isinstance(spec, list):
        spec = FeatureSpec(blocks=spec)

    def _resolve_table(table_name):
        # Translate missing-key situations into actionable ValueErrors
        # instead of bare KeyErrors.
        if table_name not in tables:
            raise ValueError(f"Table '{table_name}' not found in tables mapping")
        if table_name not in event_time_cols:
            raise ValueError(f"No event time column provided for table '{table_name}'")
        return tables[table_name], event_time_cols[table_name]

    def _attach(feature_name, agg_df, value_col, *, fill=None, as_int=False):
        # Left-join the per-pair aggregate back onto the spine so values stay
        # aligned with `out`'s row order; pairs with no events receive `fill`
        # (or stay NaN when fill is None).
        merged = spine_subset.merge(agg_df, on=[entity_col, cutoff_col], how="left")
        values = merged[value_col]
        if fill is not None:
            values = values.fillna(fill)
        if as_int:
            values = values.astype(int)
        out[feature_name] = values.values

    # Whether each named-column aggregation yields an integer-valued feature.
    _int_valued = {"sum": False, "mean": False, "nunique": True, "count": True}

    for block in spec.blocks:
        if isinstance(block, WindowAgg):
            events_df, event_time_col = _resolve_table(block.table)

            # Audit counts are window-independent, so capture them once
            # (from the first window) and reuse for the report.
            audit_data_for_table = None

            for w in block.windows:
                # None means "all history before the cutoff".
                w_label = "all" if w is None else w.lower()
                result = _events_in_window(
                    spine=spine_subset,
                    events=events_df,
                    time_window=w,
                    allowed_lag=allowed_lag,
                    entity_col=entity_col,
                    cutoff_col=cutoff_col,
                    event_time_col=event_time_col,
                    collect_audit=return_report,
                )

                if return_report:
                    in_window, audit_data = result
                    if audit_data_for_table is None:
                        audit_data_for_table = audit_data
                else:
                    in_window = result

                for dim, aggs in block.metrics.items():
                    if dim != "*" and dim not in in_window.columns:
                        raise ValueError(f"Column '{dim}' not found in table '{block.table}'")

                    if dim == "*":
                        # Wildcard: count of event rows per (entity, cutoff) pair.
                        if "count" in aggs:
                            counts = (
                                in_window.groupby([entity_col, cutoff_col], sort=False)
                                .size()
                                .reset_index(name="count")
                            )
                            _attach(f"{block.table}__n_events__{w_label}", counts,
                                    "count", fill=0, as_int=True)
                    else:
                        gb = in_window.groupby([entity_col, cutoff_col], sort=False)

                        # Fixed iteration order keeps output column order stable
                        # regardless of how `aggs` is listed in the spec.
                        # NOTE: "count" here counts non-null values of the
                        # column (unlike the wildcard, which counts rows);
                        # previously it was accepted by the spec but silently
                        # produced no feature.
                        for agg in ("sum", "mean", "nunique", "count"):
                            if agg not in aggs:
                                continue
                            value_col = f"{agg}_val"
                            agg_df = getattr(gb[dim], agg)().reset_index(name=value_col)
                            _attach(
                                f"{block.table}__{dim}__{agg}__{w_label}",
                                agg_df,
                                value_col,
                                fill=0,
                                as_int=_int_valued[agg],
                            )

            # Add audit data for this table if collecting reports
            if return_report and audit_data_for_table is not None:
                table_audit = TableAudit(
                    table=block.table,
                    total_joined_pairs=audit_data_for_table["total_joined_pairs"],
                    kept_pairs=audit_data_for_table["kept_pairs"],
                    dropped_future_pairs=audit_data_for_table["dropped_future_pairs"],
                    max_future_delta=audit_data_for_table["max_future_delta"],
                )
                report.add_table(table_audit)

        elif isinstance(block, RecencyBlock):  # compute recency feature
            events_df, event_time_col = _resolve_table(block.table)

            # Optionally restrict to events matching filter_col == filter_value.
            filtered_events = events_df.copy()
            if block.filter_col is not None:
                filtered_events = filtered_events[filtered_events[block.filter_col] == block.filter_value]

            # Days between cutoff and the most recent matching event.
            recency_features = _compute_recency(
                spine=spine_subset,
                events=filtered_events,
                entity_col=entity_col,
                cutoff_col=cutoff_col,
                event_time_col=event_time_col,
                allowed_lag=allowed_lag,
            )

            feature_name = f"{block.table}__recency"
            if block.filter_col is not None:
                feature_name += f"__{block.filter_col}_{block.filter_value}"

            # No fill: "never seen an event" stays NaN, which is distinct
            # from a recency of 0 days.
            _attach(feature_name, recency_features, "recency_days")

        else:
            raise ValueError(f"Unknown block type: {type(block)}")

    if return_report:
        return out, report
    return out

Feature Specification

WindowAgg

safefeat.spec.WindowAgg dataclass

Specification for aggregating events within a time window.

Attributes:

Name Type Description
table str

Name of the events table to read (key in the tables mapping passed to :func:build_features).

windows List[Optional[str]]

List of window lengths expressed as duration strings (e.g. "7D", "30D", "3M", "1Y"). Supported units:

  • D – days (e.g. "30D" = last 30 days)
  • H – hours (e.g. "24H" = last 24 hours)
  • min – minutes
  • s – seconds
  • M – calendar months (e.g. "1M", "3M")
  • Y – calendar years (e.g. "1Y", "2Y")

For each window a set of features will be produced.

How M and Y windows are calculated

M and Y use calendar-aware arithmetic via relativedelta, meaning they respect actual month lengths rather than assuming a fixed number of days.

The window is a sliding lookback from the cutoff — it does not snap to calendar month or year boundaries.

Given a cutoff date, the window start is computed as::

window_start = cutoff - relativedelta(months=n)  # for M
window_start = cutoff - relativedelta(years=n)   # for Y

Events are included if window_start < event_time <= cutoff (window start is exclusive, cutoff is inclusive).

Examples:

  • cutoff = 2024-02-03, window = "1M" → window_start = 2024-01-03 → Jan 1 ❌, Jan 3 ❌, Jan 4 ✅, Jan 31 ✅, Feb 3 ✅

  • cutoff = 2024-03-31, window = "1M" → window_start = 2024-02-29 (leap year aware) → Feb 28 ❌, Feb 29 ✅, Mar 15 ✅

  • cutoff = 2024-02-03, window = "1Y" → window_start = 2023-02-03 → Feb 3 2023 ❌, Feb 4 2023 ✅, Jan 31 2024 ✅

.. note:: "1M" is not the same as "30D". A 1-month window from 2024-02-03 starts on 2024-01-03 (31 days back), while "30D" starts on 2024-01-04 (exactly 30 days back).

Use None in the list to compute features over all history prior to the cutoff with no lookback limit (e.g. windows=["30D", None]). The resulting column suffix will be all (e.g. events__n_events__all).

metrics Dict[str, List[str]]

Mapping from a column name to a list of aggregations to compute. Use "*" as a wildcard key to request event counts (only ["count"] is supported for the wildcard). Example: {"*": ["count"], "amount": ["sum", "mean"]}.

Examples:

import pandas as pd
from safefeat import build_features, WindowAgg

spine = pd.DataFrame({
    "entity_id": ["u1"],
    "cutoff_time": ["2024-01-10"],
})

events = pd.DataFrame({
    "entity_id": ["u1", "u1"],
    "event_time": ["2024-01-05", "2024-01-08"],
    "amount": [10, 20],
})

spec = [
    WindowAgg(
        table="events",
        windows=["7D", "3M", "1Y"],
        metrics={"amount": ["sum"], "*": ["count"]},
    )
]

X = build_features(
    spine=spine,
    tables={"events": events},
    spec=spec,
    event_time_cols={"events": "event_time"},
)

# column names produced:
[events__n_events__7d
events__amount__sum__7d
events__n_events__3m
events__amount__sum__3m
events__n_events__1y
events__amount__sum__1y]
Source code in src/safefeat/spec.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@dataclass
class WindowAgg:
    """Specification for aggregating events within a time window.

    Attributes
    ----------
    table:
        Name of the events table to read (key in the `tables` mapping passed
        to :func:`build_features`).
    windows:
        List of window lengths expressed as duration strings (e.g.
        ``"7D"``, ``"30D"``, ``"3M"``, ``"1Y"``). Supported units:

        - ``D`` – days (e.g. ``"30D"`` = last 30 days)
        - ``H`` – hours (e.g. ``"24H"`` = last 24 hours)
        - ``min`` – minutes
        - ``s`` – seconds
        - ``M`` – calendar months (e.g. ``"1M"``, ``"3M"``)
        - ``Y`` – calendar years (e.g. ``"1Y"``, ``"2Y"``)

        For each window a set of features will be produced.

        **How M and Y windows are calculated**

        ``M`` and ``Y`` use calendar-aware arithmetic via ``relativedelta``,
        meaning they respect actual month lengths rather than assuming a fixed
        number of days.

        The window is a **sliding lookback** from the cutoff — it does not
        snap to calendar month or year boundaries.

        Given a cutoff date, the window start is computed as::

            window_start = cutoff - relativedelta(months=n)  # for M
            window_start = cutoff - relativedelta(years=n)   # for Y

        Events are included if ``window_start < event_time <= cutoff``
        (window start is **exclusive**, cutoff is **inclusive**).

        Examples:

        - cutoff = ``2024-02-03``, window = ``"1M"``
          → window_start = ``2024-01-03``
          → Jan 1 ❌, Jan 3 ❌, Jan 4 ✅, Jan 31 ✅, Feb 3 ✅

        - cutoff = ``2024-03-31``, window = ``"1M"``
          → window_start = ``2024-02-29`` (leap year aware)
          → Feb 28 ❌, Feb 29 ✅, Mar 15 ✅

        - cutoff = ``2024-02-03``, window = ``"1Y"``
          → window_start = ``2023-02-03``
          → Feb 3 2023 ❌, Feb 4 2023 ✅, Jan 31 2024 ✅

        .. note::
            ``"1M"`` is not the same as ``"30D"``. A 1-month window from
            ``2024-02-03`` starts on ``2024-01-03`` (31 days back), while
            ``"30D"`` starts on ``2024-01-04`` (exactly 30 days back).

        Use ``None`` in the list to compute features over all history prior
        to the cutoff with no lookback limit (e.g. ``windows=["30D", None]``).
        The resulting column suffix will be ``all`` (e.g.
        ``events__n_events__all``).

    metrics:
        Mapping from a column name to a list of aggregations to compute. Use
        ``"*"`` as a wildcard key to request event counts (only ``["count"]``
        is supported for the wildcard). Example: ``{"*": ["count"],
        "amount": ["sum", "mean"]}``.

    Raises
    ------
    ValueError
        If ``table`` is not a non-empty string, ``windows`` is not a list of
        duration strings (or ``None`` entries), or ``metrics`` is malformed.

    Examples
    --------
    ```python
    import pandas as pd
    from safefeat import build_features, WindowAgg

    spine = pd.DataFrame({
        "entity_id": ["u1"],
        "cutoff_time": ["2024-01-10"],
    })

    events = pd.DataFrame({
        "entity_id": ["u1", "u1"],
        "event_time": ["2024-01-05", "2024-01-08"],
        "amount": [10, 20],
    })

    spec = [
        WindowAgg(
            table="events",
            windows=["7D", "3M", "1Y"],
            metrics={"amount": ["sum"], "*": ["count"]},
        )
    ]

    X = build_features(
        spine=spine,
        tables={"events": events},
        spec=spec,
        event_time_cols={"events": "event_time"},
    )

    # column names produced:
    [events__n_events__7d
    events__amount__sum__7d
    events__n_events__3m
    events__amount__sum__3m
    events__n_events__1y
    events__amount__sum__1y]
    ```
    """
    table: str
    windows: List[Optional[str]]
    metrics: Dict[str, List[str]]

    def __post_init__(self):
        # basic shape/type checks
        if not isinstance(self.table, str) or not self.table:
            raise ValueError("table must be a non-empty string")

        # A bare string like windows="7D" would otherwise be silently
        # iterated character-by-character downstream — reject it here,
        # symmetric with the metrics validation below.
        if not isinstance(self.windows, list):
            raise ValueError("windows must be a list of duration strings (or None)")
        for w in self.windows:
            if w is not None and not isinstance(w, str):
                raise ValueError(f"invalid window {w!r}: expected a duration string or None")

        if not isinstance(self.metrics, dict):
            raise ValueError("metrics must be a dict")

        # allowed aggregations
        allowed_aggs = {"count", "sum", "mean", "nunique"}

        for dim, aggs in self.metrics.items():
            # each value should be a list of strings
            if not isinstance(aggs, list) or not all(isinstance(a, str) for a in aggs):
                raise ValueError(f"aggregations for '{dim}' must be a list of strings")

            if dim == "*":
                # wildcard only supports a single count
                if aggs != ["count"]:
                    raise ValueError("'*' dimension only supports ['count']")
            else:
                # ensure every aggregation is in allow list
                for a in aggs:
                    if a not in allowed_aggs:
                        raise ValueError(f"unsupported aggregation '{a}' for dimension '{dim}'")

RecencyBlock

safefeat.spec.RecencyBlock dataclass

Specification for computing time since the most recent event.

This block computes the number of days between the cutoff and the most recent matching event for each entity-cutoff pair. Optionally the block can be restricted to events that match filter_col == filter_value.

Parameters:

Name Type Description Default
table str

Name of the events table to use.

required
filter_col Optional[str]

Optional name of a column to filter on (for example "event_type").

None
filter_value Optional[str]

Optional value that filter_col must equal to be considered.

None

Examples:

import pandas as pd
from safefeat import build_features, RecencyBlock

spine = pd.DataFrame({
    "entity_id": ["u1"],
    "cutoff_time": ["2024-01-10"],
})

events = pd.DataFrame({
    "entity_id": ["u1"],
    "event_time": ["2024-01-08"],
})

spec = [RecencyBlock(table="events")]

X = build_features(
    spine=spine,
    tables={"events": events},
    spec=spec,
    event_time_cols={"events": "event_time"},
)

X["events__recency"].iloc[0]
# 2
Source code in src/safefeat/spec.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
@dataclass
class RecencyBlock:
    """Specification for computing time since the most recent event.

    For every entity/cutoff pair this block yields the number of days
    between the cutoff and the latest matching event. When ``filter_col``
    and ``filter_value`` are given, only events where
    ``filter_col == filter_value`` are considered.

    Parameters
    ----------
    table:
        Name of the events table to use.
    filter_col:
        Optional name of a column to filter on (for example ``"event_type"``).
    filter_value:
        Optional value that ``filter_col`` must equal to be considered.

    Examples
    --------
    ```python
    import pandas as pd
    from safefeat import build_features, RecencyBlock

    spine = pd.DataFrame({"entity_id": ["u1"], "cutoff_time": ["2024-01-10"]})
    events = pd.DataFrame({"entity_id": ["u1"], "event_time": ["2024-01-08"]})

    X = build_features(
        spine=spine,
        tables={"events": events},
        spec=[RecencyBlock(table="events")],
        event_time_cols={"events": "event_time"},
    )

    X["events__recency"].iloc[0]  # 2
    ```
    """
    table: str
    filter_col: Optional[str] = None
    filter_value: Optional[str] = None

    def __post_init__(self):
        # A filter is only meaningful as a (column, value) pair; reject
        # specs that supply one half without the other.
        has_col = self.filter_col is not None
        has_val = self.filter_value is not None
        if has_col != has_val:
            raise ValueError("Both filter_col and filter_value must be provided together")

Example

1. Basic window features (count, sum, mean)

import pandas as pd
from safefeat import build_features, WindowAgg

spine = pd.DataFrame({
    "entity_id": ["u1"],
    "cutoff_time": ["2024-01-10"],
})

events = pd.DataFrame({
    "entity_id": ["u1", "u1", "u1", "u1"],
    "event_time": ["2024-01-05", "2024-01-06", "2023-01-01", "2024-01-20"],
    "amount": [10.0, 20.0, 999.0, 999.0],
})

X = build_features(
    spine=spine,
    tables={"events": events},
    spec=[
        WindowAgg(
            table="events",
            windows=["7D", "30D"],
            metrics={
                "*": ["count"],
                "amount": ["sum", "mean"],
            },
        )
    ],
    event_time_cols={"events": "event_time"},
)

print(X)

Expected output:


| entity_id | cutoff_time | events__n_events__7d | events__amount__sum__7d | events__amount__mean__7d | events__n_events__30d | events__amount__sum__30d | events__amount__mean__30d |
| --------- | ----------- | -------------------- | ----------------------- | ------------------------ | --------------------- | ------------------------ | ------------------------- |
| u1        | 2024-01-10  | 2                    | 30.0                    | 15.0                     | 2                     | 30.0                     | 15.0                      |