From 4b4121bb3ac772c45d32e7520b258cf27d66fefd Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Sun, 28 Feb 2016 23:36:47 -0800 Subject: [PATCH] Use channels to communicate shutdown request - httpd now uses shutdown channel - smtpd now uses shutdown channel - pop3d now uses shutdown channel - timedExit now removes PID file - tidy up some struct instantiations, var blocks --- httpd/context.go | 7 ++- httpd/server.go | 71 +++++++++++++------------ inbucket.go | 79 ++++++++++++++-------------- pop3d/listener.go | 61 +++++++++++++++------- rest/testutils_test.go | 3 +- smtpd/handler_test.go | 3 +- smtpd/listener.go | 114 ++++++++++++++++++++++++++--------------- webui/rest_test.go | 3 +- 8 files changed, 203 insertions(+), 138 deletions(-) diff --git a/httpd/context.go b/httpd/context.go index 4087e57..9e5d3e8 100644 --- a/httpd/context.go +++ b/httpd/context.go @@ -47,11 +47,10 @@ func NewContext(req *http.Request) (*Context, error) { if sess == nil { // No session, must fail return nil, err - } else { - // The session cookie was probably signed by an old key, ignore it - // gorilla created an empty session for us - err = nil } + // The session cookie was probably signed by an old key, ignore it + // gorilla created an empty session for us + err = nil } ctx := &Context{ Vars: vars, diff --git a/httpd/server.go b/httpd/server.go index a14b336..b262b7f 100644 --- a/httpd/server.go +++ b/httpd/server.go @@ -27,20 +27,29 @@ var ( // incoming requests to the correct handler function Router = mux.NewRouter() - webConfig config.WebConfig - listener net.Listener - sessionStore sessions.Store - shutdown bool + webConfig config.WebConfig + server *http.Server + listener net.Listener + sessionStore sessions.Store + globalShutdown chan bool ) // Initialize sets up things for unit tests or the Start() method -func Initialize(cfg config.WebConfig, ds smtpd.DataStore) { +func Initialize(cfg config.WebConfig, ds smtpd.DataStore, shutdownChan chan bool) { webConfig = cfg - setupRoutes(cfg) + globalShutdown = shutdownChan // NewContext() will use this DataStore for the web handlers DataStore = ds + // Content Paths + log.Infof("HTTP templates mapped to %q", cfg.TemplateDir) + log.Infof("HTTP static content mapped to %q", cfg.PublicDir) + Router.PathPrefix("/public/").Handler(http.StripPrefix("/public/", + http.FileServer(http.Dir(cfg.PublicDir)))) + http.Handle("/", Router) + + // Session cookie setup if cfg.CookieAuthKey == "" { log.Infof("HTTP generating random cookie.auth.key") sessionStore = sessions.NewCookieStore(securecookie.GenerateRandomKey(64)) @@ -50,22 +59,10 @@ func Initialize(cfg config.WebConfig, ds smtpd.DataStore) { } } -func setupRoutes(cfg config.WebConfig) { - log.Infof("HTTP templates mapped to %q", cfg.TemplateDir) - log.Infof("HTTP static content mapped to %q", cfg.PublicDir) - - // Static content - Router.PathPrefix("/public/").Handler(http.StripPrefix("/public/", - http.FileServer(http.Dir(cfg.PublicDir)))) - - // Register w/ HTTP - http.Handle("/", Router) -} - // Start begins listening for HTTP requests func Start() { addr := fmt.Sprintf("%v:%v", webConfig.IP4address, webConfig.IP4port) - server := &http.Server{ + server = &http.Server{ Addr: addr, Handler: nil, ReadTimeout: 60 * time.Second, @@ -82,24 +79,32 @@ func Start() { panic(err) } - err = server.Serve(listener) - if shutdown { + // Listener go routine + go serve() + + // Wait for shutdown + select { + case _ = <-globalShutdown: log.Tracef("HTTP server shutting down on request") - } else if err != nil { - log.Errorf("HTTP server failed: %v", err) + } + + // Closing the listener will cause the serve() go routine to exit + if err := listener.Close(); err != nil { + log.Errorf("Failed to close HTTP listener: %v", err) } } -// Stop shuts down the HTTP server -func Stop() { - log.Tracef("HTTP shutdown requested") - shutdown = true - if listener != nil { - if err := listener.Close(); err != nil { - log.Errorf("Error closing HTTP listener: %v", err) - } - } else { - log.Errorf("HTTP listener was nil during shutdown") +// serve begins serving HTTP requests +func serve() { + // server.Serve blocks until we close the listener + err := server.Serve(listener) + + select { + case _ = <-globalShutdown: + // Nop + default: + log.Errorf("HTTP server failed: %v", err) + // TODO shutdown? } } diff --git a/inbucket.go b/inbucket.go index a9c86a1..6cfa5e2 100644 --- a/inbucket.go +++ b/inbucket.go @@ -34,6 +34,9 @@ var ( // startTime is used to calculate uptime of Inbucket startTime = time.Now() + // shutdownChan - close it to tell Inbucket to shut down cleanly + shutdownChan = make(chan bool) + // Server instances smtpServer *smtpd.Server pop3Server *pop3d.Server @@ -63,7 +66,6 @@ func main() { // Setup signal handler sigChan := make(chan os.Signal) signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) - go signalProcessor(sigChan) // Initialize logging level, _ := config.Config.String("logging", "level") @@ -93,24 +95,53 @@ func main() { ds := smtpd.DefaultFileDataStore() // Start HTTP server - httpd.Initialize(config.GetWebConfig(), ds) + httpd.Initialize(config.GetWebConfig(), ds, shutdownChan) webui.SetupRoutes(httpd.Router) rest.SetupRoutes(httpd.Router) go httpd.Start() // Start POP3 server - pop3Server = pop3d.New() + // TODO pass datastore + pop3Server = pop3d.New(shutdownChan) go pop3Server.Start() - // Startup SMTP server, block until it exits - smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds) - smtpServer.Start() + // Startup SMTP server + smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds, shutdownChan) + go smtpServer.Start() + + // Loop forever waiting for signals or shutdown channel +signalLoop: + for { + select { + case sig := <-sigChan: + switch sig { + case syscall.SIGHUP: + log.Infof("Recieved SIGHUP, cycling logfile") + log.Rotate() + case syscall.SIGINT: + // Shutdown requested + log.Infof("Received SIGINT, shutting down") + close(shutdownChan) + case syscall.SIGTERM: + // Shutdown requested + log.Infof("Received SIGTERM, shutting down") + close(shutdownChan) + } + case _ = <-shutdownChan: + break signalLoop + } + } // Wait for active connections to finish + go timedExit() smtpServer.Drain() pop3Server.Drain() - // Remove pidfile + removePIDFile() +} + +// removePIDFile removes the PID file if created +func removePIDFile() { if *pidfile != "none" { if err := os.Remove(*pidfile); err != nil { log.Errorf("Failed to remove %q: %v", *pidfile, err) @@ -118,42 +149,12 @@ func main() { } } -// signalProcessor is a goroutine that handles OS signals -func signalProcessor(c <-chan os.Signal) { - for { - sig := <-c - switch sig { - case syscall.SIGHUP: - log.Infof("Recieved SIGHUP, cycling logfile") - log.Rotate() - case syscall.SIGINT: - // Initiate shutdown - log.Infof("Received SIGINT, shutting down") - shutdown() - case syscall.SIGTERM: - // Initiate shutdown - log.Infof("Received SIGTERM, shutting down") - shutdown() - } - } -} - -// shutdown is called by signalProcessor() when we are asked to shut down -func shutdown() { - go timedExit() - httpd.Stop() - if smtpServer != nil { - smtpServer.Stop() - } else { - log.Errorf("smtpServer was nil during shutdown") - } -} - // timedExit is called as a goroutine during shutdown, it will force an exit // after 15 seconds func timedExit() { time.Sleep(15 * time.Second) - log.Errorf("Inbucket clean shutdown timed out, forcing exit") + log.Errorf("Clean shutdown took too long, forcing exit") + removePIDFile() os.Exit(0) } diff --git a/pop3d/listener.go b/pop3d/listener.go index 3f0d698..3419e87 100644 --- a/pop3d/listener.go +++ b/pop3d/listener.go @@ -17,20 +17,27 @@ type Server struct { maxIdleSeconds int dataStore smtpd.DataStore listener net.Listener - shutdown bool + globalShutdown chan bool + localShutdown chan bool waitgroup *sync.WaitGroup } // New creates a new Server struct -func New() *Server { +func New(shutdownChan chan bool) *Server { // Get a new instance of the the FileDataStore - the locking and counting // mechanisms are both global variables in the smtpd package. If that // changes in the future, this should be modified to use the same DataStore // instance. ds := smtpd.DefaultFileDataStore() cfg := config.GetPOP3Config() - return &Server{domain: cfg.Domain, dataStore: ds, maxIdleSeconds: cfg.MaxIdleSeconds, - waitgroup: new(sync.WaitGroup)} + return &Server{ + domain: cfg.Domain, + dataStore: ds, + maxIdleSeconds: cfg.MaxIdleSeconds, + globalShutdown: shutdownChan, + localShutdown: make(chan bool), + waitgroup: new(sync.WaitGroup), + } } // Start the server and listen for connections @@ -52,6 +59,23 @@ func (s *Server) Start() { panic(err) } + // Listener go routine + go s.serve() + + // Wait for shutdown + select { + case _ = <-s.globalShutdown: + } + + log.Tracef("POP3 shutdown requested, connections will be drained") + // Closing the listener will cause the serve() go routine to exit + if err := s.listener.Close(); err != nil { + log.Errorf("Error closing POP3 listener: %v", err) + } +} + +// serve is the listen/accept loop +func (s *Server) serve() { // Handle incoming connections var tempDelay time.Duration for sid := 1; ; sid++ { @@ -70,13 +94,16 @@ func (s *Server) Start() { time.Sleep(tempDelay) continue } else { - if s.shutdown { - log.Tracef("POP3 listener shutting down on request") + // Permanent error + select { + case _ = <-s.globalShutdown: + close(s.localShutdown) return + default: + // TODO Implement a max error counter before shutdown? + // or maybe attempt to restart smtpd + panic(err) } - // TODO Implement a max error counter before shutdown? - // or maybe attempt to restart POP3 - panic(err) } } else { tempDelay = 0 @@ -86,17 +113,13 @@ func (s *Server) Start() { } } -// Stop requests the POP3 server closes it's listener -func (s *Server) Stop() { - log.Tracef("POP3 shutdown requested, connections will be drained") - s.shutdown = true - if err := s.listener.Close(); err != nil { - log.Errorf("Error closing POP3 listener: %v", err) - } -} - // Drain causes the caller to block until all active POP3 sessions have finished func (s *Server) Drain() { + // Wait for listener to exit + select { + case _ = <-s.localShutdown: + } + // Wait for sessions to close s.waitgroup.Wait() - log.Tracef("POP3 connections drained") + log.Tracef("POP3 connections have drained") } diff --git a/rest/testutils_test.go b/rest/testutils_test.go index 5837e96..d63e1ce 100644 --- a/rest/testutils_test.go +++ b/rest/testutils_test.go @@ -191,7 +191,8 @@ func setupWebServer(ds smtpd.DataStore) *bytes.Buffer { TemplateDir: "../themes/integral/templates", PublicDir: "../themes/integral/public", } - httpd.Initialize(cfg, ds) + shutdownChan := make(chan bool) + httpd.Initialize(cfg, ds, shutdownChan) SetupRoutes(httpd.Router) return buf diff --git a/smtpd/handler_test.go b/smtpd/handler_test.go index f762027..34814c7 100644 --- a/smtpd/handler_test.go +++ b/smtpd/handler_test.go @@ -377,7 +377,8 @@ func setupSMTPServer(ds DataStore) (*Server, *bytes.Buffer) { log.SetOutput(buf) // Create a server, don't start it - return NewServer(cfg, ds), buf + shutdownChan := make(chan bool) + return NewServer(cfg, ds, shutdownChan), buf } var sessionNum int diff --git a/smtpd/listener.go b/smtpd/listener.go index a653502..5d63bd7 100644 --- a/smtpd/listener.go +++ b/smtpd/listener.go @@ -23,35 +23,52 @@ type Server struct { dataStore DataStore storeMessages bool listener net.Listener - shutdown bool - waitgroup *sync.WaitGroup + + // globalShutdown is the signal Inbucket needs to shut down + globalShutdown chan bool + + // localShutdown indicates this component has completed shutting down + localShutdown chan bool + + // waitgroup tracks individual sessions + waitgroup *sync.WaitGroup } -// Raw stat collectors -var expConnectsTotal = new(expvar.Int) -var expConnectsCurrent = new(expvar.Int) -var expReceivedTotal = new(expvar.Int) -var expErrorsTotal = new(expvar.Int) -var expWarnsTotal = new(expvar.Int) +var ( + // Raw stat collectors + expConnectsTotal = new(expvar.Int) + expConnectsCurrent = new(expvar.Int) + expReceivedTotal = new(expvar.Int) + expErrorsTotal = new(expvar.Int) + expWarnsTotal = new(expvar.Int) -// History of certain stats -var deliveredHist = list.New() -var connectsHist = list.New() -var errorsHist = list.New() -var warnsHist = list.New() + // History of certain stats + deliveredHist = list.New() + connectsHist = list.New() + errorsHist = list.New() + warnsHist = list.New() -// History rendered as comma delim string -var expReceivedHist = new(expvar.String) -var expConnectsHist = new(expvar.String) -var expErrorsHist = new(expvar.String) -var expWarnsHist = new(expvar.String) + // History rendered as comma delim string + expReceivedHist = new(expvar.String) + expConnectsHist = new(expvar.String) + expErrorsHist = new(expvar.String) + expWarnsHist = new(expvar.String) +) // NewServer creates a new Server instance with the specificed config -func NewServer(cfg config.SMTPConfig, ds DataStore) *Server { - return &Server{dataStore: ds, domain: cfg.Domain, maxRecips: cfg.MaxRecipients, - maxIdleSeconds: cfg.MaxIdleSeconds, maxMessageBytes: cfg.MaxMessageBytes, - storeMessages: cfg.StoreMessages, domainNoStore: strings.ToLower(cfg.DomainNoStore), - waitgroup: new(sync.WaitGroup)} +func NewServer(cfg config.SMTPConfig, ds DataStore, globalShutdown chan bool) *Server { + return &Server{ + dataStore: ds, + domain: cfg.Domain, + maxRecips: cfg.MaxRecipients, + maxIdleSeconds: cfg.MaxIdleSeconds, + maxMessageBytes: cfg.MaxMessageBytes, + storeMessages: cfg.StoreMessages, + domainNoStore: strings.ToLower(cfg.DomainNoStore), + waitgroup: new(sync.WaitGroup), + globalShutdown: globalShutdown, + localShutdown: make(chan bool), + } } // Start the listener and handle incoming connections @@ -82,10 +99,28 @@ func (s *Server) Start() { // Start retention scanner StartRetentionScanner(s.dataStore) + // Listener go routine + go s.serve() + + // Wait for shutdown + select { + case _ = <-s.globalShutdown: + log.Tracef("SMTP shutdown requested, connections will be drained") + } + + // Closing the listener will cause the serve() go routine to exit + if err := s.listener.Close(); err != nil { + log.Errorf("Failed to close SMTP listener: %v", err) + } +} + +// serve is the listen/accept loop +func (s *Server) serve() { // Handle incoming connections var tempDelay time.Duration - for sid := 1; ; sid++ { + for sessionID := 1; ; sessionID++ { if conn, err := s.listener.Accept(); err != nil { + // There was an error accepting the connection if nerr, ok := err.(net.Error); ok && nerr.Temporary() { // Temporary error, sleep for a bit and try again if tempDelay == 0 { @@ -100,36 +135,35 @@ func (s *Server) Start() { time.Sleep(tempDelay) continue } else { - if s.shutdown { - log.Tracef("SMTP listener shutting down on request") + // Permanent error + select { + case _ = <-s.globalShutdown: + close(s.localShutdown) return + default: + // TODO Implement a max error counter before shutdown? + // or maybe attempt to restart smtpd + panic(err) } - // TODO Implement a max error counter before shutdown? - // or maybe attempt to restart smtpd - panic(err) } } else { tempDelay = 0 expConnectsTotal.Add(1) s.waitgroup.Add(1) - go s.startSession(sid, conn) + go s.startSession(sessionID, conn) } } } -// Stop requests the SMTP server closes it's listener -func (s *Server) Stop() { - log.Tracef("SMTP shutdown requested, connections will be drained") - s.shutdown = true - if err := s.listener.Close(); err != nil { - log.Errorf("Failed to close SMTP listener: %v", err) - } -} - // Drain causes the caller to block until all active SMTP sessions have finished func (s *Server) Drain() { + // Wait for listener to exit + select { + case _ = <-s.localShutdown: + } + // Wait for sessions to close s.waitgroup.Wait() - log.Tracef("SMTP connections drained") + log.Tracef("SMTP connections have drained") } // When the provided Ticker ticks, we update our metrics history diff --git a/webui/rest_test.go b/webui/rest_test.go index ad75cc3..04301b0 100644 --- a/webui/rest_test.go +++ b/webui/rest_test.go @@ -411,7 +411,8 @@ func setupWebServer(ds smtpd.DataStore) *bytes.Buffer { TemplateDir: "../themes/integral/templates", PublicDir: "../themes/integral/public", } - httpd.Initialize(cfg, ds) + shutdownChan := make(chan bool) + httpd.Initialize(cfg, ds, shutdownChan) SetupRoutes(httpd.Router) return buf