Thanks to multiprocessing, it is relatively straightforward to write parallel code in Python. However, these processes communicate by copying and (de)serializing data, which can make parallel code even slower when large objects are passed back and forth. This post shows how to use shared memory to avoid all the copying and serializing, making it possible to have fast parallel code that works with large datasets.

The problem

In order to demonstrate the problem empirically, let us create a large dataframe and do some processing on each of its rows:

import multiprocessing as mp
import numpy as np
import pandas as pd
from tqdm import tqdm
rows, cols = 1000, 5000
df = pd.DataFrame(
    np.random.random(size=(rows, cols)),
    columns=[f'Col-{i}' for i in range(cols)],
    index=[f'Idx-{i}' for i in range(rows)]
)

print(f'Data size: {df.values.nbytes / 1024 / 1024:.1f} MB')
df.iloc[:5, :5]
Data size: 38.1 MB
Col-0 Col-1 Col-2 Col-3 Col-4
Idx-0 0.759636 0.475505 0.625045 0.806929 0.447964
Idx-1 0.085767 0.038912 0.726519 0.771332 0.287777
Idx-2 0.781693 0.589335 0.485391 0.090572 0.559625
Idx-3 0.309681 0.307883 0.656973 0.104621 0.171662
Idx-4 0.477405 0.211749 0.692536 0.574679 0.379362

Following is a simple, and a bit silly, row transformation: we construct a random matrix of the same size as the dataframe, take the mean across its rows, and compute the outer product between that mean and the specified row. It is not too heavy computationally, but (and this is the important part) its input is the $1000\times5000$ dataframe and its output is a $5000\times5000$ matrix, so millions of entries have to travel between processes.

def do_work(args):
    df, idx = args
    # random matrix of the same shape as the dataframe
    data = np.random.random(size=(len(df), len(df.columns)))
    # outer product between the specified row and the column-wise means of the random matrix
    result = np.outer(df.loc[idx], data.mean(axis=0))
    return result

We first set a baseline by transforming 250 random rows in a sequential fashion:

process_rows = np.random.choice(len(df), 250)
for i in tqdm(process_rows):
    result = do_work((df, df.index[i]))
100%|██████████| 250/250 [00:17<00:00, 13.98it/s]

This is how you would naively transform this code to a parallel version using multiprocessing:

with mp.Pool() as pool:
    tasks = ((df, df.index[idx]) for idx in process_rows)
    result = pool.imap(do_work, tasks)
    for res in tqdm(result, total=len(process_rows)):
        pass
100%|██████████| 250/250 [01:37<00:00,  2.57it/s]

This is much slower! As I hinted above, the problem is that the processes are exchanging a lot of data that has to be serialized, copied, and de-serialized. All of this takes time.
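
To get a feel for that overhead, you can measure how large the serialized dataframe is and how long one round of pickling takes (a quick check of my own, not part of the original benchmark; remember that the large result has to travel back as well):

import pickle
import time

start = time.perf_counter()
payload = pickle.dumps(df)
elapsed = time.perf_counter() - start

# every task ships the whole dataframe to a worker, so this cost is paid
# at least once per task, on top of sending the large result back
print(f'Pickled size: {len(payload) / 1024 / 1024:.1f} MB, time: {elapsed:.3f} s')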

The solution

Thanks to shared_memory, making this fast is a breeze! A caveat, though: it only works with Python 3.8 or above.

We are first going to deal with plain numpy arrays, then build upon this to share pandas dataframes. The idea is to write a wrapper that takes care of moving data to and from the shared memory. I strongly encourage you to read the documentation I linked above, which explains this in more detail.

from multiprocessing.shared_memory import SharedMemory


class SharedNumpyArray:
    '''
    Wraps a numpy array so that it can be shared quickly among processes,
    avoiding unnecessary copying and (de)serializing.
    '''
    def __init__(self, array):
        '''
        Creates the shared memory and copies the array therein
        '''
        # create the shared memory location of the same size of the array
        self._shared = SharedMemory(create=True, size=array.nbytes)
        
        # save data type and shape, necessary to read the data correctly
        self._dtype, self._shape = array.dtype, array.shape
        
        # create a new numpy array that uses the shared memory we created.
        # at first, it is filled with zeros
        res = np.ndarray(
            self._shape, dtype=self._dtype, buffer=self._shared.buf
        )
        
        # copy data from the array to the shared memory. numpy will
        # take care of copying everything in the correct format
        res[:] = array[:]

    def read(self):
        '''
        Reads the array from the shared memory without unnecessary copying.
        '''
        # simply create an array of the correct shape and type,
        # using the shared memory location we created earlier
        return np.ndarray(self._shape, self._dtype, buffer=self._shared.buf)

    def copy(self):
        '''
        Returns a new copy of the array stored in shared memory.
        '''
        return np.copy(self.read())
        
    def unlink(self):
        '''
        Releases the allocated memory. Call when finished using the data,
        or when the data was copied somewhere else.
        '''
        self._shared.close()
        self._shared.unlink()

Now you can wrap an array with this class and send the wrapped object as a parameter to a process and/or as a return value from a process. It is that simple!

In particular, note that the array itself is not saved in the object. That is the whole point: we do not want to move it around! We can move the wrapper, though, because doing so does not copy the underlying memory; only a reference to it is moved. Also note the unlink function: you must not forget to call it whenever you are done working with the array, or, alternatively, once you have stored a copy somewhere else. If you do not, that shared memory will never be disposed of, and eventually you will run out of memory.
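
Here is a minimal usage sketch of the wrapper on its own (my own illustration, not part of the original code), just to make the lifecycle explicit:

arr = np.random.random(size=(1000, 1000))
shared_arr = SharedNumpyArray(arr)  # copies the data into shared memory once

view = shared_arr.read()            # a view over the shared buffer, no copy
assert np.allclose(view, arr)

local = shared_arr.copy()           # an independent copy, safe to keep around
shared_arr.unlink()                 # release the shared memory when done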

Using that wrapper, it is trivial to share a pandas dataframe: we wrap the values using the class above, and save index and columns.

class SharedPandasDataFrame:
    '''
    Wraps a pandas dataframe so that it can be shared quickly among processes,
    avoiding unnecessary copying and (de)serializing.
    '''
    def __init__(self, df):
        '''
        Creates the shared memory and copies the dataframe therein
        '''
        self._values = SharedNumpyArray(df.values)
        self._index = df.index
        self._columns = df.columns

    def read(self):
        '''
        Reads the dataframe from the shared memory
        without unnecessary copying.
        '''
        return pd.DataFrame(
            self._values.read(),
            index=self._index,
            columns=self._columns
        )
    
    def copy(self):
        '''
        Returns a new copy of the dataframe stored in shared memory.
        '''
        return pd.DataFrame(
            self._values.copy(),
            index=self._index,
            columns=self._columns
        )
        
    def unlink(self):
        '''
        Releases the allocated memory. Call when finished using the data,
        or when the data was copied somewhere else.
        '''
        self._values.unlink()

Here is how to use them, and how quick it is:

def work_fast(args):
    shared_df, idx = args
    
    # read dataframe from shared memory
    df = shared_df.read()
    
    # call old function
    result = do_work((df, idx))
    
    # wrap and return the result
    return SharedNumpyArray(result)
shared_df = SharedPandasDataFrame(df)

with mp.Pool() as pool:
    tasks = ((shared_df, df.index[idx]) for idx in process_rows)
    result = pool.imap(work_fast, tasks)
    for res in tqdm(result, total=len(process_rows)):
        res.unlink()  # IMPORTANT

shared_df.unlink()  # IMPORTANT
100%|██████████| 250/250 [00:13<00:00, 17.94it/s]

Wait a minute, you might say, this is barely faster than the single-process version! Well, yes, but it is roughly seven times faster than the multiprocessing version :). You might have noticed that there is still some copying going on after all: when we create the shared memory, we have to copy the array into it. Depending on the computations you perform in the worker process, you might be able to avoid this, e.g. by pre-allocating the shared memory and performing only in-place operations, but whether that is possible depends strongly on exactly what and how you compute.
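
As a sketch of that last idea (my own, not from the original code, and assuming you can afford one shared buffer per in-flight task), the worker could write its result directly into a pre-allocated shared array instead of returning it:

def work_in_place(args):
    shared_df, idx, shared_out = args
    df = shared_df.read()
    out = shared_out.read()  # view over a pre-allocated shared output buffer
    data = np.random.random(size=df.shape)
    # write the outer product straight into shared memory,
    # so nothing large has to be sent back to the parent process
    np.outer(df.loc[idx], data.mean(axis=0), out=out)

The parent process would create one SharedNumpyArray of shape (cols, cols) per in-flight task, pass it along with the row label, and read and unlink it once the corresponding task completes.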

The advantage of multiprocessing with shared memory becomes more apparent when the workers perform heavier computations. For example, suppose we want to convolve the whole dataframe with a $20\times20$ filter built from $20^2$ random entries of the specified row:

from scipy.ndimage import convolve

def do_work(args):
    df, idx = args
    # pick 20 * 20 random entries from the specified row and use them as the kernel
    kernel_idx = np.random.choice(df.shape[1], 20 * 20)
    kernel = df.loc[idx].iloc[kernel_idx].values.reshape((20, 20))
    # convolve the whole dataframe with the kernel
    result = convolve(df.values, kernel)
    return result

As before, let us compare the sequential and multi-process versions. We can safely rule out the naive multiprocessing solution.

process_rows = np.random.choice(len(df), 24)
for i in tqdm(process_rows):
    result = do_work((df, df.index[i]))
100%|██████████| 24/24 [00:46<00:00,  1.93s/it]
shared_df = SharedPandasDataFrame(df)

with mp.Pool() as pool:
    tasks = ((shared_df, df.index[idx]) for idx in process_rows)
    result = pool.imap(work_fast, tasks)
    for res in tqdm(result, total=len(process_rows)):
        res.unlink()  # IMPORTANT

shared_df.unlink()  # IMPORTANT
100%|██████████| 24/24 [00:08<00:00,  2.83it/s]

Now that the computations take much longer than copying the result, the advantage becomes clearer.

When is this solution (not) applicable?

As discussed above, there is still some copying involved, so it is not straightforward to tell when this solution will be faster. When the result is large but the computation required to obtain it is light, a sequential approach might still win, and it is not obvious where to draw the line.

Another case to watch out for is when you have large inputs but small outputs. This solution is not necessary when the workers only read the input and do not modify it. This is because, at least on systems where new processes are created with fork (such as Linux), the inputs follow a copy-on-write mechanism: memory pages are not copied unless they are modified. This can be shown by slightly modifying the example above to return the sum of the convolution instead of the convolution itself:

def do_work(args):
    df, idx = args
    kernel_idx = np.random.choice(df.shape[1], 20 * 20)
    kernel = df.loc[idx].iloc[kernel_idx].values.reshape((20, 20))
    result = convolve(df.values, kernel)
    # return only a scalar: the large convolution result never leaves the worker
    return result.sum()


with mp.Pool() as pool:
    tasks = ((df, df.index[idx]) for idx in process_rows)
    result = pool.imap(do_work, tasks)
    for res in tqdm(result, total=len(process_rows)):
        pass
100%|██████████| 24/24 [00:09<00:00,  2.47it/s]

This is now roughly as fast as the shared-memory version above, which returned the whole result of the convolution.

However, if the data lives inside a class, the trick above becomes necessary again. I honestly do not know why; this also happens if the worker function is defined outside of the class.

class Worker:
    def __init__(self):
        self.data = np.random.random(size=(10000, 10000))

    @staticmethod
    def work_not_shared(args):
        data, i = args
        return data[i].mean()
    
    def run_not_shared(self):
        with mp.Pool() as pool:
            tasks = [[self.data, idx] for idx in range(24)]
            result = pool.imap(self.work_not_shared, tasks)
            for res in tqdm(result, total=len(tasks)):
                pass
    
    @staticmethod
    def work_shared(args):
        data, i = args
        return data.read()[i].mean()
    
    def run_shared(self):
        shared = SharedNumpyArray(self.data)
        with mp.Pool() as pool:
            tasks = [[shared, idx] for idx in range(24)]
            result = pool.imap(self.work_shared, tasks)
            for res in tqdm(result, total=len(tasks)):
                pass
        shared.unlink()


print('Shared')
Worker().run_shared()

print('Not shared')
Worker().run_not_shared()
Shared
100%|██████████| 24/24 [00:00<00:00, 10363.77it/s]

Not shared
100%|██████████| 24/24 [00:29<00:00,  1.21s/it]

Happy multiprocessing!

This blog post, by the way, is fully contained in a jupyter notebook downloadable from here.