1717package util
1818
1919import (
20- "bytes"
2120 "crypto/tls"
22- "io"
2321 "net"
2422 "net/http"
2523 "strings"
2624 "sync"
25+ "time"
2726
27+ "github.com/soheilhy/cmux"
2828 "golang.org/x/net/http2"
2929
3030 "github.com/cockroachdb/cockroach/util/log"
3131 "github.com/cockroachdb/cockroach/util/stop"
3232)
3333
34- const eol = "\r \n "
35- const hostHeader = "Host: CRDB" + eol
36-
37- var headerInsertionIndex = int64 (strings .Index (http2 .ClientPreface , eol ) + len (eol ))
38-
39- type replayableConn struct {
40- net.Conn
41- buf bytes.Buffer
42- reader io.Reader
43- }
44-
45- func newReplayableConn (conn net.Conn ) * replayableConn {
46- rc := replayableConn {Conn : conn }
47- rc .reader = io .LimitReader (io .TeeReader (conn , & rc .buf ), headerInsertionIndex )
48- return & rc
49- }
50-
51- func (rc * replayableConn ) replay () * replayableConn {
52- rc .reader = io .MultiReader (& rc .buf , rc .Conn )
53- return rc
54- }
34+ // ListenAndServe creates a listener and serves handler on it, closing
35+ // the listener when signalled by the stopper.
36+ func ListenAndServe (stopper * stop.Stopper , handler http.Handler , addr net.Addr , tlsConfig * tls.Config ) (net.Listener , error ) {
37+ ln , err := net .Listen (addr .Network (), addr .String ())
38+ if err == nil {
39+ stopper .RunWorker (func () {
40+ <- stopper .ShouldDrain ()
41+ // Some unit tests manually close `ln`, so it may already be closed
42+ // when we get here.
43+ if err := ln .Close (); err != nil && ! IsClosedConnection (err ) {
44+ log .Fatal (err )
45+ }
46+ })
5547
56- func (rc * replayableConn ) Read (p []byte ) (int , error ) {
57- if limitReader , ok := rc .reader .(* io.LimitedReader ); ok {
58- // rc.reader is a LimitedReader wrapping a TeeReader.
59- off := headerInsertionIndex - limitReader .N
60- n , err := rc .reader .Read (p )
61- if ! strings .HasPrefix (http2 .ClientPreface [off :], string (p [:n ])) {
62- // The incoming request is not an HTTP2 request, so buffering is no
63- // longer required; send all reads directly to the underlying net.Conn.
64- rc .reader = rc .Conn
65- rc .buf = bytes.Buffer {} // Release the memory.
66- } else if err == io .EOF {
67- // We've exhausted our LimitedReader, which means he incoming request is
68- // an HTTP2 request. Remember, that we are in this code path means that
69- // TLS is not in use, which means the caller (net/http machinery) won't
70- // be able to parse anything after http2.ClientPreface. However, Go 1.6
71- // introduced strict Host header checking for HTTP >= 1.1 requests (see
72- // https://github.com/golang/go/commit/6e11f45), and http2.ClientPreface
73- // contains enough information for the caller to identify this first
74- // request as an HTTP 2 request, but not enough for the caller to
75- // determine the value of the Host header. On the next line, we're going
76- // to help the caller out by providing a bogus HTTP 1.x-style Host
77- // header. This will get us past Host header verification.
78- //
79- // Note that this bogus header won't reappear after replay is called.
80- rc .reader = io .MultiReader (strings .NewReader (hostHeader ), limitReader .R )
48+ if tlsConfig != nil {
49+ ServeHandler (stopper , handler , tls .NewListener (ln , tlsConfig ), tlsConfig )
8150 } else {
82- // The LimitedReader isn't exhausted yet, or we hit an error.
83- return n , err
84- }
51+ m := cmux .New (ln )
52+ h2L := m .Match (cmux .HTTP2 ())
53+ anyL := m .Match (cmux .Any ())
54+
55+ var h2 http2.Server
8556
86- // Cribbed from io.Multireader.
87- if n > 0 || err != io .EOF {
88- if err == io .EOF {
89- // Don't return EOF yet. We've replaced our LimitedReader with rc.Conn or
90- // a new MultiReader and there may be more bytes in there.
91- err = nil
57+ serveConnOpts := & http2.ServeConnOpts {
58+ Handler : handler ,
9259 }
93- return n , err
94- }
95- }
9660
97- // Pseudocode:
98- // if rc.IsHTTP2() {
99- // if rc.replayCalled {
100- // rc.reader == io.MultiReader(&rc.buf, rc.Conn)
101- // } else {
102- // rc.reader == io.TeeReader(conn, &rc.buf)
103- // }
104- // } else {
105- // rc.reader == rc.Conn
106- // }
107- return rc .reader .Read (p )
108- }
61+ serveH2 := func (conn net.Conn ) {
62+ h2 .ServeConn (conn , serveConnOpts )
63+ }
10964
110- type replayableConnListener struct {
111- net.Listener
112- }
65+ serveConn := ServeHandler (stopper , handler , anyL , tlsConfig )
11366
114- func (ml * replayableConnListener ) Accept () (net.Conn , error ) {
115- conn , err := ml .Listener .Accept ()
116- if err == nil {
117- conn = newReplayableConn (conn )
118- }
119- return conn , err
120- }
67+ stopper .RunWorker (func () {
68+ if err := serveConn (h2L , serveH2 ); err != nil && ! IsClosedConnection (err ) {
69+ log .Fatal (err )
70+ }
71+ })
12172
122- // Listen delegates to `net.Listen` and, if tlsConfig is not nil, to `tls.NewListener`.
123- func Listen (addr net.Addr , tlsConfig * tls.Config ) (net.Listener , error ) {
124- ln , err := net .Listen (addr .Network (), addr .String ())
125- if err == nil {
126- if tlsConfig != nil {
127- ln = tls .NewListener (ln , tlsConfig )
128- } else {
129- ln = & replayableConnListener {ln }
73+ stopper .RunWorker (func () {
74+ if err := m .Serve (); err != nil && ! IsClosedConnection (err ) {
75+ log .Fatal (err )
76+ }
77+ })
13078 }
13179 }
132-
13380 return ln , err
13481}
13582
136- // ListenAndServe creates a listener and serves handler on it, closing
137- // the listener when signalled by the stopper.
138- func ListenAndServe (stopper * stop.Stopper , handler http.Handler , addr net.Addr , tlsConfig * tls.Config ) (net.Listener , error ) {
139- ln , err := Listen (addr , tlsConfig )
140- if err != nil {
141- return nil , err
142- }
143-
83+ // ServeHandler serves the handler on the listener.
84+ func ServeHandler (stopper * stop.Stopper , handler http.Handler , ln net.Listener , tlsConfig * tls.Config ) func (net.Listener , func (net.Conn )) error {
14485 var mu sync.Mutex
14586 activeConns := make (map [net.Conn ]struct {})
14687
88+ logger := log .NewStdLogger (log .ErrorLog )
14789 httpServer := http.Server {
148- TLSConfig : tlsConfig ,
14990 Handler : handler ,
91+ TLSConfig : tlsConfig ,
15092 ConnState : func (conn net.Conn , state http.ConnState ) {
15193 mu .Lock ()
15294 switch state {
@@ -157,61 +99,61 @@ func ListenAndServe(stopper *stop.Stopper, handler http.Handler, addr net.Addr,
15799 }
158100 mu .Unlock ()
159101 },
160- ErrorLog : log .NewStdLogger (log .ErrorLog ),
161- }
162-
163- var http2Server http2.Server
164-
165- if tlsConfig == nil {
166- connOpts := http2.ServeConnOpts {
167- BaseConfig : & httpServer ,
168- Handler : handler ,
169- }
170-
171- httpServer .Handler = http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
172- if r .ProtoMajor == 2 {
173- if conn , _ , err := w .(http.Hijacker ).Hijack (); err == nil {
174- http2Server .ServeConn (conn .(* replayableConn ).replay (), & connOpts )
175- } else {
176- log .Fatal (err )
177- }
178- } else {
179- handler .ServeHTTP (w , r )
180- }
181- })
102+ ErrorLog : logger ,
182103 }
183104
184- if err := http2 .ConfigureServer (& httpServer , & http2Server ); err != nil {
185- return nil , err
105+ // net/http.(*Server).Serve/http2.ConfigureServer are not thread safe with
106+ // respect to net/http.(*Server).TLSConfig, so we call it synchronously here.
107+ if err := http2 .ConfigureServer (& httpServer , nil ); err != nil {
108+ log .Fatal (err )
186109 }
187110
188- stopper .RunWorker (func () {
189- <- stopper .ShouldDrain ()
190- // Some unit tests manually close `ln`, so it may already be closed
191- // when we get here.
192- if err := ln .Close (); err != nil && ! IsClosedConnection (err ) {
193- log .Fatal (err )
194- }
195- })
196-
197111 stopper .RunWorker (func () {
198112 if err := httpServer .Serve (ln ); err != nil && ! IsClosedConnection (err ) {
199113 log .Fatal (err )
200114 }
201115
202116 <- stopper .ShouldStop ()
203-
204117 mu .Lock ()
205118 for conn := range activeConns {
206119 conn .Close ()
207120 }
208121 mu .Unlock ()
209122 })
210123
211- return ln , nil
124+ logFn := logger .Printf
125+ return func (l net.Listener , serveConn func (net.Conn )) error {
126+ // Inspired by net/http.(*Server).Serve
127+ var tempDelay time.Duration // how long to sleep on accept failure
128+ for {
129+ rw , e := l .Accept ()
130+ if e != nil {
131+ if ne , ok := e .(net.Error ); ok && ne .Temporary () {
132+ if tempDelay == 0 {
133+ tempDelay = 5 * time .Millisecond
134+ } else {
135+ tempDelay *= 2
136+ }
137+ if max := 1 * time .Second ; tempDelay > max {
138+ tempDelay = max
139+ }
140+ logFn ("http: Accept error: %v; retrying in %v" , e , tempDelay )
141+ time .Sleep (tempDelay )
142+ continue
143+ }
144+ return e
145+ }
146+ tempDelay = 0
147+ go func () {
148+ httpServer .ConnState (rw , http .StateNew ) // before Serve can return
149+ serveConn (rw )
150+ httpServer .ConnState (rw , http .StateClosed )
151+ }()
152+ }
153+ }
212154}
213155
214156// IsClosedConnection returns true if err is the net package's errClosed.
215157func IsClosedConnection (err error ) bool {
216- return strings .Contains (err .Error (), "use of closed network connection" )
158+ return err == cmux . ErrListenerClosed || strings .Contains (err .Error (), "use of closed network connection" )
217159}
0 commit comments