Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 14 additions & 99 deletions datafusion-postgres/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,105 +6,7 @@ use pgwire::api::auth::{AuthSource, LoginInfo, Password};
use pgwire::error::{PgWireError, PgWireResult};
use tokio::sync::RwLock;

/// User information stored in the authentication system
#[derive(Debug, Clone)]
pub struct User {
pub username: String,
pub password_hash: String,
pub roles: Vec<String>,
pub is_superuser: bool,
pub can_login: bool,
pub connection_limit: Option<i32>,
}

/// Permission types for granular access control
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Permission {
Select,
Insert,
Update,
Delete,
Create,
Drop,
Alter,
Index,
References,
Trigger,
Execute,
Usage,
Connect,
Temporary,
All,
}

impl Permission {
pub fn from_string(s: &str) -> Option<Permission> {
match s.to_uppercase().as_str() {
"SELECT" => Some(Permission::Select),
"INSERT" => Some(Permission::Insert),
"UPDATE" => Some(Permission::Update),
"DELETE" => Some(Permission::Delete),
"CREATE" => Some(Permission::Create),
"DROP" => Some(Permission::Drop),
"ALTER" => Some(Permission::Alter),
"INDEX" => Some(Permission::Index),
"REFERENCES" => Some(Permission::References),
"TRIGGER" => Some(Permission::Trigger),
"EXECUTE" => Some(Permission::Execute),
"USAGE" => Some(Permission::Usage),
"CONNECT" => Some(Permission::Connect),
"TEMPORARY" => Some(Permission::Temporary),
"ALL" => Some(Permission::All),
_ => None,
}
}
}

/// Resource types for access control
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ResourceType {
Table(String),
Schema(String),
Database(String),
Function(String),
Sequence(String),
All,
}

/// Grant entry for specific permissions on resources
#[derive(Debug, Clone)]
pub struct Grant {
pub permission: Permission,
pub resource: ResourceType,
pub granted_by: String,
pub with_grant_option: bool,
}

/// Role information for access control
#[derive(Debug, Clone)]
pub struct Role {
pub name: String,
pub is_superuser: bool,
pub can_login: bool,
pub can_create_db: bool,
pub can_create_role: bool,
pub can_create_user: bool,
pub can_replication: bool,
pub grants: Vec<Grant>,
pub inherited_roles: Vec<String>,
}

/// Role configuration for creation
#[derive(Debug, Clone)]
pub struct RoleConfig {
pub name: String,
pub is_superuser: bool,
pub can_login: bool,
pub can_create_db: bool,
pub can_create_role: bool,
pub can_create_user: bool,
pub can_replication: bool,
}
use crate::pg_catalog::context::*;

/// Authentication manager that handles users and roles
#[derive(Debug)]
Expand Down Expand Up @@ -645,6 +547,19 @@ impl SimpleAuthSource {
}
}

#[async_trait]
impl PgCatalogContextProvider for Arc<AuthManager> {
// retrieve all database role names
async fn roles(&self) -> Vec<String> {
self.list_roles().await
}

// retrieve database role information
async fn role(&self, name: &str) -> Option<Role> {
self.role(name).await
}
}

#[async_trait]
impl AuthSource for SimpleAuthSource {
async fn get_password(&self, login: &LoginInfo) -> PgWireResult<Password> {
Expand Down
5 changes: 3 additions & 2 deletions datafusion-postgres/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::auth::{AuthManager, Permission, ResourceType};
use crate::sql::PostgresCompatibilityParser;
use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
Expand All @@ -26,6 +24,9 @@ use pgwire::error::{PgWireError, PgWireResult};
use pgwire::messages::response::TransactionStatus;
use tokio::sync::Mutex;

use crate::auth::AuthManager;
use crate::pg_catalog::context::{Permission, ResourceType};
use crate::sql::PostgresCompatibilityParser;
use arrow_pg::datatypes::df;
use arrow_pg::datatypes::{arrow_schema_to_pg_fields, into_pg_type};

Expand Down
30 changes: 17 additions & 13 deletions datafusion-postgres/src/pg_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use datafusion::prelude::{create_udf, Expr, SessionContext};
use postgres_types::Oid;
use tokio::sync::RwLock;

use crate::auth::AuthManager;
use crate::pg_catalog::catalog_info::CatalogInfo;
use crate::pg_catalog::context::PgCatalogContextProvider;
use crate::pg_catalog::empty_table::EmptyTable;

pub mod catalog_info;
pub mod context;
pub mod empty_table;
pub mod format_type;
pub mod has_privilege_udf;
Expand Down Expand Up @@ -192,16 +193,16 @@ pub(crate) enum OidCacheKey {

// Create custom schema provider for pg_catalog
#[derive(Debug)]
pub struct PgCatalogSchemaProvider<C> {
pub struct PgCatalogSchemaProvider<C, P> {
catalog_list: C,
oid_counter: Arc<AtomicU32>,
oid_cache: Arc<RwLock<HashMap<OidCacheKey, Oid>>>,
static_tables: Arc<PgCatalogStaticTables>,
auth_manager: Arc<AuthManager>,
context_provider: P,
}

#[async_trait]
impl<C: CatalogInfo> SchemaProvider for PgCatalogSchemaProvider<C> {
impl<C: CatalogInfo, P: PgCatalogContextProvider> SchemaProvider for PgCatalogSchemaProvider<C, P> {
fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand All @@ -224,18 +225,18 @@ impl<C: CatalogInfo> SchemaProvider for PgCatalogSchemaProvider<C> {
}
}

impl<C: CatalogInfo> PgCatalogSchemaProvider<C> {
impl<C: CatalogInfo, P: PgCatalogContextProvider> PgCatalogSchemaProvider<C, P> {
pub fn try_new(
catalog_list: C,
static_tables: Arc<PgCatalogStaticTables>,
auth_manager: Arc<AuthManager>,
) -> Result<PgCatalogSchemaProvider<C>> {
context_provider: P,
) -> Result<PgCatalogSchemaProvider<C, P>> {
Ok(Self {
catalog_list,
oid_counter: Arc::new(AtomicU32::new(16384)),
oid_cache: Arc::new(RwLock::new(HashMap::new())),
static_tables,
auth_manager,
context_provider,
})
}

Expand Down Expand Up @@ -413,7 +414,7 @@ impl<C: CatalogInfo> PgCatalogSchemaProvider<C> {
Ok(Some(PgCatalogTable::Dynamic(table)))
}
PG_CATALOG_VIEW_PG_ROLES => {
let table = Arc::new(pg_roles::PgRolesTable::new(Arc::clone(&self.auth_manager)));
let table = Arc::new(pg_roles::PgRolesTable::new(self.context_provider.clone()));
Ok(Some(PgCatalogTable::Dynamic(table)))
}

Expand Down Expand Up @@ -1278,16 +1279,19 @@ pub fn create_pg_backend_pid_udf() -> ScalarUDF {
const BACKEND_PID: i32 = 1;

/// Install pg_catalog and postgres UDFs to current `SessionContext`
pub fn setup_pg_catalog(
pub fn setup_pg_catalog<P>(
session_context: &SessionContext,
catalog_name: &str,
auth_manager: Arc<AuthManager>,
) -> Result<(), Box<DataFusionError>> {
context_provider: P,
) -> Result<(), Box<DataFusionError>>
where
P: PgCatalogContextProvider,
{
let static_tables = Arc::new(PgCatalogStaticTables::try_new()?);
let pg_catalog = PgCatalogSchemaProvider::try_new(
session_context.state().catalog_list().clone(),
static_tables.clone(),
auth_manager,
context_provider,
)?;
session_context
.catalog(catalog_name)
Expand Down
121 changes: 121 additions & 0 deletions datafusion-postgres/src/pg_catalog/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::fmt::Debug;

use async_trait::async_trait;

#[async_trait]
pub trait PgCatalogContextProvider: Clone + Debug + Send + Sync + 'static {
// retrieve all database role names
async fn roles(&self) -> Vec<String> {
vec![]
}

// retrieve database role information
async fn role(&self, _name: &str) -> Option<Role> {
None
}
}

#[derive(Debug, Clone)]
pub struct EmptyContextProvider;

impl PgCatalogContextProvider for EmptyContextProvider {}

/// User information stored in the authentication system
#[derive(Debug, Clone)]
pub struct User {
pub username: String,
pub password_hash: String,
pub roles: Vec<String>,
pub is_superuser: bool,
pub can_login: bool,
pub connection_limit: Option<i32>,
}

/// Permission types for granular access control
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Permission {
Select,
Insert,
Update,
Delete,
Create,
Drop,
Alter,
Index,
References,
Trigger,
Execute,
Usage,
Connect,
Temporary,
All,
}

impl Permission {
pub fn from_string(s: &str) -> Option<Permission> {
match s.to_uppercase().as_str() {
"SELECT" => Some(Permission::Select),
"INSERT" => Some(Permission::Insert),
"UPDATE" => Some(Permission::Update),
"DELETE" => Some(Permission::Delete),
"CREATE" => Some(Permission::Create),
"DROP" => Some(Permission::Drop),
"ALTER" => Some(Permission::Alter),
"INDEX" => Some(Permission::Index),
"REFERENCES" => Some(Permission::References),
"TRIGGER" => Some(Permission::Trigger),
"EXECUTE" => Some(Permission::Execute),
"USAGE" => Some(Permission::Usage),
"CONNECT" => Some(Permission::Connect),
"TEMPORARY" => Some(Permission::Temporary),
"ALL" => Some(Permission::All),
_ => None,
}
}
}

/// Resource types for access control
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ResourceType {
Table(String),
Schema(String),
Database(String),
Function(String),
Sequence(String),
All,
}

/// Grant entry for specific permissions on resources
#[derive(Debug, Clone)]
pub struct Grant {
pub permission: Permission,
pub resource: ResourceType,
pub granted_by: String,
pub with_grant_option: bool,
}

/// Role information for access control
#[derive(Debug, Clone)]
pub struct Role {
pub name: String,
pub is_superuser: bool,
pub can_login: bool,
pub can_create_db: bool,
pub can_create_role: bool,
pub can_create_user: bool,
pub can_replication: bool,
pub grants: Vec<Grant>,
pub inherited_roles: Vec<String>,
}

/// Role configuration for creation
#[derive(Debug, Clone)]
pub struct RoleConfig {
pub name: String,
pub is_superuser: bool,
pub can_login: bool,
pub can_create_db: bool,
pub can_create_role: bool,
pub can_create_user: bool,
pub can_replication: bool,
}
Loading
Loading