Stack Overflow на русском Asked by Yuriy Tigiev on December 11, 2021
Как добавлять новые задачи в async.wait по мере завершения уже запущеных задач?
Ниже приведен кусок рабочего кода, который скачивает информацию с биржи по торговым парам за определенный период. Проблема данного скрипта в том, что если задать большое кол-во пар и период (по дням), то кол-во тасков сильно возрастает и при их обработке возникает проблемы с нехваткой памяти.
Как можно модифицировать приведенный ниже код, чтобы aio.wait обрабатывал не более 10 задач, а помере выполнения и закрытия текущей задачи, добавляли новые задачи из очереди ?
tasks = ( FetchTrades(exchange, pair, period[0], period[1]) for pair in pairsFiltered for period in periods)
pending = list( tasks )
resultsList = []
while pending:
done, pending = await aio.wait(
pending,
#timeout=30,
return_when=aio.FIRST_COMPLETED
)
for future in done:
result = future.result()
pair = result[0]
if(len(result[1]) > 0):
for i in range(0, len(result[1]), 10000):
lst = result[1][i: i+10000]
cur.executemany(sqlInsert, lst)
Реализовал свой механизм для лимитирования кол-во одновремемных обрабатываемых асинхронных функций с помощью semaphores.
async def fnc(sem, exchange, pair, periodSince, periodUntil, sql):
async with sem:
data = await FetchTrades(exchange, pair, periodSince, periodUntil)
if(len(data[1]) > 0):
await self.SaveToDatabase(sql, data[1])
return data
sem = aio.Semaphore(semaphore)
queue = [ fnc(sem, exchange, pair, period[0], period[1], sqlInsert) for pair in pairs for period in periods ]
pending = queue
while pending:
done, pending = await aio.wait(
pending,
#timeout=30,
return_when=aio.FIRST_COMPLETED
)
for future in done:
result = future.result()
pair = result[0]
processedPairs += 1
async def SaveToDatabase(self, sql, data):
batch = 5000
cur = self.dbConn.cursor()
for i in range(0, len(data), batch):
lst = data[i: i+batch]
with cf.ThreadPoolExecutor() as pool:
result = await self.loop.run_in_executor(pool, cur.executemany, sql, lst)
Answered by Yuriy Tigiev on December 11, 2021
Правильных вариантов может быть несколько. Можно ограничить количество задач семафором
sem = asyncio.Semaphore(value=10)
def done(future):
result = future.result()
pair = result[0]
if(len(result[1]) > 0):
for i in range(0, len(result[1]), 10000):
lst = result[1][i: i+10000]
cur.executemany(sqlInsert, lst)
sem.release()
for period in periods:
for pair in pairsFiltered:
await sem.acquire()
f = FetchTrades(exchange, pair, period[0], period[1])
f.add_done_callback(done)
await asyncio.wait(asyncio.all_tasks())
Answered by eri on December 11, 2021
Get help from others!
Recent Questions
Recent Answers
© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP