TransWikia.com

Python таймаут, блокирующая функция

Stack Overflow на русском Asked on November 24, 2021

есть функция

def mqtt_sub_(domen, user, password, topic, timeout_, version = '4.1'):#receive message v4.1
    @timeout(5)
    def body():
        if version == '4.1':
            topic = topic+"/resp"
        msg = subscribe.simple(topic_ ,hostname=domen, auth = {'username':user,'password':password})
        text = msg.payload.decode().replace('u0000','').replace('resp','')
        return (text)
    try:
        answer = body()
    except:
        answer = 'error'
    finally:
        return answer

timeout

from functools import wraps
import errno
import os
import signal

class TimeoutError(Exception):
    pass
"""timeout - signal exception module"""

def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator

проблема в том что subscribe.simple является блокирующей функцией и ожидает ответа, тем самым декоратор таймаута срабатывает только если закрыть программу(ctrl+c)

One Answer

Ваш вариант довольно популярный, но имеет ряд ограничений.

alarm прерывает даже блокирующие функции - надо смотреть в каком месте лок. Но не работает в потоках это точно. Мне пришлось переделать его так:

def timeout(seconds=30, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def wrapper(*args, **kwargs):

            result = None

            def runner():
                nonlocal result
                result = func(*args, **kwargs)
                return result

            t = threading.Thread(target=runner)
            t.start()
            t.join(seconds)
            if t.is_alive():
                ctypes.pythonapi.PyThreadState_SetAsyncExc(t.ident, ctypes.py_object(TimeoutError))
                raise TimeoutError(error_message)

            return result

        return wrapper
    return decorator

Но в этом варианте заблокированная функция может провисеть в памяти пока не рпазблокируется - не вернется из сишного кода в питон.

В вашем случае я бы посмотрел на асинхронные варианты релизации MQTT

Answered by eri on November 24, 2021

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP