mirror of
https://github.com/jhillyerd/inbucket.git
synced 2025-12-17 09:37:02 +00:00
storage: Message refactoring for #69
- Message interface renamed to StoreMessage - Message.Delete becomes Store.RemoveMessage - Added deleted message tracking to Store stub for #80
This commit is contained in:
@@ -158,18 +158,14 @@ func MailboxDeleteV1(w http.ResponseWriter, req *http.Request, ctx *web.Context)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
message, err := ctx.DataStore.GetMessage(name, id)
|
err = ctx.DataStore.RemoveMessage(name, id)
|
||||||
if err == storage.ErrNotExist {
|
if err == storage.ErrNotExist {
|
||||||
http.NotFound(w, req)
|
http.NotFound(w, req)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// This doesn't indicate missing, likely an IO error
|
// This doesn't indicate missing, likely an IO error
|
||||||
return fmt.Errorf("GetMessage(%q) failed: %v", id, err)
|
return fmt.Errorf("RemoveMessage(%q) failed: %v", id, err)
|
||||||
}
|
|
||||||
err = message.Delete()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Delete(%q) failed: %v", id, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return web.RenderJSON(w, "OK")
|
return web.RenderJSON(w, "OK")
|
||||||
|
|||||||
@@ -57,17 +57,17 @@ var commands = map[string]bool{
|
|||||||
|
|
||||||
// Session defines an active POP3 session
|
// Session defines an active POP3 session
|
||||||
type Session struct {
|
type Session struct {
|
||||||
server *Server // Reference to the server we belong to
|
server *Server // Reference to the server we belong to
|
||||||
id int // Session ID number
|
id int // Session ID number
|
||||||
conn net.Conn // Our network connection
|
conn net.Conn // Our network connection
|
||||||
remoteHost string // IP address of client
|
remoteHost string // IP address of client
|
||||||
sendError error // Used to bail out of read loop on send error
|
sendError error // Used to bail out of read loop on send error
|
||||||
state State // Current session state
|
state State // Current session state
|
||||||
reader *bufio.Reader // Buffered reader for our net conn
|
reader *bufio.Reader // Buffered reader for our net conn
|
||||||
user string // Mailbox name
|
user string // Mailbox name
|
||||||
messages []storage.Message // Slice of messages in mailbox
|
messages []storage.StoreMessage // Slice of messages in mailbox
|
||||||
retain []bool // Messages to retain upon UPDATE (true=retain)
|
retain []bool // Messages to retain upon UPDATE (true=retain)
|
||||||
msgCount int // Number of undeleted messages
|
msgCount int // Number of undeleted messages
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSession creates a new POP3 session
|
// NewSession creates a new POP3 session
|
||||||
@@ -415,7 +415,7 @@ func (ses *Session) transactionHandler(cmd string, args []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send the contents of the message to the client
|
// Send the contents of the message to the client
|
||||||
func (ses *Session) sendMessage(msg storage.Message) {
|
func (ses *Session) sendMessage(msg storage.StoreMessage) {
|
||||||
reader, err := msg.RawReader()
|
reader, err := msg.RawReader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ses.logError("Failed to read message for RETR command")
|
ses.logError("Failed to read message for RETR command")
|
||||||
@@ -448,7 +448,7 @@ func (ses *Session) sendMessage(msg storage.Message) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send the headers plus the top N lines to the client
|
// Send the headers plus the top N lines to the client
|
||||||
func (ses *Session) sendMessageTop(msg storage.Message, lineCount int) {
|
func (ses *Session) sendMessageTop(msg storage.StoreMessage, lineCount int) {
|
||||||
reader, err := msg.RawReader()
|
reader, err := msg.RawReader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ses.logError("Failed to read message for RETR command")
|
ses.logError("Failed to read message for RETR command")
|
||||||
@@ -522,7 +522,7 @@ func (ses *Session) processDeletes() {
|
|||||||
for i, msg := range ses.messages {
|
for i, msg := range ses.messages {
|
||||||
if !ses.retain[i] {
|
if !ses.retain[i] {
|
||||||
ses.logTrace("Deleting %v", msg)
|
ses.logTrace("Deleting %v", msg)
|
||||||
if err := msg.Delete(); err != nil {
|
if err := ses.server.dataStore.RemoveMessage(ses.user, msg.ID()); err != nil {
|
||||||
ses.logWarn("Error deleting %v: %v", msg, err)
|
ses.logWarn("Error deleting %v: %v", msg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/jhillyerd/inbucket/pkg/config"
|
"github.com/jhillyerd/inbucket/pkg/config"
|
||||||
"github.com/jhillyerd/inbucket/pkg/msghub"
|
"github.com/jhillyerd/inbucket/pkg/msghub"
|
||||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||||
|
"github.com/jhillyerd/inbucket/pkg/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
type scriptStep struct {
|
type scriptStep struct {
|
||||||
@@ -25,10 +26,8 @@ type scriptStep struct {
|
|||||||
|
|
||||||
// Test commands in GREET state
|
// Test commands in GREET state
|
||||||
func TestGreetState(t *testing.T) {
|
func TestGreetState(t *testing.T) {
|
||||||
// Setup mock objects
|
ds := test.NewStore()
|
||||||
mds := &storage.MockDataStore{}
|
server, logbuf, teardown := setupSMTPServer(ds)
|
||||||
|
|
||||||
server, logbuf, teardown := setupSMTPServer(mds)
|
|
||||||
defer teardown()
|
defer teardown()
|
||||||
|
|
||||||
// Test out some mangled HELOs
|
// Test out some mangled HELOs
|
||||||
@@ -82,10 +81,8 @@ func TestGreetState(t *testing.T) {
|
|||||||
|
|
||||||
// Test commands in READY state
|
// Test commands in READY state
|
||||||
func TestReadyState(t *testing.T) {
|
func TestReadyState(t *testing.T) {
|
||||||
// Setup mock objects
|
ds := test.NewStore()
|
||||||
mds := &storage.MockDataStore{}
|
server, logbuf, teardown := setupSMTPServer(ds)
|
||||||
|
|
||||||
server, logbuf, teardown := setupSMTPServer(mds)
|
|
||||||
defer teardown()
|
defer teardown()
|
||||||
|
|
||||||
// Test out some mangled READY commands
|
// Test out some mangled READY commands
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ type Message struct {
|
|||||||
|
|
||||||
// newMessage creates a new FileMessage object and sets the Date and ID fields.
|
// newMessage creates a new FileMessage object and sets the Date and ID fields.
|
||||||
// It will also delete messages over messageCap if configured.
|
// It will also delete messages over messageCap if configured.
|
||||||
func (mb *mbox) newMessage() (storage.Message, error) {
|
func (mb *mbox) newMessage() (storage.StoreMessage, error) {
|
||||||
// Load index
|
// Load index
|
||||||
if !mb.indexLoaded {
|
if !mb.indexLoaded {
|
||||||
if err := mb.readIndex(); err != nil {
|
if err := mb.readIndex(); err != nil {
|
||||||
@@ -45,7 +45,7 @@ func (mb *mbox) newMessage() (storage.Message, error) {
|
|||||||
if mb.store.messageCap > 0 {
|
if mb.store.messageCap > 0 {
|
||||||
for len(mb.messages) >= mb.store.messageCap {
|
for len(mb.messages) >= mb.store.messageCap {
|
||||||
log.Infof("Mailbox %q over configured message cap", mb.name)
|
log.Infof("Mailbox %q over configured message cap", mb.name)
|
||||||
if err := mb.messages[0].Delete(); err != nil {
|
if err := mb.removeMessage(mb.messages[0].ID()); err != nil {
|
||||||
log.Errorf("Error deleting message: %s", err)
|
log.Errorf("Error deleting message: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -55,6 +55,11 @@ func (mb *mbox) newMessage() (storage.Message, error) {
|
|||||||
return &Message{mailbox: mb, Fid: id, Fdate: date, writable: true}, nil
|
return &Message{mailbox: mb, Fid: id, Fdate: date, writable: true}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mailbox returns the name of the mailbox this message resides in.
|
||||||
|
func (m *Message) Mailbox() string {
|
||||||
|
return m.mailbox.name
|
||||||
|
}
|
||||||
|
|
||||||
// ID gets the ID of the Message
|
// ID gets the ID of the Message
|
||||||
func (m *Message) ID() string {
|
func (m *Message) ID() string {
|
||||||
return m.Fid
|
return m.Fid
|
||||||
@@ -240,29 +245,3 @@ func (m *Message) Close() error {
|
|||||||
m.mailbox.messages = append(m.mailbox.messages, m)
|
m.mailbox.messages = append(m.mailbox.messages, m)
|
||||||
return m.mailbox.writeIndex()
|
return m.mailbox.writeIndex()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete this Message from disk by removing it from the index and deleting the
|
|
||||||
// raw files.
|
|
||||||
func (m *Message) Delete() error {
|
|
||||||
messages := m.mailbox.messages
|
|
||||||
for i, mm := range messages {
|
|
||||||
if m == mm {
|
|
||||||
// Slice around message we are deleting
|
|
||||||
m.mailbox.messages = append(messages[:i], messages[i+1:]...)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := m.mailbox.writeIndex(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(m.mailbox.messages) == 0 {
|
|
||||||
// This was the last message, thus writeIndex() has removed the entire
|
|
||||||
// directory; we don't need to delete the raw file.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// There are still messages in the index
|
|
||||||
log.Tracef("Deleting %v", m.rawPath())
|
|
||||||
return os.Remove(m.rawPath())
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ func New(cfg config.DataStoreConfig) storage.Store {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetMessage returns the messages in the named mailbox, or an error.
|
// GetMessage returns the messages in the named mailbox, or an error.
|
||||||
func (fs *Store) GetMessage(mailbox, id string) (storage.Message, error) {
|
func (fs *Store) GetMessage(mailbox, id string) (storage.StoreMessage, error) {
|
||||||
mb, err := fs.mbox(mailbox)
|
mb, err := fs.mbox(mailbox)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -84,7 +84,7 @@ func (fs *Store) GetMessage(mailbox, id string) (storage.Message, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetMessages returns the messages in the named mailbox, or an error.
|
// GetMessages returns the messages in the named mailbox, or an error.
|
||||||
func (fs *Store) GetMessages(mailbox string) ([]storage.Message, error) {
|
func (fs *Store) GetMessages(mailbox string) ([]storage.StoreMessage, error) {
|
||||||
mb, err := fs.mbox(mailbox)
|
mb, err := fs.mbox(mailbox)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -92,6 +92,15 @@ func (fs *Store) GetMessages(mailbox string) ([]storage.Message, error) {
|
|||||||
return mb.getMessages()
|
return mb.getMessages()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemoveMessage deletes a message by ID from the specified mailbox.
|
||||||
|
func (fs *Store) RemoveMessage(mailbox, id string) error {
|
||||||
|
mb, err := fs.mbox(mailbox)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return mb.removeMessage(id)
|
||||||
|
}
|
||||||
|
|
||||||
// PurgeMessages deletes all messages in the named mailbox, or returns an error.
|
// PurgeMessages deletes all messages in the named mailbox, or returns an error.
|
||||||
func (fs *Store) PurgeMessages(mailbox string) error {
|
func (fs *Store) PurgeMessages(mailbox string) error {
|
||||||
mb, err := fs.mbox(mailbox)
|
mb, err := fs.mbox(mailbox)
|
||||||
@@ -103,7 +112,7 @@ func (fs *Store) PurgeMessages(mailbox string) error {
|
|||||||
|
|
||||||
// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
|
// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
|
||||||
// continues to return true.
|
// continues to return true.
|
||||||
func (fs *Store) VisitMailboxes(f func([]storage.Message) (cont bool)) error {
|
func (fs *Store) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) error {
|
||||||
infos1, err := ioutil.ReadDir(fs.mailPath)
|
infos1, err := ioutil.ReadDir(fs.mailPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -159,7 +168,7 @@ func (fs *Store) LockFor(emailAddress string) (*sync.RWMutex, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewMessage is temproary until #69 MessageData refactor
|
// NewMessage is temproary until #69 MessageData refactor
|
||||||
func (fs *Store) NewMessage(mailbox string) (storage.Message, error) {
|
func (fs *Store) NewMessage(mailbox string) (storage.StoreMessage, error) {
|
||||||
mb, err := fs.mbox(mailbox)
|
mb, err := fs.mbox(mailbox)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -196,13 +205,13 @@ type mbox struct {
|
|||||||
|
|
||||||
// getMessages scans the mailbox directory for .gob files and decodes them into
|
// getMessages scans the mailbox directory for .gob files and decodes them into
|
||||||
// a slice of Message objects.
|
// a slice of Message objects.
|
||||||
func (mb *mbox) getMessages() ([]storage.Message, error) {
|
func (mb *mbox) getMessages() ([]storage.StoreMessage, error) {
|
||||||
if !mb.indexLoaded {
|
if !mb.indexLoaded {
|
||||||
if err := mb.readIndex(); err != nil {
|
if err := mb.readIndex(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
messages := make([]storage.Message, len(mb.messages))
|
messages := make([]storage.StoreMessage, len(mb.messages))
|
||||||
for i, m := range mb.messages {
|
for i, m := range mb.messages {
|
||||||
messages[i] = m
|
messages[i] = m
|
||||||
}
|
}
|
||||||
@@ -210,7 +219,7 @@ func (mb *mbox) getMessages() ([]storage.Message, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getMessage decodes a single message by ID and returns a Message object.
|
// getMessage decodes a single message by ID and returns a Message object.
|
||||||
func (mb *mbox) getMessage(id string) (storage.Message, error) {
|
func (mb *mbox) getMessage(id string) (storage.StoreMessage, error) {
|
||||||
if !mb.indexLoaded {
|
if !mb.indexLoaded {
|
||||||
if err := mb.readIndex(); err != nil {
|
if err := mb.readIndex(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -227,6 +236,38 @@ func (mb *mbox) getMessage(id string) (storage.Message, error) {
|
|||||||
return nil, storage.ErrNotExist
|
return nil, storage.ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// removeMessage deletes the message off disk and removes it from the index.
|
||||||
|
func (mb *mbox) removeMessage(id string) error {
|
||||||
|
if !mb.indexLoaded {
|
||||||
|
if err := mb.readIndex(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var msg *Message
|
||||||
|
for i, m := range mb.messages {
|
||||||
|
if id == m.ID() {
|
||||||
|
msg = m
|
||||||
|
// Slice around message we are deleting
|
||||||
|
mb.messages = append(mb.messages[:i], mb.messages[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if msg == nil {
|
||||||
|
return storage.ErrNotExist
|
||||||
|
}
|
||||||
|
if err := mb.writeIndex(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(mb.messages) == 0 {
|
||||||
|
// This was the last message, thus writeIndex() has removed the entire
|
||||||
|
// directory; we don't need to delete the raw file.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// There are still messages in the index
|
||||||
|
log.Tracef("Deleting %v", msg.rawPath())
|
||||||
|
return os.Remove(msg.rawPath())
|
||||||
|
}
|
||||||
|
|
||||||
// purge deletes all messages in this mailbox.
|
// purge deletes all messages in this mailbox.
|
||||||
func (mb *mbox) purge() error {
|
func (mb *mbox) purge() error {
|
||||||
mb.messages = mb.messages[:0]
|
mb.messages = mb.messages[:0]
|
||||||
@@ -256,14 +297,18 @@ func (mb *mbox) readIndex() error {
|
|||||||
log.Errorf("Failed to close %q: %v", mb.indexPath, err)
|
log.Errorf("Failed to close %q: %v", mb.indexPath, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Decode gob data
|
// Decode gob data
|
||||||
dec := gob.NewDecoder(bufio.NewReader(file))
|
dec := gob.NewDecoder(bufio.NewReader(file))
|
||||||
|
name := ""
|
||||||
|
if err = dec.Decode(&name); err != nil {
|
||||||
|
return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err)
|
||||||
|
}
|
||||||
|
mb.name = name
|
||||||
for {
|
for {
|
||||||
msg := new(Message)
|
// Load messages until EOF
|
||||||
|
msg := &Message{}
|
||||||
if err = dec.Decode(msg); err != nil {
|
if err = dec.Decode(msg); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
// It's OK to get an EOF here
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err)
|
return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err)
|
||||||
@@ -271,7 +316,6 @@ func (mb *mbox) readIndex() error {
|
|||||||
msg.mailbox = mb
|
msg.mailbox = mb
|
||||||
mb.messages = append(mb.messages, msg)
|
mb.messages = append(mb.messages, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
mb.indexLoaded = true
|
mb.indexLoaded = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -294,9 +338,12 @@ func (mb *mbox) writeIndex() error {
|
|||||||
writer := bufio.NewWriter(file)
|
writer := bufio.NewWriter(file)
|
||||||
// Write each message and then flush
|
// Write each message and then flush
|
||||||
enc := gob.NewEncoder(writer)
|
enc := gob.NewEncoder(writer)
|
||||||
|
if err = enc.Encode(mb.name); err != nil {
|
||||||
|
_ = file.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
for _, m := range mb.messages {
|
for _, m := range mb.messages {
|
||||||
err = enc.Encode(m)
|
if err = enc.Encode(m); err != nil {
|
||||||
if err != nil {
|
|
||||||
_ = file.Close()
|
_ = file.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -314,7 +361,6 @@ func (mb *mbox) writeIndex() error {
|
|||||||
log.Tracef("Removing mailbox %v", mb.path)
|
log.Tracef("Removing mailbox %v", mb.path)
|
||||||
return mb.removeDir()
|
return mb.removeDir()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -63,9 +63,7 @@ func TestFSDirStructure(t *testing.T) {
|
|||||||
assert.True(t, isFile(expect), "Expected %q to be a file", expect)
|
assert.True(t, isFile(expect), "Expected %q to be a file", expect)
|
||||||
|
|
||||||
// Delete message
|
// Delete message
|
||||||
msg, err := ds.GetMessage(mbName, id1)
|
err := ds.RemoveMessage(mbName, id1)
|
||||||
assert.Nil(t, err)
|
|
||||||
err = msg.Delete()
|
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
// Message should be removed
|
// Message should be removed
|
||||||
@@ -75,9 +73,7 @@ func TestFSDirStructure(t *testing.T) {
|
|||||||
assert.True(t, isFile(expect), "Expected %q to be a file", expect)
|
assert.True(t, isFile(expect), "Expected %q to be a file", expect)
|
||||||
|
|
||||||
// Delete message
|
// Delete message
|
||||||
msg, err = ds.GetMessage(mbName, id2)
|
err = ds.RemoveMessage(mbName, id2)
|
||||||
assert.Nil(t, err)
|
|
||||||
err = msg.Delete()
|
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
// Message should be removed
|
// Message should be removed
|
||||||
@@ -114,7 +110,7 @@ func TestFSVisitMailboxes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
seen := 0
|
seen := 0
|
||||||
err := ds.VisitMailboxes(func(messages []storage.Message) bool {
|
err := ds.VisitMailboxes(func(messages []storage.StoreMessage) bool {
|
||||||
seen++
|
seen++
|
||||||
count := len(messages)
|
count := len(messages)
|
||||||
if count != 2 {
|
if count != 2 {
|
||||||
@@ -196,8 +192,14 @@ func TestFSDelete(t *testing.T) {
|
|||||||
len(subjects), len(msgs))
|
len(subjects), len(msgs))
|
||||||
|
|
||||||
// Delete a couple messages
|
// Delete a couple messages
|
||||||
_ = msgs[1].Delete()
|
err = ds.RemoveMessage(mbName, msgs[1].ID())
|
||||||
_ = msgs[3].Delete()
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = ds.RemoveMessage(mbName, msgs[3].ID())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Confirm deletion
|
// Confirm deletion
|
||||||
msgs, err = ds.GetMessages(mbName)
|
msgs, err = ds.GetMessages(mbName)
|
||||||
|
|||||||
@@ -119,11 +119,11 @@ func (rs *RetentionScanner) DoScan() error {
|
|||||||
cutoff := time.Now().Add(-1 * rs.retentionPeriod)
|
cutoff := time.Now().Add(-1 * rs.retentionPeriod)
|
||||||
retained := 0
|
retained := 0
|
||||||
// Loop over all mailboxes.
|
// Loop over all mailboxes.
|
||||||
err := rs.ds.VisitMailboxes(func(messages []Message) bool {
|
err := rs.ds.VisitMailboxes(func(messages []StoreMessage) bool {
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
if msg.Date().Before(cutoff) {
|
if msg.Date().Before(cutoff) {
|
||||||
log.Tracef("Purging expired message %v", msg.ID())
|
log.Tracef("Purging expired message %v/%v", msg.Mailbox(), msg.ID())
|
||||||
if err := msg.Delete(); err != nil {
|
if err := rs.ds.RemoveMessage(msg.Mailbox(), msg.ID()); err != nil {
|
||||||
log.Errorf("Failed to purge message %v: %v", msg.ID(), err)
|
log.Errorf("Failed to purge message %v: %v", msg.ID(), err)
|
||||||
} else {
|
} else {
|
||||||
expRetentionDeletesTotal.Add(1)
|
expRetentionDeletesTotal.Add(1)
|
||||||
|
|||||||
@@ -20,11 +20,17 @@ func TestDoRetentionScan(t *testing.T) {
|
|||||||
old2 := mockMessage(12)
|
old2 := mockMessage(12)
|
||||||
old3 := mockMessage(24)
|
old3 := mockMessage(24)
|
||||||
ds.AddMessage("mb1", new1)
|
ds.AddMessage("mb1", new1)
|
||||||
|
new1.On("Mailbox").Return("mb1")
|
||||||
ds.AddMessage("mb1", old1)
|
ds.AddMessage("mb1", old1)
|
||||||
|
old1.On("Mailbox").Return("mb1")
|
||||||
ds.AddMessage("mb1", old2)
|
ds.AddMessage("mb1", old2)
|
||||||
|
old2.On("Mailbox").Return("mb1")
|
||||||
ds.AddMessage("mb2", old3)
|
ds.AddMessage("mb2", old3)
|
||||||
|
old3.On("Mailbox").Return("mb2")
|
||||||
ds.AddMessage("mb2", new2)
|
ds.AddMessage("mb2", new2)
|
||||||
|
new2.On("Mailbox").Return("mb2")
|
||||||
ds.AddMessage("mb3", new3)
|
ds.AddMessage("mb3", new3)
|
||||||
|
new3.On("Mailbox").Return("mb3")
|
||||||
// Test 4 hour retention
|
// Test 4 hour retention
|
||||||
cfg := config.DataStoreConfig{
|
cfg := config.DataStoreConfig{
|
||||||
RetentionMinutes: 239,
|
RetentionMinutes: 239,
|
||||||
@@ -36,13 +42,17 @@ func TestDoRetentionScan(t *testing.T) {
|
|||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
// Delete should not have been called on new messages
|
// Delete should not have been called on new messages
|
||||||
new1.AssertNotCalled(t, "Delete")
|
for _, m := range []storage.StoreMessage{new1, new2, new3} {
|
||||||
new2.AssertNotCalled(t, "Delete")
|
if ds.MessageDeleted(m) {
|
||||||
new3.AssertNotCalled(t, "Delete")
|
t.Errorf("Expected %v to be present, was deleted", m.ID())
|
||||||
|
}
|
||||||
|
}
|
||||||
// Delete should have been called once on old messages
|
// Delete should have been called once on old messages
|
||||||
old1.AssertNumberOfCalls(t, "Delete", 1)
|
for _, m := range []storage.StoreMessage{old1, old2, old3} {
|
||||||
old2.AssertNumberOfCalls(t, "Delete", 1)
|
if !ds.MessageDeleted(m) {
|
||||||
old3.AssertNumberOfCalls(t, "Delete", 1)
|
t.Errorf("Expected %v to be deleted, was present", m.ID())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a MockMessage of a specific age
|
// Make a MockMessage of a specific age
|
||||||
|
|||||||
@@ -12,27 +12,29 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrNotExist indicates the requested message does not exist
|
// ErrNotExist indicates the requested message does not exist.
|
||||||
ErrNotExist = errors.New("Message does not exist")
|
ErrNotExist = errors.New("message does not exist")
|
||||||
|
|
||||||
// ErrNotWritable indicates the message is closed; no longer writable
|
// ErrNotWritable indicates the message is closed; no longer writable
|
||||||
ErrNotWritable = errors.New("Message not writable")
|
ErrNotWritable = errors.New("Message not writable")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Store is an interface to get Mailboxes stored in Inbucket
|
// Store is the interface Inbucket uses to interact with storage implementations.
|
||||||
type Store interface {
|
type Store interface {
|
||||||
GetMessage(mailbox string, id string) (Message, error)
|
GetMessage(mailbox, id string) (StoreMessage, error)
|
||||||
GetMessages(mailbox string) ([]Message, error)
|
GetMessages(mailbox string) ([]StoreMessage, error)
|
||||||
PurgeMessages(mailbox string) error
|
PurgeMessages(mailbox string) error
|
||||||
VisitMailboxes(f func([]Message) (cont bool)) error
|
RemoveMessage(mailbox, id string) error
|
||||||
|
VisitMailboxes(f func([]StoreMessage) (cont bool)) error
|
||||||
// LockFor is a temporary hack to fix #77 until Datastore revamp
|
// LockFor is a temporary hack to fix #77 until Datastore revamp
|
||||||
LockFor(emailAddress string) (*sync.RWMutex, error)
|
LockFor(emailAddress string) (*sync.RWMutex, error)
|
||||||
// NewMessage is temproary until #69 MessageData refactor
|
// NewMessage is temproary until #69 MessageData refactor
|
||||||
NewMessage(mailbox string) (Message, error)
|
NewMessage(mailbox string) (StoreMessage, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Message is an interface for a single message in a Mailbox
|
// StoreMessage represents a message to be stored, or returned from a storage implementation.
|
||||||
type Message interface {
|
type StoreMessage interface {
|
||||||
|
Mailbox() string
|
||||||
ID() string
|
ID() string
|
||||||
From() string
|
From() string
|
||||||
To() []string
|
To() []string
|
||||||
@@ -44,7 +46,6 @@ type Message interface {
|
|||||||
ReadRaw() (raw *string, err error)
|
ReadRaw() (raw *string, err error)
|
||||||
Append(data []byte) error
|
Append(data []byte) error
|
||||||
Close() error
|
Close() error
|
||||||
Delete() error
|
|
||||||
String() string
|
String() string
|
||||||
Size() int64
|
Size() int64
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,15 +16,21 @@ type MockDataStore struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetMessage mock function
|
// GetMessage mock function
|
||||||
func (m *MockDataStore) GetMessage(name, id string) (Message, error) {
|
func (m *MockDataStore) GetMessage(name, id string) (StoreMessage, error) {
|
||||||
args := m.Called(name, id)
|
args := m.Called(name, id)
|
||||||
return args.Get(0).(Message), args.Error(1)
|
return args.Get(0).(StoreMessage), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMessages mock function
|
// GetMessages mock function
|
||||||
func (m *MockDataStore) GetMessages(name string) ([]Message, error) {
|
func (m *MockDataStore) GetMessages(name string) ([]StoreMessage, error) {
|
||||||
args := m.Called(name)
|
args := m.Called(name)
|
||||||
return args.Get(0).([]Message), args.Error(1)
|
return args.Get(0).([]StoreMessage), args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveMessage mock function
|
||||||
|
func (m *MockDataStore) RemoveMessage(name, id string) error {
|
||||||
|
args := m.Called(name, id)
|
||||||
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PurgeMessages mock function
|
// PurgeMessages mock function
|
||||||
@@ -39,14 +45,14 @@ func (m *MockDataStore) LockFor(name string) (*sync.RWMutex, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewMessage temporary for #69
|
// NewMessage temporary for #69
|
||||||
func (m *MockDataStore) NewMessage(mailbox string) (Message, error) {
|
func (m *MockDataStore) NewMessage(mailbox string) (StoreMessage, error) {
|
||||||
args := m.Called(mailbox)
|
args := m.Called(mailbox)
|
||||||
return args.Get(0).(Message), args.Error(1)
|
return args.Get(0).(StoreMessage), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
|
// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
|
||||||
// continues to return true.
|
// continues to return true.
|
||||||
func (m *MockDataStore) VisitMailboxes(f func([]Message) (cont bool)) error {
|
func (m *MockDataStore) VisitMailboxes(f func([]StoreMessage) (cont bool)) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -55,6 +61,12 @@ type MockMessage struct {
|
|||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mailbox mock function
|
||||||
|
func (m *MockMessage) Mailbox() string {
|
||||||
|
args := m.Called()
|
||||||
|
return args.String(0)
|
||||||
|
}
|
||||||
|
|
||||||
// ID mock function
|
// ID mock function
|
||||||
func (m *MockMessage) ID() string {
|
func (m *MockMessage) ID() string {
|
||||||
args := m.Called()
|
args := m.Called()
|
||||||
@@ -127,12 +139,6 @@ func (m *MockMessage) Close() error {
|
|||||||
return args.Error(0)
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete mock function
|
|
||||||
func (m *MockMessage) Delete() error {
|
|
||||||
args := m.Called()
|
|
||||||
return args.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// String mock function
|
// String mock function
|
||||||
func (m *MockMessage) String() string {
|
func (m *MockMessage) String() string {
|
||||||
args := m.Called()
|
args := m.Called()
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/jhillyerd/inbucket/pkg/storage"
|
"github.com/jhillyerd/inbucket/pkg/storage"
|
||||||
)
|
)
|
||||||
@@ -9,24 +10,26 @@ import (
|
|||||||
// StoreStub stubs storage.Store for testing.
|
// StoreStub stubs storage.Store for testing.
|
||||||
type StoreStub struct {
|
type StoreStub struct {
|
||||||
storage.Store
|
storage.Store
|
||||||
mailboxes map[string][]storage.Message
|
mailboxes map[string][]storage.StoreMessage
|
||||||
|
deleted map[storage.StoreMessage]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStore creates a new StoreStub.
|
// NewStore creates a new StoreStub.
|
||||||
func NewStore() *StoreStub {
|
func NewStore() *StoreStub {
|
||||||
return &StoreStub{
|
return &StoreStub{
|
||||||
mailboxes: make(map[string][]storage.Message),
|
mailboxes: make(map[string][]storage.StoreMessage),
|
||||||
|
deleted: make(map[storage.StoreMessage]struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddMessage adds a message to the specified mailbox.
|
// AddMessage adds a message to the specified mailbox.
|
||||||
func (s *StoreStub) AddMessage(mailbox string, m storage.Message) {
|
func (s *StoreStub) AddMessage(mailbox string, m storage.StoreMessage) {
|
||||||
msgs := s.mailboxes[mailbox]
|
msgs := s.mailboxes[mailbox]
|
||||||
s.mailboxes[mailbox] = append(msgs, m)
|
s.mailboxes[mailbox] = append(msgs, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMessage gets a message by ID from the specified mailbox.
|
// GetMessage gets a message by ID from the specified mailbox.
|
||||||
func (s *StoreStub) GetMessage(mailbox, id string) (storage.Message, error) {
|
func (s *StoreStub) GetMessage(mailbox, id string) (storage.StoreMessage, error) {
|
||||||
if mailbox == "messageerr" {
|
if mailbox == "messageerr" {
|
||||||
return nil, errors.New("internal error")
|
return nil, errors.New("internal error")
|
||||||
}
|
}
|
||||||
@@ -39,16 +42,36 @@ func (s *StoreStub) GetMessage(mailbox, id string) (storage.Message, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetMessages gets all the messages for the specified mailbox.
|
// GetMessages gets all the messages for the specified mailbox.
|
||||||
func (s *StoreStub) GetMessages(mailbox string) ([]storage.Message, error) {
|
func (s *StoreStub) GetMessages(mailbox string) ([]storage.StoreMessage, error) {
|
||||||
if mailbox == "messageserr" {
|
if mailbox == "messageserr" {
|
||||||
return nil, errors.New("internal error")
|
return nil, errors.New("internal error")
|
||||||
}
|
}
|
||||||
return s.mailboxes[mailbox], nil
|
return s.mailboxes[mailbox], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemoveMessage deletes a message by ID from the specified mailbox.
|
||||||
|
func (s *StoreStub) RemoveMessage(mailbox, id string) error {
|
||||||
|
mb, ok := s.mailboxes[mailbox]
|
||||||
|
if ok {
|
||||||
|
var msg storage.StoreMessage
|
||||||
|
for i, m := range mb {
|
||||||
|
if m.ID() == id {
|
||||||
|
msg = m
|
||||||
|
s.mailboxes[mailbox] = append(mb[:i], mb[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if msg != nil {
|
||||||
|
s.deleted[msg] = struct{}{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return storage.ErrNotExist
|
||||||
|
}
|
||||||
|
|
||||||
// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
|
// VisitMailboxes accepts a function that will be called with the messages in each mailbox while it
|
||||||
// continues to return true.
|
// continues to return true.
|
||||||
func (s *StoreStub) VisitMailboxes(f func([]storage.Message) (cont bool)) error {
|
func (s *StoreStub) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) error {
|
||||||
for _, v := range s.mailboxes {
|
for _, v := range s.mailboxes {
|
||||||
if !f(v) {
|
if !f(v) {
|
||||||
return nil
|
return nil
|
||||||
@@ -58,6 +81,18 @@ func (s *StoreStub) VisitMailboxes(f func([]storage.Message) (cont bool)) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewMessage is temproary until #69 MessageData refactor
|
// NewMessage is temproary until #69 MessageData refactor
|
||||||
func (s *StoreStub) NewMessage(mailbox string) (storage.Message, error) {
|
func (s *StoreStub) NewMessage(mailbox string) (storage.StoreMessage, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LockFor mock function returns a new RWMutex, never errors.
|
||||||
|
// TODO(#69) remove
|
||||||
|
func (s *StoreStub) LockFor(name string) (*sync.RWMutex, error) {
|
||||||
|
return &sync.RWMutex{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessageDeleted returns true if the specified message was deleted
|
||||||
|
func (s *StoreStub) MessageDeleted(m storage.StoreMessage) bool {
|
||||||
|
_, ok := s.deleted[m]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user