1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00

Use channels to communicate shutdown request

- httpd now uses shutdown channel
- smtpd now uses shutdown channel
- pop3d now uses shutdown channel
- timedExit now removes PID file
- tidy up some struct instantiations, var blocks
This commit is contained in:
James Hillyerd
2016-02-28 23:36:47 -08:00
parent 3a7be7d89c
commit 4b4121bb3a
8 changed files with 203 additions and 138 deletions

View File

@@ -47,12 +47,11 @@ func NewContext(req *http.Request) (*Context, error) {
if sess == nil {
// No session, must fail
return nil, err
} else {
}
// The session cookie was probably signed by an old key, ignore it
// gorilla created an empty session for us
err = nil
}
}
ctx := &Context{
Vars: vars,
Session: sess,

View File

@@ -28,19 +28,28 @@ var (
Router = mux.NewRouter()
webConfig config.WebConfig
server *http.Server
listener net.Listener
sessionStore sessions.Store
shutdown bool
globalShutdown chan bool
)
// Initialize sets up things for unit tests or the Start() method
func Initialize(cfg config.WebConfig, ds smtpd.DataStore) {
func Initialize(cfg config.WebConfig, ds smtpd.DataStore, shutdownChan chan bool) {
webConfig = cfg
setupRoutes(cfg)
globalShutdown = shutdownChan
// NewContext() will use this DataStore for the web handlers
DataStore = ds
// Content Paths
log.Infof("HTTP templates mapped to %q", cfg.TemplateDir)
log.Infof("HTTP static content mapped to %q", cfg.PublicDir)
Router.PathPrefix("/public/").Handler(http.StripPrefix("/public/",
http.FileServer(http.Dir(cfg.PublicDir))))
http.Handle("/", Router)
// Session cookie setup
if cfg.CookieAuthKey == "" {
log.Infof("HTTP generating random cookie.auth.key")
sessionStore = sessions.NewCookieStore(securecookie.GenerateRandomKey(64))
@@ -50,22 +59,10 @@ func Initialize(cfg config.WebConfig, ds smtpd.DataStore) {
}
}
func setupRoutes(cfg config.WebConfig) {
log.Infof("HTTP templates mapped to %q", cfg.TemplateDir)
log.Infof("HTTP static content mapped to %q", cfg.PublicDir)
// Static content
Router.PathPrefix("/public/").Handler(http.StripPrefix("/public/",
http.FileServer(http.Dir(cfg.PublicDir))))
// Register w/ HTTP
http.Handle("/", Router)
}
// Start begins listening for HTTP requests
func Start() {
addr := fmt.Sprintf("%v:%v", webConfig.IP4address, webConfig.IP4port)
server := &http.Server{
server = &http.Server{
Addr: addr,
Handler: nil,
ReadTimeout: 60 * time.Second,
@@ -82,24 +79,32 @@ func Start() {
panic(err)
}
err = server.Serve(listener)
if shutdown {
// Listener go routine
go serve()
// Wait for shutdown
select {
case _ = <-globalShutdown:
log.Tracef("HTTP server shutting down on request")
} else if err != nil {
log.Errorf("HTTP server failed: %v", err)
}
// Closing the listener will cause the serve() go routine to exit
if err := listener.Close(); err != nil {
log.Errorf("Failed to close HTTP listener: %v", err)
}
}
// Stop shuts down the HTTP server
func Stop() {
log.Tracef("HTTP shutdown requested")
shutdown = true
if listener != nil {
if err := listener.Close(); err != nil {
log.Errorf("Error closing HTTP listener: %v", err)
}
} else {
log.Errorf("HTTP listener was nil during shutdown")
// serve begins serving HTTP requests
func serve() {
// server.Serve blocks until we close the listener
err := server.Serve(listener)
select {
case _ = <-globalShutdown:
// Nop
default:
log.Errorf("HTTP server failed: %v", err)
// TODO shutdown?
}
}

View File

@@ -34,6 +34,9 @@ var (
// startTime is used to calculate uptime of Inbucket
startTime = time.Now()
// shutdownChan - close it to tell Inbucket to shut down cleanly
shutdownChan = make(chan bool)
// Server instances
smtpServer *smtpd.Server
pop3Server *pop3d.Server
@@ -63,7 +66,6 @@ func main() {
// Setup signal handler
sigChan := make(chan os.Signal)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
go signalProcessor(sigChan)
// Initialize logging
level, _ := config.Config.String("logging", "level")
@@ -93,24 +95,53 @@ func main() {
ds := smtpd.DefaultFileDataStore()
// Start HTTP server
httpd.Initialize(config.GetWebConfig(), ds)
httpd.Initialize(config.GetWebConfig(), ds, shutdownChan)
webui.SetupRoutes(httpd.Router)
rest.SetupRoutes(httpd.Router)
go httpd.Start()
// Start POP3 server
pop3Server = pop3d.New()
// TODO pass datastore
pop3Server = pop3d.New(shutdownChan)
go pop3Server.Start()
// Startup SMTP server, block until it exits
smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds)
smtpServer.Start()
// Startup SMTP server
smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds, shutdownChan)
go smtpServer.Start()
// Loop forever waiting for signals or shutdown channel
signalLoop:
for {
select {
case sig := <-sigChan:
switch sig {
case syscall.SIGHUP:
log.Infof("Recieved SIGHUP, cycling logfile")
log.Rotate()
case syscall.SIGINT:
// Shutdown requested
log.Infof("Received SIGINT, shutting down")
close(shutdownChan)
case syscall.SIGTERM:
// Shutdown requested
log.Infof("Received SIGTERM, shutting down")
close(shutdownChan)
}
case _ = <-shutdownChan:
break signalLoop
}
}
// Wait for active connections to finish
go timedExit()
smtpServer.Drain()
pop3Server.Drain()
// Remove pidfile
removePIDFile()
}
// removePIDFile removes the PID file if created
func removePIDFile() {
if *pidfile != "none" {
if err := os.Remove(*pidfile); err != nil {
log.Errorf("Failed to remove %q: %v", *pidfile, err)
@@ -118,42 +149,12 @@ func main() {
}
}
// signalProcessor is a goroutine that handles OS signals
func signalProcessor(c <-chan os.Signal) {
for {
sig := <-c
switch sig {
case syscall.SIGHUP:
log.Infof("Recieved SIGHUP, cycling logfile")
log.Rotate()
case syscall.SIGINT:
// Initiate shutdown
log.Infof("Received SIGINT, shutting down")
shutdown()
case syscall.SIGTERM:
// Initiate shutdown
log.Infof("Received SIGTERM, shutting down")
shutdown()
}
}
}
// shutdown is called by signalProcessor() when we are asked to shut down
func shutdown() {
go timedExit()
httpd.Stop()
if smtpServer != nil {
smtpServer.Stop()
} else {
log.Errorf("smtpServer was nil during shutdown")
}
}
// timedExit is called as a goroutine during shutdown, it will force an exit
// after 15 seconds
func timedExit() {
time.Sleep(15 * time.Second)
log.Errorf("Inbucket clean shutdown timed out, forcing exit")
log.Errorf("Clean shutdown took too long, forcing exit")
removePIDFile()
os.Exit(0)
}

View File

@@ -17,20 +17,27 @@ type Server struct {
maxIdleSeconds int
dataStore smtpd.DataStore
listener net.Listener
shutdown bool
globalShutdown chan bool
localShutdown chan bool
waitgroup *sync.WaitGroup
}
// New creates a new Server struct
func New() *Server {
func New(shutdownChan chan bool) *Server {
// Get a new instance of the the FileDataStore - the locking and counting
// mechanisms are both global variables in the smtpd package. If that
// changes in the future, this should be modified to use the same DataStore
// instance.
ds := smtpd.DefaultFileDataStore()
cfg := config.GetPOP3Config()
return &Server{domain: cfg.Domain, dataStore: ds, maxIdleSeconds: cfg.MaxIdleSeconds,
waitgroup: new(sync.WaitGroup)}
return &Server{
domain: cfg.Domain,
dataStore: ds,
maxIdleSeconds: cfg.MaxIdleSeconds,
globalShutdown: shutdownChan,
localShutdown: make(chan bool),
waitgroup: new(sync.WaitGroup),
}
}
// Start the server and listen for connections
@@ -52,6 +59,23 @@ func (s *Server) Start() {
panic(err)
}
// Listener go routine
go s.serve()
// Wait for shutdown
select {
case _ = <-s.globalShutdown:
}
log.Tracef("POP3 shutdown requested, connections will be drained")
// Closing the listener will cause the serve() go routine to exit
if err := s.listener.Close(); err != nil {
log.Errorf("Error closing POP3 listener: %v", err)
}
}
// serve is the listen/accept loop
func (s *Server) serve() {
// Handle incoming connections
var tempDelay time.Duration
for sid := 1; ; sid++ {
@@ -70,14 +94,17 @@ func (s *Server) Start() {
time.Sleep(tempDelay)
continue
} else {
if s.shutdown {
log.Tracef("POP3 listener shutting down on request")
// Permanent error
select {
case _ = <-s.globalShutdown:
close(s.localShutdown)
return
}
default:
// TODO Implement a max error counter before shutdown?
// or maybe attempt to restart POP3
// or maybe attempt to restart smtpd
panic(err)
}
}
} else {
tempDelay = 0
s.waitgroup.Add(1)
@@ -86,17 +113,13 @@ func (s *Server) Start() {
}
}
// Stop requests the POP3 server closes it's listener
func (s *Server) Stop() {
log.Tracef("POP3 shutdown requested, connections will be drained")
s.shutdown = true
if err := s.listener.Close(); err != nil {
log.Errorf("Error closing POP3 listener: %v", err)
}
}
// Drain causes the caller to block until all active POP3 sessions have finished
func (s *Server) Drain() {
s.waitgroup.Wait()
log.Tracef("POP3 connections drained")
// Wait for listener to exit
select {
case _ = <-s.localShutdown:
}
// Wait for sessions to close
s.waitgroup.Wait()
log.Tracef("POP3 connections have drained")
}

View File

@@ -191,7 +191,8 @@ func setupWebServer(ds smtpd.DataStore) *bytes.Buffer {
TemplateDir: "../themes/integral/templates",
PublicDir: "../themes/integral/public",
}
httpd.Initialize(cfg, ds)
shutdownChan := make(chan bool)
httpd.Initialize(cfg, ds, shutdownChan)
SetupRoutes(httpd.Router)
return buf

View File

@@ -377,7 +377,8 @@ func setupSMTPServer(ds DataStore) (*Server, *bytes.Buffer) {
log.SetOutput(buf)
// Create a server, don't start it
return NewServer(cfg, ds), buf
shutdownChan := make(chan bool)
return NewServer(cfg, ds, shutdownChan), buf
}
var sessionNum int

View File

@@ -23,35 +23,52 @@ type Server struct {
dataStore DataStore
storeMessages bool
listener net.Listener
shutdown bool
// globalShutdown is the signal Inbucket needs to shut down
globalShutdown chan bool
// localShutdown indicates this component has completed shutting down
localShutdown chan bool
// waitgroup tracks individual sessions
waitgroup *sync.WaitGroup
}
var (
// Raw stat collectors
var expConnectsTotal = new(expvar.Int)
var expConnectsCurrent = new(expvar.Int)
var expReceivedTotal = new(expvar.Int)
var expErrorsTotal = new(expvar.Int)
var expWarnsTotal = new(expvar.Int)
expConnectsTotal = new(expvar.Int)
expConnectsCurrent = new(expvar.Int)
expReceivedTotal = new(expvar.Int)
expErrorsTotal = new(expvar.Int)
expWarnsTotal = new(expvar.Int)
// History of certain stats
var deliveredHist = list.New()
var connectsHist = list.New()
var errorsHist = list.New()
var warnsHist = list.New()
deliveredHist = list.New()
connectsHist = list.New()
errorsHist = list.New()
warnsHist = list.New()
// History rendered as comma delim string
var expReceivedHist = new(expvar.String)
var expConnectsHist = new(expvar.String)
var expErrorsHist = new(expvar.String)
var expWarnsHist = new(expvar.String)
expReceivedHist = new(expvar.String)
expConnectsHist = new(expvar.String)
expErrorsHist = new(expvar.String)
expWarnsHist = new(expvar.String)
)
// NewServer creates a new Server instance with the specificed config
func NewServer(cfg config.SMTPConfig, ds DataStore) *Server {
return &Server{dataStore: ds, domain: cfg.Domain, maxRecips: cfg.MaxRecipients,
maxIdleSeconds: cfg.MaxIdleSeconds, maxMessageBytes: cfg.MaxMessageBytes,
storeMessages: cfg.StoreMessages, domainNoStore: strings.ToLower(cfg.DomainNoStore),
waitgroup: new(sync.WaitGroup)}
func NewServer(cfg config.SMTPConfig, ds DataStore, globalShutdown chan bool) *Server {
return &Server{
dataStore: ds,
domain: cfg.Domain,
maxRecips: cfg.MaxRecipients,
maxIdleSeconds: cfg.MaxIdleSeconds,
maxMessageBytes: cfg.MaxMessageBytes,
storeMessages: cfg.StoreMessages,
domainNoStore: strings.ToLower(cfg.DomainNoStore),
waitgroup: new(sync.WaitGroup),
globalShutdown: globalShutdown,
localShutdown: make(chan bool),
}
}
// Start the listener and handle incoming connections
@@ -82,10 +99,28 @@ func (s *Server) Start() {
// Start retention scanner
StartRetentionScanner(s.dataStore)
// Listener go routine
go s.serve()
// Wait for shutdown
select {
case _ = <-s.globalShutdown:
log.Tracef("SMTP shutdown requested, connections will be drained")
}
// Closing the listener will cause the serve() go routine to exit
if err := s.listener.Close(); err != nil {
log.Errorf("Failed to close SMTP listener: %v", err)
}
}
// serve is the listen/accept loop
func (s *Server) serve() {
// Handle incoming connections
var tempDelay time.Duration
for sid := 1; ; sid++ {
for sessionID := 1; ; sessionID++ {
if conn, err := s.listener.Accept(); err != nil {
// There was an error accepting the connection
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
// Temporary error, sleep for a bit and try again
if tempDelay == 0 {
@@ -100,36 +135,35 @@ func (s *Server) Start() {
time.Sleep(tempDelay)
continue
} else {
if s.shutdown {
log.Tracef("SMTP listener shutting down on request")
// Permanent error
select {
case _ = <-s.globalShutdown:
close(s.localShutdown)
return
}
default:
// TODO Implement a max error counter before shutdown?
// or maybe attempt to restart smtpd
panic(err)
}
}
} else {
tempDelay = 0
expConnectsTotal.Add(1)
s.waitgroup.Add(1)
go s.startSession(sid, conn)
go s.startSession(sessionID, conn)
}
}
}
// Stop requests the SMTP server closes it's listener
func (s *Server) Stop() {
log.Tracef("SMTP shutdown requested, connections will be drained")
s.shutdown = true
if err := s.listener.Close(); err != nil {
log.Errorf("Failed to close SMTP listener: %v", err)
}
}
// Drain causes the caller to block until all active SMTP sessions have finished
func (s *Server) Drain() {
// Wait for listener to exit
select {
case _ = <-s.localShutdown:
}
// Wait for sessions to close
s.waitgroup.Wait()
log.Tracef("SMTP connections drained")
log.Tracef("SMTP connections have drained")
}
// When the provided Ticker ticks, we update our metrics history

View File

@@ -411,7 +411,8 @@ func setupWebServer(ds smtpd.DataStore) *bytes.Buffer {
TemplateDir: "../themes/integral/templates",
PublicDir: "../themes/integral/public",
}
httpd.Initialize(cfg, ds)
shutdownChan := make(chan bool)
httpd.Initialize(cfg, ds, shutdownChan)
SetupRoutes(httpd.Router)
return buf