
Commit e767379

Add new gsheets archive capability

1 parent c1785f9 commit e767379

7 files changed: +327 −212 lines changed

Pipfile

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ aiohttp = "<4,>=3.9.0"
 asyncpg = ">=0.29.0"
 sqlalchemy = {extras = ["asyncio"], version = "*"}
 sentry-sdk = {extras = ["fastapi"], version = "*"}
+python-multipart = "*"
 
 [dev-packages]
 uvicorn = {extras = ["standard"] }

Pipfile.lock

Lines changed: 140 additions & 195 deletions
Some generated files are not rendered by default.

src/main.py

Lines changed: 11 additions & 5 deletions
@@ -38,6 +38,14 @@
     profiles_sample_rate=1.0,
 )
 
+archive_url_regex = re.compile(r"/web/(\d{14})")
+
+
+def get_archive_save_url_timestamp(timestamp: str) -> datetime.datetime:
+    return datetime.datetime.strptime(timestamp, "%Y%m%d%H%M%S").replace(
+        tzinfo=datetime.timezone.utc
+    )
+
 
 # Constants
 min_wait_time_between_archives = datetime.timedelta(hours=1)
@@ -125,12 +133,10 @@ async def url_worker():
                 allow_redirects=False,
             ) as resp:
                 resp.raise_for_status()
-                if match := re.search(
-                    r"/web/(\d{14})", resp.headers.get("Location", "")
+                if match := archive_url_regex.search(
+                    resp.headers.get("Location", "")
                 ):
-                    saved_dt = datetime.datetime.strptime(
-                        match.group(1), "%Y%m%d%H%M%S"
-                    ).replace(tzinfo=datetime.timezone.utc)
+                    saved_dt = get_archive_save_url_timestamp(match.group(1))
                     async with session.begin():
                         await session.execute(
                             update(URL)
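The extracted helper centralises how a Wayback Machine "/web/<14-digit timestamp>" redirect is turned into a timezone-aware datetime. A minimal standalone sketch of that behaviour (the regex and helper are copied from this diff; the Location header value is an illustrative assumption, not output from the service):

    import datetime
    import re

    archive_url_regex = re.compile(r"/web/(\d{14})")

    def get_archive_save_url_timestamp(timestamp: str) -> datetime.datetime:
        # Wayback timestamps are YYYYMMDDHHMMSS, interpreted as UTC
        return datetime.datetime.strptime(timestamp, "%Y%m%d%H%M%S").replace(
            tzinfo=datetime.timezone.utc
        )

    # Hypothetical redirect target from the Save Page Now endpoint
    location = "https://web.archive.org/web/20240101123456/https://example.com/"
    if match := archive_url_regex.search(location):
        print(get_archive_save_url_timestamp(match.group(1)))
        # 2024-01-01 12:34:56+00:00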

src/routes/batch/create/__init__.py

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+import datetime
+from typing import Iterable
+from pydantic import BaseModel, Field, model_validator
+from sqlalchemy import select
+
+from src.routes.queue.batch import QueueBatchReturn
+from ....models import Job, URL, Batch
+from ....main import app, async_session
+
+
+class BatchItem(BaseModel):
+    url: str
+    completed: datetime.datetime | None = None
+    failed: datetime.datetime | None = None
+    created: datetime.datetime | None = None
+
+    @model_validator(mode="after")
+    def validate(self):
+        if self.completed and self.failed:
+            raise ValueError("Batch item cannot be both completed and failed")
+        if not self.completed and not self.failed:
+            raise ValueError("Batch item must be either completed or failed")
+        return self
+
+    @property
+    def effective_creation_time(self) -> datetime.datetime:
+        return self.created or self.completed or self.failed
+
+
+class CreateBatchBody(BaseModel):
+    items: list[BatchItem]
+    tags: list[str] = Field(default_factory=list)
+
+
+async def add_filled_batch(
+    items: Iterable[BatchItem],
+    *,
+    priority: int = 0,
+    tags: list[str],
+) -> QueueBatchReturn:
+    urls = [item.url for item in items]
+    async with async_session() as session, session.begin():
+        batch = Batch()
+        stmt = select(URL).where(URL.url.in_(urls))
+        result = await session.scalars(stmt)
+        existing_urls_items = result.all()
+        existing_urls = {url.url for url in existing_urls_items}
+        new_urls = set(urls) - set(existing_urls)
+        if new_urls:
+            new_url_models = [URL(url=url) for url in new_urls]
+            session.add_all(new_url_models)
+            del new_urls, new_url_models, existing_urls, existing_urls_items
+            # Needed because bulk create doesn't return the created models with their IDs
+            stmt = select(URL).where(URL.url.in_(urls))
+            result = await session.scalars(stmt)
+            url_models = result.all()
+        else:
+            url_models = existing_urls_items
+            del new_urls, existing_urls, existing_urls_items
+        url_map: dict[str, URL] = {url.url: url for url in url_models}
+        del url_models
+        jobs = []
+        for item in items:
+            job = Job(
+                url=url_map[item.url],
+                batches=[batch],
+                priority=priority,
+                completed=item.completed,
+                failed=item.failed,
+            )
+            job.created_at = item.effective_creation_time
+            jobs.append(job)
+            url_map[item.url].last_seen = item.completed or url_map[item.url].last_seen
+        session.add_all(jobs)
+
+    return QueueBatchReturn(batch_id=batch.id, job_count=len(jobs))
+
+
+@app.post("/batch/create")
+async def create_batch(body: CreateBatchBody, priority: int = 0) -> QueueBatchReturn:
+    return await add_filled_batch(body.items, priority=priority, tags=body.tags)
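Unlike /queue/batch, this endpoint imports already-finished jobs: each item must carry exactly one of completed or failed. A small client-side sketch, assuming the application package is importable as laid out in this diff (URLs, timestamps, and the tag name are illustrative):

    import datetime
    from src.routes.batch.create import BatchItem, CreateBatchBody

    done = datetime.datetime(2024, 1, 1, 12, 34, 56, tzinfo=datetime.timezone.utc)

    body = CreateBatchBody(
        items=[
            # Valid: exactly one of completed/failed is set on each item
            BatchItem(url="https://example.com/a", completed=done),
            BatchItem(url="https://example.com/b", failed=done),
        ],
        tags=["backfill"],
    )

    # Rejected by the model validator (raises a pydantic ValidationError):
    # BatchItem(url="https://example.com/c", completed=done, failed=done)

    # body.model_dump_json() is what gets POSTed to /batch/create?priority=0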
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+import datetime
+from typing import Literal
+from fastapi import UploadFile
+from . import add_filled_batch, BatchItem
+from csv import DictReader
+from typing import TypedDict
+
+from ...queue.batch import QueueBatchReturn
+from ....main import app, archive_url_regex, get_archive_save_url_timestamp
+
+heading = (
+    "url",
+    "capture_dupe_status",
+    "new_capture_status",
+    "archive_message",
+    "archive_info",
+    "first_archive_status",
+)
+
+
+class CSVBatchItem(TypedDict):
+    url: str
+    capture_dupe_status: Literal["New capture", "Already captured", ""]
+    new_capture_status: str | Literal[""]
+    archive_message: str
+    archive_info: str | Literal[""]
+    first_archive_status: Literal["First Archive", ""]
+
+
+@app.post("/batch/create/gsheets_archive")
+async def create_batch_gsheets_archive(
+    file: UploadFile,
+    priority: int = 0,
+    tags: list[str] | None = None,
+    created_override: datetime.datetime | None = None,
+) -> QueueBatchReturn:
+    if tags is None:
+        tags = []
+    tags.append("wayback-machine-gsheets-archive")
+    csv_items = (await file.read()).decode().splitlines(False)
+    reader = DictReader(csv_items, fieldnames=heading)
+    items_list: list[CSVBatchItem] = list(reader)
+    items: list[BatchItem] = []
+    for item in items_list:
+        url = item["url"]
+        if match := archive_url_regex.search(item["archive_message"]):
+            archived_timestamp = get_archive_save_url_timestamp(match.group(1))
+            items.append(
+                BatchItem(
+                    url=url, completed=archived_timestamp, created=created_override
+                )
+            )
+        elif item["archive_message"]:  # A message other than a URL = failure
+            items.append(
+                BatchItem(
+                    url=url,
+                    failed=datetime.datetime.now(tz=datetime.timezone.utc),
+                    created=created_override,
+                )
+            )
+    return await add_filled_batch(items, priority=priority, tags=tags)
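Uploading a Google-Sheets archive export could look like the following sketch (the base URL, filename, and row contents are illustrative assumptions; the column order matches the heading tuple above, and only "url" and "archive_message" actually drive the import):

    import httpx

    # One data row: an archived URL whose archive_message is a Wayback /web/ link
    csv_content = (
        "https://example.com/page,New capture,success,"
        "https://web.archive.org/web/20240101123456/https://example.com/page,,First Archive\n"
    ).encode()

    resp = httpx.post(
        "http://localhost:8000/batch/create/gsheets_archive",
        params={"priority": 1},
        files={"file": ("gsheets_export.csv", csv_content, "text/csv")},
    )
    print(resp.json())  # {"batch_id": ..., "job_count": ...}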

src/routes/queue/batch.py renamed to src/routes/queue/batch/__init__.py

Lines changed: 20 additions & 12 deletions
@@ -1,27 +1,28 @@
-from pydantic import BaseModel
+from typing import Iterable
+from pydantic import BaseModel, Field
 from sqlalchemy import select
-from ...models import Job, URL, Batch
-from ...main import app, async_session
+from ....models import Job, URL, Batch
+from ....main import app, async_session
 
 
 class QueueBatchBody(BaseModel):
     urls: list[str]
-    priority: int = 0
-    unique_only: bool = True
+    tags: list[str] = Field(default_factory=list)
 
 
 class QueueBatchReturn(BaseModel):
     batch_id: int
     job_count: int
 
 
-@app.post("/queue/batch")
-async def queue_batch(body: QueueBatchBody) -> QueueBatchReturn:
-    batch = Batch()
-    urls = body.urls
-    if body.unique_only:
-        urls = set(urls)
+async def add_batch(
+    urls: Iterable[str],
+    *,
+    priority: int = 0,
+    tags: list[str],
+) -> QueueBatchReturn:
     async with async_session() as session, session.begin():
+        batch = Batch()
         stmt = select(URL).where(URL.url.in_(urls))
         result = await session.scalars(stmt)
         existing_urls_items = result.all()
@@ -42,7 +43,14 @@ async def queue_batch(body: QueueBatchBody) -> QueueBatchReturn:
         del url_models
         jobs = []
        for url in urls:
-            jobs.append(Job(url=url_map[url], batches=[batch], priority=body.priority))
+            jobs.append(Job(url=url_map[url], batches=[batch], priority=priority))
         session.add_all(jobs)
 
     return QueueBatchReturn(batch_id=batch.id, job_count=len(jobs))
+
+
+@app.post("/queue/batch")
+async def queue_batch(
+    body: QueueBatchBody, priority: int = 0, unique_only: bool = True
+) -> QueueBatchReturn:
+    return await add_batch(set(body.urls) if unique_only else body.urls, priority=priority, tags=body.tags)
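After this refactor, priority and unique_only move from the JSON body to query parameters, and the body gains a tags list. A hedged sketch of calling the endpoint (the base URL is an assumption):

    import httpx

    resp = httpx.post(
        "http://localhost:8000/queue/batch",
        params={"priority": 2, "unique_only": True},  # no longer part of the JSON body
        json={"urls": ["https://example.com/a", "https://example.com/a"], "tags": []},
    )
    print(resp.json())  # {"batch_id": ..., "job_count": 1} — the duplicate URL is deduplicated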

src/routes/queue/batch/file.py

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+from fastapi import UploadFile
+from pydantic import BaseModel, Field
+
+from . import QueueBatchReturn, add_batch
+from ....main import app
+
+class QueueBatchFileBody(BaseModel):
+    file: UploadFile
+    tags: list[str] = Field(default_factory=list)
+
+@app.post("/queue/batch/file")
+async def queue_batch_file(body: QueueBatchFileBody, priority: int = 0, unique_only: bool = False) -> QueueBatchReturn:
+    urls = (await body.file.read()).decode().splitlines(False)
+    return await add_batch(set(urls) if unique_only else urls, priority=priority, tags=body.tags)
