11use diesel:: prelude:: * ;
2- use diesel:: r2d2:: { self , ConnectionManager , CustomizeConnection } ;
2+ use diesel:: r2d2:: { self , ConnectionManager , CustomizeConnection , State } ;
33use prometheus:: Histogram ;
44use secrecy:: { ExposeSecret , SecretString } ;
5- use std:: sync:: { Arc , Mutex , MutexGuard } ;
6- use std:: {
7- ops:: { Deref , DerefMut } ,
8- time:: Duration ,
9- } ;
5+ use std:: time:: Duration ;
106use thiserror:: Error ;
117use url:: Url ;
128
@@ -17,15 +13,9 @@ pub mod sql_types;
1713pub type ConnectionPool = r2d2:: Pool < ConnectionManager < PgConnection > > ;
1814
1915#[ derive( Clone ) ]
20- pub enum DieselPool {
21- Pool {
22- pool : ConnectionPool ,
23- time_to_obtain_connection_metric : Histogram ,
24- } ,
25- BackgroundJobPool {
26- pool : ConnectionPool ,
27- } ,
28- Test ( Arc < Mutex < PgConnection > > ) ,
16+ pub struct DieselPool {
17+ pool : ConnectionPool ,
18+ time_to_obtain_connection_metric : Option < Histogram > ,
2919}
3020
3121impl DieselPool {
@@ -48,9 +38,9 @@ impl DieselPool {
4838 // serving errors for the first connections until the pool is initialized) and if we can't
4939 // establish any connection continue booting up the application. The database pool will
5040 // automatically be marked as unhealthy and the rest of the application will adapt.
51- let pool = DieselPool :: Pool {
41+ let pool = DieselPool {
5242 pool : r2d2_config. build_unchecked ( manager) ,
53- time_to_obtain_connection_metric,
43+ time_to_obtain_connection_metric : Some ( time_to_obtain_connection_metric ) ,
5444 } ;
5545 match pool. wait_until_healthy ( Duration :: from_secs ( 5 ) ) {
5646 Ok ( ( ) ) => { }
@@ -62,67 +52,39 @@ impl DieselPool {
6252 }
6353
6454 pub fn new_background_worker ( pool : r2d2:: Pool < ConnectionManager < PgConnection > > ) -> Self {
65- Self :: BackgroundJobPool { pool }
66- }
67-
68- pub ( crate ) fn new_test ( config : & config:: DatabasePools , url : & SecretString ) -> DieselPool {
69- let mut conn = PgConnection :: establish ( & connection_url ( config, url. expose_secret ( ) ) )
70- . expect ( "failed to establish connection" ) ;
71- conn. begin_test_transaction ( )
72- . expect ( "failed to begin test transaction" ) ;
73- DieselPool :: Test ( Arc :: new ( Mutex :: new ( conn) ) )
55+ Self {
56+ pool,
57+ time_to_obtain_connection_metric : None ,
58+ }
7459 }
7560
7661 #[ instrument( name = "db.connect" , skip_all) ]
77- pub fn get ( & self ) -> Result < DieselPooledConn < ' _ > , PoolError > {
78- match self {
79- DieselPool :: Pool {
80- pool,
81- time_to_obtain_connection_metric,
82- } => time_to_obtain_connection_metric. observe_closure_duration ( || {
83- if let Some ( conn) = pool. try_get ( ) {
84- Ok ( DieselPooledConn :: Pool ( conn) )
85- } else if !self . is_healthy ( ) {
86- Err ( PoolError :: UnhealthyPool )
87- } else {
88- Ok ( DieselPooledConn :: Pool ( pool. get ( ) ?) )
89- }
90- } ) ,
91- DieselPool :: BackgroundJobPool { pool } => Ok ( DieselPooledConn :: Pool ( pool. get ( ) ?) ) ,
92- DieselPool :: Test ( conn) => Ok ( DieselPooledConn :: Test (
93- conn. try_lock ( )
94- . map_err ( |_e| PoolError :: TestConnectionUnavailable ) ?,
95- ) ) ,
62+ pub fn get ( & self ) -> Result < DieselPooledConn , PoolError > {
63+ match self . time_to_obtain_connection_metric . as_ref ( ) {
64+ Some ( time_to_obtain_connection_metric) => time_to_obtain_connection_metric
65+ . observe_closure_duration ( || {
66+ if let Some ( conn) = self . pool . try_get ( ) {
67+ Ok ( conn)
68+ } else if !self . is_healthy ( ) {
69+ Err ( PoolError :: UnhealthyPool )
70+ } else {
71+ Ok ( self . pool . get ( ) ?)
72+ }
73+ } ) ,
74+ None => Ok ( self . pool . get ( ) ?) ,
9675 }
9776 }
9877
99- pub fn state ( & self ) -> PoolState {
100- match self {
101- DieselPool :: Pool { pool, .. } | DieselPool :: BackgroundJobPool { pool } => {
102- let state = pool. state ( ) ;
103- PoolState {
104- connections : state. connections ,
105- idle_connections : state. idle_connections ,
106- }
107- }
108- DieselPool :: Test ( _) => PoolState {
109- connections : 0 ,
110- idle_connections : 0 ,
111- } ,
112- }
78+ pub fn state ( & self ) -> State {
79+ self . pool . state ( )
11380 }
11481
11582 #[ instrument( skip_all) ]
11683 pub fn wait_until_healthy ( & self , timeout : Duration ) -> Result < ( ) , PoolError > {
117- match self {
118- DieselPool :: Pool { pool, .. } | DieselPool :: BackgroundJobPool { pool } => {
119- match pool. get_timeout ( timeout) {
120- Ok ( _) => Ok ( ( ) ) ,
121- Err ( _) if !self . is_healthy ( ) => Err ( PoolError :: UnhealthyPool ) ,
122- Err ( err) => Err ( PoolError :: R2D2 ( err) ) ,
123- }
124- }
125- DieselPool :: Test ( _) => Ok ( ( ) ) ,
84+ match self . pool . get_timeout ( timeout) {
85+ Ok ( _) => Ok ( ( ) ) ,
86+ Err ( _) if !self . is_healthy ( ) => Err ( PoolError :: UnhealthyPool ) ,
87+ Err ( err) => Err ( PoolError :: R2D2 ( err) ) ,
12688 }
12789 }
12890
@@ -131,37 +93,7 @@ impl DieselPool {
13193 }
13294}
13395
134- #[ derive( Debug , Copy , Clone ) ]
135- pub struct PoolState {
136- pub connections : u32 ,
137- pub idle_connections : u32 ,
138- }
139-
140- #[ allow( clippy:: large_enum_variant) ]
141- pub enum DieselPooledConn < ' a > {
142- Pool ( r2d2:: PooledConnection < ConnectionManager < PgConnection > > ) ,
143- Test ( MutexGuard < ' a , PgConnection > ) ,
144- }
145-
146- impl Deref for DieselPooledConn < ' _ > {
147- type Target = PgConnection ;
148-
149- fn deref ( & self ) -> & Self :: Target {
150- match self {
151- DieselPooledConn :: Pool ( conn) => conn. deref ( ) ,
152- DieselPooledConn :: Test ( conn) => conn. deref ( ) ,
153- }
154- }
155- }
156-
157- impl DerefMut for DieselPooledConn < ' _ > {
158- fn deref_mut ( & mut self ) -> & mut Self :: Target {
159- match self {
160- DieselPooledConn :: Pool ( conn) => conn. deref_mut ( ) ,
161- DieselPooledConn :: Test ( conn) => conn. deref_mut ( ) ,
162- }
163- }
164- }
96+ pub type DieselPooledConn = r2d2:: PooledConnection < ConnectionManager < PgConnection > > ;
16597
16698pub fn oneoff_connection_with_config (
16799 config : & config:: DatabasePools ,
0 commit comments