Source code for threaded._threaded

#    Copyright 2017 - 2020 Alexey Stepanov aka penguinolog
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Threaded implementation.

Asyncio is supported

__all__ = ("Threaded", "threaded")

# Standard Library
import functools
import threading
import typing

# Local Implementation
from . import class_decorator

[docs]class Threaded(class_decorator.BaseDecorator): """Run function in separate thread.""" __slots__ = ("__name", "__daemon", "__started")
[docs] def __init__( self, name: typing.Optional[ typing.Union[str, typing.Callable[..., typing.Union["typing.Awaitable[typing.Any]", typing.Any]]] ] = None, daemon: bool = False, started: bool = False, ) -> None: """Run function in separate thread. :param name: New thread name. If callable: use as wrapped function. If none: use wrapped function name. :type name: typing.Optional[typing.Union[str, typing.Callable[..., typing.Union[typing.Awaitable, typing.Any]]]] :param daemon: Daemonize thread. :type daemon: bool :param started: Return started thread :type started: bool """ self.__daemon: bool = daemon self.__started: bool = started if callable(name): func: typing.Optional[typing.Callable[..., typing.Union["typing.Awaitable[typing.Any]", typing.Any]]] = name self.__name: typing.Optional[str] = "Threaded: " + getattr(name, "__name__", str(hash(name))) else: func, self.__name = None, name super().__init__(func=func)
@property def name(self) -> typing.Optional[str]: """Thread name. :rtype: typing.Optional[str] """ return self.__name @property def daemon(self) -> bool: """Start thread as daemon. :rtype: bool """ return self.__daemon @property def started(self) -> bool: """Return started thread. :rtype: bool """ return self.__started def __repr__(self) -> str: # pragma: no cover """For debug purposes. :return: repr data :rtype: str """ return f"{self.__class__.__name__}(name={!r}, daemon={self.daemon!r}, started={self.started!r}, )" def _get_function_wrapper( self, func: typing.Callable[..., typing.Union["typing.Awaitable[typing.Any]", typing.Any]] ) -> typing.Callable[..., threading.Thread]: """Here should be constructed and returned real decorator. :param func: Wrapped function :type func: typing.Callable[..., typing.Union[typing.Awaitable, typing.Any]] :return: wrapped function :rtype: typing.Callable[..., threading.Thread] """ prepared: typing.Callable[..., typing.Any] = self._await_if_required(func) name: typing.Optional[str] = if name is None: name = "Threaded: " + getattr(func, "__name__", str(hash(func))) # noinspection PyMissingOrEmptyDocstring @functools.wraps(prepared) def wrapper(*args: typing.Any, **kwargs: typing.Any) -> threading.Thread: """Thread getter. :return: Thread object :rtype: threading.Thread """ thread = threading.Thread(target=prepared, name=name, args=args, kwargs=kwargs, daemon=self.daemon) if self.started: thread.start() return thread return wrapper
[docs] def __call__( # pylint: disable=useless-super-delegation self, *args: typing.Union[typing.Callable[..., typing.Union["typing.Awaitable[typing.Any]", typing.Any]], typing.Any], **kwargs: typing.Any, ) -> typing.Union[threading.Thread, typing.Callable[..., threading.Thread]]: """Executable instance. :return: Thread object or Thread getter :rtype: Union[threading.Thread, Callable[..., threading.Thread]] """ return super().__call__(*args, **kwargs) # type: ignore
@typing.overload def threaded( name: typing.Callable[..., typing.Any], daemon: bool = False, started: bool = False ) -> typing.Callable[..., threading.Thread]: """Overload: Call decorator without arguments.""" @typing.overload # noqa: F811 def threaded(name: typing.Optional[str] = None, daemon: bool = False, started: bool = False) -> Threaded: """Overload: Name is not callable."""
[docs]def threaded( # noqa: F811 name: typing.Optional[typing.Union[str, typing.Callable[..., typing.Any]]] = None, daemon: bool = False, started: bool = False, ) -> typing.Union[Threaded, typing.Callable[..., threading.Thread]]: """Run function in separate thread. :param name: New thread name. If callable: use as wrapped function. If none: use wrapped function name. :type name: typing.Union[None, str, typing.Callable] :param daemon: Daemonize thread. :type daemon: bool :param started: Return started thread :type started: bool :return: Threaded instance, if called as function or argumented decorator, else callable wrapper :rtype: typing.Union[Threaded, typing.Callable[..., threading.Thread]] """ if callable(name): func, name = (name, "Threaded: " + getattr(name, "__name__", str(hash(name)))) return Threaded(name=name, daemon=daemon, started=started)(func) # type: ignore return Threaded(name=name, daemon=daemon, started=started)