Skip to content

Commit cb524f8

Browse files
committed
Add tags support
1 parent 5f62bd0 commit cb524f8

File tree

7 files changed

+250
-62
lines changed

7 files changed

+250
-62
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
"""Add batch tags
2+
3+
Revision ID: 9a098a265cbd
4+
Revises: f79ac07ca876
5+
Create Date: 2024-01-17 15:29:50.515286
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "9a098a265cbd"
16+
down_revision: Union[str, None] = "f79ac07ca876"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
    """Apply the migration: add the tag tables and a set of query indexes."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Tag lookup table: one row per distinct tag name.
    op.create_table(
        "batch_tags",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("name", sa.String(length=256), nullable=False),
        sa.PrimaryKeyConstraint("id"),
    )
    # Names must be unique; the index also makes lookup-by-name cheap.
    op.create_index(op.f("ix_batch_tags_name"), "batch_tags", ["name"], unique=True)

    # Association table for the batches <-> tags many-to-many relationship.
    # Created after batch_tags so its foreign key target already exists.
    op.create_table(
        "batch_tag_batches",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("batch_id", sa.Integer(), nullable=False),
        sa.Column("batch_tag_id", sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(["batch_id"], ["batches.id"]),
        sa.ForeignKeyConstraint(["batch_tag_id"], ["batch_tags.id"]),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("batch_id", "batch_tag_id", name="_batch_batch_tag_uc"),
    )

    # Plain (non-unique) indexes, emitted in the original autogenerated order.
    # Index names follow the ix_<table>_<column> convention, so they can be
    # derived from the (table, column) pair.
    for table, column in (
        ("batch_tag_batches", "batch_id"),
        ("batch_tag_batches", "batch_tag_id"),
        ("batches", "created_at"),
        ("jobs", "completed"),
        ("jobs", "created_at"),
        ("jobs", "delayed_until"),
        ("jobs", "failed"),
        ("repeat_urls", "created_at"),
        ("urls", "first_seen"),
        ("urls", "last_seen"),
    ):
        op.create_index(op.f(f"ix_{table}_{column}"), table, [column], unique=False)
    # ### end Alembic commands ###
73+
74+
75+
def downgrade() -> None:
    """Revert the migration: drop the added indexes and the tag tables."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Drop the plain indexes first, in the exact reverse of upgrade().
    for table, column in (
        ("urls", "last_seen"),
        ("urls", "first_seen"),
        ("repeat_urls", "created_at"),
        ("jobs", "failed"),
        ("jobs", "delayed_until"),
        ("jobs", "created_at"),
        ("jobs", "completed"),
        ("batches", "created_at"),
        ("batch_tag_batches", "batch_tag_id"),
        ("batch_tag_batches", "batch_id"),
    ):
        op.drop_index(op.f(f"ix_{table}_{column}"), table_name=table)
    # The association table must be dropped before the batch_tags table it
    # references via foreign key.
    op.drop_table("batch_tag_batches")
    op.drop_index(op.f("ix_batch_tags_name"), table_name="batch_tags")
    op.drop_table("batch_tags")
    # ### end Alembic commands ###

src/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818

1919
import sqlalchemy
20+
import sqlalchemy.ext.asyncio
2021
from aiohttp import ClientSession
2122
from fastapi import Depends, FastAPI, HTTPException, Query, Request, Response
2223
from fastapi.middleware.cors import CORSMiddleware

src/models.py

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import datetime
2+
from typing import Self
3+
from sqlalchemy import select
24
from sqlalchemy.orm import Mapped, mapped_column
35
import sqlalchemy.ext.asyncio
46
import sqlalchemy.orm
@@ -12,6 +14,71 @@ class Base(
1214
pass
1315

1416

17+
class BatchTag(Base):
    """A named label that can be attached to any number of batches."""

    __tablename__ = "batch_tags"

    # Surrogate primary key; excluded from the generated dataclass __init__.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, init=False)
    # Tag names are unique; the index keeps lookup-by-name cheap.
    name: Mapped[str] = mapped_column(
        sqlalchemy.String(length=256), unique=True, index=True
    )

    # Many-to-many back to Batch through the batch_tag_batches table.
    batches: Mapped[list["Batch"]] = sqlalchemy.orm.relationship(
        "Batch",
        secondary="batch_tag_batches",
        back_populates="tags",
        init=False,
        repr=False,
    )

    @classmethod
    async def resolve_list(cls, names: set[str]) -> list[Self]:
        """Return tag objects for every name in *names*, creating any that
        do not exist yet.

        Runs in its own session/transaction; newly created tags are flushed
        and committed when the ``session.begin()`` block exits.
        """
        # Imported here (not at module level) to avoid a circular import.
        from .main import async_session

        if not names:
            return []

        async with async_session() as session, session.begin():
            found = (
                await session.scalars(select(cls).where(cls.name.in_(names)))
            ).all()
            by_name = {tag.name: tag for tag in found}
            to_create = names - by_name.keys()
            if not to_create:
                return list(by_name.values())
            fresh: list[BatchTag] = [BatchTag(name=name) for name in to_create]
            session.add_all(fresh)
            return [*by_name.values(), *fresh]
52+
53+
54+
class BatchTagBatch(Base):
    """Association row linking one batch to one tag (batches <-> tags M2M)."""

    __tablename__ = "batch_tag_batches"
    # A given (batch, tag) pair may only be linked once.
    __table_args__ = (
        sqlalchemy.UniqueConstraint(
            "batch_id", "batch_tag_id", name="_batch_batch_tag_uc"
        ),
    )

    # Surrogate primary key; excluded from the generated dataclass __init__.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, init=False)
    batch_id: Mapped[int] = mapped_column(
        sqlalchemy.ForeignKey("batches.id"), index=True
    )
    batch_tag_id: Mapped[int] = mapped_column(
        sqlalchemy.ForeignKey("batch_tags.id"), index=True
    )

    # Read-only convenience accessors for the two endpoints; eagerly loaded
    # via inner join since both foreign keys are NOT NULL.
    batch: Mapped["Batch"] = sqlalchemy.orm.relationship(
        "Batch", lazy="joined", innerjoin=True, foreign_keys=[batch_id], viewonly=True
    )
    batch_tag: Mapped["BatchTag"] = sqlalchemy.orm.relationship(
        "BatchTag",
        lazy="joined",
        innerjoin=True,
        foreign_keys=[batch_tag_id],
        viewonly=True,
    )
80+
81+
1582
class Batch(Base):
1683
__tablename__ = "batches"
1784

@@ -23,10 +90,19 @@ class Batch(Base):
2390
init=False,
2491
index=True,
2592
)
93+
locked: Mapped[bool] = mapped_column(
94+
default=False
95+
) # Indicates that a batch is locked (no more jobs can be added to it)
2696

2797
jobs: Mapped[list["Job"]] = sqlalchemy.orm.relationship(
2898
"Job", secondary="batch_jobs", back_populates="batches", init=False, repr=False
2999
)
100+
tags: Mapped[list[BatchTag]] = sqlalchemy.orm.relationship(
101+
BatchTag,
102+
secondary="batch_tag_batches",
103+
back_populates="batches",
104+
default_factory=list,
105+
)
30106

31107

32108
class URL(Base):
@@ -119,9 +195,6 @@ class Job(Base):
119195
id: Mapped[int] = mapped_column(
120196
sqlalchemy.BigInteger, primary_key=True, autoincrement=True, init=False
121197
)
122-
batches: Mapped[list[Batch]] = sqlalchemy.orm.relationship(
123-
Batch, secondary="batch_jobs", back_populates="jobs"
124-
)
125198
url_id: Mapped[int] = mapped_column(
126199
sqlalchemy.ForeignKey(URL.id),
127200
nullable=False,
@@ -132,6 +205,9 @@ class Job(Base):
132205
url: Mapped[URL] = sqlalchemy.orm.relationship(
133206
URL, lazy="joined", innerjoin=True, foreign_keys=[url_id], back_populates="jobs"
134207
)
208+
batches: Mapped[list[Batch]] = sqlalchemy.orm.relationship(
209+
Batch, secondary="batch_jobs", back_populates="jobs", default_factory=list
210+
)
135211
created_at: Mapped[datetime.datetime] = mapped_column(
136212
sqlalchemy.DateTime(timezone=True),
137213
server_default=sqlalchemy.sql.func.now(),

src/routes/batch/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import datetime
22

3-
from pydantic import BaseModel
3+
from pydantic import BaseModel, Field
44
import sqlalchemy
55
from sqlalchemy import select
66

@@ -20,6 +20,7 @@ class BatchReturn(BaseModel):
2020
created_at: datetime.datetime
2121
repeat_url: int | None = None
2222
jobs: int
23+
tags: list[str] = Field(default_factory=list)
2324

2425

2526
@app.get("/batch")

src/routes/batch/create/__init__.py

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import datetime
2+
from itertools import batched
23
from typing import Iterable
34
from pydantic import BaseModel, Field, model_validator
45
from sqlalchemy import select
56

67
from src.routes.queue.batch import QueueBatchReturn
7-
from ....models import Job, URL, Batch
8+
from ....models import Job, URL, Batch, BatchTag
89
from ....main import app, async_session
910

1011

@@ -35,44 +36,50 @@ async def add_filled_batch(
3536
items: Iterable[BatchItem],
3637
*,
3738
priority: int = 0,
38-
tags: list[str],
39+
tags: Iterable[str],
3940
) -> QueueBatchReturn:
40-
urls = [item.url for item in items]
41+
items_batched = batched(items, 30000)
42+
job_count = 0
4143
async with async_session() as session, session.begin():
42-
batch = Batch()
43-
stmt = select(URL).where(URL.url.in_(urls))
44-
result = await session.scalars(stmt)
45-
existing_urls_items = result.all()
46-
existing_urls = {url.url for url in existing_urls_items}
47-
new_urls = set(urls) - set(existing_urls)
48-
if new_urls:
49-
new_url_models = [URL(url=url) for url in new_urls]
50-
session.add_all(new_url_models)
51-
del new_urls, new_url_models, existing_urls, existing_urls_items
52-
# Needed because bulk create doesn't return the created models with their IDs
44+
batch = Batch(tags=await BatchTag.resolve_list(set(tags)))
45+
for item_set in items_batched:
46+
urls = [item.url for item in item_set]
5347
stmt = select(URL).where(URL.url.in_(urls))
5448
result = await session.scalars(stmt)
55-
url_models = result.all()
56-
else:
57-
url_models = existing_urls_items
58-
del new_urls, existing_urls, existing_urls_items
59-
url_map: dict[str, URL] = {url.url: url for url in url_models}
60-
del url_models
61-
jobs = []
62-
for item in items:
63-
job = Job(
64-
url=url_map[item.url],
65-
batches=[batch],
66-
priority=priority,
67-
completed=item.completed,
68-
failed=item.failed,
69-
)
70-
job.created_at = item.effective_creation_time
71-
jobs.append(job)
72-
url_map[item.url].last_seen = item.completed or url_map[item.url].last_seen
73-
session.add_all(jobs)
49+
existing_urls_items = result.all()
50+
existing_urls = {url.url for url in existing_urls_items}
51+
new_urls = set(urls) - set(existing_urls)
52+
if new_urls:
53+
new_url_models = [URL(url=url) for url in new_urls]
54+
session.add_all(new_url_models)
55+
del new_urls, new_url_models, existing_urls, existing_urls_items
56+
# Needed because bulk create doesn't return the created models with their IDs
57+
stmt = select(URL).where(URL.url.in_(urls))
58+
result = await session.scalars(stmt)
59+
url_models = result.all()
60+
else:
61+
url_models = existing_urls_items
62+
del new_urls, existing_urls, existing_urls_items
63+
url_map: dict[str, URL] = {url.url: url for url in url_models}
64+
del url_models
65+
jobs = []
66+
for item in item_set:
67+
job = Job(
68+
url=url_map[item.url],
69+
batches=[batch],
70+
priority=priority,
71+
completed=item.completed,
72+
failed=item.failed,
73+
)
74+
job.created_at = item.effective_creation_time
75+
jobs.append(job)
76+
url_map[item.url].last_seen = (
77+
item.completed or url_map[item.url].last_seen
78+
)
79+
session.add_all(jobs)
80+
job_count += len(jobs)
7481

75-
return QueueBatchReturn(batch_id=batch.id, job_count=len(jobs))
82+
return QueueBatchReturn(batch_id=batch.id, job_count=job_count)
7683

7784

7885
@app.post("/batch/create")

src/routes/batch/create/gsheets_archive.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class CSVBatchItem(TypedDict):
3232
@app.post("/batch/create/gsheets_archive")
3333
async def create_batch_gsheets_archive(
3434
file: UploadFile,
35+
exclude_ratelimited_daily: bool = True,
3536
priority: int = 0,
3637
tags: list[str] | None = None,
3738
created_override: datetime.datetime | None = None,
@@ -53,6 +54,11 @@ async def create_batch_gsheets_archive(
5354
)
5455
)
5556
elif item["archive_message"]: # A message other than an URL = failure
57+
if (
58+
"You cannot make more than" in item["archive_message"]
59+
): # We got ratelimited for too many captures in a day
60+
if exclude_ratelimited_daily:
61+
continue
5662
items.append(
5763
BatchItem(
5864
url=url,

0 commit comments

Comments
 (0)