Skip to content

Commit 7c1db5e

Browse files
authored
refactor: decouple auth_manager and pg_catalog (#192)
1 parent d0a3401 commit 7c1db5e

File tree

5 files changed

+164
-123
lines changed

5 files changed

+164
-123
lines changed

datafusion-postgres/src/auth.rs

Lines changed: 14 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -6,105 +6,7 @@ use pgwire::api::auth::{AuthSource, LoginInfo, Password};
66
use pgwire::error::{PgWireError, PgWireResult};
77
use tokio::sync::RwLock;
88

9-
/// User information stored in the authentication system
10-
#[derive(Debug, Clone)]
11-
pub struct User {
12-
pub username: String,
13-
pub password_hash: String,
14-
pub roles: Vec<String>,
15-
pub is_superuser: bool,
16-
pub can_login: bool,
17-
pub connection_limit: Option<i32>,
18-
}
19-
20-
/// Permission types for granular access control
21-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
22-
pub enum Permission {
23-
Select,
24-
Insert,
25-
Update,
26-
Delete,
27-
Create,
28-
Drop,
29-
Alter,
30-
Index,
31-
References,
32-
Trigger,
33-
Execute,
34-
Usage,
35-
Connect,
36-
Temporary,
37-
All,
38-
}
39-
40-
impl Permission {
41-
pub fn from_string(s: &str) -> Option<Permission> {
42-
match s.to_uppercase().as_str() {
43-
"SELECT" => Some(Permission::Select),
44-
"INSERT" => Some(Permission::Insert),
45-
"UPDATE" => Some(Permission::Update),
46-
"DELETE" => Some(Permission::Delete),
47-
"CREATE" => Some(Permission::Create),
48-
"DROP" => Some(Permission::Drop),
49-
"ALTER" => Some(Permission::Alter),
50-
"INDEX" => Some(Permission::Index),
51-
"REFERENCES" => Some(Permission::References),
52-
"TRIGGER" => Some(Permission::Trigger),
53-
"EXECUTE" => Some(Permission::Execute),
54-
"USAGE" => Some(Permission::Usage),
55-
"CONNECT" => Some(Permission::Connect),
56-
"TEMPORARY" => Some(Permission::Temporary),
57-
"ALL" => Some(Permission::All),
58-
_ => None,
59-
}
60-
}
61-
}
62-
63-
/// Resource types for access control
64-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
65-
pub enum ResourceType {
66-
Table(String),
67-
Schema(String),
68-
Database(String),
69-
Function(String),
70-
Sequence(String),
71-
All,
72-
}
73-
74-
/// Grant entry for specific permissions on resources
75-
#[derive(Debug, Clone)]
76-
pub struct Grant {
77-
pub permission: Permission,
78-
pub resource: ResourceType,
79-
pub granted_by: String,
80-
pub with_grant_option: bool,
81-
}
82-
83-
/// Role information for access control
84-
#[derive(Debug, Clone)]
85-
pub struct Role {
86-
pub name: String,
87-
pub is_superuser: bool,
88-
pub can_login: bool,
89-
pub can_create_db: bool,
90-
pub can_create_role: bool,
91-
pub can_create_user: bool,
92-
pub can_replication: bool,
93-
pub grants: Vec<Grant>,
94-
pub inherited_roles: Vec<String>,
95-
}
96-
97-
/// Role configuration for creation
98-
#[derive(Debug, Clone)]
99-
pub struct RoleConfig {
100-
pub name: String,
101-
pub is_superuser: bool,
102-
pub can_login: bool,
103-
pub can_create_db: bool,
104-
pub can_create_role: bool,
105-
pub can_create_user: bool,
106-
pub can_replication: bool,
107-
}
9+
use crate::pg_catalog::context::*;
10810

10911
/// Authentication manager that handles users and roles
11012
#[derive(Debug)]
@@ -645,6 +547,19 @@ impl SimpleAuthSource {
645547
}
646548
}
647549

550+
#[async_trait]
551+
impl PgCatalogContextProvider for Arc<AuthManager> {
552+
// retrieve all database role names
553+
async fn roles(&self) -> Vec<String> {
554+
self.list_roles().await
555+
}
556+
557+
// retrieve database role information
558+
async fn role(&self, name: &str) -> Option<Role> {
559+
self.role(name).await
560+
}
561+
}
562+
648563
#[async_trait]
649564
impl AuthSource for SimpleAuthSource {
650565
async fn get_password(&self, login: &LoginInfo) -> PgWireResult<Password> {

datafusion-postgres/src/handlers.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use std::collections::HashMap;
22
use std::sync::Arc;
33

4-
use crate::auth::{AuthManager, Permission, ResourceType};
5-
use crate::sql::PostgresCompatibilityParser;
64
use async_trait::async_trait;
75
use datafusion::arrow::datatypes::{DataType, Field, Schema};
86
use datafusion::common::ToDFSchema;
@@ -26,6 +24,9 @@ use pgwire::error::{PgWireError, PgWireResult};
2624
use pgwire::messages::response::TransactionStatus;
2725
use tokio::sync::Mutex;
2826

27+
use crate::auth::AuthManager;
28+
use crate::pg_catalog::context::{Permission, ResourceType};
29+
use crate::sql::PostgresCompatibilityParser;
2930
use arrow_pg::datatypes::df;
3031
use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type};
3132

datafusion-postgres/src/pg_catalog.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ use datafusion::prelude::{create_udf, Expr, SessionContext};
2020
use postgres_types::Oid;
2121
use tokio::sync::RwLock;
2222

23-
use crate::auth::AuthManager;
2423
use crate::pg_catalog::catalog_info::CatalogInfo;
24+
use crate::pg_catalog::context::PgCatalogContextProvider;
2525
use crate::pg_catalog::empty_table::EmptyTable;
2626

2727
pub mod catalog_info;
28+
pub mod context;
2829
pub mod empty_table;
2930
pub mod format_type;
3031
pub mod has_privilege_udf;
@@ -192,16 +193,16 @@ pub(crate) enum OidCacheKey {
192193

193194
// Create custom schema provider for pg_catalog
194195
#[derive(Debug)]
195-
pub struct PgCatalogSchemaProvider<C> {
196+
pub struct PgCatalogSchemaProvider<C, P> {
196197
catalog_list: C,
197198
oid_counter: Arc<AtomicU32>,
198199
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
199200
static_tables: Arc<PgCatalogStaticTables>,
200-
auth_manager: Arc<AuthManager>,
201+
context_provider: P,
201202
}
202203

203204
#[async_trait]
204-
impl<C: CatalogInfo> SchemaProvider for PgCatalogSchemaProvider<C> {
205+
impl<C: CatalogInfo, P: PgCatalogContextProvider> SchemaProvider for PgCatalogSchemaProvider<C, P> {
205206
fn as_any(&self) -> &dyn std::any::Any {
206207
self
207208
}
@@ -224,18 +225,18 @@ impl<C: CatalogInfo> SchemaProvider for PgCatalogSchemaProvider<C> {
224225
}
225226
}
226227

227-
impl<C: CatalogInfo> PgCatalogSchemaProvider<C> {
228+
impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P> {
228229
pub fn try_new(
229230
catalog_list: C,
230231
static_tables: Arc<PgCatalogStaticTables>,
231-
auth_manager: Arc<AuthManager>,
232-
) -> Result<PgCatalogSchemaProvider<C>> {
232+
context_provider: P,
233+
) -> Result<PgCatalogSchemaProvider<C, P>> {
233234
Ok(Self {
234235
catalog_list,
235236
oid_counter: Arc::new(AtomicU32::new(16384)),
236237
oid_cache: Arc::new(RwLock::new(HashMap::new())),
237238
static_tables,
238-
auth_manager,
239+
context_provider,
239240
})
240241
}
241242

@@ -413,7 +414,7 @@ impl<C: CatalogInfo> PgCatalogSchemaProvider<C> {
413414
Ok(Some(PgCatalogTable::Dynamic(table)))
414415
}
415416
PG_CATALOG_VIEW_PG_ROLES => {
416-
let table = Arc::new(pg_roles::PgRolesTable::new(Arc::clone(&self.auth_manager)));
417+
let table = Arc::new(pg_roles::PgRolesTable::new(self.context_provider.clone()));
417418
Ok(Some(PgCatalogTable::Dynamic(table)))
418419
}
419420

@@ -1278,16 +1279,19 @@ pub fn create_pg_backend_pid_udf() -> ScalarUDF {
12781279
const BACKEND_PID: i32 = 1;
12791280

12801281
/// Install pg_catalog and postgres UDFs to current `SessionContext`
1281-
pub fn setup_pg_catalog(
1282+
pub fn setup_pg_catalog<P>(
12821283
session_context: &SessionContext,
12831284
catalog_name: &str,
1284-
auth_manager: Arc<AuthManager>,
1285-
) -> Result<(), Box<DataFusionError>> {
1285+
context_provider: P,
1286+
) -> Result<(), Box<DataFusionError>>
1287+
where
1288+
P: PgCatalogContextProvider,
1289+
{
12861290
let static_tables = Arc::new(PgCatalogStaticTables::try_new()?);
12871291
let pg_catalog = PgCatalogSchemaProvider::try_new(
12881292
session_context.state().catalog_list().clone(),
12891293
static_tables.clone(),
1290-
auth_manager,
1294+
context_provider,
12911295
)?;
12921296
session_context
12931297
.catalog(catalog_name)
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use std::fmt::Debug;
2+
3+
use async_trait::async_trait;
4+
5+
#[async_trait]
6+
pub trait PgCatalogContextProvider: Clone + Debug + Send + Sync + 'static {
7+
// retrieve all database role names
8+
async fn roles(&self) -> Vec<String> {
9+
vec![]
10+
}
11+
12+
// retrieve database role information
13+
async fn role(&self, _name: &str) -> Option<Role> {
14+
None
15+
}
16+
}
17+
18+
#[derive(Debug, Clone)]
19+
pub struct EmptyContextProvider;
20+
21+
impl PgCatalogContextProvider for EmptyContextProvider {}
22+
23+
/// User information stored in the authentication system
24+
#[derive(Debug, Clone)]
25+
pub struct User {
26+
pub username: String,
27+
pub password_hash: String,
28+
pub roles: Vec<String>,
29+
pub is_superuser: bool,
30+
pub can_login: bool,
31+
pub connection_limit: Option<i32>,
32+
}
33+
34+
/// Permission types for granular access control
35+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36+
pub enum Permission {
37+
Select,
38+
Insert,
39+
Update,
40+
Delete,
41+
Create,
42+
Drop,
43+
Alter,
44+
Index,
45+
References,
46+
Trigger,
47+
Execute,
48+
Usage,
49+
Connect,
50+
Temporary,
51+
All,
52+
}
53+
54+
impl Permission {
55+
pub fn from_string(s: &str) -> Option<Permission> {
56+
match s.to_uppercase().as_str() {
57+
"SELECT" => Some(Permission::Select),
58+
"INSERT" => Some(Permission::Insert),
59+
"UPDATE" => Some(Permission::Update),
60+
"DELETE" => Some(Permission::Delete),
61+
"CREATE" => Some(Permission::Create),
62+
"DROP" => Some(Permission::Drop),
63+
"ALTER" => Some(Permission::Alter),
64+
"INDEX" => Some(Permission::Index),
65+
"REFERENCES" => Some(Permission::References),
66+
"TRIGGER" => Some(Permission::Trigger),
67+
"EXECUTE" => Some(Permission::Execute),
68+
"USAGE" => Some(Permission::Usage),
69+
"CONNECT" => Some(Permission::Connect),
70+
"TEMPORARY" => Some(Permission::Temporary),
71+
"ALL" => Some(Permission::All),
72+
_ => None,
73+
}
74+
}
75+
}
76+
77+
/// Resource types for access control
78+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
79+
pub enum ResourceType {
80+
Table(String),
81+
Schema(String),
82+
Database(String),
83+
Function(String),
84+
Sequence(String),
85+
All,
86+
}
87+
88+
/// Grant entry for specific permissions on resources
89+
#[derive(Debug, Clone)]
90+
pub struct Grant {
91+
pub permission: Permission,
92+
pub resource: ResourceType,
93+
pub granted_by: String,
94+
pub with_grant_option: bool,
95+
}
96+
97+
/// Role information for access control
98+
#[derive(Debug, Clone)]
99+
pub struct Role {
100+
pub name: String,
101+
pub is_superuser: bool,
102+
pub can_login: bool,
103+
pub can_create_db: bool,
104+
pub can_create_role: bool,
105+
pub can_create_user: bool,
106+
pub can_replication: bool,
107+
pub grants: Vec<Grant>,
108+
pub inherited_roles: Vec<String>,
109+
}
110+
111+
/// Role configuration for creation
112+
#[derive(Debug, Clone)]
113+
pub struct RoleConfig {
114+
pub name: String,
115+
pub is_superuser: bool,
116+
pub can_login: bool,
117+
pub can_create_db: bool,
118+
pub can_create_role: bool,
119+
pub can_create_user: bool,
120+
pub can_replication: bool,
121+
}

0 commit comments

Comments
 (0)