Polars PipeProcessor Tutorial

Note

This tutorial assumes that you have Polars installed in your Python environment. You can install Polars and other tutorial dependencies by executing the command pip install dpipes[demo].

As mentioned in Getting Started, dpipes.PipeProcessor is extensible to any API that implements a Pandas-like DataFrame.pipe method.

Tip

dPipes is extensible beyond Pandas-like APIs, too. The dpipes.Pipeline module generalizes the pipeline composability to any arbitrary Python functions and objects.
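To picture what that generalization means, here is a minimal plain-Python sketch of left-to-right function composition (this illustrates the concept only; it is not the actual dpipes.Pipeline API):

```python
from functools import reduce

def make_pipeline(*funcs):
    """Compose functions left-to-right: each function's output feeds the next."""
    return lambda value: reduce(lambda acc, f: f(acc), funcs, value)

# Hypothetical string-processing steps, just for illustration.
clean = make_pipeline(str.strip, str.lower, lambda s: s.replace(" ", "_"))
clean("  Customer ID ")  # "customer_id"
```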

We'll run through a condensed version of the Pandas tutorial, using Polars instead, as the swap from Pandas to Polars is nearly 1:1. The only changes occur in the transformation functions, since the Polars API is more concise than Pandas'.

Basic Example

We'll be using a sample from the Online Retail II data set. It contains all the transactions occurring for a UK-based and registered, non-store online retailer between 01/12/2009 and 09/12/2011. The company mainly sells unique all-occasion gift-ware.

The full dataset is available at the UCI Machine Learning Repository.

Here's what the first few rows look like:

Invoice StockCode Description Quantity InvoiceDate Price Customer ID Country
489434 85048 15CM CHRISTMAS GLASS BALL 20 LIGHTS 12 2009-12-01 07:45:00 6.95 13085 United Kingdom
489434 79323P PINK CHERRY LIGHTS 12 2009-12-01 07:45:00 6.75 13085 United Kingdom
489434 79323W WHITE CHERRY LIGHTS 12 2009-12-01 07:45:00 6.75 13085 United Kingdom
489434 22041 RECORD FRAME 7" SINGLE SIZE 48 2009-12-01 07:45:00 2.1 13085 United Kingdom
489434 21232 STRAWBERRY CERAMIC TRINKET BOX 24 2009-12-01 07:45:00 1.25 13085 United Kingdom

Setup

We'll import required packages and read in the example dataset:

import re
import typing as T

import polars as pl
from polars import testing

from dpipes.processor import ColumnPipeProcessor, PipeProcessor

data = pl.read_csv("examples/sample.csv", ignore_errors=True)

Transformations

Let's define some functions to transform our data. We'll start by converting the camel-case column names to snake-case:

def camel_to_snake(x: str) -> str:
    return re.sub(r"(?<!^)(?=[A-Z][a-z])", "_", re.sub(r"\s+", "_", x))


def clean_colnames(df: pl.DataFrame) -> pl.DataFrame:
    return df.rename({x: camel_to_snake(x).lower() for x in df.columns})
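Applied to the sample's headers, the regex inserts an underscore before each internal CamelCase boundary, replaces whitespace with underscores, and then the rename lowercases everything (the function is repeated here so the snippet is self-contained):

```python
import re

def camel_to_snake(x: str) -> str:
    # Insert "_" before an uppercase letter followed by a lowercase one,
    # after first collapsing whitespace runs into "_".
    return re.sub(r"(?<!^)(?=[A-Z][a-z])", "_", re.sub(r"\s+", "_", x))

# A few of the sample dataset's column names:
camel_to_snake("InvoiceDate").lower()  # "invoice_date"
camel_to_snake("StockCode").lower()    # "stock_code"
camel_to_snake("Customer ID").lower()  # "customer_id"
```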

Next, we'll define a few functions that will calculate the total price per line item, calculate the total price per invoice order, calculate the number of unique products in each order, and calculate the total number of items in each order.

Finally, let's define one function that converts floats to integers, and another that converts integers to strings. We'll use these functions to cast the customer_id field as a string.

Transformation Functions with Polars
def add_line_total(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(
        line_item_total=pl.col("quantity") * pl.col("price")
    )


def add_order_total(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(
        order_total=pl.col("line_item_total").sum().over("invoice")
    )


def add_order_num_products(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(
        order_size=pl.count().over("invoice")
    )


def add_total_order_size(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(
        order_size=pl.col("quantity").sum().over("invoice")
    )


def float_to_int(
    df: pl.DataFrame, cols: T.Union[str, T.Sequence[str]], fillna: int = -99999
) -> pl.DataFrame:
    return df.with_columns(pl.col(cols).fill_nan(fillna).cast(int))


def int_to_string(
    df: pl.DataFrame, cols: T.Union[str, T.Sequence[str]]
) -> pl.DataFrame:
    return df.with_columns(pl.col(cols).cast(str))

Data Pipeline

Now, let's chain these functions together to make a simple processing pipeline.

Method Chaining

We can use Polars' DataFrame.pipe method to chain all these operations together. We'll add two DataFrame.pipe calls with lambda functions to apply our casting operations to a specific column.

Using Polars Pipe and Method Chaining
result_a = (
    data.pipe(clean_colnames)
    .pipe(add_line_total)
    .pipe(add_order_total)
    .pipe(add_order_num_products)
    .pipe(add_total_order_size)
    .pipe(lambda x: float_to_int(x, "customer_id"))
    .pipe(lambda x: int_to_string(x, "customer_id"))
)

PipeProcessor

Now, let's see how this looks using the dpipes.PipeProcessor class. We'll instantiate an object by passing a list of functions that we want to run, in order. We can then use this object to run the pipeline on any passed dataset.

We'll create two new PipeProcessor objects: one to process functions on the customer_id column, and another that will compose both our original and column-specific pipelines into a single processor.

One can easily create an arbitrary number of sub-pipelines and pipeline compositions.

PipeProcessor Composition
ps = PipeProcessor([
    clean_colnames,
    add_line_total,
    add_order_total,
    add_order_num_products,
    add_total_order_size
])
col_ps = PipeProcessor([float_to_int, int_to_string], {"cols": "customer_id"})
pipeline = PipeProcessor([ps, col_ps])
result_b = pipeline(data)

testing.assert_frame_equal(result_a, result_b)

Note

We only passed a single dictionary to the dpipes.PipeProcessor constructor, and it broadcast those keyword arguments to both functions within the pipeline.

Although both methods produce identical results, only the use of PipeProcessor provides a reusable, modular pipeline object.
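The broadcasting behavior can be pictured with a minimal plain-Python sketch (an illustration of the concept only, not dpipes' actual implementation; the toy dict "transformations" are hypothetical):

```python
def run_pipeline(data, funcs, kwargs=None):
    """Apply each function in order, broadcasting one shared kwargs dict to all."""
    kwargs = kwargs or {}
    for f in funcs:
        data = f(data, **kwargs)
    return data

# Toy transformations that both accept a `cols` keyword.
def double(d, cols):
    return {k: (v * 2 if k in cols else v) for k, v in d.items()}

def stringify(d, cols):
    return {k: (str(v) if k in cols else v) for k, v in d.items()}

# The single {"cols": ["a"]} dict reaches both functions:
run_pipeline({"a": 1, "b": 2}, [double, stringify], {"cols": ["a"]})
# {"a": "2", "b": 2}
```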

ColumnPipeProcessor

Finally, if the only keyword arguments to our transformation functions are column names, we can use dpipes.ColumnPipeProcessor instead. As with the dpipes.PipeProcessor class, we can pass in a single column or a single list of columns to broadcast to all functions within the pipeline. You can also specify the column(s) each function acts on by passing a list of lists.

ColumnPipeProcessor
col_ps = ColumnPipeProcessor([float_to_int, int_to_string], cols="customer_id")
pipeline = PipeProcessor([ps, col_ps])

result_b = pipeline(data)

testing.assert_frame_equal(result_a, result_b)
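The list-of-lists form can likewise be sketched in plain Python (again a conceptual illustration, not dpipes' implementation; the toy functions are hypothetical):

```python
def run_column_pipeline(data, funcs, cols):
    """Pair each function with its own column spec; a single spec is broadcast."""
    if isinstance(cols, str) or not isinstance(cols[0], list):
        cols = [cols] * len(funcs)  # broadcast one spec to every function
    for f, c in zip(funcs, cols):
        data = f(data, cols=c)
    return data

def negate(d, cols):
    return {k: (-v if k in cols else v) for k, v in d.items()}

def stringify(d, cols):
    return {k: (str(v) if k in cols else v) for k, v in d.items()}

# With a list of lists, each function acts on its own column(s):
run_column_pipeline({"a": 1, "b": 2}, [negate, stringify], [["a"], ["b"]])
# {"a": -1, "b": "2"}
```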