Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions substrate/extrinsic-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub mod watcher;
mod error;
mod listener;
mod pool;
mod rotator;

pub use listener::Listener;
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics};
Expand Down
80 changes: 61 additions & 19 deletions substrate/extrinsic-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use std::{ collections::HashMap, fmt, sync::Arc, collections::BTreeMap};
use std::{
collections::{BTreeMap, HashMap},
fmt,
sync::Arc,
time,
};
use futures::sync::mpsc;
use parking_lot::{Mutex, RwLock};
use serde::{Serialize, de::DeserializeOwned};
use txpool::{self, Scoring, Readiness};

use error::IntoPoolError;
use listener::Listener;
use rotator::PoolRotator;
use watcher::Watcher;
use error::IntoPoolError;

use runtime_primitives::{generic::BlockId, traits::Block as BlockT};

Expand All @@ -40,16 +46,18 @@ pub type AllExtrinsics<A> = BTreeMap<<<A as ChainApi>::VEx as txpool::VerifiedTr

/// Verified extrinsic struct. Wraps original extrinsic and verification info.
#[derive(Debug)]
pub struct Verified<Ex: ::std::fmt::Debug, VEx: txpool::VerifiedTransaction> {
pub struct Verified<Ex, VEx> {
/// Original extrinsic.
pub original: Ex,
/// Verification data.
pub verified: VEx,
/// Pool deadline, after it's reached we remove the extrinsic from the pool.
pub valid_till: time::Instant,
}

impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx>
where
Ex: ::std::fmt::Debug,
impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx>
where
Ex: fmt::Debug,
VEx: txpool::VerifiedTransaction,
{
type Hash = <VEx as txpool::VerifiedTransaction>::Hash;
Expand Down Expand Up @@ -118,10 +126,17 @@ pub struct Ready<'a, 'b, B: 'a + ChainApi> {
api: &'a B,
at: &'b BlockId<B::Block>,
context: B::Ready,
rotator: &'a PoolRotator<B::Hash>,
now: time::Instant,
}

impl<'a, 'b, B: ChainApi> txpool::Ready<VerifiedFor<B>> for Ready<'a, 'b, B> {
fn is_ready(&mut self, xt: &VerifiedFor<B>) -> Readiness {
if self.rotator.ban_if_stale(&self.now, xt) {
debug!(target: "extrinsic-pool", "[{:?}] Banning as stale.", txpool::VerifiedTransaction::hash(xt));
return Readiness::Stale;
}

self.api.is_ready(self.at, &mut self.context, xt)
}
}
Expand Down Expand Up @@ -155,6 +170,11 @@ impl<T: ChainApi> Scoring<VerifiedFor<T>> for ScoringAdapter<T> {
}
}

/// Maximum time the transaction will be kept in the pool.
///
/// Transactions that don't get included within the limit are removed from the pool.
const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);

/// Extrinsics pool.
pub struct Pool<B: ChainApi> {
api: B,
Expand All @@ -164,6 +184,7 @@ pub struct Pool<B: ChainApi> {
Listener<B::Hash>,
>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
rotator: PoolRotator<B::Hash>,
}

impl<B: ChainApi> Pool<B> {
Expand All @@ -173,6 +194,7 @@ impl<B: ChainApi> Pool<B> {
pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)),
import_notification_sinks: Default::default(),
api,
rotator: Default::default(),
}
}

Expand Down Expand Up @@ -206,19 +228,28 @@ impl<B: ChainApi> Pool<B> {
{
xts
.into_iter()
.map(|xt| (self.api.verify_transaction(at, &xt), xt))
.map(|xt| {
match self.api.verify_transaction(at, &xt) {
Ok(ref verified) if self.rotator.is_banned(txpool::VerifiedTransaction::hash(verified)) => {
return (Err(txpool::Error::from("Temporarily Banned".to_owned()).into()), xt)
},
result => (result, xt),
}
})
.map(|(v, xt)| {
let xt = Verified { original: xt, verified: v? };
let xt = Verified {
original: xt,
verified: v?,
valid_till: time::Instant::now() + POOL_TIME,
};
Ok(self.pool.write().import(xt)?)
})
.collect()
}

/// Imports one unverified extrinsic to the pool
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> {
let v = self.api.verify_transaction(at, &xt)?;
let xt = Verified { original: xt, verified: v };
Ok(self.pool.write().import(xt)?)
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed"))
}

/// Import a single extrinsic and starts to watch their progress in the pool.
Expand All @@ -244,7 +275,8 @@ impl<B: ChainApi> Pool<B> {
senders: Option<&[<B::VEx as txpool::VerifiedTransaction>::Sender]>,
) -> usize
{
let ready = Ready { api: &self.api, context: self.api.ready(), at };
self.rotator.clear_timeouts(&time::Instant::now());
let ready = self.ready(at);
self.pool.write().cull(senders, ready)
}

Expand Down Expand Up @@ -284,9 +316,9 @@ impl<B: ChainApi> Pool<B> {
pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
{
let ready = Ready { api: &self.api, context: self.api.ready(), at };
let ready = self.ready(at);
f(self.pool.read().pending(ready))
}
}

/// Retry to import all verified transactions from given sender.
pub fn retry_verification(&self, at: &BlockId<B::Block>, sender: <B::VEx as txpool::VerifiedTransaction>::Sender) -> Result<(), B::Error> {
Expand Down Expand Up @@ -326,6 +358,16 @@ impl<B: ChainApi> Pool<B> {
map
})
}

fn ready<'a, 'b>(&'a self, at: &'b BlockId<B::Block>) -> Ready<'a, 'b, B> {
Ready {
api: &self.api,
rotator: &self.rotator,
context: self.api.ready(),
at,
now: time::Instant::now(),
}
}
}

/// A Readiness implementation that returns `Ready` for all transactions.
Expand All @@ -337,7 +379,7 @@ impl<VEx> txpool::Ready<VEx> for AlwaysReady {
}

#[cfg(test)]
mod tests {
pub mod tests {
use txpool;
use super::{VerifiedFor, ExtrinsicFor};
use std::collections::HashMap;
Expand All @@ -353,9 +395,9 @@ mod tests {

#[derive(Clone, Debug)]
pub struct VerifiedTransaction {
hash: Hash,
sender: AccountId,
nonce: u64,
pub hash: Hash,
pub sender: AccountId,
pub nonce: u64,
}

impl txpool::VerifiedTransaction for VerifiedTransaction {
Expand Down Expand Up @@ -419,7 +461,7 @@ mod tests {

result
}

fn ready(&self) -> Self::Ready {
HashMap::default()
}
Expand Down
177 changes: 177 additions & 0 deletions substrate/extrinsic-pool/src/rotator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Rotate extrinsic inside the pool.
//!
//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time.
//! Discarded extrinsics are banned so that they don't get re-imported again.

use std::{
collections::HashMap,
fmt,
hash,
time::{Duration, Instant},
};
use parking_lot::RwLock;
use txpool::VerifiedTransaction;
use Verified;

/// Expected size of the banned extrinsics cache.
const EXPECTED_SIZE: usize = 2048;

/// Pool rotator is responsible to only keep fresh extrinsics in the pool.
///
/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering
/// the pool again.
pub struct PoolRotator<Hash> {
/// How long the extrinsic is banned for.
ban_time: Duration,
/// Currently banned extrinsics.
banned_until: RwLock<HashMap<Hash, Instant>>,
}

impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
fn default() -> Self {
PoolRotator {
ban_time: Duration::from_secs(60 * 30),
banned_until: Default::default(),
}
}
}

impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
/// Returns `true` if extrinsic hash is currently banned.
pub fn is_banned(&self, hash: &Hash) -> bool {
self.banned_until.read().contains_key(hash)
}

/// Bans extrinsic if it's stale.
///
/// Returns `true` if extrinsic is stale and got banned.
pub fn ban_if_stale<Ex, VEx>(&self, now: &Instant, xt: &Verified<Ex, VEx>) -> bool where
VEx: VerifiedTransaction<Hash=Hash>,
Hash: fmt::Debug + fmt::LowerHex,
{
if &xt.valid_till > now {
return false;
}

let mut banned = self.banned_until.write();
banned.insert(xt.verified.hash().clone(), *now + self.ban_time);

if banned.len() > 2 * EXPECTED_SIZE {
while banned.len() > EXPECTED_SIZE {
if let Some(key) = banned.keys().next().cloned() {
banned.remove(&key);
}
}
}

true
}

/// Removes timed bans.
pub fn clear_timeouts(&self, now: &Instant) {
let mut banned = self.banned_until.write();

let to_remove = banned
.iter()
.filter_map(|(k, v)| if v < now {
Some(k.clone())
} else {
None
}).collect::<Vec<_>>();

for k in to_remove {
banned.remove(&k);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use pool::tests::VerifiedTransaction;
use test_client::runtime::Hash;

fn rotator() -> PoolRotator<Hash> {
PoolRotator {
ban_time: Duration::from_millis(10),
..Default::default()
}
}

fn tx() -> (Hash, Verified<u64, VerifiedTransaction>) {
let hash = 5.into();
let tx = Verified {
original: 5,
verified: VerifiedTransaction {
hash,
sender: Default::default(),
nonce: Default::default(),
},
valid_till: Instant::now(),
};

(hash, tx)
}

#[test]
fn should_not_ban_if_not_stale() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(!rotator.is_banned(&hash));
let past = Instant::now() - Duration::from_millis(1000);

// when
assert!(!rotator.ban_if_stale(&past, &tx));

// then
assert!(!rotator.is_banned(&hash));
}

#[test]
fn should_ban_stale_extrinsic() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(!rotator.is_banned(&hash));

// when
assert!(rotator.ban_if_stale(&Instant::now(), &tx));

// then
assert!(rotator.is_banned(&hash));
}


#[test]
fn should_clear_banned() {
// given
let (hash, tx) = tx();
let rotator = rotator();
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
assert!(rotator.is_banned(&hash));

// when
let future = Instant::now() + rotator.ban_time + rotator.ban_time;
rotator.clear_timeouts(&future);

// then
assert!(!rotator.is_banned(&hash));
}
}