1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-18 01:57:02 +00:00

Retention scanner respects global shutdown

- Clean up vars in retention.go
- Check globalShutdown in several parts of the retention scanner
- smtpd.Drain() now waits for scanner to shut down
- Closes #22
This commit is contained in:
James Hillyerd
2016-02-29 21:54:10 -08:00
parent 28adcf0437
commit 8e66be63f5
2 changed files with 56 additions and 15 deletions

View File

@@ -101,7 +101,7 @@ func (s *Server) Start() {
} }
// Start retention scanner // Start retention scanner
StartRetentionScanner(s.dataStore) StartRetentionScanner(s.dataStore, s.globalShutdown)
// Listener go routine // Listener go routine
go s.serve() go s.serve()
@@ -177,6 +177,7 @@ func (s *Server) Drain() {
// Wait for sessions to close // Wait for sessions to close
s.waitgroup.Wait() s.waitgroup.Wait()
log.Tracef("SMTP connections have drained") log.Tracef("SMTP connections have drained")
RetentionJoin()
} }
// When the provided Ticker ticks, we update our metrics history // When the provided Ticker ticks, we update our metrics history

View File

@@ -10,24 +10,34 @@ import (
"github.com/jhillyerd/inbucket/log" "github.com/jhillyerd/inbucket/log"
) )
var retentionScanCompleted time.Time var (
var retentionScanCompletedMu sync.RWMutex retentionScanCompleted = time.Now()
retentionScanCompletedMu sync.RWMutex
var expRetentionDeletesTotal = new(expvar.Int) // Indicates Inbucket needs to shut down
var expRetentionPeriod = new(expvar.Int) globalShutdown chan bool
var expRetainedCurrent = new(expvar.Int) // Indicates the retention scanner has shut down
retentionShutdown chan bool
// History of certain stats // History counters
var retentionDeletesHist = list.New() expRetentionDeletesTotal = new(expvar.Int)
var retainedHist = list.New() expRetentionPeriod = new(expvar.Int)
expRetainedCurrent = new(expvar.Int)
// History rendered as comma delimited string // History of certain stats
var expRetentionDeletesHist = new(expvar.String) retentionDeletesHist = list.New()
var expRetainedHist = new(expvar.String) retainedHist = list.New()
// History rendered as comma delimited string
expRetentionDeletesHist = new(expvar.String)
expRetainedHist = new(expvar.String)
)
// StartRetentionScanner launches a go-routine that scans for expired // StartRetentionScanner launches a go-routine that scans for expired
// messages, following the configured interval // messages, following the configured interval
func StartRetentionScanner(ds DataStore) { func StartRetentionScanner(ds DataStore, shutdownChannel chan bool) {
globalShutdown = shutdownChannel
retentionShutdown = make(chan bool)
cfg := config.GetDataStoreConfig() cfg := config.GetDataStoreConfig()
expRetentionPeriod.Set(int64(cfg.RetentionMinutes * 60)) expRetentionPeriod.Set(int64(cfg.RetentionMinutes * 60))
if cfg.RetentionMinutes > 0 { if cfg.RetentionMinutes > 0 {
@@ -37,26 +47,42 @@ func StartRetentionScanner(ds DataStore) {
time.Duration(cfg.RetentionSleep)*time.Millisecond) time.Duration(cfg.RetentionSleep)*time.Millisecond)
} else { } else {
log.Infof("Retention scanner disabled") log.Infof("Retention scanner disabled")
close(retentionShutdown)
} }
} }
func retentionScanner(ds DataStore, maxAge time.Duration, sleep time.Duration) { func retentionScanner(ds DataStore, maxAge time.Duration, sleep time.Duration) {
start := time.Now() start := time.Now()
retentionLoop:
for { for {
// Prevent scanner from running more than once a minute // Prevent scanner from running more than once a minute
since := time.Since(start) since := time.Since(start)
if since < time.Minute { if since < time.Minute {
dur := time.Minute - since dur := time.Minute - since
log.Tracef("Retention scanner sleeping for %v", dur) log.Tracef("Retention scanner sleeping for %v", dur)
time.Sleep(dur) select {
case _ = <-globalShutdown:
break retentionLoop
case _ = <-time.After(dur):
}
} }
start = time.Now()
// Kickoff scan // Kickoff scan
start = time.Now()
if err := doRetentionScan(ds, maxAge, sleep); err != nil { if err := doRetentionScan(ds, maxAge, sleep); err != nil {
log.Errorf("Error during retention scan: %v", err) log.Errorf("Error during retention scan: %v", err)
} }
// Check for global shutdown
select {
case _ = <-globalShutdown:
break retentionLoop
default:
}
} }
log.Tracef("Retention scanner shut down")
close(retentionShutdown)
} }
// doRetentionScan does a single pass of all mailboxes looking for messages that can be purged // doRetentionScan does a single pass of all mailboxes looking for messages that can be purged
@@ -88,6 +114,13 @@ func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) er
retained++ retained++
} }
} }
// Check for shutdown
select {
case _ = <-globalShutdown:
log.Tracef("Retention scan aborted due to shutdown")
return nil
default:
}
// Sleep after completing a mailbox // Sleep after completing a mailbox
time.Sleep(sleep) time.Sleep(sleep)
} }
@@ -98,6 +131,13 @@ func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) er
return nil return nil
} }
// RetentionJoin does not retun until the retention scanner has shut down
func RetentionJoin() {
select {
case _ = <-retentionShutdown:
}
}
func setRetentionScanCompleted(t time.Time) { func setRetentionScanCompleted(t time.Time) {
retentionScanCompletedMu.Lock() retentionScanCompletedMu.Lock()
defer retentionScanCompletedMu.Unlock() defer retentionScanCompletedMu.Unlock()