From 32631daeae4b802c16df8d8bb8c2751968b13e0d Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Sun, 22 Jan 2017 20:59:59 -0800 Subject: [PATCH] Refactor retention scanner prior to starting #12 --- smtpd/listener.go | 32 ++++++----- smtpd/retention.go | 124 +++++++++++++++++++++------------------- smtpd/retention_test.go | 7 ++- 3 files changed, 89 insertions(+), 74 deletions(-) diff --git a/smtpd/listener.go b/smtpd/listener.go index c2d3905..5fd4529 100644 --- a/smtpd/listener.go +++ b/smtpd/listener.go @@ -26,9 +26,10 @@ type Server struct { storeMessages bool // Dependencies - dataStore DataStore // Mailbox/message store - globalShutdown chan bool // Shuts down Inbucket - msgHub *msghub.Hub // Pub/sub for message info + dataStore DataStore // Mailbox/message store + globalShutdown chan bool // Shuts down Inbucket + msgHub *msghub.Hub // Pub/sub for message info + retentionScanner *RetentionScanner // Deletes expired messages // State listener net.Listener // Incoming network connections @@ -63,16 +64,17 @@ func NewServer( ds DataStore, msgHub *msghub.Hub) *Server { return &Server{ - domain: cfg.Domain, - domainNoStore: strings.ToLower(cfg.DomainNoStore), - maxRecips: cfg.MaxRecipients, - maxIdleSeconds: cfg.MaxIdleSeconds, - maxMessageBytes: cfg.MaxMessageBytes, - storeMessages: cfg.StoreMessages, - globalShutdown: globalShutdown, - dataStore: ds, - msgHub: msgHub, - waitgroup: new(sync.WaitGroup), + domain: cfg.Domain, + domainNoStore: strings.ToLower(cfg.DomainNoStore), + maxRecips: cfg.MaxRecipients, + maxIdleSeconds: cfg.MaxIdleSeconds, + maxMessageBytes: cfg.MaxMessageBytes, + storeMessages: cfg.StoreMessages, + globalShutdown: globalShutdown, + dataStore: ds, + msgHub: msgHub, + retentionScanner: NewRetentionScanner(ds, globalShutdown), + waitgroup: new(sync.WaitGroup), } } @@ -102,7 +104,7 @@ func (s *Server) Start(ctx context.Context) { } // Start retention scanner - StartRetentionScanner(s.dataStore, s.globalShutdown) + s.retentionScanner.Start() // Listener go routine go s.serve(ctx) @@ -174,7 +176,7 @@ func (s *Server) Drain() { // Wait for sessions to close s.waitgroup.Wait() log.Tracef("SMTP connections have drained") - RetentionJoin() + s.retentionScanner.Join() } // When the provided Ticker ticks, we update our metrics history diff --git a/smtpd/retention.go b/smtpd/retention.go index 9331233..79c28b5 100644 --- a/smtpd/retention.go +++ b/smtpd/retention.go @@ -14,11 +14,6 @@ var ( retentionScanCompleted = time.Now() retentionScanCompletedMu sync.RWMutex - // Indicates Inbucket needs to shut down - globalShutdown chan bool - // Indicates the retention scanner has shut down - retentionShutdown chan bool - // History counters expRetentionDeletesTotal = new(expvar.Int) expRetentionPeriod = new(expvar.Int) @@ -33,73 +28,100 @@ var ( expRetainedHist = new(expvar.String) ) -// StartRetentionScanner launches a go-routine that scans for expired -// messages, following the configured interval -func StartRetentionScanner(ds DataStore, shutdownChannel chan bool) { - globalShutdown = shutdownChannel - retentionShutdown = make(chan bool) - cfg := config.GetDataStoreConfig() - expRetentionPeriod.Set(int64(cfg.RetentionMinutes * 60)) - if cfg.RetentionMinutes > 0 { - // Retention scanning enabled - log.Infof("Retention configured for %v minutes", cfg.RetentionMinutes) - go retentionScanner(ds, time.Duration(cfg.RetentionMinutes)*time.Minute, - time.Duration(cfg.RetentionSleep)*time.Millisecond) - } else { - log.Infof("Retention scanner disabled") - close(retentionShutdown) - } +func init() { + rm := expvar.NewMap("retention") + rm.Set("SecondsSinceScanCompleted", expvar.Func(secondsSinceRetentionScanCompleted)) + rm.Set("DeletesHist", expRetentionDeletesHist) + rm.Set("DeletesTotal", expRetentionDeletesTotal) + rm.Set("Period", expRetentionPeriod) + rm.Set("RetainedHist", expRetainedHist) + rm.Set("RetainedCurrent", expRetainedCurrent) } -func retentionScanner(ds DataStore, maxAge time.Duration, sleep time.Duration) { +// RetentionScanner looks for messages older than the configured retention period and deletes them. +type RetentionScanner struct { + globalShutdown chan bool // Closes when Inbucket needs to shut down + retentionShutdown chan bool // Closed after the scanner has shut down + ds DataStore + retentionPeriod time.Duration + retentionSleep time.Duration +} + +// NewRetentionScanner launches a go-routine that scans for expired +// messages, following the configured interval +func NewRetentionScanner(ds DataStore, shutdownChannel chan bool) *RetentionScanner { + cfg := config.GetDataStoreConfig() + rs := &RetentionScanner{ + globalShutdown: shutdownChannel, + retentionShutdown: make(chan bool), + ds: ds, + retentionPeriod: time.Duration(cfg.RetentionMinutes) * time.Minute, + retentionSleep: time.Duration(cfg.RetentionSleep) * time.Millisecond, + } + // expRetentionPeriod is displayed on the status page + expRetentionPeriod.Set(int64(cfg.RetentionMinutes * 60)) + return rs +} + +// Start up the retention scanner if retention period > 0 +func (rs *RetentionScanner) Start() { + if rs.retentionPeriod <= 0 { + log.Infof("Retention scanner disabled") + close(rs.retentionShutdown) + return + } + log.Infof("Retention configured for %v", rs.retentionPeriod) + go rs.run() +} + +// run loops to kick off the scanner on the correct schedule +func (rs *RetentionScanner) run() { start := time.Now() retentionLoop: for { - // Prevent scanner from running more than once a minute + // Prevent scanner from starting more than once a minute since := time.Since(start) if since < time.Minute { dur := time.Minute - since log.Tracef("Retention scanner sleeping for %v", dur) select { - case _ = <-globalShutdown: + case _ = <-rs.globalShutdown: break retentionLoop case _ = <-time.After(dur): } } - // Kickoff scan start = time.Now() - if err := doRetentionScan(ds, maxAge, sleep); err != nil { + if err := rs.doScan(); err != nil { log.Errorf("Error during retention scan: %v", err) } - // Check for global shutdown select { - case _ = <-globalShutdown: + case _ = <-rs.globalShutdown: break retentionLoop default: } } - log.Tracef("Retention scanner shut down") - close(retentionShutdown) + close(rs.retentionShutdown) } -// doRetentionScan does a single pass of all mailboxes looking for messages that can be purged -func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) error { +// doScan does a single pass of all mailboxes looking for messages that can be purged +func (rs *RetentionScanner) doScan() error { log.Tracef("Starting retention scan") - cutoff := time.Now().Add(-1 * maxAge) - mboxes, err := ds.AllMailboxes() + cutoff := time.Now().Add(-1 * rs.retentionPeriod) + mboxes, err := rs.ds.AllMailboxes() if err != nil { return err } - retained := 0 + // Loop over all mailboxes for _, mb := range mboxes { messages, err := mb.GetMessages() if err != nil { return err } + // Loop over all messages in mailbox for _, msg := range messages { if msg.Date().Before(cutoff) { log.Tracef("Purging expired message %v", msg.ID()) @@ -114,28 +136,26 @@ func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) er retained++ } } - // Check for shutdown + // Sleep after completing a mailbox select { - case _ = <-globalShutdown: + case <-rs.globalShutdown: log.Tracef("Retention scan aborted due to shutdown") return nil - default: + case <-time.After(rs.retentionSleep): + // Reduce disk thrashing } - // Sleep after completing a mailbox - time.Sleep(sleep) } - + // Update metrics setRetentionScanCompleted(time.Now()) expRetainedCurrent.Set(int64(retained)) - return nil } -// RetentionJoin does not retun until the retention scanner has shut down -func RetentionJoin() { - if retentionShutdown != nil { +// Join does not retun until the retention scanner has shut down +func (rs *RetentionScanner) Join() { + if rs.retentionShutdown != nil { select { - case _ = <-retentionShutdown: + case <-rs.retentionShutdown: } } } @@ -143,27 +163,15 @@ func RetentionJoin() { func setRetentionScanCompleted(t time.Time) { retentionScanCompletedMu.Lock() defer retentionScanCompletedMu.Unlock() - retentionScanCompleted = t } func getRetentionScanCompleted() time.Time { retentionScanCompletedMu.RLock() defer retentionScanCompletedMu.RUnlock() - return retentionScanCompleted } func secondsSinceRetentionScanCompleted() interface{} { return time.Since(getRetentionScanCompleted()) / time.Second } - -func init() { - rm := expvar.NewMap("retention") - rm.Set("SecondsSinceScanCompleted", expvar.Func(secondsSinceRetentionScanCompleted)) - rm.Set("DeletesHist", expRetentionDeletesHist) - rm.Set("DeletesTotal", expRetentionDeletesTotal) - rm.Set("Period", expRetentionPeriod) - rm.Set("RetainedHist", expRetainedHist) - rm.Set("RetainedCurrent", expRetainedCurrent) -} diff --git a/smtpd/retention_test.go b/smtpd/retention_test.go index a08458b..5e76b42 100644 --- a/smtpd/retention_test.go +++ b/smtpd/retention_test.go @@ -36,7 +36,12 @@ func TestDoRetentionScan(t *testing.T) { mb3.On("GetMessages").Return([]Message{new3}, nil) // Test 4 hour retention - if err := doRetentionScan(mds, 4*time.Hour-time.Minute, 0); err != nil { + rs := &RetentionScanner{ + ds: mds, + retentionPeriod: 4*time.Hour - time.Minute, + retentionSleep: 0, + } + if err := rs.doScan(); err != nil { t.Error(err) }