
Is there a straightforward way to run pandas.DataFrame.isin in parallel?

Data Science Asked by Therriault on February 11, 2021

I have a modeling and scoring program that makes heavy use of the DataFrame.isin function of pandas, searching through lists of Facebook "like" records of individual users for each of a few thousand specific pages. This is the most time-consuming part of the program, more so than the modeling or scoring pieces, simply because it only runs on one core while the rest runs on a few dozen simultaneously.

Though I know I could manually break up the dataframe into chunks and run the operation in parallel, is there any straightforward way to do that automatically? In other words, is there any kind of package out there that will recognize I’m running an easily-delegated operation and automatically distribute it? Perhaps that’s asking for too much, but I’ve been surprised enough in the past by what’s already available in Python, so I figure it’s worth asking.

Any other suggestions about how this might be accomplished (even if not by some magic unicorn package!) would also be appreciated. Mainly, just trying to find a way to shave off 15-20 minutes per run without spending an equal amount of time coding the solution.

4 Answers

Unfortunately, parallelization is not yet implemented in pandas. You can join this GitHub issue if you want to participate in the development of the feature.

I don't know of any "magic unicorn package" for this purpose, so your best bet is to write your own solution. But if you don't want to spend time on that and want to learn something new, you can try the two methods built into MongoDB (map-reduce and the aggregation framework). See mongodb_agg_framework.

Answered by Stanpol on February 11, 2021

I think your best bet would be rosetta. I'm finding it extremely useful and easy. Check out its pandas methods.

You can install it with pip.

Answered by dmvianna on February 11, 2021

There is the useful dask library for parallel numpy/pandas jobs

Answered by Severin Pappadeux on February 11, 2021

There is a more common version of this question regarding parallelizing the pandas apply function - so this is a refreshing question :)

First, I want to mention swifter since you asked for a "packaged" solution, and it appears on most SO questions regarding pandas parallelization.

But I'd still like to share my personal gist code for it, since after several years of working with DataFrames I never found a 100% parallelization solution (mainly for the apply function), and I always had to come back to my "manual" code.

Thanks to you, I made it more generic so it supports (theoretically) any DataFrame method by name (so you won't have to keep separate versions for isin, apply, etc.).

I tested it on the "isin", "apply" and "isna" functions using both Python 2.7 and 3.6. It's under 20 lines, and I followed pandas naming conventions like "subset" and "njobs".

I also added a timing comparison against equivalent dask code for "isin", and dask came out roughly 2x slower than this gist.

It includes 2 functions:

df_multi_core - this is the one you call. It accepts:

  1. Your df object
  2. The function name you'd like to call
  3. The subset of columns the function should be applied to (helps reduce time/memory)
  4. The number of jobs to run in parallel (-1 or omit for all cores)
  5. Any other kwargs the df's function accepts (like "axis")

_df_split - this is an internal helper function that must be positioned globally in the running module (Pool.map is "placement dependent"); otherwise I'd define it inside df_multi_core.
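The "placement dependent" point can be demonstrated with pickle, which multiprocessing uses under the hood to ship functions to worker processes (a small stdlib-only illustration):

```python
import pickle

def top_level(x):
    # module-level functions pickle by reference (their importable name)
    return x * 2

def make_local():
    def local(x):
        return x * 2
    return local

pickle.dumps(top_level)  # works: workers re-import it by name

try:
    pickle.dumps(make_local())  # nested functions have no importable name
    picklable = True
except (pickle.PicklingError, AttributeError):
    picklable = False
```

This is why _df_split has to live at module level: Pool.map would otherwise fail to pickle it when sending work to the child processes.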

Here's the code from my gist (I'll add more pandas function tests there):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    # unpack (index, chunk, method name) and call the method on the chunk
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    # restrict to a subset of columns if requested (saves time and memory)
    target = df[subset] if subset is not None else df
    splits = np.array_split(target, njobs)

    # tag each chunk with its index so the original order can be restored
    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x: x[0])
    return pd.concat([split[1] for split in results])

Below is test code for a parallelized isin, comparing the native, multi-core gist and dask performance. On an i7 machine with 8 physical cores, I got around a 4x speedup. I'd love to hear what you get on your real data!

from time import time

import dask.dataframe as dd

if __name__ == '__main__':
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

    t5 = time()
    ddata = dd.from_pandas(df, npartitions=multiprocessing.cpu_count())
    res = ddata.map_partitions(lambda d: d.isin(lookfor)).compute(scheduler='processes')
    print('result\n{}'.format(res.sum()))
    t6 = time()
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88

Answered by mork on February 11, 2021
