@@ -29,12 +29,15 @@ import (
2929 "github.com/coreos/etcd/etcdserver"
3030 "github.com/coreos/etcd/etcdserver/api/etcdhttp"
3131 "github.com/coreos/etcd/etcdserver/api/v2http"
32+ "github.com/coreos/etcd/etcdserver/api/v3rpc"
3233 "github.com/coreos/etcd/pkg/cors"
3334 "github.com/coreos/etcd/pkg/debugutil"
3435 runtimeutil "github.com/coreos/etcd/pkg/runtime"
3536 "github.com/coreos/etcd/pkg/transport"
3637 "github.com/coreos/etcd/pkg/types"
3738 "github.com/coreos/etcd/rafthttp"
39+
40+ "github.com/cockroachdb/cmux"
3841 "github.com/coreos/pkg/capnslog"
3942 "google.golang.org/grpc"
4043 "google.golang.org/grpc/keepalive"
@@ -60,12 +63,14 @@ const (
6063type Etcd struct {
6164 Peers []* peerListener
6265 Clients []net.Listener
63- Server * etcdserver.EtcdServer
66+ // a map of contexts for the servers that serves client requests.
67+ sctxs map [string ]* serveCtx
68+
69+ Server * etcdserver.EtcdServer
6470
6571 cfg Config
6672 stopc chan struct {}
6773 errc chan error
68- sctxs map [string ]* serveCtx
6974
7075 closeOnce sync.Once
7176}
@@ -91,20 +96,20 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
9196 return
9297 }
9398 if ! serving {
94- // errored before starting gRPC server for serveCtx.grpcServerC
99+ // errored before starting gRPC server for serveCtx.serversC
95100 for _ , sctx := range e .sctxs {
96- close (sctx .grpcServerC )
101+ close (sctx .serversC )
97102 }
98103 }
99104 e .Close ()
100105 e = nil
101106 }()
102107
103108 if e .Peers , err = startPeerListeners (cfg ); err != nil {
104- return
109+ return e , err
105110 }
106111 if e .sctxs , err = startClientListeners (cfg ); err != nil {
107- return
112+ return e , err
108113 }
109114 for _ , sctx := range e .sctxs {
110115 e .Clients = append (e .Clients , sctx .l )
@@ -150,76 +155,53 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
150155 }
151156
152157 if e .Server , err = etcdserver .NewServer (srvcfg ); err != nil {
153- return
154- }
155-
156- // configure peer handlers after rafthttp.Transport started
157- ph := etcdhttp .NewPeerHandler (e .Server )
158- for _ , p := range e .Peers {
159- srv := & http.Server {
160- Handler : ph ,
161- ReadTimeout : 5 * time .Minute ,
162- ErrorLog : defaultLog .New (ioutil .Discard , "" , 0 ), // do not log user error
163- }
164-
165- l := p .Listener
166- p .serve = func () error { return srv .Serve (l ) }
167- p .close = func (ctx context.Context ) error {
168- // gracefully shutdown http.Server
169- // close open listeners, idle connections
170- // until context cancel or time-out
171- return srv .Shutdown (ctx )
172- }
158+ return e , err
173159 }
174160
175161 // buffer channel so goroutines on closed connections won't wait forever
176162 e .errc = make (chan error , len (e .Peers )+ len (e .Clients )+ 2 * len (e .sctxs ))
177163
178164 e .Server .Start ()
179- if err = e .serve (); err != nil {
180- return
165+
166+ if err = e .servePeers (); err != nil {
167+ return e , err
181168 }
169+ if err = e .serveClients (); err != nil {
170+ return e , err
171+ }
172+
182173 serving = true
183- return
174+ return e , nil
184175}
185176
186177// Config returns the current configuration.
187178func (e * Etcd ) Config () Config {
188179 return e .cfg
189180}
190181
182+ // Close gracefully shuts down all servers/listeners.
183+ // Client requests will be terminated with request timeout.
184+ // After timeout, enforce remaning requests be closed immediately.
191185func (e * Etcd ) Close () {
192186 e .closeOnce .Do (func () { close (e .stopc ) })
193187
188+ // close client requests with request timeout
194189 timeout := 2 * time .Second
195190 if e .Server != nil {
196191 timeout = e .Server .Cfg .ReqTimeout ()
197192 }
198193 for _ , sctx := range e .sctxs {
199- for gs := range sctx .grpcServerC {
200- ch := make (chan struct {})
201- go func () {
202- defer close (ch )
203- // close listeners to stop accepting new connections,
204- // will block on any existing transports
205- gs .GracefulStop ()
206- }()
207- // wait until all pending RPCs are finished
208- select {
209- case <- ch :
210- case <- time .After (timeout ):
211- // took too long, manually close open transports
212- // e.g. watch streams
213- gs .Stop ()
214- // concurrent GracefulStop should be interrupted
215- <- ch
216- }
194+ for ss := range sctx .serversC {
195+ ctx , cancel := context .WithTimeout (context .Background (), timeout )
196+ stopServers (ctx , ss )
197+ cancel ()
217198 }
218199 }
219200
220201 for _ , sctx := range e .sctxs {
221202 sctx .cancel ()
222203 }
204+
223205 for i := range e .Clients {
224206 if e .Clients [i ] != nil {
225207 e .Clients [i ].Close ()
@@ -241,6 +223,43 @@ func (e *Etcd) Close() {
241223 }
242224}
243225
226+ func stopServers (ctx context.Context , ss * servers ) {
227+ shutdownNow := func () {
228+ // first, close the http.Server
229+ ss .http .Shutdown (ctx )
230+ // then close grpc.Server; cancels all active RPCs
231+ ss .grpc .Stop ()
232+ }
233+
234+ // do not grpc.Server.GracefulStop with TLS enabled etcd server
235+ // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
236+ // and https://github.com/coreos/etcd/issues/8916
237+ if ss .secure {
238+ shutdownNow ()
239+ return
240+ }
241+
242+ ch := make (chan struct {})
243+ go func () {
244+ defer close (ch )
245+ // close listeners to stop accepting new connections,
246+ // will block on any existing transports
247+ ss .grpc .GracefulStop ()
248+ }()
249+
250+ // wait until all pending RPCs are finished
251+ select {
252+ case <- ch :
253+ case <- ctx .Done ():
254+ // took too long, manually close open transports
255+ // e.g. watch streams
256+ shutdownNow ()
257+
258+ // concurrent GracefulStop should be interrupted
259+ <- ch
260+ }
261+ }
262+
244263func (e * Etcd ) Err () <- chan error { return e .errc }
245264
246265func startPeerListeners (cfg * Config ) (peers []* peerListener , err error ) {
@@ -269,7 +288,9 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
269288 for i := range peers {
270289 if peers [i ] != nil && peers [i ].close != nil {
271290 plog .Info ("stopping listening for peers on " , cfg .LPUrls [i ].String ())
272- peers [i ].close (context .Background ())
291+ ctx , cancel := context .WithTimeout (context .Background (), time .Second )
292+ peers [i ].close (ctx )
293+ cancel ()
273294 }
274295 }
275296 }()
@@ -297,6 +318,45 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
297318 return peers , nil
298319}
299320
321+ // configure peer handlers after rafthttp.Transport started
322+ func (e * Etcd ) servePeers () (err error ) {
323+ ph := etcdhttp .NewPeerHandler (e .Server )
324+ var peerTLScfg * tls.Config
325+ if ! e .cfg .PeerTLSInfo .Empty () {
326+ if peerTLScfg , err = e .cfg .PeerTLSInfo .ServerConfig (); err != nil {
327+ return err
328+ }
329+ }
330+
331+ for _ , p := range e .Peers {
332+ gs := v3rpc .Server (e .Server , peerTLScfg )
333+ m := cmux .New (p .Listener )
334+ go gs .Serve (m .Match (cmux .HTTP2 ()))
335+ srv := & http.Server {
336+ Handler : grpcHandlerFunc (gs , ph ),
337+ ReadTimeout : 5 * time .Minute ,
338+ ErrorLog : defaultLog .New (ioutil .Discard , "" , 0 ), // do not log user error
339+ }
340+ go srv .Serve (m .Match (cmux .Any ()))
341+ p .serve = func () error { return m .Serve () }
342+ p .close = func (ctx context.Context ) error {
343+ // gracefully shutdown http.Server
344+ // close open listeners, idle connections
345+ // until context cancel or time-out
346+ stopServers (ctx , & servers {secure : peerTLScfg != nil , grpc : gs , http : srv })
347+ return nil
348+ }
349+ }
350+
351+ // start peer servers in a goroutine
352+ for _ , pl := range e .Peers {
353+ go func (l * peerListener ) {
354+ e .errHandler (l .serve ())
355+ }(pl )
356+ }
357+ return nil
358+ }
359+
300360func startClientListeners (cfg * Config ) (sctxs map [string ]* serveCtx , err error ) {
301361 if cfg .ClientAutoTLS && cfg .ClientTLSInfo .Empty () {
302362 chosts := make ([]string , len (cfg .LCUrls ))
@@ -388,7 +448,7 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
388448 return sctxs , nil
389449}
390450
391- func (e * Etcd ) serve () (err error ) {
451+ func (e * Etcd ) serveClients () (err error ) {
392452 var ctlscfg * tls.Config
393453 if ! e .cfg .ClientTLSInfo .Empty () {
394454 plog .Infof ("ClientTLS: %s" , e .cfg .ClientTLSInfo )
@@ -401,13 +461,6 @@ func (e *Etcd) serve() (err error) {
401461 plog .Infof ("cors = %s" , e .cfg .CorsInfo )
402462 }
403463
404- // Start the peer server in a goroutine
405- for _ , pl := range e .Peers {
406- go func (l * peerListener ) {
407- e .errHandler (l .serve ())
408- }(pl )
409- }
410-
411464 // Start a client server goroutine for each listen address
412465 var h http.Handler
413466 if e .Config ().EnableV2 {
@@ -433,6 +486,8 @@ func (e *Etcd) serve() (err error) {
433486 Timeout : e .cfg .GRPCKeepAliveTimeout ,
434487 }))
435488 }
489+
490+ // start client servers in a goroutine
436491 for _ , sctx := range e .sctxs {
437492 go func (s * serveCtx ) {
438493 e .errHandler (s .serve (e .Server , ctlscfg , h , e .errHandler , gopts ... ))
0 commit comments