1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-18 18:17:03 +00:00

Refactor retention scanner prior to starting #12

This commit is contained in:
James Hillyerd
2017-01-22 20:59:59 -08:00
parent 62b77dfe5e
commit 32631daeae
3 changed files with 89 additions and 74 deletions

View File

@@ -26,9 +26,10 @@ type Server struct {
storeMessages bool storeMessages bool
// Dependencies // Dependencies
dataStore DataStore // Mailbox/message store dataStore DataStore // Mailbox/message store
globalShutdown chan bool // Shuts down Inbucket globalShutdown chan bool // Shuts down Inbucket
msgHub *msghub.Hub // Pub/sub for message info msgHub *msghub.Hub // Pub/sub for message info
retentionScanner *RetentionScanner // Deletes expired messages
// State // State
listener net.Listener // Incoming network connections listener net.Listener // Incoming network connections
@@ -63,16 +64,17 @@ func NewServer(
ds DataStore, ds DataStore,
msgHub *msghub.Hub) *Server { msgHub *msghub.Hub) *Server {
return &Server{ return &Server{
domain: cfg.Domain, domain: cfg.Domain,
domainNoStore: strings.ToLower(cfg.DomainNoStore), domainNoStore: strings.ToLower(cfg.DomainNoStore),
maxRecips: cfg.MaxRecipients, maxRecips: cfg.MaxRecipients,
maxIdleSeconds: cfg.MaxIdleSeconds, maxIdleSeconds: cfg.MaxIdleSeconds,
maxMessageBytes: cfg.MaxMessageBytes, maxMessageBytes: cfg.MaxMessageBytes,
storeMessages: cfg.StoreMessages, storeMessages: cfg.StoreMessages,
globalShutdown: globalShutdown, globalShutdown: globalShutdown,
dataStore: ds, dataStore: ds,
msgHub: msgHub, msgHub: msgHub,
waitgroup: new(sync.WaitGroup), retentionScanner: NewRetentionScanner(ds, globalShutdown),
waitgroup: new(sync.WaitGroup),
} }
} }
@@ -102,7 +104,7 @@ func (s *Server) Start(ctx context.Context) {
} }
// Start retention scanner // Start retention scanner
StartRetentionScanner(s.dataStore, s.globalShutdown) s.retentionScanner.Start()
// Listener go routine // Listener go routine
go s.serve(ctx) go s.serve(ctx)
@@ -174,7 +176,7 @@ func (s *Server) Drain() {
// Wait for sessions to close // Wait for sessions to close
s.waitgroup.Wait() s.waitgroup.Wait()
log.Tracef("SMTP connections have drained") log.Tracef("SMTP connections have drained")
RetentionJoin() s.retentionScanner.Join()
} }
// When the provided Ticker ticks, we update our metrics history // When the provided Ticker ticks, we update our metrics history

View File

@@ -14,11 +14,6 @@ var (
retentionScanCompleted = time.Now() retentionScanCompleted = time.Now()
retentionScanCompletedMu sync.RWMutex retentionScanCompletedMu sync.RWMutex
// Indicates Inbucket needs to shut down
globalShutdown chan bool
// Indicates the retention scanner has shut down
retentionShutdown chan bool
// History counters // History counters
expRetentionDeletesTotal = new(expvar.Int) expRetentionDeletesTotal = new(expvar.Int)
expRetentionPeriod = new(expvar.Int) expRetentionPeriod = new(expvar.Int)
@@ -33,73 +28,100 @@ var (
expRetainedHist = new(expvar.String) expRetainedHist = new(expvar.String)
) )
// StartRetentionScanner launches a go-routine that scans for expired func init() {
// messages, following the configured interval rm := expvar.NewMap("retention")
func StartRetentionScanner(ds DataStore, shutdownChannel chan bool) { rm.Set("SecondsSinceScanCompleted", expvar.Func(secondsSinceRetentionScanCompleted))
globalShutdown = shutdownChannel rm.Set("DeletesHist", expRetentionDeletesHist)
retentionShutdown = make(chan bool) rm.Set("DeletesTotal", expRetentionDeletesTotal)
cfg := config.GetDataStoreConfig() rm.Set("Period", expRetentionPeriod)
expRetentionPeriod.Set(int64(cfg.RetentionMinutes * 60)) rm.Set("RetainedHist", expRetainedHist)
if cfg.RetentionMinutes > 0 { rm.Set("RetainedCurrent", expRetainedCurrent)
// Retention scanning enabled
log.Infof("Retention configured for %v minutes", cfg.RetentionMinutes)
go retentionScanner(ds, time.Duration(cfg.RetentionMinutes)*time.Minute,
time.Duration(cfg.RetentionSleep)*time.Millisecond)
} else {
log.Infof("Retention scanner disabled")
close(retentionShutdown)
}
} }
func retentionScanner(ds DataStore, maxAge time.Duration, sleep time.Duration) { // RetentionScanner looks for messages older than the configured retention period and deletes them.
type RetentionScanner struct {
globalShutdown chan bool // Closes when Inbucket needs to shut down
retentionShutdown chan bool // Closed after the scanner has shut down
ds DataStore
retentionPeriod time.Duration
retentionSleep time.Duration
}
// NewRetentionScanner launches a go-routine that scans for expired
// messages, following the configured interval
func NewRetentionScanner(ds DataStore, shutdownChannel chan bool) *RetentionScanner {
cfg := config.GetDataStoreConfig()
rs := &RetentionScanner{
globalShutdown: shutdownChannel,
retentionShutdown: make(chan bool),
ds: ds,
retentionPeriod: time.Duration(cfg.RetentionMinutes) * time.Minute,
retentionSleep: time.Duration(cfg.RetentionSleep) * time.Millisecond,
}
// expRetentionPeriod is displayed on the status page
expRetentionPeriod.Set(int64(cfg.RetentionMinutes * 60))
return rs
}
// Start up the retention scanner if retention period > 0
func (rs *RetentionScanner) Start() {
if rs.retentionPeriod <= 0 {
log.Infof("Retention scanner disabled")
close(rs.retentionShutdown)
return
}
log.Infof("Retention configured for %v", rs.retentionPeriod)
go rs.run()
}
// run loops to kick off the scanner on the correct schedule
func (rs *RetentionScanner) run() {
start := time.Now() start := time.Now()
retentionLoop: retentionLoop:
for { for {
// Prevent scanner from running more than once a minute // Prevent scanner from starting more than once a minute
since := time.Since(start) since := time.Since(start)
if since < time.Minute { if since < time.Minute {
dur := time.Minute - since dur := time.Minute - since
log.Tracef("Retention scanner sleeping for %v", dur) log.Tracef("Retention scanner sleeping for %v", dur)
select { select {
case _ = <-globalShutdown: case _ = <-rs.globalShutdown:
break retentionLoop break retentionLoop
case _ = <-time.After(dur): case _ = <-time.After(dur):
} }
} }
// Kickoff scan // Kickoff scan
start = time.Now() start = time.Now()
if err := doRetentionScan(ds, maxAge, sleep); err != nil { if err := rs.doScan(); err != nil {
log.Errorf("Error during retention scan: %v", err) log.Errorf("Error during retention scan: %v", err)
} }
// Check for global shutdown // Check for global shutdown
select { select {
case _ = <-globalShutdown: case _ = <-rs.globalShutdown:
break retentionLoop break retentionLoop
default: default:
} }
} }
log.Tracef("Retention scanner shut down") log.Tracef("Retention scanner shut down")
close(retentionShutdown) close(rs.retentionShutdown)
} }
// doRetentionScan does a single pass of all mailboxes looking for messages that can be purged // doScan does a single pass of all mailboxes looking for messages that can be purged
func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) error { func (rs *RetentionScanner) doScan() error {
log.Tracef("Starting retention scan") log.Tracef("Starting retention scan")
cutoff := time.Now().Add(-1 * maxAge) cutoff := time.Now().Add(-1 * rs.retentionPeriod)
mboxes, err := ds.AllMailboxes() mboxes, err := rs.ds.AllMailboxes()
if err != nil { if err != nil {
return err return err
} }
retained := 0 retained := 0
// Loop over all mailboxes
for _, mb := range mboxes { for _, mb := range mboxes {
messages, err := mb.GetMessages() messages, err := mb.GetMessages()
if err != nil { if err != nil {
return err return err
} }
// Loop over all messages in mailbox
for _, msg := range messages { for _, msg := range messages {
if msg.Date().Before(cutoff) { if msg.Date().Before(cutoff) {
log.Tracef("Purging expired message %v", msg.ID()) log.Tracef("Purging expired message %v", msg.ID())
@@ -114,28 +136,26 @@ func doRetentionScan(ds DataStore, maxAge time.Duration, sleep time.Duration) er
retained++ retained++
} }
} }
// Check for shutdown // Sleep after completing a mailbox
select { select {
case _ = <-globalShutdown: case <-rs.globalShutdown:
log.Tracef("Retention scan aborted due to shutdown") log.Tracef("Retention scan aborted due to shutdown")
return nil return nil
default: case <-time.After(rs.retentionSleep):
// Reduce disk thrashing
} }
// Sleep after completing a mailbox
time.Sleep(sleep)
} }
// Update metrics
setRetentionScanCompleted(time.Now()) setRetentionScanCompleted(time.Now())
expRetainedCurrent.Set(int64(retained)) expRetainedCurrent.Set(int64(retained))
return nil return nil
} }
// RetentionJoin does not retun until the retention scanner has shut down // Join does not retun until the retention scanner has shut down
func RetentionJoin() { func (rs *RetentionScanner) Join() {
if retentionShutdown != nil { if rs.retentionShutdown != nil {
select { select {
case _ = <-retentionShutdown: case <-rs.retentionShutdown:
} }
} }
} }
@@ -143,27 +163,15 @@ func RetentionJoin() {
func setRetentionScanCompleted(t time.Time) { func setRetentionScanCompleted(t time.Time) {
retentionScanCompletedMu.Lock() retentionScanCompletedMu.Lock()
defer retentionScanCompletedMu.Unlock() defer retentionScanCompletedMu.Unlock()
retentionScanCompleted = t retentionScanCompleted = t
} }
func getRetentionScanCompleted() time.Time { func getRetentionScanCompleted() time.Time {
retentionScanCompletedMu.RLock() retentionScanCompletedMu.RLock()
defer retentionScanCompletedMu.RUnlock() defer retentionScanCompletedMu.RUnlock()
return retentionScanCompleted return retentionScanCompleted
} }
func secondsSinceRetentionScanCompleted() interface{} { func secondsSinceRetentionScanCompleted() interface{} {
return time.Since(getRetentionScanCompleted()) / time.Second return time.Since(getRetentionScanCompleted()) / time.Second
} }
func init() {
rm := expvar.NewMap("retention")
rm.Set("SecondsSinceScanCompleted", expvar.Func(secondsSinceRetentionScanCompleted))
rm.Set("DeletesHist", expRetentionDeletesHist)
rm.Set("DeletesTotal", expRetentionDeletesTotal)
rm.Set("Period", expRetentionPeriod)
rm.Set("RetainedHist", expRetainedHist)
rm.Set("RetainedCurrent", expRetainedCurrent)
}

View File

@@ -36,7 +36,12 @@ func TestDoRetentionScan(t *testing.T) {
mb3.On("GetMessages").Return([]Message{new3}, nil) mb3.On("GetMessages").Return([]Message{new3}, nil)
// Test 4 hour retention // Test 4 hour retention
if err := doRetentionScan(mds, 4*time.Hour-time.Minute, 0); err != nil { rs := &RetentionScanner{
ds: mds,
retentionPeriod: 4*time.Hour - time.Minute,
retentionSleep: 0,
}
if err := rs.doScan(); err != nil {
t.Error(err) t.Error(err)
} }