diff --git a/pkg/rest/apiv1_controller.go b/pkg/rest/apiv1_controller.go index 9d7d1c6..6424648 100644 --- a/pkg/rest/apiv1_controller.go +++ b/pkg/rest/apiv1_controller.go @@ -158,18 +158,14 @@ func MailboxDeleteV1(w http.ResponseWriter, req *http.Request, ctx *web.Context) if err != nil { return err } - message, err := ctx.DataStore.GetMessage(name, id) + err = ctx.DataStore.RemoveMessage(name, id) if err == storage.ErrNotExist { http.NotFound(w, req) return nil } if err != nil { // This doesn't indicate missing, likely an IO error - return fmt.Errorf("GetMessage(%q) failed: %v", id, err) - } - err = message.Delete() - if err != nil { - return fmt.Errorf("Delete(%q) failed: %v", id, err) + return fmt.Errorf("RemoveMessage(%q) failed: %v", id, err) } return web.RenderJSON(w, "OK") diff --git a/pkg/server/pop3/handler.go b/pkg/server/pop3/handler.go index 5e3c665..c022619 100644 --- a/pkg/server/pop3/handler.go +++ b/pkg/server/pop3/handler.go @@ -57,17 +57,17 @@ var commands = map[string]bool{ // Session defines an active POP3 session type Session struct { - server *Server // Reference to the server we belong to - id int // Session ID number - conn net.Conn // Our network connection - remoteHost string // IP address of client - sendError error // Used to bail out of read loop on send error - state State // Current session state - reader *bufio.Reader // Buffered reader for our net conn - user string // Mailbox name - messages []storage.Message // Slice of messages in mailbox - retain []bool // Messages to retain upon UPDATE (true=retain) - msgCount int // Number of undeleted messages + server *Server // Reference to the server we belong to + id int // Session ID number + conn net.Conn // Our network connection + remoteHost string // IP address of client + sendError error // Used to bail out of read loop on send error + state State // Current session state + reader *bufio.Reader // Buffered reader for our net conn + user string // Mailbox name + messages []storage.StoreMessage // Slice of messages in mailbox + retain []bool // Messages to retain upon UPDATE (true=retain) + msgCount int // Number of undeleted messages } // NewSession creates a new POP3 session @@ -415,7 +415,7 @@ func (ses *Session) transactionHandler(cmd string, args []string) { } // Send the contents of the message to the client -func (ses *Session) sendMessage(msg storage.Message) { +func (ses *Session) sendMessage(msg storage.StoreMessage) { reader, err := msg.RawReader() if err != nil { ses.logError("Failed to read message for RETR command") @@ -448,7 +448,7 @@ func (ses *Session) sendMessage(msg storage.Message) { } // Send the headers plus the top N lines to the client -func (ses *Session) sendMessageTop(msg storage.Message, lineCount int) { +func (ses *Session) sendMessageTop(msg storage.StoreMessage, lineCount int) { reader, err := msg.RawReader() if err != nil { ses.logError("Failed to read message for RETR command") @@ -522,7 +522,7 @@ func (ses *Session) processDeletes() { for i, msg := range ses.messages { if !ses.retain[i] { ses.logTrace("Deleting %v", msg) - if err := msg.Delete(); err != nil { + if err := ses.server.dataStore.RemoveMessage(ses.user, msg.ID()); err != nil { ses.logWarn("Error deleting %v: %v", msg, err) } } diff --git a/pkg/server/smtp/handler_test.go b/pkg/server/smtp/handler_test.go index 29098ac..d516bf3 100644 --- a/pkg/server/smtp/handler_test.go +++ b/pkg/server/smtp/handler_test.go @@ -16,6 +16,7 @@ import ( "github.com/jhillyerd/inbucket/pkg/config" "github.com/jhillyerd/inbucket/pkg/msghub" "github.com/jhillyerd/inbucket/pkg/storage" + "github.com/jhillyerd/inbucket/pkg/test" ) type scriptStep struct { @@ -25,10 +26,8 @@ type scriptStep struct { // Test commands in GREET state func TestGreetState(t *testing.T) { - // Setup mock objects - mds := &storage.MockDataStore{} - - server, logbuf, teardown := setupSMTPServer(mds) + ds := test.NewStore() + server, logbuf, teardown := setupSMTPServer(ds) defer teardown() // Test out some mangled HELOs @@ -82,10 +81,8 @@ func TestGreetState(t *testing.T) { // Test commands in READY state func TestReadyState(t *testing.T) { - // Setup mock objects - mds := &storage.MockDataStore{} - - server, logbuf, teardown := setupSMTPServer(mds) + ds := test.NewStore() + server, logbuf, teardown := setupSMTPServer(ds) defer teardown() // Test out some mangled READY commands diff --git a/pkg/storage/file/fmessage.go b/pkg/storage/file/fmessage.go index b325678..26c1612 100644 --- a/pkg/storage/file/fmessage.go +++ b/pkg/storage/file/fmessage.go @@ -34,7 +34,7 @@ type Message struct { // newMessage creates a new FileMessage object and sets the Date and ID fields. // It will also delete messages over messageCap if configured. -func (mb *mbox) newMessage() (storage.Message, error) { +func (mb *mbox) newMessage() (storage.StoreMessage, error) { // Load index if !mb.indexLoaded { if err := mb.readIndex(); err != nil { @@ -45,7 +45,7 @@ func (mb *mbox) newMessage() (storage.Message, error) { if mb.store.messageCap > 0 { for len(mb.messages) >= mb.store.messageCap { log.Infof("Mailbox %q over configured message cap", mb.name) - if err := mb.messages[0].Delete(); err != nil { + if err := mb.removeMessage(mb.messages[0].ID()); err != nil { log.Errorf("Error deleting message: %s", err) } } @@ -55,6 +55,11 @@ func (mb *mbox) newMessage() (storage.Message, error) { return &Message{mailbox: mb, Fid: id, Fdate: date, writable: true}, nil } +// Mailbox returns the name of the mailbox this message resides in. +func (m *Message) Mailbox() string { + return m.mailbox.name +} + // ID gets the ID of the Message func (m *Message) ID() string { return m.Fid @@ -240,29 +245,3 @@ func (m *Message) Close() error { m.mailbox.messages = append(m.mailbox.messages, m) return m.mailbox.writeIndex() } - -// Delete this Message from disk by removing it from the index and deleting the -// raw files. -func (m *Message) Delete() error { - messages := m.mailbox.messages - for i, mm := range messages { - if m == mm { - // Slice around message we are deleting - m.mailbox.messages = append(messages[:i], messages[i+1:]...) - break - } - } - if err := m.mailbox.writeIndex(); err != nil { - return err - } - - if len(m.mailbox.messages) == 0 { - // This was the last message, thus writeIndex() has removed the entire - // directory; we don't need to delete the raw file. - return nil - } - - // There are still messages in the index - log.Tracef("Deleting %v", m.rawPath()) - return os.Remove(m.rawPath()) -} diff --git a/pkg/storage/file/fstore.go b/pkg/storage/file/fstore.go index fb09ad0..3e5f9bb 100644 --- a/pkg/storage/file/fstore.go +++ b/pkg/storage/file/fstore.go @@ -75,7 +75,7 @@ func New(cfg config.DataStoreConfig) storage.Store { } // GetMessage returns the messages in the named mailbox, or an error. -func (fs *Store) GetMessage(mailbox, id string) (storage.Message, error) { +func (fs *Store) GetMessage(mailbox, id string) (storage.StoreMessage, error) { mb, err := fs.mbox(mailbox) if err != nil { return nil, err @@ -84,7 +84,7 @@ func (fs *Store) GetMessage(mailbox, id string) (storage.Message, error) { } // GetMessages returns the messages in the named mailbox, or an error. -func (fs *Store) GetMessages(mailbox string) ([]storage.Message, error) { +func (fs *Store) GetMessages(mailbox string) ([]storage.StoreMessage, error) { mb, err := fs.mbox(mailbox) if err != nil { return nil, err @@ -92,6 +92,15 @@ func (fs *Store) GetMessages(mailbox string) ([]storage.Message, error) { return mb.getMessages() } +// RemoveMessage deletes a message by ID from the specified mailbox. +func (fs *Store) RemoveMessage(mailbox, id string) error { + mb, err := fs.mbox(mailbox) + if err != nil { + return err + } + return mb.removeMessage(id) +} + // PurgeMessages deletes all messages in the named mailbox, or returns an error. func (fs *Store) PurgeMessages(mailbox string) error { mb, err := fs.mbox(mailbox) @@ -103,7 +112,7 @@ func (fs *Store) PurgeMessages(mailbox string) error { // VisitMailboxes accepts a function that will be called with the messages in each mailbox while it // continues to return true. -func (fs *Store) VisitMailboxes(f func([]storage.Message) (cont bool)) error { +func (fs *Store) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) error { infos1, err := ioutil.ReadDir(fs.mailPath) if err != nil { return err @@ -159,7 +168,7 @@ func (fs *Store) LockFor(emailAddress string) (*sync.RWMutex, error) { } // NewMessage is temproary until #69 MessageData refactor -func (fs *Store) NewMessage(mailbox string) (storage.Message, error) { +func (fs *Store) NewMessage(mailbox string) (storage.StoreMessage, error) { mb, err := fs.mbox(mailbox) if err != nil { return nil, err @@ -196,13 +205,13 @@ type mbox struct { // getMessages scans the mailbox directory for .gob files and decodes them into // a slice of Message objects. -func (mb *mbox) getMessages() ([]storage.Message, error) { +func (mb *mbox) getMessages() ([]storage.StoreMessage, error) { if !mb.indexLoaded { if err := mb.readIndex(); err != nil { return nil, err } } - messages := make([]storage.Message, len(mb.messages)) + messages := make([]storage.StoreMessage, len(mb.messages)) for i, m := range mb.messages { messages[i] = m } @@ -210,7 +219,7 @@ func (mb *mbox) getMessages() ([]storage.Message, error) { } // getMessage decodes a single message by ID and returns a Message object. -func (mb *mbox) getMessage(id string) (storage.Message, error) { +func (mb *mbox) getMessage(id string) (storage.StoreMessage, error) { if !mb.indexLoaded { if err := mb.readIndex(); err != nil { return nil, err @@ -227,6 +236,38 @@ func (mb *mbox) getMessage(id string) (storage.Message, error) { return nil, storage.ErrNotExist } +// removeMessage deletes the message off disk and removes it from the index. +func (mb *mbox) removeMessage(id string) error { + if !mb.indexLoaded { + if err := mb.readIndex(); err != nil { + return err + } + } + var msg *Message + for i, m := range mb.messages { + if id == m.ID() { + msg = m + // Slice around message we are deleting + mb.messages = append(mb.messages[:i], mb.messages[i+1:]...) + break + } + } + if msg == nil { + return storage.ErrNotExist + } + if err := mb.writeIndex(); err != nil { + return err + } + if len(mb.messages) == 0 { + // This was the last message, thus writeIndex() has removed the entire + // directory; we don't need to delete the raw file. + return nil + } + // There are still messages in the index + log.Tracef("Deleting %v", msg.rawPath()) + return os.Remove(msg.rawPath()) +} + // purge deletes all messages in this mailbox. func (mb *mbox) purge() error { mb.messages = mb.messages[:0] @@ -256,14 +297,18 @@ func (mb *mbox) readIndex() error { log.Errorf("Failed to close %q: %v", mb.indexPath, err) } }() - // Decode gob data dec := gob.NewDecoder(bufio.NewReader(file)) + name := "" + if err = dec.Decode(&name); err != nil { + return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err) + } + mb.name = name for { - msg := new(Message) + // Load messages until EOF + msg := &Message{} if err = dec.Decode(msg); err != nil { if err == io.EOF { - // It's OK to get an EOF here break } return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err) @@ -271,7 +316,6 @@ func (mb *mbox) readIndex() error { msg.mailbox = mb mb.messages = append(mb.messages, msg) } - mb.indexLoaded = true return nil } @@ -294,9 +338,12 @@ func (mb *mbox) writeIndex() error { writer := bufio.NewWriter(file) // Write each message and then flush enc := gob.NewEncoder(writer) + if err = enc.Encode(mb.name); err != nil { + _ = file.Close() + return err + } for _, m := range mb.messages { - err = enc.Encode(m) - if err != nil { + if err = enc.Encode(m); err != nil { _ = file.Close() return err } @@ -314,7 +361,6 @@ func (mb *mbox) writeIndex() error { log.Tracef("Removing mailbox %v", mb.path) return mb.removeDir() } - return nil } diff --git a/pkg/storage/file/fstore_test.go b/pkg/storage/file/fstore_test.go index a7bb039..8db7253 100644 --- a/pkg/storage/file/fstore_test.go +++ b/pkg/storage/file/fstore_test.go @@ -63,9 +63,7 @@ func TestFSDirStructure(t *testing.T) { assert.True(t, isFile(expect), "Expected %q to be a file", expect) // Delete message - msg, err := ds.GetMessage(mbName, id1) - assert.Nil(t, err) - err = msg.Delete() + err := ds.RemoveMessage(mbName, id1) assert.Nil(t, err) // Message should be removed @@ -75,9 +73,7 @@ func TestFSDirStructure(t *testing.T) { assert.True(t, isFile(expect), "Expected %q to be a file", expect) // Delete message - msg, err = ds.GetMessage(mbName, id2) - assert.Nil(t, err) - err = msg.Delete() + err = ds.RemoveMessage(mbName, id2) assert.Nil(t, err) // Message should be removed @@ -114,7 +110,7 @@ func TestFSVisitMailboxes(t *testing.T) { } seen := 0 - err := ds.VisitMailboxes(func(messages []storage.Message) bool { + err := ds.VisitMailboxes(func(messages []storage.StoreMessage) bool { seen++ count := len(messages) if count != 2 { @@ -196,8 +192,14 @@ func TestFSDelete(t *testing.T) { len(subjects), len(msgs)) // Delete a couple messages - _ = msgs[1].Delete() - _ = msgs[3].Delete() + err = ds.RemoveMessage(mbName, msgs[1].ID()) + if err != nil { + t.Fatal(err) + } + err = ds.RemoveMessage(mbName, msgs[3].ID()) + if err != nil { + t.Fatal(err) + } // Confirm deletion msgs, err = ds.GetMessages(mbName) diff --git a/pkg/storage/retention.go b/pkg/storage/retention.go index 6269443..2da706e 100644 --- a/pkg/storage/retention.go +++ b/pkg/storage/retention.go @@ -119,11 +119,11 @@ func (rs *RetentionScanner) DoScan() error { cutoff := time.Now().Add(-1 * rs.retentionPeriod) retained := 0 // Loop over all mailboxes. - err := rs.ds.VisitMailboxes(func(messages []Message) bool { + err := rs.ds.VisitMailboxes(func(messages []StoreMessage) bool { for _, msg := range messages { if msg.Date().Before(cutoff) { - log.Tracef("Purging expired message %v", msg.ID()) - if err := msg.Delete(); err != nil { + log.Tracef("Purging expired message %v/%v", msg.Mailbox(), msg.ID()) + if err := rs.ds.RemoveMessage(msg.Mailbox(), msg.ID()); err != nil { log.Errorf("Failed to purge message %v: %v", msg.ID(), err) } else { expRetentionDeletesTotal.Add(1) diff --git a/pkg/storage/retention_test.go b/pkg/storage/retention_test.go index bd862cf..f6ed828 100644 --- a/pkg/storage/retention_test.go +++ b/pkg/storage/retention_test.go @@ -20,11 +20,17 @@ func TestDoRetentionScan(t *testing.T) { old2 := mockMessage(12) old3 := mockMessage(24) ds.AddMessage("mb1", new1) + new1.On("Mailbox").Return("mb1") ds.AddMessage("mb1", old1) + old1.On("Mailbox").Return("mb1") ds.AddMessage("mb1", old2) + old2.On("Mailbox").Return("mb1") ds.AddMessage("mb2", old3) + old3.On("Mailbox").Return("mb2") ds.AddMessage("mb2", new2) + new2.On("Mailbox").Return("mb2") ds.AddMessage("mb3", new3) + new3.On("Mailbox").Return("mb3") // Test 4 hour retention cfg := config.DataStoreConfig{ RetentionMinutes: 239, @@ -36,13 +42,17 @@ func TestDoRetentionScan(t *testing.T) { t.Error(err) } // Delete should not have been called on new messages - new1.AssertNotCalled(t, "Delete") - new2.AssertNotCalled(t, "Delete") - new3.AssertNotCalled(t, "Delete") + for _, m := range []storage.StoreMessage{new1, new2, new3} { + if ds.MessageDeleted(m) { + t.Errorf("Expected %v to be present, was deleted", m.ID()) + } + } // Delete should have been called once on old messages - old1.AssertNumberOfCalls(t, "Delete", 1) - old2.AssertNumberOfCalls(t, "Delete", 1) - old3.AssertNumberOfCalls(t, "Delete", 1) + for _, m := range []storage.StoreMessage{old1, old2, old3} { + if !ds.MessageDeleted(m) { + t.Errorf("Expected %v to be deleted, was present", m.ID()) + } + } } // Make a MockMessage of a specific age diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 83cf635..425a792 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -12,27 +12,29 @@ import ( ) var ( - // ErrNotExist indicates the requested message does not exist - ErrNotExist = errors.New("Message does not exist") + // ErrNotExist indicates the requested message does not exist. + ErrNotExist = errors.New("message does not exist") // ErrNotWritable indicates the message is closed; no longer writable ErrNotWritable = errors.New("Message not writable") ) -// Store is an interface to get Mailboxes stored in Inbucket +// Store is the interface Inbucket uses to interact with storage implementations. type Store interface { - GetMessage(mailbox string, id string) (Message, error) - GetMessages(mailbox string) ([]Message, error) + GetMessage(mailbox, id string) (StoreMessage, error) + GetMessages(mailbox string) ([]StoreMessage, error) PurgeMessages(mailbox string) error - VisitMailboxes(f func([]Message) (cont bool)) error + RemoveMessage(mailbox, id string) error + VisitMailboxes(f func([]StoreMessage) (cont bool)) error // LockFor is a temporary hack to fix #77 until Datastore revamp LockFor(emailAddress string) (*sync.RWMutex, error) // NewMessage is temproary until #69 MessageData refactor - NewMessage(mailbox string) (Message, error) + NewMessage(mailbox string) (StoreMessage, error) } -// Message is an interface for a single message in a Mailbox -type Message interface { +// StoreMessage represents a message to be stored, or returned from a storage implementation. +type StoreMessage interface { + Mailbox() string ID() string From() string To() []string @@ -44,7 +46,6 @@ type Message interface { ReadRaw() (raw *string, err error) Append(data []byte) error Close() error - Delete() error String() string Size() int64 } diff --git a/pkg/storage/testing.go b/pkg/storage/testing.go index 6b2604d..16c40b3 100644 --- a/pkg/storage/testing.go +++ b/pkg/storage/testing.go @@ -16,15 +16,21 @@ type MockDataStore struct { } // GetMessage mock function -func (m *MockDataStore) GetMessage(name, id string) (Message, error) { +func (m *MockDataStore) GetMessage(name, id string) (StoreMessage, error) { args := m.Called(name, id) - return args.Get(0).(Message), args.Error(1) + return args.Get(0).(StoreMessage), args.Error(1) } // GetMessages mock function -func (m *MockDataStore) GetMessages(name string) ([]Message, error) { +func (m *MockDataStore) GetMessages(name string) ([]StoreMessage, error) { args := m.Called(name) - return args.Get(0).([]Message), args.Error(1) + return args.Get(0).([]StoreMessage), args.Error(1) +} + +// RemoveMessage mock function +func (m *MockDataStore) RemoveMessage(name, id string) error { + args := m.Called(name, id) + return args.Error(0) } // PurgeMessages mock function @@ -39,14 +45,14 @@ func (m *MockDataStore) LockFor(name string) (*sync.RWMutex, error) { } // NewMessage temporary for #69 -func (m *MockDataStore) NewMessage(mailbox string) (Message, error) { +func (m *MockDataStore) NewMessage(mailbox string) (StoreMessage, error) { args := m.Called(mailbox) - return args.Get(0).(Message), args.Error(1) + return args.Get(0).(StoreMessage), args.Error(1) } // VisitMailboxes accepts a function that will be called with the messages in each mailbox while it // continues to return true. -func (m *MockDataStore) VisitMailboxes(f func([]Message) (cont bool)) error { +func (m *MockDataStore) VisitMailboxes(f func([]StoreMessage) (cont bool)) error { return nil } @@ -55,6 +61,12 @@ type MockMessage struct { mock.Mock } +// Mailbox mock function +func (m *MockMessage) Mailbox() string { + args := m.Called() + return args.String(0) +} + // ID mock function func (m *MockMessage) ID() string { args := m.Called() @@ -127,12 +139,6 @@ func (m *MockMessage) Close() error { return args.Error(0) } -// Delete mock function -func (m *MockMessage) Delete() error { - args := m.Called() - return args.Error(0) -} - // String mock function func (m *MockMessage) String() string { args := m.Called() diff --git a/pkg/test/storage.go b/pkg/test/storage.go index fab78ca..92195ff 100644 --- a/pkg/test/storage.go +++ b/pkg/test/storage.go @@ -2,6 +2,7 @@ package test import ( "errors" + "sync" "github.com/jhillyerd/inbucket/pkg/storage" ) @@ -9,24 +10,26 @@ import ( // StoreStub stubs storage.Store for testing. type StoreStub struct { storage.Store - mailboxes map[string][]storage.Message + mailboxes map[string][]storage.StoreMessage + deleted map[storage.StoreMessage]struct{} } // NewStore creates a new StoreStub. func NewStore() *StoreStub { return &StoreStub{ - mailboxes: make(map[string][]storage.Message), + mailboxes: make(map[string][]storage.StoreMessage), + deleted: make(map[storage.StoreMessage]struct{}), } } // AddMessage adds a message to the specified mailbox. -func (s *StoreStub) AddMessage(mailbox string, m storage.Message) { +func (s *StoreStub) AddMessage(mailbox string, m storage.StoreMessage) { msgs := s.mailboxes[mailbox] s.mailboxes[mailbox] = append(msgs, m) } // GetMessage gets a message by ID from the specified mailbox. -func (s *StoreStub) GetMessage(mailbox, id string) (storage.Message, error) { +func (s *StoreStub) GetMessage(mailbox, id string) (storage.StoreMessage, error) { if mailbox == "messageerr" { return nil, errors.New("internal error") } @@ -39,16 +42,36 @@ func (s *StoreStub) GetMessage(mailbox, id string) (storage.Message, error) { } // GetMessages gets all the messages for the specified mailbox. -func (s *StoreStub) GetMessages(mailbox string) ([]storage.Message, error) { +func (s *StoreStub) GetMessages(mailbox string) ([]storage.StoreMessage, error) { if mailbox == "messageserr" { return nil, errors.New("internal error") } return s.mailboxes[mailbox], nil } +// RemoveMessage deletes a message by ID from the specified mailbox. +func (s *StoreStub) RemoveMessage(mailbox, id string) error { + mb, ok := s.mailboxes[mailbox] + if ok { + var msg storage.StoreMessage + for i, m := range mb { + if m.ID() == id { + msg = m + s.mailboxes[mailbox] = append(mb[:i], mb[i+1:]...) + break + } + } + if msg != nil { + s.deleted[msg] = struct{}{} + return nil + } + } + return storage.ErrNotExist +} + // VisitMailboxes accepts a function that will be called with the messages in each mailbox while it // continues to return true. -func (s *StoreStub) VisitMailboxes(f func([]storage.Message) (cont bool)) error { +func (s *StoreStub) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) error { for _, v := range s.mailboxes { if !f(v) { return nil @@ -58,6 +81,18 @@ func (s *StoreStub) VisitMailboxes(f func([]storage.Message) (cont bool)) error } // NewMessage is temproary until #69 MessageData refactor -func (s *StoreStub) NewMessage(mailbox string) (storage.Message, error) { +func (s *StoreStub) NewMessage(mailbox string) (storage.StoreMessage, error) { return nil, nil } + +// LockFor mock function returns a new RWMutex, never errors. +// TODO(#69) remove +func (s *StoreStub) LockFor(name string) (*sync.RWMutex, error) { + return &sync.RWMutex{}, nil +} + +// MessageDeleted returns true if the specified message was deleted +func (s *StoreStub) MessageDeleted(m storage.StoreMessage) bool { + _, ok := s.deleted[m] + return ok +}