@@ -2,6 +2,7 @@ use std::sync::{LazyLock, OnceLock};
22
33pub use jobserver_crate:: { Acquired , Client , HelperThread } ;
44use jobserver_crate:: { FromEnv , FromEnvErrorKind } ;
5+ use parking_lot:: { Condvar , Mutex } ;
56
67// We can only call `from_env_ext` once per process
78
@@ -52,8 +53,6 @@ fn default_client() -> Client {
5253 client
5354}
5455
55- static GLOBAL_CLIENT_CHECKED : OnceLock < Client > = OnceLock :: new ( ) ;
56-
5756pub fn initialize_checked ( report_warning : impl FnOnce ( & ' static str ) ) {
5857 let client_checked = match & * GLOBAL_CLIENT {
5958 Ok ( client) => client. clone ( ) ,
@@ -62,19 +61,112 @@ pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
6261 default_client ( )
6362 }
6463 } ;
65- GLOBAL_CLIENT_CHECKED . set ( client_checked) . ok ( ) ;
64+ let proxy = Proxy {
65+ client : client_checked,
66+ data : Mutex :: new ( ProxyData { total : 1 , used : 1 , needed : 0 } ) ,
67+ wake_needer : Condvar :: new ( ) ,
68+ wake_helper : Condvar :: new ( ) ,
69+ } ;
70+ GLOBAL_PROXY . set ( proxy) . ok ( ) ;
71+
72+ std:: thread:: spawn ( || {
73+ GLOBAL_PROXY . get ( ) . unwrap ( ) . helper ( ) ;
74+ } ) ;
6675}
6776
6877const ACCESS_ERROR : & str = "jobserver check should have been called earlier" ;
6978
7079pub fn client ( ) -> Client {
71- GLOBAL_CLIENT_CHECKED . get ( ) . expect ( ACCESS_ERROR ) . clone ( )
80+ GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . client . clone ( )
7281}
7382
7483pub fn acquire_thread ( ) {
75- GLOBAL_CLIENT_CHECKED . get ( ) . expect ( ACCESS_ERROR ) . acquire_raw ( ) . ok ( ) ;
84+ GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . acquire_thread ( ) ;
7685}
7786
7887pub fn release_thread ( ) {
79- GLOBAL_CLIENT_CHECKED . get ( ) . expect ( ACCESS_ERROR ) . release_raw ( ) . ok ( ) ;
88+ GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . release_thread ( ) ;
89+ }
90+
91+ static GLOBAL_PROXY : OnceLock < Proxy > = OnceLock :: new ( ) ;
92+
93+ struct ProxyData {
94+ /// The number of tokens assigned to this process.
95+ total : u16 ,
96+
97+ /// The number of tokens assigned to threads.
98+ used : u16 ,
99+
100+ /// The number of threads requesting a token
101+ needed : u16 ,
102+ }
103+
104+ /// This is a jobserver proxy used to ensure that we hold on to at least one token.
105+ struct Proxy {
106+ client : Client ,
107+ data : Mutex < ProxyData > ,
108+
109+ /// Threads which are waiting on a token will wait on this.
110+ wake_needer : Condvar ,
111+
112+ /// This is used to wake the helper thread when tokens are needed.
113+ wake_helper : Condvar ,
114+ }
115+
116+ impl Proxy {
117+ fn helper ( & self ) {
118+ let mut data = self . data . lock ( ) ;
119+ loop {
120+ while data. needed > 0 {
121+ drop ( data) ;
122+ self . client . acquire_raw ( ) . ok ( ) ;
123+ data = self . data . lock ( ) ;
124+ if data. needed > 0 {
125+ data. total += 1 ;
126+ data. used += 1 ;
127+ data. needed -= 1 ;
128+ self . wake_needer . notify_one ( ) ;
129+ } else {
130+ drop ( data) ;
131+ self . client . release_raw ( ) . ok ( ) ;
132+ data = self . data . lock ( ) ;
133+ }
134+ }
135+ self . wake_helper . wait ( & mut data) ;
136+ }
137+ }
138+
139+ fn acquire_thread ( & self ) {
140+ let mut data = self . data . lock ( ) ;
141+
142+ if data. total > data. used {
143+ assert_eq ! ( data. needed, 0 ) ;
144+ data. used += 1 ;
145+ } else {
146+ if data. needed == 0 {
147+ self . wake_helper . notify_one ( ) ;
148+ }
149+ data. needed += 1 ;
150+ self . wake_needer . wait ( & mut data) ;
151+ }
152+ }
153+
154+ fn release_thread ( & self ) {
155+ let mut data = self . data . lock ( ) ;
156+
157+ if data. needed > 0 {
158+ // Give the token to a waiting thread
159+ data. needed -= 1 ;
160+ self . wake_needer . notify_one ( ) ;
161+ } else {
162+ data. used -= 1 ;
163+
164+ // Release the token unless it's the last one in the process
165+ if data. total > 1 {
166+ data. total -= 1 ;
167+ drop ( data) ;
168+ self . client . release_raw ( ) . ok ( ) ;
169+ }
170+ }
171+ }
80172}
0 commit comments