diff --git a/hasura/metadata/actions.graphql b/hasura/metadata/actions.graphql index 2c5886c1ba5..40b55e955dd 100644 --- a/hasura/metadata/actions.graphql +++ b/hasura/metadata/actions.graphql @@ -725,8 +725,10 @@ input PgAuditOrderBy { input ElectoralLogFilter { id: String user_id: String - created: String - statement_timestamp: String + created_min: String + created_max: String + statement_timestamp_min: String + statement_timestamp_max: String statement_kind: String username: String } diff --git a/packages/Cargo.lock b/packages/Cargo.lock index d12c29c197d..5e6d82b722b 100644 --- a/packages/Cargo.lock +++ b/packages/Cargo.lock @@ -3227,6 +3227,7 @@ version = "0.1.0" dependencies = [ "anyhow", "borsh", + "chrono", "clap", "hex", "immudb-rs", diff --git a/packages/admin-portal/graphql.schema.json b/packages/admin-portal/graphql.schema.json index 55840f3c991..dd2227f5ba3 100644 --- a/packages/admin-portal/graphql.schema.json +++ b/packages/admin-portal/graphql.schema.json @@ -2336,7 +2336,19 @@ "fields": null, "inputFields": [ { - "name": "created", + "name": "created_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null, + "isDeprecated": false, + "deprecationReason": null + }, + { + "name": "created_min", "description": null, "type": { "kind": "SCALAR", @@ -2372,7 +2384,19 @@ "deprecationReason": null }, { - "name": "statement_timestamp", + "name": "statement_timestamp_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null, + "isDeprecated": false, + "deprecationReason": null + }, + { + "name": "statement_timestamp_min", "description": null, "type": { "kind": "SCALAR", diff --git a/packages/admin-portal/src/components/ElectoralLogList.tsx b/packages/admin-portal/src/components/ElectoralLogList.tsx index 3d7210910b6..a220426a1df 100644 --- a/packages/admin-portal/src/components/ElectoralLogList.tsx +++ b/packages/admin-portal/src/components/ElectoralLogList.tsx @@ -181,16 +181,30 @@ export const ElectoralLogList: React.FC = ({ label={String(t("logsScreen.column.username"))} />, (value ? new Date(value).toISOString() : value)} />, (value ? new Date(value).toISOString() : value)} + />, + (value ? new Date(value).toISOString() : value)} + />, + (value ? new Date(value).toISOString() : value)} />, @@ -264,10 +278,13 @@ export const ElectoralLogList: React.FC = ({ new Date(record.statement_timestamp * 1000).toLocaleString() } /> - + getHeadField(record, "event_type")} /> ; + created_max?: InputMaybe; + created_min?: InputMaybe; id?: InputMaybe; statement_kind?: InputMaybe; - statement_timestamp?: InputMaybe; + statement_timestamp_max?: InputMaybe; + statement_timestamp_min?: InputMaybe; user_id?: InputMaybe; username?: InputMaybe; }; diff --git a/packages/admin-portal/src/queries/ListElectoralLog.ts b/packages/admin-portal/src/queries/ListElectoralLog.ts index 4f601e69e92..a13aeebb5ca 100644 --- a/packages/admin-portal/src/queries/ListElectoralLog.ts +++ b/packages/admin-portal/src/queries/ListElectoralLog.ts @@ -3,7 +3,14 @@ // SPDX-License-Identifier: AGPL-3.0-only import {gql} from "@apollo/client" -const validOrderBy = ["id", "created", "statement_timestamp", "statement_kind", "user_id"] +const validOrderBy = [ + "id", + "created", + "statement_timestamp", + "statement_kind", + "user_id", + "username", +] export const getElectoralLogVariables = (input: any) => { return { diff --git a/packages/admin-portal/src/queries/customBuildQuery.ts b/packages/admin-portal/src/queries/customBuildQuery.ts index 702ae0beda8..e6491d17c9d 100644 --- a/packages/admin-portal/src/queries/customBuildQuery.ts +++ b/packages/admin-portal/src/queries/customBuildQuery.ts @@ -68,8 +68,10 @@ export const customBuildQuery = "election_event_id", "user_id", "username", - "created", - "statement_timestamp", + "created_min", + "created_max", + "statement_timestamp_min", + "statement_timestamp_max", "statement_kind", ] Object.keys(params.filter).forEach((f) => { @@ -83,11 +85,16 @@ export const customBuildQuery = name: resourceName, }, } + const builtVariables = buildVariables(introspectionResults)( + resource, + raFetchType, + params, + null + ) + const finalVariables = getElectoralLogVariables(builtVariables) return { query: getElectoralLog(params), - variables: getElectoralLogVariables( - buildVariables(introspectionResults)(resource, raFetchType, params, null) - ), + variables: finalVariables, parseResponse: (res: any) => { const response = res.data.listElectoralLog let output = { diff --git a/packages/admin-portal/src/translations/cat.ts b/packages/admin-portal/src/translations/cat.ts index 3008f53fa57..23790d07fa6 100644 --- a/packages/admin-portal/src/translations/cat.ts +++ b/packages/admin-portal/src/translations/cat.ts @@ -81,7 +81,11 @@ const catalanTranslation: TranslationType = { id: "ID", statement_kind: "Tipus de declaració", created: "Creat", + created_min: "Creat Mín", + created_max: "Creat Màx", statement_timestamp: "Marca de temps de declaració", + statement_timestamp_min: "Marca de temps de declaració Mín", + statement_timestamp_max: "Marca de temps de declaració Màx", message: "Missatge", user_id: "ID d'usuari", username: "Nom d'Usuari", diff --git a/packages/admin-portal/src/translations/en.ts b/packages/admin-portal/src/translations/en.ts index 58d4b7ce454..dbc83b2e1bf 100644 --- a/packages/admin-portal/src/translations/en.ts +++ b/packages/admin-portal/src/translations/en.ts @@ -27,7 +27,11 @@ const englishTranslation = { id: "Id", statement_kind: "Statement kind", created: "Created", + created_min: "Created Min", + created_max: "Created Max", statement_timestamp: "Statement Timestamp", + statement_timestamp_min: "Statement Timestamp Min", + statement_timestamp_max: "Statement Timestamp Max", message: "Message", user_id: "User Id", username: "Username", diff --git a/packages/admin-portal/src/translations/es.ts b/packages/admin-portal/src/translations/es.ts index bd30b7b73df..dc7753ed577 100644 --- a/packages/admin-portal/src/translations/es.ts +++ b/packages/admin-portal/src/translations/es.ts @@ -20,7 +20,11 @@ const spanishTranslation: TranslationType = { id: "ID", statement_kind: "Tipo de declaración", created: "Creado", + created_min: "Creado Mín", + created_max: "Creado Máx", statement_timestamp: "Marca de tiempo de declaración", + statement_timestamp_min: "Marca de tiempo de declaración Mín", + statement_timestamp_max: "Marca de tiempo de declaración Máx", message: "Mensaje", user_id: "ID de usuario", username: "Nombre de Usuario", diff --git a/packages/admin-portal/src/translations/eu.ts b/packages/admin-portal/src/translations/eu.ts index 47fea536097..317f4642537 100644 --- a/packages/admin-portal/src/translations/eu.ts +++ b/packages/admin-portal/src/translations/eu.ts @@ -28,7 +28,11 @@ const basqueTranslation: TranslationType = { id: "IDa", statement_kind: "Adierazpen mota", created: "Sortua", + created_min: "Sortua Min", + created_max: "Sortua Max", statement_timestamp: "Adierazpen denbora-marka", + statement_timestamp_min: "Adierazpen denbora-marka Min", + statement_timestamp_max: "Adierazpen denbora-marka Max", message: "Mezua", user_id: "Erabiltzaile IDa", username: "Erabiltzaile izena", diff --git a/packages/admin-portal/src/translations/fr.ts b/packages/admin-portal/src/translations/fr.ts index ae630c8d278..044aa4341fe 100644 --- a/packages/admin-portal/src/translations/fr.ts +++ b/packages/admin-portal/src/translations/fr.ts @@ -28,7 +28,11 @@ const frenchTranslation: TranslationType = { id: "ID", statement_kind: "Type de déclaration", created: "Créé", + created_min: "Créé Min", + created_max: "Créé Max", statement_timestamp: "Horodatage de déclaration", + statement_timestamp_min: "Horodatage de déclaration Min", + statement_timestamp_max: "Horodatage de déclaration Max", message: "Message", user_id: "ID utilisateur", username: "Nom d'Utilisateur", diff --git a/packages/admin-portal/src/translations/gl.ts b/packages/admin-portal/src/translations/gl.ts index 1a3d554bca8..fa2511a19e4 100644 --- a/packages/admin-portal/src/translations/gl.ts +++ b/packages/admin-portal/src/translations/gl.ts @@ -28,7 +28,11 @@ const galegoTranslation: TranslationType = { id: "ID", statement_kind: "Tipo de declaración", created: "Creado", + created_min: "Creado Mín", + created_max: "Creado Máx", statement_timestamp: "Marca de tiempo de declaración", + statement_timestamp_min: "Marca de tiempo de declaración Mín", + statement_timestamp_max: "Marca de tiempo de declaración Máx", message: "Mensaje", user_id: "ID de usuario", username: "Nombre de Usuario", diff --git a/packages/admin-portal/src/translations/nl.ts b/packages/admin-portal/src/translations/nl.ts index bfed26bbd50..48215e89ca8 100644 --- a/packages/admin-portal/src/translations/nl.ts +++ b/packages/admin-portal/src/translations/nl.ts @@ -27,7 +27,11 @@ const dutchTranslation: TranslationType = { id: "Id", statement_kind: "Soort verklaring", created: "Aangemaakt", + created_min: "Aangemaakt Min", + created_max: "Aangemaakt Max", statement_timestamp: "Tijdstempel verklaring", + statement_timestamp_min: "Tijdstempel verklaring Min", + statement_timestamp_max: "Tijdstempel verklaring Max", message: "Bericht", user_id: "Gebruikers-ID", username: "Gebruikersnaam", diff --git a/packages/admin-portal/src/translations/tl.ts b/packages/admin-portal/src/translations/tl.ts index 24b7f0cef96..83950caa64b 100644 --- a/packages/admin-portal/src/translations/tl.ts +++ b/packages/admin-portal/src/translations/tl.ts @@ -28,7 +28,11 @@ const tagalogTranslation: TranslationType = { id: "ID", statement_kind: "Uri ng Pahayag", created: "Nilikha", + created_min: "Nilikha Min", + created_max: "Nilikha Max", statement_timestamp: "Tatak ng Panahon ng Pahayag", + statement_timestamp_min: "Tatak ng Panahon ng Pahayag Min", + statement_timestamp_max: "Tatak ng Panahon ng Pahayag Max", message: "Mensahe", user_id: "ID ng User", username: "Username", diff --git a/packages/ballot-verifier/graphql.schema.json b/packages/ballot-verifier/graphql.schema.json index 55840f3c991..dd2227f5ba3 100644 --- a/packages/ballot-verifier/graphql.schema.json +++ b/packages/ballot-verifier/graphql.schema.json @@ -2336,7 +2336,19 @@ "fields": null, "inputFields": [ { - "name": "created", + "name": "created_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null, + "isDeprecated": false, + "deprecationReason": null + }, + { + "name": "created_min", "description": null, "type": { "kind": "SCALAR", @@ -2372,7 +2384,19 @@ "deprecationReason": null }, { - "name": "statement_timestamp", + "name": "statement_timestamp_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null, + "isDeprecated": false, + "deprecationReason": null + }, + { + "name": "statement_timestamp_min", "description": null, "type": { "kind": "SCALAR", diff --git a/packages/ballot-verifier/src/gql/graphql.ts b/packages/ballot-verifier/src/gql/graphql.ts index 520edd11a74..60462427eff 100644 --- a/packages/ballot-verifier/src/gql/graphql.ts +++ b/packages/ballot-verifier/src/gql/graphql.ts @@ -283,10 +283,12 @@ export type ElectionStatsOutput = { } export type ElectoralLogFilter = { - created?: InputMaybe + created_max?: InputMaybe + created_min?: InputMaybe id?: InputMaybe statement_kind?: InputMaybe - statement_timestamp?: InputMaybe + statement_timestamp_max?: InputMaybe + statement_timestamp_min?: InputMaybe user_id?: InputMaybe username?: InputMaybe } diff --git a/packages/electoral-log/Cargo.toml b/packages/electoral-log/Cargo.toml index 322bb12a1a4..d5442ff4362 100644 --- a/packages/electoral-log/Cargo.toml +++ b/packages/electoral-log/Cargo.toml @@ -29,6 +29,7 @@ tracing-log = "0.2" tracing-attributes = "0.1" tracing-subscriber = "0.3" tracing-tree = "0.4" +chrono = "0.4.41" [dev-dependencies] serial_test = "3.2" diff --git a/packages/electoral-log/src/client/board_client.rs b/packages/electoral-log/src/client/board_client.rs index 94a95313c3d..9e5be4471f6 100644 --- a/packages/electoral-log/src/client/board_client.rs +++ b/packages/electoral-log/src/client/board_client.rs @@ -2,218 +2,43 @@ // // SPDX-License-Identifier: AGPL-3.0-only -use crate::assign_value; +use crate::client::types::*; use anyhow::{anyhow, Context, Result}; -use immudb_rs::{sql_value::Value, Client, CommittedSqlTx, NamedParam, Row, SqlValue, TxMode}; -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; +use immudb_rs::{sql_value::Value, Client, CommittedSqlTx, NamedParam, SqlValue, TxMode}; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Display; -use strum_macros::Display; +use std::time::Instant; use tokio_stream::StreamExt; // Added for streaming use tracing::{error, info, instrument, warn}; const IMMUDB_DEFAULT_LIMIT: usize = 900; -const IMMUDB_DEFAULT_ENTRIES_TX_LIMIT: usize = 50; const IMMUDB_DEFAULT_OFFSET: usize = 0; -const ELECTORAL_LOG_TABLE: &'static str = "electoral_log_messages"; +const ELECTORAL_LOG_TABLE: &str = "electoral_log_messages"; /// 36 chars + EOL + some padding const ID_VARCHAR_LENGTH: usize = 40; +const ID_KEY_VARCHAR_LENGTH: usize = 4; /// Longest possible statement kind must be < 40 const STATEMENT_KIND_VARCHAR_LENGTH: usize = 40; /// 64 chars + EOL + some padding const BALLOT_ID_VARCHAR_LENGTH: usize = 70; +/// This is the order of the cols in the where clauses, as defined in ElectoralLogVarCharColumn: +/// StatementKind, AreaId, ElectionId, UserId, BallotId, statement_timestamp. +/// +/// Other columns that have no length constraint are not indexable. +/// 'create' is not indexed, we use statement_timestamp intead. +pub const MULTI_COLUMN_INDEXES: [&str; 3] = [ + "(statement_kind, election_id, user_id_key, user_id, ballot_id)", // COUNT or SELECT cast_vote_messages and filter by ballot_id + "(statement_kind, user_id_key, user_id)", // Filters in Admin portal LOGS tab. + "(user_id_key, user_id)", // Filters in Admin portal LOGS tab and for the User´s logs. +]; + #[derive(Debug)] pub struct BoardClient { client: Client, } -#[derive(Debug, Clone, Display, PartialEq, Eq, Ord, PartialOrd)] -#[strum(serialize_all = "snake_case")] -pub enum ElectoralLogVarCharColumn { - StatementKind, - UserId, - BallotId, - Username, - SenderPk, - ElectionId, - AreaId, - Version, -} - -/// SQL comparison operators supported by immudb. -/// ILIKE is not supported. -#[derive(Display, Debug, Clone)] -pub enum SqlCompOperators { - #[strum(to_string = "=")] - Equal, - #[strum(to_string = "!=")] - NotEqual, - #[strum(to_string = ">")] - GreaterThan, - #[strum(to_string = "<")] - LessThan, - #[strum(to_string = ">=")] - GreaterThanOrEqual, - #[strum(to_string = "<=")] - LessThanOrEqual, - #[strum(to_string = "LIKE")] - Like, - #[strum(to_string = "IN")] - In, - #[strum(to_string = "NOT IN")] - NotIn, -} - -pub type WhereClauseBTreeMap = BTreeMap; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct ElectoralLogMessage { - pub id: i64, - pub created: i64, - pub sender_pk: String, - pub statement_timestamp: i64, - pub statement_kind: String, - pub message: Vec, - pub version: String, - pub user_id: Option, - pub username: Option, - pub election_id: Option, - pub area_id: Option, - pub ballot_id: Option, -} - -impl TryFrom<&Row> for ElectoralLogMessage { - type Error = anyhow::Error; - - fn try_from(row: &Row) -> Result { - let mut id = 0; - let mut created = 0; - let mut sender_pk = String::from(""); - let mut statement_timestamp = 0; - let mut statement_kind = String::from(""); - let mut message = vec![]; - let mut version = String::from(""); - let mut user_id: Option = None; - let mut username: Option = None; - let mut election_id: Option = None; - let mut area_id: Option = None; - let mut ballot_id: Option = None; - - for (column, value) in row.columns.iter().zip(row.values.iter()) { - // FIXME for some reason columns names appear with parentheses - let dot = column - .find('.') - .ok_or(anyhow!("invalid column found '{}'", column.as_str()))?; - let bare_column = &column[dot + 1..column.len() - 1]; - - match bare_column { - "id" => assign_value!(Value::N, value, id), - "created" => assign_value!(Value::Ts, value, created), - "sender_pk" => assign_value!(Value::S, value, sender_pk), - "statement_timestamp" => { - assign_value!(Value::Ts, value, statement_timestamp) - } - "statement_kind" => assign_value!(Value::S, value, statement_kind), - "message" => assign_value!(Value::Bs, value, message), - "version" => assign_value!(Value::S, value, version), - "user_id" => match value.value.as_ref() { - Some(Value::S(inner)) => user_id = Some(inner.clone()), - Some(Value::Null(_)) => user_id = None, - None => user_id = None, - _ => { - return Err(anyhow!( - "invalid column value for 'user_id': {:?}", - value.value.as_ref() - )) - } - }, - "username" => match value.value.as_ref() { - Some(Value::S(inner)) => username = Some(inner.clone()), - Some(Value::Null(_)) => username = None, - None => username = None, - _ => { - return Err(anyhow!( - "invalid column value for 'username': {:?}", - value.value.as_ref() - )) - } - }, - "election_id" => match value.value.as_ref() { - Some(Value::S(inner)) => election_id = Some(inner.clone()), - Some(Value::Null(_)) => election_id = None, - None => election_id = None, - _ => { - return Err(anyhow!( - "invalid column value for 'election_id': {:?}", - value.value.as_ref() - )) - } - }, - "area_id" => match value.value.as_ref() { - Some(Value::S(inner)) => area_id = Some(inner.clone()), - Some(Value::Null(_)) => area_id = None, - None => area_id = None, - _ => { - return Err(anyhow!( - "invalid column value for 'area_id': {:?}", - value.value.as_ref() - )) - } - }, - "ballot_id" => match value.value.as_ref() { - Some(Value::S(inner)) => ballot_id = Some(inner.clone()), - Some(Value::Null(_)) => ballot_id = None, - None => ballot_id = None, - _ => { - return Err(anyhow!( - "invalid column value for 'ballod_id': {:?}", - value.value.as_ref() - )) - } - }, - _ => return Err(anyhow!("invalid column found '{}'", bare_column)), - } - } - - Ok(ElectoralLogMessage { - id, - created, - sender_pk, - statement_timestamp, - statement_kind, - message, - version, - user_id, - username, - election_id, - area_id, - ballot_id, - }) - } -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct Aggregate { - pub count: i64, -} - -impl TryFrom<&Row> for Aggregate { - type Error = anyhow::Error; - - fn try_from(row: &Row) -> Result { - let mut count = 0; - - for (column, value) in row.columns.iter().zip(row.values.iter()) { - match column.as_str() { - _ => assign_value!(Value::N, value, count), - } - } - Ok(Aggregate { count }) - } -} - impl BoardClient { #[instrument(skip(password), level = "trace")] pub async fn new(server_url: &str, username: &str, password: &str) -> Result { @@ -223,100 +48,24 @@ impl BoardClient { Ok(BoardClient { client: client }) } - /// Get all electoral log messages whose id is bigger than `last_id` - pub async fn get_electoral_log_messages( - &mut self, - board_db: &str, - ) -> Result> { - let mut offset: usize = 0; - let mut last_batch = self - .get_electoral_log_messages_from_db( - board_db, - 0, - Some(IMMUDB_DEFAULT_LIMIT), - Some(offset), - ) - .await?; - let mut messages = last_batch.clone(); - while IMMUDB_DEFAULT_LIMIT == last_batch.len() { - offset += last_batch.len(); - last_batch = self - .get_electoral_log_messages_from_db( - board_db, - 0, - Some(IMMUDB_DEFAULT_LIMIT), - Some(offset), - ) - .await?; - messages.extend(last_batch.clone()); - } - Ok(messages) - } - - async fn get_electoral_log_messages_from_db( - &mut self, - board_db: &str, - last_id: i64, - limit: Option, - offset: Option, - ) -> Result> { - self.client.use_database(board_db).await?; - let sql = format!( - r#" - SELECT - id, - created, - sender_pk, - statement_timestamp, - statement_kind, - message, - version, - user_id, - area_id, - ballot_id, - username - FROM {} - WHERE id > @last_id - ORDER BY id - LIMIT {} - OFFSET {}; - "#, - ELECTORAL_LOG_TABLE, - limit.unwrap_or(IMMUDB_DEFAULT_LIMIT), - offset.unwrap_or(IMMUDB_DEFAULT_OFFSET), - ); - - let params = vec![NamedParam { - name: String::from("last_id"), - value: Some(SqlValue { - value: Some(Value::N(last_id)), - }), - }]; - - let sql_query_response = self.client.sql_query(&sql, params).await?; - let messages = sql_query_response - .get_ref() - .rows - .iter() - .map(ElectoralLogMessage::try_from) - .collect::>>()?; - - Ok(messages) - } - /// columns_matcher represents the columns that will be used to filter the messages, /// The order as defined ElectoralLogVarCharColumn is important for preformance to match the indexes. /// BTreeMap ensures the order is preserved no matter the insertion sequence. - pub async fn get_electoral_log_messages_filtered( + #[instrument(skip_all, err)] + pub async fn get_electoral_log_messages_filtered( &mut self, board_db: &str, - columns_matcher: Option, + columns_matcher: Option, min_ts: Option, max_ts: Option, limit: Option, offset: Option, order_by: Option>, - ) -> Result> { + ) -> Result> + where + K: Debug + Display, + V: Debug + Display, + { self.get_filtered( board_db, columns_matcher, @@ -329,43 +78,62 @@ impl BoardClient { .await } - #[instrument(skip_all, err)] - async fn get_filtered( + /// Returns a batch of electoral log messages at a given row offset, ordered by id. + /// Prefer `get_electoral_log_messages_batch` (cursor-based) when iterating sequentially. + /// This offset-based variant exists for parallel batch processing where offsets are + /// pre-computed and batches run concurrently. + #[instrument(skip(self), err)] + pub async fn get_electoral_log_messages_at_offset( &mut self, board_db: &str, - columns_matcher: Option, + limit: i64, + offset: i64, + ) -> Result> { + self.get_filtered::( + board_db, + None, + None, + None, + Some(limit), + Some(offset), + None, + ) + .await + } + + #[instrument(skip(self, board_db, order_by), err)] + async fn get_filtered( + &mut self, + board_db: &str, + columns_matcher: Option, min_ts: Option, max_ts: Option, limit: Option, offset: Option, order_by: Option>, - ) -> Result> { + ) -> Result> + where + K: Debug + Display, + V: Debug + Display, + { + let start = Instant::now(); + let (where_clause, mut params) = columns_matcher + .clone() + .unwrap_or_default() + .to_where_clause(); + // Min and max clauses will go in the end of where_clause let (min_clause, min_clause_value) = if let Some(min_ts) = min_ts { - ("AND created >= @min_ts", min_ts) + ("statement_timestamp >= @min_ts", Some(min_ts)) } else { - ("", 0) + ("", None) }; let (max_clause, max_clause_value) = if let Some(max_ts) = max_ts { - ("AND created <= @max_ts", max_ts) + ("statement_timestamp <= @max_ts", Some(max_ts)) } else { - ("", 0) + ("", None) }; - let mut params = vec![]; - let mut where_clause = String::from("statement_kind IS NOT NULL "); - if let Some(columns_matcher) = &columns_matcher { - for (key, (op, value)) in columns_matcher { - where_clause.push_str(&format!("AND {key} {op} @{key} ")); - params.push(NamedParam { - name: key.to_string(), - value: Some(SqlValue { - value: Some(Value::S(value.to_owned())), - }), - }) - } - } - let order_by_clauses = if let Some(order_by) = order_by { order_by .iter() @@ -376,33 +144,7 @@ impl BoardClient { format!("ORDER BY id desc") }; - self.client.use_database(board_db).await?; - let sql = format!( - r#" - SELECT - id, - username, - user_id, - area_id, - election_id, - ballot_id, - created, - sender_pk, - statement_timestamp, - statement_kind, - message, - version - FROM {ELECTORAL_LOG_TABLE} - WHERE {where_clause} - {min_clause} - {max_clause} - {order_by_clauses} - LIMIT @limit - OFFSET @offset; - "# - ); - - if min_clause_value != 0 { + if let Some(min_clause_value) = min_clause_value { params.push(NamedParam { name: String::from("min_ts"), value: Some(SqlValue { @@ -410,7 +152,7 @@ impl BoardClient { }), }) } - if max_clause_value != 0 { + if let Some(max_clause_value) = max_clause_value { params.push(NamedParam { name: String::from("max_ts"), value: Some(SqlValue { @@ -433,6 +175,65 @@ impl BoardClient { }), }); + let mut where_clauses = where_clause.clone(); + match (min_clause.is_empty(), where_clauses.is_empty()) { + (true, _) => {} + (false, true) => { + where_clauses.push_str(min_clause); + } + (false, false) => { + // where_clauses is not empty, put the AND + where_clauses.push_str(&format!(" AND {min_clause}")); + } + }; + match (max_clause.is_empty(), where_clauses.is_empty()) { + (true, _) => {} + (false, true) => { + where_clauses.push_str(max_clause); + } + (false, false) => { + // where_clauses is not empty, put the AND + where_clauses.push_str(&format!(" AND {max_clause}")); + } + }; + if !where_clauses.is_empty() { + where_clauses = format!( + r#" + WHERE {where_clauses} + "# + ); + } + + let use_index_clause = columns_matcher + .unwrap_or_default() + .to_use_index_clause() + .unwrap_or_default(); + + self.client.use_database(board_db).await?; + let sql = format!( + r#" + SELECT + id, + username, + user_id, + area_id, + election_id, + ballot_id, + created, + sender_pk, + statement_timestamp, + statement_kind, + message, + version + FROM {ELECTORAL_LOG_TABLE} + {use_index_clause} + {where_clauses} + {order_by_clauses} + LIMIT @limit + OFFSET @offset; + "# + ); + info!("SQL query: {}", sql); let response_stream = self.client.streaming_sql_query(&sql, params) .await @@ -470,104 +271,12 @@ impl BoardClient { } } } - - Ok(messages) - } - - #[instrument(err)] - pub async fn count_electoral_log_messages( - &mut self, - board_db: &str, - columns_matcher: Option, - ) -> Result { - let mut params = vec![]; - let mut where_clause = String::from("statement_kind IS NOT NULL "); - if let Some(columns_matcher) = &columns_matcher { - for (key, (op, value)) in columns_matcher { - where_clause.push_str(&format!("AND {key} {op} @{key} ")); - params.push(NamedParam { - name: key.to_string(), - value: Some(SqlValue { - value: Some(Value::S(value.to_owned())), - }), - }) - } - } - - self.client.use_database(board_db).await?; - let sql = format!( - r#" - SELECT COUNT(*) - FROM {ELECTORAL_LOG_TABLE} - WHERE {where_clause} - "#, - ); - - info!("SQL query: {}", sql); - let sql_query_response = self.client.sql_query(&sql, params).await?; - let mut rows_iter = sql_query_response - .get_ref() - .rows - .iter() - .map(Aggregate::try_from); - let aggregate = rows_iter - .next() - .ok_or_else(|| anyhow!("No aggregate found"))??; - - Ok(aggregate.count as i64) - } - - /// Returns a batch of electoral log messages at a given row offset, ordered by id. - /// Prefer `get_electoral_log_messages_batch` (cursor-based) when iterating sequentially. - /// This offset-based variant exists for parallel batch processing where offsets are - /// pre-computed and batches run concurrently. - #[instrument(skip(self), err)] - pub async fn get_electoral_log_messages_at_offset( - &mut self, - board_db: &str, - limit: i64, - offset: i64, - ) -> Result> { - self.client.use_database(board_db).await?; - let sql = format!( - r#" - SELECT - id, - created, - sender_pk, - statement_timestamp, - statement_kind, - message, - version, - user_id, - username - FROM {ELECTORAL_LOG_TABLE} - ORDER BY id - LIMIT {limit} - OFFSET {offset}; - "# + let duration = start.elapsed(); + info!( + "Processed {} rows from stream in {}ms", + total_rows_fetched, + duration.as_millis() ); - let response_stream = self.client.streaming_sql_query(&sql, vec![]).await?; - let mut stream = response_stream.into_inner(); - let mut messages: Vec = vec![]; - while let Some(batch_result) = stream.next().await { - match batch_result { - Ok(batch) => { - for row in &batch.rows { - match ElectoralLogMessage::try_from(row) { - Ok(msg) => messages.push(msg), - Err(e) => { - warn!("Failed to parse row: {e}"); - } - } - } - } - Err(e) => { - error!("Error receiving batch from stream: {e}"); - break; - } - } - } Ok(messages) } @@ -625,6 +334,96 @@ impl BoardClient { Ok(messages) } + #[instrument(skip(self, board_db), err)] + pub async fn count_electoral_log_messages( + &mut self, + board_db: &str, + columns_matcher: Option, + ) -> Result { + let start = Instant::now(); + let (where_clause, params) = columns_matcher + .clone() + .unwrap_or_default() + .to_where_clause(); + let where_clauses = if !where_clause.is_empty() { + format!( + r#" + WHERE {where_clause} + "# + ) + } else { + String::from("") + }; + let use_index_clause = columns_matcher + .unwrap_or_default() + .to_use_index_clause() + .unwrap_or_default(); + self.client.use_database(board_db).await?; + + let count = if use_index_clause.is_empty() && where_clauses.is_empty() && params.is_empty() + { + // if there are no constraints, just get the last id as the count to avoid a full scan of the table. + let sql = format!( + r#" + SELECT + id, + username, + user_id, + area_id, + election_id, + ballot_id, + created, + sender_pk, + statement_timestamp, + statement_kind, + message, + version + FROM {ELECTORAL_LOG_TABLE} + ORDER BY id desc + LIMIT 1 + OFFSET 0; + "# + ); + info!("SQL query: {}", sql); + let sql_query_response = self.client.sql_query(&sql, vec![]).await?; + let elog_msg = sql_query_response + .get_ref() + .rows + .iter() + .map(ElectoralLogMessage::try_from) + .next(); + match elog_msg { + Some(elog_msg) => elog_msg?.id, + None => 0, + } + } else { + let sql = format!( + r#" + SELECT COUNT(*) + FROM {ELECTORAL_LOG_TABLE} + {use_index_clause} + {where_clauses} + "#, + ); + + info!("SQL query: {}", sql); + let sql_query_response = self.client.sql_query(&sql, params).await?; + let mut rows_iter = sql_query_response + .get_ref() + .rows + .iter() + .map(Aggregate::try_from); + let aggregate = rows_iter + .next() + .ok_or_else(|| anyhow!("No aggregate found"))??; + aggregate.count + }; + + let duration = start.elapsed(); + info!("COUNT query took {}ms", duration.as_millis()); + Ok(count as i64) + } + pub async fn open_session(&mut self, database_name: &str) -> Result<()> { self.client.open_session(database_name).await } @@ -641,7 +440,8 @@ impl BoardClient { self.client.commit(transaction_id).await } - // Insert messages in batch using an existing session/transaction + /// Insert messages in batch using an existing session/transaction + #[instrument(skip(self, messages), err)] pub async fn insert_electoral_log_messages_batch( &mut self, transaction_id: &String, @@ -659,6 +459,7 @@ impl BoardClient { statement_timestamp, message, version, + user_id_key, user_id, username, election_id, @@ -671,6 +472,7 @@ impl BoardClient { @statement_timestamp, @message, @version, + @user_id_key, @user_id, @username, @election_id, @@ -717,6 +519,15 @@ impl BoardClient { value: Some(Value::S(message.version.clone())), }), }, + NamedParam { + name: String::from("user_id_key"), + value: Some(SqlValue { + value: match message.user_id.clone() { + Some(user_id) => Some(Value::S(user_id.chars().take(3).collect())), + None => None, + }, + }), + }, NamedParam { name: String::from("user_id"), value: Some(SqlValue { @@ -798,6 +609,7 @@ impl BoardClient { statement_timestamp, message, version, + user_id_key, user_id, username, election_id, @@ -810,6 +622,7 @@ impl BoardClient { @statement_timestamp, @message, @version, + @user_id_key, @user_id, @username, @election_id, @@ -856,6 +669,15 @@ impl BoardClient { value: Some(Value::S(message.version.clone())), }), }, + NamedParam { + name: String::from("user_id_key"), + value: Some(SqlValue { + value: match message.user_id.clone() { + Some(user_id) => Some(Value::S(user_id.chars().take(3).collect())), + None => None, + }, + }), + }, NamedParam { name: String::from("user_id"), value: Some(SqlValue { @@ -944,6 +766,7 @@ impl BoardClient { statement_kind VARCHAR[{STATEMENT_KIND_VARCHAR_LENGTH}], message BLOB, version VARCHAR, + user_id_key VARCHAR[{ID_KEY_VARCHAR_LENGTH}], user_id VARCHAR[{ID_VARCHAR_LENGTH}], username VARCHAR, election_id VARCHAR[{ID_VARCHAR_LENGTH}], @@ -954,18 +777,12 @@ impl BoardClient { "# ); - // This is the order of the cols in the where clauses, as defined in ElectoralLogVarCharColumn - // Note Username cannot be indexed because it is not constrained to 512B, but is not needded since we have user_id - // StatementKind, UserId, BallotId, Username, SenderPk, ElectionId, AreaId, Version, - let elog_indexes = vec![ - format!("CREATE INDEX IF NOT EXISTS ON {ELECTORAL_LOG_TABLE} (statement_kind, user_id, ballot_id, election_id, id)"), // To list or count cast_vote_messages and Order by id - format!("CREATE INDEX IF NOT EXISTS ON {ELECTORAL_LOG_TABLE} (statement_kind, user_id, ballot_id, election_id, statement_timestamp)"), // Order by statement_timestamp - format!("CREATE INDEX IF NOT EXISTS ON {ELECTORAL_LOG_TABLE} (statement_kind, election_id, id)"), // Order by id - format!("CREATE INDEX IF NOT EXISTS ON {ELECTORAL_LOG_TABLE} (statement_kind, election_id, statement_timestamp)"), // Order by statement_timestamp - format!("CREATE INDEX IF NOT EXISTS ON {ELECTORAL_LOG_TABLE} (user_id, election_id, area_id, id)"), // Other posible filters... - format!("CREATE INDEX IF NOT EXISTS ON {ELECTORAL_LOG_TABLE} (election_id, area_id, id)"), - format!("CREATE INDEX IF NOT EXISTS ON {ELECTORAL_LOG_TABLE} (area_id, id)"), - ]; + let mut elog_indexes = vec![]; + for mult_col_idx in MULTI_COLUMN_INDEXES { + elog_indexes.push(format!( + "CREATE INDEX IF NOT EXISTS ON {ELECTORAL_LOG_TABLE} {mult_col_idx}" + )); + } self.upsert_database(board_dbname, &sql, elog_indexes.as_slice()) .await @@ -1059,17 +876,20 @@ pub(crate) mod tests { .await .unwrap(); - let ret = b.get_electoral_log_messages(BOARD_DB).await.unwrap(); + let ret = b + .get_electoral_log_messages_batch(BOARD_DB, 100, 0) + .await + .unwrap(); assert_eq!(messages, ret); - let cols_match = BTreeMap::from([ + let cols_match = WhereClauseOrdMap::from(&[ ( ElectoralLogVarCharColumn::StatementKind, - (SqlCompOperators::Equal, "".to_string()), + (SqlCompOperators::Equal("".to_string())), ), ( ElectoralLogVarCharColumn::SenderPk, - (SqlCompOperators::Equal, "".to_string()), + (SqlCompOperators::Equal("".to_string())), ), ]); let ret = b diff --git a/packages/electoral-log/src/client/mod.rs b/packages/electoral-log/src/client/mod.rs index c13707ec34d..52900761caa 100644 --- a/packages/electoral-log/src/client/mod.rs +++ b/packages/electoral-log/src/client/mod.rs @@ -2,3 +2,4 @@ // // SPDX-License-Identifier: AGPL-3.0-only pub mod board_client; +pub mod types; diff --git a/packages/electoral-log/src/client/types.rs b/packages/electoral-log/src/client/types.rs new file mode 100644 index 00000000000..654810bc527 --- /dev/null +++ b/packages/electoral-log/src/client/types.rs @@ -0,0 +1,511 @@ +// SPDX-FileCopyrightText: 2024 Sequent Tech +// +// SPDX-License-Identifier: AGPL-3.0-only + +use crate::assign_value; +use crate::client::board_client::MULTI_COLUMN_INDEXES; +use crate::messages::statement::{StatementBody, StatementType}; +use anyhow::{anyhow, Result}; +use chrono::{DateTime, Utc}; +use immudb_rs::{sql_value::Value, NamedParam, Row, SqlValue}; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::fmt::Debug; +use std::str::FromStr; +use strum_macros::{Display, EnumString}; +use tracing::{error, info, instrument, warn}; + +#[derive(Debug, Clone, Display, PartialEq, Eq, Ord, PartialOrd, EnumString)] +#[strum(serialize_all = "snake_case")] +pub enum ElectoralLogVarCharColumn { + StatementKind, + AreaId, + ElectionId, + UserIdKey, + UserId, + BallotId, + Username, + SenderPk, + Version, +} + +/// SQL comparison operators supported by immudb. +/// ILIKE is not supported. +#[derive(Display, Debug, Clone)] +pub enum SqlCompOperators { + #[strum(to_string = "=")] + Equal(String), + #[strum(to_string = "!=")] + NotEqual(String), + #[strum(to_string = ">")] + GreaterThan(String), + #[strum(to_string = "<")] + LessThan(String), + #[strum(to_string = ">=")] + GreaterThanOrEqual(String), + #[strum(to_string = "<=")] + LessThanOrEqual(String), + #[strum(to_string = "LIKE")] + Like(String), + #[strum(to_string = "IN")] + In(Vec), + #[strum(to_string = "NOT IN")] + NotIn(Vec), +} + +/// Each column in the map is unique but it can have several filters associated with it. +/// The type will keep the order of the columns to match the multicolumn indexes. +#[derive(Debug, Clone, Default)] +pub struct WhereClauseOrdMap(BTreeMap>); + +impl WhereClauseOrdMap { + pub fn new() -> Self { + Self(BTreeMap::new()) + } + + pub fn from(tuples: &[(ElectoralLogVarCharColumn, SqlCompOperators)]) -> Self { + let mut map = WhereClauseOrdMap::new(); + for (key, value) in tuples { + map.insert(key.clone(), value.clone()); + } + map + } + + /// If the column already exists, the comparisson will be added to the existing ones. + /// Otherwise it will create the first one. + pub fn insert(&mut self, key: ElectoralLogVarCharColumn, value: SqlCompOperators) { + self.0 + .entry(key) + .and_modify(|vec| vec.push(value.clone())) + .or_insert(vec![value]); + } + + pub fn iter( + &self, + ) -> std::collections::btree_map::Iter> { + self.0.iter() + } + + /// USE INDEX ON clause for multicolumn indexes. + /// Where clause is longer than the index: the last index matched will be used. + /// Where clause is shorter than the index: No index will be used because it causes errors in unmmudb. + /// Will return the longest possible index to use or None. + pub fn to_use_index_clause(&self) -> Option { + let mut try_index_clause = String::from(""); + let mut last_index_clause_match = None; + for (col_name, _) in self.iter() { + if try_index_clause.is_empty() { + try_index_clause.push_str(&format!("({col_name}")); + } else { + try_index_clause.push_str(&format!(", {col_name}")); + } + for index in MULTI_COLUMN_INDEXES { + let try_index_clause_closed = format!("{try_index_clause})"); + if index.eq(&try_index_clause_closed) { + last_index_clause_match = Some(format!("USE INDEX ON {index}")); + } + } + } + last_index_clause_match + } + + pub fn to_where_clause(&self) -> (String, Vec) { + let mut params = vec![]; + let mut where_clause = String::from(""); + for (col_name, comparissons) in self.iter() { + for (i, op) in comparissons.iter().enumerate() { + match op { + SqlCompOperators::In(values_vec) | SqlCompOperators::NotIn(values_vec) => { + let placeholders: Vec = values_vec + .iter() + .enumerate() + .map(|(j, _)| format!("@param_{col_name}{i}{j}")) + .collect(); + for (j, value) in values_vec.into_iter().enumerate() { + params.push(NamedParam { + name: format!("param_{col_name}{i}{j}"), + value: Some(SqlValue { + value: Some(Value::S(value.to_owned())), + }), + }); + } + if where_clause.is_empty() { + where_clause.push_str(&format!( + "{col_name} {op} ({})", + placeholders.join(", ") + )); + } else { + where_clause.push_str(&format!( + "AND {col_name} {op} ({})", + placeholders.join(", ") + )); + } + } + SqlCompOperators::Equal(value) + | SqlCompOperators::NotEqual(value) + | SqlCompOperators::GreaterThan(value) + | SqlCompOperators::LessThan(value) + | SqlCompOperators::GreaterThanOrEqual(value) + | SqlCompOperators::LessThanOrEqual(value) + | SqlCompOperators::Like(value) => { + if where_clause.is_empty() { + where_clause + .push_str(&format!("{col_name} {op} @param_{col_name}{i} ")); + } else { + where_clause + .push_str(&format!("AND {col_name} {op} @param_{col_name}{i} ")); + } + params.push(NamedParam { + name: format!("param_{col_name}{i}"), + value: Some(SqlValue { + value: Some(Value::S(value.to_owned())), + }), + }); + } + } + } + } + (where_clause, params) + } +} + +// Enumeration for the valid fields in the immudb table +#[derive(Debug, Deserialize, Hash, PartialEq, Eq, EnumString, Display, Clone)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum OrderField { + Id, + Created, + StatementTimestamp, + StatementKind, + Message, + UserId, + Username, + BallotId, + SenderPk, + LogType, + EventType, + Description, + Version, +} + +// Enumeration for the valid order directions +#[derive(Debug, Deserialize, EnumString, Display, Clone)] +#[serde(rename_all = "lowercase")] +#[strum(serialize_all = "lowercase")] +pub enum OrderDirection { + Asc, + Desc, +} + +// Enumeration for the valid filter fields with min/max support for timestamps +#[derive(Debug, Deserialize, Hash, PartialEq, Eq, EnumString, Display, Clone)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum FilterField { + Id, + CreatedMin, + CreatedMax, + StatementTimestampMin, + StatementTimestampMax, + StatementKind, + Message, + UserId, + Username, + BallotId, + SenderPk, + LogType, + EventType, + Description, + Version, +} + +#[derive(Deserialize, Debug, Default, Clone)] +pub struct GetElectoralLogBody { + pub tenant_id: String, + pub election_event_id: String, + pub limit: Option, + pub offset: Option, + pub filter: Option>, + pub order_by: Option>, + pub election_id: Option, + pub area_ids: Option>, + pub only_with_user: Option, + pub statement_kind: Option, +} + +impl GetElectoralLogBody { + #[instrument(skip_all)] + pub fn get_min_max_ts(&self) -> Result<(Option, Option)> { + let mut min_ts: Option = None; + let mut max_ts: Option = None; + if let Some(filters_map) = &self.filter { + for (field, value) in filters_map.iter() { + match field { + FilterField::CreatedMin | FilterField::StatementTimestampMin => { + let date_time_utc = DateTime::parse_from_rfc3339(&value) + .map_err(|err| anyhow!("Error parsing timestamp: {err:?}"))?; + let datetime = date_time_utc.with_timezone(&Utc); + let ts: i64 = datetime.timestamp(); + min_ts = Some(ts); + } + FilterField::CreatedMax | FilterField::StatementTimestampMax => { + let date_time_utc = DateTime::parse_from_rfc3339(&value) + .map_err(|err| anyhow!("Error parsing timestamp: {err:?}"))?; + let datetime = date_time_utc.with_timezone(&Utc); + let ts: i64 = datetime.timestamp(); + max_ts = Some(ts); + } + _ => {} + } + } + } + + Ok((min_ts, max_ts)) + } + + #[instrument(skip_all)] + pub fn as_where_clause_map(&self) -> Result { + let mut cols_match_select = WhereClauseOrdMap::new(); + if let Some(filters_map) = &self.filter { + for (field, value) in filters_map.iter() { + match field { + FilterField::Id => {} // Why would someone filter the electoral log by id? + FilterField::SenderPk | FilterField::Username | FilterField::BallotId | FilterField::StatementKind | FilterField::Version => { // sql VARCHAR type + let variant = ElectoralLogVarCharColumn::from_str(field.to_string().as_str()).map_err(|_| anyhow!("Field not found"))?; + cols_match_select.insert( + variant, + SqlCompOperators::Equal(value.clone()), // Using 'Like' here would not scale for millions of entries, causing no response from immudb is some cases. + ); + } + FilterField::UserId => { + // insert user_id_mod + cols_match_select.insert( + ElectoralLogVarCharColumn::UserIdKey, + SqlCompOperators::Equal(value.clone().chars().take(3).collect()), + ); + let variant = ElectoralLogVarCharColumn::from_str(field.to_string().as_str()).map_err(|_| anyhow!("Field not found"))?; + cols_match_select.insert( + variant, + SqlCompOperators::Equal(value.clone()), + ); + } + FilterField::StatementTimestampMin | FilterField::StatementTimestampMax + | FilterField::CreatedMin | FilterField::CreatedMax => {} // handled by `get_min_max_ts` + FilterField::EventType | FilterField::LogType | FilterField::Description // these have no column but are inside of Message + | FilterField::Message => {} // Message column is sql BLOB type and it´s encrypted so we can't filter it without expensive operations + } + } + } + if let Some(election_id) = &self.election_id { + if !election_id.is_empty() { + cols_match_select.insert( + ElectoralLogVarCharColumn::ElectionId, + SqlCompOperators::Equal(election_id.clone()), + ); + } + } + + if let Some(area_ids) = &self.area_ids { + if !area_ids.is_empty() { + cols_match_select.insert( + ElectoralLogVarCharColumn::AreaId, + SqlCompOperators::In(area_ids.clone()), + ); + } + } + + if let Some(statement_kind) = &self.statement_kind { + cols_match_select.insert( + ElectoralLogVarCharColumn::StatementKind, + SqlCompOperators::Equal(statement_kind.to_string()), + ); + } + + Ok(cols_match_select) + } + + #[instrument] + pub fn as_cast_vote_count_and_select_clauses( + &self, + election_id: &str, + user_id: &str, + ballot_id_filter: &str, + ) -> (WhereClauseOrdMap, WhereClauseOrdMap) { + let cols_match_count = WhereClauseOrdMap::from(&[ + ( + ElectoralLogVarCharColumn::StatementKind, + SqlCompOperators::Equal(StatementType::CastVote.to_string()), + ), + ( + ElectoralLogVarCharColumn::ElectionId, + SqlCompOperators::Equal(election_id.to_string()), + ), + ]); + let mut cols_match_select = cols_match_count.clone(); + // Restrict the SQL query to user_id and ballot_id in case of filtering + if !ballot_id_filter.is_empty() { + cols_match_select.insert( + ElectoralLogVarCharColumn::UserIdKey, + SqlCompOperators::Equal(user_id.chars().take(3).collect()), + ); + cols_match_select.insert( + ElectoralLogVarCharColumn::UserId, + SqlCompOperators::Equal(user_id.to_string()), + ); + cols_match_select.insert( + ElectoralLogVarCharColumn::BallotId, + SqlCompOperators::Like(ballot_id_filter.to_string()), + ); + } + + (cols_match_count, cols_match_select) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ElectoralLogMessage { + pub id: i64, + pub created: i64, + pub sender_pk: String, + pub statement_timestamp: i64, + pub statement_kind: String, + pub message: Vec, + pub version: String, + pub user_id: Option, + pub username: Option, + pub election_id: Option, + pub area_id: Option, + pub ballot_id: Option, +} + +impl TryFrom<&Row> for ElectoralLogMessage { + type Error = anyhow::Error; + + fn try_from(row: &Row) -> Result { + let mut id = 0; + let mut created = 0; + let mut sender_pk = String::from(""); + let mut statement_timestamp = 0; + let mut statement_kind = String::from(""); + let mut message = vec![]; + let mut version = String::from(""); + let mut user_id: Option = None; + let mut username: Option = None; + let mut election_id: Option = None; + let mut area_id: Option = None; + let mut ballot_id: Option = None; + + for (column, value) in row.columns.iter().zip(row.values.iter()) { + // FIXME for some reason columns names appear with parentheses + let dot = column + .find('.') + .ok_or(anyhow!("invalid column found '{}'", column.as_str()))?; + let bare_column = &column[dot + 1..column.len() - 1]; + + match bare_column { + "id" => assign_value!(Value::N, value, id), + "created" => assign_value!(Value::Ts, value, created), + "sender_pk" => assign_value!(Value::S, value, sender_pk), + "statement_timestamp" => { + assign_value!(Value::Ts, value, statement_timestamp) + } + "statement_kind" => assign_value!(Value::S, value, statement_kind), + "message" => assign_value!(Value::Bs, value, message), + "version" => assign_value!(Value::S, value, version), + "user_id" => match value.value.as_ref() { + Some(Value::S(inner)) => user_id = Some(inner.clone()), + Some(Value::Null(_)) => user_id = None, + None => user_id = None, + _ => { + return Err(anyhow!( + "invalid column value for 'user_id': {:?}", + value.value.as_ref() + )) + } + }, + "username" => match value.value.as_ref() { + Some(Value::S(inner)) => username = Some(inner.clone()), + Some(Value::Null(_)) => username = None, + None => username = None, + _ => { + return Err(anyhow!( + "invalid column value for 'username': {:?}", + value.value.as_ref() + )) + } + }, + "election_id" => match value.value.as_ref() { + Some(Value::S(inner)) => election_id = Some(inner.clone()), + Some(Value::Null(_)) => election_id = None, + None => election_id = None, + _ => { + return Err(anyhow!( + "invalid column value for 'election_id': {:?}", + value.value.as_ref() + )) + } + }, + "area_id" => match value.value.as_ref() { + Some(Value::S(inner)) => area_id = Some(inner.clone()), + Some(Value::Null(_)) => area_id = None, + None => area_id = None, + _ => { + return Err(anyhow!( + "invalid column value for 'area_id': {:?}", + value.value.as_ref() + )) + } + }, + "ballot_id" => match value.value.as_ref() { + Some(Value::S(inner)) => ballot_id = Some(inner.clone()), + Some(Value::Null(_)) => ballot_id = None, + None => ballot_id = None, + _ => { + return Err(anyhow!( + "invalid column value for 'ballod_id': {:?}", + value.value.as_ref() + )) + } + }, + _ => return Err(anyhow!("invalid column found '{}'", bare_column)), + } + } + + Ok(ElectoralLogMessage { + id, + created, + sender_pk, + statement_timestamp, + statement_kind, + message, + version, + user_id, + username, + election_id, + area_id, + ballot_id, + }) + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Aggregate { + pub count: i64, +} + +impl TryFrom<&Row> for Aggregate { + type Error = anyhow::Error; + + fn try_from(row: &Row) -> Result { + let mut count = 0; + + for (column, value) in row.columns.iter().zip(row.values.iter()) { + match column.as_str() { + _ => assign_value!(Value::N, value, count), + } + } + Ok(Aggregate { count }) + } +} diff --git a/packages/electoral-log/src/messages/message.rs b/packages/electoral-log/src/messages/message.rs index dd3ffe2ee8b..e2ed80832fc 100644 --- a/packages/electoral-log/src/messages/message.rs +++ b/packages/electoral-log/src/messages/message.rs @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0-only -use crate::ElectoralLogMessage; +use crate::client::types::ElectoralLogMessage; use anyhow::Result; use borsh::{BorshDeserialize, BorshSerialize}; use serde::{Deserialize, Serialize}; @@ -26,7 +26,7 @@ use std::fmt; /// a cross-event statement pub const GENERIC_EVENT: &'static str = "Generic Event"; -#[derive(BorshSerialize, BorshDeserialize, Serialize, Deserialize, std::fmt::Debug)] +#[derive(Clone, BorshSerialize, BorshDeserialize, Serialize, Deserialize, std::fmt::Debug)] pub struct Message { pub sender: Sender, pub sender_signature: StrandSignature, diff --git a/packages/electoral-log/src/messages/statement.rs b/packages/electoral-log/src/messages/statement.rs index c7c3c9d0a9b..a6c37a56fb7 100644 --- a/packages/electoral-log/src/messages/statement.rs +++ b/packages/electoral-log/src/messages/statement.rs @@ -10,7 +10,7 @@ use strum_macros::Display; use crate::messages::newtypes::*; use tracing::info; -#[derive(BorshSerialize, BorshDeserialize, Deserialize, Serialize, Debug)] +#[derive(BorshSerialize, BorshDeserialize, Deserialize, Serialize, Debug, Clone)] pub struct Statement { pub head: StatementHead, pub body: StatementBody, @@ -195,7 +195,7 @@ impl StatementHead { } } -#[derive(BorshSerialize, BorshDeserialize, Deserialize, Serialize, Debug)] +#[derive(BorshSerialize, BorshDeserialize, Deserialize, Serialize, Debug, Clone)] pub enum StatementBody { // NOT IMPLEMENTED YET, but please feel free // "Emisión de voto (sólo como registro que el sistema almacenó correctamente el voto) diff --git a/packages/graphql.schema.json b/packages/graphql.schema.json index fe70d02c861..e8b6d4a4aba 100644 --- a/packages/graphql.schema.json +++ b/packages/graphql.schema.json @@ -2117,7 +2117,17 @@ "fields": null, "inputFields": [ { - "name": "created", + "name": "created_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null + }, + { + "name": "created_min", "description": null, "type": { "kind": "SCALAR", @@ -2147,7 +2157,17 @@ "defaultValue": null }, { - "name": "statement_timestamp", + "name": "statement_timestamp_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null + }, + { + "name": "statement_timestamp_min", "description": null, "type": { "kind": "SCALAR", diff --git a/packages/harvest/src/routes/electoral_log.rs b/packages/harvest/src/routes/electoral_log.rs index 32a52d88b1f..0ca539f0ca2 100644 --- a/packages/harvest/src/routes/electoral_log.rs +++ b/packages/harvest/src/routes/electoral_log.rs @@ -3,35 +3,138 @@ // SPDX-License-Identifier: AGPL-3.0-only use crate::services::authorization::authorize; +use crate::types::resources::TotalAggregate; use anyhow::{anyhow, Context, Result}; use deadpool_postgres::Client as DbClient; +use electoral_log::client::types::*; use rocket::http::Status; use rocket::serde::json::Json; use sequent_core::services::jwt::JwtClaims; +use sequent_core::services::keycloak::{get_event_realm, get_tenant_realm}; use sequent_core::types::permissions::Permissions; -use serde::{Deserialize, Serialize}; -use tracing::instrument; +use tracing::{info, instrument}; +use windmill::services::database::get_keycloak_pool; use windmill::services::electoral_log::{ - list_electoral_log as get_logs, ElectoralLogRow, GetElectoralLogBody, + count_electoral_log, list_electoral_log as windmill_list_electoral_log, + ElectoralLogRow, }; +use windmill::services::users::get_users_by_username; use windmill::types::resources::DataList; -#[instrument] +#[instrument(skip(claims))] #[post("/immudb/electoral-log", format = "json", data = "")] pub async fn list_electoral_log( body: Json, claims: JwtClaims, ) -> Result>, (Status, String)> { - let input = body.into_inner(); + let mut input = body.into_inner(); authorize( &claims, true, Some(input.tenant_id.clone()), vec![Permissions::LOGS_READ], )?; - let ret_val = get_logs(input) + + // If there is username but no user_id in the filter, fill the user_id to + // inprove performance. + if let Some(filter) = &mut input.filter { + if let (Some(username), None) = ( + filter.get(&FilterField::Username), + filter.get(&FilterField::UserId), + ) { + match get_user_id( + &input.tenant_id, + &input.election_event_id, + username, + ) + .await + { + Ok(Some(user_id)) => { + filter.insert(FilterField::UserId, user_id); + } + Ok(None) => { + return Ok(Json(DataList::default())); + } + Err(e) => { + return Err((Status::InternalServerError, e.to_string())); + } + } + } + } + + let (data_res, count_res) = tokio::join!( + windmill_list_electoral_log(input.clone()), + count_electoral_log(input) + ); + + let mut data = data_res.map_err(|e| { + ( + Status::InternalServerError, + format!("Eror listing electoral log: {e:?}"), + ) + })?; + data.total.aggregate.count = count_res.map_err(|e| { + ( + Status::InternalServerError, + format!("Error counting electoral log: {e:?}"), + ) + })?; + + Ok(Json(data)) +} + +/// Get user id by username +#[instrument(err)] +pub async fn get_user_id( + tenant_id: &str, + election_event_id: &str, + username: &str, +) -> Result> { + let tenant_realm = get_tenant_realm(tenant_id); + let event_realm = get_event_realm(tenant_id, election_event_id); + let mut keycloak_db_client: DbClient = get_keycloak_pool() + .await + .get() + .await + .map_err(|e| anyhow!("Error getting keycloak client: {e:?}"))?; + + let keycloak_transaction = keycloak_db_client + .transaction() + .await + .map_err(|e| anyhow!("Error getting keycloak transaction: {e:?}"))?; + + // Get user id by username, first look in the tenant realm which has less + // users. Then if not found, look in the event realm. + let mut user_ids = + get_users_by_username(&keycloak_transaction, &tenant_realm, username) + .await + .map_err(|e| { + anyhow!( + "Error getting users by username in tenant realm: {e:?}" + ) + })?; + if user_ids.is_empty() { + user_ids = get_users_by_username( + &keycloak_transaction, + &event_realm, + username, + ) .await - .map_err(|e| (Status::InternalServerError, format!("{:?}", e)))?; + .map_err(|e| { + anyhow!("Error getting users by username in event realm: {e:?}") + })?; + } - Ok(Json(ret_val)) + match user_ids.len() { + 0 => { + info!("Could not get users by username: Not Found"); + return Ok(None); + } + 1 => Ok(Some(user_ids[0].clone())), + _ => { + return Err(anyhow!( + "Error getting users by username: Multiple users Found" + )); + } + } } diff --git a/packages/harvest/src/routes/immudb_log_audit.rs b/packages/harvest/src/routes/immudb_log_audit.rs index 0bcbaaafd52..7cfefb5bd73 100644 --- a/packages/harvest/src/routes/immudb_log_audit.rs +++ b/packages/harvest/src/routes/immudb_log_audit.rs @@ -3,11 +3,10 @@ // SPDX-License-Identifier: AGPL-3.0-only use crate::services::authorization::authorize; -use crate::types::resources::{ - Aggregate, DataList, OrderDirection, TotalAggregate, -}; +use crate::types::resources::{Aggregate, DataList, TotalAggregate}; use anyhow::{anyhow, Context, Result}; use electoral_log::assign_value; +use electoral_log::client::types::OrderDirection; use immudb_rs::{sql_value::Value, Client, NamedParam, Row, SqlValue}; use rocket::http::Status; use rocket::response::Debug; diff --git a/packages/harvest/src/routes/voter_electoral_log.rs b/packages/harvest/src/routes/voter_electoral_log.rs index 7ea3e1f2679..727a5d2f8e2 100644 --- a/packages/harvest/src/routes/voter_electoral_log.rs +++ b/packages/harvest/src/routes/voter_electoral_log.rs @@ -5,6 +5,7 @@ use crate::services::authorization::authorize_voter_election; use crate::types::error_response::{ErrorCode, ErrorResponse, JsonError}; use anyhow::Result; +use electoral_log::client::types::*; use rocket::http::Status; use rocket::serde::json::Json; use sequent_core::ballot::ShowCastVoteLogs; @@ -15,12 +16,9 @@ use serde::Deserialize; use std::collections::HashMap; use tracing::instrument; use windmill::postgres::election_event::get_election_event_by_id; -use windmill::services::electoral_log; -use windmill::services::electoral_log::{ - CastVoteMessagesOutput, GetElectoralLogBody, OrderField, -}; +use windmill::services::electoral_log::list_cast_vote_messages_and_count; +use windmill::services::electoral_log::CastVoteMessagesOutput; use windmill::services::providers::transactions_provider::provide_hasura_transaction; -use windmill::types::resources::OrderDirection; #[derive(Deserialize, Debug)] pub struct CastVoteMessagesInput { @@ -104,7 +102,7 @@ pub async fn list_cast_vote_messages( ..Default::default() }; - let ret_val = electoral_log::list_cast_vote_messages( + let ret_val = list_cast_vote_messages_and_count( elog_input, ballot_id, &user_id, &username, ) .await diff --git a/packages/harvest/src/types/resources.rs b/packages/harvest/src/types/resources.rs index 0cc57943ea6..e205d1ea63f 100644 --- a/packages/harvest/src/types/resources.rs +++ b/packages/harvest/src/types/resources.rs @@ -17,15 +17,6 @@ pub struct TotalAggregate { pub aggregate: Aggregate, } -// Enumeration for the valid order directions -#[derive(Debug, Deserialize, EnumString, Display)] -#[serde(rename_all = "lowercase")] -#[strum(serialize_all = "lowercase")] -pub enum OrderDirection { - Asc, - Desc, -} - #[derive(Deserialize, Debug)] pub struct SortPayload { pub field: String, diff --git a/packages/sequent-core/src/ballot_codec/bases.rs b/packages/sequent-core/src/ballot_codec/bases.rs index b6e7e0720f0..3f16ad3e443 100644 --- a/packages/sequent-core/src/ballot_codec/bases.rs +++ b/packages/sequent-core/src/ballot_codec/bases.rs @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0-only use crate::{ballot::*, types::ceremonies::CountingAlgType}; -use anyhow::Result; +use anyhow::{anyhow, Result}; use std::convert::TryInto; pub trait BasesCodec { @@ -24,7 +24,9 @@ impl BasesCodec for Contest { CountingAlgType::Cumulative => { self.cumulative_number_of_checkboxes() + 1u64 } - _ => (self.max_votes + 1i64).try_into().unwrap(), + _ => (self.max_votes + 1i64).try_into().map_err(|_e| { + anyhow!("Failed to convert {} to u64", self.max_votes + 1i64) + })?, }; let num_valid_candidates: usize = self diff --git a/packages/step-cli/src/commands/create_electoral_logs.rs b/packages/step-cli/src/commands/create_electoral_logs.rs index 29f95d186d4..80e675ccbec 100644 --- a/packages/step-cli/src/commands/create_electoral_logs.rs +++ b/packages/step-cli/src/commands/create_electoral_logs.rs @@ -7,12 +7,12 @@ use anyhow::{anyhow, Context, Result}; use chrono::Utc; use clap::Args; use colored::Colorize; +use electoral_log::client::types::ElectoralLogMessage; use electoral_log::messages::message::{Message, Sender}; use electoral_log::messages::newtypes::EventIdString; use electoral_log::messages::statement::{ Statement, StatementBody, StatementEventType, StatementHead, StatementLogType, StatementType, }; -use electoral_log::ElectoralLogMessage; use fake::faker::internet::raw::Username; use fake::locales::EN; use fake::Fake; diff --git a/packages/step-cli/src/commands/export_cast_votes.rs b/packages/step-cli/src/commands/export_cast_votes.rs index 046e4731dd1..43345a7fb9b 100644 --- a/packages/step-cli/src/commands/export_cast_votes.rs +++ b/packages/step-cli/src/commands/export_cast_votes.rs @@ -10,10 +10,13 @@ use base64::Engine; use clap::Args; use colored::Colorize; use csv::WriterBuilder; +use electoral_log::client::types::{ + ElectoralLogVarCharColumn, SqlCompOperators, WhereClauseOrdMap, +}; use electoral_log::messages::message::Message; use electoral_log::messages::newtypes::ElectionIdString; use electoral_log::messages::statement::{StatementBody, StatementType}; -use electoral_log::{BoardClient, ElectoralLogVarCharColumn, SqlCompOperators}; +use electoral_log::BoardClient; use sequent_core::encrypt::shorten_hash; use serde::Serialize; use serde_json::Value; @@ -79,9 +82,9 @@ impl ExportCastVotes { .await .map_err(|err| anyhow!("Failed to create the client: {:?}", err))?; - let cols_match = BTreeMap::from([( + let cols_match = WhereClauseOrdMap::from(&[( ElectoralLogVarCharColumn::StatementKind, - (SqlCompOperators::Equal, StatementType::CastVote.to_string()), + SqlCompOperators::Equal(StatementType::CastVote.to_string()), )]); let order_by: Option> = None; println!("Getting messages"); diff --git a/packages/step-cli/src/graphql/schema.json b/packages/step-cli/src/graphql/schema.json index fe70d02c861..e8b6d4a4aba 100644 --- a/packages/step-cli/src/graphql/schema.json +++ b/packages/step-cli/src/graphql/schema.json @@ -2117,7 +2117,17 @@ "fields": null, "inputFields": [ { - "name": "created", + "name": "created_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null + }, + { + "name": "created_min", "description": null, "type": { "kind": "SCALAR", @@ -2147,7 +2157,17 @@ "defaultValue": null }, { - "name": "statement_timestamp", + "name": "statement_timestamp_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null + }, + { + "name": "statement_timestamp_min", "description": null, "type": { "kind": "SCALAR", diff --git a/packages/voting-portal/graphql.schema.json b/packages/voting-portal/graphql.schema.json index 55840f3c991..dd2227f5ba3 100644 --- a/packages/voting-portal/graphql.schema.json +++ b/packages/voting-portal/graphql.schema.json @@ -2336,7 +2336,19 @@ "fields": null, "inputFields": [ { - "name": "created", + "name": "created_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null, + "isDeprecated": false, + "deprecationReason": null + }, + { + "name": "created_min", "description": null, "type": { "kind": "SCALAR", @@ -2372,7 +2384,19 @@ "deprecationReason": null }, { - "name": "statement_timestamp", + "name": "statement_timestamp_max", + "description": null, + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + }, + "defaultValue": null, + "isDeprecated": false, + "deprecationReason": null + }, + { + "name": "statement_timestamp_min", "description": null, "type": { "kind": "SCALAR", diff --git a/packages/voting-portal/src/gql/graphql.ts b/packages/voting-portal/src/gql/graphql.ts index 02a82b09e38..2120eabbb5a 100644 --- a/packages/voting-portal/src/gql/graphql.ts +++ b/packages/voting-portal/src/gql/graphql.ts @@ -282,10 +282,12 @@ export type ElectionStatsOutput = { }; export type ElectoralLogFilter = { - created?: InputMaybe; + created_max?: InputMaybe; + created_min?: InputMaybe; id?: InputMaybe; statement_kind?: InputMaybe; - statement_timestamp?: InputMaybe; + statement_timestamp_max?: InputMaybe; + statement_timestamp_min?: InputMaybe; user_id?: InputMaybe; username?: InputMaybe; }; diff --git a/packages/voting-portal/src/routes/BallotLocator.tsx b/packages/voting-portal/src/routes/BallotLocator.tsx index 2c9116efb32..043992d8ca4 100644 --- a/packages/voting-portal/src/routes/BallotLocator.tsx +++ b/packages/voting-portal/src/routes/BallotLocator.tsx @@ -667,7 +667,6 @@ const BallotLocatorLogic = () => { const {t} = useTranslation() const [inputBallotId, setInputBallotId] = useState("") const {globalSettings} = useContext(SettingsContext) - const hasBallotId = !!ballotId const {data: dataBallotStyles} = useQuery(GET_BALLOT_STYLES) diff --git a/packages/windmill/external-bin/generate_logs.rs b/packages/windmill/external-bin/generate_logs.rs old mode 100644 new mode 100755 index 2a4393170d4..d10b5c705e9 --- a/packages/windmill/external-bin/generate_logs.rs +++ b/packages/windmill/external-bin/generate_logs.rs @@ -8,6 +8,7 @@ use base64::Engine; use chrono::{TimeZone, Utc}; use clap::Parser; use csv::Writer; +use electoral_log::client::types::ElectoralLogMessage; use electoral_log::messages::message::Message; use immudb_rs::{sql_value::Value as ImmudbSqlValue, Client}; use serde::Deserialize; @@ -187,10 +188,18 @@ async fn main() -> Result<()> { info!(total_rows_fetched, "Processed rows from stream..."); } - let elog_row = match ElectoralLogRow::try_from(individual_row) { + let elog_msg = match ElectoralLogMessage::try_from(individual_row) { + Ok(elog_msg) => elog_msg, + Err(e) => { + warn!(error = %e, "Failed to parse ImmudbRow into ElectoralLogMessage from stream batch."); + continue; + } + }; + + let elog_row = match ElectoralLogRow::try_from(elog_msg.clone()) { Ok(elog_row) => elog_row, Err(e) => { - warn!(error = %e, "Failed to parse ImmudbRow into ElectoralLogRow from stream batch."); + warn!(error = %e, "Failed to parse ImmudbRow into ElectoralLogRow from ElectoralLogMessage."); continue; } }; @@ -217,10 +226,10 @@ async fn main() -> Result<()> { }; let extracted_election_id_opt = message.election_id.clone(); - let activity_log_row = match ActivityLogRow::try_from(elog_row.clone()) { + let activity_log_row = match ActivityLogRow::try_from(elog_msg.clone()) { Ok(activity_log_row) => activity_log_row, Err(e) => { - warn!(log_id = elog_row.id, error = %e, "Failed to transform ElectoralLogRow."); + warn!(log_id = elog_msg.id, error = %e, "Failed to transform ElectoralLogMessage."); continue; } }; diff --git a/packages/windmill/src/services/ceremonies/tally_ceremony.rs b/packages/windmill/src/services/ceremonies/tally_ceremony.rs index ec427a66165..b1ed69ee946 100644 --- a/packages/windmill/src/services/ceremonies/tally_ceremony.rs +++ b/packages/windmill/src/services/ceremonies/tally_ceremony.rs @@ -563,9 +563,7 @@ pub async fn update_tally_ceremony( if new_execution_status == TallyExecutionStatus::IN_PROGRESS { let tally_elections_ids = tally_session.election_ids.clone(); - let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?; - // Save this in the electoral log let board_name: String = get_event_board(&tenant_id, &election_event_id, &slug); let electoral_log = ElectoralLog::for_admin_user( diff --git a/packages/windmill/src/services/electoral_log.rs b/packages/windmill/src/services/electoral_log.rs index 9a55bbb0f54..7f8df84aa07 100644 --- a/packages/windmill/src/services/electoral_log.rs +++ b/packages/windmill/src/services/electoral_log.rs @@ -12,19 +12,17 @@ use crate::services::vault; use crate::tasks::electoral_log::{ enqueue_electoral_log_event, LogEventBody, LogEventInput, LogMessageType, }; -use crate::types::resources::{Aggregate, DataList, OrderDirection, TotalAggregate}; +use crate::types::resources::{Aggregate, DataList, TotalAggregate}; use anyhow::{anyhow, ensure, Context, Result}; use b3::messages::message::Signer; use base64::engine::general_purpose; use base64::Engine; use deadpool_postgres::Transaction; use electoral_log::assign_value; +use electoral_log::client::types::*; use electoral_log::messages::message::{Message, SigningData}; use electoral_log::messages::newtypes::*; use electoral_log::messages::statement::{StatementBody, StatementType}; -use electoral_log::{ - ElectoralLogMessage, ElectoralLogVarCharColumn, SqlCompOperators, WhereClauseBTreeMap, -}; use immudb_rs::{sql_value::Value, Client, NamedParam, Row, TxMode}; use rust_decimal::prelude::ToPrimitive; use sequent_core::serialization::deserialize_with_path::{deserialize_str, deserialize_value}; @@ -42,7 +40,7 @@ use strand::signature::{StrandSignaturePk, StrandSignatureSk}; use strum_macros::{Display, EnumString}; use tempfile::NamedTempFile; use tokio_stream::StreamExt; -use tracing::{event, info, instrument, warn, Level}; +use tracing::{info, instrument, warn}; pub const IMMUDB_ROWS_LIMIT: usize = 2500; pub const MAX_ROWS_PER_PAGE: usize = 50; @@ -863,199 +861,14 @@ impl ElectoralLog { } } -// Enumeration for the valid fields in the immudb table -#[derive(Debug, Deserialize, Hash, PartialEq, Eq, EnumString, Display, Clone)] -#[serde(rename_all = "snake_case")] -#[strum(serialize_all = "snake_case")] -pub enum OrderField { - Id, - Created, - StatementTimestamp, - StatementKind, - Message, - UserId, - Username, - BallotId, - SenderPk, - LogType, - EventType, - Description, - Version, -} - -#[derive(Deserialize, Debug, Default, Clone)] -pub struct GetElectoralLogBody { - pub tenant_id: String, - pub election_event_id: String, - pub limit: Option, - pub offset: Option, - pub filter: Option>, - pub order_by: Option>, - pub election_id: Option, - pub area_ids: Option>, - pub only_with_user: Option, - pub statement_kind: Option, -} - -impl GetElectoralLogBody { - // Returns the SQL clauses related to the request along with the parameters - #[instrument(ret)] - fn as_sql(&self, to_count: bool) -> Result<(String, Vec)> { - let mut clauses = Vec::new(); - let mut params = Vec::new(); - - // Handle filters - if let Some(filters_map) = &self.filter { - let mut where_clauses = Vec::new(); - - for (field, value) in filters_map { - info!("field = ?: {field}, value = ?: {value}"); - let param_name = format!("param_{field}"); - match field { - OrderField::Id => { // sql INTEGER type - let int_value: i64 = value.parse()?; - where_clauses.push(format!("id = @{}", param_name)); - params.push(create_named_param(param_name, Value::N(int_value))); - } - OrderField::SenderPk | OrderField::UserId | OrderField::Username | OrderField::BallotId | OrderField::StatementKind | OrderField::Version => { // sql VARCHAR type - where_clauses.push(format!("{field} LIKE @{}", param_name)); - params.push(create_named_param(param_name, Value::S(value.to_string()))); - } - OrderField::StatementTimestamp | OrderField::Created => { // sql TIMESTAMP type - // these have their own column and are inside of Message´s column as well - let datetime = ISO8601::to_date_utc(&value) - .map_err(|err| anyhow!("Failed to parse timestamp: {:?}", err))?; - let ts: i64 = datetime.timestamp(); - let ts_end: i64 = ts + 60; // Search along that minute, the second is not specified by the front. - let param_name_end = format!("{param_name}_end"); - where_clauses.push(format!("{field} >= @{} AND {field} < @{}", param_name, param_name_end)); - params.push(create_named_param(param_name, Value::Ts(ts))); - params.push(create_named_param(param_name_end, Value::Ts(ts_end))); - } - OrderField::EventType | OrderField::LogType | OrderField::Description // these have no column but are inside of Message - | OrderField::Message => {} // Message column is sql BLOB type and it´s encrypted so we can't search it without expensive operations - } - } - - if !where_clauses.is_empty() { - clauses.push(format!("WHERE {}", where_clauses.join(" AND "))); - } - }; - - // Build a single extra clause. - // This clause returns rows if: - // - @election_filter is non-empty and matches election_id, OR - // - @area_filter is non-empty and matches area_id, OR - // - Both election_id and area_id are either '' or NULL. (General to all elections log) - let mut extra_where_clauses = Vec::new(); - if self.election_id.is_some() || self.area_ids.is_some() { - let mut conds = Vec::new(); - - if let Some(election) = &self.election_id { - if !election.is_empty() { - params.push(create_named_param( - "param_election".to_string(), - Value::S(election.clone()), - )); - conds.push("election_id LIKE @param_election".to_string()); - } - } - - if let Some(area_ids) = &self.area_ids { - if !area_ids.is_empty() { - let placeholders: Vec = area_ids - .iter() - .enumerate() - .map(|(i, _)| format!("@param_area{}", i)) - .collect(); - for (i, area) in area_ids.into_iter().enumerate() { - let param_name = format!("param_area{}", i); - params.push(create_named_param( - param_name.clone(), - Value::S(area.clone()), - )); - } - conds.push(format!( - "(@param_area0 <> '' AND area_id IN ({}))", - placeholders.join(", ") - )); - } - } - - // if neither filter matches, return logs where both fields are empty or NULL. - conds.push( - "((election_id = '' OR election_id IS NULL) AND (area_id = '' OR area_id IS NULL))" - .to_string(), - ); - - extra_where_clauses.push(format!("({})", conds.join(" OR "))); - } - - // Handle only_with_user - if self.only_with_user.unwrap_or(false) { - extra_where_clauses.push("(user_id IS NOT NULL AND user_id <> '')".to_string()); - } - - // Handle - if let Some(statement_kind) = &self.statement_kind { - params.push(create_named_param( - "param_statement_kind".to_string(), - Value::S(statement_kind.to_string()), - )); - extra_where_clauses.push("(statement_kind = @param_statement_kind)".to_string()); - } - - if !extra_where_clauses.is_empty() { - match clauses.len() { - 0 => { - clauses.push(format!("WHERE {}", extra_where_clauses.join(" AND "))); - } - _ => { - let where_clause = clauses.pop().ok_or(anyhow!("Empty clause"))?; - clauses.push(format!( - "{} AND {}", - where_clause, - extra_where_clauses.join(" AND ") - )); - } - } - } - - // Handle order_by - if !to_count && self.order_by.is_some() { - let order_by_clauses: Vec = self - .order_by - .as_ref() - .ok_or(anyhow!("Empty order clause"))? - .iter() - .map(|(field, direction)| format!("{field} {direction}")) - .collect(); - if order_by_clauses.len() > 0 { - clauses.push(format!("ORDER BY {}", order_by_clauses.join(", "))); - } - } - - // Handle limit - if !to_count { - let limit_param_name = String::from("limit"); - let limit_value = self - .limit - .unwrap_or(PgConfig::from_env()?.default_sql_limit.into()); - let limit = std::cmp::min(limit_value, PgConfig::from_env()?.low_sql_limit.into()); - clauses.push(format!("LIMIT @{limit_param_name}")); - params.push(create_named_param(limit_param_name, Value::N(limit))); - } - - // Handle offset - if !to_count && self.offset.is_some() { - let offset_param_name = String::from("offset"); - let offset = std::cmp::max(self.offset.unwrap_or(0), 0); - clauses.push(format!("OFFSET @{}", offset_param_name)); - params.push(create_named_param(offset_param_name, Value::N(offset))); - } - - Ok((clauses.join(" "), params)) - } +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct StatementHeadDataString { + pub event: String, + pub kind: String, + pub timestamp: i64, + pub event_type: String, + pub log_type: String, + pub description: String, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -1069,14 +882,28 @@ pub struct ElectoralLogRow { pub user_id: Option, pub username: Option, } -#[derive(Deserialize, Serialize, Debug, Clone)] -pub struct StatementHeadDataString { - pub event: String, - pub kind: String, - pub timestamp: i64, - pub event_type: String, - pub log_type: String, - pub description: String, + +// Removing this step would inprove the performance, i.e. return the final type directly from ElectoralLogMessage. +impl TryFrom for ElectoralLogRow { + type Error = anyhow::Error; + + fn try_from(elog_msg: ElectoralLogMessage) -> Result { + let serialized = general_purpose::STANDARD_NO_PAD.encode(elog_msg.message.clone()); + let deserialized_message = Message::strand_deserialize(&elog_msg.message) + .map_err(|e| anyhow!("Error deserializing message: {e:?}"))?; + + Ok(ElectoralLogRow { + id: elog_msg.id, + created: elog_msg.created, + statement_timestamp: elog_msg.statement_timestamp, + statement_kind: elog_msg.statement_kind.clone(), + message: serde_json::to_string_pretty(&deserialized_message) + .with_context(|| "Error serializing message to json")?, + data: serialized, + user_id: elog_msg.user_id.clone(), + username: elog_msg.username.clone(), + }) + } } impl ElectoralLogRow { @@ -1137,93 +964,93 @@ impl ElectoralLogRow { } } -impl TryFrom for ElectoralLogRow { - type Error = anyhow::Error; - - fn try_from(elog_msg: ElectoralLogMessage) -> Result { - let serialized = general_purpose::STANDARD_NO_PAD.encode(elog_msg.message.clone()); - let deserialized_message = Message::strand_deserialize(&elog_msg.message) - .map_err(|e| anyhow!("Error deserializing message: {e:?}"))?; - - Ok(ElectoralLogRow { - id: elog_msg.id, - created: elog_msg.created, - statement_timestamp: elog_msg.statement_timestamp, - statement_kind: elog_msg.statement_kind.clone(), - message: serde_json::to_string_pretty(&deserialized_message) - .with_context(|| "Error serializing message to json")?, - data: serialized, - user_id: elog_msg.user_id.clone(), - username: elog_msg.username.clone(), - }) - } -} - -impl TryFrom<&Row> for ElectoralLogRow { - type Error = anyhow::Error; - - fn try_from(row: &Row) -> Result { - let mut id = 0; - let mut created: i64 = 0; - let mut sender_pk = String::from(""); - let mut statement_timestamp: i64 = 0; - let mut statement_kind = String::from(""); - let mut message = vec![]; - let mut user_id = None; - let mut username = None; - - for (column, value) in row.columns.iter().zip(row.values.iter()) { - match column.as_str() { - c if c.ends_with(".id)") => { - assign_value!(Value::N, value, id) - } - c if c.ends_with(".created)") => { - assign_value!(Value::Ts, value, created) - } - c if c.ends_with(".sender_pk)") => { - assign_value!(Value::S, value, sender_pk) - } - c if c.ends_with(".statement_timestamp)") => { - assign_value!(Value::Ts, value, statement_timestamp) - } - c if c.ends_with(".statement_kind)") => { - assign_value!(Value::S, value, statement_kind) - } - c if c.ends_with(".message)") => { - assign_value!(Value::Bs, value, message) - } - c if c.ends_with(".user_id)") => match value.value.as_ref() { - Some(Value::S(inner)) => user_id = Some(inner.clone()), - Some(Value::Null(_)) => user_id = None, - None => user_id = None, - _ => return Err(anyhow!("invalid column value for 'user_id'")), - }, - c if c.ends_with(".username)") => match value.value.as_ref() { - Some(Value::S(inner)) => username = Some(inner.clone()), - Some(Value::Null(_)) => username = None, - None => username = None, - _ => return Err(anyhow!("invalid column value for 'username'")), - }, - _ => return Err(anyhow!("invalid column found '{}'", column.as_str())), - } - } - - let deserialized_message = - Message::strand_deserialize(&message).with_context(|| "Error deserializing message")?; - let serialized = general_purpose::STANDARD_NO_PAD.encode(message); - Ok(ElectoralLogRow { - id, - created, - statement_timestamp, - statement_kind, - message: serde_json::to_string_pretty(&deserialized_message) - .with_context(|| "Error serializing message to json")?, - data: serialized, - user_id, - username, - }) - } -} +// impl TryFrom for ElectoralLogRow { +// type Error = anyhow::Error; + +// fn try_from(elog_msg: ElectoralLogMessage) -> Result { +// let serialized = general_purpose::STANDARD_NO_PAD.encode(elog_msg.message.clone()); +// let deserialized_message = Message::strand_deserialize(&elog_msg.message) +// .map_err(|e| anyhow!("Error deserializing message: {e:?}"))?; + +// Ok(ElectoralLogRow { +// id: elog_msg.id, +// created: elog_msg.created, +// statement_timestamp: elog_msg.statement_timestamp, +// statement_kind: elog_msg.statement_kind.clone(), +// message: serde_json::to_string_pretty(&deserialized_message) +// .with_context(|| "Error serializing message to json")?, +// data: serialized, +// user_id: elog_msg.user_id.clone(), +// username: elog_msg.username.clone(), +// }) +// } +// } + +// impl TryFrom<&Row> for ElectoralLogRow { +// type Error = anyhow::Error; + +// fn try_from(row: &Row) -> Result { +// let mut id = 0; +// let mut created: i64 = 0; +// let mut sender_pk = String::from(""); +// let mut statement_timestamp: i64 = 0; +// let mut statement_kind = String::from(""); +// let mut message = vec![]; +// let mut user_id = None; +// let mut username = None; + +// for (column, value) in row.columns.iter().zip(row.values.iter()) { +// match column.as_str() { +// c if c.ends_with(".id)") => { +// assign_value!(Value::N, value, id) +// } +// c if c.ends_with(".created)") => { +// assign_value!(Value::Ts, value, created) +// } +// c if c.ends_with(".sender_pk)") => { +// assign_value!(Value::S, value, sender_pk) +// } +// c if c.ends_with(".statement_timestamp)") => { +// assign_value!(Value::Ts, value, statement_timestamp) +// } +// c if c.ends_with(".statement_kind)") => { +// assign_value!(Value::S, value, statement_kind) +// } +// c if c.ends_with(".message)") => { +// assign_value!(Value::Bs, value, message) +// } +// c if c.ends_with(".user_id)") => match value.value.as_ref() { +// Some(Value::S(inner)) => user_id = Some(inner.clone()), +// Some(Value::Null(_)) => user_id = None, +// None => user_id = None, +// _ => return Err(anyhow!("invalid column value for 'user_id'")), +// }, +// c if c.ends_with(".username)") => match value.value.as_ref() { +// Some(Value::S(inner)) => username = Some(inner.clone()), +// Some(Value::Null(_)) => username = None, +// None => username = None, +// _ => return Err(anyhow!("invalid column value for 'username'")), +// }, +// _ => return Err(anyhow!("invalid column found '{}'", column.as_str())), +// } +// } + +// let deserialized_message = +// Message::strand_deserialize(&message).with_context(|| "Error deserializing message")?; +// let serialized = general_purpose::STANDARD_NO_PAD.encode(message); +// Ok(ElectoralLogRow { +// id, +// created, +// statement_timestamp, +// statement_kind, +// message: serde_json::to_string_pretty(&deserialized_message) +// .with_context(|| "Error serializing message to json")?, +// data: serialized, +// user_id, +// username, +// }) +// } +// } #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct CastVoteEntry { @@ -1241,129 +1068,116 @@ pub struct CastVoteMessagesOutput { } impl CastVoteEntry { - pub fn from_elog_message(entry: &ElectoralLogMessage) -> Result, anyhow::Error> { + pub fn from_elog_message(entry: &ElectoralLogMessage) -> Result { let ballot_id = entry.ballot_id.clone().unwrap_or_default(); let username = entry.username.clone(); - let message: &Message = &Message::strand_deserialize(&entry.message) + let message: Message = Message::strand_deserialize(&entry.message) .map_err(|err| anyhow!("Failed to deserialize message: {:?}", err))?; let message = Some(message.to_string()); - Ok(Some(CastVoteEntry { + Ok(CastVoteEntry { statement_timestamp: entry.statement_timestamp, statement_kind: StatementType::CastVote.to_string(), ballot_id, username, message, - })) + }) } } #[instrument(err)] pub async fn list_electoral_log(input: GetElectoralLogBody) -> Result> { - let mut client: Client = get_immudb_client().await?; + let mut client = get_board_client().await?; let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?; let board_name = get_event_board( input.tenant_id.as_str(), input.election_event_id.as_str(), &slug, ); + info!("database name = {board_name}"); + let cols_match_select = input.as_where_clause_map()?; + let order_by = input.order_by.clone(); + let (min_ts, max_ts) = input.get_min_max_ts()?; + let limit: i64 = input.limit.unwrap_or(IMMUDB_ROWS_LIMIT as i64); + let offset: i64 = input.offset.unwrap_or(0); + let mut rows: Vec = vec![]; + let electoral_log_messages = client + .get_electoral_log_messages_filtered( + &board_name, + Some(cols_match_select.clone()), + min_ts, + max_ts, + Some(limit), + Some(offset), + order_by.clone(), + ) + .await + .map_err(|err| anyhow!("Failed to get filtered messages: {:?}", err))?; - event!(Level::INFO, "database name = {board_name}"); - client.open_session(&board_name).await?; - let (clauses, params) = input.as_sql(false)?; - let (clauses_to_count, count_params) = input.as_sql(true)?; - info!("clauses ?:= {clauses}"); - let sql = format!( - r#" - SELECT - id, - created, - sender_pk, - statement_timestamp, - statement_kind, - message, - user_id, - username - FROM electoral_log_messages - {clauses} - "#, - ); - info!("query: {sql}"); - let sql_query_response = client.streaming_sql_query(&sql, params).await?; - - let limit: usize = input.limit.unwrap_or(IMMUDB_ROWS_LIMIT as i64).try_into()?; - info!("list_electoral_log: limit = {}", limit); - let mut rows: Vec = Vec::with_capacity(limit); - let mut resp_stream = sql_query_response.into_inner(); - while let Some(streaming_batch) = resp_stream.next().await { - let items = streaming_batch? - .rows - .iter() - .map(ElectoralLogRow::try_from) - .collect::>>()?; - rows.extend(items); + let t_entries = electoral_log_messages.len(); + info!("Got {t_entries} entries. Offset: {offset}, limit: {limit}"); + for message in electoral_log_messages { + rows.push(message.try_into()?); } - let sql = format!( - r#" - SELECT - COUNT(*) - FROM electoral_log_messages - {clauses_to_count} - "#, - ); - let sql_query_response = client.sql_query(&sql, count_params).await?; - let mut rows_iter = sql_query_response - .get_ref() - .rows - .iter() - .map(Aggregate::try_from); - - let aggregate = rows_iter - // get the first item - .next() - // unwrap the Result and Option - .ok_or(anyhow!("No aggregate found"))??; - - client.close_session().await?; Ok(DataList { items: rows, total: TotalAggregate { - aggregate: aggregate, + aggregate: Aggregate { + count: t_entries as i64, + }, }, }) } -#[instrument] -pub fn get_cols_match_count_and_select( - election_id: &str, - user_id: &str, +/// Returns the entries for statement_kind = "CastVote" which ballot_id matches the input. +/// ballot_id_filter is restricted to be an even number of characters, so thatnit can be converted +/// to a byte array. +#[instrument(err, skip_all)] +pub async fn list_cast_vote_messages_and_count( + input: GetElectoralLogBody, ballot_id_filter: &str, -) -> (WhereClauseBTreeMap, WhereClauseBTreeMap) { - let cols_match_count = BTreeMap::from([ - ( - ElectoralLogVarCharColumn::StatementKind, - (SqlCompOperators::Equal, StatementType::CastVote.to_string()), - ), - ( - ElectoralLogVarCharColumn::ElectionId, - (SqlCompOperators::Equal, election_id.to_string()), + user_id: &str, + username: &str, +) -> Result { + ensure!( + ballot_id_filter.chars().count() % 2 == 0 && ballot_id_filter.is_ascii(), + "Incorrect ballot_id, the length must be an even number of characters" + ); + let election_id = input.election_id.clone().unwrap_or_default(); + let (cols_match_count, cols_match_select) = + input.as_cast_vote_count_and_select_clauses(&election_id, user_id, ballot_id_filter); + + let (entries_res, count_res) = tokio::join!( + list_cast_vote_messages( + input.clone(), + ballot_id_filter, + user_id, + username, + cols_match_select ), - ]); - let mut cols_match_select = cols_match_count.clone(); - // Restrict the SQL query to user_id and ballot_id in case of filtering - if !ballot_id_filter.is_empty() { - cols_match_select.insert( - ElectoralLogVarCharColumn::UserId, - (SqlCompOperators::Equal, user_id.to_string()), - ); - cols_match_select.insert( - ElectoralLogVarCharColumn::BallotId, - (SqlCompOperators::Like, ballot_id_filter.to_string()), - ); - } + async { + let mut client = get_board_client().await?; + let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?; + let board_name = get_event_board( + input.tenant_id.as_str(), + input.election_event_id.as_str(), + &slug, + ); + info!("database name = {board_name}"); + let total: usize = client + .count_electoral_log_messages(&board_name, Some(cols_match_count)) + .await? + .to_usize() + .unwrap_or(0); + Ok(total) + } + ); - (cols_match_count, cols_match_select) + let list = entries_res.map_err(|e| anyhow!("Error listing electoral log: {e:?}"))?; + let total = + count_res.map_err(|e: anyhow::Error| anyhow!("Error counting electoral log: {e:?}"))?; + Ok(CastVoteMessagesOutput { list, total }) } /// Returns the entries for statement_kind = "CastVote" which ballot_id matches the input @@ -1375,14 +1189,8 @@ pub async fn list_cast_vote_messages( ballot_id_filter: &str, user_id: &str, username: &str, -) -> Result { - ensure!( - ballot_id_filter.chars().count() % 2 == 0 && ballot_id_filter.is_ascii(), - "Incorrect ballot_id, the length must be an even number of characters" - ); - // The limits are used to cut the output after filtering the ballot id. - // Because ballot_id cannot be filtered at SQL level the sql limit is constant - let output_limit: i64 = input.limit.unwrap_or(MAX_ROWS_PER_PAGE as i64); + cols_match_select: WhereClauseOrdMap, +) -> Result> { let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?; let board_name = get_event_board( input.tenant_id.as_str(), @@ -1391,97 +1199,52 @@ pub async fn list_cast_vote_messages( ); info!("database name = {board_name}"); let order_by = input.order_by.clone(); - let election_id = input.election_id.clone().unwrap_or_default(); - - let limit: i64 = match ballot_id_filter.is_empty() { - false => IMMUDB_ROWS_LIMIT as i64, // When there is a filter, need to fetch all entries by batches. - true => input.limit.unwrap_or(MAX_ROWS_PER_PAGE as i64), + let (limit, offset) = match ballot_id_filter.is_empty() { + false => (1, 0), // When there is a filter, limit to 1 the result because ballot_id is unique and offset to 0 to scan the whole table + true => ( + input.limit.unwrap_or(MAX_ROWS_PER_PAGE as i64), + input.offset.unwrap_or(0), + ), }; - let mut offset: i64 = input.offset.unwrap_or(0); - let mut list: Vec = Vec::with_capacity(MAX_ROWS_PER_PAGE); // Filtered messages. - let (cols_match_count, cols_match_select) = - get_cols_match_count_and_select(&election_id, user_id, ballot_id_filter); let mut client = get_board_client().await?; - let total = client - .count_electoral_log_messages(&board_name, Some(cols_match_count)) - .await? - .to_u64() - .unwrap_or(0) as usize; - let mut filter_matched = false; // Exit at the first match if the filter is not empty - while (list.len() as i64) < output_limit && (offset < total as i64) && !filter_matched { - let electoral_log_messages = client - .get_electoral_log_messages_filtered( - &board_name, - Some(cols_match_select.clone()), - None, - None, - Some(limit), - Some(offset), - order_by.clone(), - ) - .await - .map_err(|err| anyhow!("Failed to get filtered messages: {:?}", err))?; - - let t_entries = electoral_log_messages.len(); - info!("Got {t_entries} entries. Offset: {offset}, limit: {limit}, total: {total}"); - for message in electoral_log_messages.iter() { - match CastVoteEntry::from_elog_message(&message)? { - Some(entry) if !ballot_id_filter.is_empty() => { - // If there is filter exit at the first match - filter_matched = true; - list.push(entry); - } - Some(entry) => { - // Add all the entries till the limit, when there is no filter - list.push(entry); - } - None => {} - } - if (list.len() as i64) >= output_limit || filter_matched { - break; - } - } - offset += limit; - } + let electoral_log_messages = client + .get_electoral_log_messages_filtered( + &board_name, + Some(cols_match_select.clone()), + None, + None, + Some(limit), + Some(offset), + order_by.clone(), + ) + .await + .map_err(|err| anyhow!("Failed to get filtered messages: {:?}", err))?; + let t_entries = electoral_log_messages.len(); + info!("Got {t_entries} entries. Offset: {offset}, limit: {limit}"); + let mut list: Vec = Vec::with_capacity(MAX_ROWS_PER_PAGE); // Filtered messages. + list = electoral_log_messages + .iter() + .map(|message| CastVoteEntry::from_elog_message(message)) + .collect::>>()?; - Ok(CastVoteMessagesOutput { list, total }) + Ok(list) } #[instrument(err)] pub async fn count_electoral_log(input: GetElectoralLogBody) -> Result { - let mut client = get_immudb_client().await?; + let mut client = get_board_client().await?; let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?; let board_name = get_event_board( input.tenant_id.as_str(), input.election_event_id.as_str(), &slug, ); - - info!("board name: {board_name}"); - client.open_session(&board_name).await?; - - let (clauses_to_count, count_params) = input.as_sql(true)?; - let sql = format!( - r#" - SELECT COUNT(*) - FROM electoral_log_messages - {clauses_to_count} - "#, - ); - - info!("query: {sql}"); - - let sql_query_response = client.sql_query(&sql, count_params).await?; - - let mut rows_iter = sql_query_response - .get_ref() - .rows - .iter() - .map(Aggregate::try_from); - let aggregate = rows_iter - .next() - .ok_or_else(|| anyhow!("No aggregate found"))??; - - client.close_session().await?; - Ok(aggregate.count as i64) + info!("database name = {board_name}"); + let cols_match_count = input.as_where_clause_map()?; + let total = client + .count_electoral_log_messages(&board_name, Some(cols_match_count)) + .await? + .to_u64() + .unwrap_or(0) as i64; + Ok(total) } diff --git a/packages/windmill/src/services/event_list.rs b/packages/windmill/src/services/event_list.rs index 14b83aa5b5c..0ba4fcbc9e0 100644 --- a/packages/windmill/src/services/event_list.rs +++ b/packages/windmill/src/services/event_list.rs @@ -2,12 +2,10 @@ // // SPDX-License-Identifier: AGPL-3.0-only use crate::postgres::election_event::get_election_event_by_id; -use crate::{ - postgres::scheduled_event::{insert_new_scheduled_event, insert_scheduled_event}, - types::resources::OrderDirection, -}; +use crate::postgres::scheduled_event::{insert_new_scheduled_event, insert_scheduled_event}; use anyhow::Result; use deadpool_postgres::Transaction; +use electoral_log::client::types::OrderDirection; use rocket::http::Status; use sequent_core::services::keycloak; use sequent_core::types::hasura::core::ElectionEvent; diff --git a/packages/windmill/src/services/export/export_election_event.rs b/packages/windmill/src/services/export/export_election_event.rs index 3cda1b3944a..36afb0ad26f 100644 --- a/packages/windmill/src/services/export/export_election_event.rs +++ b/packages/windmill/src/services/export/export_election_event.rs @@ -515,17 +515,10 @@ pub async fn process_export_zip( ReportFormat::CSV, // Assuming CSV format for this export ); - // Prepare user data - let user_data = activity_logs_template - .prepare_user_data(&hasura_transaction, &hasura_transaction) + let temp_activity_logs_file = activity_logs_template + .generate_export_csv_data(&activity_logs_filename) .await - .map_err(|e| anyhow!("Error preparing activity logs data: {e:?}"))?; - - // Generate the CSV file using generate_export_data - let temp_activity_logs_file = - activity_log::generate_export_data(&user_data.electoral_log, &activity_logs_filename) - .await - .map_err(|e| anyhow!("Error generating export data: {e:?}"))?; + .map_err(|e| anyhow!("Error generating export data: {e:?}"))?; zip_writer .start_file(&activity_logs_filename, options) diff --git a/packages/windmill/src/services/insert_cast_vote.rs b/packages/windmill/src/services/insert_cast_vote.rs index d0a98eeae69..36978335008 100644 --- a/packages/windmill/src/services/insert_cast_vote.rs +++ b/packages/windmill/src/services/insert_cast_vote.rs @@ -997,7 +997,7 @@ async fn check_previous_votes( .filter_map(|cv| cv.area_id.and_then(|id| parse_uuid_v4(&id).ok())) .partition(|cv_area_id| cv_area_id.to_string() == area_id.to_string()); - info!("get cast votes returns same: {:?}", same); + info!("get cast votes returns same: {same:?}"); // Skip max votes check if max_revotes is 0, allowing unlimited votes if max_revotes > 0 && same.len() >= max_revotes { diff --git a/packages/windmill/src/services/reports/activity_log.rs b/packages/windmill/src/services/reports/activity_log.rs index 16b532eb4a2..2c61f7cc39e 100644 --- a/packages/windmill/src/services/reports/activity_log.rs +++ b/packages/windmill/src/services/reports/activity_log.rs @@ -12,8 +12,8 @@ use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use csv::WriterBuilder; use deadpool_postgres::Transaction; -use electoral_log::messages::message::Message; -use electoral_log::ElectoralLogMessage; +use electoral_log::client::types::*; +use electoral_log::messages::message::{Message, SigningData}; use sequent_core::services::date::ISO8601; use sequent_core::services::s3::get_minio_url; use sequent_core::types::hasura::core::TasksExecution; @@ -45,6 +45,53 @@ pub struct ActivityLogRow { user_id: String, } +impl TryFrom for ActivityLogRow { + type Error = anyhow::Error; + + fn try_from(electoral_log: ElectoralLogMessage) -> Result { + let user_id = match electoral_log.user_id { + Some(user_id) => user_id.to_string(), + None => "-".to_string(), + }; + + let statement_timestamp: String = if let Ok(datetime_parsed) = + ISO8601::timestamp_secs_utc_to_date_opt(electoral_log.statement_timestamp) + { + datetime_parsed.to_rfc3339() + } else { + return Err(anyhow::anyhow!("Error parsing statement_timestamp")); + }; + + let created: String = if let Ok(datetime_parsed) = + ISO8601::timestamp_secs_utc_to_date_opt(electoral_log.created) + { + datetime_parsed.to_rfc3339() + } else { + return Err(anyhow::anyhow!("Error parsing created")); + }; + + let deserialized_message = Message::strand_deserialize(&electoral_log.message) + .map_err(|e| anyhow!("Error deserializing message: {e:?}"))?; + + let head_data = deserialized_message.statement.head.clone(); + let event_type = head_data.event_type.to_string(); + let log_type = head_data.log_type.to_string(); + let description = head_data.description; + + Ok(ActivityLogRow { + id: electoral_log.id, + user_id: user_id, + created, + statement_timestamp, + statement_kind: electoral_log.statement_kind, + event_type, + log_type, + description, + message: deserialized_message.to_string(), + }) + } +} + /// Struct for User Data /// act_log is for PDF /// electoral_log is for CSV @@ -130,99 +177,6 @@ impl ActivityLogsTemplate { } } -impl TryFrom for ActivityLogRow { - type Error = anyhow::Error; - - fn try_from(electoral_log: ElectoralLogRow) -> Result { - let user_id = match electoral_log.user_id() { - Some(user_id) => user_id.to_string(), - None => "-".to_string(), - }; - - let statement_timestamp: String = if let Ok(datetime_parsed) = - ISO8601::timestamp_secs_utc_to_date_opt(electoral_log.statement_timestamp()) - { - datetime_parsed.to_rfc3339() - } else { - return Err(anyhow::anyhow!("Error parsing statement_timestamp")); - }; - - let created: String = if let Ok(datetime_parsed) = - ISO8601::timestamp_secs_utc_to_date_opt(electoral_log.created()) - { - datetime_parsed.to_rfc3339() - } else { - return Err(anyhow::anyhow!("Error parsing created")); - }; - - let head_data = electoral_log - .statement_head_data() - .with_context(|| "Error to get head data.")?; - let event_type = head_data.event_type; - let log_type = head_data.log_type; - let description = head_data.description; - - Ok(ActivityLogRow { - id: electoral_log.id(), - user_id, - created, - statement_timestamp, - statement_kind: electoral_log.statement_kind().to_string(), - event_type, - log_type, - description, - message: electoral_log.message().to_string(), - }) - } -} - -impl TryFrom for ActivityLogRow { - type Error = anyhow::Error; - - fn try_from(electoral_log: ElectoralLogMessage) -> Result { - let user_id = match electoral_log.user_id { - Some(user_id) => user_id.to_string(), - None => "-".to_string(), - }; - - let statement_timestamp: String = if let Ok(datetime_parsed) = - ISO8601::timestamp_secs_utc_to_date_opt(electoral_log.statement_timestamp) - { - datetime_parsed.to_rfc3339() - } else { - return Err(anyhow::anyhow!("Error parsing statement_timestamp")); - }; - - let created: String = if let Ok(datetime_parsed) = - ISO8601::timestamp_secs_utc_to_date_opt(electoral_log.created) - { - datetime_parsed.to_rfc3339() - } else { - return Err(anyhow::anyhow!("Error parsing created")); - }; - - let deserialized_message = Message::strand_deserialize(&electoral_log.message) - .map_err(|e| anyhow!("Error deserializing message: {e:?}"))?; - - let head_data = deserialized_message.statement.head.clone(); - let event_type = head_data.event_type.to_string(); - let log_type = head_data.log_type.to_string(); - let description = head_data.description; - - Ok(ActivityLogRow { - id: electoral_log.id, - user_id, - created, - statement_timestamp, - statement_kind: electoral_log.statement_kind, - event_type, - log_type, - description, - message: deserialized_message.to_string(), - }) - } -} - #[async_trait] impl TemplateRenderer for ActivityLogsTemplate { type UserData = UserData; @@ -255,7 +209,12 @@ impl TemplateRenderer for ActivityLogsTemplate { fn prefix(&self) -> String { format!("activity_logs_{}", rand::random::()) } - async fn count_items(&self, _hasura_transaction: &Transaction<'_>) -> Result> { + + #[instrument(err, skip_all)] + async fn count_items( + &self, + _hasura_transaction: Option<&Transaction<'_>>, + ) -> Result> { let mut client = get_board_client().await?; let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?; let board_name = get_event_board( @@ -273,8 +232,8 @@ impl TemplateRenderer for ActivityLogsTemplate { #[instrument(err, skip_all)] async fn prepare_user_data_batch( &self, - _hasura_transaction: &Transaction<'_>, - _keycloak_transaction: &Transaction<'_>, + _hasura_transaction: Option<&Transaction<'_>>, + _keycloak_transaction: Option<&Transaction<'_>>, offset: &mut i64, limit: i64, ) -> Result { @@ -368,8 +327,8 @@ impl TemplateRenderer for ActivityLogsTemplate { .await } else { // Generate CSV file using generate_export_csv_data - let name = format!("export-election-event-logs-{}", election_event_id); - let full_name = format!("{}.csv", name); + let name = format!("export-election-event-logs-{election_event_id}"); + let full_name = format!("{name}.csv"); let temp_file = self .generate_export_csv_data(&name) .await diff --git a/packages/windmill/src/services/reports/template_renderer.rs b/packages/windmill/src/services/reports/template_renderer.rs index d57bb376975..a8b931007b3 100644 --- a/packages/windmill/src/services/reports/template_renderer.rs +++ b/packages/windmill/src/services/reports/template_renderer.rs @@ -111,13 +111,16 @@ pub trait TemplateRenderer: Debug { /// or from other place than the reports TAB. fn get_initial_template_alias(&self) -> Option; - async fn count_items(&self, hasura_transaction: &Transaction<'_>) -> Result> { + async fn count_items( + &self, + hasura_transaction: Option<&Transaction<'_>>, + ) -> Result> { Ok(None) } async fn prepare_user_data_batch( &self, - hasura_transaction: &Transaction<'_>, - keycloak_transaction: &Transaction<'_>, + hasura_transaction: Option<&Transaction<'_>>, + keycloak_transaction: Option<&Transaction<'_>>, offset: &mut i64, limit: i64, ) -> Result { @@ -384,9 +387,14 @@ pub trait TemplateRenderer: Debug { } else { if let (Some(o), Some(l)) = (offset, limit) { info!("Batched processing: offset = {o}, limit = {l}"); - self.prepare_user_data_batch(hasura_transaction, keycloak_transaction, o, l) - .await - .map_err(|e| anyhow!("Error preparing batched user data: {e:?}"))? + self.prepare_user_data_batch( + Some(hasura_transaction), + Some(keycloak_transaction), + o, + l, + ) + .await + .map_err(|e| anyhow!("Error preparing batched user data: {e:?}"))? } else { self.prepare_user_data(hasura_transaction, keycloak_transaction) .await @@ -499,7 +507,10 @@ pub trait TemplateRenderer: Debug { anyhow!("Error providing the user template and extra config: {e:?}") })?; - let items_count = self.count_items(&hasura_transaction).await?.unwrap_or(0); + let items_count = self + .count_items(Some(&hasura_transaction)) + .await? + .unwrap_or(0); let report_options = ext_cfg.report_options.clone(); let per_report_limit = report_options .max_items_per_report diff --git a/packages/windmill/src/tasks/activity_logs_report.rs b/packages/windmill/src/tasks/activity_logs_report.rs index 3f2d9157cd0..81186af3781 100644 --- a/packages/windmill/src/tasks/activity_logs_report.rs +++ b/packages/windmill/src/tasks/activity_logs_report.rs @@ -21,7 +21,7 @@ use anyhow::{anyhow, Context}; use celery::error::TaskError; use deadpool_postgres::Client as DbClient; use sequent_core::types::hasura::core::TasksExecution; -use tracing::instrument; +use tracing::{info, instrument}; async fn generate_activity_logs_report_impl( tenant_id: String, @@ -67,6 +67,7 @@ async fn generate_activity_logs_report_impl( format, ); + info!("Generating activity logs report"); report .execute_report( &document_id, diff --git a/packages/windmill/src/tasks/electoral_log.rs b/packages/windmill/src/tasks/electoral_log.rs index 9da6534ba26..5d62cac85da 100644 --- a/packages/windmill/src/tasks/electoral_log.rs +++ b/packages/windmill/src/tasks/electoral_log.rs @@ -15,13 +15,13 @@ use crate::types::error::{Error, Result}; use anyhow::{anyhow, Context}; use celery::error::TaskError; use deadpool_postgres::Client as DbClient; -use electoral_log::client::board_client::ElectoralLogMessage; +use electoral_log::client::types::ElectoralLogMessage; use immudb_rs::TxMode; use sequent_core::serialization::deserialize_with_path::deserialize_str; use sequent_core::services::keycloak::get_event_realm; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use tracing::{event, info, instrument}; +use tracing::{info, instrument}; use lapin::{ options::{BasicAckOptions, BasicGetOptions, QueueDeclareOptions}, diff --git a/packages/windmill/src/tasks/generate_report.rs b/packages/windmill/src/tasks/generate_report.rs index 2eee3e5f0ba..25f6150da45 100644 --- a/packages/windmill/src/tasks/generate_report.rs +++ b/packages/windmill/src/tasks/generate_report.rs @@ -116,6 +116,7 @@ pub async fn generate_report( .await?; }; } + info!("To generate report type: {report_type_str}"); match ReportType::from_str(&report_type_str) { Ok(ReportType::INITIALIZATION_REPORT) => { let report = InitializationTemplate::new(ids); diff --git a/packages/windmill/src/types/resources.rs b/packages/windmill/src/types/resources.rs index 6974d1c752d..dd909aa5945 100644 --- a/packages/windmill/src/types/resources.rs +++ b/packages/windmill/src/types/resources.rs @@ -7,31 +7,31 @@ use immudb_rs::{sql_value::Value, Client, NamedParam, Row, SqlValue}; use serde::{Deserialize, Serialize}; use strum_macros::{Display, EnumString}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Default)] pub struct Aggregate { pub count: i64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Default)] pub struct TotalAggregate { pub aggregate: Aggregate, } -// Enumeration for the valid order directions -#[derive(Debug, Deserialize, EnumString, Display, Clone)] -#[serde(rename_all = "lowercase")] -#[strum(serialize_all = "lowercase")] -pub enum OrderDirection { - Asc, - Desc, -} - #[derive(Serialize, Deserialize, Debug)] pub struct DataList { pub items: Vec, pub total: TotalAggregate, } +impl Default for DataList { + fn default() -> Self { + DataList { + items: vec![], + total: TotalAggregate::default(), + } + } +} + impl TryFrom<&Row> for Aggregate { type Error = anyhow::Error;