diff --git a/.dockerignore b/.dockerignore index 7309a955c54..331cad27bb2 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,6 +1,5 @@ .dockerignore Dockerfile -.git/ .idea/ docs/ diff --git a/Cargo.lock b/Cargo.lock index d9223f7547b..800fab12d47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -900,6 +900,11 @@ dependencies = [ "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "doc-comment" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "dtoa" version = "0.4.4" @@ -1787,7 +1792,7 @@ dependencies = [ "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "near 0.2.4", + "near 0.2.6", "near-primitives 0.1.0", "near-protos 0.1.0", "node-runtime 0.0.1", @@ -2002,7 +2007,7 @@ dependencies = [ [[package]] name = "near" -version = "0.2.4" +version = "0.2.6" dependencies = [ "actix 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2023,6 +2028,7 @@ dependencies = [ "near-pool 0.1.0", "near-primitives 0.1.0", "near-store 0.1.0", + "near-telemetry 0.1.0", "near-verifier 0.1.0", "node-runtime 0.0.1", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2065,9 +2071,12 @@ dependencies = [ "near-pool 0.1.0", "near-primitives 0.1.0", "near-store 0.1.0", + "near-telemetry 0.1.0", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "sysinfo 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2207,6 +2216,17 @@ dependencies = [ "serde_derive 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "near-telemetry" +version = "0.1.0" +dependencies = [ + "actix 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-web 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "near-verifier" version = "0.1.0" @@ -2225,7 +2245,7 @@ dependencies = [ "keystore 0.1.0", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "near 0.2.4", + "near 0.2.6", "near-jsonrpc 0.1.0", "near-network 0.1.0", "near-primitives 0.1.0", @@ -3312,7 +3332,7 @@ version = "0.1.0" dependencies = [ "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "near 0.2.4", + "near 0.2.6", "near-chain 0.1.0", "near-network 0.1.0", "near-primitives 0.1.0", @@ -3356,6 +3376,18 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "sysinfo" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "doc-comment 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "take_mut" version = "0.2.2" @@ -3433,7 +3465,7 @@ dependencies = [ "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "near 0.2.4", + "near 0.2.6", "near-chain 0.1.0", "near-client 0.1.0", "near-jsonrpc 0.1.0", @@ -4285,6 +4317,7 @@ dependencies = [ "checksum digest 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e5b29bf156f3f4b3c4f610a25ff69370616ae6e0657d416de22645483e72af0a" "checksum digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05f47366984d3ad862010e22c7ce81a7dbcaebbdfb37241a620f8b6596ee135c" "checksum dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901" +"checksum doc-comment 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "923dea538cea0aa3025e8685b20d6ee21ef99c4f77e954a30febbaac5ec73a97" "checksum dtoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ea57b42383d091c85abcc2706240b94ab2a8fa1fc81c10ff23c4de06e2a90b5e" "checksum dynasm 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f36d49ab6f8ecc642d2c6ee10fda04ba68003ef0277300866745cdde160e6b40" "checksum dynasmrt 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a4c408a211e7f5762829f5e46bdff0c14bc3b1517a21a4bb781c716bf88b0c68" @@ -4522,6 +4555,7 @@ dependencies = [ "checksum strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" "checksum syn 0.15.36 (registry+https://github.com/rust-lang/crates.io-index)" = "8b4f551a91e2e3848aeef8751d0d4eec9489b6474c720fd4c55958d8d31a430c" "checksum synstructure 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "02353edf96d6e4dc81aea2d8490a7e9db177bf8acb0e951c24940bf866cb313f" +"checksum sysinfo 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c3e2cab189e59f72710e3dd5e1e0d5be0f6c5c999c326f2fdcdf3bf4483ec9fd" "checksum take_mut 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" "checksum tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)" = "b3196bfbffbba3e57481b6ea32249fbaf590396a52505a2615adbb79d9d826d3" "checksum target-lexicon 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1b0ab4982b8945c35cc1c46a83a9094c414f6828a099ce5dcaa8ee2b04642dcb" diff --git a/Dockerfile b/Dockerfile index 800e0f74ff5..0ee805e75c8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,7 @@ FROM phusion/baseimage:0.11 RUN apt-get update -qq && apt-get install -y \ + git \ cmake \ g++ \ protobuf-compiler \ @@ -24,7 +25,10 @@ WORKDIR /near COPY . . ENV CARGO_TARGET_DIR=/tmp/target -RUN cargo build -p near --release && \ +RUN --mount=type=cache,target=/tmp/target \ + --mount=type=cache,target=/usr/local/cargo/git \ + --mount=type=cache,target=/usr/local/cargo/registry \ + cargo build -p near --release && \ cp /tmp/target/release/near /usr/local/bin/ EXPOSE 3030 24567 diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index 2c3a5a17808..f986db0f6e4 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -14,9 +14,13 @@ log = "0.4" rand = "0.6.5" serde_derive = "1.0" serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +sysinfo = "0.9.0" near-primitives = { path = "../../core/primitives" } near-store = { path = "../../core/store" } near-chain = { path = "../chain" } near-network = { path = "../network" } near-pool = { path = "../pool" } +near-telemetry = { path = "../telemetry" } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 9386bd3ccb3..1b20e82dfd8 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -8,15 +8,15 @@ use std::thread; use std::time::{Duration, Instant}; use actix::{ - Actor, ActorFuture, AsyncContext, Context, ContextFutureSpawner, Handler, Recipient, WrapFuture, + Actor, ActorFuture, Addr, AsyncContext, Context, ContextFutureSpawner, Handler, Recipient, + WrapFuture, }; -use ansi_term::Color::{Cyan, Green, White, Yellow}; use chrono::{DateTime, Utc}; use log::{debug, error, info, warn}; use near_chain::{ Block, BlockApproval, BlockHeader, BlockStatus, Chain, ErrorKind, Provenance, RuntimeAdapter, - Tip, ValidTransaction, + ValidTransaction, }; use near_network::types::{PeerId, ReasonForBan}; use near_network::{ @@ -29,7 +29,9 @@ use near_primitives::transaction::{ReceiptTransaction, SignedTransaction}; use near_primitives::types::{AccountId, BlockIndex, ShardId}; use near_primitives::unwrap_or_return; use near_store::Store; +use near_telemetry::TelemetryActor; +use crate::info::InfoHelper; use crate::sync::{most_weight_peer, BlockSync, HeaderSync, StateSync}; use crate::types::{ BlockProducer, ClientConfig, Error, NetworkInfo, ShardSyncStatus, Status, StatusSyncInfo, @@ -42,10 +44,13 @@ pub struct ClientActor { sync_status: SyncStatus, chain: Chain, runtime_adapter: Arc, + block_producer: Option, tx_pool: TransactionPool, network_actor: Recipient, - block_producer: Option, network_info: NetworkInfo, + /// Identity that represents this Client at the network level. + /// It is used as part of the messages that identify this client. + node_id: PeerId, /// Set of approvals for the next block. approvals: HashMap, /// Timestamp when last block was received / processed. Used to timeout block production. @@ -56,12 +61,8 @@ pub struct ClientActor { block_sync: BlockSync, /// Keeps track of syncing state. state_sync: StateSync, - /// Timestamp when client was started. - started: Instant, - /// Total number of blocks processed. - num_blocks_processed: u64, - /// Total number of transactions processed. - num_tx_processed: u64, + /// Info helper. + info_helper: InfoHelper, } fn wait_until_genesis(genesis_time: &DateTime) { @@ -82,8 +83,10 @@ impl ClientActor { store: Arc, genesis_time: DateTime, runtime_adapter: Arc, + node_id: PeerId, network_actor: Recipient, block_producer: Option, + telemtetry_actor: Addr, ) -> Result { wait_until_genesis(&genesis_time); let chain = Chain::new(store, runtime_adapter.clone(), genesis_time)?; @@ -95,6 +98,7 @@ impl ClientActor { if let Some(bp) = &block_producer { info!(target: "client", "Starting validator node: {}", bp.account_id); } + let info_helper = InfoHelper::new(telemtetry_actor, block_producer.clone()); Ok(ClientActor { config, sync_status, @@ -102,6 +106,7 @@ impl ClientActor { runtime_adapter, tx_pool, network_actor, + node_id, block_producer, network_info: NetworkInfo { num_active_peers: 0, @@ -115,9 +120,7 @@ impl ClientActor { header_sync, block_sync, state_sync, - started: Instant::now(), - num_blocks_processed: 0, - num_tx_processed: 0, + info_helper, }) } } @@ -292,8 +295,7 @@ impl ClientActor { self.last_block_processed = Instant::now(); // Count blocks and transactions processed both in SYNC and regular modes. - self.num_blocks_processed += 1; - self.num_tx_processed += block.transactions.len() as u64; + self.info_helper.block_processed(block.transactions.len() as u64); if provenance != Provenance::SYNC { // If we produced the block, then we want to broadcast it. @@ -774,7 +776,6 @@ impl ClientActor { if !needs_syncing { if currently_syncing { - self.started = Instant::now(); self.last_block_processed = Instant::now(); self.sync_status = SyncStatus::NoSync; @@ -865,7 +866,6 @@ impl ClientActor { /// Periodically log summary. fn log_summary(&self, ctx: &mut Context) { ctx.run_later(self.config.log_summary_period, move |act, ctx| { - // TODO: collect traffic, tx, blocks. let head = unwrap_or_return!(act.chain.head(), ()); let validators = unwrap_or_return!(act.get_epoch_block_proposers(head.epoch_hash), ()); let num_validators = validators.len(); @@ -874,19 +874,15 @@ impl ClientActor { } else { false }; - // Block#, Block Hash, is validator/# validators, active/max peers. - let avg_bls = (act.num_blocks_processed as f64) / (act.started.elapsed().as_millis() as f64) * 1000.0; - let avg_tps = (act.num_tx_processed as f64) / (act.started.elapsed().as_millis() as f64) * 1000.0; - info!(target: "info", "{} {} {} {} {}", - Yellow.bold().paint(display_sync_status(&act.sync_status, &head)), - White.bold().paint(format!("{}/{}", if is_validator { "V" } else { "-" }, num_validators)), - Cyan.bold().paint(format!("{:2}/{:?}/{:2} peers", act.network_info.num_active_peers, act.network_info.most_weight_peers.len(), act.network_info.peer_max_count)), - Cyan.bold().paint(format!("⬇ {} ⬆ {}", pretty_bytes_per_sec(act.network_info.received_bytes_per_sec), pretty_bytes_per_sec(act.network_info.sent_bytes_per_sec))), - Green.bold().paint(format!("{:.2} bls {:.2} tps", avg_bls, avg_tps)) + + act.info_helper.info( + &head, + &act.sync_status, + &act.node_id, + &act.network_info, + is_validator, + num_validators, ); - act.started = Instant::now(); - act.num_blocks_processed = 0; - act.num_tx_processed = 0; act.log_summary(ctx); }); @@ -943,58 +939,3 @@ impl ClientActor { Ok((payload, receipts)) } } - -fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String { - match sync_status { - SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height), - SyncStatus::NoSync => format!("#{:>8} {}", head.height, head.last_block_hash), - SyncStatus::HeaderSync { current_height, highest_height } => { - let percent = - if *highest_height == 0 { 0 } else { current_height * 100 / highest_height }; - format!("#{:>8} Downloading headers {}%", head.height, percent) - } - SyncStatus::BodySync { current_height, highest_height } => { - let percent = - if *highest_height == 0 { 0 } else { current_height * 100 / highest_height }; - format!("#{:>8} Downloading blocks {}%", head.height, percent) - } - SyncStatus::StateSync(_sync_hash, shard_statuses) => { - let mut res = String::from("State "); - for (shard_id, shard_status) in shard_statuses { - res = res - + format!( - "{}: {}", - shard_id, - match shard_status { - ShardSyncStatus::StateDownload { - start_time: _, - prev_update_time: _, - prev_downloaded_size: _, - downloaded_size: _, - total_size: _, - } => format!("download"), - ShardSyncStatus::StateValidation => format!("validation"), - ShardSyncStatus::StateDone => format!("done"), - ShardSyncStatus::Error(error) => format!("error {}", error), - } - ) - .as_str(); - } - res - } - SyncStatus::StateSyncDone => format!("State sync donee"), - } -} - -/// Format bytes per second in a nice way. -fn pretty_bytes_per_sec(num: u64) -> String { - if num < 100 { - // Under 0.1 kiB, display in bytes. - format!("{} B/s", num) - } else if num < 1024 * 1024 { - // Under 1.0 MiB/sec display in kiB/sec. - format!("{:.1}kiB/s", num as f64 / 1024.0) - } else { - format!("{:.1}MiB/s", num as f64 / (1024.0 * 1024.0)) - } -} diff --git a/chain/client/src/info.rs b/chain/client/src/info.rs new file mode 100644 index 00000000000..53218e8bf8c --- /dev/null +++ b/chain/client/src/info.rs @@ -0,0 +1,201 @@ +use std::time::Instant; + +use actix::Addr; +use ansi_term::Color::{Blue, Cyan, Green, White, Yellow}; +use log::info; +use serde_json::json; +use sysinfo::{get_current_pid, Pid, ProcessExt, System, SystemExt}; + +use near_chain::Tip; +use near_network::types::PeerId; +use near_primitives::serialize::to_base; +use near_telemetry::{telemetry, TelemetryActor}; + +use crate::types::{BlockProducer, NetworkInfo, ShardSyncStatus, SyncStatus}; + +/// A helper that prints information about current chain and reports to telemetry. +pub struct InfoHelper { + /// Timestamp when client was started. + started: Instant, + /// Total number of blocks processed. + num_blocks_processed: u64, + /// Total number of transactions processed. + num_tx_processed: u64, + /// Process id to query resources. + pid: Option, + /// System reference. + sys: System, + /// Sign telemetry with block producer key if available. + block_producer: Option, + /// Telemetry actor. + telemetry_actor: Addr, +} + +impl InfoHelper { + pub fn new( + telemetry_actor: Addr, + block_producer: Option, + ) -> Self { + InfoHelper { + started: Instant::now(), + num_blocks_processed: 0, + num_tx_processed: 0, + pid: get_current_pid().ok(), + sys: System::new(), + telemetry_actor, + block_producer, + } + } + + pub fn block_processed(&mut self, num_transactions: u64) { + self.num_blocks_processed += 1; + self.num_tx_processed += num_transactions; + } + + pub fn info( + &mut self, + head: &Tip, + sync_status: &SyncStatus, + node_id: &PeerId, + network_info: &NetworkInfo, + is_validator: bool, + num_validators: usize, + ) { + let (cpu_usage, memory) = if let Some(pid) = self.pid { + if self.sys.refresh_process(pid) { + let proc = self + .sys + .get_process(pid) + .expect("refresh_process succeeds, this should be not None"); + (proc.cpu_usage(), proc.memory()) + } else { + (0.0, 0) + } + } else { + (0.0, 0) + }; + + // Block#, Block Hash, is validator/# validators, active/max peers, traffic, blocks/sec & tx/sec + let avg_bls = (self.num_blocks_processed as f64) + / (self.started.elapsed().as_millis() as f64) + * 1000.0; + let avg_tps = + (self.num_tx_processed as f64) / (self.started.elapsed().as_millis() as f64) * 1000.0; + info!(target: "info", "{} {} {} {} {} {}", + Yellow.bold().paint(display_sync_status(&sync_status, &head)), + White.bold().paint(format!("{}/{}", if is_validator { "V" } else { "-" }, num_validators)), + Cyan.bold().paint(format!("{:2}/{:?}/{:2} peers", network_info.num_active_peers, network_info.most_weight_peers.len(), network_info.peer_max_count)), + Cyan.bold().paint(format!("⬇ {} ⬆ {}", pretty_bytes_per_sec(network_info.received_bytes_per_sec), pretty_bytes_per_sec(network_info.sent_bytes_per_sec))), + Green.bold().paint(format!("{:.2} bls {:.2} tps", avg_bls, avg_tps)), + Blue.bold().paint(format!("CPU: {:.0}%, Mem: {}", cpu_usage, pretty_bytes(memory * 1024))) + ); + self.started = Instant::now(); + self.num_blocks_processed = 0; + self.num_tx_processed = 0; + + telemetry( + &self.telemetry_actor, + try_sign_json( + json!({ + "account_id": self.block_producer.clone().map(|bp| bp.account_id).unwrap_or("".to_string()), + "node_id": node_id, + "status": display_sync_status(&sync_status, &head), + "latest_block_hash": to_base(&head.last_block_hash), + "latest_block_height": head.height, + "num_peers": network_info.num_active_peers, + "bandwidth_download": network_info.received_bytes_per_sec, + "bandwidth_upload": network_info.sent_bytes_per_sec, + "cpu": cpu_usage, + "memory": memory, + }), + &self.block_producer, + ), + ); + } +} + +/// Tries to sign given JSON with block producer if it's present and all succeeds. +fn try_sign_json( + mut value: serde_json::Value, + block_producer: &Option, +) -> serde_json::Value { + let mut signature = "".to_string(); + if let Some(bp) = block_producer { + if let Ok(s) = serde_json::to_string(&value) { + signature = to_base(&bp.signer.sign(s.as_bytes())); + } + } + value["signature"] = signature.into(); + value +} + +fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String { + match sync_status { + SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height), + SyncStatus::NoSync => format!("#{:>8} {}", head.height, head.last_block_hash), + SyncStatus::HeaderSync { current_height, highest_height } => { + let percent = + if *highest_height == 0 { 0 } else { current_height * 100 / highest_height }; + format!("#{:>8} Downloading headers {}%", head.height, percent) + } + SyncStatus::BodySync { current_height, highest_height } => { + let percent = + if *highest_height == 0 { 0 } else { current_height * 100 / highest_height }; + format!("#{:>8} Downloading blocks {}%", head.height, percent) + } + SyncStatus::StateSync(_sync_hash, shard_statuses) => { + let mut res = String::from("State "); + for (shard_id, shard_status) in shard_statuses { + res = res + + format!( + "{}: {}", + shard_id, + match shard_status { + ShardSyncStatus::StateDownload { + start_time: _, + prev_update_time: _, + prev_downloaded_size: _, + downloaded_size: _, + total_size: _, + } => format!("download"), + ShardSyncStatus::StateValidation => format!("validation"), + ShardSyncStatus::StateDone => format!("done"), + ShardSyncStatus::Error(error) => format!("error {}", error), + } + ) + .as_str(); + } + res + } + SyncStatus::StateSyncDone => format!("State sync done"), + } +} + +const KILOBYTE: u64 = 1024; +const MEGABYTE: u64 = KILOBYTE * 1024; +const GIGABYTE: u64 = MEGABYTE * 1024; + +/// Format bytes per second in a nice way. +fn pretty_bytes_per_sec(num: u64) -> String { + if num < 100 { + // Under 0.1 kiB, display in bytes. + format!("{} B/s", num) + } else if num < MEGABYTE { + // Under 1.0 MiB/sec display in kiB/sec. + format!("{:.1}kiB/s", num as f64 / KILOBYTE as f64) + } else { + format!("{:.1}MiB/s", num as f64 / MEGABYTE as f64) + } +} + +fn pretty_bytes(num: u64) -> String { + if num < 1024 { + format!("{} B", num) + } else if num < MEGABYTE { + format!("{:.1} kiB", num as f64 / KILOBYTE as f64) + } else if num < GIGABYTE { + format!("{:.1} MiB", num as f64 / MEGABYTE as f64) + } else { + format!("{:.1} GiB", num as f64 / GIGABYTE as f64) + } +} diff --git a/chain/client/src/lib.rs b/chain/client/src/lib.rs index efdc2c15f7a..9956bc9cf0e 100644 --- a/chain/client/src/lib.rs +++ b/chain/client/src/lib.rs @@ -1,11 +1,12 @@ pub use crate::client::ClientActor; pub use crate::types::{ BlockProducer, ClientConfig, Error, GetBlock, NetworkInfo, Query, Status, StatusResponse, - SyncStatus, TxStatus, TxDetails + SyncStatus, TxDetails, TxStatus, }; pub use crate::view_client::ViewClientActor; mod client; +mod info; mod sync; pub mod test_utils; mod types; diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs index b63022bd66b..c794e98eeb4 100644 --- a/chain/client/src/test_utils.rs +++ b/chain/client/src/test_utils.rs @@ -6,8 +6,10 @@ use chrono::Utc; use near_chain::test_utils::KeyValueRuntime; use near_network::{NetworkRequests, NetworkResponses, PeerManagerActor}; +use near_primitives::crypto::signature::PublicKey; use near_primitives::crypto::signer::InMemorySigner; use near_store::test_utils::create_test_store; +use near_telemetry::TelemetryActor; use crate::{BlockProducer, ClientActor, ClientConfig, ViewClientActor}; @@ -27,6 +29,7 @@ pub fn setup( )); let signer = Arc::new(InMemorySigner::from_seed(account_id, account_id)); let genesis_time = Utc::now(); + let telemetry = TelemetryActor::default().start(); let view_client = ViewClientActor::new(store.clone(), genesis_time.clone(), runtime.clone()).unwrap(); let client = ClientActor::new( @@ -34,8 +37,10 @@ pub fn setup( store, genesis_time, runtime, + PublicKey::empty().into(), recipient, Some(signer.into()), + telemetry, ) .unwrap(); (client, view_client) diff --git a/chain/telemetry/Cargo.toml b/chain/telemetry/Cargo.toml new file mode 100644 index 00000000000..21f0591f1b5 --- /dev/null +++ b/chain/telemetry/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "near-telemetry" +version = "0.1.0" +authors = ["Near Inc "] +edition = "2018" + +[dependencies] +actix-web = "1.0.0-rc" +actix = "0.8.1" +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" diff --git a/chain/telemetry/src/lib.rs b/chain/telemetry/src/lib.rs new file mode 100644 index 00000000000..0718c7fa54b --- /dev/null +++ b/chain/telemetry/src/lib.rs @@ -0,0 +1,64 @@ +use std::time::Duration; + +use actix::prelude::Future; +use actix::{Actor, Addr, Context, Handler, Message}; +use actix_web::client::Client; +use serde_derive::{Deserialize, Serialize}; + +/// Timeout for establishing connection. +const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +pub struct TelemetryConfig { + pub endpoints: Vec, +} + +/// Event to send over telemetry. +#[derive(Message)] +pub struct TelemetryEvent { + content: serde_json::Value, +} + +pub struct TelemetryActor { + config: TelemetryConfig, + client: Client, +} + +impl Default for TelemetryActor { + fn default() -> Self { + Self::new(TelemetryConfig::default()) + } +} + +impl TelemetryActor { + pub fn new(config: TelemetryConfig) -> Self { + let client = Client::build().timeout(CONNECT_TIMEOUT).finish(); + Self { config, client } + } +} + +impl Actor for TelemetryActor { + type Context = Context; +} + +impl Handler for TelemetryActor { + type Result = (); + + fn handle(&mut self, msg: TelemetryEvent, _ctx: &mut Context) { + for endpoint in self.config.endpoints.iter() { + actix::spawn( + self.client + .post(endpoint) + .header("Content-Type", "application/json") + .send_json(&msg.content) + .map_err(|_err| {}) + .map(|_response| {}), + ); + } + } +} + +/// Send telemetry event to all the endpoints. +pub fn telemetry(telemetry: &Addr, content: serde_json::Value) { + telemetry.do_send(TelemetryEvent { content }); +} diff --git a/core/primitives/src/block.rs b/core/primitives/src/block.rs index f7c9daa04d7..d818be194bc 100644 --- a/core/primitives/src/block.rs +++ b/core/primitives/src/block.rs @@ -50,9 +50,11 @@ pub struct BlockHeader { pub epoch_hash: CryptoHash, /// Signature of the block producer. + #[serde(with = "base_format")] pub signature: Signature, /// Cached value of hash for this block. + #[serde(with = "base_format")] hash: CryptoHash, } diff --git a/core/primitives/src/serialize.rs b/core/primitives/src/serialize.rs index 7dded77ceb5..d9dada0ffe4 100644 --- a/core/primitives/src/serialize.rs +++ b/core/primitives/src/serialize.rs @@ -100,6 +100,38 @@ pub mod base_format { } } +pub mod option_base_format { + use serde::de; + use serde::{Deserialize, Deserializer, Serializer}; + + use super::{BaseDecode, BaseEncode}; + + pub fn serialize(data: &Option, serializer: S) -> Result + where + T: BaseEncode, + S: Serializer, + { + if let Some(x) = data { + serializer.serialize_str(&x.to_base()) + } else { + serializer.serialize_str("") + } + } + + pub fn deserialize<'de, T, D>(deserializer: D) -> Result, D::Error> + where + T: BaseDecode + std::fmt::Debug, + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + if s.is_empty() { + Ok(None) + } else { + T::from_base(&s).map(|x| Some(x)).map_err(|err| de::Error::custom(err.to_string())) + } + } +} + pub mod vec_base_format { use std::fmt; diff --git a/core/primitives/src/transaction.rs b/core/primitives/src/transaction.rs index 5d8174ba45c..89ab5e65fc5 100644 --- a/core/primitives/src/transaction.rs +++ b/core/primitives/src/transaction.rs @@ -15,7 +15,7 @@ use crate::account::AccessKey; use crate::crypto::signature::{verify, PublicKey, Signature, DEFAULT_SIGNATURE}; use crate::hash::{hash, CryptoHash}; use crate::logging; -use crate::serialize::base_format; +use crate::serialize::{base_bytes_format, base_format, option_base_format, u128_dec_format}; use crate::types::{AccountId, Balance, CallbackId, Nonce, ShardId, StructSignature}; use crate::utils::{account_to_shard_id, proto_to_result}; @@ -49,7 +49,10 @@ pub struct CreateAccountTransaction { pub nonce: Nonce, pub originator: AccountId, pub new_account_id: AccountId, + #[serde(with = "u128_dec_format")] pub amount: Balance, + // TODO: replace to PublicKey + #[serde(with = "base_bytes_format")] pub public_key: Vec, } @@ -140,6 +143,7 @@ pub struct FunctionCallTransaction { pub contract_id: AccountId, pub method_name: Vec, pub args: Vec, + #[serde(with = "u128_dec_format")] pub amount: Balance, } @@ -190,6 +194,7 @@ pub struct SendMoneyTransaction { pub nonce: Nonce, pub originator: AccountId, pub receiver: AccountId, + #[serde(with = "u128_dec_format")] pub amount: Balance, } @@ -222,6 +227,7 @@ impl From for transaction_proto::SendMoneyTransaction { pub struct StakeTransaction { pub nonce: Nonce, pub originator: AccountId, + #[serde(with = "u128_dec_format")] pub amount: Balance, pub public_key: String, } @@ -256,7 +262,9 @@ pub struct SwapKeyTransaction { pub nonce: Nonce, pub originator: AccountId, // one of the current keys to the account that will be swapped out + #[serde(with = "base_bytes_format")] pub cur_key: Vec, + #[serde(with = "base_bytes_format")] pub new_key: Vec, } @@ -298,6 +306,7 @@ impl fmt::Debug for SwapKeyTransaction { pub struct AddKeyTransaction { pub nonce: Nonce, pub originator: AccountId, + #[serde(with = "base_bytes_format")] pub new_key: Vec, pub access_key: Option, } @@ -458,9 +467,12 @@ impl TransactionBody { #[derive(Eq, Debug, Clone, Serialize, Deserialize)] pub struct SignedTransaction { pub body: TransactionBody, + #[serde(with = "base_format")] pub signature: StructSignature, // In case this TX uses AccessKey, it needs to provide the public_key + #[serde(with = "option_base_format")] pub public_key: Option, + #[serde(with = "base_format")] hash: CryptoHash, } diff --git a/near/Cargo.toml b/near/Cargo.toml index 945936970d8..34ed0a1f483 100644 --- a/near/Cargo.toml +++ b/near/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "near" -version = "0.2.4" +version = "0.2.6" authors = ["Near Inc "] edition = "2018" @@ -31,6 +31,7 @@ near-pool = { path = "../chain/pool" } near-network = { path = "../chain/network" } near-jsonrpc = { path = "../chain/jsonrpc" } near-verifier = { path = "../runtime/verifier" } +near-telemetry = { path = "../chain/telemetry" } [dev-dependencies] tempdir = "0.3" diff --git a/near/src/config.rs b/near/src/config.rs index c34f9f7949e..8e5b1f4ffb1 100644 --- a/near/src/config.rs +++ b/near/src/config.rs @@ -24,6 +24,7 @@ use near_primitives::crypto::signer::{EDSigner, InMemorySigner, KeyFile}; use near_primitives::hash::hash; use near_primitives::serialize::{to_base64, u128_dec_format}; use near_primitives::types::{AccountId, Balance, BlockIndex, ReadablePublicKey, ValidatorId}; +use near_telemetry::TelemetryConfig; use node_runtime::StateRecord; /// Initial balance used in tests. @@ -66,6 +67,8 @@ pub const GENESIS_CONFIG_FILENAME: &str = "genesis.json"; pub const NODE_KEY_FILE: &str = "node_key.json"; pub const VALIDATOR_KEY_FILE: &str = "validator_key.json"; +const DEFAULT_TELEMETRY_URL: &str = "https://explorer.nearprotocol.com/api/nodes"; + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Network { /// Address to listen for incoming connections. @@ -126,11 +129,13 @@ impl Default for Consensus { } #[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] pub struct Config { pub genesis_file: String, pub validator_key_file: String, pub node_key_file: String, pub rpc: RpcConfig, + pub telemetry: TelemetryConfig, pub network: Network, pub consensus: Consensus, } @@ -142,6 +147,7 @@ impl Default for Config { validator_key_file: VALIDATOR_KEY_FILE.to_string(), node_key_file: NODE_KEY_FILE.to_string(), rpc: RpcConfig::default(), + telemetry: TelemetryConfig::default(), network: Network::default(), consensus: Consensus::default(), } @@ -177,6 +183,7 @@ pub struct NearConfig { pub client_config: ClientConfig, pub network_config: NetworkConfig, pub rpc_config: RpcConfig, + pub telemetry_config: TelemetryConfig, pub block_producer: Option, pub genesis_config: GenesisConfig, } @@ -239,6 +246,7 @@ impl NearConfig { peer_expiration_duration: Duration::from_secs(7 * 24 * 60 * 60), peer_stats_period: Duration::from_secs(5), }, + telemetry_config: config.telemetry, rpc_config: config.rpc, genesis_config: genesis_config.clone(), block_producer, @@ -467,7 +475,8 @@ pub fn init_configs( if test_seed.is_some() { panic!("Test seed is not supported for official TestNet"); } - let config = Config::default(); + let mut config = Config::default(); + config.telemetry.endpoints.push(DEFAULT_TELEMETRY_URL.to_string()); config.write_to_file(&dir.join(CONFIG_FILENAME)); // If account id was given, create new key pair for this validator. diff --git a/near/src/lib.rs b/near/src/lib.rs index 502da24aeb5..2b748ba18d7 100644 --- a/near/src/lib.rs +++ b/near/src/lib.rs @@ -9,6 +9,7 @@ use near_client::{ClientActor, ViewClientActor}; use near_jsonrpc::start_http; use near_network::PeerManagerActor; use near_store::create_store; +use near_telemetry::TelemetryActor; pub use crate::config::{ init_configs, load_config, load_test_config, GenesisConfig, NearConfig, NEAR_BASE, @@ -56,6 +57,8 @@ pub fn start_with_config( let runtime = Arc::new(NightshadeRuntime::new(home_dir, store.clone(), config.genesis_config.clone())); + let telemetry = TelemetryActor::new(config.telemetry_config.clone()).start(); + let view_client = ViewClientActor::new( store.clone(), config.genesis_config.genesis_time.clone(), @@ -64,6 +67,7 @@ pub fn start_with_config( .unwrap() .start(); let view_client1 = view_client.clone(); + let node_id = config.network_config.public_key.clone().into(); let client = ClientActor::create(move |ctx| { let network_actor = PeerManagerActor::new(store.clone(), config.network_config, ctx.address().recipient()) @@ -77,8 +81,10 @@ pub fn start_with_config( store.clone(), config.genesis_config.genesis_time, runtime, + node_id, network_actor.recipient(), config.block_producer, + telemetry, ) .unwrap() }); diff --git a/near/src/main.rs b/near/src/main.rs index 0e6c655b6a6..731da8d2778 100644 --- a/near/src/main.rs +++ b/near/src/main.rs @@ -58,6 +58,7 @@ fn main() { .arg(Arg::with_name("min-peers").long("min-peers").help("Minimum number of peers to start syncing / producing blocks").takes_value(true)) .arg(Arg::with_name("network-addr").long("network-addr").help("Customize network listening address (useful for running multiple nodes on the same machine)").takes_value(true)) .arg(Arg::with_name("rpc-addr").long("rpc-addr").help("Customize RPC listening address (useful for running multiple nodes on the same machine)").takes_value(true)) + .arg(Arg::with_name("telemetry-url").long("telemetry-url").help("Customize telemetry url").takes_value(true)) ) .subcommand(SubCommand::with_name("unsafe_reset_data").about("(unsafe) Remove all the data, effectively resetting node to genesis state (keeps genesis and config)")) .subcommand(SubCommand::with_name("unsafe_reset_all").about("(unsafe) Remove all the config, keys, data and effectively removing all information about the network")) @@ -124,6 +125,9 @@ fn main() { if let Some(rpc_addr) = args.value_of("rpc-addr") { near_config.rpc_config.addr = rpc_addr.to_string(); } + if let Some(telemetry_url) = args.value_of("telemetry-url") { + near_config.telemetry_config.endpoints.push(telemetry_url.to_string()); + } let system = System::new("NEAR"); start_with_config(home_dir, near_config); diff --git a/near/src/runtime.rs b/near/src/runtime.rs index 22aa37bb4e6..cabe663d5f1 100644 --- a/near/src/runtime.rs +++ b/near/src/runtime.rs @@ -552,7 +552,7 @@ mod test { amount: TESTING_INIT_BALANCE - TESTING_INIT_STAKE * 2, stake: TESTING_INIT_STAKE * 2, public_keys: vec![block_producers[0].signer.public_key()], - code_hash: account.code_hash + code_hash: account.code_hash, } ); diff --git a/near/tests/stake_nodes.rs b/near/tests/stake_nodes.rs index 220b60a6ce1..82828e4b88a 100644 --- a/near/tests/stake_nodes.rs +++ b/near/tests/stake_nodes.rs @@ -114,6 +114,7 @@ fn test_stake_nodes() { }); } +/// TODO(1094): Enable kickout test after figuring #[test] #[ignore] fn test_validator_kickout() {