feat: Implement SQLStorageClient based on sqlalchemy v2+ #1339
Conversation
Pull Request Overview
This PR implements a new SQL-based storage client (`SQLStorageClient`) that provides persistent data storage using SQLAlchemy v2+ for datasets, key-value stores, and request queues.
Key changes:
- Adds `SQLStorageClient` with support for connection strings, pre-configured engines, or a default SQLite database
- Implements SQL-based clients for all three storage types with database schema management and transaction handling
- Updates storage model configurations to support SQLAlchemy ORM mapping with `from_attributes=True`
Reviewed Changes
Copilot reviewed 16 out of 18 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `src/crawlee/storage_clients/_sql/` | New SQL storage implementation with database models, clients, and schema management |
| `tests/unit/storage_clients/_sql/` | Comprehensive test suite for SQL storage functionality |
| `tests/unit/storages/` | Updates to test fixtures to include SQL storage client testing |
| `src/crawlee/storage_clients/models.py` | Adds `from_attributes=True` to model configs for SQLAlchemy ORM compatibility |
| `pyproject.toml` | Adds new `sql` optional dependency group |
| `src/crawlee/storage_clients/__init__.py` | Adds conditional import for `SQLStorageClient` |
Comments suppressed due to low confidence (1)
`tests/unit/storages/test_request_queue.py:23`
- The test fixture only tests the 'sql' storage client, but the removed 'memory' and 'file_system' parameters suggest this may have unintentionally reduced test coverage. Consider including all storage client types to ensure comprehensive testing.

```python
@pytest.fixture(params=['sql'])
```
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
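A minimal sketch of the parametrization the comment suggests, assuming the previously removed 'memory' and 'file_system' values are still valid parameters; the fixture name and body here are hypothetical:

```python
import pytest


# Hypothetical fixture covering all three storage client types, as suggested above.
@pytest.fixture(params=['memory', 'file_system', 'sql'])
def storage_client_type(request: pytest.FixtureRequest) -> str:
    return request.param
```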
When implementing, I opted out of …

The storage client has been repeatedly tested with SQLite and a local PostgreSQL (a simple container installation without fine-tuning).

```python
import asyncio

from crawlee.crawlers import BasicCrawler, BasicCrawlingContext
from crawlee.storage_clients import SQLStorageClient
from crawlee.storages import RequestQueue, KeyValueStore
from crawlee import service_locator
from crawlee import ConcurrencySettings

LOCAL_POSTGRE = None  # 'postgresql+asyncpg://myuser:mypassword@localhost:5432/postgres'
USE_STATE = True
KVS = True
DATASET = True
CRAWLERS = 1
REQUESTS = 10000
DROP_STORAGES = True


async def main() -> None:
    service_locator.set_storage_client(
        SQLStorageClient(
            connection_string=LOCAL_POSTGRE if LOCAL_POSTGRE else None,
        )
    )

    kvs = await KeyValueStore.open()

    queue_1 = await RequestQueue.open(name='test_queue_1')
    queue_2 = await RequestQueue.open(name='test_queue_2')
    queue_3 = await RequestQueue.open(name='test_queue_3')

    urls = [f'https://crawlee.dev/page/{i}' for i in range(REQUESTS)]
    await queue_1.add_requests(urls)
    await queue_2.add_requests(urls)
    await queue_3.add_requests(urls)

    crawler_1 = BasicCrawler(concurrency_settings=ConcurrencySettings(desired_concurrency=50), request_manager=queue_1)
    crawler_2 = BasicCrawler(concurrency_settings=ConcurrencySettings(desired_concurrency=50), request_manager=queue_2)
    crawler_3 = BasicCrawler(concurrency_settings=ConcurrencySettings(desired_concurrency=50), request_manager=queue_3)

    # Define the default request handler
    @crawler_1.router.default_handler
    @crawler_2.router.default_handler
    @crawler_3.router.default_handler
    async def request_handler(context: BasicCrawlingContext) -> None:
        if USE_STATE:
            # Use state to store data
            state_data = await context.use_state()
            state_data['a'] = context.request.url
        if KVS:
            # Use KeyValueStore to store data
            await kvs.set_value(context.request.url, {'url': context.request.url, 'title': 'Example Title'})
        if DATASET:
            await context.push_data({'url': context.request.url, 'title': 'Example Title'})

    crawlers = [crawler_1]
    if CRAWLERS > 1:
        crawlers.append(crawler_2)
    if CRAWLERS > 2:
        crawlers.append(crawler_3)

    # Run the crawler
    data = await asyncio.gather(*[crawler.run() for crawler in crawlers])
    print(data)

    if DROP_STORAGES:
        # Drop all storages
        await queue_1.drop()
        await queue_2.drop()
        await queue_3.drop()
        await kvs.drop()


if __name__ == '__main__':
    asyncio.run(main())
```

This allows you to load the storage with work without making real requests.
The use of …
First part of the review. I will do the RQ and tests in the second part.
I have only minor comments. My main suggestion is to extract more of the code that is shared by all 3 clients. It is easier to understand all the clients once the reader easily knows which part of the code is exactly the same in all clients and which part is unique and specific to each client. It also makes the code easier to maintain.
The drawback would be that understanding just one class in isolation would be a little bit harder. But who wants to understand just one client?
```python
        flatten: list[str] | None = None,
        view: str | None = None,
    ) -> DatasetItemsListPage:
        # Check for unsupported arguments and log a warning if found.
```
Is this unsupported just in this initial commit, or is there no plan to support it in the future?
I think this would complicate the database queries quite a bit. I don't plan to support this, but we could reconsider it in the future.
Since SQLite now supports JSON operations, this is possible: https://sqlite.org/json1.html
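For illustration, a minimal sketch of filtering on a JSON field through SQLAlchemy; the `dataset_items` table and `data` column here are assumptions for the example, not the PR's actual schema:

```python
from sqlalchemy import JSON, Column, Integer, MetaData, Table, func, select

metadata = MetaData()

# Hypothetical table standing in for the dataset items table in this PR.
dataset_items = Table(
    'dataset_items',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('data', JSON),  # each item stored as a JSON document
)

# json_extract comes from SQLite's JSON1 extension; PostgreSQL and MySQL expose
# equivalent JSON functions/operators, so a dialect-aware version could support
# flatten/view-style queries on all three backends.
stmt = select(dataset_items).where(
    func.json_extract(dataset_items.c.data, '$.title') == 'Example Title'
)
```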
```python
    )

    @override
    async def iterate_items(
```
It shares the same code with `get_data`; maybe extract it into an internal function reused in both places?
Just a note: I guess there could also be room for some optimization in the future, to make `iterate_items` lazier when getting data from the db, or lazy + buffered, instead of getting all the data from the db at once and iterating over it.
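A minimal sketch of such lazy iteration, assuming an `AsyncSession` and a hypothetical `DatasetItemDb` ORM model; this is illustrative, not the PR's actual implementation:

```python
from collections.abc import AsyncIterator
from typing import Any

from sqlalchemy import JSON, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class DatasetItemDb(Base):
    """Hypothetical ORM model standing in for the PR's dataset item table."""

    __tablename__ = 'dataset_items'

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[dict] = mapped_column(JSON)


async def iterate_items_lazily(session: AsyncSession, batch_size: int = 100) -> AsyncIterator[dict[str, Any]]:
    # stream() keeps a server-side cursor open and fetches rows in batches
    # (yield_per) instead of loading the whole result set into memory at once.
    result = await session.stream(
        select(DatasetItemDb).execution_options(yield_per=batch_size)
    )
    async for item in result.scalars():
        yield item.data
```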
```python
    impl = DateTime(timezone=True)
    cache_ok = True

    def process_result_value(self, value: datetime | None, _dialect: Dialect) -> datetime | None:
```
Why does it allow `None` at input/output?
`TypeDecorator` expects that `value` in `process_result_value` can be `None`, since this is a generic type definition and the column can have `nullable=True`.
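For context, a minimal sketch of the kind of `TypeDecorator` being discussed, assuming the goal is timezone-aware UTC datetimes; the class name and conversion details are illustrative, not the PR's exact code:

```python
from datetime import datetime, timezone

from sqlalchemy import DateTime
from sqlalchemy.engine import Dialect
from sqlalchemy.types import TypeDecorator


class AwareDateTime(TypeDecorator):
    """Store and return timezone-aware UTC datetimes."""

    impl = DateTime(timezone=True)
    cache_ok = True

    def process_bind_param(self, value: datetime | None, _dialect: Dialect) -> datetime | None:
        # None must be allowed because the column may be declared with nullable=True.
        if value is None:
            return None
        return value.astimezone(timezone.utc)

    def process_result_value(self, value: datetime | None, _dialect: Dialect) -> datetime | None:
        if value is None:
            return None
        # Some backends (e.g. SQLite) return naive datetimes, so attach UTC explicitly.
        return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)
```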
"""Convert Python value to database value.""" | ||
return 'default' if value is None else value | ||
|
||
def process_result_value(self, value: str | None, _dialect: Dialect) -> str | None: |
Shouldn't the input value be `str` and not `None`?
This will cause a conflict in `mypy`, since `value` in the signature of `TypeDecorator.process_result_value` is `Any | None`.
```python
        self._accessed_modified_update_interval = storage_client.get_accessed_modified_update_interval()

    @override
    async def get_metadata(self) -> DatasetMetadata:
```
These three methods are nearly identical in all clients: `get_session`, `get_autocommit_session`, and maybe `get_metadata`.
Maybe we could reuse them: define them in a standalone class and use them as a mixin in all three clients. The class could be generic based on the metadata type.
Or maybe push them down to `SQLStorageClient`, as they are indeed specific to it.
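A rough sketch of the suggested mixin, generic over the metadata model; the class name, attributes, and method bodies are hypothetical, not the PR's final API:

```python
from typing import Generic, TypeVar

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

MetadataT = TypeVar('MetadataT')


class SQLClientMixin(Generic[MetadataT]):
    """Shared session and metadata helpers for the dataset, KVS, and RQ clients."""

    _session_maker: async_sessionmaker[AsyncSession]
    _metadata_model: type[MetadataT]

    def get_session(self) -> AsyncSession:
        # Plain session; the caller is responsible for committing.
        return self._session_maker()

    def get_autocommit_session(self) -> AsyncSession:
        # In a real implementation this would wrap the session in a
        # commit-on-exit context manager; kept identical here for brevity.
        return self._session_maker()

    async def get_metadata(self) -> MetadataT:
        # Fetch the ORM metadata row and validate it into the Pydantic model
        # (this relies on from_attributes=True in the model config).
        raise NotImplementedError
```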
Could this also be further expanded by adding a method `safely_open` to the mixin, or down in `SQLStorageClient`, which would be a wrapper for the client-specific open?
```python
async with storage_client.create_session() as session:
    # client specific open
    try:
        # Commit the insert or update metadata to the database
        await session.commit()
    except SQLAlchemyError:
        ...

    client = cls(
        id=orm_metadata.id,
        storage_client=storage_client,
    )
```
This could be done in many different ways. The point is to extract and centralize the SQL-specific code shared by all clients and keep the clients clean, with only their specific, unique code.
Maybe a decorator in `SQLStorageClient`?
`purge` and `drop` also show a significant degree of similarity.
Also, the portion of the metadata update related to the timestamps could be extracted; `self._last_accessed_at` and `self._last_modified_at` have the same optimization mechanics shared across all SQL clients.
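A rough sketch of the shared accessed/modified throttling referred to here, assuming an update interval obtained from the storage client; the class and method names are illustrative only:

```python
from datetime import datetime, timedelta, timezone


class _TimestampThrottle:
    """Decide whether accessed_at/modified_at should be persisted again."""

    def __init__(self, update_interval: timedelta) -> None:
        self._update_interval = update_interval
        self._last_accessed_at: datetime | None = None
        self._last_modified_at: datetime | None = None

    def should_persist_accessed(self) -> bool:
        # Only write accessed_at to the database if enough time has passed since
        # the last persisted update, to avoid an UPDATE on every read.
        now = datetime.now(timezone.utc)
        if self._last_accessed_at is None or now - self._last_accessed_at >= self._update_interval:
            self._last_accessed_at = now
            return True
        return False

    def should_persist_modified(self) -> bool:
        now = datetime.now(timezone.utc)
        if self._last_modified_at is None or now - self._last_modified_at >= self._update_interval:
            self._last_modified_at = now
            return True
        return False
```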
Thank you, great ideas for code optimization.
I moved the duplicated code to a mixin.
It would also be good to mention it in the docs and maybe show an example use.
I will continue with the review later. There are many ways to approach the RQ client implementation. I guess I have some different expectations in my mind (I am not saying those are correct :D). Maybe we should define the expectations first, so that I do the review correctly based on them.
My initial expectations for the RQ client:
- Can be used on the Apify platform and outside of it as well
- Supports any persistence
- Supports parallel consumers/producers (the use case being speeding up crawlers on the Apify platform with multiprocessing to fully utilize the available resources; for example, a Parsel-based Actor could have multiple ParselCrawlers under the hood, all of them working on the same RQ, while reducing costs by avoiding the ApifyRQClient)
Most typical use cases:
- Crawlee outside of the Apify platform
- Crawlee on the Apify platform, but avoiding the expensive ApifyRQClient
Great job Max!
I haven't read the whole source code yet, but for the beginning...
Could you please share a bit more detail about the solution? For example, how it was tested, which tables it creates, how we store dataset records, KVS records, and request records, and any other relevant context...
Since the dependencies include only `sqlalchemy` and `aiosqlite`, am I right in assuming this storage client currently supports only SQLite? If so, it might make sense to name it `SqliteStorageClient` and also update the name of the extra. On the other hand, if it's intended to be a general SQL client, could you clarify what steps are needed to connect to a different database (e.g. Postgres)?
Also, the documentation is missing. As a first step, we should extend the existing guide (https://crawlee.dev/python/docs/guides/storage-clients) with a section describing this storage client. If the functionality turns out to be more complex (e.g. supporting multiple databases with configuration), we can instead create a dedicated guide as well.
Since this feature involves more complexity and potential edge cases, I'd suggest marking it under an experimental flag for the v1.0 release.
@vdusek thanks. Sure.
For testing, I used the code provided in this comment: #1339 (comment). I used … At the same time, I think it loads the storage quite diversely using different storage mechanisms: direct writing to the KVS, updating a key in the KVS, writing to the Dataset, using RecoverableState with `use_state`, and running up to 3 parallel queues.
About the tables: the metadata tables duplicate the Pydantic models, but they are described in accordance with SQLAlchemy.
Dataset:
KVS:
RequestQueue: (This will probably still be updated, since I am making some optimizations after the review by @Pijukatel.)
As @Pijukatel correctly pointed out in his review, several independent clients cannot currently work with the queue. I am working on optimizations for this. Any ideas for improvements and optimizations for the tables are welcome. I also think this is a good time to discuss how we will support this in the future. For example, will we write migrations in case of table changes?
SQLite is only used as the default database. However, the client supports SQLite, PostgreSQL, and MySQL. By using dialects, you can obtain more optimized queries for these three databases. Without using dialects, I am not yet sure about the queue; this is not so critical for the Dataset and KVS. To use a different database, you need to install the appropriate asynchronous library supported by SQLAlchemy, for example `asyncpg` for PostgreSQL. For standard SQLite, some optimization settings are performed in SQLStorageClient.
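For illustration, a minimal sketch of pointing the client at PostgreSQL instead of the default SQLite database, assuming `asyncpg` is installed; the `connection_string` parameter appears in the test script above, while the `engine` keyword for a pre-configured `AsyncEngine` is an assumption based on the PR description:

```python
from sqlalchemy.ext.asyncio import create_async_engine

from crawlee import service_locator
from crawlee.storage_clients import SQLStorageClient

# Option 1: pass a connection string directly (the driver name selects asyncpg).
storage_client = SQLStorageClient(
    connection_string='postgresql+asyncpg://myuser:mypassword@localhost:5432/postgres',
)

# Option 2: pass a pre-configured AsyncEngine (keyword name assumed here).
engine = create_async_engine('postgresql+asyncpg://myuser:mypassword@localhost:5432/postgres')
storage_client = SQLStorageClient(engine=engine)

service_locator.set_storage_client(storage_client)
```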
Yes, I put off working on the documentation. I assumed that there might be critical updates during the review process.
Description
`SQLStorageClient`, which can accept a database connection string or a pre-configured `AsyncEngine`, or creates a default `crawlee.db` database in `Configuration.storage_dir`.
Issues