Collections¶
Warning
Please make sure you've covered Reference / Basics first.
Collections and c()
wrapper¶
The syntax to create a conversion, which builds a dict
, list
, tuple
or a
set
is as follows: c({"a": 1})
- just wrap in a c
call.
Let's build a dict from a tuple of two integers:
from convtools import conversion as c
converter = c(
{
"a": c.item(0),
"b": c.item(1),
"c": c.item(0) + c.item(1),
# keys are dynamic too
c.call_func("d{}".format, c.item(0)): "key is dynamic",
}
).gen_converter(debug=True)
assert converter((1, 2)) == {"a": 1, "b": 2, "c": 3, "d1": "key is dynamic"}
assert c([1, c.this, 2]).execute(None, debug=True) == [1, None, 2]
def converter(data_, *, __format=__naive_values__["__format"]):
try:
return {"a": data_[0], "b": data_[1], "c": (data_[0] + data_[1]), __format(data_[0]): "key is dynamic"}
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return [1, data_, 2]
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
So to summarize on c()
wrapper, it:
- leaves conversions untouched
- interprets collections as conversions which are to build such collections
- wraps everything else in
c.naive
.
Optional items¶
It's possible to mark a particular item as optional, so it disappears from a
dict
/list
/tuple
/set
in certain cases:
from convtools import conversion as c
converter = c(
{
"a": c.item(0),
"b": c.optional(c.item(1), skip_if=c.item(1) < 10),
"c": c.optional(c.item(0) + c.item(1), keep_if=c.item(0)),
"d": c.optional(c.item(0), skip_value=1),
}
).gen_converter(debug=True)
assert converter((1, 2)) == {"a": 1, "c": 3}
def optional_items_generator(data_):
yield ("a", data_[0])
if not (data_[1] < 10):
yield ("b", data_[1])
if data_[0]:
yield ("c", (data_[0] + data_[1]))
if data_[0] != 1:
yield ("d", data_[0])
def converter(data_):
try:
return dict(optional_items_generator(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Type casting¶
To cast to a type use a naive conversion method as_type
:
from convtools import conversion as c
converter = c.this.as_type(list).gen_converter(debug=True)
assert converter(range(2)) == [0, 1]
def converter(data_):
try:
return list(data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Note
It may seem useless as it can be replaced with piping the result to list
function or just to calling list
function directly, but in fact some
conversions override this method to achieve predicate-pushdown-like
optimizations.
Iterators & Comprehensions¶
Process¶
To iterate an input, there are the following conversions:
- c.iter and the iter method
- c.list_comp
- c.dict_comp
- c.tuple_comp
- c.set_comp
Each of them accepts where
argument to support conditions like:
[x for x in items if x > 10]
.
A few examples:
from convtools import conversion as c
converter = c.iter(c.this + 1).gen_converter(debug=True)
assert list(converter(range(3))) == [1, 2, 3]
converter = c.item("objects").iter(c.this + 1).gen_converter(debug=True)
assert list(converter({"objects": range(3)})) == [1, 2, 3]
converter = c.list_comp(c.this + 1, where=c.this < 2).gen_converter(debug=True)
assert converter(range(3)) == [1, 2]
converter = c.dict_comp(c.this, c.this + 1).gen_converter(debug=True)
assert converter(range(3)) == {0: 1, 1: 2, 2: 3}
def converter(data_):
try:
return ((i + 1) for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return ((i + 1) for i in data_["objects"])
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return [(i + 1) for i in data_ if (i < 2)]
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return {i: (i + 1) for i in data_}
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Note
It's important to note that a conversion passed into iter
, list_comp
and other iteration methods defines the conversions of each element of
the input collection. This is one of the input-switching conversions.
filter¶
To filter an input use c.filter
or filter
conversion method:
from convtools import conversion as c
converter = c.filter(c.this < 3).gen_converter(debug=True)
assert list(converter(range(100))) == [0, 1, 2]
converter = c.this.filter(c.this < 3).gen_converter(debug=True)
assert list(converter(range(100))) == [0, 1, 2]
def converter(data_):
try:
return (i for i in data_ if ((i < 3)))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return (i for i in data_ if ((i < 3)))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
sort¶
sort
method is a shortcut to c.call_func(sorted, c.this, ...)
from convtools import conversion as c
converter = c.this.sort(key=lambda x: x, reverse=True).gen_converter(
debug=True
)
assert list(converter(range(3))) == [2, 1, 0]
def converter(data_, *, __lambda=__naive_values__["__lambda"]):
try:
return sorted(data_, key=__lambda, reverse=True)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
zip, repeat, flatten¶
Whenever you need to annotate something or just zip sequences, it's convenient to have these shortcuts/helpers:
c.zip
c.repeat
flatten
method
from convtools import conversion as c
converter = (
c.iter(
c.zip(
c.repeat(c.item("a")),
c.item("b"),
)
)
.flatten()
.as_type(list)
.gen_converter(debug=True)
)
assert converter([{"a": 1, "b": [2, 3]}, {"a": 10, "b": [4, 5]}]) == [
(1, 2),
(1, 3),
(10, 4),
(10, 5),
]
def converter(data_, *, __from_iterable=__naive_values__["__from_iterable"], __repeat=__naive_values__["__repeat"]):
try:
return list(__from_iterable((zip(__repeat(i["a"]), i["b"]) for i in data_)))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
c.zip
supports keyword arguments to build dicts:
from convtools import conversion as c
converter = (
c.iter(
c.zip(
a=c.repeat(c.item("a")),
b=c.item("b"),
)
)
.flatten()
.as_type(list)
.gen_converter(debug=True)
)
assert converter([{"a": 1, "b": [2, 3]}, {"a": 10, "b": [4, 5]}]) == [
{"a": 1, "b": 2},
{"a": 1, "b": 3},
{"a": 10, "b": 4},
{"a": 10, "b": 5},
]
def converter(data_, *, __repeat=__naive_values__["__repeat"], __from_iterable=__naive_values__["__from_iterable"]):
try:
return list(__from_iterable((({"a": i_i[0], "b": i_i[1]} for i_i in zip(__repeat(i["a"]), i["b"])) for i in data_)))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
len, min, max¶
- c.this.len(): shortcut to c.this.pipe(len) or c.call_func(len, c.this)
- c.max: shortcut to c.call_func(max, ...)
- c.min: shortcut to c.call_func(min, ...)
chunk_by, chunk_by_condition¶
It's a common task to chunk a sequence by: values, chunk size, condition or combination of them. Here are two conversions to achieve this:
c.chunk_by(*by, size=None)
c.chunk_by_condition(condition)
- it takes the condition as a conversion of an element (c.this
) and the existing chunk (c.CHUNK
)
from convtools import conversion as c
# BY VALUES
assert c.chunk_by(c.item(0), c.item(1)).as_type(list).execute(
[(0, 0), (0, 0), (0, 1), (1, 1), (1, 1)], debug=True
) == [[(0, 0), (0, 0)], [(0, 1)], [(1, 1), (1, 1)]]
# BY SIZE
assert c.chunk_by(size=3).as_type(list).execute(range(5), debug=True) == [
[0, 1, 2],
[3, 4],
]
# BY VALUE AND SIZE
assert c.chunk_by(c.this // 10, size=3).as_type(list).execute(
[0, 1, 2, 3, 10, 19, 21, 24, 25], debug=True
) == [[0, 1, 2], [3], [10, 19], [21, 24, 25]]
# BY CONDITION
assert (
c.chunk_by_condition(c.this - c.CHUNK.item(-1) < 10)
.as_type(list)
.execute([1, 5, 15, 20, 29, 40, 50, 58], debug=True)
) == [[1, 5], [15, 20, 29], [40], [50, 58]]
def chunk_by(items_):
items_ = iter(items_)
try:
item_ = next(items_)
except StopIteration:
return
chunk_ = [item_]
chunk_item_signature = (
item_[0],
item_[1],
)
for item_ in items_:
new_item_signature = (
item_[0],
item_[1],
)
if chunk_item_signature == new_item_signature:
chunk_.append(item_)
else:
yield chunk_
chunk_ = [item_]
chunk_item_signature = new_item_signature
yield chunk_
def converter(data_):
try:
return list(chunk_by(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def chunk_by(items_):
items_ = iter(items_)
try:
item_ = next(items_)
except StopIteration:
return
chunk_ = [item_]
size_ = 1
for item_ in items_:
if size_ < 3:
chunk_.append(item_)
size_ = size_ + 1
else:
yield chunk_
chunk_ = [item_]
size_ = 1
yield chunk_
def converter(data_):
try:
return list(chunk_by(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def chunk_by(items_):
items_ = iter(items_)
try:
item_ = next(items_)
except StopIteration:
return
chunk_ = [item_]
chunk_item_signature = item_ // 10
size_ = 1
for item_ in items_:
new_item_signature = item_ // 10
if chunk_item_signature == new_item_signature and size_ < 3:
chunk_.append(item_)
size_ = size_ + 1
else:
yield chunk_
chunk_ = [item_]
chunk_item_signature = new_item_signature
size_ = 1
yield chunk_
def converter(data_):
try:
return list(chunk_by(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def chunk_by_condition(items_):
items_ = iter(items_)
try:
chunk_ = [next(items_)]
except StopIteration:
return
for item_ in items_:
if (item_ - chunk_[-1]) < 10:
chunk_.append(item_)
else:
yield chunk_
chunk_ = [item_]
yield chunk_
def converter(data_):
try:
return list(chunk_by_condition(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
We'll cover aggregations later, but bear with me -- chunk conversions have
aggregate
method:
from convtools import conversion as c
converter = (
c.chunk_by(size=3)
.aggregate(
{
"x": c.ReduceFuncs.First(c.this),
"y": c.ReduceFuncs.Last(c.this),
"z": c.ReduceFuncs.Sum(c.this),
}
)
.as_type(list)
.gen_converter(debug=True)
)
assert converter([0, 1, 2, 3, 4, 5, 6, 7]) == [
{"x": 0, "y": 2, "z": 3},
{"x": 3, "y": 5, "z": 12},
{"x": 6, "y": 7, "z": 13},
]
def aggregate_(_none, data_):
agg_data__v0 = agg_data__v1 = agg_data__v2 = _none
checksum_ = 0
it_ = iter(data_)
for row_ in it_:
if agg_data__v0 is _none:
checksum_ += 1
agg_data__v0 = row_
agg_data__v1 = row_
agg_data__v2 = row_ or 0
else:
agg_data__v1 = row_
agg_data__v2 += row_ or 0
if checksum_ == 1:
globals()["__BROKEN_EARLY__"] = True # DEBUG ONLY
break
for row_ in it_:
agg_data__v1 = row_
agg_data__v2 += row_ or 0
return {
"x": ((None if (agg_data__v0 is _none) else agg_data__v0)),
"y": ((None if (agg_data__v1 is _none) else agg_data__v1)),
"z": ((0 if (agg_data__v2 is _none) else agg_data__v2)),
}
def chunk_by(items_):
items_ = iter(items_)
try:
item_ = next(items_)
except StopIteration:
return
chunk_ = [item_]
size_ = 1
for item_ in items_:
if size_ < 3:
chunk_.append(item_)
size_ = size_ + 1
else:
yield chunk_
chunk_ = [item_]
size_ = 1
yield chunk_
def converter(data_):
global __none__
_none = __none__
try:
return [aggregate_(_none, i) for i in chunk_by(data_)]
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
take_while, drop_while¶
- take_while reimplements itertools.takewhile - it terminates once the condition evaluates to false
- drop_while reimplements itertools.dropwhile - it yields elements starting from the first one where the condition evaluates to true
from itertools import count
from convtools import conversion as c
converter = c.take_while(c.this < 3).as_type(list).gen_converter(debug=True)
assert converter(count()) == [0, 1, 2]
converter = c.drop_while(c.this < 3).as_type(list).gen_converter(debug=True)
assert converter(range(5)) == [3, 4]
def take_while_(it_):
for item_ in it_:
if item_ < 3:
yield item_
else:
break
def converter(data_):
try:
return list(take_while_(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def drop_while_(it_, *, __chain=__naive_values__["__chain"]):
it_ = iter(it_)
for item_ in it_:
if not ((item_ < 3)):
break
else:
return ()
return __chain((item_,), it_)
def converter(data_):
try:
return list(drop_while_(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
iter_unique¶
c.iter_unique(element_conv=None, by_=None)
and iter_unique
methods iterate
through an iterable and yield processed elements, which are distinct in terms
of the provided condition:
- if element_conv is None, it assumes c.this
- if by_ is None, it assumes element_conv
from convtools import conversion as c
# SIMPLE UNIQUE
converter = c.iter_unique().as_type(list).gen_converter(debug=True)
assert converter([0, 0, 0, 1, 1, 2]) == [0, 1, 2]
# UNIQUE BY MODULO OF 3
converter = (
c.iter_unique(by_=c.this % 3).as_type(list).gen_converter(debug=True)
)
assert converter(range(10)) == [0, 1, 2]
# UNIQUE BY ID, YIELD NAMES
converter = (
c.item("data")
.iter_unique(c.item("name"), by_=c.item("id"))
.as_type(list)
.gen_converter(debug=True)
)
assert converter(
{
"data": [
{"name": "foo", "id": 1},
{"name": "foo", "id": 1},
{"name": "bar", "id": 1},
{"name": "def", "id": 2},
]
}
) == ["foo", "def"]
def iter_unique(data_):
s_ = set()
s_add = s_.add
for item_ in data_:
if item_ not in s_:
s_add(item_)
yield item_
def converter(data_):
try:
return list(iter_unique(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def iter_unique(data_):
s_ = set()
s_add = s_.add
for item_ in data_:
by_ = item_ % 3
if by_ not in s_:
s_add(by_)
yield item_
def converter(data_):
try:
return list(iter_unique(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def iter_unique(data_):
s_ = set()
s_add = s_.add
for item_ in data_:
by_ = item_["id"]
if by_ not in s_:
s_add(by_)
yield item_["name"]
def converter(data_):
try:
return list(iter_unique(data_["data"]))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
iter_windows¶
c.iter_windows
iterates through an iterable and yields tuples, which are
obtained by sliding a window of a given width and by moving the window by
specified step size as follows: c.iter_windows(width=7, step=1)
from convtools import conversion as c
converter = c.iter_windows(3, step=1).as_type(list).gen_converter(debug=True)
assert converter(range(5)) == [
(0,),
(0, 1),
(0, 1, 2),
(1, 2, 3),
(2, 3, 4),
(3, 4),
(4,),
]
def converter(data_, *, __iter_windows=__naive_values__["__iter_windows"]):
try:
return list(__iter_windows(data_, 3, 1))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
cumulative¶
cumulative(prepare_first, reduce_two, label_name=None)
method allows to
define cumulative conversions.
- prepare_first defines the conversion of the first element
- reduce_two defines the conversion of two elements
from convtools import conversion as c
assert (
c.iter(c.cumulative(c.this, c.this + c.PREV))
.as_type(list)
.execute([0, 1, 2, 3, 4], debug=True)
) == [0, 1, 3, 6, 10]
def pipe_(_labels, input_):
result_ = (input_ + _labels["a35ee6d037554881800e79164d9418ab"]) if ("a35ee6d037554881800e79164d9418ab" in _labels) else input_
_labels["a35ee6d037554881800e79164d9418ab"] = result_
return result_
def converter(data_):
_labels = {}
try:
return [pipe_(_labels, i) for i in data_]
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
In cases where the value in the accumulator needs to be cleared (this usually happens in nested iterators), take 2 steps:
- label your cumulative
- use
c.cumulative_reset
to reset where necessary
from convtools import conversion as c
assert (
c.iter(
c.cumulative_reset("abc")
.iter(c.cumulative(c.this, c.this + c.PREV, label_name="abc"))
.as_type(list)
)
.as_type(list)
.execute([[0, 1, 2], [3, 4]], debug=True)
) == [[0, 1, 3], [3, 7]]
def pipe_(_labels, input_):
result_ = (input_ + _labels["abc"]) if ("abc" in _labels) else input_
_labels["abc"] = result_
return result_
def converter(data_):
_labels = {}
try:
return [[pipe_(_labels, i_i) for i_i in (_labels.pop("abc", None), i)[1]] for i in data_]
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise