Skip to content

Added connection recycling settings #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,31 @@ async def main() -> None:
# rust does it instead.
```

### Control connection recycling
There are 3 available options to control how a connection is recycled - `Fast`, `Verified` and `Clean`.
As a connection can be closed in different situations on various sides, you can select the preferable behavior for how a connection is recycled.

- `Fast`: Only run `is_closed()` when recycling existing connections.
- `Verified`: Run `is_closed()` and execute a test query. This is slower, but guarantees that the database connection is ready to
be used. Normally, `is_closed()` should be enough to filter
out bad connections, but under some circumstances (i.e. hard-closed
network connections) it's possible that `is_closed()`
returns `false` while the connection is dead. You will receive an error
on your first query then.
- `Clean`: Like the [`Verified`] method, but instead uses the following sequence of statements, which guarantees a pristine connection:
```sql
CLOSE ALL;
SET SESSION AUTHORIZATION DEFAULT;
RESET ALL;
UNLISTEN *;
SELECT pg_advisory_unlock_all();
DISCARD TEMP;
DISCARD SEQUENCES;
```
This is similar to calling `DISCARD ALL`, but doesn't call
`DEALLOCATE ALL` and `DISCARD PLAN`, so that the statement cache is not
rendered ineffective.

## Query parameters
You can pass parameters into queries.
Parameters can be passed in any `execute` method as the second parameter, it must be a list.
Expand Down
2 changes: 2 additions & 0 deletions python/psqlpy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ._internal import (
Connection,
ConnRecyclingMethod,
Cursor,
IsolationLevel,
PSQLPool,
Expand All @@ -16,4 +17,5 @@
"ReadVariant",
"Connection",
"Cursor",
"ConnRecyclingMethod",
]
45 changes: 45 additions & 0 deletions python/psqlpy/_internal/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,49 @@ class ReadVariant(Enum):
ReadOnly = 1
ReadWrite = 2

class ConnRecyclingMethod(Enum):
    """Possible methods of how a connection is recycled.

    The default is `Fast`, which does not check the connection health or
    perform any clean-up queries.

    # Description:
    ## Fast:
    Only run `is_closed()` when recycling existing connections.

    Unless you have special needs this is a safe choice.

    ## Verified:
    Run `is_closed()` and execute a test query.

    This is slower, but guarantees that the database connection is ready to
    be used. Normally, `is_closed()` should be enough to filter
    out bad connections, but under some circumstances (i.e. hard-closed
    network connections) it's possible that `is_closed()`
    returns `false` while the connection is dead. You will receive an error
    on your first query then.

    ## Clean:
    Like the `Verified` method, but instead uses the following sequence
    of statements, which guarantees a pristine connection:
    ```sql
    CLOSE ALL;
    SET SESSION AUTHORIZATION DEFAULT;
    RESET ALL;
    UNLISTEN *;
    SELECT pg_advisory_unlock_all();
    DISCARD TEMP;
    DISCARD SEQUENCES;
    ```
    This is similar to calling `DISCARD ALL`, but doesn't call
    `DEALLOCATE ALL` and `DISCARD PLAN`, so that the statement cache is not
    rendered ineffective.
    """

    # Only check `is_closed()` when recycling a connection (fastest).
    Fast = 1
    # Also run a test query; slower, but verifies the connection works.
    Verified = 2
    # Run the clean-up statement sequence above for a pristine connection.
    Clean = 3

class Cursor:
"""Represent opened cursor in a transaction.

Expand Down Expand Up @@ -446,6 +489,7 @@ class PSQLPool:
port: Optional[int] = None,
db_name: Optional[str] = None,
max_db_pool_size: Optional[str] = None,
conn_recycling_method: Optional[ConnRecyclingMethod] = None,
) -> None:
"""Create new PostgreSQL connection pool.

Expand All @@ -468,6 +512,7 @@ class PSQLPool:
- `port`: port of postgres
- `db_name`: name of the database in postgres
- `max_db_pool_size`: maximum size of the connection pool
- `conn_recycling_method`: how a connection is recycled.
"""
async def startup(self: Self) -> None:
"""Startup the connection pool.
Expand Down
24 changes: 23 additions & 1 deletion python/tests/test_connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from psqlpy import Connection, PSQLPool, QueryResult
from psqlpy import Connection, ConnRecyclingMethod, PSQLPool, QueryResult


@pytest.mark.anyio
Expand Down Expand Up @@ -39,3 +39,25 @@ async def test_pool_connection(
"""Test that PSQLPool can return single connection from the pool."""
connection = await psql_pool.connection()
assert isinstance(connection, Connection)


@pytest.mark.anyio
@pytest.mark.parametrize(
    "conn_recycling_method",
    [
        ConnRecyclingMethod.Fast,
        ConnRecyclingMethod.Verified,
        ConnRecyclingMethod.Clean,
    ],
)
async def test_pool_conn_recycling_method(
    conn_recycling_method: ConnRecyclingMethod,
) -> None:
    """Test that a pool accepts every recycling method and still executes queries."""
    pool = PSQLPool(
        dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
        conn_recycling_method=conn_recycling_method,
    )

    await pool.startup()

    await pool.execute("SELECT 1")
21 changes: 21 additions & 0 deletions src/driver/common_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use deadpool_postgres::RecyclingMethod;
use pyo3::pyclass;

/// Python-facing selector for how a pooled connection is recycled.
///
/// Each variant maps onto a `deadpool_postgres::RecyclingMethod`
/// via [`ConnRecyclingMethod::to_internal`].
#[pyclass]
#[derive(Clone, Copy)]
pub enum ConnRecyclingMethod {
    /// Only check whether the connection is closed when recycling (fastest).
    Fast,
    /// Also run a test query; slower, but verifies the connection works.
    Verified,
    /// Run a clean-up statement sequence to guarantee a pristine connection.
    Clean,
}

impl ConnRecyclingMethod {
    /// Convert this Python-facing variant into the corresponding
    /// deadpool-postgres [`RecyclingMethod`].
    #[must_use]
    pub fn to_internal(&self) -> RecyclingMethod {
        match *self {
            Self::Fast => RecyclingMethod::Fast,
            Self::Verified => RecyclingMethod::Verified,
            Self::Clean => RecyclingMethod::Clean,
        }
    }
}
23 changes: 19 additions & 4 deletions src/driver/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
value_converter::{convert_parameters, PythonDTO},
};

use super::connection::Connection;
use super::{common_options::ConnRecyclingMethod, connection::Connection};

/// `PSQLPool` for internal use only.
///
Expand All @@ -23,12 +23,14 @@ pub struct RustPSQLPool {
port: Option<u16>,
db_name: Option<String>,
max_db_pool_size: Option<usize>,
conn_recycling_method: Option<ConnRecyclingMethod>,
db_pool: Arc<tokio::sync::RwLock<Option<Pool>>>,
}

impl RustPSQLPool {
/// Create new `RustPSQLPool`.
#[must_use]
#[allow(clippy::too_many_arguments)]
pub fn new(
dsn: Option<String>,
username: Option<String>,
Expand All @@ -37,6 +39,7 @@ impl RustPSQLPool {
port: Option<u16>,
db_name: Option<String>,
max_db_pool_size: Option<usize>,
conn_recycling_method: Option<ConnRecyclingMethod>,
) -> Self {
RustPSQLPool {
dsn,
Expand All @@ -46,6 +49,7 @@ impl RustPSQLPool {
port,
db_name,
max_db_pool_size,
conn_recycling_method,
db_pool: Arc::new(tokio::sync::RwLock::new(None)),
}
}
Expand Down Expand Up @@ -124,6 +128,7 @@ impl RustPSQLPool {
let db_host = self.host.clone();
let db_port = self.port;
let db_name = self.db_name.clone();
let conn_recycling_method = self.conn_recycling_method;
let max_db_pool_size = self.max_db_pool_size;

let mut db_pool_guard = db_pool_arc.write().await;
Expand Down Expand Up @@ -163,9 +168,16 @@ impl RustPSQLPool {
}
}

let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast,
};
let mgr_config: ManagerConfig;
if let Some(conn_recycling_method) = conn_recycling_method {
mgr_config = ManagerConfig {
recycling_method: conn_recycling_method.to_internal(),
}
} else {
mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast,
};
}
let mgr = Manager::from_config(pg_config, NoTls, mgr_config);

let mut db_pool_builder = Pool::builder(mgr);
Expand All @@ -186,6 +198,7 @@ pub struct PSQLPool {
#[pymethods]
impl PSQLPool {
#[new]
#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn new(
dsn: Option<String>,
Expand All @@ -195,6 +208,7 @@ impl PSQLPool {
port: Option<u16>,
db_name: Option<String>,
max_db_pool_size: Option<usize>,
conn_recycling_method: Option<ConnRecyclingMethod>,
) -> Self {
PSQLPool {
rust_psql_pool: Arc::new(tokio::sync::RwLock::new(RustPSQLPool {
Expand All @@ -205,6 +219,7 @@ impl PSQLPool {
port,
db_name,
max_db_pool_size,
conn_recycling_method,
db_pool: Arc::new(tokio::sync::RwLock::new(None)),
})),
}
Expand Down
1 change: 1 addition & 0 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod common_options;
pub mod connection;
pub mod connection_pool;
pub mod cursor;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fn psqlpy(py: Python<'_>, pymod: &PyModule) -> PyResult<()> {
pymod.add_class::<driver::cursor::Cursor>()?;
pymod.add_class::<driver::transaction_options::IsolationLevel>()?;
pymod.add_class::<driver::transaction_options::ReadVariant>()?;
pymod.add_class::<driver::common_options::ConnRecyclingMethod>()?;
pymod.add_class::<query_result::PSQLDriverPyQueryResult>()?;
add_module(py, pymod, "extra_types", extra_types_module)?;
add_module(py, pymod, "exceptions", python_exceptions_module)?;
Expand Down