convtools

convtools is a Python library for declaratively defining data transforms:

  • convtools.conversion - pipelines for processing collections, doing complex aggregations and joins.

  • convtools.contrib.tables - stream processing of table-like data (e.g. CSV)

Why would you need this?

  • you prefer a declarative approach

  • you love functional programming

  • you believe that Python is high-level enough not to make you write aggregations and joins by hand

  • you need to serialize/validate objects

  • you need to dynamically define transforms (including at runtime)

  • you like the idea of having something write ad hoc code for you :)

Installation:

pip install convtools

Conversions - data transforms, aggregations, joins

# pip install convtools

from convtools import conversion as c

input_data = [{"StoreID": " 123", "Quantity": "123"}]

# define a conversion (sometimes you may want to do this dynamically)
#  takes an iterable of dicts and returns a list of chunks of dicts,
#  stopping before the first item with quantity >= 1000 and starting
#  a new chunk on each new "id" or when a chunk reaches 1000 items
conversion = (
    c.iter(
        {
            "id": c.item("StoreID").call_method("strip"),
            "quantity": c.item("Quantity").as_type(int),
        }
    )
    .take_while(c.item("quantity") < 1000)
    .pipe(
        c.chunk_by(c.item("id"), size=1000)
    )
    .as_type(list)
)

# compile the conversion into an ad hoc function and run it
converter = conversion.gen_converter(debug=True)
converter(input_data)

# OR in case of a one-shot use
conversion.execute(input_data)
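
Since a conversion is a plain Python object until gen_converter is called, it can be built dynamically, e.g. from configuration. A minimal sketch, assuming a hypothetical field-to-type mapping loaded at runtime:

from convtools import conversion as c

# hypothetical mapping, e.g. loaded from a config file at runtime
field_types = {"id": str, "quantity": int}

converter = (
    c.iter(
        {
            name: c.item(name).as_type(type_)
            for name, type_ in field_types.items()
        }
    )
    .as_type(list)
    .gen_converter()
)

assert converter([{"id": 1, "quantity": "5"}]) == [{"id": "1", "quantity": 5}]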
from convtools import conversion as c


def test_doc__index_intro():

    # ======== #
    # GROUP BY #
    # ======== #
    input_data = [
        {"a": 5, "b": "foo"},
        {"a": 10, "b": "foo"},
        {"a": 10, "b": "bar"},
        {"a": 10, "b": "bar"},
        {"a": 20, "b": "bar"},
    ]

    conv = (
        c.group_by(c.item("b"))
        .aggregate(
            {
                "b": c.item("b"),
                "a_first": c.ReduceFuncs.First(c.item("a")),
                "a_max": c.ReduceFuncs.Max(c.item("a")),
            }
        )
        .gen_converter(debug=True)
    )

    assert conv(input_data) == [
        {"b": "foo", "a_first": 5, "a_max": 10},
        {"b": "bar", "a_first": 10, "a_max": 20},
    ]

    # ========= #
    # AGGREGATE #
    # ========= #
    conv = c.aggregate(
        {
            # list of "a" values where "b" equals to "bar"
            "a": c.ReduceFuncs.Array(c.item("a"), where=c.item("b") == "bar"),
            # "b" value of a row where "a" has Max value
            "b": c.ReduceFuncs.MaxRow(
                c.item("a"),
            ).item("b", default=None),
        }
    ).gen_converter(debug=True)

    assert conv(input_data) == {"a": [10, 10, 20], "b": "bar"}

    # ==== #
    # JOIN #
    # ==== #
    collection_1 = [
        {"id": 1, "name": "Nick"},
        {"id": 2, "name": "Joash"},
        {"id": 3, "name": "Bob"},
    ]
    collection_2 = [
        {"ID": "3", "age": 17, "country": "GB"},
        {"ID": "2", "age": 21, "country": "US"},
        {"ID": "1", "age": 18, "country": "CA"},
    ]
    input_data = (collection_1, collection_2)

    conv = (
        c.join(
            c.item(0),
            c.item(1),
            c.and_(
                c.LEFT.item("id") == c.RIGHT.item("ID").as_type(int),
                c.RIGHT.item("age") >= 18,
            ),
            how="left",
        )
        .pipe(
            c.list_comp(
                {
                    "id": c.item(0, "id"),
                    "name": c.item(0, "name"),
                    "age": c.item(1, "age", default=None),
                    "country": c.item(1, "country", default=None),
                }
            )
        )
        .gen_converter(debug=True)
    )

    assert conv(input_data) == [
        {"id": 1, "name": "Nick", "age": 18, "country": "CA"},
        {"id": 2, "name": "Joash", "age": 21, "country": "US"},
        {"id": 3, "name": "Bob", "age": None, "country": None},
    ]
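
    # for reference: the same condition with how="inner" keeps only the
    # matched pairs (a sketch; everything is as above, only ``how`` and
    # the output fields differ), so Bob's row is dropped
    conv = (
        c.join(
            c.item(0),
            c.item(1),
            c.and_(
                c.LEFT.item("id") == c.RIGHT.item("ID").as_type(int),
                c.RIGHT.item("age") >= 18,
            ),
            how="inner",
        )
        .pipe(c.list_comp({"id": c.item(0, "id"), "age": c.item(1, "age")}))
        .gen_converter()
    )

    assert conv(input_data) == [
        {"id": 1, "age": 18},
        {"id": 2, "age": 21},
    ]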

What reducers are supported by aggregations?

Built-in ones, exposed like c.ReduceFuncs.Sum:
  • Sum

  • SumOrNone

  • Max

  • MaxRow

  • Min

  • MinRow

  • Count

  • CountDistinct

  • First

  • Last

  • Average

  • Median

  • Percentile - c.ReduceFuncs.Percentile(95.0, c.item("x"))

  • Mode

  • TopK - c.ReduceFuncs.TopK(3, c.item("x"))

  • Array

  • ArrayDistinct

  • ArraySorted - c.ReduceFuncs.ArraySorted(c.item("x"), key=lambda v: v, reverse=True)

  • Dict - c.ReduceFuncs.Dict(c.item("key"), c.item("x"))

  • DictArray

  • DictSum

  • DictSumOrNone

  • DictMax

  • DictMin

  • DictCount

  • DictCountDistinct

  • DictFirst

  • DictLast

plus any reduce function of two arguments, which you can pass to c.reduce.
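
A minimal sketch combining built-in reducers with a custom one (the lambda below is just an illustration):

from convtools import conversion as c

conv = c.aggregate(
    {
        "sum": c.ReduceFuncs.Sum(c.item("x")),
        "top_2": c.ReduceFuncs.TopK(2, c.item("x")),
        # custom reducer: any function of two arguments works
        "product": c.reduce(lambda a, b: a * b, c.item("x"), initial=1),
    }
).gen_converter()

assert conv([{"x": 1}, {"x": 2}, {"x": 3}]) == {
    "sum": 6,
    "top_2": [3, 2],
    "product": 6,
}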

Contrib / Table - stream processing of table-like data

The Table helper lets you massage CSVs and table-like data:
  • join / zip / chain tables

  • take / drop / rename columns

  • filter rows

  • update / update_all values

from convtools.contrib.tables import Table
from convtools import conversion as c

# reads Iterable of rows
(
    Table.from_rows([(0, -1), (1, 2)], header=["a", "b"]).join(
        Table
        # reads tab-separated CSV file
        .from_csv(
            "tests/csvs/ac.csv",
            header=True,
            dialect=Table.csv_dialect(delimiter="\t"),
        )
        # transform column values
        .update(
            a=c.col("a").as_type(float),
            c=c.col("c").as_type(int),
        )
        # filter rows by condition
        .filter(c.col("c") >= 0),
        # joins on column "a" values
        on=["a"],
        how="inner",
    )
    # rearrange columns
    .take(..., "a")
    # the result is a generator of rows to consume (tuple and list rows
    # are supported too)
    .into_iter_rows(dict)
)
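
The remaining operations from the list above follow the same pattern. A minimal sketch of chaining two tables, then renaming and dropping columns (the column names are made up for illustration):

from convtools.contrib.tables import Table

rows = (
    Table.from_rows([{"a": 1, "b": 2}])
    # append rows of the second table to the first one
    .chain(Table.from_rows([{"a": 3, "b": 4}]))
    # rename column "a" and drop column "b"
    .rename({"a": "A"})
    .drop("b")
    .into_iter_rows(dict)
)
assert list(rows) == [{"A": 1}, {"A": 3}]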

Is it any different from tools like Pandas / Polars?

  • convtools doesn’t wrap data in any container; it just writes and runs the code which performs the conversion you defined

  • convtools is a lightweight library with no dependencies (though the optional black is highly recommended for pretty-printing generated code when debugging)

  • convtools is about defining and reusing conversions (a declarative approach), while wrapping data in high-performance containers is a more imperative one

  • convtools supports nested aggregations (see the sketch below)
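
For instance, grouping by one key while building a per-key dict of sums inside each group is a single conversion. A minimal sketch (the store/product fields are made up for illustration):

from convtools import conversion as c

input_data = [
    {"store": "A", "product": "x", "qty": 1},
    {"store": "A", "product": "x", "qty": 2},
    {"store": "A", "product": "y", "qty": 5},
]

conv = (
    c.group_by(c.item("store"))
    .aggregate(
        {
            "store": c.item("store"),
            # inner aggregation: per-product sums within each store group
            "qty_by_product": c.ReduceFuncs.DictSum(
                c.item("product"), c.item("qty")
            ),
        }
    )
    .gen_converter()
)

assert conv(input_data) == [{"store": "A", "qty_by_product": {"x": 3, "y": 5}}]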

Is this thing debuggable?

Despite the code being generated at runtime, it is debuggable with both pdb and pydevd.
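
Passing debug=True to gen_converter prints the generated source (pretty-printed if black is installed), so breakpoints land in readable code:

from convtools import conversion as c

# debug=True prints the generated function's source code;
# you can then step through it with pdb / pydevd as usual
converter = c.item("a").as_type(int).gen_converter(debug=True)
assert converter({"a": "1"}) == 1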

Next steps:

  1. QuickStart

  2. Cheatsheet

  3. Table - Stream processing

A few more examples:

All-in-one example: word count

import re
from itertools import chain

from convtools import conversion as c


def test_doc__index_word_count():

    # Let's say we need to count words across all files
    input_data = [
        "war-and-peace-1.txt",
        "war-and-peace-2.txt",
        "war-and-peace-3.txt",
        "war-and-peace-4.txt",
    ]

    # # iterate an input and read file lines
    #
    # def read_file(filename):
    #     with open(filename) as f:
    #         for line in f:
    #             yield line
    # extract_strings = c.generator_comp(c.call_func(read_file, c.this))

    # to simplify testing
    extract_strings = c.generator_comp(
        c.call_func(lambda filename: [filename], c.this)
    )

    # 1. make ``re`` pattern available to the code to be generated
    # 2. call ``finditer`` method of the pattern and pass the string
    #    as an argument
    # 3. pass the result to the next conversion
    # 4. iterate results, call ``.group()`` method of each re.Match
    #    and call ``.lower()`` on each result
    split_words = (
        c.naive(re.compile(r"\w+"))
        .call_method("finditer", c.this)
        .pipe(
            c.generator_comp(
                c.this.call_method("group", 0).call_method("lower")
            )
        )
    )

    # once flattened, ``extract_strings`` yields strings, so we iterate
    # the result and pass each string to the ``split_words`` conversion
    vectorized_split_words = c.generator_comp(c.this.pipe(split_words))

    # flattens a generator of iterables (it is used twice in the
    # pipeline below, e.g. on the result of ``vectorized_split_words``,
    # which is a generator of generators of strings)
    flatten = c.call_func(
        chain.from_iterable,
        c.this,
    )

    # aggregate the input; the result is a single dict:
    # words are keys, values are their counts
    dict_word_to_count = c.aggregate(
        c.ReduceFuncs.DictCount(c.this, c.this, default=dict)
    )

    # take top N words by:
    #  - call ``.items()`` method of the dict (the result of the aggregate)
    #  - pass the result to ``sorted``
    #  - take the slice, using input argument named ``top_n``
    #  - cast to a dict
    take_top_n = (
        c.this.call_method("items")
        .sort(key=lambda t: t[1], reverse=True)
        .pipe(c.this[: c.input_arg("top_n")])
        .as_type(dict)
    )

    # the resulting pipeline is pretty self-descriptive, except for the
    # ``c.if_`` part, which evaluates the condition (the 1st argument) and
    # returns the 2nd argument if it is true, otherwise the 3rd one
    # (the input data by default)
    pipeline = (
        extract_strings.pipe(flatten)
        .pipe(vectorized_split_words)
        .pipe(flatten)
        .pipe(dict_word_to_count)
        .pipe(
            c.if_(
                c.input_arg("top_n").is_not(None),
                c.this.pipe(take_top_n),
            )
        )
        # Define the resulting converter function signature. This is only
        # needed when you want to specify default values
    ).gen_converter(signature="data_, top_n=None")

    assert pipeline(input_data, top_n=3) == {"war": 4, "and": 4, "peace": 4}
