Pandas PipeProcessor Tutorial
Basic Example
We'll be using a sample from the Online Retail II dataset. It contains all the transactions occurring between 01/12/2009 and 09/12/2011 for a UK-based, registered, non-store online retailer. The company mainly sells unique all-occasion gift-ware.
The full dataset is available at the UCI Machine Learning Repository.
Here's what the first few rows look like:
Invoice | StockCode | Description | Quantity | InvoiceDate | Price | Customer ID | Country |
---|---|---|---|---|---|---|---|
489434 | 85048 | 15CM CHRISTMAS GLASS BALL 20 LIGHTS | 12 | 2009-12-01 07:45:00 | 6.95 | 13085 | United Kingdom |
489434 | 79323P | PINK CHERRY LIGHTS | 12 | 2009-12-01 07:45:00 | 6.75 | 13085 | United Kingdom |
489434 | 79323W | WHITE CHERRY LIGHTS | 12 | 2009-12-01 07:45:00 | 6.75 | 13085 | United Kingdom |
489434 | 22041 | RECORD FRAME 7" SINGLE SIZE | 48 | 2009-12-01 07:45:00 | 2.1 | 13085 | United Kingdom |
489434 | 21232 | STRAWBERRY CERAMIC TRINKET BOX | 24 | 2009-12-01 07:45:00 | 1.25 | 13085 | United Kingdom |
Setup
We'll import required packages and read in the example dataset:
import re
import typing as T
import pandas as pd
from dpipes.processor import ColumnPipeProcessor, PipeProcessor
data = pd.read_csv("examples/sample.csv")
Transformations
Let's define some functions to transform our data. We'll start off by converting the camel-case column names to snake case:
def camel_to_snake(x: str) -> str:
    return re.sub(r"(?<!^)(?=[A-Z][a-z])", "_", re.sub(r"\s+", "_", x))

def clean_colnames(df: pd.DataFrame) -> pd.DataFrame:
    return df.rename(lambda x: camel_to_snake(x).lower(), axis=1)
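As a quick check (illustrative, not part of the pipeline), the helper handles both the run-together and the space-separated header names:
camel_to_snake("InvoiceDate")  # "Invoice_Date" -> "invoice_date" after .lower()
camel_to_snake("Customer ID")  # "Customer_ID"  -> "customer_id" after .lower()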
Next, we'll define a few functions to calculate the total price per line item, the total price per invoice, the number of unique products in each order, and the total number of items in each order.
def add_line_total(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(line_item_total=lambda x: x["quantity"] * x["price"])

def add_order_total(df: pd.DataFrame) -> pd.DataFrame:
    order_total = (
        df.groupby("invoice")["line_item_total"].sum().reset_index(name="order_total")
    )
    return df.merge(order_total, how="left", on="invoice")

def add_order_num_products(df: pd.DataFrame) -> pd.DataFrame:
    num_products = df.groupby("invoice").size().reset_index(name="order_num_products")
    return df.merge(num_products, how="left", on="invoice")

def add_total_order_size(df: pd.DataFrame) -> pd.DataFrame:
    order_size = df.groupby("invoice")["quantity"].sum().reset_index(name="order_size")
    return df.merge(order_size, how="left", on="invoice")
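To make these operations concrete, here's a tiny hand-made frame (not the retail data) run through the first two steps:
toy = pd.DataFrame({
    "invoice": ["A", "A", "B"],
    "quantity": [2, 3, 1],
    "price": [1.5, 2.0, 4.0],
})
toy.pipe(add_line_total).pipe(add_order_total)
# line_item_total: 3.0, 6.0, 4.0; order_total: 9.0 on both invoice A rows, 4.0 on B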
Data Pipeline
Now, let's chain these functions together to make a simple processing pipeline.
Naive Version
There's nothing inherently wrong with processing the data this way; the end result will be identical to the other methods. However, some software engineers advise against overwriting an object many times.
naive = clean_colnames(data)
naive = add_line_total(naive)
naive = add_order_total(naive)
naive = add_order_num_products(naive)
naive = add_total_order_size(naive)
Method Chaining
A better approach is to use Pandas' DataFrame.pipe method to chain all these operations together. As noted, you'll find the results identical.
# Method chaining
result_a = (
    data.pipe(clean_colnames)
    .pipe(add_line_total)
    .pipe(add_order_total)
    .pipe(add_order_num_products)
    .pipe(add_total_order_size)
)
pd.testing.assert_frame_equal(naive, result_a)
PipeProcessor
Now, let's see how this looks using the dpipes.PipeProcessor class. We'll instantiate an object by passing a list of the functions we want to run, in order. We can then use this object to run the pipeline on any dataset we pass it.
ps = PipeProcessor([
    clean_colnames,
    add_line_total,
    add_order_total,
    add_order_num_products,
    add_total_order_size,
])
result_b = ps(data)
pd.testing.assert_frame_equal(result_a, result_b)
Tip
We're not showing it in this example, but the dpipes.PipeProcessor can take an optional kwargs parameter, which can be a single dictionary or a list of dictionaries of keyword arguments. If a single dictionary is passed, those keyword arguments are broadcast to each function in the pipeline. If a list of dictionaries is passed, each set of keyword arguments is applied to its respective function, in order. See the dpipes.PipeProcessor reference for details.
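For illustration, the two forms look like this; f and g stand in for any two pipeline functions that each accept a cols keyword argument:
# Single dict: broadcast to every function in the pipeline.
ps_kw = PipeProcessor([f, g], {"cols": "customer_id"})
# List of dicts: one set of keyword arguments per function, in order.
ps_kw = PipeProcessor([f, g], [{"cols": "customer_id"}, {"cols": "invoice"}])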
Further, we could now create modularized pipelines that can easily be imported and used elsewhere in code:
"""My pipeline module."""
from dpipes.processor import PipeProcessor
def task_1(...):
...
def task_2(...):
...
def task_3(...):
...
def task_4(...):
...
my_pipeline = PipeProcessor([task_1, task_2, task_3, task_4])
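Downstream code (module name hypothetical) then just imports and calls the pipeline object:
from my_pipeline import my_pipeline

result = my_pipeline(data)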
Processing Multiple Datasets
Continuing on, imagine now that you want to run the same pipeline on multiple datasets, all with a similar schema.
Method Chaining
Again, the end results will be identical, but note that you would need to re-write (or copy and paste) your entire method-chained operation to run this pipeline on new datasets.
for ds in [split_1, split_2, split_3]:
    result_a = (
        ds.pipe(clean_colnames)
        .pipe(add_line_total)
        .pipe(add_order_total)
        .pipe(add_order_num_products)
        .pipe(add_total_order_size)
    )
PipeProcessor
Contrast this with the dpipes.PipeProcessor
methodology, where you simply need to call the original
pipeline object.
for ds in [split_1, split_2, split_3]:
    result_b = ps(ds)
pd.testing.assert_frame_equal(result_a, result_b)
Processing Individual Columns
Now, let's suppose that we want to add a few column-specific operations to our pipeline. We'll define one function that converts floats to integers and another that converts integers to strings, then use them to cast the customer_id field as a string.
def float_to_int(
    df: pd.DataFrame, cols: T.Union[str, T.Sequence[str]], fillna: int = -99999
) -> pd.DataFrame:
    df[cols] = df[cols].fillna(fillna).astype(int)
    return df

def int_to_string(
    df: pd.DataFrame, cols: T.Union[str, T.Sequence[str]]
) -> pd.DataFrame:
    df[cols] = df[cols].astype(str)
    return df
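Applied directly, the two casts compose like this. (In the raw CSV, the Customer ID column typically loads as float64 because missing IDs become NaN; the fillna sentinel keeps the integer cast from failing.)
cleaned = clean_colnames(data)
cast = int_to_string(float_to_int(cleaned, "customer_id"), "customer_id")
cast["customer_id"].dtype  # object: string IDs, with "-99999" marking missing customers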
Method Chaining
Here we'll add two DataFrame.pipe calls, each with a lambda function, to apply our casting operations to a specific column.
result_a = (
    data.pipe(clean_colnames)
    .pipe(add_line_total)
    .pipe(add_order_total)
    .pipe(add_order_num_products)
    .pipe(add_total_order_size)
    .pipe(lambda x: float_to_int(x, "customer_id"))
    .pipe(lambda x: int_to_string(x, "customer_id"))
)
PipeProcessor
We'll create two new PipeProcessor objects: one to run functions on the customer_id column, and another that composes both our original and column-specific pipelines into a single processor. One can easily create an arbitrary number of sub-pipelines and pipeline compositions.
col_ps = PipeProcessor([float_to_int, int_to_string], {"cols": "customer_id"})
pipeline = PipeProcessor([ps, col_ps])
result_b = pipeline(data)
pd.testing.assert_frame_equal(result_a, result_b)
Note
Note that we only passed a single dictionary to the dpipes.PipeProcessor
constructor, and it
broadcast those keyword arguments to both functions within the pipeline.
Although both methods produce identical results, only the use of PipeProcessor
provides a reusable,
modular pipeline object.
ColumnPipeProcessor
Finally, if the only keyword arguments to our transformation functions are column names, we can choose to use the dpipes.ColumnPipeProcessor instead. Similar to the dpipes.PipeProcessor class, we can pass in a single column or a single list of columns to broadcast to the functions within the pipeline. You can also specify the column(s) for each function to act on by passing a list of lists.
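We're not showing a full run here, but a minimal sketch might look like the following; the exact constructor signature is an assumption based on the description above, so check the dpipes.ColumnPipeProcessor reference for the authoritative API:
# Broadcast a single column to both functions (assumed signature):
col_pipeline = ColumnPipeProcessor([float_to_int, int_to_string], cols="customer_id")
# Or target columns per function with a list of lists:
# col_pipeline = ColumnPipeProcessor(
#     [float_to_int, int_to_string],
#     cols=[["customer_id"], ["customer_id"]],
# )
result_c = col_pipeline(ps(data))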