Welcome to convtools¶
convtools is a specialized Python library designed for defining data transformations dynamically using a declarative approach. It automatically generates custom Python code for the user in the background.
Installation¶
pip install convtools
Structure¶
- `from convtools import conversion as c` — exposes the main interface to build pipelines for processing data, doing complex aggregations and joins.
- `from convtools.contrib.tables import Table` — stream processing of table-like data (e.g. CSV).
- `from convtools.contrib import fs` — tiny utils to handle splitting buffers with custom newlines.
Sneak peek at what's inside¶
Pipes¶
from convtools import conversion as c

input_data = [{"StoreID": " 123", "Quantity": "123"}]

# define a conversion (sometimes you may want to do this dynamically)
# takes iterable and returns iterable of dicts, stopping before the first
# one with quantity >= 1000, splitting into chunks of size = 1000
conversion = (
    c.iter(
        {
            "id": c.item("StoreID").call_method("strip"),
            "quantity": c.item("Quantity").as_type(int),
        }
    )
    .take_while(c.item("quantity") < 1000)
    .pipe(c.chunk_by(c.item("id"), size=1000))
    .as_type(list)
)

# compile the conversion into an ad hoc function and run it
converter = conversion.gen_converter(debug=True)

# run it as any function
assert converter(input_data) == [[{"id": "123", "quantity": 123}]]

# OR in case of a one-shot use, skip the gen_converter part
conversion.execute(input_data)
def take_while_e(it_e):
    # Stream items through until the first one whose "quantity"
    # reaches 1000, then stop iterating entirely (take_while semantics).
    for record in it_e:
        if record["quantity"] >= 1000:
            break
        yield record
def chunk_by(items_):
    # Group consecutive items sharing the same "id" value into lists,
    # capping each chunk at 1000 elements; yields nothing for empty input.
    source = iter(items_)
    try:
        current = next(source)
    except StopIteration:
        return
    chunk = [current]
    signature = current["id"]
    count = 1
    for current in source:
        current_signature = current["id"]
        if current_signature == signature and count < 1000:
            # Same group and under the size cap: extend the open chunk.
            chunk.append(current)
            count += 1
        else:
            # Signature changed (or cap hit): emit and start a new chunk.
            yield chunk
            chunk = [current]
            signature = current_signature
            count = 1
    yield chunk
def pipe_i(input_):
    # Materializes the lazy chunk_by generator into a list (as_type(list)).
    return list(chunk_by(input_))
def pipe_(input_):
    # Composes the pipeline: truncate the stream first, then chunk it.
    return pipe_i(take_while_e(input_))
def converter(data_):
    # Generated entry point: maps raw rows to {"id", "quantity"} dicts
    # lazily and feeds them through the pipeline above.
    try:
        return pipe_(({"id": i["StoreID"].strip(), "quantity": int(i["Quantity"])} for i in data_))
    except __exceptions_to_dump_sources:
        # On failure, dump the generated sources to aid debugging,
        # then re-raise the original exception.
        __convtools__code_storage.dump_sources()
        raise
Aggregations & Group By¶
from convtools import conversion as c

input_data = [
    {"a": 5, "b": "foo"},
    {"a": 10, "b": "foo"},
    {"a": 10, "b": "bar"},
    {"a": 10, "b": "bar"},
    {"a": 20, "b": "bar"},
]

# list of "a" values where "b" equals to "bar"
# "b" value of a row where "a" has Max value
conv = c.aggregate(
    {
        "a": c.ReduceFuncs.Array(c.item("a"), where=c.item("b") == "bar"),
        "b": c.ReduceFuncs.MaxRow(
            c.item("a"),
        ).item("b", default=None),
    }
).gen_converter(debug=True)

assert conv(input_data) == {"a": [10, 10, 20], "b": "bar"}
def aggregate_(_none, data_, *, __get_1_or_default=__naive_values__["__get_1_or_default"]):
    # Single-pass aggregation over data_ with two reducers:
    #   v0 - Array of "a" values where "b" == "bar"
    #   v1 - (max "a", row) pair tracking the row with the maximum "a"
    # _none is a unique sentinel meaning "no value seen yet" (None can be
    # legitimate data, so it cannot serve as the marker).
    agg_data__v0 = agg_data__v1 = _none
    checksum_ = 0
    it_ = iter(data_)
    # Phase 1: iterate until every reducer has been initialized once.
    for row_ in it_:
        _r0_ = row_["a"]
        if row_["b"] == "bar":
            if agg_data__v0 is _none:
                checksum_ += 1
                agg_data__v0 = [row_["a"]]
            else:
                agg_data__v0.append(row_["a"])
        if _r0_ is not None:
            if agg_data__v1 is _none:
                checksum_ += 1
                agg_data__v1 = (_r0_, row_)
            else:
                if agg_data__v1[0] < _r0_:
                    agg_data__v1 = (_r0_, row_)
        if checksum_ == 2:
            # Both reducers initialized; switch to the faster loop below
            # that skips the is-initialized checks.
            globals()["__BROKEN_EARLY__"] = True  # DEBUG ONLY
            break
    # Phase 2: hot loop without the initialization branches.
    for row_ in it_:
        _r0_ = row_["a"]
        if row_["b"] == "bar":
            agg_data__v0.append(row_["a"])
        if _r0_ is not None:
            if agg_data__v1[0] < _r0_:
                agg_data__v1 = (_r0_, row_)
    # Reducers that never fired (still _none) become None in the result;
    # __get_1_or_default safely fetches "b" from the MaxRow row (or None).
    return {
        "a": ((None if (agg_data__v0 is _none) else agg_data__v0)),
        "b": __get_1_or_default(((None if (agg_data__v1 is _none) else agg_data__v1[1])), "b", None),
    }
def converter(data_):
    # Fetch the module-level sentinel once and pass it down.
    global __none__
    _none = __none__
    try:
        return aggregate_(_none, data_)
    except __exceptions_to_dump_sources:
        # Dump the generated sources for debugging before re-raising.
        __convtools__code_storage.dump_sources()
        raise
from convtools import conversion as c

input_data = [
    {"a": 5, "b": "foo"},
    {"a": 10, "b": "foo"},
    {"a": 10, "b": "bar"},
    {"a": 10, "b": "bar"},
    {"a": 20, "b": "bar"},
]

# group rows by "b"; per group take the first "a" > 5 and the max "a"
conv = (
    c.group_by(c.item("b"))
    .aggregate(
        {
            "b": c.item("b"),
            "a_first": c.ReduceFuncs.First(c.item("a"), where=c.item("a") > 5),
            "a_max": c.ReduceFuncs.Max(c.item("a")),
        }
    )
    .gen_converter(debug=True)
)

assert conv(input_data) == [
    {"b": "foo", "a_first": 10, "a_max": 10},
    {"b": "bar", "a_first": 10, "a_max": 20},
]
class AggData_:
    # Per-group reducer state container:
    #   v0 - First("a", where a > 5), v1 - Max("a").
    # __slots__ keeps the per-group memory footprint small.
    __slots__ = ["v0", "v1"]

    def __init__(self, _none=__none__):
        # Both reducers start at the _none sentinel ("no value seen yet").
        self.v0 = _none
        self.v1 = _none
def group_by_(_none, data_):
    # Maps each group signature (the "b" value) to its reducer state,
    # creating the state object on first sight of a signature.
    signature_to_agg_data_ = defaultdict(AggData_)
    for row_ in data_:
        agg_data_ = signature_to_agg_data_[row_["b"]]
        _r0_ = row_["a"]
        # First(a, where=a > 5): keep only the first matching value.
        if row_["a"] > 5:
            if agg_data_.v0 is _none:
                agg_data_.v0 = row_["a"]
            else:
                pass
        # Max(a): skip None values, keep the largest seen so far.
        if _r0_ is not None:
            if agg_data_.v1 is _none:
                agg_data_.v1 = _r0_
            else:
                if agg_data_.v1 < _r0_:
                    agg_data_.v1 = _r0_
    # Reducers that never fired (still _none) become None in the output.
    return [
        {"b": signature_, "a_first": ((None if (agg_data_.v0 is _none) else agg_data_.v0)), "a_max": ((None if (agg_data_.v1 is _none) else agg_data_.v1))}
        for signature_, agg_data_ in signature_to_agg_data_.items()
    ]
def converter(data_):
    # Fetch the module-level sentinel once and pass it down.
    global __none__
    _none = __none__
    try:
        return group_by_(_none, data_)
    except __exceptions_to_dump_sources:
        # Dump the generated sources for debugging before re-raising.
        __convtools__code_storage.dump_sources()
        raise
Built-in reducers like c.ReduceFuncs.Sum¶
* Sum - auto-replaces False values with 0; default=0
* SumOrNone - sum or None if at least one None is encountered; default=None
* Max - max not None
* MaxRow - row with max not None
* Min - min not None
* MinRow - row with min not None
* Count - count of everything
* CountDistinct - len of resulting set of values
* First - first encountered value
* Last - last encountered value
* Average(value, weight=1) - pass custom weight conversion for weighted average
* Median
* Percentile(percentile, value, interpolation="linear")
c.ReduceFuncs.Percentile(95.0, c.item("x"))
interpolation is one of:
- "linear"
- "lower"
- "higher"
- "midpoint"
- "nearest"
* Mode
* TopK - c.ReduceFuncs.TopK(3, c.item("x"))
* Array
* ArrayDistinct
* ArraySorted
c.ReduceFuncs.ArraySorted(c.item("x"), key=lambda v: v, reverse=True)
DICT REDUCERS ARE IN FACT AGGREGATIONS THEMSELVES, BECAUSE VALUES GET REDUCED.
* Dict
c.ReduceFuncs.Dict(c.item("key"), c.item("x"))
* DictArray - dict values are lists of encountered values
* DictSum - values are sums
* DictSumOrNone
* DictMax
* DictMin
* DictCount
* DictCountDistinct
* DictFirst
* DictLast
AND LASTLY YOU CAN DEFINE YOUR OWN REDUCER BY PASSING ANY REDUCE FUNCTION
OF TWO ARGUMENTS TO ``c.reduce``.
Joins¶
from convtools import conversion as c

collection_1 = [
    {"id": 1, "name": "Nick"},
    {"id": 2, "name": "Joash"},
    {"id": 3, "name": "Bob"},
]
collection_2 = [
    {"ID": "3", "age": 17, "country": "GB"},
    {"ID": "2", "age": 21, "country": "US"},
    {"ID": "1", "age": 18, "country": "CA"},
]
input_data = (collection_1, collection_2)

# left join on id == int(ID), matching only right-side rows with age >= 18
conv = (
    c.join(
        c.item(0),
        c.item(1),
        c.and_(
            c.LEFT.item("id") == c.RIGHT.item("ID").as_type(int),
            c.RIGHT.item("age") >= 18,
        ),
        how="left",
    )
    .pipe(
        c.list_comp(
            {
                "id": c.item(0, "id"),
                "name": c.item(0, "name"),
                "age": c.item(1, "age", default=None),
                "country": c.item(1, "country", default=None),
            }
        )
    )
    .gen_converter(debug=True)
)

assert conv(input_data) == [
    {"id": 1, "name": "Nick", "age": 18, "country": "CA"},
    {"id": 2, "name": "Joash", "age": 21, "country": "US"},
    {"id": 3, "name": "Bob", "age": None, "country": None},
]
def aggregate_i(_none, data_, *, __v=__naive_values__["__v"]):
    # Builds the right-side hash index for the join: maps int(row["ID"])
    # to the list of rows with that ID, keeping only rows with age >= 18.
    agg_data_i_v0 = _none
    checksum_ = 0
    it_ = iter(data_)
    # Phase 1: run until the dict is initialized, then break to phase 2.
    for row_i in it_:
        if row_i["age"] >= 18:
            if agg_data_i_v0 is _none:
                checksum_ += 1
                agg_data_i_v0 = defaultdict(list)
                agg_data_i_v0[int(row_i["ID"])].append(row_i)
            else:
                agg_data_i_v0[int(row_i["ID"])].append(row_i)
        if checksum_ == 1:
            globals()["__BROKEN_EARLY__"] = True  # DEBUG ONLY
            break
    # Phase 2: hot loop without the initialization branch.
    for row_i in it_:
        if row_i["age"] >= 18:
            agg_data_i_v0[int(row_i["ID"])].append(row_i)
    # Clear default_factory so later missing-key lookups no longer insert
    # empty lists; __v is the fallback when no row matched at all.
    return __v if (agg_data_i_v0 is _none) else (setattr(agg_data_i_v0, "default_factory", None) or agg_data_i_v0)
def join_(left_, right_, _none):
    # LEFT join: index the right side by key, then stream the left side,
    # yielding (left, right) pairs; unmatched left rows pair with None.
    hash_to_right_items = aggregate_i(_none, right_)
    del right_  # let the right collection be garbage-collected early
    for left_item in left_:
        left_key = left_item["id"]
        right_items = iter(((hash_to_right_items[left_key] if (left_key in hash_to_right_items) else ())))
        right_item = next(right_items, _none)
        if right_item is _none:
            # No match: a left join still emits the left row.
            yield left_item, None
        else:
            yield left_item, right_item
            # Emit one pair per additional matching right row.
            for right_item in right_items:
                yield left_item, right_item
def converter(data_, *, __get_2_or_default=__naive_values__["__get_2_or_default"]):
    # Fetch the module-level sentinel once.
    global __none__
    _none = __none__
    try:
        # Project each (left, right) pair into the output dict; the right
        # side may be None for unmatched rows, hence the *_or_default getter.
        return [
            {"id": i[0]["id"], "name": i[0]["name"], "age": __get_2_or_default(i, 1, "age", None), "country": __get_2_or_default(i, 1, "country", None)}
            for i in join_(data_[0], data_[1], _none)
        ]
    except __exceptions_to_dump_sources:
        # Dump the generated sources for debugging before re-raising.
        __convtools__code_storage.dump_sources()
        raise
Tables¶
from convtools.contrib.tables import Table
from convtools import conversion as c

with c.OptionsCtx() as options:
    options.debug = True
    # reads Iterable of rows
    iterable_of_rows = (
        Table.from_rows([(0, -1), (1, 2)], header=["a", "b"]).join(
            Table
            # reads tab-separated CSV file
            .from_csv(
                "tests/csvs/ac.csv",
                header=True,
                dialect=Table.csv_dialect(delimiter="\t"),
            )
            # transform column values
            .update(
                a=c.col("a").as_type(float),
                c=c.col("c").as_type(int),
            )
            # filter rows by condition
            .filter(c.col("c") >= 0),
            # joins on column "a" values
            on=["a"],
            how="inner",
        )
        # rearrange columns
        .take(..., "a")
        # this is a generator to consume (tuple, list are supported too)
        .into_iter_rows(dict)
    )

assert list(iterable_of_rows) == [{"b": 2, "c": 3, "a": 1}]
def converter(data_):
    # Casts columns "a" -> float and "c" -> int, then filters rows whose
    # casted "c" is >= 0; fully lazy (nested generator expressions).
    try:
        return (
            i
            for i in (
                (
                    float(i_i[0]),
                    int(i_i[1]),
                )
                for i_i in data_
            )
            if (i[1] >= 0)
        )
    except __exceptions_to_dump_sources:
        # Dump the generated sources for debugging before re-raising.
        __convtools__code_storage.dump_sources()
        raise
def aggregate_e(_none, data_, *, __v=__naive_values__["__v"]):
    # Builds the join index for the right side: maps each row's first
    # column value to the list of rows sharing that value.
    agg_data_e_v0 = _none
    checksum_ = 0
    it_ = iter(data_)
    # Phase 1: run until the dict is initialized, then break to phase 2.
    for row_e in it_:
        if agg_data_e_v0 is _none:
            checksum_ += 1
            agg_data_e_v0 = defaultdict(list)
            agg_data_e_v0[row_e[0]].append(row_e)
        else:
            agg_data_e_v0[row_e[0]].append(row_e)
        if checksum_ == 1:
            globals()["__BROKEN_EARLY__"] = True  # DEBUG ONLY
            break
    # Phase 2: hot loop without the initialization branch.
    for row_e in it_:
        agg_data_e_v0[row_e[0]].append(row_e)
    # Clear default_factory so later missing-key lookups no longer insert
    # empty lists; __v is the fallback when data_ was empty.
    return __v if (agg_data_e_v0 is _none) else (setattr(agg_data_e_v0, "default_factory", None) or agg_data_e_v0)
def join_(left_, right_, _none):
    # INNER join: index the right side by its first column, then yield a
    # (left, right) pair per match; unmatched left rows are dropped.
    hash_to_right_items = aggregate_e(_none, right_)
    del right_  # let the right collection be garbage-collected early
    for left_item in left_:
        left_key = left_item[0]
        right_items = hash_to_right_items[left_key] if (left_key in hash_to_right_items) else ()
        for right_item in right_items:
            yield left_item, right_item
def converter(data_, *, right):
    # Join entry point: left rows arrive as data_, right rows as the
    # keyword-only `right` argument.
    global __none__
    _none = __none__
    try:
        return join_(data_, right, _none)
    except __exceptions_to_dump_sources:
        # Dump the generated sources for debugging before re-raising.
        __convtools__code_storage.dump_sources()
        raise
def converter(data_):
    # Final projection: rearranges joined (left, right) row pairs into
    # {"b", "c", "a"} dicts for into_iter_rows(dict).
    try:
        return ({"b": i[0][1], "c": i[1][1], "a": i[0][0]} for i in data_)
    except __exceptions_to_dump_sources:
        # Dump the generated sources for debugging before re-raising.
        __convtools__code_storage.dump_sources()
        raise
And of course you can mix all this together!
What's the point if there are tools like Pandas / Polars?¶
- convtools doesn't need to wrap data in a container to provide functionality, it simply runs the python code it generates on any input
- convtools is lightweight (though the optional `black` package is highly recommended for pretty-printing generated code out of curiosity)
- convtools fosters building pipelines on top of iterators, allowing for stream processing
- convtools supports nested aggregations
- convtools is a set of primitives for code generation, so it's just different.
Is it debuggable?¶
Despite being compiled at runtime, it is debuggable (by both `pdb` and `pydevd`).
Support¶
- westandskif (Nikita Almakov) - Link to support
Reporting a Security Vulnerability¶
See the security policy.