diff --git a/smtpd/listener.go b/smtpd/listener.go index fe4286d..e7faffd 100644 --- a/smtpd/listener.go +++ b/smtpd/listener.go @@ -101,7 +101,7 @@ func (s *Server) Start() { } // Start retention scanner - StartRetentionScanner(s.dataStore) + StartRetentionScanner(s.dataStore, s.globalShutdown) // Listener go routine go s.serve() @@ -177,6 +177,7 @@ func (s *Server) Drain() { // Wait for sessions to close s.waitgroup.Wait() log.Tracef("SMTP connections have drained") + RetentionJoin() } // When the provided Ticker ticks, we update our metrics history diff --git a/smtpd/retention.go b/smtpd/retention.go index ac1b3f8..ea3642c 100644 --- a/smtpd/retention.go +++ b/smtpd/retention.go @@ -10,24 +10,34 @@ import ( "github.com/jhillyerd/inbucket/log" ) -var retentionScanCompleted time.Time -var retentionScanCompletedMu sync.RWMutex +var ( + retentionScanCompleted = time.Now() + retentionScanCompletedMu sync.RWMutex -var expRetentionDeletesTotal = new(expvar.Int) -var expRetentionPeriod = new(expvar.Int) -var expRetainedCurrent = new(expvar.Int) + // Indicates Inbucket needs to shut down + globalShutdown chan bool + // Indicates the retention scanner has shut down + retentionShutdown chan bool -// History of certain stats -var retentionDeletesHist = list.New() -var retainedHist = list.New() + // History counters + expRetentionDeletesTotal = new(expvar.Int) + expRetentionPeriod = new(expvar.Int) + expRetainedCurrent = new(expvar.Int) -// History rendered as comma delimited string -var expRetentionDeletesHist = new(expvar.String) -var expRetainedHist = new(expvar.String) + // History of certain stats + retentionDeletesHist = list.New() + retainedHist = list.New() + + // History rendered as comma delimited string + expRetentionDeletesHist = new(expvar.String) + expRetainedHist = new(expvar.String) +) // StartRetentionScanner launches a go-routine that scans for expired // messages, following the configured interval -func StartRetentionScanner(ds DataStore) { +func StartRetentionScanner(ds DataStore, shutdownChannel chan bool) { + globalShutdown = shutdownChannel + retentionShutdown = make(chan bool) cfg := config.GetDataStoreConfig() expRetentionPeriod.Set(int64(cfg.RetentionMinutes * 60)) if cfg.RetentionMinutes > 0 { @@ -37,26 +47,42 @@ func StartRetentionScanner(ds DataStore) { time.Duration(cfg.RetentionSleep)*time.Millisecond) } else { log.Infof("Retention scanner disabled") + close(retentionShutdown) } } func retentionScanner(ds DataStore, maxAge time.Duration, sleep time.Duration) { start := time.Now() +retentionLoop: for { // Prevent scanner from running more than once a minute since := time.Since(start) if since < time.Minute { dur := time.Minute - since log.Tracef("Retention scanner sleeping for %v", dur) - time.Sleep(dur) + select { + case _ = <-globalShutdown: + break retentionLoop + case _ = <-time.After(dur): + } } - start = time.Now() // Kickoff scan + start = time.Now() if err := doRetentionScan(ds, maxAge, sleep); err != nil { log.Errorf("Error during retention scan: %v", err) } + + // Check for global shutdown + select { + case _ = <-globalShutdown: + break retentionLoop + default: + } } + + log.Tracef("Retention scanner shut down") + close(retentionShutdown) } // doRetentionScan does a single pass of all mailboxes looking for messages that can be purged @@ -88,6 +114,13 @@ func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) er retained++ } } + // Check for shutdown + select { + case _ = <-globalShutdown: + log.Tracef("Retention scan aborted due to shutdown") + return nil + default: + } // Sleep after completing a mailbox time.Sleep(sleep) } @@ -98,6 +131,13 @@ func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) er return nil } +// RetentionJoin does not retun until the retention scanner has shut down +func RetentionJoin() { + select { + case _ = <-retentionShutdown: + } +} + func setRetentionScanCompleted(t time.Time) { retentionScanCompletedMu.Lock() defer retentionScanCompletedMu.Unlock()