Build a data processing pipeline from scratch

In under 100 lines of code, using Pandas

Posted by J.J. Young on Jan 8, 2017

To build a data processing pipeline, we need to answer the following questions:

  • First, what is the format of the data?
  • Second, what operations will be performed on the data?
  • Third, what is a pipeline and how do we execute one?
  • Finally, how do we scale to large datasets?

For the first question, we will always assume that the data to be processed is in a table-like format. In the early stages of machine learning applications, data processing can usually be categorised into two types.
One is extracting new feature columns from existing feature columns, the other is aggregating existing feature columns according to certain metrics (a minimal sketch of both follows this paragraph). All in all, both are column-based, so the data processing framework should support column-wise execution.
A data pipeline is a sequence of transformation functions executed on the data, where each stage's output is the next stage's input.
For the last question, we can mimic MapReduce and process the data in a data-parallel style with multiprocessing.
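
As a minimal sketch of the two types (the frame and column names here are made up purely for illustration): enrichment derives a new column from existing ones, while aggregation groups rows by a key and reduces them.

import pandas as pd

# Hypothetical example frame, just for illustration
df = pd.DataFrame({'city': ['Miami', 'Tampa', 'Akron'],
                   'state': ['FL', 'FL', 'OH'],
                   'population': [450000, 390000, 190000]})

# Type 1: enrichment -- derive a new column from existing columns
df['label'] = df['city'] + ', ' + df['state']

# Type 2: aggregation -- group by a key and reduce an existing column
population_by_state = df.groupby('state')['population'].sum()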

Pandas - A Short Introduction

Here we will use pandas to process our data. In an enterprise setting, we usually deal with a lot of JSON data. First, let us generate some sample data:

data = [{'state': 'Florida',
         'shortname': 'FL',
         'info': {
             'governor': 'Rick Scott'
         },
         'counties': [{'name': 'Dade', 'population': 12345},
                      {'name': 'Broward', 'population': 40000},
                      {'name': 'Palm Beach', 'population': 60000}]},
        {'state': 'Ohio',
         'shortname': 'OH',
         'info': {
             'governor': 'John Kasich'
         },
         'counties': [{'name': 'Summit', 'population': 1234},
                      {'name': 'Cuyahoga', 'population': 1337}]}]
from pandas.io.json import json_normalize

df = json_normalize(data, 'counties', ['state', 'shortname',
                                           ['info', 'governor']])
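
The resulting df has one row per county, with the record fields name and population as columns, and the metadata fields state, shortname and info.governor repeated on each row.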

To add a new column to the DataFrame, either of the approaches below is equivalent.

df['some_new_col1'] = df['state'].map(lambda e: e)

# apply needs axis=1 so that each row is passed to the lambda
df['some_new_col2'] = df.apply(lambda e: e['state'], axis=1)

Sometimes a new feature may depend on several other columns.

df['some_new_col3'] = df[['state', 'name']].apply(lambda e: " ".join(e), axis=1)

Processing Data in Pipeline Style

Traditional machine learning applications, such as face recognition, consist of several steps: first apply face calibration methods, then extract features and train the classifiers, and finally put the trained model into production. These steps can be viewed as a sequence of transformations over the face data.

Expressed in an abstract way, this looks like the following.

f(g(h(df), arg1=a), arg2=b, arg3=c)

But this looks rather awkward. Thanks to Pandas' built-in pipe method, we can simply write the following.

df.pipe(h) \
  .pipe(g, arg1=a) \
  .pipe(f, arg2=b, arg3=c)
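
pipe simply calls each function with the DataFrame as its first argument and returns the result, so the functions only need to accept and return a DataFrame. A minimal sketch, with made-up function bodies and values (reusing the df built above):

def h(df):
    # e.g. clean the raw frame
    return df.dropna()

def g(df, arg1):
    # e.g. derive a new column controlled by arg1
    df['scaled'] = df['population'] * arg1
    return df

def f(df, arg2, arg3):
    # e.g. keep only rows within a range
    return df[(df['scaled'] > arg2) & (df['scaled'] < arg3)]

result = df.pipe(h).pipe(g, arg1=0.001).pipe(f, arg2=1, arg3=100)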

The f, g, h referred to above can be functions, lambda expressions, or even classes. In many of the applications I have developed, I favor classes, because they offer more flexibility in controlling the behavior of the operators in the pipeline. But how can a class work like a function? That is simple: use Python's __call__ magic method.

class SomeMagicFunction:
    def __init__(self, *args, **kwargs):
        # configuration captured when the operator is constructed
        self.args = args
        self.kwargs = kwargs

    def __call__(self, df, *args, **kwargs):
        # pipe passes the dataframe as the first argument
        print("do something magic with {} and {}".format(self.args, self.kwargs))
        return df

A more realistic pipeline looks like the following.

df.pipe(DimensionReduction('iso', to_col='dimension_reduced'), conf=conf, status=status) \
  .pipe(RiskScoreCalculator("knn", from_col='dimension_reduced', to_col='abnormal_score'), conf=conf, status=status) \
  .pipe(NormalizeAbnormalScore(from_col='abnormal_score', to_col='abnormal_score_normalize'), conf=conf, status=status) \
  .pipe(UpdateLabel(0), conf=conf, status=status) \
  .pipe(UpdateDescription(0), conf=conf, status=status) \
  .pipe(UpdateToRedis(start_time, end_time), conf=conf, status=status) \
  .pipe(UpdateImportance(data_manager.current_batch_json), conf=conf, status=status) \
  .pipe(UpdateToES(data_manager.current_batch_json), conf=conf, status=status)

DimensionReduction, RiskScoreCalculator, NormalizeAbnormalScore, UpdateLabel and UpdateDescription each add new columns to the dataframe according to their own function. UpdateToRedis, UpdateImportance and UpdateToES push the enriched result into databases for downstream systems to consume. Both conf and status are Python dictionaries. We assume that conf is never written to, and that each operator in the pipeline only writes a small amount of data into status, which acts as a message channel. This exploits the fact that Python dictionaries are passed by reference rather than by value, so the side effect should be handled carefully (a sketch of the convention follows).
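
To illustrate the convention, here is a hypothetical operator (the class name, columns and keys are made up) that reads its settings from conf and reports back through status:

class NormalizeColumn:
    def __init__(self, from_col, to_col):
        self.from_col = from_col
        self.to_col = to_col

    def __call__(self, df, conf=None, status=None):
        # conf is treated as read-only configuration shared by all operators
        eps = conf.get('eps', 1e-9) if conf else 1e-9
        col = df[self.from_col]
        df[self.to_col] = (col - col.min()) / (col.max() - col.min() + eps)
        # status is mutated in place, so the message is visible to the caller
        if status is not None:
            status['last_operator'] = 'NormalizeColumn'
        return df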

Scale to Large Dataset

At last, we have nearly come to the end of pipeline processing. Still, if the dataset is large, how do we deal with it? I guess someone experienced will say Dask is what you need. Actually, we can still use Pandas, but in a parallel manner.

Parallel computing can be categorised into three kinds: data parallel, task parallel and mixed mode. For processing a dataframe, data parallelism is the obvious and easy choice. Following the MapReduce methodology, the data can be chunked into pieces, processed separately, and the results combined. We only need to implement column-based enrichment and parallel aggregation, which cover most of what is needed.

import multiprocessing as mp
from functools import partial

import pandas as pd

def parallel_runner(fn, *args, **kwargs):
    # the first positional argument is always the dataframe to process
    data = args[0]
    data_chunks = split_strategy(data)(data)
    with mp.Pool() as pool:
        # bind the keyword arguments, since starmap only passes positionals
        results = pool.starmap(partial(fn, **kwargs),
                               ((i,) + args[1:] for i in data_chunks))
    # reassemble the processed chunks into a single dataframe
    return pd.concat(results).reset_index(drop=True)

The split_strategy is omitted here; it can be customised to split by a fixed chunk size, a fixed chunk count and so on, and a possible version is sketched below. partial is used as a small trick, because the starmap function of the multiprocessing pool does not support keyword arguments. The first element of args should always be the dataframe to be processed.
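
As an assumption of what it might look like (not the original implementation), split_strategy could be a small factory that returns a chunking function, here keeping small frames whole and otherwise splitting into one chunk per CPU core:

import numpy as np

def split_strategy(data):
    # return a function that splits a dataframe into roughly equal chunks;
    # small frames stay in a single chunk, larger ones get one chunk per core
    n_chunks = 1 if len(data) < 1000 else mp.cpu_count()
    return lambda df: np.array_split(df, n_chunks)

The parallel aggregation is as follows, and the reader should not be much surprised; dfGrouped is a groupby object over the processed dataframe.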

def parallel_aggregate(dfGrouped, func):
    # apply func to each (group key, group frame) pair in a separate process
    with mp.Pool() as pool:
        ret_list = pool.starmap(func, ((name, group) for name, group in dfGrouped))
    return pd.concat(ret_list)
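
As a hypothetical usage (the aggregation function is made up), we could group the county frame from earlier by state and sum the populations in parallel:

def sum_population(name, group):
    # reduce one (state, counties) group to a single-row frame
    return pd.DataFrame({'state': [name],
                         'total_population': [group['population'].sum()]})

totals = parallel_aggregate(df.groupby('state'), sum_population)

Note that sum_population must be defined at module level so that multiprocessing can pickle it.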

Hopefully this post has made data pipeline design with Pandas clearer and simpler.