Skip to content

Commit bf02430

Browse files
committed
iter for commited state
1 parent a00fd1f commit bf02430

File tree

1 file changed

+80
-70
lines changed
  • crates/core/src/db/datastore/locking_tx_datastore

1 file changed

+80
-70
lines changed

crates/core/src/db/datastore/locking_tx_datastore/mod.rs

Lines changed: 80 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -439,81 +439,78 @@ impl CommittedState {
439439
Ok(())
440440
}
441441

442+
fn iter_by_col<'a>(
443+
&'a self,
444+
table_id: &'a TableId,
445+
table_id_col: &'a NonEmpty<ColId>,
446+
value: &'a AlgebraicValue,
447+
) -> Result<CommittedStateIter<'a>, TableError> {
448+
let table = self
449+
.tables
450+
.get(table_id)
451+
.ok_or(TableError::IdNotFoundState(*table_id))?;
452+
453+
Ok(CommittedStateIter {
454+
iter: table.rows.iter(),
455+
table_id_col,
456+
value,
457+
})
458+
}
459+
442460
#[tracing::instrument(skip_all)]
443461
fn schema_for_table(&self, table_id: TableId, database_address: Address) -> super::Result<Cow<'_, TableSchema>> {
444462
if let Some(schema) = self.get_schema(&table_id) {
445463
return Ok(Cow::Borrowed(schema));
446464
}
447465

448-
let ctx = ExecutionContext::internal(database_address);
466+
let _ctx = ExecutionContext::internal(database_address);
449467

450468
// Look up the table_name for the table in question.
451469
let table_id_col = NonEmpty::new(StTableFields::TableId.col_id());
452-
453-
// TODO(george): As part of the bootstrapping process, we add a bunch of rows
454-
// and only at very end do we patch things up and create table metadata, indexes,
455-
// and so on. Early parts of that process insert rows, and need the schema to do
456-
// so. We can't just call `iter_by_col_range` here as that would attempt to use the
457-
// index which we haven't created yet. So instead we just manually Scan here.
458-
459470
let value: AlgebraicValue = table_id.into();
460-
let rows = self.tables.get(&ST_TABLES_ID).unwrap().rows.iter().filter(|(_, row)| {
461-
let table_id = row.project_not_empty(&table_id_col).unwrap();
462-
table_id == value
463-
}).collect::<Vec<_>>();
464-
assert!(rows.len() <= 1, "Expected at most one row in st_tables for table_id");
465-
471+
let rows = self
472+
.iter_by_col(&ST_TABLES_ID, &table_id_col, &value)?
473+
.collect::<Vec<_>>();
466474
let row = rows
467475
.first()
468476
.ok_or_else(|| TableError::IdNotFound(ST_TABLES_ID, table_id.0))?;
469477

470-
let el = StTableRow::try_from(row.1)?;
478+
let el = StTableRow::try_from(row.view())?;
471479
let table_name = el.table_name.to_owned();
472-
let table_id: AlgebraicValue = el.table_id.into();
480+
let table_id_value: AlgebraicValue = el.table_id.into();
473481

474482
// Look up the columns for the table in question.
475-
let mut columns = Vec::new();
476-
//for data_ref in self.iter_by_col_eq(&ctx, &ST_COLUMNS_ID, StTableFields::TableId, table_id.into())? {
477-
let columns_iter = self.tables.get(&ST_COLUMNS_ID).unwrap().rows.iter().filter(|(_, row)| {
478-
let table_id = row.project_not_empty(&table_id_col).unwrap();
479-
table_id == value
480-
});
481-
for (row_id, _) in columns_iter {
482-
let data_ref = get_committed_row(self, &ST_COLUMNS_ID, row_id);
483-
let row = data_ref.view();
484-
let el = StColumnRow::try_from(row)?;
485-
let col_schema = ColumnSchema {
486-
table_id: el.table_id,
487-
col_id: el.col_id,
488-
col_name: el.col_name.into(),
489-
col_type: el.col_type,
490-
is_autoinc: el.is_autoinc,
491-
};
492-
columns.push(col_schema);
493-
}
483+
let mut columns = self
484+
.iter_by_col(&ST_COLUMNS_ID, &NonEmpty::new(StColumnFields::TableId.col_id()), &value)?
485+
.map(|row| {
486+
let el = StColumnRow::try_from(row.view())?;
487+
Ok(ColumnSchema {
488+
table_id: el.table_id,
489+
col_id: el.col_id,
490+
col_name: el.col_name.into(),
491+
col_type: el.col_type,
492+
is_autoinc: el.is_autoinc,
493+
})
494+
})
495+
.collect::<super::Result<Vec<_>>>()?;
494496

495497
columns.sort_by_key(|col| col.col_id);
496-
let mut indexes = Vec::new();
497498

498499
// Look up the indexes for the table in question.
499-
if let Some(indexes_iter) = self
500-
.index_seek(&ST_INDEXES_ID, &StIndexFields::TableId.into(), &table_id) {
501-
for row_id in indexes_iter {
502-
let data_ref = get_committed_row(self, &ST_COLUMNS_ID, row_id);
503-
let row = data_ref.view();
504-
505-
let el = StIndexRow::try_from(row)?;
506-
let index_schema = IndexSchema {
507-
table_id: el.table_id,
508-
cols: el.cols,
509-
index_name: el.index_name.into(),
510-
is_unique: el.is_unique,
511-
index_id: el.index_id,
512-
index_type: el.index_type,
513-
};
514-
indexes.push(index_schema);
515-
}
516-
}
500+
let indexes = self
501+
.iter_by_col(&ST_INDEXES_ID, &StIndexFields::TableId.into(), &table_id_value)?
502+
.map(|row| {
503+
let el = StIndexRow::try_from(row.view())?;
504+
Ok(IndexSchema {
505+
table_id: el.table_id,
506+
cols: el.cols,
507+
index_name: el.index_name.into(),
508+
is_unique: el.is_unique,
509+
index_id: el.index_id,
510+
index_type: el.index_type,
511+
})
512+
})
513+
.collect::<super::Result<Vec<_>>>()?;
517514

518515
Ok(Cow::Owned(TableSchema {
519516
columns,
@@ -531,10 +528,7 @@ impl CommittedState {
531528
}
532529

533530
fn get_row_type(&self, table_id: &TableId) -> Option<&ProductType> {
534-
self
535-
.tables
536-
.get(table_id)
537-
.map(|table| table.get_row_type())
531+
self.tables.get(table_id).map(|table| table.get_row_type())
538532
}
539533

540534
fn row_type_for_table(&self, table_id: TableId, database_address: Address) -> super::Result<Cow<'_, ProductType>> {
@@ -565,7 +559,6 @@ impl CommittedState {
565559
Ok(Cow::Owned(ProductType { elements }))
566560
}
567561

568-
569562
/// After replaying all old transactions, tables which have rows will
570563
/// have been created in memory, but tables with no rows will not have
571564
/// been created. This function ensures that they are created.
@@ -585,6 +578,25 @@ impl CommittedState {
585578
}
586579
}
587580

581+
struct CommittedStateIter<'a> {
582+
iter: indexmap::map::Iter<'a, RowId, ProductValue>,
583+
table_id_col: &'a NonEmpty<ColId>,
584+
value: &'a AlgebraicValue,
585+
}
586+
587+
impl<'a> Iterator for CommittedStateIter<'a> {
588+
type Item = DataRef<'a>;
589+
590+
fn next(&mut self) -> Option<Self::Item> {
591+
while let Some((row_id, row)) = self.iter.next() {
592+
let table_id = row.project_not_empty(self.table_id_col).unwrap();
593+
if table_id == *self.value {
594+
return Some(DataRef::new(row_id, row));
595+
}
596+
}
597+
None
598+
}
599+
}
588600
/// `TxState` tracks all of the modifications made during a particular transaction.
589601
/// Rows inserted during a transaction will be added to insert_tables, and similarly,
590602
/// rows deleted in the transaction will be added to delete_tables.
@@ -1437,11 +1449,12 @@ impl MutTxId {
14371449
row.encode(&mut bytes);
14381450
let data_key = DataKey::from_data(&bytes);
14391451
let row_id = RowId(data_key);
1452+
let tx_state = self.tx_state_lock.as_mut().unwrap();
14401453

14411454
// If the table does exist in the tx state, we need to create it based on the table in the
14421455
// committed state. If the table does not exist in the committed state, it doesn't exist
14431456
// in the database.
1444-
let insert_table = if let Some(table) = self.tx_state_lock.as_ref().unwrap().get_insert_table(&table_id) {
1457+
let insert_table = if let Some(table) = tx_state.get_insert_table(&table_id) {
14451458
table
14461459
} else {
14471460
let Some(committed_table) = self.committed_state_write_lock.as_ref().unwrap().tables.get(&table_id) else {
@@ -1468,14 +1481,10 @@ impl MutTxId {
14681481
.collect(),
14691482
rows: Default::default(),
14701483
};
1471-
self.tx_state_lock
1472-
.as_mut()
1473-
.unwrap()
1484+
tx_state
14741485
.insert_tables
14751486
.insert(table_id, table);
1476-
self.tx_state_lock
1477-
.as_ref()
1478-
.unwrap()
1487+
tx_state
14791488
.get_insert_table(&table_id)
14801489
.unwrap()
14811490
};
@@ -1500,7 +1509,7 @@ impl MutTxId {
15001509
continue;
15011510
};
15021511
for row_id in violators {
1503-
if let Some(delete_table) = self.tx_state_lock.as_ref().unwrap().delete_tables.get(&table_id) {
1512+
if let Some(delete_table) = tx_state.delete_tables.get(&table_id) {
15041513
if !delete_table.contains(row_id) {
15051514
let value = row.project_not_empty(&index.cols).unwrap();
15061515
return Err(index.build_error_unique(table, value).into());
@@ -1515,7 +1524,6 @@ impl MutTxId {
15151524

15161525
// Now that we have checked all the constraints, we can perform the actual insertion.
15171526
{
1518-
let tx_state = self.tx_state_lock.as_mut().unwrap();
15191527

15201528
// We have a few cases to consider, based on the history of this transaction, and
15211529
// whether the row was already present or not at the start of this transaction.
@@ -1606,7 +1614,9 @@ impl MutTxId {
16061614
{
16071615
return Some(schema);
16081616
}
1609-
self.committed_state_write_lock.as_ref().unwrap()
1617+
self.committed_state_write_lock
1618+
.as_ref()
1619+
.unwrap()
16101620
.tables
16111621
.get(table_id)
16121622
.map(|table| table.get_schema())
@@ -1836,7 +1846,7 @@ impl Locking {
18361846
Ok(())
18371847
}
18381848

1839-
pub fn replay_transaction(
1849+
pub fn replay_transaction(
18401850
&self,
18411851
transaction: &Transaction,
18421852
odb: Arc<std::sync::Mutex<Box<dyn ObjectDB + Send>>>,

0 commit comments

Comments
 (0)