Contrib / Tables

Table is a helper for working with table-like data. It infers the header from the first row (or accepts yours) and lets you massage columns and rows in a stream-friendly manner, avoiding operations that require consuming the whole sequence.

Table data can only be consumed once: since a table takes an iterable, there's no way to iterate over it a second time.

Read / Output rows

Table.from_rows initializes a table from an iterable of rows. Its arguments are:

  • rows - an iterable of rows; rows of type dict, tuple, or list are treated as multi-column rows, any other type as a single-column one
  • header supports multiple types
    • bool - whether to infer the header. Default is None, with one exception: when an iterable of dicts is passed, the header is automatically inferred from the first dict unless header=False.
    • list and tuple - specify column names
    • dict - keys specify column names, values are indexes used to fetch column values
  • duplicate_columns
    • raise - the default for from_rows; raises ValueError when duplicate column names are encountered
    • mangle - mangles duplicate column names like: a, a_1, a_2
    • keep - duplicate columns are left as is, but when referenced the first one is used
    • drop - duplicate columns are skipped
  • skip_rows - number of rows to skip at the beginning; default is 0

The into_iter_rows method outputs the results as an iterator of rows. Its arguments are:

  • type_ should be exactly one of the following: dict, list, tuple
  • include_header=None - whether to emit the header
from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    # NO HEADER PROVIDED
    assert list(
        Table.from_rows([(1, 2, 3), (2, 3, 4)]).into_iter_rows(dict)
    ) == [
        {"COLUMN_0": 1, "COLUMN_1": 2, "COLUMN_2": 3},
        {"COLUMN_0": 2, "COLUMN_1": 3, "COLUMN_2": 4},
    ]

    # HEADER IS PROVIDED
    assert list(
        Table.from_rows(
            [[1, 2, 3], [2, 3, 4]], header=["a", "b", "c"]
        ).into_iter_rows(dict)
    ) == [
        {"a": 1, "b": 2, "c": 3},
        {"a": 2, "b": 3, "c": 4},
    ]

    # INFERS HEADER ON DEMAND
    assert list(
        Table.from_rows(
            [["a", "b", "c"], [1, 2, 3], [2, 3, 4]], header=True
        ).into_iter_rows(dict)
    ) == [
        {"a": 1, "b": 2, "c": 3},
        {"a": 2, "b": 3, "c": 4},
    ]

    # INFERS HEADER AUTOMATICALLY BECAUSE THE FIRST ELEMENT IS A DICT
    assert list(
        Table.from_rows([{"a": 1, "b": 2}, {"a": 2, "b": 3}]).into_iter_rows(
            tuple, include_header=True
        )
    ) == [("a", "b"), (1, 2), (2, 3)]
def converter(data_):
    try:
        return ({"COLUMN_0": i[0], "COLUMN_1": i[1], "COLUMN_2": i[2]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"a": i[0], "b": i[1], "c": i[2]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"a": i[0], "b": i[1], "c": i[2]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return (
            (
                i["a"],
                i["b"],
            )
            for i in data_
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Read CSV-like

Table.from_csv initializes a table by reading a csv-like file. Its arguments are:

  • filepath_or_buffer - a filepath or something the built-in csv.reader can read (iterator of lines)
  • header supports multiple types
    • bool - whether to infer the header or not
    • list and tuple - specify column names
    • dict - keys specify column names, values are indexes used to fetch column values
  • duplicate_columns
    • raise - raises ValueError when duplicate column names are encountered
    • mangle - the default for from_csv; mangles duplicate column names like: a, a_1, a_2
    • keep - duplicate columns are left as is, but when referenced the first one is used
    • drop - duplicate columns are skipped
  • skip_rows - number of rows to skip at the beginning; default is 0
  • dialect - a dialect acceptable for the built-in csv.reader. There's a helper method to create dialects without defining classes: Table.csv_dialect(delimiter="\t") for tab-separated files.
  • encoding - default is utf-8

The into_csv method writes the results to a csv-like file. Its arguments are:

  • filepath_or_buffer - a filepath or something csv.writer can write to
  • dialect - a dialect acceptable by csv.writer. There's a helper method to create dialects without defining classes: Table.csv_dialect(delimiter="\t") for tab-separated files.
  • encoding - default is utf-8
from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    # READING CSV
    # file content:
    #   a,b
    #   1,2
    #   2,3
    assert list(
        Table
        .from_csv("tests/csvs/ab.csv", header=True)
        .into_iter_rows(dict)
        # .into_csv("output.csv")  # TO WRITE TO A FILE
    ) == [
        {"a": "1", "b": "2"},
        {"a": "2", "b": "3"},
    ]

    # READING TSV
    # file content:
    #   a\tc
    #   2\t4
    #   1\t3
    assert list(
        Table.from_csv(
            "tests/csvs/ac.csv",
            header=True,
            dialect=Table.csv_dialect(delimiter="\t"),
        ).into_iter_rows(dict)
    ) == [
        {"a": "2", "c": "4"},
        {"a": "1", "c": "3"},
    ]

    # READ TSV + SKIP EXISTING HEADER + REMAP COLUMNS
    # file content:
    #   a\tc
    #   2\t4
    #   1\t3
    assert list(
        Table.from_csv(
            "tests/csvs/ac.csv",
            header={"a": 1, "c": 0},
            skip_rows=1,
            dialect=Table.csv_dialect(delimiter="\t"),
        ).into_iter_rows(dict)
    ) == [
        {"a": "4", "c": "2"},
        {"a": "3", "c": "1"},
    ]
def converter(data_):
    try:
        return ({"a": i[0], "b": i[1]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"a": i[0], "c": i[1]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"a": i[1], "c": i[0]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Rename, take / rearrange, drop columns

  1. rename(columns) renames columns; it accepts arguments of different types:
    • tuple and list define new column names (the length must match the number of columns of the table)
    • dict defines a mapping from old column names to new ones
  2. take(*column_names) leaves only specified columns (order matters), omitting the rest
    • take can accept ..., which references all non-mentioned columns, so it's easy to rearrange them: table.take("c", "d", ...)
  3. drop(*column_names) drops the specified columns, keeping the rest as-is.
from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    assert list(
        Table.from_rows([("A", "b", "c"), (1, 2, 3), (2, 3, 4)], header=True)
        .rename({"A": "a"})
        .drop("b")
        .take("c", ...)  # MAKE "c" COLUMN THE FIRST ONE
        .into_iter_rows(dict)
    ) == [{"c": 3, "a": 1}, {"c": 4, "a": 2}]
def converter(data_):
    try:
        return ({"c": i[2], "a": i[0]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Add, update columns

There's only one method to add and update columns: update. It takes any number of keyword arguments, where keys are names of new or existing columns and values are conversions to be applied row-wise.

Use c.col("column name") to reference the values of various columns.

from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    assert list(
        Table.from_rows([(1, -2), (2, -3)], ["a", "b"])
        .update(c=c.col("a") + c.col("b"))  # ADDING NEW COLUMN: "c"
        .update(c=c.call_func(abs, c.col("c")))  # UPDATING NEW COLUMN: "c"
        .into_iter_rows(dict)
    ) == [{"a": 1, "b": -2, "c": 1}, {"a": 2, "b": -3, "c": 1}]
def converter(data_):
    try:
        return (
            {"a": i[0], "b": i[1], "c": abs(i[2])}
            for i in (
                (
                    i_i[0],
                    i_i[1],
                    (i_i[0] + i_i[1]),
                )
                for i_i in data_
            )
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Filter rows

To filter rows, pass a conversion to the filter method, which will be used as a condition.

Use c.col("column name") to reference the values of various columns.

from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    assert list(
        Table.from_rows([(1, -2), (2, -3)], header=["a", "b"])
        .filter(c.col("b") < -2)
        .into_iter_rows(dict)
    ) == [{"a": 2, "b": -3}]
def converter(data_):
    try:
        return ({"a": i[0], "b": i[1]} for i in data_ if (((i[1] < -2))))
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Join tables

To join two tables, use the join method, which accepts the following arguments:

  • table - another table to join with
  • on can be either:
    • a join conversion like c.LEFT.col("a") == c.RIGHT.col("A")
    • or an iterable of column names to join on
  • how must be one of: "inner", "left", "right", "outer" (same as "full")
  • suffixes is a tuple of two strings (left and right) appended to the names of conflicting columns (on columns passed as an iterable of strings don't count). Default is ("_LEFT", "_RIGHT").
from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    # JOIN ON COLUMN NAMES
    assert list(
        Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
        .join(
            Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
            how="inner",
            on=["a"],
        )
        .into_iter_rows(dict)
    ) == [
        {"a": 1, "b": 2, "c": 3},
        {"a": 2, "b": 3, "c": 4},
    ]

    # JOIN ON CONDITION
    assert list(
        Table.from_rows([(1, 2), (2, 30)], ["a", "b"])
        .join(
            Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
            how="full",
            on=c.and_(
                c.LEFT.col("a") == c.RIGHT.col("a"),
                c.LEFT.col("b") < c.RIGHT.col("c"),
            ),
        )
        .into_iter_rows(dict)
    ) == [
        {"a_LEFT": 1, "b": 2, "a_RIGHT": 1, "c": 3},
        {"a_LEFT": 2, "b": 30, "a_RIGHT": None, "c": None},
        {"a_LEFT": None, "b": None, "a_RIGHT": 2, "c": 4},
    ]
def aggregate_i(_none, data_, *, __v=__naive_values__["__v"]):
    agg_data_i_v0 = _none

    checksum_ = 0
    it_ = iter(data_)
    for row_i in it_:
        if agg_data_i_v0 is _none:
            checksum_ += 1
            agg_data_i_v0 = defaultdict(list)
            agg_data_i_v0[row_i[0]].append(row_i)
        else:
            agg_data_i_v0[row_i[0]].append(row_i)
        if checksum_ == 1:
            globals()["__BROKEN_EARLY__"] = True  # DEBUG ONLY
            break
    for row_i in it_:
        agg_data_i_v0[row_i[0]].append(row_i)

    return __v if (agg_data_i_v0 is _none) else (setattr(agg_data_i_v0, "default_factory", None) or agg_data_i_v0)

def join_(left_, right_, _none):
    hash_to_right_items = aggregate_i(_none, right_)
    del right_
    for left_item in left_:
        left_key = left_item[0]
        right_items = hash_to_right_items[left_key] if (left_key in hash_to_right_items) else ()
        for right_item in right_items:
            yield left_item, right_item

def converter(data_, *, right):
    global __none__
    _none = __none__
    try:
        return join_(data_, right, _none)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"a": i[0][0], "b": i[0][1], "c": i[1][1]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def aggregate_e(_none, data_, *, __v=__naive_values__["__v"]):
    agg_data_e_v0 = _none

    checksum_ = 0
    it_ = iter(data_)
    for row_e in it_:
        if agg_data_e_v0 is _none:
            checksum_ += 1
            agg_data_e_v0 = defaultdict(list)
            agg_data_e_v0[row_e[0]].append(row_e)
        else:
            agg_data_e_v0[row_e[0]].append(row_e)
        if checksum_ == 1:
            globals()["__BROKEN_EARLY__"] = True  # DEBUG ONLY
            break
    for row_e in it_:
        agg_data_e_v0[row_e[0]].append(row_e)

    return __v if (agg_data_e_v0 is _none) else (setattr(agg_data_e_v0, "default_factory", None) or agg_data_e_v0)

def join_(left_, right_, _none):
    yielded_right_ids = set()
    hash_to_right_items = aggregate_e(_none, right_)
    del right_
    for left_item in left_:
        left_key = left_item[0]
        right_items = iter((((i for i in hash_to_right_items[left_key] if (((left_item[1] < i[1])))) if (left_key in hash_to_right_items) else ())))
        right_item = next(right_items, _none)
        if right_item is _none:
            yield left_item, None
        else:
            yielded_right_ids.add(id(right_item))
            yield left_item, right_item
            for right_item in right_items:
                yielded_right_ids.add(id(right_item))
                yield left_item, right_item
    yield from (
        (None, right_item) for right_item in (item for items in hash_to_right_items.values() for item in items) if id(right_item) not in yielded_right_ids
    )

def converter(data_, *, right):
    global __none__
    _none = __none__
    try:
        return join_(data_, right, _none)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return (
            {
                "a_LEFT": ((None if (i[0] is None) else i[0][0])),
                "b": ((None if (i[0] is None) else i[0][1])),
                "a_RIGHT": ((None if (i[1] is None) else i[1][0])),
                "c": ((None if (i[1] is None) else i[1][1])),
            }
            for i in data_
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Chain tables

The chain method concatenates tables vertically. Its parameters are:

  • table to chain
  • fill_value is used to fill gaps; default is None
from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    assert list(
        Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
        .chain(
            Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
            fill_value=0,
        )
        .into_iter_rows(dict)
    ) == [
        {"a": 1, "b": 2, "c": 0},
        {"a": 2, "b": 3, "c": 0},
        {"a": 1, "b": 0, "c": 3},
        {"a": 2, "b": 0, "c": 4},
    ]
def converter(data_):
    try:
        return (
            (
                i[0],
                i[1],
                0,
            )
            for i in data_
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return (
            (
                i[0],
                0,
                i[1],
            )
            for i in data_
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"a": i[0], "b": i[1], "c": i[2]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Zip tables

The zip method concatenates tables horizontally. Its parameters are:

  • table to zip
  • fill_value is used to fill gaps; default is None

Warning

  • Before using this method, make sure you are not actually looking for Table.join.
  • Be cautious with .into_iter_rows(dict) here: by default zip uses the "keep" duplicate_columns strategy, so colliding column names will be collapsed, because a dict cannot hold duplicate keys.
from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    assert list(
        Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
        .zip(
            Table.from_rows([(10, 3), (20, 4)], ["a", "c"]),
            fill_value=0,
        )
        .into_iter_rows(tuple, include_header=True)
    ) == [("a", "b", "a", "c"), (1, 2, 10, 3), (2, 3, 20, 4)]
def converter(data_):
    try:
        return (
            (
                i[0][0],
                i[0][1],
                i[1][0],
                i[1][1],
            )
            for i in data_
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Explode table

The explode method transforms a table with a column of lists into a table where each list element gets its own row, repeating the values of the other columns. Its only parameter is the column_name to explode.

from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    assert list(
        Table.from_rows([{"a": 1, "b": [1, 2, 3]}, {"a": 10, "b": [4, 5, 6]}])
        .explode("b")
        .into_iter_rows(tuple, include_header=True)
    ) == [
        ("a", "b"),
        (1, 1),
        (1, 2),
        (1, 3),
        (10, 4),
        (10, 5),
        (10, 6),
    ]
def converter(data_):
    try:
        return (
            (
                i["a"],
                i["b"],
            )
            for i in data_
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return (
            (
                row_[0],
                value_,
            )
            for row_ in data_
            for value_ in row_[1]
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Wide to long

Experimental feature

It was added on Mar 6, 2024 and may be stabilized in about half a year.

The wide_to_long method turns a table from wide to long form, transforming a single row into multiple rows with fewer columns:

from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    assert list(
        Table.from_rows(
            [{"name": "John", "height": 200, "age": 30, "mood": "good"}]
        )
        .wide_to_long(
            col_for_names="metric", col_for_values="value", keep_cols=("name",)
        )
        .into_iter_rows(dict)
    ) == [
        {"name": "John", "metric": "height", "value": 200},
        {"name": "John", "metric": "age", "value": 30},
        {"name": "John", "metric": "mood", "value": "good"},
    ]
def converter(data_):
    try:
        return (
            (
                i["name"],
                i["height"],
                i["age"],
                i["mood"],
            )
            for i in data_
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return (
            new_row
            for row_ in data_
            for new_row in (
                (
                    row_[0],
                    "height",
                    row_[1],
                ),
                (
                    row_[0],
                    "age",
                    row_[2],
                ),
                (
                    row_[0],
                    "mood",
                    row_[3],
                ),
            )
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"name": i[0], "metric": i[1], "value": i[2]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Using tables inside conversions

It's impossible to use Table directly inside other conversions, because that would create more than one code-generation layer.

But you most definitely can leverage piping to callables:

from convtools import conversion as c
from convtools.contrib.tables import Table

with c.OptionsCtx() as options:
    options.debug = True

    input_data = [["a", "b"], [1, 2], [3, 4]]
    converter = (
        c.this.pipe(
            lambda it: Table.from_rows(it, header=True).into_iter_rows(dict)
        )
        .as_type(list)
        .gen_converter()
    )
    assert converter(input_data) == [
        {"a": 1, "b": 2},
        {"a": 3, "b": 4},
    ]
def converter(data_, *, __lambda=__naive_values__["__lambda"]):
    try:
        return list(__lambda(data_))
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"a": i[0], "b": i[1]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Note

Keep in mind that, unlike conversions, Table doesn't have a gen_converter method, so its code cannot be generated once during a warm-up and reused multiple times. Tables generate their code on each run.