
Welcome to convtools

convtools is a Python library for defining data transformations dynamically, using a declarative approach. Behind the scenes it generates custom Python code for each conversion.


Installation

pip install convtools

Structure

  1. from convtools import conversion as c exposes the main interface for building pipelines that process data and perform complex aggregations and joins.

  2. from convtools.contrib.tables import Table - stream processing of table-like data (e.g. CSV)

  3. from convtools.contrib import fs - tiny utils for splitting buffers on custom newlines (see the sketch below)
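
For example, fs can split a stream on a custom delimiter instead of "\n". A minimal sketch (the split_buffer helper and its exact signature are an assumption based on the contrib.fs docs, not verified here):

from convtools.contrib import fs

# NOTE: split_buffer usage is assumed per the contrib.fs docs:
# it reads from a buffer and yields chunks split by a custom delimiter
with open("data.log") as f:
    for chunk in fs.split_buffer(f, "\r"):  # "\r" as a custom newline
        print(chunk)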

Sneak peek at what's inside

Pipes

from convtools import conversion as c

input_data = [{"StoreID": " 123", "Quantity": "123"}]

# define a conversion (sometimes you may want to do this dynamically)
#  takes an iterable of dicts, stops before the first item with
#  quantity >= 1000, then chunks by "id" (max chunk size 1000)
conversion = (
    c.iter(
        {
            "id": c.item("StoreID").call_method("strip"),
            "quantity": c.item("Quantity").as_type(int),
        }
    )
    .take_while(c.item("quantity") < 1000)
    .pipe(c.chunk_by(c.item("id"), size=1000))
    .as_type(list)
)

# compile the conversion into an ad hoc function
converter = conversion.gen_converter(debug=True)

# run it as any function
assert converter(input_data) == [[{"id": "123", "quantity": 123}]]

# OR, for one-shot use, skip the gen_converter step
conversion.execute(input_data)

The code generated under debug=True looks like this:
def take_while_e(it_e):
    for item_e in it_e:
        if item_e["quantity"] < 1000:
            yield item_e
        else:
            break

def chunk_by(items_):
    items_ = iter(items_)
    try:
        item_ = next(items_)
    except StopIteration:
        return
    chunk_ = [item_]
    chunk_item_signature = item_["id"]
    size_ = 1
    for item_ in items_:
        new_item_signature = item_["id"]
        if chunk_item_signature == new_item_signature and size_ < 1000:
            chunk_.append(item_)
            size_ = size_ + 1
        else:
            yield chunk_
            chunk_ = [item_]
            chunk_item_signature = new_item_signature
            size_ = 1
    yield chunk_

def pipe_i(input_):
    return list(chunk_by(input_))

def pipe_(input_):
    return pipe_i(take_while_e(input_))

def converter(data_):
    try:
        return pipe_(({"id": i["StoreID"].strip(), "quantity": int(i["Quantity"])} for i in data_))
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise
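
Since conversions are plain Python objects, they can be assembled dynamically, e.g. from a config. A minimal sketch using only the primitives shown above (the fields mapping is made up for illustration):

from convtools import conversion as c

# hypothetical config: output field -> (input key, type to cast to)
fields = {"id": ("StoreID", str), "quantity": ("Quantity", int)}

conversion = c.iter(
    {name: c.item(key).as_type(type_) for name, (key, type_) in fields.items()}
).as_type(list)

converter = conversion.gen_converter()
assert converter([{"StoreID": "1", "Quantity": "2"}]) == [
    {"id": "1", "quantity": 2}
]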

Aggregations & Group By

from convtools import conversion as c

input_data = [
    {"a": 5, "b": "foo"},
    {"a": 10, "b": "foo"},
    {"a": 10, "b": "bar"},
    {"a": 10, "b": "bar"},
    {"a": 20, "b": "bar"},
]

# list of "a" values where "b" equals to "bar"
# "b" value of a row where "a" has Max value
conv = c.aggregate(
    {
        "a": c.ReduceFuncs.Array(c.item("a"), where=c.item("b") == "bar"),
        "b": c.ReduceFuncs.MaxRow(
            c.item("a"),
        ).item("b", default=None),
    }
).gen_converter(debug=True)

assert conv(input_data) == {"a": [10, 10, 20], "b": "bar"}

The generated code:
def aggregate_(_none, data_, *, __get_1_or_default=__naive_values__["__get_1_or_default"]):
    agg_data__v0 = agg_data__v1 = _none

    checksum_ = 0
    it_ = iter(data_)
    for row_ in it_:
        _r0_ = row_["a"]
        if row_["b"] == "bar":
            if agg_data__v0 is _none:
                checksum_ += 1
                agg_data__v0 = [row_["a"]]
            else:
                agg_data__v0.append(row_["a"])
        if _r0_ is not None:
            if agg_data__v1 is _none:
                checksum_ += 1
                agg_data__v1 = (_r0_, row_)
            else:
                if agg_data__v1[0] < _r0_:
                    agg_data__v1 = (_r0_, row_)
        if checksum_ == 2:
            globals()["__BROKEN_EARLY__"] = True  # DEBUG ONLY
            break
    for row_ in it_:
        _r0_ = row_["a"]
        if row_["b"] == "bar":
            agg_data__v0.append(row_["a"])
        if _r0_ is not None:
            if agg_data__v1[0] < _r0_:
                agg_data__v1 = (_r0_, row_)

    return {
        "a": ((None if (agg_data__v0 is _none) else agg_data__v0)),
        "b": __get_1_or_default(((None if (agg_data__v1 is _none) else agg_data__v1[1])), "b", None),
    }

def converter(data_):
    global __none__
    _none = __none__
    try:
        return aggregate_(_none, data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

The same dataset, grouped by "b" this time:

from convtools import conversion as c

input_data = [
    {"a": 5, "b": "foo"},
    {"a": 10, "b": "foo"},
    {"a": 10, "b": "bar"},
    {"a": 10, "b": "bar"},
    {"a": 20, "b": "bar"},
]

conv = (
    c.group_by(c.item("b"))
    .aggregate(
        {
            "b": c.item("b"),
            "a_first": c.ReduceFuncs.First(c.item("a"), where=c.item("a") > 5),
            "a_max": c.ReduceFuncs.Max(c.item("a")),
        }
    )
    .gen_converter(debug=True)
)

assert conv(input_data) == [
    {"b": "foo", "a_first": 10, "a_max": 10},
    {"b": "bar", "a_first": 10, "a_max": 20},
]

The generated code:
class AggData_:
    __slots__ = ["v0", "v1"]

    def __init__(self, _none=__none__):
        self.v0 = _none
        self.v1 = _none

def group_by_(_none, data_):
    signature_to_agg_data_ = defaultdict(AggData_)

    for row_ in data_:
        agg_data_ = signature_to_agg_data_[row_["b"]]
        _r0_ = row_["a"]
        if row_["a"] > 5:
            if agg_data_.v0 is _none:
                agg_data_.v0 = row_["a"]
            else:
                pass
        if _r0_ is not None:
            if agg_data_.v1 is _none:
                agg_data_.v1 = _r0_
            else:
                if agg_data_.v1 < _r0_:
                    agg_data_.v1 = _r0_

    return [
        {"b": signature_, "a_first": ((None if (agg_data_.v0 is _none) else agg_data_.v0)), "a_max": ((None if (agg_data_.v1 is _none) else agg_data_.v1))}
        for signature_, agg_data_ in signature_to_agg_data_.items()
    ]

def converter(data_):
    global __none__
    _none = __none__
    try:
        return group_by_(_none, data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise
Built-in reducers like c.ReduceFuncs.Sum
* Sum - auto-replaces False values with 0; default=0
* SumOrNone - sum or None if at least one None is encountered; default=None
* Max - max not None
* MaxRow - row with max not None
* Min - min not None
* MinRow - row with min not None
* Count - count of everything
* CountDistinct - number of distinct values
* First - first encountered value
* Last - last encountered value
* Average(value, weight=1) - pass custom weight conversion for weighted average
* Median
* Percentile(percentile, value, interpolation="linear")
    c.ReduceFuncs.Percentile(95.0, c.item("x"))
    interpolation is one of:
      - "linear"
      - "lower"
      - "higher"
      - "midpoint"
      - "nearest"
* Mode
* TopK - c.ReduceFuncs.TopK(3, c.item("x"))
* Array
* ArrayDistinct
* ArraySorted
    c.ReduceFuncs.ArraySorted(c.item("x"), key=lambda v: v, reverse=True)

Dict reducers are in fact aggregations themselves, because their values get reduced:
* Dict
    c.ReduceFuncs.Dict(c.item("key"), c.item("x"))
* DictArray - dict values are lists of encountered values
* DictSum - values are sums
* DictSumOrNone
* DictMax
* DictMin
* DictCount
* DictCountDistinct
* DictFirst
* DictLast

And lastly, you can define your own reducer by passing any reduce function of two arguments to c.reduce.
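
For example, a product reducer. A minimal sketch (the initial keyword follows the c.reduce example in the reducers docs):

from convtools import conversion as c

# custom reducer: multiply "x" values together, starting from 1
conv = c.aggregate(
    c.reduce(lambda a, b: a * b, c.item("x"), initial=1)
).gen_converter()

assert conv([{"x": 2}, {"x": 3}, {"x": 4}]) == 24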

Joins

from convtools import conversion as c

collection_1 = [
    {"id": 1, "name": "Nick"},
    {"id": 2, "name": "Joash"},
    {"id": 3, "name": "Bob"},
]
collection_2 = [
    {"ID": "3", "age": 17, "country": "GB"},
    {"ID": "2", "age": 21, "country": "US"},
    {"ID": "1", "age": 18, "country": "CA"},
]
input_data = (collection_1, collection_2)

conv = (
    c.join(
        c.item(0),
        c.item(1),
        c.and_(
            c.LEFT.item("id") == c.RIGHT.item("ID").as_type(int),
            c.RIGHT.item("age") >= 18,
        ),
        how="left",
    )
    .pipe(
        c.list_comp(
            {
                "id": c.item(0, "id"),
                "name": c.item(0, "name"),
                "age": c.item(1, "age", default=None),
                "country": c.item(1, "country", default=None),
            }
        )
    )
    .gen_converter(debug=True)
)

assert conv(input_data) == [
    {"id": 1, "name": "Nick", "age": 18, "country": "CA"},
    {"id": 2, "name": "Joash", "age": 21, "country": "US"},
    {"id": 3, "name": "Bob", "age": None, "country": None},
]

The generated code:
def aggregate_i(_none, data_, *, __v=__naive_values__["__v"]):
    agg_data_i_v0 = _none

    checksum_ = 0
    it_ = iter(data_)
    for row_i in it_:
        if row_i["age"] >= 18:
            if agg_data_i_v0 is _none:
                checksum_ += 1
                agg_data_i_v0 = defaultdict(list)
                agg_data_i_v0[int(row_i["ID"])].append(row_i)
            else:
                agg_data_i_v0[int(row_i["ID"])].append(row_i)
        if checksum_ == 1:
            globals()["__BROKEN_EARLY__"] = True  # DEBUG ONLY
            break
    for row_i in it_:
        if row_i["age"] >= 18:
            agg_data_i_v0[int(row_i["ID"])].append(row_i)

    return __v if (agg_data_i_v0 is _none) else (setattr(agg_data_i_v0, "default_factory", None) or agg_data_i_v0)

def join_(left_, right_, _none):
    hash_to_right_items = aggregate_i(_none, right_)
    del right_
    for left_item in left_:
        left_key = left_item["id"]
        right_items = iter(((hash_to_right_items[left_key] if (left_key in hash_to_right_items) else ())))
        right_item = next(right_items, _none)
        if right_item is _none:
            yield left_item, None
        else:
            yield left_item, right_item
            for right_item in right_items:
                yield left_item, right_item

def converter(data_, *, __get_2_or_default=__naive_values__["__get_2_or_default"]):
    global __none__
    _none = __none__
    try:
        return [
            {"id": i[0]["id"], "name": i[0]["name"], "age": __get_2_or_default(i, 1, "age", None), "country": __get_2_or_default(i, 1, "country", None)}
            for i in join_(data_[0], data_[1], _none)
        ]
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise
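
Besides how="left", inner joins are supported as well (the Tables example below uses how="inner" too): unmatched left rows are simply dropped. A minimal sketch reusing input_data from above:

from convtools import conversion as c

# same join condition, but how="inner": "Bob" is dropped, since his
# only candidate row fails the age >= 18 condition
inner_conv = (
    c.join(
        c.item(0),
        c.item(1),
        c.and_(
            c.LEFT.item("id") == c.RIGHT.item("ID").as_type(int),
            c.RIGHT.item("age") >= 18,
        ),
        how="inner",
    )
    .pipe(c.list_comp({"id": c.item(0, "id"), "age": c.item(1, "age")}))
    .gen_converter()
)

assert inner_conv(input_data) == [{"id": 1, "age": 18}, {"id": 2, "age": 21}]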

Tables

from convtools.contrib.tables import Table
from convtools import conversion as c

with c.OptionsCtx() as options:
    options.debug = True

    # reads Iterable of rows
    iterable_of_rows = (
        Table.from_rows([(0, -1), (1, 2)], header=["a", "b"]).join(
            Table
            # reads tab-separated CSV file
            .from_csv(
                "tests/csvs/ac.csv",
                header=True,
                dialect=Table.csv_dialect(delimiter="\t"),
            )
            # transform column values
            .update(
                a=c.col("a").as_type(float),
                c=c.col("c").as_type(int),
            )
            # filter rows by condition
            .filter(c.col("c") >= 0),
            # joins on column "a" values
            on=["a"],
            how="inner",
        )
        # rearrange columns
        .take(..., "a")
        # returns a generator of dicts to consume (tuple and list are supported too)
        .into_iter_rows(dict)
    )

    assert list(iterable_of_rows) == [{"b": 2, "c": 3, "a": 1}]

The generated code for each pipeline stage:
def converter(data_):
    try:
        return (
            i
            for i in (
                (
                    float(i_i[0]),
                    int(i_i[1]),
                )
                for i_i in data_
            )
            if (i[1] >= 0)
        )
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def aggregate_e(_none, data_, *, __v=__naive_values__["__v"]):
    agg_data_e_v0 = _none

    checksum_ = 0
    it_ = iter(data_)
    for row_e in it_:
        if agg_data_e_v0 is _none:
            checksum_ += 1
            agg_data_e_v0 = defaultdict(list)
            agg_data_e_v0[row_e[0]].append(row_e)
        else:
            agg_data_e_v0[row_e[0]].append(row_e)
        if checksum_ == 1:
            globals()["__BROKEN_EARLY__"] = True  # DEBUG ONLY
            break
    for row_e in it_:
        agg_data_e_v0[row_e[0]].append(row_e)

    return __v if (agg_data_e_v0 is _none) else (setattr(agg_data_e_v0, "default_factory", None) or agg_data_e_v0)

def join_(left_, right_, _none):
    hash_to_right_items = aggregate_e(_none, right_)
    del right_
    for left_item in left_:
        left_key = left_item[0]
        right_items = hash_to_right_items[left_key] if (left_key in hash_to_right_items) else ()
        for right_item in right_items:
            yield left_item, right_item

def converter(data_, *, right):
    global __none__
    _none = __none__
    try:
        return join_(data_, right, _none)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

def converter(data_):
    try:
        return ({"b": i[0][1], "c": i[1][1], "a": i[0][0]} for i in data_)
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise
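
Tables can be written back out as well. A minimal sketch (into_csv is taken from the tables docs; treat the exact signature as an assumption):

from convtools.contrib.tables import Table
from convtools import conversion as c

# read a CSV, derive a column, write the result back to disk
Table.from_csv("tests/csvs/ac.csv", header=True).update(
    c_doubled=c.col("c").as_type(int) * 2
).into_csv("out.csv")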

And of course you can mix all this together!

What's the point if there are tools like Pandas / Polars?

  • convtools doesn't need to wrap data in a container to provide functionality; it simply runs the Python code it generates on any input
  • convtools is lightweight (though the optional black dependency is highly recommended if you want to pretty-print the generated code out of curiosity)
  • convtools fosters building pipelines on top of iterators, allowing for stream processing
  • convtools supports nested aggregations
  • convtools is a set of primitives for code generation, so it's just different.

Is it debuggable?

Despite being generated and compiled at runtime, the code is debuggable by both pdb and pydevd.
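
For example, plain pdb is enough to step into the generated function (nothing convtools-specific is assumed here):

from convtools import conversion as c
import pdb

converter = c.item("a").gen_converter(debug=True)

# run under the debugger, then use "step" (s) to walk into the
# generated converter source
pdb.run("converter({'a': 1})", globals(), locals())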

Support

Reporting a Security Vulnerability

See the security policy.