Concurrently¶
This library makes it easy to write code blocks that run concurrently.
Quick example:
import asyncio

from concurrently import concurrently


async def amain(loop):
    """
    How to fetch some web pages with concurrently.
    """
    urls = [  # page URLs to fetch
        'http://test/page_1',
        'http://test/page_2',
        'http://test/page_3',
        'http://test/page_4',
    ]
    results = {}

    # immediately start the wrapped function concurrently
    # in 2 "threads" (asyncio coroutines)
    @concurrently(2)
    async def fetch_urls():
        for url in urls:
            # fetch_page stands for some coroutine that downloads a page
            page = await fetch_page(url)
            results[url] = page

    # wait until all concurrent threads have finished
    await fetch_urls()
    print(results)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(amain(loop))
The decorator @concurrently() does two main things:
- starts the specified number of concurrent executions of the decorated function
- returns a special Waiter object to control the running functions
By default, the code runs as asyncio coroutines, but other execution modes are supported through the engine argument.
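To make the two steps above concrete, here is a minimal stdlib-only sketch of a decorator that starts several copies of a coroutine function and hands back an object to wait on. It is not the library's implementation: `run_concurrently`, `SketchWaiter` and `demo` are hypothetical names invented for this illustration, and the shared queue is just one assumed way to let the copies split work.

```python
import asyncio


class SketchWaiter:
    """Hypothetical stand-in for a Waiter object (not the library's class)."""

    def __init__(self, tasks):
        self._tasks = tasks

    async def __call__(self):
        # Calling and awaiting the waiter blocks until every copy has finished.
        await asyncio.gather(*self._tasks)


def run_concurrently(count):
    """Toy decorator: start `count` copies of a coroutine function at once."""

    def decorator(func):
        # Tasks are created at decoration time, so the decorator must be
        # applied inside a running event loop.
        tasks = [asyncio.create_task(func()) for _ in range(count)]
        return SketchWaiter(tasks)

    return decorator


async def demo():
    queue = asyncio.Queue()
    for n in range(6):
        queue.put_nowait(n)
    done = []

    @run_concurrently(2)
    async def worker():
        # Both copies drain the same queue, so each item is handled once.
        while not queue.empty():
            item = queue.get_nowait()
            await asyncio.sleep(0)  # yield so the two copies interleave
            done.append(item)

    await worker()
    return sorted(done)
```

Running `asyncio.run(demo())` processes all six items between the two copies; the real decorator may differ in how it schedules and tracks the functions.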
Details¶
Waiter¶
@concurrently() returns a special Waiter object to control the running functions: wait for completion, stop them, and so on.
class concurrently.engines.AbstractWaiter¶
  __call__(suppress_exceptions: bool = False, fail_hard: bool = False)¶
    The call blocks until all concurrent functions have completed.
    All exceptions raised in the concurrent functions are captured and re-raised as UnhandledExceptions.
    You can customize this behavior with the following options:
    Parameters:
    - suppress_exceptions – don’t raise UnhandledExceptions
    - fail_hard – stop all functions and raise an error if one of the functions aborts with an error
  exceptions() → List[Exception]¶
    Returns a list of all exceptions.
    Useful with the suppress_exceptions option.
  stop()¶
    Interrupts execution of the functions.
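The capture-and-re-raise behaviour described above can be approximated with plain asyncio. The sketch below is an assumption-laden illustration, not the library's code: `SketchWaiter` and `SketchUnhandledExceptions` are hypothetical names, and the `fail_hard` option is deliberately left out.

```python
import asyncio
from typing import List


class SketchUnhandledExceptions(Exception):
    """Hypothetical aggregate error standing in for UnhandledExceptions."""

    def __init__(self, exceptions):
        super().__init__(f"{len(exceptions)} function(s) failed")
        self.exceptions = exceptions


class SketchWaiter:
    """Toy waiter that mimics the documented behaviour with asyncio tasks."""

    def __init__(self, tasks):
        self._tasks = list(tasks)
        self._exceptions: List[Exception] = []

    async def __call__(self, suppress_exceptions: bool = False):
        # return_exceptions=True captures errors instead of propagating them.
        outcomes = await asyncio.gather(*self._tasks, return_exceptions=True)
        self._exceptions = [o for o in outcomes if isinstance(o, Exception)]
        if self._exceptions and not suppress_exceptions:
            raise SketchUnhandledExceptions(self._exceptions)

    def exceptions(self) -> List[Exception]:
        return list(self._exceptions)

    def stop(self):
        # Cancelling the tasks interrupts the running coroutines.
        for task in self._tasks:
            task.cancel()


async def demo():
    async def ok():
        return "fine"

    async def boom():
        raise ValueError("bad page")

    waiter = SketchWaiter([asyncio.create_task(ok()),
                           asyncio.create_task(boom())])
    # Suppress the aggregate error, then inspect what went wrong.
    await waiter(suppress_exceptions=True)
    return waiter.exceptions()
```

Here `asyncio.run(demo())` returns a list holding the single `ValueError`; without `suppress_exceptions=True` the sketch would raise its aggregate error instead.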
Supported engines¶
AsyncIOEngine¶
Runs code as asyncio coroutines:
from concurrently import concurrently, AsyncIOEngine

...

@concurrently(2, engine=AsyncIOEngine, loop=loop)  # loop is optional
async def fetch_urls():
    ...

await fetch_urls()
class concurrently.AsyncIOEngine(loop: asyncio.base_events.BaseEventLoop = None)¶
  Parameters: loop – a specific asyncio loop; the default loop is used if None
AsyncIOThreadEngine¶
Runs code in threads with asyncio:
from concurrently import concurrently, AsyncIOThreadEngine

...

@concurrently(2, engine=AsyncIOThreadEngine, loop=loop)
def fetch_urls():  # not async def
    ...

await fetch_urls()
class concurrently.AsyncIOThreadEngine(loop: asyncio.base_events.BaseEventLoop = None)¶
  Parameters: loop – a specific asyncio loop; the default loop is used if None
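The idea of running a regular function in threads while still awaiting it from asyncio can be sketched with the standard `loop.run_in_executor` call. This only illustrates the general technique; whether the engine uses it internally is not stated here, and `blocking_job` and `demo` are hypothetical names.

```python
import asyncio
import threading
import time


def blocking_job(seen):
    # A regular (non-async) function: it may block without stalling the loop.
    time.sleep(0.01)
    seen.add(threading.current_thread().name)


async def demo(count=2):
    loop = asyncio.get_running_loop()
    seen = set()
    # Submit `count` copies to the loop's default thread pool, then await all.
    futures = [loop.run_in_executor(None, blocking_job, seen)
               for _ in range(count)]
    await asyncio.gather(*futures)
    return seen
```

The returned set holds the names of the worker threads that ran the job, none of which is the main thread running the event loop.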
ThreadEngine¶
Runs code in system threads:
from concurrently import concurrently, ThreadEngine

...

@concurrently(2, engine=ThreadEngine)
def fetch_urls():  # not async def
    ...

fetch_urls()  # not await
class concurrently.ThreadEngine¶
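For comparison, the same start-then-wait pattern with system threads can be sketched using only the `threading` module. This is an illustration under assumptions, not the engine's implementation: `run_in_threads` and `demo` are hypothetical helpers, and the lock-protected list is just one way to share work between threads.

```python
import threading


def run_in_threads(func, count):
    """Start `count` threads running `func`; return a blocking wait function."""
    threads = [threading.Thread(target=func) for _ in range(count)]
    for thread in threads:
        thread.start()

    def wait():
        # A plain blocking call, no await: returns once every thread is done.
        for thread in threads:
            thread.join()

    return wait


def demo():
    lock = threading.Lock()
    pending = ["page_1", "page_2", "page_3", "page_4"]
    done = []

    def worker():
        while True:
            # The lock keeps two threads from taking the same item.
            with lock:
                if not pending:
                    return
                item = pending.pop()
                done.append(item)

    wait = run_in_threads(worker, 2)
    wait()
    return sorted(done)
```

Calling `demo()` blocks until both threads finish and returns the four page names in sorted order.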