Collections¶
Please, make sure you've covered Basics first.
Collections and c() wrapper¶
The syntax to create a conversion, which builds a dict, list, tuple or a
set is as follows: c({"a": 1}) - just wrap in a c call.
Let's build a dict from a tuple of two integers:
from convtools import conversion as c
converter = c(
{
"a": c.item(0),
"b": c.item(1),
"c": c.item(0) + c.item(1),
# keys are dynamic too
c.call_func("d{}".format, c.item(0)): "key is dynamic",
}
).gen_converter(debug=True)
assert converter((1, 2)) == {"a": 1, "b": 2, "c": 3, "d1": "key is dynamic"}
assert c([1, c.this, 2]).execute(None, debug=True) == [1, None, 2]
So to summarize on c() wrapper, it:
- leaves conversions untouched
- interprets collections as conversions which are to build such collections
- wraps everything else in c.naive.
Optional items¶
It's possible to mark a particular item as optional, so it disappears from a
dict/list/tuple/set in certain cases:
from convtools import conversion as c
converter = c(
{
"a": c.item(0),
"b": c.optional(c.item(1), skip_if=c.item(1) < 10),
"c": c.optional(c.item(0) + c.item(1), keep_if=c.item(0)),
"d": c.optional(c.item(0), skip_value=1),
}
).gen_converter(debug=True)
assert converter((1, 2)) == {"a": 1, "c": 3}
Spread (dict unpacking)¶
c.spread(conversion) unpacks dict items into another dict, similar to Python's
** operator or JavaScript's spread syntax. This is useful for merging nested
dicts into a parent dict.
from convtools import conversion as c
# Basic spread: merge nested dict into parent
conv = c.dict(
("id", c.item("id")),
c.spread(c.item("metadata")),
).gen_converter(debug=True)
assert conv({"id": 1, "metadata": {"name": "Alice", "age": 30}}) == {
"id": 1,
"name": "Alice",
"age": 30,
}
# Multiple spreads in one dict
conv = c.dict(
c.spread(c.item("a")),
c.spread(c.item("b")),
).gen_converter()
assert conv({"a": {"x": 1}, "b": {"y": 2}}) == {"x": 1, "y": 2}
# Override behavior: later keys win
conv = c.dict(
("x", 1),
c.spread(c.item("overrides")),
).gen_converter()
assert conv({"overrides": {"x": 999}}) == {"x": 999}
# Spread combined with optional items
conv = c.dict(
("a", 1),
c.spread(c.item("extra")),
(c.optional(c.item("key"), skip_value=None), c.item("val")),
).gen_converter()
assert conv({"extra": {"x": 10}, "key": "b", "val": 2}) == {
"a": 1,
"x": 10,
"b": 2,
}
assert conv({"extra": {}, "key": None, "val": 2}) == {"a": 1}
def _converter(data_):
try:
return {
"id": data_["id"],
**data_["metadata"],
}
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Note
c.spread can only be used inside c.dict(). Using it elsewhere raises an
AssertionError.
Type casting¶
To cast to a type use a naive conversion method as_type:
from convtools import conversion as c
converter = c.this.as_type(list).gen_converter(debug=True)
assert converter(range(2)) == [0, 1]
Note
It may seem useless, since it can be replaced by piping the result to the list
function or by calling list directly, but in fact some conversions override
this method to achieve predicate-pushdown-like optimizations.
Iterators & Comprehensions¶
Process¶
To iterate an input, there are the following conversions:
- c.iter and iter method
- c.list_comp
- c.dict_comp
- c.tuple_comp
- c.set_comp
Each of them accepts where argument to support conditions like:
[x for x in items if x > 10].
A few examples:
from convtools import conversion as c
converter = c.iter(c.this + 1).gen_converter(debug=True)
assert list(converter(range(3))) == [1, 2, 3]
converter = c.item("objects").iter(c.this + 1).gen_converter(debug=True)
assert list(converter({"objects": range(3)})) == [1, 2, 3]
converter = c.list_comp(c.this + 1, where=c.this < 2).gen_converter(debug=True)
assert converter(range(3)) == [1, 2]
converter = c.dict_comp(c.this, c.this + 1).gen_converter(debug=True)
assert converter(range(3)) == {0: 1, 1: 2, 2: 3}
Note
It's important to note that a conversion passed into iter, list_comp and
other iteration methods defines the conversions of each element of the input
collection. This is one of the input-switching conversions.
filter¶
To filter an input use c.filter or filter conversion method:
from convtools import conversion as c
converter = c.filter(c.this < 3).gen_converter(debug=True)
assert list(converter(range(100))) == [0, 1, 2]
converter = c.this.filter(c.this < 3).gen_converter(debug=True)
assert list(converter(range(100))) == [0, 1, 2]
sort¶
The sort conversion extends Python's built-in sorted with per-item asc/desc +
none_first/none_last configuration:
c.sort(key=None, reverse=False) or c.this.sort(key=None, reverse=False)
Args:
key: callable or conversion/tuple of conversions to form a sorting
key, to be passed to sorted
reverse (bool): to be passed to sorted
c.this.asc(none_last=None, none_first=None)
c.this.desc(none_last=None, none_first=None)
from convtools import conversion as c
converter = c.this.sort(key=lambda x: x, reverse=True).gen_converter(
debug=True
)
assert list(converter(range(3))) == [2, 1, 0]
data = [
{"a": 0, "b": 4},
{"a": None, "b": 3},
{"a": 1, "b": 2},
{"a": 0, "b": 1},
]
converter = c.this.sort(
key=(
c.item("a").desc(none_first=True),
c.item("b"),
)
).gen_converter(debug=True)
assert converter(data) == [
{"a": None, "b": 3},
{"a": 1, "b": 2},
{"a": 0, "b": 1},
{"a": 0, "b": 4},
]
sorting_key¶
c.sorting_key(*keys) returns a callable, which can be passed to sorted as:
from convtools import conversion as c
data = [
{"a": 0, "b": 4},
{"a": None, "b": 3},
{"a": 1, "b": 2},
{"a": 0, "b": 1},
]
with c.OptionsCtx() as options:
options.debug = True
result = sorted(
data,
key=c.sorting_key(
c.item("a").desc(none_first=True),
c.item("b"),
),
)
assert result == [
{"a": None, "b": 3},
{"a": 1, "b": 2},
{"a": 0, "b": 1},
{"a": 0, "b": 4},
]
zip, repeat, flatten¶
Whenever you need to annotate something or just zip sequences, it's convenient to have these shortcuts/helpers:
- c.zip / c.zip_longest
- c.repeat
- flatten method
from convtools import conversion as c
converter = (
c.iter(
c.zip(
c.repeat(c.item("a")),
c.item("b"),
)
)
.flatten()
.as_type(list)
.gen_converter(debug=True)
)
assert converter([{"a": 1, "b": [2, 3]}, {"a": 10, "b": [4, 5]}]) == [
(1, 2),
(1, 3),
(10, 4),
(10, 5),
]
c.zip supports keyword arguments to build dicts:
from convtools import conversion as c
converter = (
c.iter(
c.zip(
a=c.repeat(c.item("a")),
b=c.item("b"),
)
)
.flatten()
.as_type(list)
.gen_converter(debug=True)
)
assert converter([{"a": 1, "b": [2, 3]}, {"a": 10, "b": [4, 5]}]) == [
{"a": 1, "b": 2},
{"a": 1, "b": 3},
{"a": 10, "b": 4},
{"a": 10, "b": 5},
]
c.zip_longest works like c.zip, but pads shorter iterables with a fill_value (default: None):
from convtools import conversion as c
converter = (
c.zip_longest(
c.item("a"),
c.item("b"),
fill_value="N/A",
)
.as_type(list)
.gen_converter(debug=True)
)
assert converter({"a": [1, 2, 3], "b": [4, 5]}) == [
(1, 4),
(2, 5),
(3, "N/A"),
]
c.zip_longest also supports keyword arguments to build dicts:
from convtools import conversion as c
converter = (
c.zip_longest(
x=c.item("a"),
y=c.item("b"),
fill_value="N/A",
)
.as_type(list)
.gen_converter(debug=True)
)
assert converter({"a": [1, 2, 3], "b": [4, 5]}) == [
{"x": 1, "y": 4},
{"x": 2, "y": 5},
{"x": 3, "y": "N/A"},
]
len, min, max¶
- c.this.len(): shortcut to c.this.pipe(len) or c.call_func(len, c.this)
- c.max: shortcut to c.call_func(max, ...)
- c.min: shortcut to c.call_func(min, ...)
chunk_by, chunk_by_condition¶
It's a common task to chunk a sequence by: values, chunk size, condition or combination of them. Here are two conversions to achieve this:
- c.chunk_by(*by, size=None)
- c.chunk_by_condition(condition) - it takes the condition as a conversion of an element (c.this) and the existing chunk (c.CHUNK). See Placeholders & Special References for more context-specific references.
from convtools import conversion as c
# BY VALUES
assert c.chunk_by(c.item(0), c.item(1)).as_type(list).execute(
[(0, 0), (0, 0), (0, 1), (1, 1), (1, 1)], debug=True
) == [[(0, 0), (0, 0)], [(0, 1)], [(1, 1), (1, 1)]]
# BY SIZE
assert c.chunk_by(size=3).as_type(list).execute(range(5), debug=True) == [
[0, 1, 2],
[3, 4],
]
# BY VALUE AND SIZE
assert c.chunk_by(c.this // 10, size=3).as_type(list).execute(
[0, 1, 2, 3, 10, 19, 21, 24, 25], debug=True
) == [[0, 1, 2], [3], [10, 19], [21, 24, 25]]
# BY CONDITION
assert (
c.chunk_by_condition(c.this - c.CHUNK.item(-1) < 10)
.as_type(list)
.execute([1, 5, 15, 20, 29, 40, 50, 58], debug=True)
) == [[1, 5], [15, 20, 29], [40], [50, 58]]
We'll cover aggregations later, but bear with me -- chunk conversions have
an aggregate method:
from convtools import conversion as c
converter = (
c.chunk_by(size=3)
.aggregate(
{
"x": c.ReduceFuncs.First(c.this),
"y": c.ReduceFuncs.Last(c.this),
"z": c.ReduceFuncs.Sum(c.this),
}
)
.as_type(list)
.gen_converter(debug=True)
)
assert converter([0, 1, 2, 3, 4, 5, 6, 7]) == [
{"x": 0, "y": 2, "z": 3},
{"x": 3, "y": 5, "z": 12},
{"x": 6, "y": 7, "z": 13},
]
unordered_chunk_by¶
Slice an iterable into chunks by values and sizes without keeping order of items.
c.unordered_chunk_by(
*by,
size: Optional[int] = None,
max_items_in_memory: Optional[int] = None,
portion_to_pop_on_max_memory_hit: float = 0.5,
)
Args:
by: fields/conversions to use for slicing into chunks (elements with
same values go to the same chunk)
size: (optional) positive int to limit max size of a chunk
max_items_in_memory: (optional) positive int to limit max number of
items held in memory
portion_to_pop_on_max_memory_hit: portion of items to pop when
max_items_in_memory limit is hit
from convtools import conversion as c
data = [(i % 2, i) for i in range(10)]
assert (
c.unordered_chunk_by(c.item(0)).as_type(list).execute(data, debug=True)
) == [
[(0, 0), (0, 2), (0, 4), (0, 6), (0, 8)],
[(1, 1), (1, 3), (1, 5), (1, 7), (1, 9)],
]
assert (
c.unordered_chunk_by(c.item(0), size=4)
.as_type(list)
.execute(data, debug=True)
) == [
[(0, 0), (0, 2), (0, 4), (0, 6)],
[(1, 1), (1, 3), (1, 5), (1, 7)],
[(0, 8)],
[(1, 9)],
]
assert (
c.unordered_chunk_by(
c.item(0),
size=4,
max_items_in_memory=6,
portion_to_pop_on_max_memory_hit=0.5,
)
.as_type(list)
.execute(data, debug=True)
) == [
[(0, 0), (0, 2), (0, 4)],
[(1, 1), (1, 3), (1, 5), (1, 7)],
[(0, 6), (0, 8)],
[(1, 9)],
]
take_while, drop_while¶
- take_while reimplements itertools.takewhile - terminates once the condition evaluates to false
- drop_while reimplements itertools.dropwhile - drops elements while the condition evaluates to true, then yields from the first one where it evaluates to false
from itertools import count
from convtools import conversion as c
converter = c.take_while(c.this < 3).as_type(list).gen_converter(debug=True)
assert converter(count()) == [0, 1, 2]
converter = c.drop_while(c.this < 3).as_type(list).gen_converter(debug=True)
assert converter(range(5)) == [3, 4]
iter_unique¶
c.iter_unique(element_conv=None, by_=None) and iter_unique methods iterate
through an iterable and yield processed elements, which are distinct in terms
of the provided condition:
- if element_conv is None, it assumes c.this
- if by_ is None, it assumes element_conv
from convtools import conversion as c
# SIMPLE UNIQUE
converter = c.iter_unique().as_type(list).gen_converter(debug=True)
assert converter([0, 0, 0, 1, 1, 2]) == [0, 1, 2]
# UNIQUE BY MODULO OF 3
converter = (
c.iter_unique(by_=c.this % 3).as_type(list).gen_converter(debug=True)
)
assert converter(range(10)) == [0, 1, 2]
# UNIQUE BY ID, YIELD NAMES
converter = (
c.item("data")
.iter_unique(c.item("name"), by_=c.item("id"))
.as_type(list)
.gen_converter(debug=True)
)
assert converter(
{
"data": [
{"name": "foo", "id": 1},
{"name": "foo", "id": 1},
{"name": "bar", "id": 1},
{"name": "def", "id": 2},
]
}
) == ["foo", "def"]
iter_windows¶
c.iter_windows iterates through an iterable and yields tuples, which are
obtained by sliding a window of a given width and by moving the window by
specified step size as follows: c.iter_windows(width=7, step=1).
It yields partial windows at the boundaries.
from convtools import conversion as c
converter = c.iter_windows(3, step=1).as_type(list).gen_converter(debug=True)
assert converter(range(5)) == [
(0,),
(0, 1),
(0, 1, 2),
(1, 2, 3),
(2, 3, 4),
(3, 4),
(4,),
]
cumulative¶
The cumulative(prepare_first, reduce_two, label_name=None) method allows
defining cumulative conversions.
- prepare_first defines conversion of the first element
- reduce_two defines conversion of two elements
- c.PREV references the previous cumulative value; see Placeholders & Special References
from convtools import conversion as c
assert (
c.iter(c.cumulative(c.this, c.this + c.PREV))
.as_type(list)
.execute([0, 1, 2, 3, 4], debug=True)
) == [0, 1, 3, 6, 10]
When the value in the accumulator needs to be cleared (this usually happens in nested iterators), take two steps:
- label your cumulative
- use c.cumulative_reset to reset where necessary
from convtools import conversion as c
assert (
c.iter(
c.cumulative_reset("abc")
.iter(c.cumulative(c.this, c.this + c.PREV, label_name="abc"))
.as_type(list)
)
.as_type(list)
.execute([[0, 1, 2], [3, 4]], debug=True)
) == [[0, 1, 3], [3, 7]]