diff --git a/pkg/server/smtp/handler.go b/pkg/server/smtp/handler.go index f15656b..1848fe2 100644 --- a/pkg/server/smtp/handler.go +++ b/pkg/server/smtp/handler.go @@ -368,23 +368,10 @@ func (ss *Session) dataHandler() { return } if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) { - // Mail data complete + // Mail data complete. for _, recip := range ss.recipients { if recip.ShouldStore() { - // TODO temporary hack to fix #77 until datastore revamp - mu, err := ss.server.dataStore.LockFor(recip.LocalPart) - if err != nil { - ss.logError("Failed to get lock for %q: %s", recip.LocalPart, err) - // Delivery failure - ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart)) - ss.reset() - return - } - mu.Lock() - ok := ss.deliverMessage(recip, msgBuf.Bytes()) - mu.Unlock() - if !ok { - // Delivery failure + if ok := ss.deliverMessage(recip, msgBuf.Bytes()); !ok { ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart)) ss.reset() return @@ -397,28 +384,22 @@ func (ss *Session) dataHandler() { ss.reset() return } - // SMTP RFC says remove leading periods from input + // RFC says remove leading periods from input. if len(lineBuf) > 0 && lineBuf[0] == '.' { lineBuf = lineBuf[1:] } msgBuf.Write(lineBuf) if msgBuf.Len() > ss.server.maxMessageBytes { - // Max message size exceeded ss.send("552 Maximum message size exceeded") ss.logWarn("Max message size exceeded while in DATA") ss.reset() return } - } // end for + } } // deliverMessage creates and populates a new Message for the specified recipient func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok bool) { - name, err := policy.ParseMailboxName(recip.LocalPart) - if err != nil { - // This parse already succeeded when MailboxFor was called, shouldn't fail here. - return false - } // TODO replace with something that only reads header? env, err := enmime.ReadEnvelope(bytes.NewReader(content)) if err != nil { @@ -441,7 +422,7 @@ func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok b ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address, stamp)) delivery := &message.Delivery{ Meta: message.Metadata{ - Mailbox: name, + Mailbox: recip.Mailbox, From: from[0], To: to, Date: time.Now(), @@ -455,8 +436,9 @@ func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok b return false } // Broadcast message information. + // TODO this belongs in message pkg. broadcast := msghub.Message{ - Mailbox: name, + Mailbox: recip.Mailbox, ID: id, From: delivery.From().String(), To: stringutil.StringAddressList(delivery.To()), diff --git a/pkg/storage/file/fstore.go b/pkg/storage/file/fstore.go index dc20b5d..f6a60ec 100644 --- a/pkg/storage/file/fstore.go +++ b/pkg/storage/file/fstore.go @@ -2,7 +2,6 @@ package file import ( "bufio" - "encoding/gob" "fmt" "io" "io/ioutil" @@ -77,11 +76,13 @@ func New(cfg config.DataStoreConfig) storage.Store { // AddMessage adds a message to the specified mailbox. func (fs *Store) AddMessage(m storage.StoreMessage) (id string, err error) { - r, err := m.RawReader() + mb, err := fs.mbox(m.Mailbox()) if err != nil { return "", err } - mb, err := fs.mbox(m.Mailbox()) + mb.Lock() + defer mb.Unlock() + r, err := m.RawReader() if err != nil { return "", err } @@ -140,6 +141,8 @@ func (fs *Store) GetMessage(mailbox, id string) (storage.StoreMessage, error) { if err != nil { return nil, err } + mb.RLock() + defer mb.RUnlock() return mb.getMessage(id) } @@ -149,6 +152,8 @@ func (fs *Store) GetMessages(mailbox string) ([]storage.StoreMessage, error) { if err != nil { return nil, err } + mb.RLock() + defer mb.RUnlock() return mb.getMessages() } @@ -158,6 +163,8 @@ func (fs *Store) RemoveMessage(mailbox, id string) error { if err != nil { return err } + mb.Lock() + defer mb.Unlock() return mb.removeMessage(id) } @@ -167,6 +174,8 @@ func (fs *Store) PurgeMessages(mailbox string) error { if err != nil { return err } + mb.Lock() + defer mb.Unlock() return mb.purge() } @@ -196,12 +205,10 @@ func (fs *Store) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) erro // Loop over mailboxes for _, inf3 := range infos3 { if inf3.IsDir() { - mbdir := inf3.Name() - mbpath := filepath.Join(fs.mailPath, l1, l2, mbdir) - idx := filepath.Join(mbpath, indexFileName) - mb := &mbox{store: fs, dirName: mbdir, path: mbpath, - indexPath: idx} + mb := fs.mboxFromHash(inf3.Name()) + mb.RLock() msgs, err := mb.getMessages() + mb.RUnlock() if err != nil { return err } @@ -217,265 +224,40 @@ func (fs *Store) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) erro return nil } -// LockFor returns the RWMutex for this mailbox, or an error. -func (fs *Store) LockFor(emailAddress string) (*sync.RWMutex, error) { - name, err := policy.ParseMailboxName(emailAddress) - if err != nil { - return nil, err - } - hash := stringutil.HashMailboxName(name) - return fs.hashLock.Get(hash), nil -} - -// NewMessage is temproary until #69 MessageData refactor -func (fs *Store) NewMessage(mailbox string) (storage.StoreMessage, error) { - mb, err := fs.mbox(mailbox) - if err != nil { - return nil, err - } - return mb.newMessage() -} - // mbox returns the named mailbox. func (fs *Store) mbox(mailbox string) (*mbox, error) { name, err := policy.ParseMailboxName(mailbox) if err != nil { return nil, err } - dir := stringutil.HashMailboxName(name) - s1 := dir[0:3] - s2 := dir[0:6] - path := filepath.Join(fs.mailPath, s1, s2, dir) + hash := stringutil.HashMailboxName(name) + s1 := hash[0:3] + s2 := hash[0:6] + path := filepath.Join(fs.mailPath, s1, s2, hash) indexPath := filepath.Join(path, indexFileName) - - return &mbox{store: fs, name: name, dirName: dir, path: path, - indexPath: indexPath}, nil + return &mbox{ + RWMutex: fs.hashLock.Get(hash), + store: fs, + name: name, + dirName: hash, + path: path, + indexPath: indexPath, + }, nil } -// mbox manages the mail for a specific user and correlates to a particular directory on disk. -type mbox struct { - store *Store - name string - dirName string - path string - indexLoaded bool - indexPath string - messages []*Message -} - -// getMessages scans the mailbox directory for .gob files and decodes them into -// a slice of Message objects. -func (mb *mbox) getMessages() ([]storage.StoreMessage, error) { - if !mb.indexLoaded { - if err := mb.readIndex(); err != nil { - return nil, err - } +// mboxFromPath constructs a mailbox based on name hash. +func (fs *Store) mboxFromHash(hash string) *mbox { + s1 := hash[0:3] + s2 := hash[0:6] + path := filepath.Join(fs.mailPath, s1, s2, hash) + indexPath := filepath.Join(path, indexFileName) + return &mbox{ + RWMutex: fs.hashLock.Get(hash), + store: fs, + dirName: hash, + path: path, + indexPath: indexPath, } - messages := make([]storage.StoreMessage, len(mb.messages)) - for i, m := range mb.messages { - messages[i] = m - } - return messages, nil -} - -// getMessage decodes a single message by ID and returns a Message object. -func (mb *mbox) getMessage(id string) (storage.StoreMessage, error) { - if !mb.indexLoaded { - if err := mb.readIndex(); err != nil { - return nil, err - } - } - if id == "latest" && len(mb.messages) != 0 { - return mb.messages[len(mb.messages)-1], nil - } - for _, m := range mb.messages { - if m.Fid == id { - return m, nil - } - } - return nil, storage.ErrNotExist -} - -// removeMessage deletes the message off disk and removes it from the index. -func (mb *mbox) removeMessage(id string) error { - if !mb.indexLoaded { - if err := mb.readIndex(); err != nil { - return err - } - } - var msg *Message - for i, m := range mb.messages { - if id == m.ID() { - msg = m - // Slice around message we are deleting - mb.messages = append(mb.messages[:i], mb.messages[i+1:]...) - break - } - } - if msg == nil { - return storage.ErrNotExist - } - if err := mb.writeIndex(); err != nil { - return err - } - if len(mb.messages) == 0 { - // This was the last message, thus writeIndex() has removed the entire - // directory; we don't need to delete the raw file. - return nil - } - // There are still messages in the index - log.Tracef("Deleting %v", msg.rawPath()) - return os.Remove(msg.rawPath()) -} - -// purge deletes all messages in this mailbox. -func (mb *mbox) purge() error { - mb.messages = mb.messages[:0] - return mb.writeIndex() -} - -// readIndex loads the mailbox index data from disk -func (mb *mbox) readIndex() error { - // Clear message slice, open index - mb.messages = mb.messages[:0] - // Lock for reading - indexMx.RLock() - defer indexMx.RUnlock() - // Check if index exists - if _, err := os.Stat(mb.indexPath); err != nil { - // Does not exist, but that's not an error in our world - log.Tracef("Index %v does not exist (yet)", mb.indexPath) - mb.indexLoaded = true - return nil - } - file, err := os.Open(mb.indexPath) - if err != nil { - return err - } - defer func() { - if err := file.Close(); err != nil { - log.Errorf("Failed to close %q: %v", mb.indexPath, err) - } - }() - // Decode gob data - dec := gob.NewDecoder(bufio.NewReader(file)) - name := "" - if err = dec.Decode(&name); err != nil { - return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err) - } - mb.name = name - for { - // Load messages until EOF - msg := &Message{} - if err = dec.Decode(msg); err != nil { - if err == io.EOF { - break - } - return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err) - } - msg.mailbox = mb - mb.messages = append(mb.messages, msg) - } - mb.indexLoaded = true - return nil -} - -// writeIndex overwrites the index on disk with the current mailbox data -func (mb *mbox) writeIndex() error { - // Lock for writing - indexMx.Lock() - defer indexMx.Unlock() - if len(mb.messages) > 0 { - // Ensure mailbox directory exists - if err := mb.createDir(); err != nil { - return err - } - // Open index for writing - file, err := os.Create(mb.indexPath) - if err != nil { - return err - } - writer := bufio.NewWriter(file) - // Write each message and then flush - enc := gob.NewEncoder(writer) - if err = enc.Encode(mb.name); err != nil { - _ = file.Close() - return err - } - for _, m := range mb.messages { - if err = enc.Encode(m); err != nil { - _ = file.Close() - return err - } - } - if err := writer.Flush(); err != nil { - _ = file.Close() - return err - } - if err := file.Close(); err != nil { - log.Errorf("Failed to close %q: %v", mb.indexPath, err) - return err - } - } else { - // No messages, delete index+maildir - log.Tracef("Removing mailbox %v", mb.path) - return mb.removeDir() - } - return nil -} - -// createDir checks for the presence of the path for this mailbox, creates it if needed -func (mb *mbox) createDir() error { - dirMx.Lock() - defer dirMx.Unlock() - if _, err := os.Stat(mb.path); err != nil { - if err := os.MkdirAll(mb.path, 0770); err != nil { - log.Errorf("Failed to create directory %v, %v", mb.path, err) - return err - } - } - return nil -} - -// removeDir removes the mailbox, plus empty higher level directories -func (mb *mbox) removeDir() error { - dirMx.Lock() - defer dirMx.Unlock() - // remove mailbox dir, including index file - if err := os.RemoveAll(mb.path); err != nil { - return err - } - // remove parents if empty - dir := filepath.Dir(mb.path) - if removeDirIfEmpty(dir) { - removeDirIfEmpty(filepath.Dir(dir)) - } - return nil -} - -// removeDirIfEmpty will remove the specified directory if it contains no files or directories. -// Caller should hold dirMx. Returns true if dir was removed. -func removeDirIfEmpty(path string) (removed bool) { - f, err := os.Open(path) - if err != nil { - return false - } - files, err := f.Readdirnames(0) - _ = f.Close() - if err != nil { - return false - } - if len(files) > 0 { - // Dir not empty - return false - } - log.Tracef("Removing dir %v", path) - err = os.Remove(path) - if err != nil { - log.Errorf("Failed to remove %q: %v", path, err) - return false - } - return true } // generatePrefix converts a Time object into the ISO style format we use diff --git a/pkg/storage/file/mbox.go b/pkg/storage/file/mbox.go new file mode 100644 index 0000000..ea6d7f4 --- /dev/null +++ b/pkg/storage/file/mbox.go @@ -0,0 +1,242 @@ +package file + +import ( + "bufio" + "encoding/gob" + "fmt" + "io" + "os" + "path/filepath" + "sync" + + "github.com/jhillyerd/inbucket/pkg/log" + "github.com/jhillyerd/inbucket/pkg/storage" +) + +// mbox manages the mail for a specific user and correlates to a particular directory on disk. +// mbox methods are not thread safe, mbox.RWMutex must be held prior to calling. +type mbox struct { + *sync.RWMutex + store *Store + name string + dirName string + path string + indexLoaded bool + indexPath string + messages []*Message +} + +// getMessages scans the mailbox directory for .gob files and decodes them into +// a slice of Message objects. +func (mb *mbox) getMessages() ([]storage.StoreMessage, error) { + if !mb.indexLoaded { + if err := mb.readIndex(); err != nil { + return nil, err + } + } + messages := make([]storage.StoreMessage, len(mb.messages)) + for i, m := range mb.messages { + messages[i] = m + } + return messages, nil +} + +// getMessage decodes a single message by ID and returns a Message object. +func (mb *mbox) getMessage(id string) (storage.StoreMessage, error) { + if !mb.indexLoaded { + if err := mb.readIndex(); err != nil { + return nil, err + } + } + if id == "latest" && len(mb.messages) != 0 { + return mb.messages[len(mb.messages)-1], nil + } + for _, m := range mb.messages { + if m.Fid == id { + return m, nil + } + } + return nil, storage.ErrNotExist +} + +// removeMessage deletes the message off disk and removes it from the index. +func (mb *mbox) removeMessage(id string) error { + if !mb.indexLoaded { + if err := mb.readIndex(); err != nil { + return err + } + } + var msg *Message + for i, m := range mb.messages { + if id == m.ID() { + msg = m + // Slice around message we are deleting + mb.messages = append(mb.messages[:i], mb.messages[i+1:]...) + break + } + } + if msg == nil { + return storage.ErrNotExist + } + if err := mb.writeIndex(); err != nil { + return err + } + if len(mb.messages) == 0 { + // This was the last message, thus writeIndex() has removed the entire + // directory; we don't need to delete the raw file. + return nil + } + // There are still messages in the index + log.Tracef("Deleting %v", msg.rawPath()) + return os.Remove(msg.rawPath()) +} + +// purge deletes all messages in this mailbox. +func (mb *mbox) purge() error { + mb.messages = mb.messages[:0] + return mb.writeIndex() +} + +// readIndex loads the mailbox index data from disk +func (mb *mbox) readIndex() error { + // Clear message slice, open index + mb.messages = mb.messages[:0] + // Lock for reading + indexMx.RLock() + defer indexMx.RUnlock() + // Check if index exists + if _, err := os.Stat(mb.indexPath); err != nil { + // Does not exist, but that's not an error in our world + log.Tracef("Index %v does not exist (yet)", mb.indexPath) + mb.indexLoaded = true + return nil + } + file, err := os.Open(mb.indexPath) + if err != nil { + return err + } + defer func() { + if err := file.Close(); err != nil { + log.Errorf("Failed to close %q: %v", mb.indexPath, err) + } + }() + // Decode gob data + dec := gob.NewDecoder(bufio.NewReader(file)) + name := "" + if err = dec.Decode(&name); err != nil { + return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err) + } + mb.name = name + for { + // Load messages until EOF + msg := &Message{} + if err = dec.Decode(msg); err != nil { + if err == io.EOF { + break + } + return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err) + } + msg.mailbox = mb + mb.messages = append(mb.messages, msg) + } + mb.indexLoaded = true + return nil +} + +// writeIndex overwrites the index on disk with the current mailbox data +func (mb *mbox) writeIndex() error { + // Lock for writing + indexMx.Lock() + defer indexMx.Unlock() + if len(mb.messages) > 0 { + // Ensure mailbox directory exists + if err := mb.createDir(); err != nil { + return err + } + // Open index for writing + file, err := os.Create(mb.indexPath) + if err != nil { + return err + } + writer := bufio.NewWriter(file) + // Write each message and then flush + enc := gob.NewEncoder(writer) + if err = enc.Encode(mb.name); err != nil { + _ = file.Close() + return err + } + for _, m := range mb.messages { + if err = enc.Encode(m); err != nil { + _ = file.Close() + return err + } + } + if err := writer.Flush(); err != nil { + _ = file.Close() + return err + } + if err := file.Close(); err != nil { + log.Errorf("Failed to close %q: %v", mb.indexPath, err) + return err + } + } else { + // No messages, delete index+maildir + log.Tracef("Removing mailbox %v", mb.path) + return mb.removeDir() + } + return nil +} + +// createDir checks for the presence of the path for this mailbox, creates it if needed +func (mb *mbox) createDir() error { + dirMx.Lock() + defer dirMx.Unlock() + if _, err := os.Stat(mb.path); err != nil { + if err := os.MkdirAll(mb.path, 0770); err != nil { + log.Errorf("Failed to create directory %v, %v", mb.path, err) + return err + } + } + return nil +} + +// removeDir removes the mailbox, plus empty higher level directories +func (mb *mbox) removeDir() error { + dirMx.Lock() + defer dirMx.Unlock() + // remove mailbox dir, including index file + if err := os.RemoveAll(mb.path); err != nil { + return err + } + // remove parents if empty + dir := filepath.Dir(mb.path) + if removeDirIfEmpty(dir) { + removeDirIfEmpty(filepath.Dir(dir)) + } + return nil +} + +// removeDirIfEmpty will remove the specified directory if it contains no files or directories. +// Caller should hold dirMx. Returns true if dir was removed. +func removeDirIfEmpty(path string) (removed bool) { + f, err := os.Open(path) + if err != nil { + return false + } + files, err := f.Readdirnames(0) + _ = f.Close() + if err != nil { + return false + } + if len(files) > 0 { + // Dir not empty + return false + } + log.Tracef("Removing dir %v", path) + err = os.Remove(path) + if err != nil { + log.Errorf("Failed to remove %q: %v", path, err) + return false + } + return true +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 62d4972..f8bcaef 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -5,7 +5,6 @@ import ( "errors" "io" "net/mail" - "sync" "time" ) @@ -26,8 +25,6 @@ type Store interface { PurgeMessages(mailbox string) error RemoveMessage(mailbox, id string) error VisitMailboxes(f func([]StoreMessage) (cont bool)) error - // LockFor is a temporary hack to fix #77 until Datastore revamp - LockFor(emailAddress string) (*sync.RWMutex, error) } // StoreMessage represents a message to be stored, or returned from a storage implementation. diff --git a/pkg/test/storage.go b/pkg/test/storage.go index 52c9b00..2c45b10 100644 --- a/pkg/test/storage.go +++ b/pkg/test/storage.go @@ -2,7 +2,6 @@ package test import ( "errors" - "sync" "github.com/jhillyerd/inbucket/pkg/storage" ) @@ -82,12 +81,6 @@ func (s *StoreStub) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) e return nil } -// LockFor mock function returns a new RWMutex, never errors. -// TODO(#69) remove -func (s *StoreStub) LockFor(name string) (*sync.RWMutex, error) { - return &sync.RWMutex{}, nil -} - // MessageDeleted returns true if the specified message was deleted func (s *StoreStub) MessageDeleted(m storage.StoreMessage) bool { _, ok := s.deleted[m]