From 856247cdfe046222a0616e735eace85909fa489a Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sat, 27 Sep 2025 03:41:36 +0800 Subject: [PATCH] refactor: decouple auth_manager and pg_catalog --- datafusion-postgres/src/auth.rs | 113 ++-------------- datafusion-postgres/src/handlers.rs | 5 +- datafusion-postgres/src/pg_catalog.rs | 30 +++-- datafusion-postgres/src/pg_catalog/context.rs | 121 ++++++++++++++++++ .../src/pg_catalog/pg_roles.rs | 18 +-- 5 files changed, 164 insertions(+), 123 deletions(-) create mode 100644 datafusion-postgres/src/pg_catalog/context.rs diff --git a/datafusion-postgres/src/auth.rs b/datafusion-postgres/src/auth.rs index e873e45..f984183 100644 --- a/datafusion-postgres/src/auth.rs +++ b/datafusion-postgres/src/auth.rs @@ -6,105 +6,7 @@ use pgwire::api::auth::{AuthSource, LoginInfo, Password}; use pgwire::error::{PgWireError, PgWireResult}; use tokio::sync::RwLock; -/// User information stored in the authentication system -#[derive(Debug, Clone)] -pub struct User { - pub username: String, - pub password_hash: String, - pub roles: Vec, - pub is_superuser: bool, - pub can_login: bool, - pub connection_limit: Option, -} - -/// Permission types for granular access control -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum Permission { - Select, - Insert, - Update, - Delete, - Create, - Drop, - Alter, - Index, - References, - Trigger, - Execute, - Usage, - Connect, - Temporary, - All, -} - -impl Permission { - pub fn from_string(s: &str) -> Option { - match s.to_uppercase().as_str() { - "SELECT" => Some(Permission::Select), - "INSERT" => Some(Permission::Insert), - "UPDATE" => Some(Permission::Update), - "DELETE" => Some(Permission::Delete), - "CREATE" => Some(Permission::Create), - "DROP" => Some(Permission::Drop), - "ALTER" => Some(Permission::Alter), - "INDEX" => Some(Permission::Index), - "REFERENCES" => Some(Permission::References), - "TRIGGER" => Some(Permission::Trigger), - "EXECUTE" => Some(Permission::Execute), - "USAGE" => Some(Permission::Usage), - "CONNECT" => Some(Permission::Connect), - "TEMPORARY" => Some(Permission::Temporary), - "ALL" => Some(Permission::All), - _ => None, - } - } -} - -/// Resource types for access control -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum ResourceType { - Table(String), - Schema(String), - Database(String), - Function(String), - Sequence(String), - All, -} - -/// Grant entry for specific permissions on resources -#[derive(Debug, Clone)] -pub struct Grant { - pub permission: Permission, - pub resource: ResourceType, - pub granted_by: String, - pub with_grant_option: bool, -} - -/// Role information for access control -#[derive(Debug, Clone)] -pub struct Role { - pub name: String, - pub is_superuser: bool, - pub can_login: bool, - pub can_create_db: bool, - pub can_create_role: bool, - pub can_create_user: bool, - pub can_replication: bool, - pub grants: Vec, - pub inherited_roles: Vec, -} - -/// Role configuration for creation -#[derive(Debug, Clone)] -pub struct RoleConfig { - pub name: String, - pub is_superuser: bool, - pub can_login: bool, - pub can_create_db: bool, - pub can_create_role: bool, - pub can_create_user: bool, - pub can_replication: bool, -} +use crate::pg_catalog::context::*; /// Authentication manager that handles users and roles #[derive(Debug)] @@ -645,6 +547,19 @@ impl SimpleAuthSource { } } +#[async_trait] +impl PgCatalogContextProvider for Arc { + // retrieve all database role names + async fn roles(&self) -> Vec { + self.list_roles().await + } + + // retrieve database role information + async fn role(&self, name: &str) -> Option { + self.role(name).await + } +} + #[async_trait] impl AuthSource for SimpleAuthSource { async fn get_password(&self, login: &LoginInfo) -> PgWireResult { diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index e23a3a5..0ee40cc 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::auth::{AuthManager, Permission, ResourceType}; -use crate::sql::PostgresCompatibilityParser; use async_trait::async_trait; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::common::ToDFSchema; @@ -26,6 +24,9 @@ use pgwire::error::{PgWireError, PgWireResult}; use pgwire::messages::response::TransactionStatus; use tokio::sync::Mutex; +use crate::auth::AuthManager; +use crate::pg_catalog::context::{Permission, ResourceType}; +use crate::sql::PostgresCompatibilityParser; use arrow_pg::datatypes::df; use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type}; diff --git a/datafusion-postgres/src/pg_catalog.rs b/datafusion-postgres/src/pg_catalog.rs index cbf0efe..1cc731b 100644 --- a/datafusion-postgres/src/pg_catalog.rs +++ b/datafusion-postgres/src/pg_catalog.rs @@ -20,11 +20,12 @@ use datafusion::prelude::{create_udf, Expr, SessionContext}; use postgres_types::Oid; use tokio::sync::RwLock; -use crate::auth::AuthManager; use crate::pg_catalog::catalog_info::CatalogInfo; +use crate::pg_catalog::context::PgCatalogContextProvider; use crate::pg_catalog::empty_table::EmptyTable; pub mod catalog_info; +pub mod context; pub mod empty_table; pub mod format_type; pub mod has_privilege_udf; @@ -192,16 +193,16 @@ pub(crate) enum OidCacheKey { // Create custom schema provider for pg_catalog #[derive(Debug)] -pub struct PgCatalogSchemaProvider { +pub struct PgCatalogSchemaProvider { catalog_list: C, oid_counter: Arc, oid_cache: Arc>>, static_tables: Arc, - auth_manager: Arc, + context_provider: P, } #[async_trait] -impl SchemaProvider for PgCatalogSchemaProvider { +impl SchemaProvider for PgCatalogSchemaProvider { fn as_any(&self) -> &dyn std::any::Any { self } @@ -224,18 +225,18 @@ impl SchemaProvider for PgCatalogSchemaProvider { } } -impl PgCatalogSchemaProvider { +impl PgCatalogSchemaProvider { pub fn try_new( catalog_list: C, static_tables: Arc, - auth_manager: Arc, - ) -> Result> { + context_provider: P, + ) -> Result> { Ok(Self { catalog_list, oid_counter: Arc::new(AtomicU32::new(16384)), oid_cache: Arc::new(RwLock::new(HashMap::new())), static_tables, - auth_manager, + context_provider, }) } @@ -413,7 +414,7 @@ impl PgCatalogSchemaProvider { Ok(Some(PgCatalogTable::Dynamic(table))) } PG_CATALOG_VIEW_PG_ROLES => { - let table = Arc::new(pg_roles::PgRolesTable::new(Arc::clone(&self.auth_manager))); + let table = Arc::new(pg_roles::PgRolesTable::new(self.context_provider.clone())); Ok(Some(PgCatalogTable::Dynamic(table))) } @@ -1278,16 +1279,19 @@ pub fn create_pg_backend_pid_udf() -> ScalarUDF { const BACKEND_PID: i32 = 1; /// Install pg_catalog and postgres UDFs to current `SessionContext` -pub fn setup_pg_catalog( +pub fn setup_pg_catalog

( session_context: &SessionContext, catalog_name: &str, - auth_manager: Arc, -) -> Result<(), Box> { + context_provider: P, +) -> Result<(), Box> +where + P: PgCatalogContextProvider, +{ let static_tables = Arc::new(PgCatalogStaticTables::try_new()?); let pg_catalog = PgCatalogSchemaProvider::try_new( session_context.state().catalog_list().clone(), static_tables.clone(), - auth_manager, + context_provider, )?; session_context .catalog(catalog_name) diff --git a/datafusion-postgres/src/pg_catalog/context.rs b/datafusion-postgres/src/pg_catalog/context.rs new file mode 100644 index 0000000..78c7de1 --- /dev/null +++ b/datafusion-postgres/src/pg_catalog/context.rs @@ -0,0 +1,121 @@ +use std::fmt::Debug; + +use async_trait::async_trait; + +#[async_trait] +pub trait PgCatalogContextProvider: Clone + Debug + Send + Sync + 'static { + // retrieve all database role names + async fn roles(&self) -> Vec { + vec![] + } + + // retrieve database role information + async fn role(&self, _name: &str) -> Option { + None + } +} + +#[derive(Debug, Clone)] +pub struct EmptyContextProvider; + +impl PgCatalogContextProvider for EmptyContextProvider {} + +/// User information stored in the authentication system +#[derive(Debug, Clone)] +pub struct User { + pub username: String, + pub password_hash: String, + pub roles: Vec, + pub is_superuser: bool, + pub can_login: bool, + pub connection_limit: Option, +} + +/// Permission types for granular access control +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum Permission { + Select, + Insert, + Update, + Delete, + Create, + Drop, + Alter, + Index, + References, + Trigger, + Execute, + Usage, + Connect, + Temporary, + All, +} + +impl Permission { + pub fn from_string(s: &str) -> Option { + match s.to_uppercase().as_str() { + "SELECT" => Some(Permission::Select), + "INSERT" => Some(Permission::Insert), + "UPDATE" => Some(Permission::Update), + "DELETE" => Some(Permission::Delete), + "CREATE" => Some(Permission::Create), + "DROP" => Some(Permission::Drop), + "ALTER" => Some(Permission::Alter), + "INDEX" => Some(Permission::Index), + "REFERENCES" => Some(Permission::References), + "TRIGGER" => Some(Permission::Trigger), + "EXECUTE" => Some(Permission::Execute), + "USAGE" => Some(Permission::Usage), + "CONNECT" => Some(Permission::Connect), + "TEMPORARY" => Some(Permission::Temporary), + "ALL" => Some(Permission::All), + _ => None, + } + } +} + +/// Resource types for access control +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum ResourceType { + Table(String), + Schema(String), + Database(String), + Function(String), + Sequence(String), + All, +} + +/// Grant entry for specific permissions on resources +#[derive(Debug, Clone)] +pub struct Grant { + pub permission: Permission, + pub resource: ResourceType, + pub granted_by: String, + pub with_grant_option: bool, +} + +/// Role information for access control +#[derive(Debug, Clone)] +pub struct Role { + pub name: String, + pub is_superuser: bool, + pub can_login: bool, + pub can_create_db: bool, + pub can_create_role: bool, + pub can_create_user: bool, + pub can_replication: bool, + pub grants: Vec, + pub inherited_roles: Vec, +} + +/// Role configuration for creation +#[derive(Debug, Clone)] +pub struct RoleConfig { + pub name: String, + pub is_superuser: bool, + pub can_login: bool, + pub can_create_db: bool, + pub can_create_role: bool, + pub can_create_user: bool, + pub can_replication: bool, +} diff --git a/datafusion-postgres/src/pg_catalog/pg_roles.rs b/datafusion-postgres/src/pg_catalog/pg_roles.rs index 8c468a2..b8bf8e9 100644 --- a/datafusion-postgres/src/pg_catalog/pg_roles.rs +++ b/datafusion-postgres/src/pg_catalog/pg_roles.rs @@ -10,16 +10,16 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::streaming::PartitionStream; -use crate::auth::AuthManager; +use crate::pg_catalog::context::PgCatalogContextProvider; #[derive(Debug, Clone)] -pub(crate) struct PgRolesTable { +pub(crate) struct PgRolesTable

{ schema: SchemaRef, - auth_manager: Arc, + context_provider: P, } -impl PgRolesTable { - pub(crate) fn new(auth_manager: Arc) -> Self { +impl PgRolesTable

{ + pub(crate) fn new(context_provider: P) -> Self { let schema = Arc::new(Schema::new(vec![ Field::new("rolname", DataType::Utf8, true), Field::new("rolsuper", DataType::Boolean, true), @@ -49,7 +49,7 @@ impl PgRolesTable { Self { schema, - auth_manager, + context_provider, } } @@ -68,8 +68,8 @@ impl PgRolesTable { let mut rolconfig: Vec>> = Vec::new(); let mut oid: Vec = Vec::new(); - for role_name in &this.auth_manager.list_roles().await { - let role = &this.auth_manager.get_role(role_name).await.unwrap(); + for role_name in &this.context_provider.roles().await { + let role = &this.context_provider.role(role_name).await.unwrap(); rolname.push(role.name.clone()); rolsuper.push(role.is_superuser); rolinherit.push(true); @@ -128,7 +128,7 @@ impl PgRolesTable { } } -impl PartitionStream for PgRolesTable { +impl PartitionStream for PgRolesTable

{ fn schema(&self) -> &SchemaRef { &self.schema }