1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00

storage: More refactoring for #69

- retention: Start from pkg main instead of server/smtp
- file: Remove DefaultStore() constructor
- storage: AllMailboxes replaced with VisitMailboxes for #69
- test: Stub VisitMailboxes for #80
This commit is contained in:
James Hillyerd
2018-03-10 22:05:10 -08:00
parent 9c18f1fb30
commit d9b5e40c87
9 changed files with 106 additions and 107 deletions

View File

@@ -19,6 +19,7 @@ import (
"github.com/jhillyerd/inbucket/pkg/server/pop3" "github.com/jhillyerd/inbucket/pkg/server/pop3"
"github.com/jhillyerd/inbucket/pkg/server/smtp" "github.com/jhillyerd/inbucket/pkg/server/smtp"
"github.com/jhillyerd/inbucket/pkg/server/web" "github.com/jhillyerd/inbucket/pkg/server/web"
"github.com/jhillyerd/inbucket/pkg/storage"
"github.com/jhillyerd/inbucket/pkg/storage/file" "github.com/jhillyerd/inbucket/pkg/storage/file"
"github.com/jhillyerd/inbucket/pkg/webui" "github.com/jhillyerd/inbucket/pkg/webui"
) )
@@ -115,8 +116,11 @@ func main() {
// Create message hub // Create message hub
msgHub := msghub.New(rootCtx, config.GetWebConfig().MonitorHistory) msgHub := msghub.New(rootCtx, config.GetWebConfig().MonitorHistory)
// Grab our datastore // Setup our datastore
ds := file.DefaultStore() dscfg := config.GetDataStoreConfig()
ds := file.New(dscfg)
retentionScanner := storage.NewRetentionScanner(dscfg, ds, shutdownChan)
retentionScanner.Start()
// Start HTTP server // Start HTTP server
web.Initialize(config.GetWebConfig(), shutdownChan, ds, msgHub) web.Initialize(config.GetWebConfig(), shutdownChan, ds, msgHub)
@@ -160,6 +164,7 @@ signalLoop:
go timedExit() go timedExit()
smtpServer.Drain() smtpServer.Drain()
pop3Server.Drain() pop3Server.Drain()
retentionScanner.Join()
removePIDFile() removePIDFile()
} }

View File

@@ -48,10 +48,9 @@ type Server struct {
storeMessages bool storeMessages bool
// Dependencies // Dependencies
dataStore storage.Store // Mailbox/message store dataStore storage.Store // Mailbox/message store
globalShutdown chan bool // Shuts down Inbucket globalShutdown chan bool // Shuts down Inbucket
msgHub *msghub.Hub // Pub/sub for message info msgHub *msghub.Hub // Pub/sub for message info
retentionScanner *storage.RetentionScanner // Deletes expired messages
// State // State
listener net.Listener // Incoming network connections listener net.Listener // Incoming network connections
@@ -86,18 +85,17 @@ func NewServer(
ds storage.Store, ds storage.Store,
msgHub *msghub.Hub) *Server { msgHub *msghub.Hub) *Server {
return &Server{ return &Server{
host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port), host: fmt.Sprintf("%v:%v", cfg.IP4address, cfg.IP4port),
domain: cfg.Domain, domain: cfg.Domain,
domainNoStore: strings.ToLower(cfg.DomainNoStore), domainNoStore: strings.ToLower(cfg.DomainNoStore),
maxRecips: cfg.MaxRecipients, maxRecips: cfg.MaxRecipients,
maxIdleSeconds: cfg.MaxIdleSeconds, maxIdleSeconds: cfg.MaxIdleSeconds,
maxMessageBytes: cfg.MaxMessageBytes, maxMessageBytes: cfg.MaxMessageBytes,
storeMessages: cfg.StoreMessages, storeMessages: cfg.StoreMessages,
globalShutdown: globalShutdown, globalShutdown: globalShutdown,
dataStore: ds, dataStore: ds,
msgHub: msgHub, msgHub: msgHub,
retentionScanner: storage.NewRetentionScanner(ds, globalShutdown), waitgroup: new(sync.WaitGroup),
waitgroup: new(sync.WaitGroup),
} }
} }
@@ -124,9 +122,6 @@ func (s *Server) Start(ctx context.Context) {
log.Infof("Messages sent to domain '%v' will be discarded", s.domainNoStore) log.Infof("Messages sent to domain '%v' will be discarded", s.domainNoStore)
} }
// Start retention scanner
s.retentionScanner.Start()
// Listener go routine // Listener go routine
go s.serve(ctx) go s.serve(ctx)
@@ -195,5 +190,4 @@ func (s *Server) Drain() {
// Wait for sessions to close // Wait for sessions to close
s.waitgroup.Wait() s.waitgroup.Wait()
log.Tracef("SMTP connections have drained") log.Tracef("SMTP connections have drained")
s.retentionScanner.Join()
} }

View File

@@ -74,13 +74,6 @@ func New(cfg config.DataStoreConfig) storage.Store {
return &Store{path: path, mailPath: mailPath, messageCap: cfg.MailboxMsgCap} return &Store{path: path, mailPath: mailPath, messageCap: cfg.MailboxMsgCap}
} }
// DefaultStore creates a new DataStore object. It uses the inbucket.Config object to
// construct it's path.
func DefaultStore() storage.Store {
cfg := config.GetDataStoreConfig()
return New(cfg)
}
// GetMessage returns the messages in the named mailbox, or an error. // GetMessage returns the messages in the named mailbox, or an error.
func (fs *Store) GetMessage(mailbox, id string) (storage.Message, error) { func (fs *Store) GetMessage(mailbox, id string) (storage.Message, error) {
mb, err := fs.MailboxFor(mailbox) mb, err := fs.MailboxFor(mailbox)
@@ -125,12 +118,12 @@ func (fs *Store) MailboxFor(emailAddress string) (storage.Mailbox, error) {
indexPath: indexPath}, nil indexPath: indexPath}, nil
} }
// AllMailboxes returns a slice with all Mailboxes // VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
func (fs *Store) AllMailboxes() ([]storage.Mailbox, error) { // continues to return true.
mailboxes := make([]storage.Mailbox, 0, 100) func (fs *Store) VisitMailboxes(f func([]storage.Message) (cont bool)) error {
infos1, err := ioutil.ReadDir(fs.mailPath) infos1, err := ioutil.ReadDir(fs.mailPath)
if err != nil { if err != nil {
return nil, err return err
} }
// Loop over level 1 directories // Loop over level 1 directories
for _, inf1 := range infos1 { for _, inf1 := range infos1 {
@@ -138,7 +131,7 @@ func (fs *Store) AllMailboxes() ([]storage.Mailbox, error) {
l1 := inf1.Name() l1 := inf1.Name()
infos2, err := ioutil.ReadDir(filepath.Join(fs.mailPath, l1)) infos2, err := ioutil.ReadDir(filepath.Join(fs.mailPath, l1))
if err != nil { if err != nil {
return nil, err return err
} }
// Loop over level 2 directories // Loop over level 2 directories
for _, inf2 := range infos2 { for _, inf2 := range infos2 {
@@ -146,7 +139,7 @@ func (fs *Store) AllMailboxes() ([]storage.Mailbox, error) {
l2 := inf2.Name() l2 := inf2.Name()
infos3, err := ioutil.ReadDir(filepath.Join(fs.mailPath, l1, l2)) infos3, err := ioutil.ReadDir(filepath.Join(fs.mailPath, l1, l2))
if err != nil { if err != nil {
return nil, err return err
} }
// Loop over mailboxes // Loop over mailboxes
for _, inf3 := range infos3 { for _, inf3 := range infos3 {
@@ -156,15 +149,20 @@ func (fs *Store) AllMailboxes() ([]storage.Mailbox, error) {
idx := filepath.Join(mbpath, indexFileName) idx := filepath.Join(mbpath, indexFileName)
mb := &Mailbox{store: fs, dirName: mbdir, path: mbpath, mb := &Mailbox{store: fs, dirName: mbdir, path: mbpath,
indexPath: idx} indexPath: idx}
mailboxes = append(mailboxes, mb) msgs, err := mb.GetMessages()
if err != nil {
return err
}
if !f(msgs) {
return nil
}
} }
} }
} }
} }
} }
} }
return nil
return mailboxes, nil
} }
// LockFor returns the RWMutex for this mailbox, or an error. // LockFor returns the RWMutex for this mailbox, or an error.

View File

@@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/jhillyerd/inbucket/pkg/config" "github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/storage"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@@ -97,12 +98,12 @@ func TestFSDirStructure(t *testing.T) {
} }
} }
// Test FileDataStore.AllMailboxes() // TestFSVisitMailboxes tests VisitMailboxes
func TestFSAllMailboxes(t *testing.T) { func TestFSVisitMailboxes(t *testing.T) {
ds, logbuf := setupDataStore(config.DataStoreConfig{}) ds, logbuf := setupDataStore(config.DataStoreConfig{})
defer teardownDataStore(ds) defer teardownDataStore(ds)
boxes := []string{"abby", "bill", "christa", "donald", "evelyn"}
for _, name := range []string{"abby", "bill", "christa", "donald", "evelyn"} { for _, name := range boxes {
// Create day old message // Create day old message
date := time.Now().Add(-24 * time.Hour) date := time.Now().Add(-24 * time.Hour)
deliverMessage(ds, name, "Old Message", date) deliverMessage(ds, name, "Old Message", date)
@@ -112,9 +113,17 @@ func TestFSAllMailboxes(t *testing.T) {
deliverMessage(ds, name, "New Message", date) deliverMessage(ds, name, "New Message", date)
} }
mboxes, err := ds.AllMailboxes() seen := 0
err := ds.VisitMailboxes(func(messages []storage.Message) bool {
seen++
count := len(messages)
if count != 2 {
t.Errorf("got: %v messages, want: 2", count)
}
return true
})
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, len(mboxes), 5) assert.Equal(t, 5, seen)
if t.Failed() { if t.Failed() {
// Wait for handler to finish logging // Wait for handler to finish logging

View File

@@ -52,10 +52,12 @@ type RetentionScanner struct {
retentionSleep time.Duration retentionSleep time.Duration
} }
// NewRetentionScanner launches a go-routine that scans for expired // NewRetentionScanner configures a new RententionScanner.
// messages, following the configured interval func NewRetentionScanner(
func NewRetentionScanner(ds Store, shutdownChannel chan bool) *RetentionScanner { cfg config.DataStoreConfig,
cfg := config.GetDataStoreConfig() ds Store,
shutdownChannel chan bool,
) *RetentionScanner {
rs := &RetentionScanner{ rs := &RetentionScanner{
globalShutdown: shutdownChannel, globalShutdown: shutdownChannel,
retentionShutdown: make(chan bool), retentionShutdown: make(chan bool),
@@ -97,7 +99,7 @@ retentionLoop:
} }
// Kickoff scan // Kickoff scan
start = time.Now() start = time.Now()
if err := rs.doScan(); err != nil { if err := rs.DoScan(); err != nil {
log.Errorf("Error during retention scan: %v", err) log.Errorf("Error during retention scan: %v", err)
} }
// Check for global shutdown // Check for global shutdown
@@ -111,28 +113,17 @@ retentionLoop:
close(rs.retentionShutdown) close(rs.retentionShutdown)
} }
// doScan does a single pass of all mailboxes looking for messages that can be purged // DoScan does a single pass of all mailboxes looking for messages that can be purged.
func (rs *RetentionScanner) doScan() error { func (rs *RetentionScanner) DoScan() error {
log.Tracef("Starting retention scan") log.Tracef("Starting retention scan")
cutoff := time.Now().Add(-1 * rs.retentionPeriod) cutoff := time.Now().Add(-1 * rs.retentionPeriod)
mboxes, err := rs.ds.AllMailboxes()
if err != nil {
return err
}
retained := 0 retained := 0
// Loop over all mailboxes // Loop over all mailboxes.
for _, mb := range mboxes { err := rs.ds.VisitMailboxes(func(messages []Message) bool {
messages, err := mb.GetMessages()
if err != nil {
return err
}
// Loop over all messages in mailbox
for _, msg := range messages { for _, msg := range messages {
if msg.Date().Before(cutoff) { if msg.Date().Before(cutoff) {
log.Tracef("Purging expired message %v", msg.ID()) log.Tracef("Purging expired message %v", msg.ID())
err = msg.Delete() if err := msg.Delete(); err != nil {
if err != nil {
// Log but don't abort
log.Errorf("Failed to purge message %v: %v", msg.ID(), err) log.Errorf("Failed to purge message %v: %v", msg.ID(), err)
} else { } else {
expRetentionDeletesTotal.Add(1) expRetentionDeletesTotal.Add(1)
@@ -141,14 +132,17 @@ func (rs *RetentionScanner) doScan() error {
retained++ retained++
} }
} }
// Sleep after completing a mailbox
select { select {
case <-rs.globalShutdown: case <-rs.globalShutdown:
log.Tracef("Retention scan aborted due to shutdown") log.Tracef("Retention scan aborted due to shutdown")
return nil return false
case <-time.After(rs.retentionSleep): case <-time.After(rs.retentionSleep):
// Reduce disk thrashing // Reduce disk thrashing
} }
return true
})
if err != nil {
return err
} }
// Update metrics // Update metrics
setRetentionScanCompleted(time.Now()) setRetentionScanCompleted(time.Now())
@@ -156,7 +150,7 @@ func (rs *RetentionScanner) doScan() error {
return nil return nil
} }
// Join does not retun until the retention scanner has shut down // Join does not return until the retention scanner has shut down.
func (rs *RetentionScanner) Join() { func (rs *RetentionScanner) Join() {
if rs.retentionShutdown != nil { if rs.retentionShutdown != nil {
<-rs.retentionShutdown <-rs.retentionShutdown

View File

@@ -1,19 +1,17 @@
package storage package storage_test
import ( import (
"fmt" "fmt"
"testing" "testing"
"time" "time"
"github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/storage"
"github.com/jhillyerd/inbucket/pkg/test"
) )
func TestDoRetentionScan(t *testing.T) { func TestDoRetentionScan(t *testing.T) {
// Create mock objects ds := test.NewStore()
mds := &MockDataStore{}
mb1 := &MockMailbox{}
mb2 := &MockMailbox{}
mb3 := &MockMailbox{}
// Mockup some different aged messages (num is in hours) // Mockup some different aged messages (num is in hours)
new1 := mockMessage(0) new1 := mockMessage(0)
new2 := mockMessage(1) new2 := mockMessage(1)
@@ -21,36 +19,26 @@ func TestDoRetentionScan(t *testing.T) {
old1 := mockMessage(4) old1 := mockMessage(4)
old2 := mockMessage(12) old2 := mockMessage(12)
old3 := mockMessage(24) old3 := mockMessage(24)
ds.AddMessage("mb1", new1)
// First it should ask for all mailboxes ds.AddMessage("mb1", old1)
mds.On("AllMailboxes").Return([]Mailbox{mb1, mb2, mb3}, nil) ds.AddMessage("mb1", old2)
ds.AddMessage("mb2", old3)
// Then for all messages on each box ds.AddMessage("mb2", new2)
mb1.On("GetMessages").Return([]Message{new1, old1, old2}, nil) ds.AddMessage("mb3", new3)
mb2.On("GetMessages").Return([]Message{old3, new2}, nil)
mb3.On("GetMessages").Return([]Message{new3}, nil)
// Test 4 hour retention // Test 4 hour retention
rs := &RetentionScanner{ cfg := config.DataStoreConfig{
ds: mds, RetentionMinutes: 239,
retentionPeriod: 4*time.Hour - time.Minute, RetentionSleep: 0,
retentionSleep: 0,
} }
if err := rs.doScan(); err != nil { shutdownChan := make(chan bool)
rs := storage.NewRetentionScanner(cfg, ds, shutdownChan)
if err := rs.DoScan(); err != nil {
t.Error(err) t.Error(err)
} }
// Check our assertions
mds.AssertExpectations(t)
mb1.AssertExpectations(t)
mb2.AssertExpectations(t)
mb3.AssertExpectations(t)
// Delete should not have been called on new messages // Delete should not have been called on new messages
new1.AssertNotCalled(t, "Delete") new1.AssertNotCalled(t, "Delete")
new2.AssertNotCalled(t, "Delete") new2.AssertNotCalled(t, "Delete")
new3.AssertNotCalled(t, "Delete") new3.AssertNotCalled(t, "Delete")
// Delete should have been called once on old messages // Delete should have been called once on old messages
old1.AssertNumberOfCalls(t, "Delete", 1) old1.AssertNumberOfCalls(t, "Delete", 1)
old2.AssertNumberOfCalls(t, "Delete", 1) old2.AssertNumberOfCalls(t, "Delete", 1)
@@ -58,8 +46,8 @@ func TestDoRetentionScan(t *testing.T) {
} }
// Make a MockMessage of a specific age // Make a MockMessage of a specific age
func mockMessage(ageHours int) *MockMessage { func mockMessage(ageHours int) *storage.MockMessage {
msg := &MockMessage{} msg := &storage.MockMessage{}
msg.On("ID").Return(fmt.Sprintf("MSG[age=%vh]", ageHours)) msg.On("ID").Return(fmt.Sprintf("MSG[age=%vh]", ageHours))
msg.On("Date").Return(time.Now().Add(time.Duration(ageHours*-1) * time.Hour)) msg.On("Date").Return(time.Now().Add(time.Duration(ageHours*-1) * time.Hour))
msg.On("Delete").Return(nil) msg.On("Delete").Return(nil)

View File

@@ -24,8 +24,8 @@ type Store interface {
GetMessage(mailbox string, id string) (Message, error) GetMessage(mailbox string, id string) (Message, error)
GetMessages(mailbox string) ([]Message, error) GetMessages(mailbox string) ([]Message, error)
PurgeMessages(mailbox string) error PurgeMessages(mailbox string) error
VisitMailboxes(f func([]Message) (cont bool)) error
MailboxFor(emailAddress string) (Mailbox, error) MailboxFor(emailAddress string) (Mailbox, error)
AllMailboxes() ([]Mailbox, error)
// LockFor is a temporary hack to fix #77 until Datastore revamp // LockFor is a temporary hack to fix #77 until Datastore revamp
LockFor(emailAddress string) (*sync.RWMutex, error) LockFor(emailAddress string) (*sync.RWMutex, error)
} }

View File

@@ -39,17 +39,17 @@ func (m *MockDataStore) MailboxFor(name string) (Mailbox, error) {
return args.Get(0).(Mailbox), args.Error(1) return args.Get(0).(Mailbox), args.Error(1)
} }
// AllMailboxes mock function
func (m *MockDataStore) AllMailboxes() ([]Mailbox, error) {
args := m.Called()
return args.Get(0).([]Mailbox), args.Error(1)
}
// LockFor mock function returns a new RWMutex, never errors. // LockFor mock function returns a new RWMutex, never errors.
func (m *MockDataStore) LockFor(name string) (*sync.RWMutex, error) { func (m *MockDataStore) LockFor(name string) (*sync.RWMutex, error) {
return &sync.RWMutex{}, nil return &sync.RWMutex{}, nil
} }
// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
// continues to return true.
func (m *MockDataStore) VisitMailboxes(f func([]Message) (cont bool)) error {
return nil
}
// MockMailbox is a shared mock for unit testing // MockMailbox is a shared mock for unit testing
type MockMailbox struct { type MockMailbox struct {
mock.Mock mock.Mock

View File

@@ -45,3 +45,14 @@ func (s *StoreStub) GetMessages(mailbox string) ([]storage.Message, error) {
} }
return s.mailboxes[mailbox], nil return s.mailboxes[mailbox], nil
} }
// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
// continues to return true.
func (s *StoreStub) VisitMailboxes(f func([]storage.Message) (cont bool)) error {
for _, v := range s.mailboxes {
if !f(v) {
return nil
}
}
return nil
}