From a222b7c428423085370122fe0ae6fb1ce11d631e Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Sun, 15 Jan 2017 21:49:04 -0800 Subject: [PATCH] Make use of pkg context - Use context inside of servers for shutdown - Remove unnecessary localShutdown related code --- httpd/server.go | 11 ++++++----- inbucket.go | 11 ++++++++--- pop3d/listener.go | 25 ++++++++----------------- smtpd/listener.go | 27 ++++++++------------------- 4 files changed, 30 insertions(+), 44 deletions(-) diff --git a/httpd/server.go b/httpd/server.go index afaa454..769d7d6 100644 --- a/httpd/server.go +++ b/httpd/server.go @@ -2,6 +2,7 @@ package httpd import ( + "context" "fmt" "net" "net/http" @@ -60,7 +61,7 @@ func Initialize(cfg config.WebConfig, ds smtpd.DataStore, shutdownChan chan bool } // Start begins listening for HTTP requests -func Start() { +func Start(ctx context.Context) { addr := fmt.Sprintf("%v:%v", webConfig.IP4address, webConfig.IP4port) server = &http.Server{ Addr: addr, @@ -80,11 +81,11 @@ func Start() { } // Listener go routine - go serve() + go serve(ctx) // Wait for shutdown select { - case _ = <-globalShutdown: + case _ = <-ctx.Done(): log.Tracef("HTTP server shutting down on request") } @@ -95,12 +96,12 @@ func Start() { } // serve begins serving HTTP requests -func serve() { +func serve(ctx context.Context) { // server.Serve blocks until we close the listener err := server.Serve(listener) select { - case _ = <-globalShutdown: + case _ = <-ctx.Done(): // Nop default: log.Errorf("HTTP server failed: %v", err) diff --git a/inbucket.go b/inbucket.go index 5c0cd86..09eeae0 100644 --- a/inbucket.go +++ b/inbucket.go @@ -2,6 +2,7 @@ package main import ( + "context" "expvar" "flag" "fmt" @@ -52,6 +53,9 @@ func main() { return } + // Root context + rootCtx, rootCancel := context.WithCancel(context.Background()) + // Load & Parse config if flag.NArg() != 1 { flag.Usage() @@ -98,16 +102,16 @@ func main() { httpd.Initialize(config.GetWebConfig(), ds, shutdownChan) webui.SetupRoutes(httpd.Router) rest.SetupRoutes(httpd.Router) - go httpd.Start() + go httpd.Start(rootCtx) // Start POP3 server // TODO pass datastore pop3Server = pop3d.New(shutdownChan) - go pop3Server.Start() + go pop3Server.Start(rootCtx) // Startup SMTP server smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds, shutdownChan) - go smtpServer.Start() + go smtpServer.Start(rootCtx) // Loop forever waiting for signals or shutdown channel signalLoop: @@ -128,6 +132,7 @@ signalLoop: close(shutdownChan) } case _ = <-shutdownChan: + rootCancel() break signalLoop } } diff --git a/pop3d/listener.go b/pop3d/listener.go index a245b65..8536881 100644 --- a/pop3d/listener.go +++ b/pop3d/listener.go @@ -1,6 +1,7 @@ package pop3d import ( + "context" "fmt" "net" "sync" @@ -18,7 +19,6 @@ type Server struct { dataStore smtpd.DataStore listener net.Listener globalShutdown chan bool - localShutdown chan bool waitgroup *sync.WaitGroup } @@ -35,20 +35,17 @@ func New(shutdownChan chan bool) *Server { dataStore: ds, maxIdleSeconds: cfg.MaxIdleSeconds, globalShutdown: shutdownChan, - localShutdown: make(chan bool), waitgroup: new(sync.WaitGroup), } } // Start the server and listen for connections -func (s *Server) Start() { +func (s *Server) Start(ctx context.Context) { cfg := config.GetPOP3Config() addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port)) if err != nil { log.Errorf("POP3 Failed to build tcp4 address: %v", err) - // serve() never called, so we do local shutdown here - close(s.localShutdown) s.emergencyShutdown() return } @@ -57,18 +54,16 @@ func (s *Server) Start() { s.listener, err = net.ListenTCP("tcp4", addr) if err != nil { log.Errorf("POP3 failed to start tcp4 listener: %v", err) - // serve() never called, so we do local shutdown here - close(s.localShutdown) s.emergencyShutdown() return } // Listener go routine - go s.serve() + go s.serve(ctx) // Wait for shutdown select { - case _ = <-s.globalShutdown: + case _ = <-ctx.Done(): } log.Tracef("POP3 shutdown requested, connections will be drained") @@ -79,7 +74,7 @@ func (s *Server) Start() { } // serve is the listen/accept loop -func (s *Server) serve() { +func (s *Server) serve(ctx context.Context) { // Handle incoming connections var tempDelay time.Duration for sid := 1; ; sid++ { @@ -100,11 +95,11 @@ func (s *Server) serve() { } else { // Permanent error select { - case _ = <-s.globalShutdown: - close(s.localShutdown) + case <-ctx.Done(): + // POP3 is shutting down return default: - close(s.localShutdown) + // Something went wrong s.emergencyShutdown() return } @@ -128,10 +123,6 @@ func (s *Server) emergencyShutdown() { // Drain causes the caller to block until all active POP3 sessions have finished func (s *Server) Drain() { - // Wait for listener to exit - select { - case _ = <-s.localShutdown: - } // Wait for sessions to close s.waitgroup.Wait() log.Tracef("POP3 connections have drained") diff --git a/smtpd/listener.go b/smtpd/listener.go index e7faffd..ba4fca0 100644 --- a/smtpd/listener.go +++ b/smtpd/listener.go @@ -2,6 +2,7 @@ package smtpd import ( "container/list" + "context" "expvar" "fmt" "net" @@ -27,9 +28,6 @@ type Server struct { // globalShutdown is the signal Inbucket needs to shut down globalShutdown chan bool - // localShutdown indicates this component has completed shutting down - localShutdown chan bool - // waitgroup tracks individual sessions waitgroup *sync.WaitGroup } @@ -67,19 +65,16 @@ func NewServer(cfg config.SMTPConfig, ds DataStore, globalShutdown chan bool) *S domainNoStore: strings.ToLower(cfg.DomainNoStore), waitgroup: new(sync.WaitGroup), globalShutdown: globalShutdown, - localShutdown: make(chan bool), } } // Start the listener and handle incoming connections -func (s *Server) Start() { +func (s *Server) Start(ctx context.Context) { cfg := config.GetSMTPConfig() addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port)) if err != nil { log.Errorf("Failed to build tcp4 address: %v", err) - // serve() never called, so we do local shutdown here - close(s.localShutdown) s.emergencyShutdown() return } @@ -88,8 +83,6 @@ func (s *Server) Start() { s.listener, err = net.ListenTCP("tcp4", addr) if err != nil { log.Errorf("SMTP failed to start tcp4 listener: %v", err) - // serve() never called, so we do local shutdown here - close(s.localShutdown) s.emergencyShutdown() return } @@ -104,11 +97,11 @@ func (s *Server) Start() { StartRetentionScanner(s.dataStore, s.globalShutdown) // Listener go routine - go s.serve() + go s.serve(ctx) // Wait for shutdown select { - case _ = <-s.globalShutdown: + case <-ctx.Done(): log.Tracef("SMTP shutdown requested, connections will be drained") } @@ -119,7 +112,7 @@ func (s *Server) Start() { } // serve is the listen/accept loop -func (s *Server) serve() { +func (s *Server) serve(ctx context.Context) { // Handle incoming connections var tempDelay time.Duration for sessionID := 1; ; sessionID++ { @@ -141,11 +134,11 @@ func (s *Server) serve() { } else { // Permanent error select { - case _ = <-s.globalShutdown: - close(s.localShutdown) + case <-ctx.Done(): + // SMTP is shutting down return default: - close(s.localShutdown) + // Something went wrong s.emergencyShutdown() return } @@ -170,10 +163,6 @@ func (s *Server) emergencyShutdown() { // Drain causes the caller to block until all active SMTP sessions have finished func (s *Server) Drain() { - // Wait for listener to exit - select { - case _ = <-s.localShutdown: - } // Wait for sessions to close s.waitgroup.Wait() log.Tracef("SMTP connections have drained")