File tree Expand file tree Collapse file tree 1 file changed +6
-9
lines changed Expand file tree Collapse file tree 1 file changed +6
-9
lines changed Original file line number Diff line number Diff line change @@ -140,15 +140,13 @@ async def repeat_url_worker():
140
140
created_at : datetime .datetime = None
141
141
while True :
142
142
curtime = datetime .datetime .now (tz = datetime .timezone .utc )
143
- async with async_session () as session :
143
+ async with async_session () as session , session . begin () :
144
144
stmt = select (RepeatURL ).where (RepeatURL .active_since <= curtime ).order_by (RepeatURL .created_at )
145
- async with session .begin ():
146
- result = await session .scalars (stmt )
147
- jobs = result .all ()
145
+ result = await session .scalars (stmt )
146
+ jobs = result .all ()
148
147
stmt2 = select (URL .url ).join (Job ).where (URL .url .in_ ([job .url .url for job in jobs ]) & (Job .completed == None ) & (Job .failed == None ))
149
- async with session .begin ():
150
- result = await session .scalars (stmt2 )
151
- existing_jobs = result .all ()
148
+ result = await session .scalars (stmt2 )
149
+ existing_jobs = result .all ()
152
150
queued : list [Job ] = []
153
151
for job in jobs :
154
152
if (not job .url .last_seen or job .url .last_seen + datetime .timedelta (seconds = job .interval ) < curtime ) and job .url .url not in existing_jobs : # Job can be re-queued
@@ -158,8 +156,7 @@ async def repeat_url_worker():
158
156
created_at = curtime
159
157
queued .append (Job (url = job .url , priority = 10 , batches = [batch , job .batch ]))
160
158
if queued :
161
- async with session .begin ():
162
- session .add_all (queued )
159
+ session .add_all (queued )
163
160
await asyncio .sleep (60 )
164
161
165
162
You can’t perform that action at this time.
0 commit comments