Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: -p psqlpy --all-features -- -W clippy::all -W clippy::pedantic -D warnings
args: -p psqlpy --all-features -- -W clippy::all -W clippy::pedantic
pytest:
name: ${{matrix.job.os}}-${{matrix.py_version}}
strategy:
Expand Down
2 changes: 0 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ repos:
- clippy::all
- -W
- clippy::pedantic
- -D
- warnings

- id: check
types:
Expand Down
26 changes: 14 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 4 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,8 @@ crate-type = ["cdylib"]

[dependencies]
deadpool-postgres = { git = "https://github.com/chandr-andr/deadpool.git", branch = "psqlpy" }
pyo3 = { version = "*", features = [
"chrono",
"experimental-async",
"rust_decimal",
"py-clone",
"gil-refs",
"macros",
] }
pyo3-async-runtimes = { git = "https://github.com/chandr-andr/pyo3-async-runtimes.git", branch = "main", features = [
pyo3 = { version = "0.23.4", features = ["chrono", "experimental-async", "rust_decimal", "py-clone", "macros"] }
pyo3-async-runtimes = { git = "https://github.com/chandr-andr/pyo3-async-runtimes.git", branch = "psqlpy", features = [
"tokio-runtime",
] }

Expand Down Expand Up @@ -59,3 +52,5 @@ pg_interval = { git = "https://github.com/chandr-andr/rust-postgres-interval.git
pgvector = { git = "https://github.com/chandr-andr/pgvector-rust.git", branch = "psqlpy", features = [
"postgres",
] }
futures-channel = "0.3.31"
futures = "0.3.31"
1 change: 1 addition & 0 deletions docs/.vuepress/sidebar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export default sidebar({
"connection",
"transaction",
"cursor",
"listener",
"results",
"exceptions",
],
Expand Down
1 change: 1 addition & 0 deletions docs/components/components_overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ title: Components
- `Connection`: represents single database connection, can be retrieved from `ConnectionPool`.
- `Transaction`: represents database transaction, can be made from `Connection`.
- `Cursor`: represents database cursor, can be made from `Transaction`.
- `Listener`: object to work with [LISTEN](https://www.postgresql.org/docs/current/sql-listen.html)/[NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html) functionality, can be mode from `ConnectionPool`.
- `QueryResult`: represents list of results from database.
- `SingleQueryResult`: represents single result from the database.
- `Exceptions`: we have some custom exceptions.
Expand Down
12 changes: 12 additions & 0 deletions docs/components/connection_pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,18 @@ This is the preferable way to work with the PostgreSQL.
:::


### Listener

Create a new instance of a listener.

```python
async def main() -> None:
...
listener = db_pool.listener()
```
```


### Close

To close the connection pool at the stop of your application.
Expand Down
18 changes: 18 additions & 0 deletions docs/components/exceptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ stateDiagram-v2
RustPSQLDriverPyBaseError --> BaseConnectionError
RustPSQLDriverPyBaseError --> BaseTransactionError
RustPSQLDriverPyBaseError --> BaseCursorError
RustPSQLDriverPyBaseError --> BaseListenerError
RustPSQLDriverPyBaseError --> RustException
RustPSQLDriverPyBaseError --> RustToPyValueMappingError
RustPSQLDriverPyBaseError --> PyToRustValueMappingError
Expand Down Expand Up @@ -44,6 +45,11 @@ stateDiagram-v2
[*] --> CursorFetchError
[*] --> CursorClosedError
}
state BaseListenerError {
[*] --> ListenerStartError
[*] --> ListenerClosedError
[*] --> ListenerCallbackError
}
state RustException {
[*] --> DriverError
[*] --> MacAddrParseError
Expand Down Expand Up @@ -127,3 +133,15 @@ Error in cursor fetch (any fetch).

#### CursorClosedError
Error if underlying connection is closed.

### BaseListenerError
Base error for all Listener errors.

#### ListenerStartError
Error if listener start failed.

#### ListenerClosedError
Error if listener manipulated but it's closed

#### ListenerCallbackError
Error if callback passed to listener isn't a coroutine
204 changes: 204 additions & 0 deletions docs/components/listener.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
---
title: Listener
---

`Listener` object allows users to work with [LISTEN](https://www.postgresql.org/docs/current/sql-listen.html)/[NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html) functionality.

## Usage

There are two ways of using `Listener` object:
- Async iterator
- Background task

::: tabs

@tab Background task
```python
from psqlpy import ConnectionPool, Connection, Listener


db_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
)

async def test_channel_callback(
connection: Connection,
payload: str,
channel: str,
process_id: int,
) -> None:
# do some important staff
...

async def main() -> None:
# Create listener object
listener: Listener = db_pool.listener()

# Add channel to listen and callback for it.
await listener.add_callback(
channel="test_channel",
callback=test_channel_callback,
)

# Startup the listener
await listener.startup()

# Start listening.
# `listen` method isn't blocking, it returns None and starts background
# task in the Rust event loop.
listener.listen()

# You can stop listening.
listener.abort_listen()
```

@tab Async Iterator
```python
from psqlpy import (
ConnectionPool,
Connection,
Listener,
ListenerNotificationMsg,
)


db_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
)

async def main() -> None:
# Create listener object
listener: Listener = db_pool.listener()

# Startup the listener
await listener.startup()

listener_msg: ListenerNotificationMsg
async for listener_msg in listener:
print(listener_msg)
```

:::

## Listener attributes

- `connection`: Instance of `Connection`.
If `startup` wasn't called, raises `ListenerStartError`.

- `is_started`: Flag that shows whether the `Listener` is running or not.

## Listener methods

### Startup

Startup `Listener` instance and can be called once or again only after `shutdown`.

::: important
`Listener` must be started up.
:::

```python
async def main() -> None:
listener: Listener = db_pool.listener()

await listener.startup()
```

### Shutdown
Abort listen (if called) and release underlying connection.

```python
async def main() -> None:
listener: Listener = db_pool.listener()

await listener.startup()
await listener.shutdown()
```

### Add Callback

#### Parameters:
- `channel`: name of the channel to listen.
- `callback`: coroutine callback.

Add new callback to the channel, can be called more than 1 times.

Callback signature is like this:
```python
from psqlpy import Connection

async def callback(
connection: Connection,
payload: str,
channel: str,
process_id: int,
) -> None:
...
```

Parameters for callback are based like `args`, so this signature is correct to:
```python
async def callback(
connection: Connection,
*args,
) -> None:
...
```

**Example:**
```python
async def test_channel_callback(
connection: Connection,
payload: str,
channel: str,
process_id: int,
) -> None:
...

async def main() -> None:
listener = db_pool.listener()

await listener.add_callback(
channel="test_channel",
callback=test_channel_callback,
)
```

### Clear Channel Callbacks

#### Parameters:
- `channel`: name of the channel

Remove all callbacks for the channel

```python
async def main() -> None:
listener = db_pool.listener()
await listener.clear_channel_callbacks()
```

### Clear All Channels
Clear all channels and callbacks.

```python
async def main() -> None:
listener = db_pool.listener()
await listener.clear_all_channels()
```

### Listen
Start listening.

It's a non-blocking operation.
In the background it creates task in Rust event loop.

```python
async def main() -> None:
listener = db_pool.listener()
await listener.startup()
await listener.listen()
```

### Abort Listen
Abort listen.
If `listen()` method was called, stop listening, else don't do anything.
Loading
Loading