Contrib / Tables¶
Table
is a helper to work with table-like data. It infers the header from the
first row or accepts yours and allows you to massage columns and rows in a
stream-friendly manner (without operations which require consuming the whole
sequence).
Table
data can only be consumed once: since a table takes an iterable,
there's no way to iterate the second time.
Read / Output rows¶
Table.from_rows
allows you to initialize a table with an iterable of rows.
Arguments are as follows:
rows
- iterable of rows; dict, tuple and list rows
are considered as multi-column table rows, any other types as single-column ones.
header
supports multiple types:
bool - whether to infer the header or not. Default is None with the only exception: when an iterable of dicts is accepted, it automatically infers the header from the first dict unless header=False is passed.
list and tuple - specify column names
dict - keys specify column names, values are indexes to get column values by
duplicate_columns
raise - the default for from_rows. It raises ValueError when it encounters duplicate column names
mangle - it mangles duplicate column names like: a, a_1, a_2
keep - duplicate columns are left as is, but when referenced the first one is used
drop - duplicate columns are skipped
skip_rows
- number of rows to be skipped at the beginning, default is 0
into_iter_rows
method outputs the results as an iterator of rows,
arguments are:
type_
should be exactly one of the following: dict, list, tuple
include_header=None
- whether to emit the header or not
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
# NO HEADER PROVIDED
assert list(
Table.from_rows([(1, 2, 3), (2, 3, 4)]).into_iter_rows(dict)
) == [
{"COLUMN_0": 1, "COLUMN_1": 2, "COLUMN_2": 3},
{"COLUMN_0": 2, "COLUMN_1": 3, "COLUMN_2": 4},
]
# HEADER IS PROVIDED
assert list(
Table.from_rows(
[[1, 2, 3], [2, 3, 4]], header=["a", "b", "c"]
).into_iter_rows(dict)
) == [
{"a": 1, "b": 2, "c": 3},
{"a": 2, "b": 3, "c": 4},
]
# INFERS HEADER ON DEMAND
assert list(
Table.from_rows(
[["a", "b", "c"], [1, 2, 3], [2, 3, 4]], header=True
).into_iter_rows(dict)
) == [
{"a": 1, "b": 2, "c": 3},
{"a": 2, "b": 3, "c": 4},
]
# INFERS HEADER AUTOMATICALLY BECAUSE THE FIRST ELEMENT IS A DICT
assert list(
Table.from_rows([{"a": 1, "b": 2}, {"a": 2, "b": 3}]).into_iter_rows(
tuple, include_header=True
)
) == [("a", "b"), (1, 2), (2, 3)]
def converter(data_):
try:
return ({"COLUMN_0": i[0], "COLUMN_1": i[1], "COLUMN_2": i[2]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return ({"a": i[0], "b": i[1], "c": i[2]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return ({"a": i[0], "b": i[1], "c": i[2]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return (
(
i["a"],
i["b"],
)
for i in data_
)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Read CSV-like¶
Table.from_csv
initializes a table by reading a csv-like file, arguments are:
filepath_or_buffer
- a filepath or something the built-in csv.reader can read (iterator of lines)
header
supports multiple types:
bool - whether to infer the header or not
list and tuple - specify column names
dict - keys specify column names, values are indexes to get column values by
duplicate_columns
raise - it raises ValueError when it encounters duplicate column names
mangle - the default for from_csv. It mangles duplicate column names like: a, a_1, a_2
keep - duplicate columns are left as is, but when referenced the first one is used
drop - duplicate columns are skipped
skip_rows
- number of rows to be skipped at the beginning, default is 0
dialect
- a dialect acceptable for the built-in csv.reader. There's a helper method to create dialects without defining classes: Table.csv_dialect(delimiter="\t") for tab-separated files.
encoding
- default is utf-8
into_csv
method writes the results to a csv-like file, arguments are:
filepath_or_buffer
- a filepath or something csv.writer can write to
dialect
- a dialect acceptable by csv.writer. There's a helper method to create dialects without defining classes: Table.csv_dialect(delimiter="\t") for tab-separated files.
encoding
- default is utf-8
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
# READING CSV
# file content:
# a,b
# 1,2
# 2,3
assert list(
Table
.from_csv("tests/csvs/ab.csv", header=True)
.into_iter_rows(dict)
# .into_csv("output.csv") # TO WRITE TO A FILE
) == [
{"a": "1", "b": "2"},
{"a": "2", "b": "3"},
]
# READING TSV
# file content:
# a\tb
# 1\t2
# 2\t3
assert list(
Table.from_csv(
"tests/csvs/ac.csv",
header=True,
dialect=Table.csv_dialect(delimiter="\t"),
).into_iter_rows(dict)
) == [
{"a": "2", "c": "4"},
{"a": "1", "c": "3"},
]
# READ TSV + SKIP EXISTING HEADER + REMAP COLUMNS
# file content:
# a\tb
# 1\t2
# 2\t3
assert list(
Table.from_csv(
"tests/csvs/ac.csv",
header={"a": 1, "c": 0},
skip_rows=1,
dialect=Table.csv_dialect(delimiter="\t"),
).into_iter_rows(dict)
) == [
{"a": "4", "c": "2"},
{"a": "3", "c": "1"},
]
def converter(data_):
try:
return ({"a": i[0], "b": i[1]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return ({"a": i[0], "c": i[1]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return ({"a": i[1], "c": i[0]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Rename, take / rearrange, drop columns¶
rename(columns)
allows you to rename columns; it accepts arguments of different types:
tuple and list - define new column names (the length of the passed columns should match the number of columns of the table)
dict - defines a mapping from old column names to new ones
take(*column_names)
leaves only the specified columns (order matters), omitting the rest.
take can accept ..., which references all non-mentioned columns, so it's easy to rearrange them: table.take("c", "d", ...)
drop(*column_names)
obviously drops columns, keeping the rest as-is.
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
assert list(
Table.from_rows([("A", "b", "c"), (1, 2, 3), (2, 3, 4)], header=True)
.rename({"A": "a"})
.drop("b")
.take("c", ...) # MAKE "c" COLUMN THE FIRST ONE
.into_iter_rows(dict)
) == [{"c": 3, "a": 1}, {"c": 4, "a": 2}]
def converter(data_):
try:
return ({"c": i[2], "a": i[0]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Add, update columns¶
There's only one method to add and update columns, it's update
and takes any
number of keyword arguments, where keys are names of new or existing columns,
while values are conversions to be applied row-wise.
Use c.col("column name")
to reference the values of various columns.
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
assert list(
Table.from_rows([(1, -2), (2, -3)], ["a", "b"])
.update(c=c.col("a") + c.col("b")) # ADDING NEW COLUMN: "c"
.update(c=c.call_func(abs, c.col("c"))) # UPDATING NEW COLUMN: "c"
.into_iter_rows(dict)
) == [{"a": 1, "b": -2, "c": 1}, {"a": 2, "b": -3, "c": 1}]
def converter(data_):
try:
return (
{"a": i[0], "b": i[1], "c": abs(i[2])}
for i in (
(
i_i[0],
i_i[1],
(i_i[0] + i_i[1]),
)
for i_i in data_
)
)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Filter rows¶
To filter rows, pass a conversion to the filter
method, which will be used as
a condition.
Use c.col("column name")
to reference the values of various columns.
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
assert list(
Table.from_rows([(1, -2), (2, -3)], header=["a", "b"])
.filter(c.col("b") < -2)
.into_iter_rows(dict)
) == [{"a": 2, "b": -3}]
def converter(data_):
try:
return ({"a": i[0], "b": i[1]} for i in data_ if (((i[1] < -2))))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Join tables¶
To join two tables, use join
method, which accepts the following arguments:
table
- another table to join with
on
can be either:
- a join conversion like c.LEFT.col("a") == c.RIGHT.col("A")
- or an iterable of column names to join on
how
is to be one of: "inner", "left", "right", "outer" (same as "full")
suffixes
is a tuple of two strings (left and right) to be concatenated with column names of conflicting columns (on columns passed as an iterable of strings don't count). Default is ("_LEFT", "_RIGHT").
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
# JOIN ON COLUMN NAMES
assert list(
Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
.join(
Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
how="inner",
on=["a"],
)
.into_iter_rows(dict)
) == [
{"a": 1, "b": 2, "c": 3},
{"a": 2, "b": 3, "c": 4},
]
# JOIN ON CONDITION
assert list(
Table.from_rows([(1, 2), (2, 30)], ["a", "b"])
.join(
Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
how="full",
on=c.and_(
c.LEFT.col("a") == c.RIGHT.col("a"),
c.LEFT.col("b") < c.RIGHT.col("c"),
),
)
.into_iter_rows(dict)
) == [
{"a_LEFT": 1, "b": 2, "a_RIGHT": 1, "c": 3},
{"a_LEFT": 2, "b": 30, "a_RIGHT": None, "c": None},
{"a_LEFT": None, "b": None, "a_RIGHT": 2, "c": 4},
]
def aggregate_i(_none, data_, *, __v=__naive_values__["__v"]):
agg_data_i_v0 = _none
checksum_ = 0
it_ = iter(data_)
for row_i in it_:
if agg_data_i_v0 is _none:
checksum_ += 1
agg_data_i_v0 = defaultdict(list)
agg_data_i_v0[row_i[0]].append(row_i)
else:
agg_data_i_v0[row_i[0]].append(row_i)
if checksum_ == 1:
globals()["__BROKEN_EARLY__"] = True # DEBUG ONLY
break
for row_i in it_:
agg_data_i_v0[row_i[0]].append(row_i)
return __v if (agg_data_i_v0 is _none) else (setattr(agg_data_i_v0, "default_factory", None) or agg_data_i_v0)
def join_(left_, right_, _none):
hash_to_right_items = aggregate_i(_none, right_)
del right_
for left_item in left_:
left_key = left_item[0]
right_items = hash_to_right_items[left_key] if (left_key in hash_to_right_items) else ()
for right_item in right_items:
yield left_item, right_item
def converter(data_, *, right):
global __none__
_none = __none__
try:
return join_(data_, right, _none)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return ({"a": i[0][0], "b": i[0][1], "c": i[1][1]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def aggregate_e(_none, data_, *, __v=__naive_values__["__v"]):
agg_data_e_v0 = _none
checksum_ = 0
it_ = iter(data_)
for row_e in it_:
if agg_data_e_v0 is _none:
checksum_ += 1
agg_data_e_v0 = defaultdict(list)
agg_data_e_v0[row_e[0]].append(row_e)
else:
agg_data_e_v0[row_e[0]].append(row_e)
if checksum_ == 1:
globals()["__BROKEN_EARLY__"] = True # DEBUG ONLY
break
for row_e in it_:
agg_data_e_v0[row_e[0]].append(row_e)
return __v if (agg_data_e_v0 is _none) else (setattr(agg_data_e_v0, "default_factory", None) or agg_data_e_v0)
def join_(left_, right_, _none):
yielded_right_ids = set()
hash_to_right_items = aggregate_e(_none, right_)
del right_
for left_item in left_:
left_key = left_item[0]
right_items = iter((((i for i in hash_to_right_items[left_key] if (((left_item[1] < i[1])))) if (left_key in hash_to_right_items) else ())))
right_item = next(right_items, _none)
if right_item is _none:
yield left_item, None
else:
yielded_right_ids.add(id(right_item))
yield left_item, right_item
for right_item in right_items:
yielded_right_ids.add(id(right_item))
yield left_item, right_item
yield from (
(None, right_item) for right_item in (item for items in hash_to_right_items.values() for item in items) if id(right_item) not in yielded_right_ids
)
def converter(data_, *, right):
global __none__
_none = __none__
try:
return join_(data_, right, _none)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return (
{
"a_LEFT": ((None if (i[0] is None) else i[0][0])),
"b": ((None if (i[0] is None) else i[0][1])),
"a_RIGHT": ((None if (i[1] is None) else i[1][0])),
"c": ((None if (i[1] is None) else i[1][1])),
}
for i in data_
)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Chain tables¶
chain
method concatenates tables vertically. It has the following parameters:
table
- a table to chain
fill_value
- used to fill gaps; default is None
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
assert list(
Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
.chain(
Table.from_rows([(1, 3), (2, 4)], ["a", "c"]),
fill_value=0,
)
.into_iter_rows(dict)
) == [
{"a": 1, "b": 2, "c": 0},
{"a": 2, "b": 3, "c": 0},
{"a": 1, "b": 0, "c": 3},
{"a": 2, "b": 0, "c": 4},
]
def converter(data_):
try:
return (
(
i[0],
i[1],
0,
)
for i in data_
)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return (
(
i[0],
0,
i[1],
)
for i in data_
)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return ({"a": i[0], "b": i[1], "c": i[2]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Zip tables¶
zip
method concatenates tables horizontally. Its parameters are:
table
- a table to zip
fill_value
- used to fill gaps; default is None
Warning
- Before using this method, please make sure you are not looking for Table.join.
- Be cautious with using .into_iter_rows(dict) here, because by default zip uses the "keep" duplicate_columns strategy, so you'll lose duplicate columns in case of a collision, because dict will take care of it
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
assert list(
Table.from_rows([(1, 2), (2, 3)], ["a", "b"])
.zip(
Table.from_rows([(10, 3), (20, 4)], ["a", "c"]),
fill_value=0,
)
.into_iter_rows(tuple, include_header=True)
) == [("a", "b", "a", "c"), (1, 2, 10, 3), (2, 3, 20, 4)]
def converter(data_):
try:
return (
(
i[0][0],
i[0][1],
i[1][0],
i[1][1],
)
for i in data_
)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Explode table¶
explode
method transforms a table with a column which contains lists into a
table with the values of these lists, repeating the values of the other columns. Its
only parameter is column_name
- the name of the column to explode.
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
assert list(
Table.from_rows([{"a": 1, "b": [1, 2, 3]}, {"a": 10, "b": [4, 5, 6]}])
.explode("b")
.into_iter_rows(tuple, include_header=True)
) == [
("a", "b"),
(1, 1),
(1, 2),
(1, 3),
(10, 4),
(10, 5),
(10, 6),
]
def converter(data_):
try:
return (
(
i["a"],
i["b"],
)
for i in data_
)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return (
(
row_[0],
value_,
)
for row_ in data_
for value_ in row_[1]
)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Using tables inside conversions¶
It's impossible to make Table
work directly inside other conversions, because
it would create more than one code generating layer.
But you most definitely can leverage piping to callables:
from convtools import conversion as c
from convtools.contrib.tables import Table
with c.OptionsCtx() as options:
options.debug = True
input_data = [["a", "b"], [1, 2], [3, 4]]
converter = (
c.this.pipe(
lambda it: Table.from_rows(it, header=True).into_iter_rows(dict)
)
.as_type(list)
.gen_converter()
)
assert converter(input_data) == [
{"a": 1, "b": 2},
{"a": 3, "b": 4},
]
def converter(data_, *, __lambda=__naive_values__["__lambda"]):
try:
return list(__lambda(data_))
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
def converter(data_):
try:
return ({"a": i[0], "b": i[1]} for i in data_)
except __exceptions_to_dump_sources:
__convtools__code_storage.dump_sources()
raise
Note
Keep in mind, that unlike conversions, Table
doesn't have gen_converter
method, so the code cannot be generated once during a warm-up and used
multiple times. Tables generate their code at each run.