66package graceful
77
88import (
9+ "container/list"
910 "crypto/tls"
1011 "net"
1112 "os"
1213 "strings"
1314 "sync"
14- "sync/atomic"
1515 "syscall"
1616 "time"
1717
@@ -30,14 +30,15 @@ type ServeFunction = func(net.Listener) error
3030
3131// Server represents our graceful server
3232type Server struct {
33- network string
34- address string
35- listener net.Listener
36- wg sync.WaitGroup
37- state state
38- lock *sync.RWMutex
39- connections map[*wrappedConn]struct{}
40- connectionsLock sync.RWMutex
33+ network string
34+ address string
35+ listener net.Listener
36+
37+ lock sync.RWMutex
38+ state state
39+ connList *list.List
40+ connEmptyCond *sync.Cond
41+
4142 BeforeBegin func(network, address string)
4243 OnShutdown func()
4344 PerWriteTimeout time.Duration
@@ -52,15 +53,14 @@ func NewServer(network, address, name string) *Server {
5253 log.Info("Starting new %s server: %s:%s on PID: %d", name, network, address, os.Getpid())
5354 }
5455 srv := &Server{
55- wg: sync.WaitGroup{},
5656 state: stateInit,
57- lock: &sync.RWMutex{},
58- connections: make(map[*wrappedConn]struct{}),
57+ connList: list.New(),
5958 network: network,
6059 address: address,
6160 PerWriteTimeout: setting.PerWriteTimeout,
6261 PerWritePerKbTimeout: setting.PerWritePerKbTimeout,
6362 }
63+ srv.connEmptyCond = sync.NewCond(&srv.lock)
6464
6565 srv.BeforeBegin = func(network, addr string) {
6666 log.Debug("Starting server on %s:%s (PID: %d)", network, addr, syscall.Getpid())
@@ -157,7 +157,7 @@ func (srv *Server) Serve(serve ServeFunction) error {
157157 GetManager().RegisterServer()
158158 err := serve(srv.listener)
159159 log.Debug("Waiting for connections to finish... (PID: %d)", syscall.Getpid())
160- srv.wg.Wait ()
160+ srv.waitForActiveConnections ()
161161 srv.setState(stateTerminate)
162162 GetManager().ServerDone()
163163 // use of closed means that the listeners are closed - i.e. we should be shutting down - return nil
@@ -181,19 +181,57 @@ func (srv *Server) setState(st state) {
181181 srv.state = st
182182}
183183
184+ func (srv *Server) waitForActiveConnections() {
185+ srv.lock.Lock()
186+ for srv.connList.Len() > 0 {
187+ srv.connEmptyCond.Wait()
188+ }
189+ srv.lock.Unlock()
190+ }
191+
192+ func (srv *Server) wrapConnection(c net.Conn) (net.Conn, error) {
193+ srv.lock.Lock()
194+ defer srv.lock.Unlock()
195+
196+ if srv.state != stateRunning {
197+ _ = c.Close()
198+ return nil, syscall.EINVAL // same as AcceptTCP
199+ }
200+
201+ wc := &wrappedConn{Conn: c, server: srv}
202+ wc.listElem = srv.connList.PushBack(wc)
203+ return wc, nil
204+ }
205+
206+ func (srv *Server) removeConnection(conn *wrappedConn) {
207+ srv.lock.Lock()
208+ defer srv.lock.Unlock()
209+
210+ if conn.listElem == nil {
211+ return
212+ }
213+ srv.connList.Remove(conn.listElem)
214+ if srv.connList.Len() == 0 {
215+ srv.connEmptyCond.Broadcast()
216+ }
217+ }
218+
184219// closeAllConnections forcefully closes all active connections
185220func (srv *Server) closeAllConnections() {
186- srv.connectionsLock.Lock()
187- connections := make([]*wrappedConn, 0, len(srv.connections))
188- for conn := range srv.connections {
189- connections = append(connections, conn)
221+ srv.lock.Lock()
222+ conns := make([]*wrappedConn, 0, srv.connList.Len())
223+ for e := srv.connList.Front(); e != nil; e = e.Next() {
224+ conn := e.Value.(*wrappedConn)
225+ conn.listElem = nil // mark as removed, will close it later to avoid deadlock of "server.lock"
226+ conns = append(conns, conn)
190227 }
191- srv.connectionsLock.Unlock()
228+ srv.connList = list.New()
229+ srv.lock.Unlock()
192230
193- // Close all connections outside the lock to avoid deadlock
194- for _, conn := range connections {
195- _ = conn.Conn.Close() // Force close the underlying connection
231+ for _, conn := range conns {
232+ _ = conn.Close() // do real close outside of lock
196233 }
234+ srv.connEmptyCond.Broadcast()
197235}
198236
199237type filer interface {
@@ -202,61 +240,39 @@ type filer interface {
202240
203241type wrappedListener struct {
204242 net.Listener
205- stopped bool
206- server *Server
243+ server *Server
207244}
208245
246+ var (
247+ _ net.Listener = (*wrappedListener)(nil)
248+ _ filer = (*wrappedListener)(nil)
249+ )
250+
209251func newWrappedListener(l net.Listener, srv *Server) *wrappedListener {
210252 return &wrappedListener{
211253 Listener: l,
212254 server: srv,
213255 }
214256}
215257
216- func (wl *wrappedListener) Accept() (net.Conn, error) {
217- var c net.Conn
218- // Set keepalive on TCPListeners connections.
258+ func (wl *wrappedListener) Accept() (c net.Conn, err error) {
219259 if tcl, ok := wl.Listener.(*net.TCPListener); ok {
260+ // Set keepalive on TCPListeners connections if possible, http.tcpKeepAliveListener
220261 tc, err := tcl.AcceptTCP()
221262 if err != nil {
222263 return nil, err
223264 }
224- _ = tc.SetKeepAlive(true) // see http.tcpKeepAliveListener
225- _ = tc.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
265+ _ = tc.SetKeepAlive(true)
266+ _ = tc.SetKeepAlivePeriod(3 * time.Minute)
226267 c = tc
227268 } else {
228- var err error
229269 c, err = wl.Listener.Accept()
230270 if err != nil {
231271 return nil, err
232272 }
233273 }
234274
235- closed := int32(0)
236-
237- wc := &wrappedConn{
238- Conn: c,
239- server: wl.server,
240- closed: &closed,
241- }
242-
243- wl.server.wg.Add(1)
244-
245- // Track the connection
246- wl.server.connectionsLock.Lock()
247- wl.server.connections[wc] = struct{}{}
248- wl.server.connectionsLock.Unlock()
249-
250- return wc, nil
251- }
252-
253- func (wl *wrappedListener) Close() error {
254- if wl.stopped {
255- return syscall.EINVAL
256- }
257-
258- wl.stopped = true
259- return wl.Listener.Close()
275+ return wl.server.wrapConnection(c)
260276}
261277
262278func (wl *wrappedListener) File() (*os.File, error) {
@@ -266,8 +282,12 @@ func (wl *wrappedListener) File() (*os.File, error) {
266282
267283type wrappedConn struct {
268284 net.Conn
285+
286+ // listElem is protected by the server's lock (used by the server to remove conn itself from the list)
287+ // nil means it has been removed
288+ listElem *list.Element
289+
269290 server *Server
270- closed *int32
271291 deadline time.Time
272292}
273293
@@ -286,25 +306,6 @@ func (w *wrappedConn) Write(p []byte) (n int, err error) {
286306}
287307
288308func (w *wrappedConn) Close() error {
289- if atomic.CompareAndSwapInt32(w.closed, 0, 1) {
290- defer func() {
291- if err := recover(); err != nil {
292- select {
293- case <-GetManager().IsHammer():
294- // Likely deadlocked request released at hammertime
295- log.Warn("Panic during connection close! %v. Likely there has been a deadlocked request which has been released by forced shutdown.", err)
296- default:
297- log.Error("Panic during connection close! %v", err)
298- }
299- }
300- }()
301-
302- // Remove from tracked connections
303- w.server.connectionsLock.Lock()
304- delete(w.server.connections, w)
305- w.server.connectionsLock.Unlock()
306-
307- w.server.wg.Done()
308- }
309+ w.server.removeConnection(w)
309310 return w.Conn.Close()
310311}
0 commit comments