
Window functions

Prerequisites

Start with Basics for conversion fundamentals, Collections for iterable helpers, and Aggregations for reducers used over window frames.

See also

For reducer behavior, see c.ReduceFuncs and Reducers API. For SQL terminology, see PostgreSQL's window functions.

c.this.window(...).over(...) applies SQL-style window calculations to a Python iterable. It splits rows into partitions, orders the rows within each partition, finds a frame for each row, and returns one result for every input row.

The first argument to window(...) is a conversion evaluated for each output row. It may contain c.WindowFuncs references to the current row, partition, peer group, and frame metadata. It may also contain c.ReduceFuncs reducers, which are evaluated over the current window frame.

When order_by is not set, all rows in a partition share the same ordering key and therefore form a single peer group. Row and peer-group indexes are zero-based; add 1 when you want SQL-style one-based numbering.
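
The partition, order, and evaluate flow described above can be sketched in plain Python. This is only an illustration of the idea using the standard library, not how convtools implements it; the helper name window_sketch and its parameters are hypothetical. It assumes results are returned in input order, which is what the examples on this page show.

```python
from collections import defaultdict


def window_sketch(rows, partition_key, order_key, compute):
    """Call compute(ordered_partition_rows, position) once per input row.

    Hypothetical helper: results come back in input order.
    """
    partitions = defaultdict(list)
    for input_index, row in enumerate(rows):
        partitions[partition_key(row)].append((input_index, row))

    results = [None] * len(rows)
    for members in partitions.values():
        # Stable sort: rows with equal ordering keys keep their input order.
        members.sort(key=lambda pair: order_key(pair[1]))
        ordered_rows = [row for _, row in members]
        for position, (input_index, _) in enumerate(members):
            results[input_index] = compute(ordered_rows, position)
    return results


rows = [
    {"team": "A", "score": 10},
    {"team": "B", "score": 15},
    {"team": "A", "score": 20},
]
# Zero-based position of each row inside its team, ordered by score.
positions = window_sketch(
    rows,
    partition_key=lambda row: row["team"],
    order_key=lambda row: row["score"],
    compute=lambda ordered_rows, position: position,
)
assert positions == [0, 0, 1]
```

Each team is processed independently, so the single row of team B gets position 0 even though it sits between the two rows of team A in the input.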

Common SQL equivalents

Row numbering and ranking

RowIndex() + 1 is equivalent to row_number(), PeerGroupFirstRowIndex() + 1 is equivalent to rank(), and PeerGroupIndex() + 1 is equivalent to dense_rank().

from convtools import conversion as c


data = [
    {"team": "A", "name": "Ada", "score": 20},
    {"team": "A", "name": "Ben", "score": 20},
    {"team": "A", "name": "Cy", "score": 10},
    {"team": "B", "name": "Dee", "score": 15},
    {"team": "B", "name": "Eli", "score": 12},
]

result = (
    c.this.window(
        {
            "name": c.WindowFuncs.Row().item("name"),
            "ranking": (
                c.WindowFuncs.RowIndex() + 1,
                c.WindowFuncs.PeerGroupFirstRowIndex() + 1,
                c.WindowFuncs.PeerGroupIndex() + 1,
            ),
        }
    )
    .over(
        partition_by=c.item("team"),
        order_by=c.item("score").desc(),
    )
    .execute(data)
)
assert result == [
    {"name": "Ada", "ranking": (1, 1, 1)},
    {"name": "Ben", "ranking": (2, 1, 1)},
    {"name": "Cy", "ranking": (3, 3, 2)},
    {"name": "Dee", "ranking": (1, 1, 1)},
    {"name": "Eli", "ranking": (2, 2, 2)},
]
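
The three indexes used above can be reproduced in plain Python to show where the ties come from. The sketch below is standard library only and is not convtools' implementation; the helper name peer_indexes is hypothetical, and it expects the ordering keys of one partition to be sorted already.

```python
from itertools import groupby


def peer_indexes(sorted_keys):
    """Return (row_index, peer_group_first_row_index, peer_group_index) per row.

    Hypothetical helper: sorted_keys are the ordering keys of one partition,
    already sorted. All three indexes are zero-based.
    """
    out = []
    row_index = 0
    for group_index, (_, group) in enumerate(groupby(sorted_keys)):
        first_row_index = row_index  # index of the first row among the ties
        for _ in group:
            out.append((row_index, first_row_index, group_index))
            row_index += 1
    return out


# Team A scores in descending order; the two 20s are peers.
indexes = peer_indexes([20, 20, 10])
assert indexes == [(0, 0, 0), (1, 0, 0), (2, 2, 1)]

# Adding 1 gives row_number(), rank(), and dense_rank().
one_based = [tuple(value + 1 for value in row) for row in indexes]
assert one_based == [(1, 1, 1), (2, 1, 1), (3, 3, 2)]

# With identical ordering keys, every row is in one peer group.
assert peer_indexes([(), (), ()]) == [(0, 0, 0), (1, 0, 0), (2, 0, 0)]
```

The gap in the second column (1, 1, 3) is the usual rank() behavior after a tie, while the third column (1, 1, 2) stays dense.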

Aggregate over a window

Reducers inside window(...) behave like aggregate window functions. This example is equivalent to sum(amount) over (order by day rows between 1 preceding and current row).

from convtools import conversion as c


data = [
    {"day": 1, "amount": 5},
    {"day": 2, "amount": 7},
    {"day": 3, "amount": 3},
    {"day": 4, "amount": 8},
]

result = (
    c.this.window(
        {
            "row": (
                c.WindowFuncs.Row().item("day"),
                c.WindowFuncs.Row().item("amount"),
            ),
            "rolling_sum": c.ReduceFuncs.Sum(c.item("amount")),
        }
    )
    .over(
        order_by=c.item("day"),
        frame_mode="ROWS",
        frame_start=(1, "PRECEDING"),
        frame_end="CURRENT ROW",
    )
    .execute(data)
)
assert result == [
    {"row": (1, 5), "rolling_sum": 5},
    {"row": (2, 7), "rolling_sum": 12},
    {"row": (3, 3), "rolling_sum": 10},
    {"row": (4, 8), "rolling_sum": 11},
]
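
For comparison, the same rolling sum can be written by hand. This plain-Python sketch shows what a ROWS frame of 1 preceding through the current row covers; the helper name rolling_sum is hypothetical and the code is not taken from convtools.

```python
def rolling_sum(values, preceding):
    """Sum each value with up to `preceding` values before it (a ROWS frame)."""
    sums = []
    for i in range(len(values)):
        # The frame is clipped at the start of the partition.
        start = max(0, i - preceding)
        sums.append(sum(values[start : i + 1]))
    return sums


amounts = [5, 7, 3, 8]  # already ordered by day
sums = rolling_sum(amounts, preceding=1)
assert sums == [5, 12, 10, 11]
```

The first row has no preceding row, so its frame contains only itself.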

Lag and lead

RowPreceding(offset) is equivalent to lag(...), and RowFollowing(offset) is equivalent to lead(...).

from convtools import conversion as c


data = [
    {"day": 1, "amount": 5},
    {"day": 2, "amount": 7},
    {"day": 3, "amount": 3},
    {"day": 4, "amount": 8},
]

result = (
    c.this.window(
        {
            "row": (
                c.WindowFuncs.Row().item("day"),
                c.WindowFuncs.Row().item("amount"),
            ),
            "neighbors": (
                c.WindowFuncs.RowPreceding(1).item("amount", default=None),
                c.WindowFuncs.RowFollowing(1).item("amount", default=None),
            ),
        }
    )
    .over(order_by=c.item("day"))
    .execute(data)
)
assert result == [
    {"row": (1, 5), "neighbors": (None, 7)},
    {"row": (2, 7), "neighbors": (5, 3)},
    {"row": (3, 3), "neighbors": (7, 8)},
    {"row": (4, 8), "neighbors": (3, None)},
]
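
The lag and lead lookups amount to bounds-checked indexing into the ordered partition. A minimal plain-Python sketch, with hypothetical helper names lag and lead that are not part of convtools:

```python
def lag(rows, i, offset=1, default=None):
    """Return the row `offset` positions before row i, or default."""
    j = i - offset
    # Explicit bounds check: a negative j must not wrap around to the end.
    return rows[j] if 0 <= j < len(rows) else default


def lead(rows, i, offset=1, default=None):
    """Return the row `offset` positions after row i, or default."""
    j = i + offset
    return rows[j] if 0 <= j < len(rows) else default


amounts = [5, 7, 3, 8]  # already ordered by day
neighbors = [(lag(amounts, i), lead(amounts, i)) for i in range(len(amounts))]
assert neighbors == [(None, 7), (5, 3), (7, 8), (3, None)]
```

The bounds check matters in Python because a plain rows[i - offset] would silently read from the end of the list for the first rows.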

c.WindowFuncs

| Function | Arguments | Returns | SQL equivalent |
| --- | --- | --- | --- |
| Row() | none | The current row from the current partition. | Current row reference |
| RowIndex() | none | Zero-based row index within the current partition. | row_number() - 1 |
| RowPreceding(offset, default=None) | offset: rows before current row; default: value when missing | The row at offset rows before the current row, or default. | lag(row, offset, default) |
| RowFollowing(offset, default=None) | offset: rows after current row; default: value when missing | The row at offset rows after the current row, or default. | lead(row, offset, default) |
| PeerGroupFirstRow() | none | The first row in the current peer group. | First row among ties |
| PeerGroupLastRow() | none | The last row in the current peer group. | Last row among ties |
| PeerGroupFirstRowIndex() | none | Zero-based index of the first row in the current peer group. | rank() - 1 |
| PeerGroupLastRowIndex() | none | Zero-based index of the last row in the current peer group. | End index of rank peer group |
| PeerGroupIndex() | none | Zero-based peer-group index within the current partition. | dense_rank() - 1 |
| FrameFirstRow(default=None) | default: value for an empty frame | First row in the current frame, or default. | first_value(row) |
| FrameLastRow(default=None) | default: value for an empty frame | Last row in the current frame, or default. | last_value(row) |
| FrameNthRow(n, default=None) | n: zero-based row offset in the frame; default: value when missing | The nth row in the current frame, or default. | nth_value(row, n + 1) |

Use .item(...), .attr(...), or any other conversion on row-returning functions to extract fields:

c.WindowFuncs.RowPreceding(1).item("amount", default=0)

.over(...) parameters

| Parameter | Default | Description |
| --- | --- | --- |
| partition_by | not set | Conversion used to split input rows into independent partitions. Use a tuple of conversions for multi-key partitions. |
| order_by | not set | Conversion, or tuple of conversions, used to order rows inside each partition. Equal ordering keys form a peer group. Supports sorting helpers such as .desc() and none_last=True. |
| frame_mode | "RANGE" | Frame interpretation: "RANGE" uses ordering-key values, "ROWS" uses row offsets, and "GROUPS" uses peer-group offsets. |
| frame_start | "UNBOUNDED PRECEDING" | Start boundary. Accepts "UNBOUNDED PRECEDING", "CURRENT ROW", or (offset, "PRECEDING" / "FOLLOWING"). |
| frame_end | "CURRENT ROW" | End boundary. Accepts "UNBOUNDED FOLLOWING", "CURRENT ROW", or (offset, "PRECEDING" / "FOLLOWING"). |
| frame_exclusion | "NO OTHERS" | Exclusion rule: "NO OTHERS", "CURRENT ROW", "GROUP", or "TIES". |
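
The four frame_exclusion values share their names with PostgreSQL's EXCLUDE options. The sketch below spells out PostgreSQL's rules in plain Python, on the assumption that convtools follows the same semantics (this page only lists the option names). The helper name apply_exclusion is hypothetical.

```python
def apply_exclusion(frame, current, peers, exclusion="NO OTHERS"):
    """Filter row positions in a frame following PostgreSQL's EXCLUDE rules.

    frame and peers are lists of row positions; peers is the peer group of
    current and includes current itself. Hypothetical helper for illustration.
    """
    if exclusion == "NO OTHERS":
        return list(frame)  # nothing is removed
    if exclusion == "CURRENT ROW":
        return [i for i in frame if i != current]
    if exclusion == "GROUP":
        # Remove the current row and all of its peers.
        return [i for i in frame if i not in peers]
    if exclusion == "TIES":
        # Remove the peers but keep the current row.
        return [i for i in frame if i == current or i not in peers]
    raise ValueError(f"unknown exclusion: {exclusion!r}")


frame = [0, 1, 2, 3]  # row positions inside the frame
current, peers = 1, [1, 2]  # rows 1 and 2 share an ordering key
excluded = {
    rule: apply_exclusion(frame, current, peers, rule)
    for rule in ("NO OTHERS", "CURRENT ROW", "GROUP", "TIES")
}
assert excluded == {
    "NO OTHERS": [0, 1, 2, 3],
    "CURRENT ROW": [0, 2, 3],
    "GROUP": [0, 3],
    "TIES": [0, 1, 3],
}
```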

Frame modes follow PostgreSQL terminology:

| Mode | Frame offset meaning |
| --- | --- |
| "RANGE" | Offsets are added to or subtracted from the current row's ordering key. Offset frames require order_by. |
| "ROWS" | Offsets are non-negative row counts before or after the current row. |
| "GROUPS" | Offsets are peer-group counts before or after the current peer group. |

For available reducers, see c.ReduceFuncs.
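
To see how the three frame modes differ on the same data, the following plain-Python sketch computes which row positions fall into a frame of N preceding through N following. It is a simplified illustration with numeric ordering keys, not convtools code; the helper name frame_positions and its parameters are hypothetical.

```python
def frame_positions(keys, i, mode, before, after):
    """Return the positions in the frame of row i for sorted ordering keys.

    The frame spans `before` PRECEDING through `after` FOLLOWING, interpreted
    according to `mode`. Hypothetical helper for illustration.
    """
    n = len(keys)
    if mode == "ROWS":
        # Offsets count physical rows.
        return [j for j in range(n) if i - before <= j <= i + after]
    if mode == "RANGE":
        # Offsets are applied to the ordering key itself.
        low, high = keys[i] - before, keys[i] + after
        return [j for j in range(n) if low <= keys[j] <= high]
    if mode == "GROUPS":
        # Offsets count peer groups, i.e. runs of equal ordering keys.
        group_of = []
        for j, key in enumerate(keys):
            if j == 0:
                group_of.append(0)
            else:
                group_of.append(group_of[-1] + int(key != keys[j - 1]))
        g = group_of[i]
        return [j for j in range(n) if g - before <= group_of[j] <= g + after]
    raise ValueError(f"unknown frame mode: {mode!r}")


keys = [1, 2, 2, 2, 5]  # sorted ordering keys of one partition
frames = {
    mode: frame_positions(keys, 1, mode, before=1, after=1)
    for mode in ("ROWS", "RANGE", "GROUPS")
}
assert frames == {
    "ROWS": [0, 1, 2],  # one row on each side
    "RANGE": [0, 1, 2, 3],  # keys between 1 and 3
    "GROUPS": [0, 1, 2, 3, 4],  # one peer group on each side
}
```

For the row at position 1 (key 2), the same offsets give three different frames: "ROWS" ignores ties, "RANGE" includes all rows whose key is within 1 of the current key, and "GROUPS" reaches the neighboring peer groups regardless of how far apart their keys are.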

Frame mode examples

With frame_mode="RANGE", offsets are applied to the ordering key. In this example each frame covers the rows of the same partition whose dt is within one day of the current row's dt.

from datetime import date, timedelta
from convtools import conversion as c


data = [
    {"a": 1, "dt": date(2020, 1, 1), "b": 1},
    {"a": 2, "dt": date(2020, 1, 1), "b": 6},
    {"a": 1, "dt": date(2020, 1, 2), "b": 3},
    {"a": 1, "dt": date(2020, 1, 2), "b": 4},
    {"a": 1, "dt": date(2020, 1, 2), "b": 2},
    {"a": 1, "dt": date(2020, 1, 3), "b": 5},
    {"a": 1, "dt": date(2020, 1, 4), "b": 6},
    {"a": 1, "dt": date(2020, 1, 5), "b": 7},
    {"a": 1, "dt": date(2020, 1, 7), "b": 8},
]

result = (
    c.this.window(
        {
            "sum": c.ReduceFuncs.Sum(c.item("b")),
            "idx": c.WindowFuncs.RowIndex(),
            "frame": c.ReduceFuncs.Array(c.item("b")),
        }
    )
    .over(
        partition_by=c.item("a"),
        order_by=c.item("dt"),
        frame_mode="RANGE",
        frame_start=(timedelta(days=1), "PRECEDING"),
        frame_end=(timedelta(days=1), "FOLLOWING"),
    )
    .execute(data)
)
assert result == [
    {"sum": 10, "idx": 0, "frame": [1, 3, 4, 2]},
    {"sum": 6, "idx": 0, "frame": [6]},
    {"sum": 15, "idx": 1, "frame": [1, 3, 4, 2, 5]},
    {"sum": 15, "idx": 2, "frame": [1, 3, 4, 2, 5]},
    {"sum": 15, "idx": 3, "frame": [1, 3, 4, 2, 5]},
    {"sum": 20, "idx": 4, "frame": [3, 4, 2, 5, 6]},
    {"sum": 18, "idx": 5, "frame": [5, 6, 7]},
    {"sum": 13, "idx": 6, "frame": [6, 7]},
    {"sum": 8, "idx": 7, "frame": [8]},
]
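
With frame_mode="ROWS", offsets count physical rows. In this example there is no partition_by, rows are ordered by b in descending order with None last, and each frame runs from the first row through the current row. The expected output keeps the input order, so idx shows each row's position in the sorted order, and the None value does not change the sum.

from datetime import date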
from convtools import conversion as c


data = [
    {"a": 1, "dt": date(2020, 1, 1), "b": 1},
    {"a": 2, "dt": date(2020, 1, 1), "b": 6},
    {"a": 1, "dt": date(2020, 1, 2), "b": 3},
    {"a": 1, "dt": date(2020, 1, 2), "b": 4},
    {"a": 1, "dt": date(2020, 1, 2), "b": 2},
    {"a": 1, "dt": date(2020, 1, 3), "b": 5},
    {"a": 1, "dt": date(2020, 1, 3), "b": None},
    {"a": 1, "dt": date(2020, 1, 4), "b": 6},
    {"a": 1, "dt": date(2020, 1, 5), "b": 7},
    {"a": 1, "dt": date(2020, 1, 7), "b": 8},
]

result = (
    c.this.window(
        {
            "sum": c.ReduceFuncs.Sum(c.item("b")),
            "idx": c.WindowFuncs.RowIndex(),
            "frame": c.ReduceFuncs.Array(c.item("b")),
        }
    )
    .over(
        order_by=c.item("b").desc(none_last=True),
        frame_mode="ROWS",
        frame_start="UNBOUNDED PRECEDING",
        frame_end="CURRENT ROW",
    )
    .execute(data)
)
assert result == [
    {"sum": 42, "idx": 8, "frame": [8, 7, 6, 6, 5, 4, 3, 2, 1]},
    {"sum": 21, "idx": 2, "frame": [8, 7, 6]},
    {"sum": 39, "idx": 6, "frame": [8, 7, 6, 6, 5, 4, 3]},
    {"sum": 36, "idx": 5, "frame": [8, 7, 6, 6, 5, 4]},
    {"sum": 41, "idx": 7, "frame": [8, 7, 6, 6, 5, 4, 3, 2]},
    {"sum": 32, "idx": 4, "frame": [8, 7, 6, 6, 5]},
    {"sum": 42, "idx": 9, "frame": [8, 7, 6, 6, 5, 4, 3, 2, 1, None]},
    {"sum": 27, "idx": 3, "frame": [8, 7, 6, 6]},
    {"sum": 15, "idx": 1, "frame": [8, 7]},
    {"sum": 8, "idx": 0, "frame": [8]},
]
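
With frame_mode="GROUPS", offsets count peer groups. In this example rows with the same dt form a peer group, and each frame covers the previous, current, and next peer group.

from datetime import date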
from convtools import conversion as c


data = [
    {"a": 1, "dt": date(2020, 1, 1), "b": 1},
    {"a": 2, "dt": date(2020, 1, 1), "b": 6},
    {"a": 1, "dt": date(2020, 1, 2), "b": 3},
    {"a": 1, "dt": date(2020, 1, 2), "b": 4},
    {"a": 1, "dt": date(2020, 1, 2), "b": 2},
    {"a": 1, "dt": date(2020, 1, 3), "b": 5},
    {"a": 1, "dt": date(2020, 1, 4), "b": 6},
    {"a": 1, "dt": date(2020, 1, 5), "b": 7},
    {"a": 1, "dt": date(2020, 1, 7), "b": 8},
]

result = (
    c.this.window(
        {
            "dt_min": c.ReduceFuncs.Min(c.item("dt")).pipe(str),
            "dt_max": c.ReduceFuncs.Max(c.item("dt")).pipe(str),
            "idx": c.WindowFuncs.RowIndex(),
            "frame": c.ReduceFuncs.Array(c.item("b")),
        }
    )
    .over(
        order_by=c.item("dt"),
        frame_mode="GROUPS",
        frame_start=(1, "PRECEDING"),
        frame_end=(1, "FOLLOWING"),
    )
    .execute(data)
)
# fmt: off
assert result == [
    {"dt_min": "2020-01-01", "dt_max": "2020-01-02", "idx": 0, "frame": [1, 6, 3, 4, 2]},
    {"dt_min": "2020-01-01", "dt_max": "2020-01-02", "idx": 1, "frame": [1, 6, 3, 4, 2]},
    {"dt_min": "2020-01-01", "dt_max": "2020-01-03", "idx": 2, "frame": [1, 6, 3, 4, 2, 5]},
    {"dt_min": "2020-01-01", "dt_max": "2020-01-03", "idx": 3, "frame": [1, 6, 3, 4, 2, 5]},
    {"dt_min": "2020-01-01", "dt_max": "2020-01-03", "idx": 4, "frame": [1, 6, 3, 4, 2, 5]},
    {"dt_min": "2020-01-02", "dt_max": "2020-01-04", "idx": 5, "frame": [3, 4, 2, 5, 6]},
    {"dt_min": "2020-01-03", "dt_max": "2020-01-05", "idx": 6, "frame": [5, 6, 7]},
    {"dt_min": "2020-01-04", "dt_max": "2020-01-07", "idx": 7, "frame": [6, 7, 8]},
    {"dt_min": "2020-01-05", "dt_max": "2020-01-07", "idx": 8, "frame": [7, 8]},
]
# fmt: on