import asyncio
import builtins
import functools
import logging
from typing import Text
import tabulate
import toolz
import yappi
from gamla import functional_generic
from gamla.optimized import sync
logger = functional_generic.side_effect(logging.info)
[docs]def log_text(text: Text, level: int = logging.INFO):
"""Logs given text and returns the execution input,
can also log the input itself by using {} in the text string.
>>> f = log_text("hello world {}")
>>> f("It's me!")
INFO hello world It's me!
'It's me!'
"""
return functional_generic.side_effect(lambda x: logging.log(level, text.format(x)))
def _break(x):
builtins.breakpoint()
#: A util to inspect a pipline by opening a debug prompt.
#: Note:
#: - Materializes generators, as most of the time we are interested in looking into them, so can have unexpected results.
#: - The current value can be referenced by `x` in the debug prompt.
debug = functional_generic.compose_left(
functional_generic.when(functional_generic.is_generator, tuple),
functional_generic.side_effect(_break),
)
debug_after = functional_generic.after(debug)
debug_before = functional_generic.before(debug)
def _debug_generic(f):
return functional_generic.compose_left(
lambda *funcs: toolz.interleave([funcs, [debug] * len(funcs)]),
sync.star(f),
)
#: Replace regular `compose` calls with this to get a breakpoint at each step.
debug_compose = _debug_generic(functional_generic.compose)
#: Replace regular `compose_left` calls with this to get a breakpoint at each step.
debug_compose_left = _debug_generic(functional_generic.compose_left)
[docs]def debug_exception(f):
"""Debug exception in a pipeline stage by looking at the causal value.
>>> gamla.pipe(
... "abc",
... gamla.itemgetter("some_key"), # This would cause an exception.
... )
>>> gamla.pipe(
... "abc",
... gamla.debug_exception(gamla.itemgetter("some_key")), # Now we can see the cause of the exception - we expect a `dict` but get a `str`.
... )
"""
if asyncio.iscoroutinefunction(f):
async def debug_exception(*x, **kwargs):
try:
return await f(*x, **kwargs)
except Exception as e:
builtins.breakpoint()
raise e
else:
def debug_exception(*x, **kwargs): # type: ignore
try:
return f(*x, **kwargs)
except Exception as e:
builtins.breakpoint()
raise e
return functools.wraps(f)(debug_exception)
def _print_stats():
logging.info(
"\n\n\n"
+ tabulate.tabulate(
tuple(
map(
lambda stat: [
f"{stat.ttot:2f}",
f"{stat.tsub:2f}",
stat.ncall,
stat.name,
f"{stat.module}:{stat.lineno}",
],
yappi.get_func_stats(),
),
)[:100],
[
"ttot",
"tsub",
"ncall",
"name",
"module:lineno",
],
),
)
def profileit(f):
if asyncio.iscoroutinefunction(f):
@functools.wraps(f)
async def wrapper(*args, **kwargs):
yappi.set_clock_type("WALL")
with yappi.run():
result = await f(*args, **kwargs)
_print_stats()
return result
return wrapper
@functools.wraps(f)
def wrapper(*args, **kwargs):
yappi.set_clock_type("WALL")
with yappi.run():
result = f(*args, **kwargs)
_print_stats()
return result
return wrapper