Skip to content

Collections

Please, make sure you've covered Basics first.

Collections and c() wrapper

To create a conversion that builds a dict, list, tuple or set, just wrap the collection in a c() call, e.g. c({"a": 1}).

Let's build a dict from a tuple of two integers:

from convtools import conversion as c

converter = c(
    {
        "a": c.item(0),
        "b": c.item(1),
        "c": c.item(0) + c.item(1),
        # keys are dynamic too
        c.call_func("d{}".format, c.item(0)): "key is dynamic",
    }
).gen_converter(debug=True)

assert converter((1, 2)) == {"a": 1, "b": 2, "c": 3, "d1": "key is dynamic"}


assert c([1, c.this, 2]).execute(None, debug=True) == [1, None, 2]

So to summarize on c() wrapper, it:

  • leaves conversions untouched
  • interprets collections as conversions which are to build such collections
  • wraps everything else in c.naive.

Optional items

It's possible to mark a particular item as optional, so that it is omitted from a dict/list/tuple/set in certain cases:

from convtools import conversion as c

converter = c(
    {
        "a": c.item(0),
        "b": c.optional(c.item(1), skip_if=c.item(1) < 10),
        "c": c.optional(c.item(0) + c.item(1), keep_if=c.item(0)),
        "d": c.optional(c.item(0), skip_value=1),
    }
).gen_converter(debug=True)

assert converter((1, 2)) == {"a": 1, "c": 3}

Spread (dict unpacking)

c.spread(conversion) unpacks dict items into another dict, similar to Python's ** operator or JavaScript's spread syntax. This is useful for merging nested dicts into a parent dict.

from convtools import conversion as c

# Basic spread: merge nested dict into parent
conv = c.dict(
    ("id", c.item("id")),
    c.spread(c.item("metadata")),
).gen_converter(debug=True)

assert conv({"id": 1, "metadata": {"name": "Alice", "age": 30}}) == {
    "id": 1,
    "name": "Alice",
    "age": 30,
}

# Multiple spreads in one dict
conv = c.dict(
    c.spread(c.item("a")),
    c.spread(c.item("b")),
).gen_converter()

assert conv({"a": {"x": 1}, "b": {"y": 2}}) == {"x": 1, "y": 2}

# Override behavior: later keys win
conv = c.dict(
    ("x", 1),
    c.spread(c.item("overrides")),
).gen_converter()

assert conv({"overrides": {"x": 999}}) == {"x": 999}

# Spread combined with optional items
conv = c.dict(
    ("a", 1),
    c.spread(c.item("extra")),
    (c.optional(c.item("key"), skip_value=None), c.item("val")),
).gen_converter()

assert conv({"extra": {"x": 10}, "key": "b", "val": 2}) == {
    "a": 1,
    "x": 10,
    "b": 2,
}
assert conv({"extra": {}, "key": None, "val": 2}) == {"a": 1}

def _converter(data_):
    try:
        return {
            "id": data_["id"],
            **data_["metadata"],
        }
    except __exceptions_to_dump_sources:
        __convtools__code_storage.dump_sources()
        raise

Note

c.spread can only be used inside c.dict(). Using it elsewhere raises an AssertionError.

Type casting

To cast to a type use a naive conversion method as_type:

from convtools import conversion as c

converter = c.this.as_type(list).gen_converter(debug=True)

assert converter(range(2)) == [0, 1]

Note

It may seem redundant, since it can be replaced by piping the result to the list function or by calling list directly, but in fact some conversions override this method to achieve predicate-pushdown-like optimizations.

Iterators & Comprehensions

Process

To iterate an input, there are the following conversions:

  • c.iter and iter method
  • c.list_comp
  • c.dict_comp
  • c.tuple_comp
  • c.set_comp.

Each of them accepts a where argument to support conditions like: [x for x in items if x > 10].

A few examples:

from convtools import conversion as c

converter = c.iter(c.this + 1).gen_converter(debug=True)
assert list(converter(range(3))) == [1, 2, 3]

converter = c.item("objects").iter(c.this + 1).gen_converter(debug=True)
assert list(converter({"objects": range(3)})) == [1, 2, 3]

converter = c.list_comp(c.this + 1, where=c.this < 2).gen_converter(debug=True)
assert converter(range(3)) == [1, 2]

converter = c.dict_comp(c.this, c.this + 1).gen_converter(debug=True)
assert converter(range(3)) == {0: 1, 1: 2, 2: 3}

Note

It's important to note that a conversion passed into iter, list_comp and other iteration methods defines the conversion of each element of the input collection. This is one of the input-switching conversions.

filter

To filter an input, use c.filter or the filter conversion method:

from convtools import conversion as c

converter = c.filter(c.this < 3).gen_converter(debug=True)
assert list(converter(range(100))) == [0, 1, 2]

converter = c.this.filter(c.this < 3).gen_converter(debug=True)
assert list(converter(range(100))) == [0, 1, 2]

sort

The sort conversion extends Python's built-in sorted with per-item asc/desc + none_first/none_last configuration:

c.sort(key=None, reverse=False) or c.this.sort(key=None, reverse=False)

Args:
  key: callable or conversion/tuple of conversions to form a sorting
    key, to be passed to sorted
  reverse (bool): to be passed to sorted

c.this.asc(none_last=None, none_first=None)

c.this.desc(none_last=None, none_first=None)

from convtools import conversion as c

converter = c.this.sort(key=lambda x: x, reverse=True).gen_converter(
    debug=True
)
assert list(converter(range(3))) == [2, 1, 0]


data = [
    {"a": 0, "b": 4},
    {"a": None, "b": 3},
    {"a": 1, "b": 2},
    {"a": 0, "b": 1},
]
converter = c.this.sort(
    key=(
        c.item("a").desc(none_first=True),
        c.item("b"),
    )
).gen_converter(debug=True)
assert converter(data) == [
    {"a": None, "b": 3},
    {"a": 1, "b": 2},
    {"a": 0, "b": 1},
    {"a": 0, "b": 4},
]
sorting_key

c.sorting_key(*keys) returns a callable that can be passed to sorted as its key argument:

from convtools import conversion as c

data = [
    {"a": 0, "b": 4},
    {"a": None, "b": 3},
    {"a": 1, "b": 2},
    {"a": 0, "b": 1},
]
with c.OptionsCtx() as options:
    options.debug = True
    result = sorted(
        data,
        key=c.sorting_key(
            c.item("a").desc(none_first=True),
            c.item("b"),
        ),
    )
assert result == [
    {"a": None, "b": 3},
    {"a": 1, "b": 2},
    {"a": 0, "b": 1},
    {"a": 0, "b": 4},
]

zip, repeat, flatten

Whenever you need to annotate something or just zip sequences, it's convenient to have these shortcuts/helpers:

  1. c.zip / c.zip_longest
  2. c.repeat
  3. flatten method
from convtools import conversion as c

converter = (
    c.iter(
        c.zip(
            c.repeat(c.item("a")),
            c.item("b"),
        )
    )
    .flatten()
    .as_type(list)
    .gen_converter(debug=True)
)

assert converter([{"a": 1, "b": [2, 3]}, {"a": 10, "b": [4, 5]}]) == [
    (1, 2),
    (1, 3),
    (10, 4),
    (10, 5),
]

c.zip supports keyword arguments to build dicts:

from convtools import conversion as c

converter = (
    c.iter(
        c.zip(
            a=c.repeat(c.item("a")),
            b=c.item("b"),
        )
    )
    .flatten()
    .as_type(list)
    .gen_converter(debug=True)
)

assert converter([{"a": 1, "b": [2, 3]}, {"a": 10, "b": [4, 5]}]) == [
    {"a": 1, "b": 2},
    {"a": 1, "b": 3},
    {"a": 10, "b": 4},
    {"a": 10, "b": 5},
]

c.zip_longest works like c.zip, but pads shorter iterables with a fill_value (default: None):

from convtools import conversion as c

converter = (
    c.zip_longest(
        c.item("a"),
        c.item("b"),
        fill_value="N/A",
    )
    .as_type(list)
    .gen_converter(debug=True)
)

assert converter({"a": [1, 2, 3], "b": [4, 5]}) == [
    (1, 4),
    (2, 5),
    (3, "N/A"),
]

c.zip_longest also supports keyword arguments to build dicts:

from convtools import conversion as c

converter = (
    c.zip_longest(
        x=c.item("a"),
        y=c.item("b"),
        fill_value="N/A",
    )
    .as_type(list)
    .gen_converter(debug=True)
)

assert converter({"a": [1, 2, 3], "b": [4, 5]}) == [
    {"x": 1, "y": 4},
    {"x": 2, "y": 5},
    {"x": 3, "y": "N/A"},
]

len, min, max

  1. c.this.len(): shortcut to c.this.pipe(len) or c.call_func(len, c.this)
  2. c.max: shortcut to c.call_func(max, ...)
  3. c.min: shortcut to c.call_func(min, ...)

chunk_by, chunk_by_condition

It's a common task to chunk a sequence by values, chunk size, a condition, or a combination of them. Here are two conversions to achieve this:

  1. c.chunk_by(*by, size=None)
  2. c.chunk_by_condition(condition) - it takes the condition as a conversion of an element (c.this) and the existing chunk (c.CHUNK). See Placeholders & Special References for more context-specific references.
from convtools import conversion as c

# BY VALUES
assert c.chunk_by(c.item(0), c.item(1)).as_type(list).execute(
    [(0, 0), (0, 0), (0, 1), (1, 1), (1, 1)], debug=True
) == [[(0, 0), (0, 0)], [(0, 1)], [(1, 1), (1, 1)]]

# BY SIZE
assert c.chunk_by(size=3).as_type(list).execute(range(5), debug=True) == [
    [0, 1, 2],
    [3, 4],
]

# BY VALUE AND SIZE
assert c.chunk_by(c.this // 10, size=3).as_type(list).execute(
    [0, 1, 2, 3, 10, 19, 21, 24, 25], debug=True
) == [[0, 1, 2], [3], [10, 19], [21, 24, 25]]

# BY CONDITION
assert (
    c.chunk_by_condition(c.this - c.CHUNK.item(-1) < 10)
    .as_type(list)
    .execute([1, 5, 15, 20, 29, 40, 50, 58], debug=True)
) == [[1, 5], [15, 20, 29], [40], [50, 58]]

We'll cover aggregations later, but bear with me -- chunk conversions have an aggregate method:

from convtools import conversion as c

converter = (
    c.chunk_by(size=3)
    .aggregate(
        {
            "x": c.ReduceFuncs.First(c.this),
            "y": c.ReduceFuncs.Last(c.this),
            "z": c.ReduceFuncs.Sum(c.this),
        }
    )
    .as_type(list)
    .gen_converter(debug=True)
)
assert converter([0, 1, 2, 3, 4, 5, 6, 7]) == [
    {"x": 0, "y": 2, "z": 3},
    {"x": 3, "y": 5, "z": 12},
    {"x": 6, "y": 7, "z": 13},
]

unordered_chunk_by

Slice an iterable into chunks by values and sizes without preserving the order of items.

c.unordered_chunk_by(
    *by,
    size: Optional[int] = None,
    max_items_in_memory: Optional[int] = None,
    portion_to_pop_on_max_memory_hit: float = 0.5,
)
Args:
  by: fields/conversions to use for slicing into chunks (elements with
    same values go to the same chunk)
  size: (optional) positive int to limit max size of a chunk
  max_items_in_memory: (optional) positive int to limit max number of
    items held in memory
  portion_to_pop_on_max_memory_hit: portion of items to pop when
    max_items_in_memory limit is hit
from convtools import conversion as c

data = [(i % 2, i) for i in range(10)]

assert (
    c.unordered_chunk_by(c.item(0)).as_type(list).execute(data, debug=True)
) == [
    [(0, 0), (0, 2), (0, 4), (0, 6), (0, 8)],
    [(1, 1), (1, 3), (1, 5), (1, 7), (1, 9)],
]

assert (
    c.unordered_chunk_by(c.item(0), size=4)
    .as_type(list)
    .execute(data, debug=True)
) == [
    [(0, 0), (0, 2), (0, 4), (0, 6)],
    [(1, 1), (1, 3), (1, 5), (1, 7)],
    [(0, 8)],
    [(1, 9)],
]

assert (
    c.unordered_chunk_by(
        c.item(0),
        size=4,
        max_items_in_memory=6,
        portion_to_pop_on_max_memory_hit=0.5,
    )
    .as_type(list)
    .execute(data, debug=True)
) == [
    [(0, 0), (0, 2), (0, 4)],
    [(1, 1), (1, 3), (1, 5), (1, 7)],
    [(0, 6), (0, 8)],
    [(1, 9)],
]

take_while, drop_while

  1. take_while reimplements itertools.takewhile - terminates once condition evaluates to false
  2. drop_while reimplements itertools.dropwhile - drops elements while the condition evaluates to true, then yields from the first one where it evaluates to false
from itertools import count
from convtools import conversion as c

converter = c.take_while(c.this < 3).as_type(list).gen_converter(debug=True)
assert converter(count()) == [0, 1, 2]


converter = c.drop_while(c.this < 3).as_type(list).gen_converter(debug=True)
assert converter(range(5)) == [3, 4]

iter_unique

c.iter_unique(element_conv=None, by_=None) and the iter_unique conversion method iterate through an iterable and yield processed elements, which are distinct in terms of the provided condition:

  • if element_conv is None, it assumes c.this
  • if by_ is None, it assumes element_conv
from convtools import conversion as c

# SIMPLE UNIQUE
converter = c.iter_unique().as_type(list).gen_converter(debug=True)
assert converter([0, 0, 0, 1, 1, 2]) == [0, 1, 2]

# UNIQUE BY MODULO OF 3
converter = (
    c.iter_unique(by_=c.this % 3).as_type(list).gen_converter(debug=True)
)
assert converter(range(10)) == [0, 1, 2]

# UNIQUE BY ID, YIELD NAMES
converter = (
    c.item("data")
    .iter_unique(c.item("name"), by_=c.item("id"))
    .as_type(list)
    .gen_converter(debug=True)
)
assert converter(
    {
        "data": [
            {"name": "foo", "id": 1},
            {"name": "foo", "id": 1},
            {"name": "bar", "id": 1},
            {"name": "def", "id": 2},
        ]
    }
) == ["foo", "def"]

iter_windows

c.iter_windows iterates through an iterable and yields tuples, which are obtained by sliding a window of a given width and by moving the window by specified step size as follows: c.iter_windows(width=7, step=1). It yields partial windows at the boundaries.

from convtools import conversion as c

converter = c.iter_windows(3, step=1).as_type(list).gen_converter(debug=True)

assert converter(range(5)) == [
    (0,),
    (0, 1),
    (0, 1, 2),
    (1, 2, 3),
    (2, 3, 4),
    (3, 4),
    (4,),
]

cumulative

The cumulative(prepare_first, reduce_two, label_name=None) method lets you define cumulative conversions.

  • prepare_first defines conversion of the first element
  • reduce_two defines how each subsequent element (c.this) is combined with the previous cumulative value (c.PREV)
  • c.PREV references the previous cumulative value; see Placeholders & Special References
from convtools import conversion as c

assert (
    c.iter(c.cumulative(c.this, c.this + c.PREV))
    .as_type(list)
    .execute([0, 1, 2, 3, 4], debug=True)
) == [0, 1, 3, 6, 10]

When the value in the accumulator needs to be cleared (this usually happens in nested iterators), take two steps:

  1. label your cumulative
  2. use c.cumulative_reset to reset where necessary
from convtools import conversion as c

assert (
    c.iter(
        c.cumulative_reset("abc")
        .iter(c.cumulative(c.this, c.this + c.PREV, label_name="abc"))
        .as_type(list)
    )
    .as_type(list)
    .execute([[0, 1, 2], [3, 4]], debug=True)
) == [[0, 1, 3], [3, 7]]