Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions crates/bindings-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ mod sym {
/// The macro takes this `input`, which defines what the attribute does,
/// and it is structured roughly like so:
/// ```ignore
/// input = table [ ( private | public ) ] | init | connect | disconnect | migrate
/// input = table [ ( private | public ) ] | init | connect | disconnect
/// | reducer
/// | index(btree | hash [, name = string] [, field_name:ident]*)
/// ```
Expand Down Expand Up @@ -112,9 +112,7 @@ fn route_input(input: MacroInput, item: TokenStream) -> syn::Result<TokenStream>
MacroInput::Reducer(None) => spacetimedb_reducer(item),
MacroInput::Connect => spacetimedb_special_reducer("__identity_connected__", item),
MacroInput::Disconnect => spacetimedb_special_reducer("__identity_disconnected__", item),
MacroInput::Migrate => spacetimedb_special_reducer("__migrate__", item),
MacroInput::Index { ty, name, field_names } => spacetimedb_index(ty, name, field_names, item),
MacroInput::Update => spacetimedb_special_reducer("__update__", item),
}
}

Expand All @@ -137,13 +135,11 @@ enum MacroInput {
Reducer(Option<Span>),
Connect,
Disconnect,
Migrate,
Index {
ty: IndexType,
name: Option<String>,
field_names: Vec<Ident>,
},
Update,
}

/// Parse `f()` delimited by `,` until `input` is empty.
Expand Down Expand Up @@ -234,7 +230,6 @@ impl syn::parse::Parse for MacroInput {
}
kw::connect => Self::Connect,
kw::disconnect => Self::Disconnect,
kw::migrate => Self::Migrate,
kw::index => {
// Extract stuff in parens.
let in_parens;
Expand All @@ -261,7 +256,6 @@ impl syn::parse::Parse for MacroInput {
})?;
Self::Index { ty, name, field_names }
}
kw::update => Self::Update,
}))
}
}
Expand Down Expand Up @@ -293,15 +287,13 @@ mod kw {
syn::custom_keyword!(reducer);
syn::custom_keyword!(connect);
syn::custom_keyword!(disconnect);
syn::custom_keyword!(migrate);
syn::custom_keyword!(index);
syn::custom_keyword!(btree);
syn::custom_keyword!(hash);
syn::custom_keyword!(name);
syn::custom_keyword!(private);
syn::custom_keyword!(public);
syn::custom_keyword!(repeat);
syn::custom_keyword!(update);
syn::custom_keyword!(scheduled);
}

Expand Down
21 changes: 2 additions & 19 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use spacetimedb::host::EntityDef;
use spacetimedb::host::ReducerArgs;
use spacetimedb::host::ReducerCallError;
use spacetimedb::host::ReducerOutcome;
use spacetimedb::host::UpdateDatabaseSuccess;
use spacetimedb::identity::Identity;
use spacetimedb::json::client_api::StmtResultJson;
use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType};
Expand Down Expand Up @@ -713,24 +712,8 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
.await
.map_err(log_and_500)?;

if let Some(updated) = maybe_updated {
match updated {
Ok(success) => {
if let UpdateDatabaseSuccess {
// An update reducer was defined, and it was run
update_result: Some(update_result),
// Not yet implemented
migrate_results: _,
} = success
{
let ror = reducer_outcome_response(&auth.identity, "update", update_result.outcome);
if !matches!(ror, (StatusCode::OK, _)) {
return Err(ror.into());
}
}
}
Err(e) => return Err((StatusCode::BAD_REQUEST, format!("Database update rejected: {e}")).into()),
}
if let Some(Err(e)) = maybe_updated {
return Err((StatusCode::BAD_REQUEST, format!("Database update rejected: {e}")).into());
}

Ok(axum::Json(PublishResult::Success {
Expand Down
93 changes: 44 additions & 49 deletions crates/core/src/db/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::datastore::locking_tx_datastore::MutTxId;
use super::relational_db::RelationalDB;
use crate::database_logger::SystemLogger;
use crate::error::{DBError, TableError};
use crate::execution_context::ExecutionContext;
use anyhow::Context;
use core::{fmt, mem};
use enum_as_inner::EnumAsInner;
Expand All @@ -27,66 +26,62 @@ pub enum UpdateDatabaseError {

pub fn update_database(
stdb: &RelationalDB,
tx: MutTxId,
tx: &mut MutTxId,
proposed_tables: Vec<RawTableDefV8>,
system_logger: &SystemLogger,
) -> anyhow::Result<Result<MutTxId, UpdateDatabaseError>> {
let ctx = ExecutionContext::internal(stdb.address());
let (tx, res) = stdb.with_auto_rollback::<_, _, anyhow::Error>(&ctx, tx, |tx| {
let existing_tables = stdb.get_all_tables_mut(tx)?;
match schema_updates(existing_tables, proposed_tables)? {
SchemaUpdates::Updated(updated) => {
for (name, schema) in updated.new_tables {
system_logger.info(&format!("Creating table `{}`", name));
stdb.create_table(tx, schema)
.with_context(|| format!("failed to create table {}", name))?;
}
) -> anyhow::Result<Result<(), UpdateDatabaseError>> {
let existing_tables = stdb.get_all_tables_mut(tx)?;
match schema_updates(existing_tables, proposed_tables)? {
SchemaUpdates::Updated(updated) => {
for (name, schema) in updated.new_tables {
system_logger.info(&format!("Creating table `{}`", name));
stdb.create_table(tx, schema)
.with_context(|| format!("failed to create table {}", name))?;
}

for (name, access) in updated.changed_access {
stdb.alter_table_access(tx, name, access)?;
}
for (name, access) in updated.changed_access {
stdb.alter_table_access(tx, name, access)?;
}

for (name, added) in updated.added_indexes {
let table_id = stdb
.table_id_from_name_mut(tx, &name)?
.ok_or_else(|| TableError::NotFound(name.into()))?;
for index in added {
stdb.create_index(tx, table_id, index)?;
}
for (name, added) in updated.added_indexes {
let table_id = stdb
.table_id_from_name_mut(tx, &name)?
.ok_or_else(|| TableError::NotFound(name.into()))?;
for index in added {
stdb.create_index(tx, table_id, index)?;
}
}

for index_id in updated.removed_indexes {
stdb.drop_index(tx, index_id)?;
}
for index_id in updated.removed_indexes {
stdb.drop_index(tx, index_id)?;
}
}

SchemaUpdates::Tainted(tainted) => {
system_logger.error("Module update rejected due to schema mismatch");
let mut tables = Vec::with_capacity(tainted.len());
for t in tainted {
system_logger.warn(&format!("{}: {}", t.table_name, t.reason));
if let TaintReason::IncompatibleSchema { existing, proposed } = t.reason {
let existing = format!("{existing:#?}");
let proposed = format!("{proposed:#?}");
let diff = TextDiff::configure()
.timeout(Duration::from_millis(200))
.algorithm(Algorithm::Patience)
.diff_lines(&existing, &proposed);
system_logger.warn(&format!(
"{}: Diff existing vs. proposed:\n{}",
t.table_name,
diff.unified_diff()
));
}
tables.push(t.table_name);
SchemaUpdates::Tainted(tainted) => {
system_logger.error("Module update rejected due to schema mismatch");
let mut tables = Vec::with_capacity(tainted.len());
for t in tainted {
system_logger.warn(&format!("{}: {}", t.table_name, t.reason));
if let TaintReason::IncompatibleSchema { existing, proposed } = t.reason {
let existing = format!("{existing:#?}");
let proposed = format!("{proposed:#?}");
let diff = TextDiff::configure()
.timeout(Duration::from_millis(200))
.algorithm(Algorithm::Patience)
.diff_lines(&existing, &proposed);
system_logger.warn(&format!(
"{}: Diff existing vs. proposed:\n{}",
t.table_name,
diff.unified_diff()
));
}
return Ok(Err(UpdateDatabaseError::IncompatibleSchema { tables }));
tables.push(t.table_name);
}
return Ok(Err(UpdateDatabaseError::IncompatibleSchema { tables }));
}
}

Ok(Ok(()))
})?;
Ok(stdb.rollback_on_err(&ctx, tx, res).map(|(tx, ())| tx))
Ok(Ok(()))
}

/// The reasons a table can become [`Tainted`].
Expand Down
29 changes: 12 additions & 17 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use parking_lot::Mutex;
use serde::Serialize;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_durability as durability;
use spacetimedb_lib::{hash_bytes, Address};
use spacetimedb_lib::hash_bytes;
use spacetimedb_sats::hash::Hash;
use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -293,7 +293,6 @@ impl HostController {
pub async fn update_module_host(
&self,
database: Database,
caller_address: Option<Address>,
host_type: HostType,
instance_id: u64,
program_bytes: Box<[u8]>,
Expand Down Expand Up @@ -322,7 +321,7 @@ impl HostController {
}
};
let update_result = host
.update_module(caller_address, host_type, program, self.unregister_fn(instance_id))
.update_module(host_type, program, self.unregister_fn(instance_id))
.await?;

*guard = Some(host);
Expand Down Expand Up @@ -352,7 +351,6 @@ impl HostController {
&self,
database: Database,
instance_id: u64,
caller_address: Option<Address>,
expected_hash: Option<Hash>,
) -> anyhow::Result<watch::Receiver<ModuleHost>> {
trace!("custom bootstrap {}/{}", database.address, instance_id);
Expand Down Expand Up @@ -395,7 +393,6 @@ impl HostController {
let program_bytes = load_program(&self.program_storage, program_hash).await?;
let update_result = host
.update_module(
caller_address,
host_type,
Program {
hash: program_hash,
Expand Down Expand Up @@ -593,22 +590,21 @@ async fn launch_module(
async fn update_module(
db: &RelationalDB,
module: &ModuleHost,
caller_address: Option<Address>,
program: Program,
) -> anyhow::Result<UpdateDatabaseResult> {
let addr = db.address();
match stored_program_hash(db)? {
None => Err(anyhow!("database `{}` not yet initialized", addr)),
Some(stored) if stored == program.hash => {
info!("database `{}` up to date with program `{}`", addr, program.hash);
anyhow::Ok(Ok(<_>::default()))
}
Some(stored) => {
info!("updating `{}` from {} to {}", addr, stored, program.hash);
let update_result = module
.update_database(caller_address, program.hash, program.bytes)
.await?;
Ok(update_result)
let res = if stored == program.hash {
info!("database `{}` up to date with program `{}`", addr, program.hash);
Ok(())
} else {
info!("updating `{}` from {} to {}", addr, stored, program.hash);
module.update_database(program.hash, program.bytes).await?
};

Ok(res)
Comment on lines +599 to +607
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let res = if stored == program.hash {
info!("database `{}` up to date with program `{}`", addr, program.hash);
Ok(())
} else {
info!("updating `{}` from {} to {}", addr, stored, program.hash);
module.update_database(program.hash, program.bytes).await?
};
Ok(res)
Ok(if stored == program.hash {
info!("database `{}` up to date with program `{}`", addr, program.hash);
Ok(())
} else {
info!("updating `{}` from {} to {}", addr, stored, program.hash);
module.update_database(program.hash, program.bytes).await?
})

}
}
}
Expand Down Expand Up @@ -763,7 +759,6 @@ impl Host {
/// Either way, the [`UpdateDatabaseResult`] is returned.
async fn update_module(
&mut self,
caller_address: Option<Address>,
host_type: HostType,
program: Program,
on_panic: impl Fn() + Send + Sync + 'static,
Expand All @@ -783,7 +778,7 @@ impl Host {
)
.await?;

let update_result = update_module(&dbic.relational_db, &module, caller_address, program).await?;
let update_result = update_module(&dbic.relational_db, &module, program).await?;
trace!("update result: {update_result:?}");
// Only replace the module + scheduler if the update succeeded.
// Otherwise, we want the database to continue running with the old state.
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ pub use disk_storage::DiskStorage;
pub use host_controller::{
DescribedEntityType, ExternalStorage, HostController, ProgramStorage, ReducerCallResult, ReducerOutcome,
};
pub use module_host::{
EntityDef, ModuleHost, NoSuchModule, ReducerCallError, UpdateDatabaseResult, UpdateDatabaseSuccess,
};
pub use module_host::{EntityDef, ModuleHost, NoSuchModule, ReducerCallError, UpdateDatabaseResult};
pub use scheduler::Scheduler;
pub use spacetimedb_client_api_messages::timestamp::Timestamp;

Expand Down
28 changes: 5 additions & 23 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,8 @@ pub trait ModuleInstance: Send + 'static {
program_bytes: Box<[u8]>,
) -> anyhow::Result<Option<ReducerCallResult>>;

fn update_database(
&mut self,
caller_address: Option<Address>,
program_hash: Hash,
program_bytes: Box<[u8]>,
) -> anyhow::Result<UpdateDatabaseResult>;
fn update_database(&mut self, program_hash: Hash, program_bytes: Box<[u8]>)
-> anyhow::Result<UpdateDatabaseResult>;

fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult;
}
Expand Down Expand Up @@ -326,11 +322,10 @@ impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
}
fn update_database(
&mut self,
caller_address: Option<Address>,
program_hash: Hash,
program_bytes: Box<[u8]>,
) -> anyhow::Result<UpdateDatabaseResult> {
let ret = self.inst.update_database(caller_address, program_hash, program_bytes);
let ret = self.inst.update_database(program_hash, program_bytes);
self.check_trap();
ret
}
Expand Down Expand Up @@ -439,19 +434,7 @@ pub struct WeakModuleHost {
on_panic: Weak<dyn Fn() + Send + Sync + 'static>,
}

pub type UpdateDatabaseResult = Result<UpdateDatabaseSuccess, UpdateDatabaseError>;

#[derive(Debug, Default)]
pub struct UpdateDatabaseSuccess {
/// Outcome of calling the module's __update__ reducer, `None` if none is
/// defined.
pub update_result: Option<ReducerCallResult>,
/// Outcome of calling the module's pending __migrate__ reducers, empty if
/// none are defined or pending.
///
/// Currently always empty, as __migrate__ is not yet supported.
pub migrate_results: Vec<ReducerCallResult>,
}
pub type UpdateDatabaseResult = Result<(), UpdateDatabaseError>;

#[derive(thiserror::Error, Debug)]
#[error("no such module")]
Expand Down Expand Up @@ -782,12 +765,11 @@ impl ModuleHost {

pub async fn update_database(
&self,
caller_address: Option<Address>,
program_hash: Hash,
program_bytes: Box<[u8]>,
) -> Result<UpdateDatabaseResult, anyhow::Error> {
self.call("<update_database>", move |inst| {
inst.update_database(caller_address, program_hash, program_bytes)
inst.update_database(program_hash, program_bytes)
})
.await?
.map_err(Into::into)
Expand Down
2 changes: 0 additions & 2 deletions crates/core/src/host/wasm_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ pub const PREINIT_DUNDER: &str = "__preinit__";
pub const SETUP_DUNDER: &str = "__setup__";
/// the reducer with this name initializes the database
pub const INIT_DUNDER: &str = "__init__";
/// the reducer with this name is invoked when updating the database
pub const UPDATE_DUNDER: &str = "__update__";
/// The reducer with this name is invoked when a client connects.
pub const CLIENT_CONNECTED_DUNDER: &str = "__identity_connected__";
/// The reducer with this name is invoked when a client disconnects.
Expand Down
Loading