Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
# Changelog
## v1.6.0 11/20/24
- Added patron_data_helper functions
- Use executemany instead of execute when appropriate in PostgreSQLClient
- Add capability to retry database connections in the MySQL, PostgreSQL, and Redshift clients
- Automatically close database connection upon error in the MySQL, PostgreSQL, and Redshift clients
- Delete old PostgreSQLPoolClient, which was not production ready

## v1.5.0 11/19/24
- Added cloudLibrary client

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ help:
@echo " lint project files using the flake8 linter"

test:
pytest
pytest -W ignore::FutureWarning

lint:
flake8 --exclude *env
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ This package contains common Python utility classes and functions.
* Downloading files from a remote SSH SFTP server
* Connecting to and querying a MySQL database
* Connecting to and querying a PostgreSQL database
* Connecting to and querying a PostgreSQL database using a connection pool
* Connecting to and querying Redshift
* Making requests to OAuth2-authenticated APIs such as the NYPL Platform API and Sierra
* Interacting with vendor APIs such as cloudLibrary
Expand All @@ -21,6 +20,7 @@ This package contains common Python utility classes and functions.
* Creating a logger in the appropriate format
* Obfuscating a value using bcrypt
* Parsing/building Research Catalog identifiers
* Mapping between barcodes and Sierra patron IDs, and getting patron data from Sierra and Redshift using those IDs

## Usage
```python
Expand All @@ -38,7 +38,7 @@ kinesis_client = KinesisClient(...)
# Do not use any version below 1.0.0
# All available optional dependencies can be found in pyproject.toml.
# See the "Managing dependencies" section below for more details.
nypl-py-utils[kinesis-client,config-helper]==1.5.0
nypl-py-utils[kinesis-client,config-helper]==1.x.y
```

## Developing locally
Expand All @@ -64,7 +64,7 @@ The optional dependency sets also give the developer the option to manually list
### Using PostgreSQLClient in an AWS Lambda
Because `psycopg` requires a statically linked version of the `libpq` library, the `PostgreSQLClient` cannot be installed as-is in an AWS Lambda function. Instead, it must be packaged as follows:
```bash
pip install --target ./package nypl-py-utils[postgresql-client]==1.5.0
pip install --target ./package nypl-py-utils[postgresql-client]==1.x.y

pip install \
--platform manylinux2014_x86_64 \
Expand Down
11 changes: 6 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "nypl_py_utils"
version = "1.5.0"
version = "1.6.0"
authors = [
{ name="Aaron Friedman", email="aaronfriedman@nypl.org" },
]
Expand Down Expand Up @@ -48,9 +48,6 @@ oauth2-api-client = [
postgresql-client = [
"psycopg[binary]>=3.1.6"
]
postgresql-pool-client = [
"psycopg[binary,pool]>=3.1.6"
]
redshift-client = [
"botocore>=1.29.5",
"redshift-connector>=2.0.909"
Expand All @@ -73,11 +70,15 @@ config-helper = [
obfuscation-helper = [
"bcrypt>=4.0.1"
]
patron-data-helper = [
"nypl_py_utils[postgresql-client,redshift-client]>=1.1.5",
"pandas>=2.2.2"
]
research-catalog-identifier-helper = [
"requests>=2.28.1"
]
development = [
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,patron-data-helper,research-catalog-identifier-helper]",
"flake8>=6.0.0",
"freezegun>=1.2.2",
"mock>=4.0.3",
Expand Down
65 changes: 41 additions & 24 deletions src/nypl_py_utils/classes/mysql_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import mysql.connector
import time

from nypl_py_utils.functions.log_helper import create_log

Expand All @@ -15,35 +16,49 @@ def __init__(self, host, port, database, user, password):
self.user = user
self.password = password

def connect(self, **kwargs):
def connect(self, retry_count=0, backoff_factor=5, **kwargs):
"""
Connects to a MySQL database using the given credentials.

Keyword args can be passed into the connection to set certain options.
All possible arguments can be found here:
https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html.

Common arguments include:
autocommit: bool
Whether to automatically commit each query rather than running
them as part of a transaction. By default False.
Parameters
----------
retry_count: int, optional
The number of times to retry connecting before throwing an error.
By default no retry occurs.
backoff_factor: int, optional
The backoff factor when retrying. The number of seconds to wait before
retrying is backoff_factor ** number_of_retries_made.
kwargs:
All possible arguments can be found here:
https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
"""
self.logger.info('Connecting to {} database'.format(self.database))
try:
self.conn = mysql.connector.connect(
host=self.host,
port=self.port,
database=self.database,
user=self.user,
password=self.password,
**kwargs)
except mysql.connector.Error as e:
self.logger.error(
'Error connecting to {name} database: {error}'.format(
name=self.database, error=e))
raise MySQLClientError(
'Error connecting to {name} database: {error}'.format(
name=self.database, error=e)) from None
attempt_count = 0
while attempt_count <= retry_count:
try:
try:
self.conn = mysql.connector.connect(
host=self.host,
port=self.port,
database=self.database,
user=self.user,
password=self.password,
**kwargs)
break
except (mysql.connector.Error):
if attempt_count < retry_count:
self.logger.info('Failed to connect -- retrying')
time.sleep(backoff_factor ** attempt_count)
attempt_count += 1
else:
raise
except Exception as e:
self.logger.error(
'Error connecting to {name} database: {error}'.format(
name=self.database, error=e))
raise MySQLClientError(
'Error connecting to {name} database: {error}'.format(
name=self.database, error=e)) from None

def execute_query(self, query, query_params=None, **kwargs):
"""
Expand Down Expand Up @@ -83,6 +98,8 @@ def execute_query(self, query, query_params=None, **kwargs):
return cursor.fetchall()
except Exception as e:
self.conn.rollback()
cursor.close()
self.close_connection()
self.logger.error(
('Error executing {name} database query \'{query}\': {error}')
.format(name=self.database, query=query, error=e))
Expand Down
93 changes: 58 additions & 35 deletions src/nypl_py_utils/classes/postgresql_client.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,59 @@
import psycopg
import time

from nypl_py_utils.functions.log_helper import create_log


class PostgreSQLClient:
"""Client for managing individual connections to a PostgreSQL database"""

def __init__(self, host, port, db_name, user, password):
def __init__(self, host, port, database, user, password):
self.logger = create_log('postgresql_client')
self.conn = None
self.conn_info = ('postgresql://{user}:{password}@{host}:{port}/'
'{db_name}').format(user=user, password=password,
host=host, port=port,
db_name=db_name)
'{database}').format(user=user, password=password,
host=host, port=port,
database=database)
self.database = database

self.db_name = db_name

def connect(self, **kwargs):
def connect(self, retry_count=0, backoff_factor=5, **kwargs):
"""
Connects to a PostgreSQL database using the given credentials.

Keyword args can be passed into the connection to set certain options.
All possible arguments can be found here:
https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.Connection.connect.

Common arguments include:
autocommit: bool
Whether to automatically commit each query rather than running
them as part of a transaction. By default False.
row_factory: RowFactory
A psycopg RowFactory that determines how the data will be
returned. Defaults to tuple_row, which returns the rows as a
list of tuples.
Parameters
----------
retry_count: int, optional
The number of times to retry connecting before throwing an error.
By default no retry occurs.
backoff_factor: int, optional
The backoff factor when retrying. The number of seconds to wait before
retrying is backoff_factor ** number_of_retries_made.
kwargs:
All possible arguments (such as the row_factory) can be found here:
https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.Connection.connect
"""
self.logger.info('Connecting to {} database'.format(self.db_name))
try:
self.conn = psycopg.connect(self.conn_info, **kwargs)
except psycopg.Error as e:
self.logger.error(
'Error connecting to {name} database: {error}'.format(
name=self.db_name, error=e))
raise PostgreSQLClientError(
'Error connecting to {name} database: {error}'.format(
name=self.db_name, error=e)) from None
self.logger.info('Connecting to {} database'.format(self.database))
attempt_count = 0
while attempt_count <= retry_count:
try:
try:
self.conn = psycopg.connect(self.conn_info, **kwargs)
break
except (psycopg.OperationalError,
psycopg.errors.ConnectionTimeout):
if attempt_count < retry_count:
self.logger.info('Failed to connect -- retrying')
time.sleep(backoff_factor ** attempt_count)
attempt_count += 1
else:
raise
except Exception as e:
self.logger.error(
'Error connecting to {name} database: {error}'.format(
name=self.database, error=e))
raise PostgreSQLClientError(
'Error connecting to {name} database: {error}'.format(
name=self.database, error=e)) from None

def execute_query(self, query, query_params=None, **kwargs):
"""
Expand All @@ -53,7 +64,11 @@ def execute_query(self, query, query_params=None, **kwargs):
query: str
The query to execute
query_params: sequence, optional
The values to be used in a parameterized query
The values to be used in a parameterized query. The values can be
for a single insert query -- e.g. execute_query(
"INSERT INTO x VALUES (%s, %s)", (1, "a"))
or for multiple -- e.g. execute_query(
"INSERT INTO x VALUES (%s, %s)", [(1, "a"), (2, "b")])
kwargs:
All possible arguments can be found here:
https://www.psycopg.org/psycopg3/docs/api/cursors.html#psycopg.Cursor.execute
Expand All @@ -65,30 +80,38 @@ def execute_query(self, query, query_params=None, **kwargs):
based on the connection's row_factory if there's something to
return (even if the result set is empty).
"""
self.logger.info('Querying {} database'.format(self.db_name))
self.logger.info('Querying {} database'.format(self.database))
self.logger.debug('Executing query {}'.format(query))
try:
cursor = self.conn.cursor()
cursor.execute(query, query_params, **kwargs)
if query_params is not None and all(
isinstance(param, tuple) or isinstance(param, list)
for param in query_params
):
cursor.executemany(query, query_params, **kwargs)
else:
cursor.execute(query, query_params, **kwargs)
self.conn.commit()
return None if cursor.description is None else cursor.fetchall()
except Exception as e:
self.conn.rollback()
cursor.close()
self.close_connection()
self.logger.error(
('Error executing {name} database query \'{query}\': '
'{error}').format(
name=self.db_name, query=query, error=e))
name=self.database, query=query, error=e))
raise PostgreSQLClientError(
('Error executing {name} database query \'{query}\': '
'{error}').format(
name=self.db_name, query=query, error=e)) from None
name=self.database, query=query, error=e)) from None
finally:
cursor.close()

def close_connection(self):
"""Closes the database connection"""
self.logger.debug('Closing {} database connection'.format(
self.db_name))
self.database))
self.conn.close()


Expand Down
Loading