Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ CREATE foreign data wrapper etcd_fdw handler etcd_fdw_handler validator etcd_fdw
CREATE SERVER my_etcd_server foreign data wrapper etcd_fdw options (connstr '127.0.0.1:2379');
```

```sql
CREATE USER MAPPING FOR CURRENT_USER SERVER my_etcd_server OPTIONS (user 'root', password 'secret');
```

```sql
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid_column 'key');
```
Expand Down Expand Up @@ -86,7 +90,7 @@ foreign server itself.
`etcd_fdw` now also supports limit offset push-down. Wherever possible,
perform LIMIT operations on the remote server.

#### WHERE push-down
### WHERE push-down

`etcd_fdw` now supports WHERE clause push-down for simple key-based comparisons. Whenever possible, equality and range conditions are translated into etcd key scans, so filtering is done on the remote server.
Currently supported operators: `=`, `>=`, `>`, `<=`, `<`, `BETWEEN`, and `LIKE 'prefix%'`.
Expand All @@ -98,7 +102,7 @@ This behavior is consistent with the prefix, range_end, and key options in `CREA

`etcd_fdw` accepts the following options via the `CREATE SERVER` command:

- **connstr** as *string*, requuired
- **connstr** as *string*, required

Connetion string for etcd server i.e. `127.0.0.1:2379`

Expand All @@ -121,14 +125,6 @@ This behavior is consistent with the prefix, range_end, and key options in `CREA
The domain name to use for verifying the server’s TLS certificate during the handshake.
This value must match the Common Name (CN) or one of the Subject Alternative Names (SANs) in the server’s certificate.

- **username** as *string*, optional, no default

Username to use when connecting to etcd.

- **password** as *string*, optional, no default

Password to authenticate to the etcd server with.

- **connect_timeout** as *string*, optional, default = `10`

Timeout in seconds for establishing the initial connection to the etcd server.
Expand All @@ -142,7 +138,7 @@ This behavior is consistent with the prefix, range_end, and key options in `CREA
`etcd_fdw` accepts the following table-level options via the
`CREATE FOREIGN TABLE` command.

- **rowid_column** as *string*, mandatory, no default
- **rowid_column** as *string*, required, no default

Specifies which column should be treated as the unique row identifier.
Usually set to key.
Expand Down Expand Up @@ -186,6 +182,19 @@ This behavior is consistent with the prefix, range_end, and key options in `CREA
Serializable(`s`), Allows serving results from a local etcd member without cluster-wide consensus.
Serializable reads are faster and lighter on the cluster, but may return stale data in some cases

### CREATE USER MAPPING options

`etcd_fdw` accepts the following user mapping options via the
`CREATE USER MAPPING` command.

- **user** as *string*, required, no default

User to use when connecting to etcd.

- **password** as *string*, required, no default

Password to authenticate to the etcd server with.

## What doesn't work

etcd_fdw supports almost all kinds of CRUD operations. What doesn't work is modifying the key (which is the rowid value) directly using `UPDATE` statements.
Expand Down
154 changes: 139 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct EtcdConfig {
pub ca_cert_path: Option<String>,
pub client_cert_path: Option<String>,
pub client_key_path: Option<String>,
pub username: Option<String>,
pub user: Option<String>,
pub password: Option<String>,
pub servername: Option<String>,
pub connect_timeout: Duration,
Expand All @@ -40,7 +40,7 @@ impl Default for EtcdConfig {
ca_cert_path: None,
client_cert_path: None,
client_key_path: None,
username: None,
user: None,
password: None,
servername: None,
connect_timeout: Duration::from_secs(10),
Expand All @@ -66,7 +66,7 @@ pub enum EtcdFdwError {
#[error("KeyFile and CertFile must both be present.")]
CertKeyMismatch(()),

#[error("Username and Password must both be specified.")]
#[error("User and Password must both be specified through user mappings.")]
UserPassMismatch(()),

#[error("Column {0} is not contained in the input dataset")]
Expand Down Expand Up @@ -101,7 +101,7 @@ impl From<EtcdFdwError> for ErrorReport {
}

/// Check whether dependent options exits
/// i.e username & pass, cert & key
/// i.e user & pass, cert & key
fn require_pair(
a: bool,
b: bool,
Expand Down Expand Up @@ -167,8 +167,8 @@ pub async fn connect_etcd(config: EtcdConfig) -> Result<Client, Error> {
connect_options = connect_options.with_tls(tls_options);
}

// Load Username and Password
if let (Some(user), Some(pass)) = (&config.username, &config.password) {
// Load User and Password
if let (Some(user), Some(pass)) = (&config.user, &config.password) {
connect_options = connect_options.with_user(user, pass);
}

Expand All @@ -192,29 +192,57 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
None => return Err(EtcdFdwError::NoConnStr(())),
};

// TODO: username & pass should be captured separately i.e. from CREATE USER MAPPING
let cacert_path = server.options.get("ssl_ca").cloned();
let cert_path = server.options.get("ssl_cert").cloned();
let key_path = server.options.get("ssl_key").cloned();
let servername = server.options.get("ssl_servername").cloned();
let username = server.options.get("username").cloned();
let password = server.options.get("password").cloned();

// Parse timeouts with defaults
let connect_timeout = parse_timeout(&server.options, "connect_timeout", config.connect_timeout)?;
let request_timeout = parse_timeout(&server.options, "request_timeout", config.request_timeout)?;

// ssl_cert + ssl_key must be both present or both absent
// username + password must be both present or both absent
require_pair(cert_path.is_some(), key_path.is_some(), EtcdFdwError::CertKeyMismatch(()))?;
require_pair(username.is_some(), password.is_some(), EtcdFdwError::UserPassMismatch(()))?;

let mut user = None;
let mut password = None;

unsafe {
let usermapping = pg_sys::GetUserMapping(pg_sys::GetUserId(), server.server_oid);
let options = (*usermapping).options;
pgrx::memcx::current_context(|mcx| {
let list = pgrx::list::List::<*mut std::ffi::c_void>::downcast_ptr_in_memcx(options, mcx).unwrap();
for option in list.iter() {
let option = *option as *mut pg_sys::DefElem;
let name = std::ffi::CStr::from_ptr((*option).defname);
let value = std::ffi::CStr::from_ptr(pg_sys::defGetString(option));
let name = name.to_str().map_err(|_| {
OptionsError::OptionNameIsInvalidUtf8(
String::from_utf8_lossy(name.to_bytes()).to_string(),
)
});
let value = value.to_str().map_err(|_| {
OptionsError::OptionValueIsInvalidUtf8(
String::from_utf8_lossy(value.to_bytes()).to_string(),
)
});
if let (Ok(name), Ok(value)) = (name, value) {
match name {
"user" => user = Some(value.to_string()),
"password" => password = Some(value.to_string()),
_ => {}
}
}
}
});
}

config = EtcdConfig {
endpoints: vec![connstr],
ca_cert_path: cacert_path,
client_cert_path: cert_path,
client_key_path: key_path,
username: username,
user: user,
password: password,
servername: servername,
connect_timeout: connect_timeout,
Expand Down Expand Up @@ -613,11 +641,8 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {

let cacert_path_exists = check_options_contain(&options, "ssl_ca").is_ok();
let cert_path_exists = check_options_contain(&options, "ssl_cert").is_ok();
let username_exists = check_options_contain(&options, "username").is_ok();
let password_exists = check_options_contain(&options, "password").is_ok();

require_pair(cacert_path_exists, cert_path_exists, EtcdFdwError::CertKeyMismatch(()))?;
require_pair(username_exists, password_exists, EtcdFdwError::UserPassMismatch(()))?;
} else if oid == FOREIGN_TABLE_RELATION_ID {
check_options_contain(&options, "rowid_column")?;

Expand All @@ -632,6 +657,11 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
if prefix_exists && key_exists {
return Err(EtcdFdwError::ConflictingPrefixAndKey);
}
} else if oid == pg_sys::BuiltinOid::UserMappingRelationId.value() {
let user_exists = check_options_contain(&options, "user").is_ok();
let password_exists = check_options_contain(&options, "password").is_ok();

require_pair(user_exists, password_exists, EtcdFdwError::UserPassMismatch(()))?;
}
}

Expand All @@ -658,6 +688,7 @@ mod tests {
use std::time::Duration;

use super::*;
use etcd_client::Permission;
use testcontainers::{
core::{IntoContainerPort, WaitFor},
runners::SyncRunner,
Expand All @@ -672,6 +703,28 @@ mod tests {
"http://0.0.0.0:2379",
];

const ETCD_USER: &str = "root";
const ETCD_PASS: &str = "secret";

// Setup etcd root role/user and enable authentication
async fn etcd_auth_setup(endpoint: String) {
let mut client: Client = Client::connect([endpoint], None)
.await
.expect("connect etcd");

// add root user and role
client.role_add("root").await.expect("add role");
client.user_add(ETCD_USER, ETCD_PASS, None)
.await
.expect("add user");

client.user_grant_role(ETCD_USER, "root")
.await
.expect("grant role");

client.auth_enable().await.expect("enable auth");
}

fn create_container() -> (Container<GenericImage>, String) {
let container = GenericImage::new("quay.io/coreos/etcd", "v3.6.4")
.with_exposed_port(2379.tcp())
Expand All @@ -693,6 +746,8 @@ mod tests {
.expect("Exposed host port should be available");

let url = format!("{}:{}", host, port);
let rt = tokio::runtime::Runtime::new().expect("Tokio runtime should be initialized");
rt.block_on(etcd_auth_setup(url.clone()));
(container, url)
}

Expand All @@ -709,6 +764,16 @@ mod tests {
)
.expect("Server should have been created");

// Create a user mapping
Spi::run(
format!(
"CREATE USER MAPPING FOR CURRENT_USER SERVER etcd_test_server options (user '{}', password '{}')",
ETCD_USER, ETCD_PASS
)
.as_str(),
)
.expect("User mapping should have been created");

// Create a foreign table
Spi::run("CREATE FOREIGN TABLE test (key text, value text) server etcd_test_server options (rowid_column 'key')").expect("Test table should have been created");
}
Expand Down Expand Up @@ -829,4 +894,63 @@ mod tests {

assert_eq!(Some(format!("original_value")), query_result);
}

#[pg_test]
fn test_user_mapping_validation() {
let (_container, url) = create_container();

create_fdt(url.clone());

// Insert test data
Spi::run("INSERT INTO test (key, value) VALUES ('/gather', 'data')")
.expect("INSERT should work");

// Test 1: User mapping with invalid credentials (should fail)
Spi::run("ALTER USER MAPPING FOR CURRENT_USER SERVER etcd_test_server OPTIONS (SET password 'wrong_password');")
.expect("Alter user mapping should work");

let result = std::panic::catch_unwind(|| {
Spi::run("SELECT * FROM test;").expect("SELECT should work");
});

assert!(result.is_err(), "Expected SELECT to fail due to invalid user mapping");

// Setup: create a role and user with limited permissions in etcd
let rt = tokio::runtime::Runtime::new().expect("Tokio runtime should be initialized");
rt.block_on(
async {
let mut client: Client = Client::connect([url.clone()], Some(ConnectOptions::new().with_user(ETCD_USER, ETCD_PASS)))
.await
.expect("connect etcd");
client.role_add("rw_role").await.expect("add role");
// role with read and write permissions on keys starting with "/"
client.role_grant_permission("rw_role", Permission::with_from_key(Permission::read_write("/")))
.await
.expect("grant permission");
client.user_add("etcd_user", "secret", None)
.await
.expect("add user");
client.user_grant_role("etcd_user", "rw_role")
.await
.expect("grant role");
}
);

// Alter user mapping to use the new limited permissions user
Spi::run("ALTER USER MAPPING FOR CURRENT_USER SERVER etcd_test_server OPTIONS (SET user 'etcd_user', SET password 'secret');")
.expect("Alter user mapping should work");

// Test 2: Selecting a key outside of the user's permissions (should fail)
let invalid_result = std::panic::catch_unwind(|| {
Spi::run("SELECT * FROM test;").expect("SELECT should work");
});

assert!(invalid_result.is_err(), "Expected SELECT to fail due to insufficient permissions");

// Test 3: Selecting a key within the user's permissions
let result = Spi::get_two::<String, String>("SELECT * FROM test WHERE key = '/gather'")
.expect("SELECT with proper permissions should work");

assert_eq!((Some(format!("/gather")), Some(format!("data"))), result);
}
}
Loading