Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion datafusion-postgres/src/pg_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod pg_class;
pub mod pg_database;
pub mod pg_get_expr_udf;
pub mod pg_namespace;
pub mod pg_replication_slot;
pub mod pg_settings;
pub mod pg_tables;
pub mod pg_views;
Expand Down Expand Up @@ -100,6 +101,7 @@ const PG_CATALOG_VIEW_PG_VIEWS: &str = "pg_views";
const PG_CATALOG_VIEW_PG_MATVIEWS: &str = "pg_matviews";
const PG_CATALOG_VIEW_PG_TABLES: &str = "pg_tables";
const PG_CATALOG_VIEW_PG_STAT_USER_TABELS: &str = "pg_stat_user_tables";
const PG_CATALOG_VIEW_PG_REPLICATION_SLOTS: &str = "pg_replication_slots";

pub const PG_CATALOG_TABLES: &[&str] = &[
PG_CATALOG_TABLE_PG_AGGREGATE,
Expand Down Expand Up @@ -167,6 +169,7 @@ pub const PG_CATALOG_TABLES: &[&str] = &[
PG_CATALOG_VIEW_PG_VIEWS,
PG_CATALOG_VIEW_PG_MATVIEWS,
PG_CATALOG_VIEW_PG_STAT_USER_TABELS,
PG_CATALOG_VIEW_PG_REPLICATION_SLOTS,
];

#[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
Expand Down Expand Up @@ -352,7 +355,10 @@ impl<C: CatalogInfo> SchemaProvider for PgCatalogSchemaProvider<C> {
PG_CATALOG_VIEW_PG_STAT_USER_TABELS => {
Ok(Some(Arc::new(pg_views::pg_stat_user_tables()?)))
}

PG_CATALOG_VIEW_PG_REPLICATION_SLOTS => {
let table = pg_replication_slot::pg_replication_slots()?;
Ok(Some(table))
}
_ => Ok(None),
}
}
Expand Down
30 changes: 30 additions & 0 deletions datafusion-postgres/src/pg_catalog/pg_replication_slot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::pg_catalog::empty_table::EmptyTable;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::TableProvider;
use datafusion::error::Result;
use std::sync::Arc;

pub(crate) fn pg_replication_slots() -> Result<Arc<dyn TableProvider>> {
let schema = Arc::new(Schema::new(vec![
Field::new("slot_name", DataType::Utf8, true),
Field::new("plugin", DataType::Utf8, true),
Field::new("slot_type", DataType::Utf8, true),
Field::new("datoid", DataType::Int32, true),
Field::new("database", DataType::Utf8, true),
Field::new("temporary", DataType::Boolean, false),
Field::new("active", DataType::Boolean, false),
Field::new("active_pid", DataType::Int32, true),
Field::new("xmin", DataType::Int32, true),
Field::new("catalog_xmin", DataType::Int32, true),
Field::new("restart_lsn", DataType::Utf8, true), // TODO: is this the correct type to use?
Field::new("confirmed_flush_lsn", DataType::Utf8, true), // TODO: is this the correct type to use?
Field::new("wal_status", DataType::Utf8, true),
Field::new("safe_wal_size", DataType::Int64, true),
Field::new("two_phase", DataType::Boolean, false),
Field::new("conflicting", DataType::Boolean, false),
]));

let table = EmptyTable::new(schema).try_into_memtable()?;

Ok(Arc::new(table))
}
Loading