Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
244 views
in Technique[技术] by (71.8m points)

python - How to use asyncio with existing blocking library?

I have few blocking functions foo, bar and I can't change those (Some internal library I don't control. Talks to one or more network services). How do I use it as async?. E.g. I wan't to do the following.

results = []
for inp in inps:
    val = foo(inp)
    result = bar(val)
    results.append(result)

This will be inefficient as I can call foo for the second input while I am waiting for the first and same for bar. How do I wrap them such that they are usable with asyncio (i.e new async, await syntax)?

Lets assume the functions are re-entrant. i.e it is fine to call foo again when already a previous foo is processing.


Update

Extending answer with reusable decorator. Click here for example.

def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

    return inner
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

There are (sort of) two questions here: first, how to run blocking code asynchronously, and second, how to run async code concurrently (asyncio is single-threaded, so the GIL still applies, so it isn't truly parallel, but I digress).

Concurrent tasks can be created using asyncio.ensure_future, as documented here.

To run synchronous code, you will need to run the blocking code in an executor. Example:

import concurrent.futures
import asyncio
import time

def blocking(delay):
    time.sleep(delay)
    print('Completed.')

async def non_blocking(loop, executor):
    # Run three of the blocking tasks concurrently. asyncio.wait will
    # automatically wrap these in Tasks. If you want explicit access
    # to the tasks themselves, use asyncio.ensure_future, or add a
    # "done, pending = asyncio.wait..." assignment
    await asyncio.wait(
        fs={
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),
            
            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),
            
            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )

loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
loop.run_until_complete(non_blocking(loop, executor))

If you want to schedule these tasks using a for loop (as in your example), you have several different strategies, but the underlying approach is to schedule the tasks using the for loop (or list comprehension, etc), await them with asyncio.wait, and then retrieve the results. Example:

done, pending = await asyncio.wait(
    fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...