# -*- coding: utf-8 -*-
# --------------------------------------------------------------------
# The MIT License (MIT)
#
# Copyright (c) 2015 Jonathan Labéjof <jonathan.labejof@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# --------------------------------------------------------------------
"""Decorators dedicated to class or functions calls."""
from __future__ import absolute_import
from .interception import PrivateInterceptor
from .check import Target
from b3j0f.utils.iterable import first
from b3j0f.utils.version import getcallargs
from six import get_function_code
from six.moves import range
from sys import stderr, maxsize
from time import sleep
from functools import wraps
__all__ = ['Types', 'types', 'Curried', 'curried', 'Retries', 'Memoize']
@Target(callable)
[docs]class Types(PrivateInterceptor):
"""
Check routine parameters and return type.
"""
[docs] class TypesError(Exception):
"""Handle Types error."""
[docs] class SpecialCondition(object):
"""Handle SpecialCondition."""
def __init__(self, _type):
super(Types.SpecialCondition, self).__init__()
self._type = _type
[docs] def get_type(self):
"""Get special condition parameter type."""
return self._type
[docs] class NotNone(SpecialCondition):
"""Handle NotNone SpecialCondition."""
[docs] class NotEmpty(SpecialCondition):
"""Handle NotEmpty SpecialCondition."""
[docs] class NamedParameterType(object):
"""Handle Named Parameter Type."""
def __init__(self, name, parameter_type):
super(Types.NamedParameterType, self).__init__()
self._name = name
self._parameter_type = parameter_type
[docs] class NamedParameterTypes(object):
"""Handle Named Parameter Types."""
def __init__(self, target, named_parameter_types):
super(Types.NamedParameterTypes, self).__init__()
self._named_parameter_types = []
target_code = get_function_code(target)
for index in range(target_code.co_argcount):
targetparamname = target_code.co_varnames[index]
if targetparamname in named_parameter_types:
parameter_type = named_parameter_types[targetparamname]
named_parameter_type = Types.NamedParameterType(
targetparamname,
parameter_type
)
self._named_parameter_types.append(named_parameter_type)
else:
self._named_parameter_types.append(None)
#: return type attribute name
RTYPE = 'rtype'
#: parameter types attribute name
PTYPES = 'ptypes'
__slots__ = (RTYPE, PTYPES) + PrivateInterceptor.__slots__
"""
Check parameter or result types of decorated class or function call.
"""
def __init__(self, rtype=None, ptypes=None, *args, **kwargs):
"""
:param rtype:
"""
super(Types, self).__init__(*args, **kwargs)
self.rtype = rtype
self.ptypes = {} if ptypes is None else ptypes
@staticmethod
[docs] def check_value(value, expected_type):
"""Check Types parameters."""
result = False
if isinstance(expected_type, Types.NotNone):
result = value is not None and Types.check_value(
value,
expected_type.get_type()
)
else:
result = value is None
if not result:
value_type = type(value)
if isinstance(expected_type, Types.NotEmpty):
try:
result = len(value) != 0
if result:
_type = expected_type.get_type()
result = Types.check_value(value, _type)
except TypeError:
result = False
elif isinstance(expected_type, list):
result = issubclass(value_type, list)
if result:
if len(expected_type) == 0:
result = len(value) == 0
else:
_expected_type = expected_type[0]
for item in value:
result = Types.check_value(
item,
_expected_type
)
if not result:
break
elif isinstance(expected_type, set):
result = issubclass(value_type, set)
if result:
if len(expected_type) == 0:
result = len(value) == 0
else:
_expected_type = expected_type.copy().pop()
_value = value.copy()
value_length = len(_value)
for _ in range(value_length):
item = _value.pop()
result = Types.check_value(
item,
_expected_type)
if not result:
break
else:
result = issubclass(value_type, expected_type)
return result
def _interception(self, joinpoint):
target = joinpoint.target
args = joinpoint.args
kwargs = joinpoint.kwargs
if self.ptypes:
callargs = getcallargs(target, *args, **kwargs)
for arg in callargs:
value = callargs[arg]
expected_type = self.ptypes.get(arg)
if (
expected_type is not None and
not Types.check_value(value, expected_type)
):
raise Types.TypesError(
"wrong typed parameter for arg {0} : {1} ({2}). \
Expected: {3}."
.format(
arg, value, type(value), expected_type
)
)
result = joinpoint.proceed()
target = joinpoint.target
args = joinpoint.args
kwargs = joinpoint.kwargs
if self.rtype:
if not Types.check_value(result, self.rtype):
raise Types.TypesError(
"wrong result type for {0} with parameters {1}, {2}: {3} \
({4}). Expected {5}."
.format(
target, args, kwargs, result, type(result),
self.rtype
)
)
return result
[docs]def types(*args, **kwargs):
"""Quick alias for the Types Annotation with only args and kwargs
parameters.
:param tuple args: may contain rtype.
:param dict kwargs: may contain ptypes.
"""
rtype = first(args)
return Types(rtype=rtype, ptypes=kwargs)
[docs]class Curried(PrivateInterceptor):
"""Annotation that returns a function that keeps returning functions
until all arguments are supplied; then the original function is
evaluated.
Inspirated from Jeff Laughlin Consulting LLC projects.
"""
ARGS = 'args' #: args attribute name
KWARGS = 'kwargs' #: kwargs attribute name
DEFAULT_ARGS = 'default_args' #: default args attribute name
DEFAULT_KWARGS = 'default_kwargs' #: default kwargs attribute name
__slots__ = (
ARGS, KWARGS, DEFAULT_ARGS, DEFAULT_KWARGS
) + PrivateInterceptor.__slots__
[docs] class CurriedResult(object):
"""Curried result in case of missing arguments."""
__slots__ = ('curried', 'exception')
def __init__(self, curried, exception):
super(Curried.CurriedResult, self).__init__()
self.curried = curried
self.exception = exception
def __init__(self, varargs=None, keywords=None, *args, **kwargs):
"""
:param tuple varargs: function call varargs.
:param dict keywords: function call keywords.
"""
super(Curried, self).__init__(*args, **kwargs)
# initialize arguments
if varargs is None:
varargs = ()
if keywords is None:
keywords = {}
# set attributes
self.args = self.default_args = varargs
self.kwargs = self.default_kwargs = keywords
def _bind_target(self, target, *args, **kwargs):
@wraps(target)
def wrapper(*args, **kwargs):
"""Target wrapper."""
return target(*args, **kwargs)
result = super(Curried, self)._bind_target(
target=wrapper, *args, **kwargs
)
return result
def _interception(self, joinpoint, *args, **kwargs):
result = None
target = joinpoint.target
args = joinpoint.args
kwargs = joinpoint.kwargs
self.kwargs.update(kwargs)
self.args += args
try:
# check if all arguments are given
getcallargs(target, *self.args, **self.kwargs)
joinpoint.args = self.args
joinpoint.kwargs = self.kwargs
result = joinpoint.proceed()
except TypeError as ex:
# in case of problem, returns curried decorater and exception
result = Curried.CurriedResult(self, ex)
return result
[docs]def curried(*args, **kwargs):
"""Curried annotation with varargs and kwargs.
"""
return Curried(varargs=args, keywords=kwargs)
def example_exc_handler(tries_remaining, exception, delay):
"""Example exception handler; prints a warning to stderr.
tries_remaining: The number of tries remaining.
exception: The exception instance which was raised.
"""
print >> stderr, "Caught '{0}', {1} tries remaining, \
sleeping for {2} seconds".format(exception, tries_remaining, delay)
[docs]class Retries(PrivateInterceptor):
"""Function decorator implementing retrying logic.
condition: retry condition, among execution success or failure or both.
delay: Sleep this many seconds * backoff * try number after failure
backoff: Multiply delay by this factor after each failure
exceptions: A tuple of exception classes; default (Exception,)
hook: A function with the signature myhook(data, condition, tries_remaining
, mydelay) where data is result function or raised Exception, condition is
ON_ERROR or ON_SUCCESS depending on error or success execution function,
tries_remaining is tries remaining, and finally, mydelay is waiting seconds
between calls; default None.
The decorator will call the function up to max_tries times if it raises
an exception or if it simply execute the function, depending on state
condition.
By default it catches instances of the Exception class and subclasses.
This will recover after all but the most fatal errors. You may specify a
custom tuple of exception classes with the 'exceptions' argument; the
function will only be retried if it raises one of the specified
exceptions.
Additionally you may specify a hook function which will be called prior
to retrying with the number of remaining tries and the exception instance;
see given example. This is primarily intended to give the opportunity to
log the failure. Hook is not called after failure if no retries remain.
"""
MAX_TRIES = 'max_tries' #: max_tries attribute name.
DELAY = 'delay' #: delay attribute name.
BACKOFF = 'backoff' #: backoff attribute name.
EXCEPTIONS = 'exceptions' #: exceptions attribute name.
HOOK = 'hook' #: hook attribute name.
CONDITION = 'condition' #: condition attribute name.
DEFAULT_DELAY = 1
DEFAULT_BACKOFF = 2
DEFAULT_EXCEPTIONS = (Exception,)
ON_ERROR = 1 #: on error retries condition.
ON_SUCCESS = 2 #: on success retries condition.
ALL = ON_ERROR | ON_SUCCESS #: all retries condition.
__slots__ = (
MAX_TRIES, DELAY, BACKOFF, EXCEPTIONS, HOOK, CONDITION
) + PrivateInterceptor.__slots__
def __init__(
self,
max_tries,
delay=DEFAULT_DELAY,
backoff=DEFAULT_BACKOFF,
exceptions=DEFAULT_EXCEPTIONS,
hook=None,
condition=ALL,
*args, **kwargs
):
super(Retries, self).__init__(*args, **kwargs)
self.max_tries = max_tries
self.delay = delay
self.backoff = backoff
self.exceptions = exceptions
self.hook = hook
self.condition = condition
def _interception(self, joinpoint):
result = None
mydelay = self.delay
for tries_remaining in range(self.max_tries - 1, -1, -1):
try:
result = joinpoint.proceed()
except self.exceptions as ex:
mydelay = self._checkretry(
mydelay=mydelay, condition=Retries.ON_ERROR, data=ex,
tries_remaining=tries_remaining
)
else:
mydelay = self._checkretry(
mydelay=mydelay, condition=Retries.ON_SUCCESS, data=result,
tries_remaining=tries_remaining
)
if mydelay is None:
break # stop execution if mydelay is None
return result
def _checkretry(self, mydelay, condition, tries_remaining, data):
"""Check if input parameters allow to retries function execution.
:param float mydelay: waiting delay between two execution.
:param int condition: condition to check with this condition.
:param int tries_remaining: tries remaining.
:param data: data to hook.
"""
result = mydelay
if self.condition & condition and tries_remaining > 0:
# hook data with tries_remaining and mydelay
if self.hook is not None:
self.hook(data, condition, tries_remaining, mydelay)
# wait mydelay seconds
sleep(mydelay)
result *= self.backoff # increment mydelay with this backoff
elif condition is Retries.ON_ERROR:
raise data # raise data if no retries and on_error
else: # else Nonify mydelay to prevent callee function to stop
result = None
return result
[docs]class Memoize(PrivateInterceptor):
"""Save funtion results related to called parameters.
Parameters must be hashable."""
MAX_SIZE = 'max_size' #: max size result.
_CACHE = '_cache' #: cache object which stores results and params.
DEFAULT_MAX_SIZE = maxsize #: default max size value.
__slots__ = (MAX_SIZE, _CACHE) + PrivateInterceptor.__slots__
def __init__(self, max_size=DEFAULT_MAX_SIZE, *args, **kwargs):
super(Memoize, self).__init__(*args, **kwargs)
self.max_size = max_size
self._cache = {}
def _getkey(self, args, kwargs):
"""Get hash key from args and kwargs.
args and kwargs must be hashable.
:param tuple args: called vargs.
:param dict kwargs: called keywords.
:return: hash(tuple(args) + tuple((key, val) for key in sorted(kwargs)).
:rtype: int."""
values = list(args)
keys = sorted(list(kwargs))
for key in keys:
values.append((key, kwargs[key]))
result = hash(tuple(values))
return result
def _interception(self, joinpoint):
result = None
args = joinpoint.args
kwargs = joinpoint.kwargs
key = self._getkey(args, kwargs)
_cache = self._cache
if key in _cache:
_, _, result = _cache[key]
else:
result = joinpoint.proceed()
if len(self._cache) < self.max_size:
_cache[key] = (args, kwargs, result)
return result
[docs] def getparams(self, result):
"""Get result parameters.
:param result: cached result.
:raises: ValueError if result is not cached.
:return: args and kwargs registered with input result.
:rtype: tuple"""
for key in self._cache:
if self._cache[key][2] == result:
args, kwargs, _ = self._cache[key]
return args, kwargs
else:
raise ValueError('Result is not cached')
[docs] def clearcache(self):
"""Clear cache."""
self._cache.clear()