From d9b5e40c8764b4b04264584cf61d23940632d7e7 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Sat, 10 Mar 2018 22:05:10 -0800 Subject: [PATCH] storage: More refactoring for #69 - retention: Start from pkg main instead of server/smtp - file: Remove DefaultStore() constructor - storage: AllMailboxes replaced with VisitMailboxes for #69 - test: Stub VisitMailboxes for #80 --- cmd/inbucket/main.go | 9 ++++-- pkg/server/smtp/listener.go | 34 +++++++++------------ pkg/storage/file/fstore.go | 30 +++++++++---------- pkg/storage/file/fstore_test.go | 21 +++++++++---- pkg/storage/retention.go | 42 ++++++++++++-------------- pkg/storage/retention_test.go | 52 +++++++++++++-------------------- pkg/storage/storage.go | 2 +- pkg/storage/testing.go | 12 ++++---- pkg/test/storage.go | 11 +++++++ 9 files changed, 106 insertions(+), 107 deletions(-) diff --git a/cmd/inbucket/main.go b/cmd/inbucket/main.go index 6e25e44..9f1d116 100644 --- a/cmd/inbucket/main.go +++ b/cmd/inbucket/main.go @@ -19,6 +19,7 @@ import ( "github.com/jhillyerd/inbucket/pkg/server/pop3" "github.com/jhillyerd/inbucket/pkg/server/smtp" "github.com/jhillyerd/inbucket/pkg/server/web" + "github.com/jhillyerd/inbucket/pkg/storage" "github.com/jhillyerd/inbucket/pkg/storage/file" "github.com/jhillyerd/inbucket/pkg/webui" ) @@ -115,8 +116,11 @@ func main() { // Create message hub msgHub := msghub.New(rootCtx, config.GetWebConfig().MonitorHistory) - // Grab our datastore - ds := file.DefaultStore() + // Setup our datastore + dscfg := config.GetDataStoreConfig() + ds := file.New(dscfg) + retentionScanner := storage.NewRetentionScanner(dscfg, ds, shutdownChan) + retentionScanner.Start() // Start HTTP server web.Initialize(config.GetWebConfig(), shutdownChan, ds, msgHub) @@ -160,6 +164,7 @@ signalLoop: go timedExit() smtpServer.Drain() pop3Server.Drain() + retentionScanner.Join() removePIDFile() } diff --git a/pkg/server/smtp/listener.go b/pkg/server/smtp/listener.go index 4586174..83d5697 100644 --- a/pkg/server/smtp/listener.go +++ b/pkg/server/smtp/listener.go @@ -48,10 +48,9 @@ type Server struct { storeMessages bool // Dependencies - dataStore storage.Store // Mailbox/message store - globalShutdown chan bool // Shuts down Inbucket - msgHub *msghub.Hub // Pub/sub for message info - retentionScanner *storage.RetentionScanner // Deletes expired messages + dataStore storage.Store // Mailbox/message store + globalShutdown chan bool // Shuts down Inbucket + msgHub *msghub.Hub // Pub/sub for message info // State listener net.Listener // Incoming network connections @@ -86,18 +85,17 @@ func NewServer( ds storage.Store, msgHub *msghub.Hub) *Server { return &Server{ - host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port), - domain: cfg.Domain, - domainNoStore: strings.ToLower(cfg.DomainNoStore), - maxRecips: cfg.MaxRecipients, - maxIdleSeconds: cfg.MaxIdleSeconds, - maxMessageBytes: cfg.MaxMessageBytes, - storeMessages: cfg.StoreMessages, - globalShutdown: globalShutdown, - dataStore: ds, - msgHub: msgHub, - retentionScanner: storage.NewRetentionScanner(ds, globalShutdown), - waitgroup: new(sync.WaitGroup), + host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port), + domain: cfg.Domain, + domainNoStore: strings.ToLower(cfg.DomainNoStore), + maxRecips: cfg.MaxRecipients, + maxIdleSeconds: cfg.MaxIdleSeconds, + maxMessageBytes: cfg.MaxMessageBytes, + storeMessages: cfg.StoreMessages, + globalShutdown: globalShutdown, + dataStore: ds, + msgHub: msgHub, + waitgroup: new(sync.WaitGroup), } } @@ -124,9 +122,6 @@ func (s *Server) Start(ctx context.Context) { log.Infof("Messages sent to domain '%v' will be discarded", s.domainNoStore) } - // Start retention scanner - s.retentionScanner.Start() - // Listener go routine go s.serve(ctx) @@ -195,5 +190,4 @@ func (s *Server) Drain() { // Wait for sessions to close s.waitgroup.Wait() log.Tracef("SMTP connections have drained") - s.retentionScanner.Join() } diff --git a/pkg/storage/file/fstore.go b/pkg/storage/file/fstore.go index 0a03873..ddf6d1c 100644 --- a/pkg/storage/file/fstore.go +++ b/pkg/storage/file/fstore.go @@ -74,13 +74,6 @@ func New(cfg config.DataStoreConfig) storage.Store { return &Store{path: path, mailPath: mailPath, messageCap: cfg.MailboxMsgCap} } -// DefaultStore creates a new DataStore object. It uses the inbucket.Config object to -// construct it's path. -func DefaultStore() storage.Store { - cfg := config.GetDataStoreConfig() - return New(cfg) -} - // GetMessage returns the messages in the named mailbox, or an error. func (fs *Store) GetMessage(mailbox, id string) (storage.Message, error) { mb, err := fs.MailboxFor(mailbox) @@ -125,12 +118,12 @@ func (fs *Store) MailboxFor(emailAddress string) (storage.Mailbox, error) { indexPath: indexPath}, nil } -// AllMailboxes returns a slice with all Mailboxes -func (fs *Store) AllMailboxes() ([]storage.Mailbox, error) { - mailboxes := make([]storage.Mailbox, 0, 100) +// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it +// continues to return true. +func (fs *Store) VisitMailboxes(f func([]storage.Message) (cont bool)) error { infos1, err := ioutil.ReadDir(fs.mailPath) if err != nil { - return nil, err + return err } // Loop over level 1 directories for _, inf1 := range infos1 { @@ -138,7 +131,7 @@ func (fs *Store) AllMailboxes() ([]storage.Mailbox, error) { l1 := inf1.Name() infos2, err := ioutil.ReadDir(filepath.Join(fs.mailPath, l1)) if err != nil { - return nil, err + return err } // Loop over level 2 directories for _, inf2 := range infos2 { @@ -146,7 +139,7 @@ func (fs *Store) AllMailboxes() ([]storage.Mailbox, error) { l2 := inf2.Name() infos3, err := ioutil.ReadDir(filepath.Join(fs.mailPath, l1, l2)) if err != nil { - return nil, err + return err } // Loop over mailboxes for _, inf3 := range infos3 { @@ -156,15 +149,20 @@ func (fs *Store) AllMailboxes() ([]storage.Mailbox, error) { idx := filepath.Join(mbpath, indexFileName) mb := &Mailbox{store: fs, dirName: mbdir, path: mbpath, indexPath: idx} - mailboxes = append(mailboxes, mb) + msgs, err := mb.GetMessages() + if err != nil { + return err + } + if !f(msgs) { + return nil + } } } } } } } - - return mailboxes, nil + return nil } // LockFor returns the RWMutex for this mailbox, or an error. diff --git a/pkg/storage/file/fstore_test.go b/pkg/storage/file/fstore_test.go index 26671c4..8247de9 100644 --- a/pkg/storage/file/fstore_test.go +++ b/pkg/storage/file/fstore_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/jhillyerd/inbucket/pkg/config" + "github.com/jhillyerd/inbucket/pkg/storage" "github.com/stretchr/testify/assert" ) @@ -97,12 +98,12 @@ func TestFSDirStructure(t *testing.T) { } } -// Test FileDataStore.AllMailboxes() -func TestFSAllMailboxes(t *testing.T) { +// TestFSVisitMailboxes tests VisitMailboxes +func TestFSVisitMailboxes(t *testing.T) { ds, logbuf := setupDataStore(config.DataStoreConfig{}) defer teardownDataStore(ds) - - for _, name := range []string{"abby", "bill", "christa", "donald", "evelyn"} { + boxes := []string{"abby", "bill", "christa", "donald", "evelyn"} + for _, name := range boxes { // Create day old message date := time.Now().Add(-24 * time.Hour) deliverMessage(ds, name, "Old Message", date) @@ -112,9 +113,17 @@ func TestFSAllMailboxes(t *testing.T) { deliverMessage(ds, name, "New Message", date) } - mboxes, err := ds.AllMailboxes() + seen := 0 + err := ds.VisitMailboxes(func(messages []storage.Message) bool { + seen++ + count := len(messages) + if count != 2 { + t.Errorf("got: %v messages, want: 2", count) + } + return true + }) assert.Nil(t, err) - assert.Equal(t, len(mboxes), 5) + assert.Equal(t, 5, seen) if t.Failed() { // Wait for handler to finish logging diff --git a/pkg/storage/retention.go b/pkg/storage/retention.go index f067843..6269443 100644 --- a/pkg/storage/retention.go +++ b/pkg/storage/retention.go @@ -52,10 +52,12 @@ type RetentionScanner struct { retentionSleep time.Duration } -// NewRetentionScanner launches a go-routine that scans for expired -// messages, following the configured interval -func NewRetentionScanner(ds Store, shutdownChannel chan bool) *RetentionScanner { - cfg := config.GetDataStoreConfig() +// NewRetentionScanner configures a new RententionScanner. +func NewRetentionScanner( + cfg config.DataStoreConfig, + ds Store, + shutdownChannel chan bool, +) *RetentionScanner { rs := &RetentionScanner{ globalShutdown: shutdownChannel, retentionShutdown: make(chan bool), @@ -97,7 +99,7 @@ retentionLoop: } // Kickoff scan start = time.Now() - if err := rs.doScan(); err != nil { + if err := rs.DoScan(); err != nil { log.Errorf("Error during retention scan: %v", err) } // Check for global shutdown @@ -111,28 +113,17 @@ retentionLoop: close(rs.retentionShutdown) } -// doScan does a single pass of all mailboxes looking for messages that can be purged -func (rs *RetentionScanner) doScan() error { +// DoScan does a single pass of all mailboxes looking for messages that can be purged. +func (rs *RetentionScanner) DoScan() error { log.Tracef("Starting retention scan") cutoff := time.Now().Add(-1 * rs.retentionPeriod) - mboxes, err := rs.ds.AllMailboxes() - if err != nil { - return err - } retained := 0 - // Loop over all mailboxes - for _, mb := range mboxes { - messages, err := mb.GetMessages() - if err != nil { - return err - } - // Loop over all messages in mailbox + // Loop over all mailboxes. + err := rs.ds.VisitMailboxes(func(messages []Message) bool { for _, msg := range messages { if msg.Date().Before(cutoff) { log.Tracef("Purging expired message %v", msg.ID()) - err = msg.Delete() - if err != nil { - // Log but don't abort + if err := msg.Delete(); err != nil { log.Errorf("Failed to purge message %v: %v", msg.ID(), err) } else { expRetentionDeletesTotal.Add(1) @@ -141,14 +132,17 @@ func (rs *RetentionScanner) doScan() error { retained++ } } - // Sleep after completing a mailbox select { case <-rs.globalShutdown: log.Tracef("Retention scan aborted due to shutdown") - return nil + return false case <-time.After(rs.retentionSleep): // Reduce disk thrashing } + return true + }) + if err != nil { + return err } // Update metrics setRetentionScanCompleted(time.Now()) @@ -156,7 +150,7 @@ func (rs *RetentionScanner) doScan() error { return nil } -// Join does not retun until the retention scanner has shut down +// Join does not return until the retention scanner has shut down. func (rs *RetentionScanner) Join() { if rs.retentionShutdown != nil { <-rs.retentionShutdown diff --git a/pkg/storage/retention_test.go b/pkg/storage/retention_test.go index ae221e9..bd862cf 100644 --- a/pkg/storage/retention_test.go +++ b/pkg/storage/retention_test.go @@ -1,19 +1,17 @@ -package storage +package storage_test import ( "fmt" "testing" "time" + + "github.com/jhillyerd/inbucket/pkg/config" + "github.com/jhillyerd/inbucket/pkg/storage" + "github.com/jhillyerd/inbucket/pkg/test" ) func TestDoRetentionScan(t *testing.T) { - // Create mock objects - mds := &MockDataStore{} - - mb1 := &MockMailbox{} - mb2 := &MockMailbox{} - mb3 := &MockMailbox{} - + ds := test.NewStore() // Mockup some different aged messages (num is in hours) new1 := mockMessage(0) new2 := mockMessage(1) @@ -21,36 +19,26 @@ func TestDoRetentionScan(t *testing.T) { old1 := mockMessage(4) old2 := mockMessage(12) old3 := mockMessage(24) - - // First it should ask for all mailboxes - mds.On("AllMailboxes").Return([]Mailbox{mb1, mb2, mb3}, nil) - - // Then for all messages on each box - mb1.On("GetMessages").Return([]Message{new1, old1, old2}, nil) - mb2.On("GetMessages").Return([]Message{old3, new2}, nil) - mb3.On("GetMessages").Return([]Message{new3}, nil) - + ds.AddMessage("mb1", new1) + ds.AddMessage("mb1", old1) + ds.AddMessage("mb1", old2) + ds.AddMessage("mb2", old3) + ds.AddMessage("mb2", new2) + ds.AddMessage("mb3", new3) // Test 4 hour retention - rs := &RetentionScanner{ - ds: mds, - retentionPeriod: 4*time.Hour - time.Minute, - retentionSleep: 0, + cfg := config.DataStoreConfig{ + RetentionMinutes: 239, + RetentionSleep: 0, } - if err := rs.doScan(); err != nil { + shutdownChan := make(chan bool) + rs := storage.NewRetentionScanner(cfg, ds, shutdownChan) + if err := rs.DoScan(); err != nil { t.Error(err) } - - // Check our assertions - mds.AssertExpectations(t) - mb1.AssertExpectations(t) - mb2.AssertExpectations(t) - mb3.AssertExpectations(t) - // Delete should not have been called on new messages new1.AssertNotCalled(t, "Delete") new2.AssertNotCalled(t, "Delete") new3.AssertNotCalled(t, "Delete") - // Delete should have been called once on old messages old1.AssertNumberOfCalls(t, "Delete", 1) old2.AssertNumberOfCalls(t, "Delete", 1) @@ -58,8 +46,8 @@ func TestDoRetentionScan(t *testing.T) { } // Make a MockMessage of a specific age -func mockMessage(ageHours int) *MockMessage { - msg := &MockMessage{} +func mockMessage(ageHours int) *storage.MockMessage { + msg := &storage.MockMessage{} msg.On("ID").Return(fmt.Sprintf("MSG[age=%vh]", ageHours)) msg.On("Date").Return(time.Now().Add(time.Duration(ageHours*-1) * time.Hour)) msg.On("Delete").Return(nil) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 51620ed..b18bf33 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -24,8 +24,8 @@ type Store interface { GetMessage(mailbox string, id string) (Message, error) GetMessages(mailbox string) ([]Message, error) PurgeMessages(mailbox string) error + VisitMailboxes(f func([]Message) (cont bool)) error MailboxFor(emailAddress string) (Mailbox, error) - AllMailboxes() ([]Mailbox, error) // LockFor is a temporary hack to fix #77 until Datastore revamp LockFor(emailAddress string) (*sync.RWMutex, error) } diff --git a/pkg/storage/testing.go b/pkg/storage/testing.go index 8c51b3f..64d7bf0 100644 --- a/pkg/storage/testing.go +++ b/pkg/storage/testing.go @@ -39,17 +39,17 @@ func (m *MockDataStore) MailboxFor(name string) (Mailbox, error) { return args.Get(0).(Mailbox), args.Error(1) } -// AllMailboxes mock function -func (m *MockDataStore) AllMailboxes() ([]Mailbox, error) { - args := m.Called() - return args.Get(0).([]Mailbox), args.Error(1) -} - // LockFor mock function returns a new RWMutex, never errors. func (m *MockDataStore) LockFor(name string) (*sync.RWMutex, error) { return &sync.RWMutex{}, nil } +// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it +// continues to return true. +func (m *MockDataStore) VisitMailboxes(f func([]Message) (cont bool)) error { + return nil +} + // MockMailbox is a shared mock for unit testing type MockMailbox struct { mock.Mock diff --git a/pkg/test/storage.go b/pkg/test/storage.go index 29c24ce..3f0fcbd 100644 --- a/pkg/test/storage.go +++ b/pkg/test/storage.go @@ -45,3 +45,14 @@ func (s *StoreStub) GetMessages(mailbox string) ([]storage.Message, error) { } return s.mailboxes[mailbox], nil } + +// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it +// continues to return true. +func (s *StoreStub) VisitMailboxes(f func([]storage.Message) (cont bool)) error { + for _, v := range s.mailboxes { + if !f(v) { + return nil + } + } + return nil +}