Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

call multiprocessing in class method Python

Initially, I had a class that stores some processed values and reuses them in its other methods.

The problem is that when I tried to split a class method across multiple processes to speed it up, Python spawned the processes but they did not seem to do any work (Task Manager showed only one process running), and the result was never delivered.

I searched around and found that pathos.multiprocessing can do this, but I wonder whether the standard library can solve this problem?

from multiprocessing import Pool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)

a.run(list(range(10)))

1 Reply

Your code fails because Python cannot pickle the instance method (self.cal), which is what it attempts to do when you hand work to the processes of a multiprocessing.Pool (well, there is a way to do it, but it's way too convoluted and not extremely useful anyway). This is the Python 2 behaviour; Python 3 can pickle a bound method as long as its instance is picklable. Since the processes have no shared memory access, Python has to 'pack' the data and send it to the spawned process for unpacking. The same applies to anything else you send to the workers, such as the a instance itself: it has to be picklable too.
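A quick way to see whether something can be sent to a worker process is to try pickling it yourself. The following is only a sketch: can_pickle is a hypothetical helper (not part of multiprocessing), and the result for the bound method depends on the Python version you run it with.

```python
import pickle


class A:
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl


def can_pickle(obj):
    """Hypothetical helper: True if pickle.dumps accepts obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


a = A(2)
print("instance:", can_pickle(a))
print("bound method:", can_pickle(a.cal))  # version dependent, see above

# Attach something unpicklable to the instance and check again.
a.callback = lambda x: x
print("instance holding a lambda:", can_pickle(a))  # False: lambdas cannot be pickled
```

If the last check fails for your own class, the same failure is what the Pool runs into when it tries to ship the object to a worker.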

The only pool in the multiprocessing package whose workers share memory with the caller is the little-known multiprocessing.pool.ThreadPool, so if you really want to do this:

from multiprocessing.pool import ThreadPool

class A():
    def __init__(self, vl):
        self.vl = vl
    def cal(self, nb):
        return nb * self.vl
    def run(self, dt):
        t = ThreadPool(processes=4)
        rs = t.map(self.cal, dt)
        t.close()
        return rs

a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

But this will not give you parallelization, as it essentially maps to regular threads, which do have access to the shared memory but, in CPython, are serialized by the GIL for CPU-bound work like this. You should pass class/static methods instead (if you need them called), accompanied by the data you want them to work with (in your case self.vl). If you need to share that data across processes, you'll have to use some shared memory abstraction, like multiprocessing.Value, guarding access with a mutex along the way, of course.
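Here is a minimal sketch of the "pass a plain function together with its data" idea. It assumes a module-level function in place of a class/static method (both are pickled by name), and uses functools.partial to bind self.vl; the free function cal is an illustration, not code from the question.

```python
from functools import partial
from multiprocessing import Pool


def cal(vl, nb):
    """Module-level worker: receives the data it needs as plain arguments."""
    return nb * vl


class A:
    def __init__(self, vl):
        self.vl = vl

    def run(self, dt):
        # partial(cal, self.vl) is pickled as "the function cal plus the value vl",
        # so the instance itself never has to be sent to the workers.
        with Pool(processes=4) as pool:
            return pool.map(partial(cal, self.vl), dt)


if __name__ == "__main__":  # needed where child processes re-import this module
    print(A(2).run(range(10)))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

The workers only ever see a function name and a number, which keeps the pickled payload small and avoids any dependence on the state of the instance.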

UPDATE

I said you could do it (and there are modules that more or less do it, check pathos.multiprocessing for example), but I don't think it's worth the trouble. When you get to a point where you have to trick your system into doing what you want, chances are you're either using the wrong system or you should rethink your design. But for the sake of completeness, here is one way to do what you want in a multiprocessing setting:

import sys
from multiprocessing import Pool

def parallel_call(params):  # a helper for calling 'remote' instances
    cls = getattr(sys.modules[__name__], params[0])  # get our class type
    instance = cls.__new__(cls)  # create a new instance without invoking __init__
    instance.__dict__ = params[1]  # apply the passed state to the new instance
    method = getattr(instance, params[2])  # get the requested method
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
    return method(*args)  # expand arguments, call our method and return the result

class A(object):

    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        t = Pool(processes=4)
        rs = t.map(parallel_call, self.prepare_call("cal", dt))
        t.close()
        return rs

    def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
        for arg in args:
            yield [self.__class__.__name__, self.__dict__, name, arg]

if __name__ == "__main__":  # important protection for cross-platform use
    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

I think it's pretty self-explanatory how it works, but in short: it passes the name of your class, its current state (sans signals, though), the name of the method to call and the arguments to invoke it with to a parallel_call function, which the Pool's worker processes call once for each argument. Python automatically pickles and unpickles all this data, so all parallel_call needs to do is reconstruct the original object, find the desired method on it and call it with the provided param(s).

This way we're passing only data, without trying to pass active objects, so Python doesn't complain (well, in this case; try adding a reference to an instance method to your class parameters and see what happens) and everything works just fine.
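To make the round trip concrete, here is a single-process sketch of what happens to one 'remote call' package. The pickle.dumps/pickle.loads pair stands in for what the Pool does when it ships a task to a worker; the names package, wire and clone are only for illustration.

```python
import pickle


class A:
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl


a = A(2)

# The package holds only plain data: class name, state dict, method name, argument.
package = ["A", a.__dict__, "cal", 3]

wire = pickle.dumps(package)   # roughly what is sent to the worker
received = pickle.loads(wire)  # roughly what the worker gets back

# Rebuild an equivalent object without running __init__, then call the method.
clone = A.__new__(A)
clone.__dict__ = received[1]
result = getattr(clone, received[2])(received[3])

print(result)      # 6
print(clone is a)  # False: the worker operates on a copy, not the original
```

The last line is the important part: the worker never touches the original instance, only a copy rebuilt from its state.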

If you want to go heavy on the 'magic', you can make it look exactly like your code (create your own Pool handler, pick up names from the functions and send the names to the actual processes, etc.), but this should be sufficient for your example.
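As a rough sketch of that 'magic', the names can be picked up from the bound method itself via its __self__ and __name__ attributes. package_for and method_map are hypothetical helpers, and this parallel_call is a simplified variant (single argument, class looked up through globals()) of the one above.

```python
from multiprocessing import Pool


def parallel_call(params):
    """Simplified worker: rebuild the instance and call the named method."""
    class_name, state, method_name, arg = params
    cls = globals()[class_name]  # assumes the class lives in this module
    instance = cls.__new__(cls)  # skip __init__, state is applied next
    instance.__dict__ = state
    return getattr(instance, method_name)(arg)


def package_for(bound_method, arg):
    """Hypothetical helper: derive the 'remote call' package from a bound method."""
    owner = bound_method.__self__
    return [type(owner).__name__, owner.__dict__, bound_method.__name__, arg]


def method_map(pool, bound_method, args):
    """Hypothetical helper: looks like pool.map(obj.method, args) to the caller."""
    return pool.map(parallel_call, [package_for(bound_method, arg) for arg in args])


class A:
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl

    def run(self, dt):
        with Pool(processes=4) as pool:
            return method_map(pool, self.cal, dt)


if __name__ == "__main__":
    print(A(2).run(range(10)))
```

With this, run reads almost like the code in the question, while only names and plain data cross the process boundary.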

However, before you get your hopes up, keep in mind that this will work only when sharing a 'static' instance (an instance that doesn't change its initial state once you start invoking it in a multiprocessing context). If the A.cal method changes the internal state, say the vl property, the change affects only the copy in the process where it happens (unless it is changed in the main instance that calls the Pool, between calls). If you want to share the state as well, you can upgrade parallel_call to pick up instance.__dict__ after the call and return it together with the method call result; then, on the calling side, you'd have to update the local __dict__ with the returned data to change the original state. And that's not enough: you'd actually have to create a shared dict and handle all the mutex stuff to have it concurrently accessed by all the processes (you can use multiprocessing.Manager for that).
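The first half of that upgrade could look something like the sketch below. It is run in a single process to keep the flow visible; Counter and parallel_call_with_state are made-up names, and the class lookup through globals() is an assumption that the class lives in the same module.

```python
class Counter:
    def __init__(self, total):
        self.total = total

    def add(self, nb):
        self.total += nb  # mutates the instance it is called on
        return self.total


def parallel_call_with_state(params):
    """Like parallel_call, but also returns the state after the call."""
    class_name, state, method_name, arg = params
    cls = globals()[class_name]
    instance = cls.__new__(cls)
    instance.__dict__ = dict(state)  # copy, as a worker process would get
    result = getattr(instance, method_name)(arg)
    return result, instance.__dict__


c = Counter(10)
result, new_state = parallel_call_with_state(["Counter", c.__dict__, "add", 5])
print(result, c.total)  # 15 10 -> the original is still unchanged

c.__dict__.update(new_state)  # the caller merges the returned state
print(c.total)  # 15
```

Note that with several workers returning state at the same time, the caller would simply keep whichever update it applied last, which is why a shared, lock-protected structure is still needed for real concurrent updates.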

So, as I was saying, more trouble than it's worth...

