Source code for gamla.functional_async

import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable, Iterable, Type

from gamla import currying
from gamla.optimized import async_functions


[docs]def run_sync(f): """Runs a coroutine in a synchronous context, blocking until result arrives. >>> async def afoo(x): ... await asyncio.sleep(1) ... return x ... run_sync(afoo(1)) 1 (after 1 second of waiting) """ loop = asyncio.new_event_loop() return loop.run_until_complete(asyncio.ensure_future(f, loop=loop))
[docs]@currying.curry async def amap_ascompleted( f: Callable[[Any], Awaitable[Any]], it: Iterable, ) -> AsyncGenerator[Any, None]: """Returns an AsyncGenerator of the results after applying async `f` to each element of Iterable `it` >>> async def amulti(x) ... return x*2 ... async def to_list(ag): ... return [i async for i in ag] ... run_sync(to_list(amap_ascompleted(amulti, range(4)))) [6, 0, 2, 4] (In a random order) """ for future in asyncio.as_completed(map(f, it)): yield await future
[docs]@currying.curry async def aexcepts( exception_type: Type[Exception], func: Callable, handler: Callable, x: Any, ): """An async functional try/except block: await and return `func` on `x`. If fails with `exception_type`, return the reult of running handler on the error. >>> async def araise(x): ... raise ValueError ... run_sync(aexcepts(ValueError, araise, lambda e: e, 5)) ValueError() >>> async def a_just(x): ... return x ... run_sync(aexcepts(ValueError, a_just, lambda e: e, 5)) 5 """ try: return await func(x) except exception_type as error: return handler(error)
[docs]@currying.curry async def mapa(f: Callable, it: AsyncGenerator) -> AsyncGenerator: """Returns an AsyncGenerator of the results after applying `f` to each async element of `it` >>> async def arange(count): ... for i in range(count): ... yield(i) ... async def to_list(ag): ... return [i async for i in ag] ... run_sync(to_list(mapa(lambda x: x*2, arange(4)))) [0, 2, 4, 6] """ async for element in it: yield f(element)
[docs]async def aconcat(async_generator: AsyncGenerator) -> AsyncGenerator: """Concat iterables of an async_generator. >>> async def many_range(count): ... for i in range(count): ... yield range(i, i+1) ... async def to_list(ag): ... return [i async for i in ag] ... run_sync(to_list(aconcat(many_range(4)))) [0, 1, 2, 3] """ async for g in async_generator: for x in g: yield x
[docs]def afirst(*funcs: Callable, exception_type: Type[Exception]): """Runs given `funcs` serially until getting a succefull result. Returns the result of the first function that runs on `x` without raising `exception_type`. If all given function raise e`xception_type`, `exception_type` will be raised. >>> async def araise(x): ... raise ValueError ... run_sync(afirst(araise, lambda x: x*x, exception_type=ValueError)(3)) 9 """ async def afirst_inner(x: Any): for f in funcs: try: return await async_functions.to_awaitable(f(x)) except exception_type: pass raise exception_type return afirst_inner