1+ //! Module implementing an Open Metrics histogram.
2+ //!
3+ //! See [`Summary`] for details.
4+
5+ use super :: { MetricType , TypedMetric } ;
6+ //use owning_ref::OwningRef;
7+ //use std::iter::{self, once};
8+ use std:: sync:: { Arc , Mutex } ;
9+
10+ use quantiles:: ckms:: CKMS ;
11+
12+ /// Open Metrics [`Summary`] to measure distributions of discrete events.
13+ pub struct Summary {
14+ target_quantile : Vec < f64 > ,
15+ target_error : f64 ,
16+ max_age_buckets : u64 ,
17+ max_age_seconds : u64 ,
18+ inner : Arc < Mutex < InnerSummary > > ,
19+ }
20+
21+ impl Clone for Summary {
22+ fn clone ( & self ) -> Self {
23+ Summary {
24+ target_quantile : self . target_quantile . clone ( ) ,
25+ target_error : self . target_error ,
26+ max_age_buckets : self . max_age_buckets ,
27+ max_age_seconds : self . max_age_seconds ,
28+ inner : self . inner . clone ( ) ,
29+ }
30+ }
31+ }
32+
33+ pub ( crate ) struct InnerSummary {
34+ sum : f64 ,
35+ count : u64 ,
36+ quantile_streams : Vec < CKMS < f64 > > ,
37+ // head_stream is like a cursor which carries the index
38+ // of the stream in the quantile_streams that we want to query
39+ head_stream : u64 ,
40+ }
41+
42+ impl Summary {
43+ pub fn new ( max_age_buckets : u64 , max_age_seconds : u64 , target_quantile : Vec < f64 > , target_error : f64 ) -> Self {
44+ let mut streams: Vec < CKMS < f64 > > = Vec :: new ( ) ;
45+ for _ in 0 ..max_age_buckets {
46+ streams. push ( CKMS :: new ( target_error) ) ;
47+ }
48+
49+ Summary {
50+ max_age_buckets,
51+ max_age_seconds,
52+ target_quantile,
53+ target_error,
54+ inner : Arc :: new ( Mutex :: new ( InnerSummary {
55+ sum : Default :: default ( ) ,
56+ count : Default :: default ( ) ,
57+ quantile_streams : streams,
58+ head_stream : 0 ,
59+ } ) )
60+ }
61+ }
62+
63+ pub fn observe ( & mut self , v : f64 ) {
64+ let mut inner = self . inner . lock ( ) . unwrap ( ) ;
65+ inner. sum += v;
66+ inner. count += 1 ;
67+
68+ // insert quantiles into all streams/buckets.
69+ for stream in inner. quantile_streams . iter_mut ( ) {
70+ stream. insert ( v) ;
71+ }
72+ }
73+
74+ pub fn get ( & self ) -> ( f64 , u64 , Vec < ( f64 , f64 ) > ) {
75+ let inner = self . inner . lock ( ) . unwrap ( ) ;
76+ let sum = inner. sum ;
77+ let count = inner. count ;
78+ let head = inner. head_stream ;
79+ let mut quantile_values: Vec < ( f64 , f64 ) > = Vec :: new ( ) ;
80+
81+ // TODO: add stream rotation
82+ for q in self . target_quantile . iter ( ) {
83+ match inner. quantile_streams [ head as usize ] . query ( * q) {
84+ Some ( ( _, v) ) => quantile_values. push ( ( * q, v) ) ,
85+ None => continue , // TODO fix this
86+ } ;
87+ }
88+ ( sum, count, quantile_values)
89+ }
90+ }
91+
92+ // TODO: should this type impl Default like Counter?
93+
94+ impl TypedMetric for Summary {
95+ const TYPE : MetricType = MetricType :: Summary ;
96+ }
97+
98+ #[ cfg( test) ]
99+ mod tests {
100+ use super :: * ;
101+
102+ #[ test]
103+ fn basic ( ) {
104+ let mut summary = Summary :: new ( 5 , 10 , vec ! [ 0.5 , 0.9 , 0.99 ] , 0.01 ) ;
105+ summary. observe ( 5.0 ) ;
106+ summary. observe ( 15.0 ) ;
107+ summary. observe ( 25.0 ) ;
108+
109+ let ( s, c, q) = summary. get ( ) ;
110+ assert_eq ! ( 45.0 , s) ;
111+ assert_eq ! ( 3 , c) ;
112+
113+ for elem in q. iter ( ) {
114+ println ! ( "Vec<{}, {}>" , elem. 0 , elem. 1 ) ;
115+ }
116+ }
117+ }
0 commit comments