TransWikia.com

Как добавлять новые задачи в async.wait по мере завершения уже запущеных задач?

Stack Overflow на русском Asked by Yuriy Tigiev on December 11, 2021

Как добавлять новые задачи в async.wait по мере завершения уже запущеных задач?

Ниже приведен кусок рабочего кода, который скачивает информацию с биржи по торговым парам за определенный период. Проблема данного скрипта в том, что если задать большое кол-во пар и период (по дням), то кол-во тасков сильно возрастает и при их обработке возникает проблемы с нехваткой памяти.
Как можно модифицировать приведенный ниже код, чтобы aio.wait обрабатывал не более 10 задач, а помере выполнения и закрытия текущей задачи, добавляли новые задачи из очереди ?

    tasks = ( FetchTrades(exchange, pair, period[0], period[1]) for pair in pairsFiltered for period in periods)
    pending = list( tasks )
    resultsList = []
    while pending:
        done, pending = await aio.wait(
                        pending, 
                        #timeout=30, 
                        return_when=aio.FIRST_COMPLETED
            )
        for future in done:
            result = future.result()
            pair = result[0]
            if(len(result[1]) > 0):
                for i in range(0, len(result[1]), 10000): 
                    lst = result[1][i: i+10000]
                    cur.executemany(sqlInsert, lst)

2 Answers

Реализовал свой механизм для лимитирования кол-во одновремемных обрабатываемых асинхронных функций с помощью semaphores.

        async def fnc(sem, exchange, pair, periodSince, periodUntil, sql):
            async with sem:
                data = await FetchTrades(exchange, pair, periodSince, periodUntil)
                if(len(data[1]) > 0):
                    await self.SaveToDatabase(sql, data[1])
                return data

        sem = aio.Semaphore(semaphore)

        queue = [ fnc(sem, exchange, pair, period[0], period[1], sqlInsert) for pair in pairs for period in periods ]

        pending = queue
        while pending:
            done, pending = await aio.wait(
                            pending, 
                            #timeout=30, 
                            return_when=aio.FIRST_COMPLETED
                )
            for future in done:
                result = future.result()
                pair = result[0]
                processedPairs += 1


    async def SaveToDatabase(self,  sql, data):
        batch = 5000
        cur = self.dbConn.cursor()
        for i in range(0, len(data), batch): 
            lst = data[i: i+batch]
            with cf.ThreadPoolExecutor() as pool:
                result = await self.loop.run_in_executor(pool, cur.executemany, sql, lst)

Answered by Yuriy Tigiev on December 11, 2021

Правильных вариантов может быть несколько. Можно ограничить количество задач семафором

sem = asyncio.Semaphore(value=10)

def done(future):
    result = future.result()
    pair = result[0]
    if(len(result[1]) > 0):
        for i in range(0, len(result[1]), 10000): 
            lst = result[1][i: i+10000]
            cur.executemany(sqlInsert, lst)  
    sem.release()     


for period in periods:
    for pair in pairsFiltered:
        await sem.acquire()
        f = FetchTrades(exchange, pair, period[0], period[1])
        f.add_done_callback(done)

await asyncio.wait(asyncio.all_tasks())

Answered by eri on December 11, 2021

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP