Skip to content

Commit 9e1118e

Browse files
committed
LISTEN/NOTIFY funcionality
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
1 parent bcd96f3 commit 9e1118e

File tree

11 files changed

+225
-196
lines changed

11 files changed

+225
-196
lines changed

python/psqlpy/_internal/__init__.pyi

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,15 +1799,15 @@ class Listener:
17991799
- `channel`: name of the channel.
18001800
"""
18011801

1802-
async def listen(self: Self) -> None:
1802+
def listen(self: Self) -> None:
18031803
"""Start listening.
18041804
18051805
Start actual listening.
18061806
In the background it creates task in Rust event loop.
18071807
You must save returned Future to the array.
18081808
"""
18091809

1810-
async def abort_listen(self: Self) -> None:
1810+
def abort_listen(self: Self) -> None:
18111811
"""Abort listen.
18121812
18131813
If `listen()` method was called, stop listening,

src/common.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use pyo3::{
44
};
55

66
use crate::{
7-
driver::connection::InnerConnection, exceptions::rust_errors::RustPSQLDriverPyResult, query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, value_converter::{convert_parameters, PythonDTO, QueryParameter}
7+
driver::connection::PsqlpyConnection,
8+
exceptions::rust_errors::RustPSQLDriverPyResult,
9+
query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult},
10+
value_converter::{convert_parameters, PythonDTO, QueryParameter},
811
};
912

1013
/// Add new module to the parent one.
@@ -52,7 +55,7 @@ pub trait ObjectQueryTrait {
5255
) -> impl std::future::Future<Output = RustPSQLDriverPyResult<()>> + Send;
5356
}
5457

55-
impl ObjectQueryTrait for InnerConnection {
58+
impl ObjectQueryTrait for PsqlpyConnection {
5659
async fn psqlpy_query_one(
5760
&self,
5861
querystring: String,
@@ -128,6 +131,6 @@ impl ObjectQueryTrait for InnerConnection {
128131
}
129132

130133
async fn psqlpy_query_simple(&self, querystring: String) -> RustPSQLDriverPyResult<()> {
131-
Ok(self.batch_execute(querystring.as_str()).await?)
134+
self.batch_execute(querystring.as_str()).await
132135
}
133136
}

src/driver/connection.rs

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use futures_util::pin_mut;
44
use postgres_types::ToSql;
55
use pyo3::{buffer::PyBuffer, pyclass, pymethods, Py, PyAny, PyErr, Python};
66
use std::{collections::HashSet, sync::Arc, vec};
7-
use tokio_postgres::{binary_copy::BinaryCopyInWriter, Client, CopyInSink, Row, Statement, ToStatement};
7+
use tokio_postgres::{
8+
binary_copy::BinaryCopyInWriter, Client, CopyInSink, Row, Statement, ToStatement,
9+
};
810

911
use crate::{
1012
exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult},
@@ -20,108 +22,114 @@ use super::{
2022
transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit},
2123
};
2224

23-
pub enum InnerConnection {
25+
#[allow(clippy::module_name_repetitions)]
26+
pub enum PsqlpyConnection {
2427
PoolConn(Object),
2528
SingleConn(Client),
2629
}
2730

28-
impl InnerConnection {
29-
pub async fn prepare_cached(
30-
&self,
31-
query: &str
32-
) -> RustPSQLDriverPyResult<Statement> {
31+
impl PsqlpyConnection {
32+
/// Prepare cached statement.
33+
///
34+
/// # Errors
35+
/// May return Err if cannot prepare statement.
36+
pub async fn prepare_cached(&self, query: &str) -> RustPSQLDriverPyResult<Statement> {
3337
match self {
34-
InnerConnection::PoolConn(pconn) => {
35-
return Ok(pconn.prepare_cached(query).await?)
36-
}
37-
InnerConnection::SingleConn(sconn) => {
38-
return Ok(sconn.prepare(query).await?)
39-
}
38+
PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.prepare_cached(query).await?),
39+
PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.prepare(query).await?),
4040
}
4141
}
4242

43+
/// Prepare cached statement.
44+
///
45+
/// # Errors
46+
/// May return Err if cannot execute statement.
4347
pub async fn query<T>(
4448
&self,
4549
statement: &T,
4650
params: &[&(dyn ToSql + Sync)],
4751
) -> RustPSQLDriverPyResult<Vec<Row>>
48-
where T: ?Sized + ToStatement {
52+
where
53+
T: ?Sized + ToStatement,
54+
{
4955
match self {
50-
InnerConnection::PoolConn(pconn) => {
51-
return Ok(pconn.query(statement, params).await?)
52-
}
53-
InnerConnection::SingleConn(sconn) => {
56+
PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.query(statement, params).await?),
57+
PsqlpyConnection::SingleConn(sconn) => {
5458
return Ok(sconn.query(statement, params).await?)
5559
}
5660
}
5761
}
5862

63+
/// Prepare cached statement.
64+
///
65+
/// # Errors
66+
/// May return Err if cannot execute statement.
5967
pub async fn batch_execute(&self, query: &str) -> RustPSQLDriverPyResult<()> {
6068
match self {
61-
InnerConnection::PoolConn(pconn) => {
62-
return Ok(pconn.batch_execute(query).await?)
63-
}
64-
InnerConnection::SingleConn(sconn) => {
65-
return Ok(sconn.batch_execute(query).await?)
66-
}
69+
PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.batch_execute(query).await?),
70+
PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.batch_execute(query).await?),
6771
}
6872
}
6973

74+
/// Prepare cached statement.
75+
///
76+
/// # Errors
77+
/// May return Err if cannot execute statement.
7078
pub async fn query_one<T>(
7179
&self,
7280
statement: &T,
7381
params: &[&(dyn ToSql + Sync)],
7482
) -> RustPSQLDriverPyResult<Row>
75-
where T: ?Sized + ToStatement
83+
where
84+
T: ?Sized + ToStatement,
7685
{
7786
match self {
78-
InnerConnection::PoolConn(pconn) => {
87+
PsqlpyConnection::PoolConn(pconn) => {
7988
return Ok(pconn.query_one(statement, params).await?)
8089
}
81-
InnerConnection::SingleConn(sconn) => {
90+
PsqlpyConnection::SingleConn(sconn) => {
8291
return Ok(sconn.query_one(statement, params).await?)
8392
}
8493
}
8594
}
8695

87-
pub async fn copy_in<T, U>(
88-
&self,
89-
statement: &T
90-
) -> RustPSQLDriverPyResult<CopyInSink<U>>
96+
/// Prepare cached statement.
97+
///
98+
/// # Errors
99+
/// May return Err if cannot execute copy data.
100+
pub async fn copy_in<T, U>(&self, statement: &T) -> RustPSQLDriverPyResult<CopyInSink<U>>
91101
where
92102
T: ?Sized + ToStatement,
93-
U: Buf + 'static + Send
103+
U: Buf + 'static + Send,
94104
{
95105
match self {
96-
InnerConnection::PoolConn(pconn) => {
97-
return Ok(pconn.copy_in(statement).await?)
98-
}
99-
InnerConnection::SingleConn(sconn) => {
100-
return Ok(sconn.copy_in(statement).await?)
101-
}
106+
PsqlpyConnection::PoolConn(pconn) => return Ok(pconn.copy_in(statement).await?),
107+
PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.copy_in(statement).await?),
102108
}
103109
}
104110
}
105111

106112
#[pyclass(subclass)]
107113
#[derive(Clone)]
108114
pub struct Connection {
109-
db_client: Option<Arc<InnerConnection>>,
115+
db_client: Option<Arc<PsqlpyConnection>>,
110116
db_pool: Option<Pool>,
111117
}
112118

113119
impl Connection {
114120
#[must_use]
115-
pub fn new(db_client: Option<Arc<InnerConnection>>, db_pool: Option<Pool>) -> Self {
121+
pub fn new(db_client: Option<Arc<PsqlpyConnection>>, db_pool: Option<Pool>) -> Self {
116122
Connection { db_client, db_pool }
117123
}
118124

119-
pub fn db_client(&self) -> Option<Arc<InnerConnection>> {
120-
return self.db_client.clone()
125+
#[must_use]
126+
pub fn db_client(&self) -> Option<Arc<PsqlpyConnection>> {
127+
self.db_client.clone()
121128
}
122129

130+
#[must_use]
123131
pub fn db_pool(&self) -> Option<Pool> {
124-
return self.db_pool.clone()
132+
self.db_pool.clone()
125133
}
126134
}
127135

@@ -151,7 +159,7 @@ impl Connection {
151159
.await??;
152160
pyo3::Python::with_gil(|gil| {
153161
let mut self_ = self_.borrow_mut(gil);
154-
self_.db_client = Some(Arc::new(InnerConnection::PoolConn(db_connection)));
162+
self_.db_client = Some(Arc::new(PsqlpyConnection::PoolConn(db_connection)));
155163
});
156164
return Ok(self_);
157165
}
@@ -277,7 +285,7 @@ impl Connection {
277285
let db_client = pyo3::Python::with_gil(|gil| self_.borrow(gil).db_client.clone());
278286

279287
if let Some(db_client) = db_client {
280-
return Ok(db_client.batch_execute(&querystring).await?);
288+
return db_client.batch_execute(&querystring).await;
281289
}
282290

283291
Err(RustPSQLDriverError::ConnectionClosedError)

src/driver/connection_pool.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use crate::{
1111
};
1212

1313
use super::{
14-
common_options::{ConnRecyclingMethod, LoadBalanceHosts, SslMode, TargetSessionAttrs}, connection::{Connection, InnerConnection}, listener::core::Listener, utils::{build_connection_config, build_manager, build_tls}
14+
common_options::{ConnRecyclingMethod, LoadBalanceHosts, SslMode, TargetSessionAttrs},
15+
connection::{Connection, PsqlpyConnection},
16+
listener::core::Listener,
17+
utils::{build_connection_config, build_manager, build_tls},
1518
};
1619

1720
/// Make new connection pool.
@@ -210,7 +213,8 @@ pub struct ConnectionPool {
210213
}
211214

212215
impl ConnectionPool {
213-
#[must_use] pub fn build(
216+
#[must_use]
217+
pub fn build(
214218
pool: Pool,
215219
pg_config: Config,
216220
ca_file: Option<String>,
@@ -497,9 +501,9 @@ impl ConnectionPool {
497501
Connection::new(None, Some(self.pool.clone()))
498502
}
499503

500-
pub fn listener(
501-
self_: pyo3::Py<Self>,
502-
) -> RustPSQLDriverPyResult<Listener> {
504+
#[must_use]
505+
#[allow(clippy::needless_pass_by_value)]
506+
pub fn listener(self_: pyo3::Py<Self>) -> Listener {
503507
let (pg_config, ca_file, ssl_mode) = pyo3::Python::with_gil(|gil| {
504508
let b_gil = self_.borrow(gil);
505509
(
@@ -509,7 +513,7 @@ impl ConnectionPool {
509513
)
510514
});
511515

512-
Ok(Listener::new(pg_config, ca_file, ssl_mode))
516+
Listener::new(pg_config, ca_file, ssl_mode)
513517
}
514518

515519
/// Return new single connection.
@@ -524,7 +528,10 @@ impl ConnectionPool {
524528
})
525529
.await??;
526530

527-
Ok(Connection::new(Some(Arc::new(InnerConnection::PoolConn(db_connection))), None))
531+
Ok(Connection::new(
532+
Some(Arc::new(PsqlpyConnection::PoolConn(db_connection))),
533+
None,
534+
))
528535
}
529536

530537
/// Close connection pool.

src/driver/cursor.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
runtime::rustdriver_future,
1212
};
1313

14-
use super::connection::InnerConnection;
14+
use super::connection::PsqlpyConnection;
1515

1616
/// Additional implementation for the `Object` type.
1717
#[allow(clippy::ref_option)]
@@ -28,7 +28,7 @@ trait CursorObjectTrait {
2828
async fn cursor_close(&self, closed: &bool, cursor_name: &str) -> RustPSQLDriverPyResult<()>;
2929
}
3030

31-
impl CursorObjectTrait for InnerConnection {
31+
impl CursorObjectTrait for PsqlpyConnection {
3232
/// Start the cursor.
3333
///
3434
/// Execute `DECLARE` command with parameters.
@@ -90,7 +90,7 @@ impl CursorObjectTrait for InnerConnection {
9090

9191
#[pyclass(subclass)]
9292
pub struct Cursor {
93-
db_transaction: Option<Arc<InnerConnection>>,
93+
db_transaction: Option<Arc<PsqlpyConnection>>,
9494
querystring: String,
9595
parameters: Option<Py<PyAny>>,
9696
cursor_name: String,
@@ -104,7 +104,7 @@ pub struct Cursor {
104104
impl Cursor {
105105
#[must_use]
106106
pub fn new(
107-
db_transaction: Arc<InnerConnection>,
107+
db_transaction: Arc<PsqlpyConnection>,
108108
querystring: String,
109109
parameters: Option<Py<PyAny>>,
110110
cursor_name: String,

0 commit comments

Comments
 (0)