
python - Executing a function in the background while using limited number of cores/threads and queue the extra executions?

I want to use a limited number of threads (at most 2) to run a function of a class that removes some files on disk in the background. The rest of my code within the same class is independent of this background function and might get executed tens of times more often than it. However, I still need to enforce the core/thread limit, so it is possible that the background jobs exceed 2 and I need to queue them. Note that my background function does not take any arguments.

I am pretty new to multi-threading and multi-processing, but I think I have done my homework: I looked at many posts here on Stack Overflow and tried a couple of approaches. However, none of them seems to work for me. Here's the structure of my code:

class myClass(object):
    def __init__(self):
        # some stuff
        pass
    def backgroundFunc(self):
        # delete some files on disk
        pass
    def mainFunc(self, elem):
        # Do some other things
        self.backgroundFunc()  # I want to run this in the background

Here's how I run the code

from myClass import myClass

myClassInstance = myClass()
for element in someList:
    myClassInstance.mainFunc(elem=element)

Note that I cannot start the background job before the stuff in mainFunc has started running.

And here is my first try with threading in my class file:

from threading import Thread

class myClass(object):
    def __init__(self):
        # some stuff
        pass
    def backgroundFunc(self):
        # delete some files on disk
        pass
    def mainFunc(self, elem):
        # Do some other things
        thr = Thread(target=self.backgroundFunc)
        thr.start()

However, the problem with this approach is that the program crashes at random times: sometimes right at the beginning of program execution, sometimes later, and the error messages are different every time. I guess this is possibly because the threads do not lock the memory they touch, so data might be written and read concurrently. Or, less likely, it might be because I am running my code on a server and the server enforces some limits on the allocated resources. In addition, with this approach I cannot set a limit on the number of threads and cannot queue jobs in case mainFunc gets executed more than twice while I already have two background jobs running.
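For reference, a fixed-size thread pool from the standard library can enforce exactly this kind of limit: concurrent.futures.ThreadPoolExecutor(max_workers=2) runs at most two jobs at a time and queues the rest internally. The sketch below is only an illustration of how it might slot into the class above (the _executor attribute name is invented for this example), not something tried in the question:

from concurrent.futures import ThreadPoolExecutor

class myClass(object):
    def __init__(self):
        # some stuff
        # A pool of at most two worker threads; extra submissions wait in
        # the executor's internal queue until a worker is free.
        self._executor = ThreadPoolExecutor(max_workers=2)

    def backgroundFunc(self):
        # delete some files on disk
        pass

    def mainFunc(self, elem):
        # Do some other things, then hand the cleanup off; submit() returns
        # immediately, so mainFunc does not wait for the deletion to finish.
        self._executor.submit(self.backgroundFunc)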

Here's another try with multiprocessing.Process:

from multiprocessing import Process

class myClass(object):
    def __init__(self):
        # some stuff
        pass
    def backgroundFunc(self):
        # delete some files on disk
        pass
    def mainFunc(self, elem):
        # Do some other things
        p = Process(target=self.backgroundFunc)
        p.start()

The problem with this approach is that Process will use as many threads/cores as my machine has at its disposal, and since the rest of my code automatically runs in parallel as well, everything becomes super slow very quickly.

I eventually arrived at multiprocessing.Pool, but I am still pretty confused about how to use it effectively. Anyway, here's my try with Pool:

from multiprocessing import Pool
class myClass(object):
    def __init__(self):
        #some stuff
        self.pool = Pool(processes=2)
    def backgroundFunc(self):
        # delete some files on disk
        print('some stuff')
    def mainFunc(self, elem):
        # Do some other things
        self.pool.apply_async(self.backgroundFunc)

However, apply_async does not seem to work: none of the print statements in backgroundFunc prints anything to the screen. I added self.pool.close() after apply_async, but then I get errors soon after the second process starts. I tried things like self.pool.apply and some others, but it seems they require a function that takes arguments, while my backgroundFunc does not take any. Finally, I do not know how to do the queuing that I explained earlier using Pool.
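A likely explanation for the silence, for what it's worth: apply_async only surfaces failures when the returned AsyncResult is inspected or an error_callback is supplied, and passing the bound method self.backgroundFunc means pickling the whole instance, including the Pool it holds, which typically cannot be pickled, so the job never runs and the error is easy to miss. A minimal sketch of one way around that, assuming a module-level worker function (cleanup_worker and shutdown are illustrative names, not from the original code):

from multiprocessing import Pool

def cleanup_worker():
    # Module-level, no-argument function: picklable, unlike a bound method
    # of an object that itself holds the Pool.
    print('deleting some files on disk')

class myClass(object):
    def __init__(self):
        # some stuff
        self.pool = Pool(processes=2)

    def mainFunc(self, elem):
        # Do some other things, then queue the cleanup; error_callback makes
        # failures visible instead of vanishing inside the AsyncResult.
        self.pool.apply_async(cleanup_worker,
                              error_callback=lambda e: print('cleanup failed:', e))

    def shutdown(self):
        # Call once, after the loop over someList, so queued jobs can finish.
        self.pool.close()
        self.pool.join()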

Also, I want to have control over how many times and when backgroundFunc is run. And mainFunc should not wait for all threads to finish before it exits; if that happens, I won't benefit from multi-threading, because the background function might take too long to finish. Maybe I should have been clearer in the question; sorry about that.

So I would really appreciate it if someone could help me with this. I am pretty confused. Thanks in advance!


1 Reply


"the program crashes randomly. I guess"

It would be easier to concentrate on one problem at a time, without guessing. So: what exactly is the crash?

Here's a test with threading that might inspire you, based on the worker example in the queue module documentation.

#!python3
#coding=utf-8
""" https://stackoverflow.com/q/49081260/ """

import sys, time, threading, queue

print(sys.version)

class myClass:
    """ """

    def __init__(self):
        """ """
        self.q = queue.Queue()
        self.threads = []
        self.num_worker_threads = 2

    def backgroundFunc(self):
        """ """
        print("thread started")
        while True:
            item = self.q.get()
            if item is None:
                #self.q.task_done()
                break
            print("working on ", item)
            time.sleep(0.5)
            self.q.task_done()
        print("thread stopping")

    def mainFunc(self):
        """ """

        print("starting thread(s)")
        for i in range(self.num_worker_threads):
            t = threading.Thread(target=self.backgroundFunc)
            t.start()
            self.threads.append(t)

        print("giving thread(s) some work")
        for item in range(5):
            self.q.put(item)

        print("giving thread(s) more work")
        for item in range(5,10):
            self.q.put(item)

        # block until all tasks are done
        print("waiting for thread(s) to finish")
        self.q.join()

        # stop workers
        print("stopping thread(s)")
        for i in range(self.num_worker_threads):
            self.q.put(None)
        for t in self.threads:
            t.join()

        print("finished")



if __name__ == "__main__":
    print("instance")
    myClassInstance = myClass()

    print("run")
    myClassInstance.mainFunc()

    print("end.")

It prints

3.6.1 (v3.6.1:69c0db5, Mar 21 2017, 17:54:52) [MSC v.1900 32 bit (Intel)]
instance
run
starting thread(s)
thread started
thread started
giving thread(s) some work
giving thread(s) more work
waiting for thread(s) to finish
working on  0
working on  1
working on  2
working on  3
working on  4
working on  5
working on  6
working on  7
working on  8
working on  9
stopping thread(s)
thread stopping
thread stopping
finished
end.
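To match the calling pattern from the question, where mainFunc is invoked once per element of someList and should return immediately, the same queue-plus-worker-threads idea can be rearranged so the workers start in __init__, each mainFunc call just enqueues one job, and the sentinels go in after the loop. The sketch below is a hedged adaptation of the code above; the _worker and stop names are invented for the example:

import queue
import threading

class myClass:
    def __init__(self, num_worker_threads=2):
        self.q = queue.Queue()
        self.threads = []
        # Start the fixed pool of background workers up front; extra jobs
        # simply wait in the queue until one of them is free.
        for _ in range(num_worker_threads):
            t = threading.Thread(target=self._worker)
            t.start()
            self.threads.append(t)

    def _worker(self):
        while True:
            item = self.q.get()
            if item is None:        # sentinel: time to shut down
                break
            self.backgroundFunc()   # the item itself is just a job token
            self.q.task_done()

    def backgroundFunc(self):
        # delete some files on disk
        pass

    def mainFunc(self, elem):
        # Do some other things, then queue one background run and return
        # immediately; mainFunc never waits for the deletion itself.
        self.q.put(elem)

    def stop(self):
        # Call once, after the loop over someList: wait for queued jobs,
        # then tell each worker thread to exit and join it.
        self.q.join()
        for _ in self.threads:
            self.q.put(None)
        for t in self.threads:
            t.join()

# usage, mirroring the driver loop from the question:
# myClassInstance = myClass()
# for element in someList:
#     myClassInstance.mainFunc(elem=element)
# myClassInstance.stop()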
