@@ -17,6 +17,9 @@
 from fastapi import Depends, FastAPI, HTTPException, Path, Query, Request, Response
 from aiohttp import ClientSession
 
+# Constants
+min_wait_time_between_archives = datetime.timedelta(hours=1)
+
 engine: sqlalchemy.ext.asyncio.AsyncEngine = None
 async_session: sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession] = None
 client_session: ClientSession = None
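
The change replaces a hard-coded 45-minute cooldown with a single module-level constant, so the worker loop, the retry path, and the stats endpoint all share one window. A minimal sketch of the gate that constant drives, with made-up last_seen and curtime values standing in for the app's real URL row and clock:

    import datetime

    # Same constant the diff introduces: at most one capture per URL per hour.
    min_wait_time_between_archives = datetime.timedelta(hours=1)

    # Hypothetical stand-ins for a URL row's last_seen and the worker's current time.
    last_seen = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.timezone.utc)
    curtime = datetime.datetime(2024, 1, 1, 12, 30, tzinfo=datetime.timezone.utc)

    # A capture is too soon while last_seen + window is still in the future.
    if last_seen + min_wait_time_between_archives > curtime:
        next_queue_time = last_seen + min_wait_time_between_archives
        print(f"Too soon; requeue at {next_queue_time.strftime('%c')}")
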
@@ -108,11 +111,12 @@ async def url_worker():
         if next_job is None:
             await asyncio.sleep(1)
             continue
-        # First, make sure that we don't have to delay this URL (only one capture per 45 minutes)
-        if next_job.url.last_seen and next_job.url.last_seen + datetime.timedelta(minutes=45) > curtime:
-            print(f"Re-querying job id={next_job.id} for 45 minutes. Last seen at {next_job.url.last_seen}. Current time: {curtime}")
+        # First, make sure that we don't have to delay this URL (only one capture per min_wait_time_between_archives)
+        if next_job.url.last_seen and next_job.url.last_seen + min_wait_time_between_archives > curtime:
+            next_queue_time: datetime.datetime = next_job.url.last_seen + min_wait_time_between_archives
+            print(f"Re-querying job id={next_job.id} until {next_queue_time.strftime('%c')}. Last seen at {next_job.url.last_seen.strftime('%c')}.")
             async with session.begin():
-                stmt = update(Job).where(Job.id == next_job.id).values(delayed_until=next_job.url.last_seen + datetime.timedelta(minutes=45))
+                stmt = update(Job).where(Job.id == next_job.id).values(delayed_until=next_queue_time)
                 await session.execute(stmt)
             continue
         if client_session is None:
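
Read on its own, the requeue decision is a pure function of last_seen and the clock. A sketch under that reading; compute_next_queue_time is a hypothetical helper, not something the codebase defines:

    import datetime
    from typing import Optional

    min_wait_time_between_archives = datetime.timedelta(hours=1)

    def compute_next_queue_time(
        last_seen: Optional[datetime.datetime],
        curtime: datetime.datetime,
    ) -> Optional[datetime.datetime]:
        """Return when the job may run next, or None if it can run now.

        Mirrors the hunk above: a URL captured within the window has its
        delayed_until pushed to last_seen + min_wait_time_between_archives.
        """
        if last_seen and last_seen + min_wait_time_between_archives > curtime:
            return last_seen + min_wait_time_between_archives
        return None

Binding next_queue_time once, as the hunk does, also keeps the log line and the delayed_until column guaranteed to agree.
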
@@ -135,7 +139,7 @@ async def url_worker():
             async with session.begin():
                 if next_job.retry < 4:
                     print(f"Retrying job id={next_job.id} for the {next_job.retry + 1} time.")
-                    await session.execute(update(Job).where(Job.id == next_job.id).values(retry=next_job.retry + 1, delayed_until=curtime + datetime.timedelta(minutes=45)))
+                    await session.execute(update(Job).where(Job.id == next_job.id).values(retry=next_job.retry + 1, delayed_until=curtime + min_wait_time_between_archives))
                 else:
                     await session.execute(update(Job).where(Job.id == next_job.id).values(failed=curtime, delayed_until=None))
 
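
The retry branch now reuses the same constant as the backoff between attempts, with the cap of four retries unchanged. A sketch of the state transition the two update(Job) calls write, reduced to a plain dict; next_state and MAX_RETRIES are illustrative names, not from the codebase:

    import datetime

    min_wait_time_between_archives = datetime.timedelta(hours=1)
    MAX_RETRIES = 4  # matches the `next_job.retry < 4` check above

    def next_state(retry: int, curtime: datetime.datetime) -> dict:
        if retry < MAX_RETRIES:
            # Bump the counter and push the next attempt out by the shared window.
            return {"retry": retry + 1, "delayed_until": curtime + min_wait_time_between_archives}
        # Out of attempts: record the failure time and clear the delay.
        return {"failed": curtime, "delayed_until": None}
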
@@ -241,7 +245,7 @@ async def stats() -> Stats:
         completed = dict((await session.execute(select(Job.retry, sqlalchemy.func.count(Job.id)).where(Job.completed != None).group_by(Job.retry))).all())
         failed = (await session.scalar(select(sqlalchemy.func.count(Job.id)).where(Job.failed != None))) or 0
         batches = (await session.scalar(select(sqlalchemy.func.count(Batch.id)))) or 0
-        super_recently_archived_urls = (await session.scalar(select(sqlalchemy.func.count(URL.id)).where(URL.last_seen != None).where(URL.last_seen > datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(minutes=45)))) or 0
+        super_recently_archived_urls = (await session.scalar(select(sqlalchemy.func.count(URL.id)).where(URL.last_seen != None).where(URL.last_seen > datetime.datetime.now(tz=datetime.timezone.utc) - min_wait_time_between_archives))) or 0
         recently_archived_urls = ((await session.scalar(select(sqlalchemy.func.count(URL.id)).where(URL.last_seen != None).where(URL.last_seen > datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(hours=4)))) or 0) - super_recently_archived_urls
         not_recently_archived_urls = (await session.scalar(select(sqlalchemy.func.count(URL.id)).where(URL.last_seen != None).where(URL.last_seen < datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(hours=4)))) or 0
         not_archived_urls = (await session.scalar(select(sqlalchemy.func.count(URL.id)).where(URL.last_seen == None))) or 0
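
The stats endpoint splits archived URLs into three disjoint recency bands; the hunk only moves the inner boundary from 45 minutes to the shared constant, leaving the 4-hour outer boundary alone. A self-contained sketch of the bucketing over hypothetical last_seen values instead of SQL counts:

    import datetime

    min_wait_time_between_archives = datetime.timedelta(hours=1)
    recent_window = datetime.timedelta(hours=4)  # outer boundary, unchanged by the diff

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    # Hypothetical stand-ins for URL.last_seen: 10 min, 90 min, and 5 h ago.
    last_seen_values = [now - datetime.timedelta(minutes=m) for m in (10, 90, 300)]

    super_recent = sum(ls > now - min_wait_time_between_archives for ls in last_seen_values)
    recent = sum(ls > now - recent_window for ls in last_seen_values) - super_recent
    not_recent = sum(ls < now - recent_window for ls in last_seen_values)
    print(super_recent, recent, not_recent)  # 1 1 1
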
@@ -641,4 +645,4 @@ async def get_url_info(body: URLInfoBody) -> URLReturn:
         "jobs": [JobReturn.from_job(job) for job in url.jobs],
         "first_seen": url.first_seen,
         "last_seen": url.last_seen
-    }
+    }