1- use std:: sync:: { LazyLock , OnceLock } ;
1+ use std:: sync:: { Arc , LazyLock , OnceLock } ;
22
33pub use jobserver_crate:: { Acquired , Client , HelperThread } ;
44use jobserver_crate:: { FromEnv , FromEnvErrorKind } ;
@@ -53,6 +53,8 @@ fn default_client() -> Client {
5353 client
5454}
5555
56+ static GLOBAL_CLIENT_CHECKED : OnceLock < Client > = OnceLock :: new ( ) ;
57+
5658pub fn initialize_checked ( report_warning : impl FnOnce ( & ' static str ) ) {
5759 let client_checked = match & * GLOBAL_CLIENT {
5860 Ok ( client) => client. clone ( ) ,
@@ -61,35 +63,15 @@ pub fn initialize_checked(report_warning: impl FnOnce(&'static str)) {
6163 default_client ( )
6264 }
6365 } ;
64- let proxy = Proxy {
65- client : client_checked,
66- data : Mutex :: new ( ProxyData { total : 1 , used : 1 , needed : 0 } ) ,
67- wake_needer : Condvar :: new ( ) ,
68- wake_helper : Condvar :: new ( ) ,
69- } ;
70- GLOBAL_PROXY . set ( proxy) . ok ( ) ;
71-
72- std:: thread:: spawn ( || {
73- GLOBAL_PROXY . get ( ) . unwrap ( ) . helper ( ) ;
74- } ) ;
66+ GLOBAL_CLIENT_CHECKED . set ( client_checked) . ok ( ) ;
7567}
7668
7769const ACCESS_ERROR : & str = "jobserver check should have been called earlier" ;
7870
7971pub fn client ( ) -> Client {
80- GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . client . clone ( )
81- }
82-
83- pub fn acquire_thread ( ) {
84- GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . acquire_thread ( ) ;
85- }
86-
87- pub fn release_thread ( ) {
88- GLOBAL_PROXY . get ( ) . expect ( ACCESS_ERROR ) . release_thread ( ) ;
72+ GLOBAL_CLIENT_CHECKED . get ( ) . expect ( ACCESS_ERROR ) . clone ( )
8973}
9074
91- static GLOBAL_PROXY : OnceLock < Proxy > = OnceLock :: new ( ) ;
92-
9375struct ProxyData {
9476 /// The number of tokens assigned to this process.
9577 total : u16 ,
@@ -98,66 +80,80 @@ struct ProxyData {
9880 used : u16 ,
9981
10082 /// The number of threads requesting a token
101- needed : u16 ,
83+ pending : u16 ,
10284}
10385
10486/// This is a jobserver proxy used to ensure that we hold on to at least one token.
105- struct Proxy {
87+ pub struct Proxy {
10688 client : Client ,
10789 data : Mutex < ProxyData > ,
10890
10991 /// Threads which are waiting on a token will wait on this.
110- wake_needer : Condvar ,
92+ wake_pending : Condvar ,
11193
112- /// This is used to wake the helper thread when tokens are needed.
113- wake_helper : Condvar ,
94+ helper : OnceLock < HelperThread > ,
11495}
11596
11697impl Proxy {
117- fn helper ( & self ) {
118- let mut data = self . data . lock ( ) ;
119- loop {
120- while data. needed > 0 {
121- drop ( data) ;
122- self . client . acquire_raw ( ) . ok ( ) ;
123- data = self . data . lock ( ) ;
124- if data. needed > 0 {
125- data. total += 1 ;
126- data. used += 1 ;
127- data. needed -= 1 ;
128- self . wake_needer . notify_one ( ) ;
129- } else {
130- drop ( data) ;
131- self . client . release_raw ( ) . ok ( ) ;
132- data = self . data . lock ( ) ;
98+ pub fn new ( ) -> Arc < Self > {
99+ let proxy = Arc :: new ( Proxy {
100+ client : client ( ) ,
101+ data : Mutex :: new ( ProxyData { total : 1 , used : 1 , pending : 0 } ) ,
102+ wake_pending : Condvar :: new ( ) ,
103+ helper : OnceLock :: new ( ) ,
104+ } ) ;
105+ let proxy_ = Arc :: clone ( & proxy) ;
106+ let helper = proxy
107+ . client
108+ . clone ( )
109+ . into_helper_thread ( move |token| {
110+ if let Ok ( token) = token {
111+ let mut data = proxy_. data . lock ( ) ;
112+ if data. pending > 0 {
113+ // Give the token to a waiting thread
114+ token. drop_without_releasing ( ) ;
115+ data. total += 1 ;
116+ data. used += 1 ;
117+ data. pending -= 1 ;
118+ proxy_. wake_pending . notify_one ( ) ;
119+ } else {
120+ // The token is no longer needed, drop it.
121+ drop ( data) ;
122+ drop ( token) ;
123+ }
133124 }
134- }
135- self . wake_helper . wait ( & mut data) ;
136- }
125+ } )
126+ . expect ( "failed to create helper thread" ) ;
127+ proxy. helper . set ( helper) . unwrap ( ) ;
128+ proxy
137129 }
138130
139- fn acquire_thread ( & self ) {
131+ pub fn acquire_thread ( & self ) {
140132 let mut data = self . data . lock ( ) ;
141133
142134 if data. total > data. used {
143- assert_eq ! ( data. needed, 0 ) ;
135+ // There was a free token around. This can
136+ // happen when all threads release their token.
137+ assert_eq ! ( data. total, 1 ) ;
138+ assert_eq ! ( data. pending, 0 ) ;
144139 data. used += 1 ;
145140 } else {
146- if data. needed == 0 {
147- self . wake_helper . notify_one ( ) ;
148- }
149- data. needed += 1 ;
150- self . wake_needer . wait ( & mut data) ;
141+ // Request a token from the helper thread. We can't directly use `acquire_raw`
142+ // as we also need to be able to wait for the final token in the process which
143+ // does not get a corresponding `release_raw` call.
144+ self . helper . get ( ) . unwrap ( ) . request_token ( ) ;
145+ data. pending += 1 ;
146+ self . wake_pending . wait ( & mut data) ;
151147 }
152148 }
153149
154- fn release_thread ( & self ) {
150+ pub fn release_thread ( & self ) {
155151 let mut data = self . data . lock ( ) ;
156152
157- if data. needed > 0 {
153+ if data. pending > 0 {
158154 // Give the token to a waiting thread
159- data. needed -= 1 ;
160- self . wake_needer . notify_one ( ) ;
155+ data. pending -= 1 ;
156+ self . wake_pending . notify_one ( ) ;
161157 } else {
162158 data. used -= 1 ;
163159
0 commit comments