diff --git a/substrate/extrinsic-pool/src/lib.rs b/substrate/extrinsic-pool/src/lib.rs
index 7aee7130e3d6d..f45a5697c6e77 100644
--- a/substrate/extrinsic-pool/src/lib.rs
+++ b/substrate/extrinsic-pool/src/lib.rs
@@ -37,6 +37,7 @@ pub mod watcher;
mod error;
mod listener;
mod pool;
+mod rotator;
pub use listener::Listener;
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics};
diff --git a/substrate/extrinsic-pool/src/pool.rs b/substrate/extrinsic-pool/src/pool.rs
index 5f70123eb9378..fb59756479883 100644
--- a/substrate/extrinsic-pool/src/pool.rs
+++ b/substrate/extrinsic-pool/src/pool.rs
@@ -14,15 +14,21 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see .
-use std::{ collections::HashMap, fmt, sync::Arc, collections::BTreeMap};
+use std::{
+ collections::{BTreeMap, HashMap},
+ fmt,
+ sync::Arc,
+ time,
+};
use futures::sync::mpsc;
use parking_lot::{Mutex, RwLock};
use serde::{Serialize, de::DeserializeOwned};
use txpool::{self, Scoring, Readiness};
+use error::IntoPoolError;
use listener::Listener;
+use rotator::PoolRotator;
use watcher::Watcher;
-use error::IntoPoolError;
use runtime_primitives::{generic::BlockId, traits::Block as BlockT};
@@ -40,16 +46,18 @@ pub type AllExtrinsics = BTreeMap<<::VEx as txpool::VerifiedTr
/// Verified extrinsic struct. Wraps original extrinsic and verification info.
#[derive(Debug)]
-pub struct Verified {
+pub struct Verified {
/// Original extrinsic.
pub original: Ex,
/// Verification data.
pub verified: VEx,
+ /// Pool deadline, after it's reached we remove the extrinsic from the pool.
+ pub valid_till: time::Instant,
}
-impl txpool::VerifiedTransaction for Verified
-where
- Ex: ::std::fmt::Debug,
+impl txpool::VerifiedTransaction for Verified
+where
+ Ex: fmt::Debug,
VEx: txpool::VerifiedTransaction,
{
type Hash = ::Hash;
@@ -118,10 +126,17 @@ pub struct Ready<'a, 'b, B: 'a + ChainApi> {
api: &'a B,
at: &'b BlockId,
context: B::Ready,
+ rotator: &'a PoolRotator,
+ now: time::Instant,
}
impl<'a, 'b, B: ChainApi> txpool::Ready> for Ready<'a, 'b, B> {
fn is_ready(&mut self, xt: &VerifiedFor) -> Readiness {
+ if self.rotator.ban_if_stale(&self.now, xt) {
+ debug!(target: "extrinsic-pool", "[{:?}] Banning as stale.", txpool::VerifiedTransaction::hash(xt));
+ return Readiness::Stale;
+ }
+
self.api.is_ready(self.at, &mut self.context, xt)
}
}
@@ -155,6 +170,11 @@ impl Scoring> for ScoringAdapter {
}
}
+/// Maximum time the transaction will be kept in the pool.
+///
+/// Transactions that don't get included within the limit are removed from the pool.
+const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);
+
/// Extrinsics pool.
pub struct Pool {
api: B,
@@ -164,6 +184,7 @@ pub struct Pool {
Listener,
>>,
import_notification_sinks: Mutex>>,
+ rotator: PoolRotator,
}
impl Pool {
@@ -173,6 +194,7 @@ impl Pool {
pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::(Default::default()), options)),
import_notification_sinks: Default::default(),
api,
+ rotator: Default::default(),
}
}
@@ -206,9 +228,20 @@ impl Pool {
{
xts
.into_iter()
- .map(|xt| (self.api.verify_transaction(at, &xt), xt))
+ .map(|xt| {
+ match self.api.verify_transaction(at, &xt) {
+ Ok(ref verified) if self.rotator.is_banned(txpool::VerifiedTransaction::hash(verified)) => {
+ return (Err(txpool::Error::from("Temporarily Banned".to_owned()).into()), xt)
+ },
+ result => (result, xt),
+ }
+ })
.map(|(v, xt)| {
- let xt = Verified { original: xt, verified: v? };
+ let xt = Verified {
+ original: xt,
+ verified: v?,
+ valid_till: time::Instant::now() + POOL_TIME,
+ };
Ok(self.pool.write().import(xt)?)
})
.collect()
@@ -216,9 +249,7 @@ impl Pool {
/// Imports one unverified extrinsic to the pool
pub fn submit_one(&self, at: &BlockId, xt: ExtrinsicFor) -> Result>, B::Error> {
- let v = self.api.verify_transaction(at, &xt)?;
- let xt = Verified { original: xt, verified: v };
- Ok(self.pool.write().import(xt)?)
+ Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed"))
}
/// Import a single extrinsic and starts to watch their progress in the pool.
@@ -244,7 +275,8 @@ impl Pool {
senders: Option<&[::Sender]>,
) -> usize
{
- let ready = Ready { api: &self.api, context: self.api.ready(), at };
+ self.rotator.clear_timeouts(&time::Instant::now());
+ let ready = self.ready(at);
self.pool.write().cull(senders, ready)
}
@@ -284,9 +316,9 @@ impl Pool {
pub fn pending(&self, at: &BlockId, f: F) -> T where
F: FnOnce(txpool::PendingIterator, Ready, ScoringAdapter, Listener>) -> T,
{
- let ready = Ready { api: &self.api, context: self.api.ready(), at };
+ let ready = self.ready(at);
f(self.pool.read().pending(ready))
- }
+ }
/// Retry to import all verified transactions from given sender.
pub fn retry_verification(&self, at: &BlockId, sender: ::Sender) -> Result<(), B::Error> {
@@ -326,6 +358,16 @@ impl Pool {
map
})
}
+
+ fn ready<'a, 'b>(&'a self, at: &'b BlockId) -> Ready<'a, 'b, B> {
+ Ready {
+ api: &self.api,
+ rotator: &self.rotator,
+ context: self.api.ready(),
+ at,
+ now: time::Instant::now(),
+ }
+ }
}
/// A Readiness implementation that returns `Ready` for all transactions.
@@ -337,7 +379,7 @@ impl txpool::Ready for AlwaysReady {
}
#[cfg(test)]
-mod tests {
+pub mod tests {
use txpool;
use super::{VerifiedFor, ExtrinsicFor};
use std::collections::HashMap;
@@ -353,9 +395,9 @@ mod tests {
#[derive(Clone, Debug)]
pub struct VerifiedTransaction {
- hash: Hash,
- sender: AccountId,
- nonce: u64,
+ pub hash: Hash,
+ pub sender: AccountId,
+ pub nonce: u64,
}
impl txpool::VerifiedTransaction for VerifiedTransaction {
@@ -419,7 +461,7 @@ mod tests {
result
}
-
+
fn ready(&self) -> Self::Ready {
HashMap::default()
}
diff --git a/substrate/extrinsic-pool/src/rotator.rs b/substrate/extrinsic-pool/src/rotator.rs
new file mode 100644
index 0000000000000..1f4b1c4e737e1
--- /dev/null
+++ b/substrate/extrinsic-pool/src/rotator.rs
@@ -0,0 +1,177 @@
+// Copyright 2018 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! Rotate extrinsic inside the pool.
+//!
+//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time.
+//! Discarded extrinsics are banned so that they don't get re-imported again.
+
+use std::{
+ collections::HashMap,
+ fmt,
+ hash,
+ time::{Duration, Instant},
+};
+use parking_lot::RwLock;
+use txpool::VerifiedTransaction;
+use Verified;
+
+/// Expected size of the banned extrinsics cache.
+const EXPECTED_SIZE: usize = 2048;
+
+/// Pool rotator is responsible to only keep fresh extrinsics in the pool.
+///
+/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering
+/// the pool again.
+pub struct PoolRotator {
+ /// How long the extrinsic is banned for.
+ ban_time: Duration,
+ /// Currently banned extrinsics.
+ banned_until: RwLock>,
+}
+
+impl Default for PoolRotator {
+ fn default() -> Self {
+ PoolRotator {
+ ban_time: Duration::from_secs(60 * 30),
+ banned_until: Default::default(),
+ }
+ }
+}
+
+impl PoolRotator {
+ /// Returns `true` if extrinsic hash is currently banned.
+ pub fn is_banned(&self, hash: &Hash) -> bool {
+ self.banned_until.read().contains_key(hash)
+ }
+
+ /// Bans extrinsic if it's stale.
+ ///
+ /// Returns `true` if extrinsic is stale and got banned.
+ pub fn ban_if_stale(&self, now: &Instant, xt: &Verified) -> bool where
+ VEx: VerifiedTransaction,
+ Hash: fmt::Debug + fmt::LowerHex,
+ {
+ if &xt.valid_till > now {
+ return false;
+ }
+
+ let mut banned = self.banned_until.write();
+ banned.insert(xt.verified.hash().clone(), *now + self.ban_time);
+
+ if banned.len() > 2 * EXPECTED_SIZE {
+ while banned.len() > EXPECTED_SIZE {
+ if let Some(key) = banned.keys().next().cloned() {
+ banned.remove(&key);
+ }
+ }
+ }
+
+ true
+ }
+
+ /// Removes timed bans.
+ pub fn clear_timeouts(&self, now: &Instant) {
+ let mut banned = self.banned_until.write();
+
+ let to_remove = banned
+ .iter()
+ .filter_map(|(k, v)| if v < now {
+ Some(k.clone())
+ } else {
+ None
+ }).collect::>();
+
+ for k in to_remove {
+ banned.remove(&k);
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use pool::tests::VerifiedTransaction;
+ use test_client::runtime::Hash;
+
+ fn rotator() -> PoolRotator {
+ PoolRotator {
+ ban_time: Duration::from_millis(10),
+ ..Default::default()
+ }
+ }
+
+ fn tx() -> (Hash, Verified) {
+ let hash = 5.into();
+ let tx = Verified {
+ original: 5,
+ verified: VerifiedTransaction {
+ hash,
+ sender: Default::default(),
+ nonce: Default::default(),
+ },
+ valid_till: Instant::now(),
+ };
+
+ (hash, tx)
+ }
+
+ #[test]
+ fn should_not_ban_if_not_stale() {
+ // given
+ let (hash, tx) = tx();
+ let rotator = rotator();
+ assert!(!rotator.is_banned(&hash));
+ let past = Instant::now() - Duration::from_millis(1000);
+
+ // when
+ assert!(!rotator.ban_if_stale(&past, &tx));
+
+ // then
+ assert!(!rotator.is_banned(&hash));
+ }
+
+ #[test]
+ fn should_ban_stale_extrinsic() {
+ // given
+ let (hash, tx) = tx();
+ let rotator = rotator();
+ assert!(!rotator.is_banned(&hash));
+
+ // when
+ assert!(rotator.ban_if_stale(&Instant::now(), &tx));
+
+ // then
+ assert!(rotator.is_banned(&hash));
+ }
+
+
+ #[test]
+ fn should_clear_banned() {
+ // given
+ let (hash, tx) = tx();
+ let rotator = rotator();
+ assert!(rotator.ban_if_stale(&Instant::now(), &tx));
+ assert!(rotator.is_banned(&hash));
+
+ // when
+ let future = Instant::now() + rotator.ban_time + rotator.ban_time;
+ rotator.clear_timeouts(&future);
+
+ // then
+ assert!(!rotator.is_banned(&hash));
+ }
+}