1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 09:37:02 +00:00

storage: Make locking an implementation detail for #69

- file: Store handles its own locking #77
- file: Move mbox into its own file
- file & test: remove LockFor()
This commit is contained in:
James Hillyerd
2018-03-17 14:02:50 -07:00
parent b9003a9328
commit e84b1f8952
5 changed files with 288 additions and 292 deletions

View File

@@ -368,23 +368,10 @@ func (ss *Session) dataHandler() {
return return
} }
if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) { if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) {
// Mail data complete // Mail data complete.
for _, recip := range ss.recipients { for _, recip := range ss.recipients {
if recip.ShouldStore() { if recip.ShouldStore() {
// TODO temporary hack to fix #77 until datastore revamp if ok := ss.deliverMessage(recip, msgBuf.Bytes()); !ok {
mu, err := ss.server.dataStore.LockFor(recip.LocalPart)
if err != nil {
ss.logError("Failed to get lock for %q: %s", recip.LocalPart, err)
// Delivery failure
ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart))
ss.reset()
return
}
mu.Lock()
ok := ss.deliverMessage(recip, msgBuf.Bytes())
mu.Unlock()
if !ok {
// Delivery failure
ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart)) ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart))
ss.reset() ss.reset()
return return
@@ -397,28 +384,22 @@ func (ss *Session) dataHandler() {
ss.reset() ss.reset()
return return
} }
// SMTP RFC says remove leading periods from input // RFC says remove leading periods from input.
if len(lineBuf) > 0 && lineBuf[0] == '.' { if len(lineBuf) > 0 && lineBuf[0] == '.' {
lineBuf = lineBuf[1:] lineBuf = lineBuf[1:]
} }
msgBuf.Write(lineBuf) msgBuf.Write(lineBuf)
if msgBuf.Len() > ss.server.maxMessageBytes { if msgBuf.Len() > ss.server.maxMessageBytes {
// Max message size exceeded
ss.send("552 Maximum message size exceeded") ss.send("552 Maximum message size exceeded")
ss.logWarn("Max message size exceeded while in DATA") ss.logWarn("Max message size exceeded while in DATA")
ss.reset() ss.reset()
return return
} }
} // end for }
} }
// deliverMessage creates and populates a new Message for the specified recipient // deliverMessage creates and populates a new Message for the specified recipient
func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok bool) { func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok bool) {
name, err := policy.ParseMailboxName(recip.LocalPart)
if err != nil {
// This parse already succeeded when MailboxFor was called, shouldn't fail here.
return false
}
// TODO replace with something that only reads header? // TODO replace with something that only reads header?
env, err := enmime.ReadEnvelope(bytes.NewReader(content)) env, err := enmime.ReadEnvelope(bytes.NewReader(content))
if err != nil { if err != nil {
@@ -441,7 +422,7 @@ func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok b
ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address, stamp)) ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address, stamp))
delivery := &message.Delivery{ delivery := &message.Delivery{
Meta: message.Metadata{ Meta: message.Metadata{
Mailbox: name, Mailbox: recip.Mailbox,
From: from[0], From: from[0],
To: to, To: to,
Date: time.Now(), Date: time.Now(),
@@ -455,8 +436,9 @@ func (ss *Session) deliverMessage(recip *policy.Recipient, content []byte) (ok b
return false return false
} }
// Broadcast message information. // Broadcast message information.
// TODO this belongs in message pkg.
broadcast := msghub.Message{ broadcast := msghub.Message{
Mailbox: name, Mailbox: recip.Mailbox,
ID: id, ID: id,
From: delivery.From().String(), From: delivery.From().String(),
To: stringutil.StringAddressList(delivery.To()), To: stringutil.StringAddressList(delivery.To()),

View File

@@ -2,7 +2,6 @@ package file
import ( import (
"bufio" "bufio"
"encoding/gob"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@@ -77,11 +76,13 @@ func New(cfg config.DataStoreConfig) storage.Store {
// AddMessage adds a message to the specified mailbox. // AddMessage adds a message to the specified mailbox.
func (fs *Store) AddMessage(m storage.StoreMessage) (id string, err error) { func (fs *Store) AddMessage(m storage.StoreMessage) (id string, err error) {
r, err := m.RawReader() mb, err := fs.mbox(m.Mailbox())
if err != nil { if err != nil {
return "", err return "", err
} }
mb, err := fs.mbox(m.Mailbox()) mb.Lock()
defer mb.Unlock()
r, err := m.RawReader()
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -140,6 +141,8 @@ func (fs *Store) GetMessage(mailbox, id string) (storage.StoreMessage, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
mb.RLock()
defer mb.RUnlock()
return mb.getMessage(id) return mb.getMessage(id)
} }
@@ -149,6 +152,8 @@ func (fs *Store) GetMessages(mailbox string) ([]storage.StoreMessage, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
mb.RLock()
defer mb.RUnlock()
return mb.getMessages() return mb.getMessages()
} }
@@ -158,6 +163,8 @@ func (fs *Store) RemoveMessage(mailbox, id string) error {
if err != nil { if err != nil {
return err return err
} }
mb.Lock()
defer mb.Unlock()
return mb.removeMessage(id) return mb.removeMessage(id)
} }
@@ -167,6 +174,8 @@ func (fs *Store) PurgeMessages(mailbox string) error {
if err != nil { if err != nil {
return err return err
} }
mb.Lock()
defer mb.Unlock()
return mb.purge() return mb.purge()
} }
@@ -196,12 +205,10 @@ func (fs *Store) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) erro
// Loop over mailboxes // Loop over mailboxes
for _, inf3 := range infos3 { for _, inf3 := range infos3 {
if inf3.IsDir() { if inf3.IsDir() {
mbdir := inf3.Name() mb := fs.mboxFromHash(inf3.Name())
mbpath := filepath.Join(fs.mailPath, l1, l2, mbdir) mb.RLock()
idx := filepath.Join(mbpath, indexFileName)
mb := &mbox{store: fs, dirName: mbdir, path: mbpath,
indexPath: idx}
msgs, err := mb.getMessages() msgs, err := mb.getMessages()
mb.RUnlock()
if err != nil { if err != nil {
return err return err
} }
@@ -217,265 +224,40 @@ func (fs *Store) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) erro
return nil return nil
} }
// LockFor returns the RWMutex for this mailbox, or an error.
func (fs *Store) LockFor(emailAddress string) (*sync.RWMutex, error) {
name, err := policy.ParseMailboxName(emailAddress)
if err != nil {
return nil, err
}
hash := stringutil.HashMailboxName(name)
return fs.hashLock.Get(hash), nil
}
// NewMessage is temproary until #69 MessageData refactor
func (fs *Store) NewMessage(mailbox string) (storage.StoreMessage, error) {
mb, err := fs.mbox(mailbox)
if err != nil {
return nil, err
}
return mb.newMessage()
}
// mbox returns the named mailbox. // mbox returns the named mailbox.
func (fs *Store) mbox(mailbox string) (*mbox, error) { func (fs *Store) mbox(mailbox string) (*mbox, error) {
name, err := policy.ParseMailboxName(mailbox) name, err := policy.ParseMailboxName(mailbox)
if err != nil { if err != nil {
return nil, err return nil, err
} }
dir := stringutil.HashMailboxName(name) hash := stringutil.HashMailboxName(name)
s1 := dir[0:3] s1 := hash[0:3]
s2 := dir[0:6] s2 := hash[0:6]
path := filepath.Join(fs.mailPath, s1, s2, dir) path := filepath.Join(fs.mailPath, s1, s2, hash)
indexPath := filepath.Join(path, indexFileName) indexPath := filepath.Join(path, indexFileName)
return &mbox{
return &mbox{store: fs, name: name, dirName: dir, path: path, RWMutex: fs.hashLock.Get(hash),
indexPath: indexPath}, nil store: fs,
name: name,
dirName: hash,
path: path,
indexPath: indexPath,
}, nil
} }
// mbox manages the mail for a specific user and correlates to a particular directory on disk. // mboxFromPath constructs a mailbox based on name hash.
type mbox struct { func (fs *Store) mboxFromHash(hash string) *mbox {
store *Store s1 := hash[0:3]
name string s2 := hash[0:6]
dirName string path := filepath.Join(fs.mailPath, s1, s2, hash)
path string indexPath := filepath.Join(path, indexFileName)
indexLoaded bool return &mbox{
indexPath string RWMutex: fs.hashLock.Get(hash),
messages []*Message store: fs,
dirName: hash,
path: path,
indexPath: indexPath,
} }
// getMessages scans the mailbox directory for .gob files and decodes them into
// a slice of Message objects.
func (mb *mbox) getMessages() ([]storage.StoreMessage, error) {
if !mb.indexLoaded {
if err := mb.readIndex(); err != nil {
return nil, err
}
}
messages := make([]storage.StoreMessage, len(mb.messages))
for i, m := range mb.messages {
messages[i] = m
}
return messages, nil
}
// getMessage decodes a single message by ID and returns a Message object.
func (mb *mbox) getMessage(id string) (storage.StoreMessage, error) {
if !mb.indexLoaded {
if err := mb.readIndex(); err != nil {
return nil, err
}
}
if id == "latest" && len(mb.messages) != 0 {
return mb.messages[len(mb.messages)-1], nil
}
for _, m := range mb.messages {
if m.Fid == id {
return m, nil
}
}
return nil, storage.ErrNotExist
}
// removeMessage deletes the message off disk and removes it from the index.
func (mb *mbox) removeMessage(id string) error {
if !mb.indexLoaded {
if err := mb.readIndex(); err != nil {
return err
}
}
var msg *Message
for i, m := range mb.messages {
if id == m.ID() {
msg = m
// Slice around message we are deleting
mb.messages = append(mb.messages[:i], mb.messages[i+1:]...)
break
}
}
if msg == nil {
return storage.ErrNotExist
}
if err := mb.writeIndex(); err != nil {
return err
}
if len(mb.messages) == 0 {
// This was the last message, thus writeIndex() has removed the entire
// directory; we don't need to delete the raw file.
return nil
}
// There are still messages in the index
log.Tracef("Deleting %v", msg.rawPath())
return os.Remove(msg.rawPath())
}
// purge deletes all messages in this mailbox.
func (mb *mbox) purge() error {
mb.messages = mb.messages[:0]
return mb.writeIndex()
}
// readIndex loads the mailbox index data from disk
func (mb *mbox) readIndex() error {
// Clear message slice, open index
mb.messages = mb.messages[:0]
// Lock for reading
indexMx.RLock()
defer indexMx.RUnlock()
// Check if index exists
if _, err := os.Stat(mb.indexPath); err != nil {
// Does not exist, but that's not an error in our world
log.Tracef("Index %v does not exist (yet)", mb.indexPath)
mb.indexLoaded = true
return nil
}
file, err := os.Open(mb.indexPath)
if err != nil {
return err
}
defer func() {
if err := file.Close(); err != nil {
log.Errorf("Failed to close %q: %v", mb.indexPath, err)
}
}()
// Decode gob data
dec := gob.NewDecoder(bufio.NewReader(file))
name := ""
if err = dec.Decode(&name); err != nil {
return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err)
}
mb.name = name
for {
// Load messages until EOF
msg := &Message{}
if err = dec.Decode(msg); err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err)
}
msg.mailbox = mb
mb.messages = append(mb.messages, msg)
}
mb.indexLoaded = true
return nil
}
// writeIndex overwrites the index on disk with the current mailbox data
func (mb *mbox) writeIndex() error {
// Lock for writing
indexMx.Lock()
defer indexMx.Unlock()
if len(mb.messages) > 0 {
// Ensure mailbox directory exists
if err := mb.createDir(); err != nil {
return err
}
// Open index for writing
file, err := os.Create(mb.indexPath)
if err != nil {
return err
}
writer := bufio.NewWriter(file)
// Write each message and then flush
enc := gob.NewEncoder(writer)
if err = enc.Encode(mb.name); err != nil {
_ = file.Close()
return err
}
for _, m := range mb.messages {
if err = enc.Encode(m); err != nil {
_ = file.Close()
return err
}
}
if err := writer.Flush(); err != nil {
_ = file.Close()
return err
}
if err := file.Close(); err != nil {
log.Errorf("Failed to close %q: %v", mb.indexPath, err)
return err
}
} else {
// No messages, delete index+maildir
log.Tracef("Removing mailbox %v", mb.path)
return mb.removeDir()
}
return nil
}
// createDir checks for the presence of the path for this mailbox, creates it if needed
func (mb *mbox) createDir() error {
dirMx.Lock()
defer dirMx.Unlock()
if _, err := os.Stat(mb.path); err != nil {
if err := os.MkdirAll(mb.path, 0770); err != nil {
log.Errorf("Failed to create directory %v, %v", mb.path, err)
return err
}
}
return nil
}
// removeDir removes the mailbox, plus empty higher level directories
func (mb *mbox) removeDir() error {
dirMx.Lock()
defer dirMx.Unlock()
// remove mailbox dir, including index file
if err := os.RemoveAll(mb.path); err != nil {
return err
}
// remove parents if empty
dir := filepath.Dir(mb.path)
if removeDirIfEmpty(dir) {
removeDirIfEmpty(filepath.Dir(dir))
}
return nil
}
// removeDirIfEmpty will remove the specified directory if it contains no files or directories.
// Caller should hold dirMx. Returns true if dir was removed.
func removeDirIfEmpty(path string) (removed bool) {
f, err := os.Open(path)
if err != nil {
return false
}
files, err := f.Readdirnames(0)
_ = f.Close()
if err != nil {
return false
}
if len(files) > 0 {
// Dir not empty
return false
}
log.Tracef("Removing dir %v", path)
err = os.Remove(path)
if err != nil {
log.Errorf("Failed to remove %q: %v", path, err)
return false
}
return true
} }
// generatePrefix converts a Time object into the ISO style format we use // generatePrefix converts a Time object into the ISO style format we use

242
pkg/storage/file/mbox.go Normal file
View File

@@ -0,0 +1,242 @@
package file
import (
"bufio"
"encoding/gob"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/storage"
)
// mbox manages the mail for a specific user and correlates to a particular directory on disk.
// mbox methods are not thread safe, mbox.RWMutex must be held prior to calling.
type mbox struct {
*sync.RWMutex
store *Store
name string
dirName string
path string
indexLoaded bool
indexPath string
messages []*Message
}
// getMessages scans the mailbox directory for .gob files and decodes them into
// a slice of Message objects.
func (mb *mbox) getMessages() ([]storage.StoreMessage, error) {
if !mb.indexLoaded {
if err := mb.readIndex(); err != nil {
return nil, err
}
}
messages := make([]storage.StoreMessage, len(mb.messages))
for i, m := range mb.messages {
messages[i] = m
}
return messages, nil
}
// getMessage decodes a single message by ID and returns a Message object.
func (mb *mbox) getMessage(id string) (storage.StoreMessage, error) {
if !mb.indexLoaded {
if err := mb.readIndex(); err != nil {
return nil, err
}
}
if id == "latest" && len(mb.messages) != 0 {
return mb.messages[len(mb.messages)-1], nil
}
for _, m := range mb.messages {
if m.Fid == id {
return m, nil
}
}
return nil, storage.ErrNotExist
}
// removeMessage deletes the message off disk and removes it from the index.
func (mb *mbox) removeMessage(id string) error {
if !mb.indexLoaded {
if err := mb.readIndex(); err != nil {
return err
}
}
var msg *Message
for i, m := range mb.messages {
if id == m.ID() {
msg = m
// Slice around message we are deleting
mb.messages = append(mb.messages[:i], mb.messages[i+1:]...)
break
}
}
if msg == nil {
return storage.ErrNotExist
}
if err := mb.writeIndex(); err != nil {
return err
}
if len(mb.messages) == 0 {
// This was the last message, thus writeIndex() has removed the entire
// directory; we don't need to delete the raw file.
return nil
}
// There are still messages in the index
log.Tracef("Deleting %v", msg.rawPath())
return os.Remove(msg.rawPath())
}
// purge deletes all messages in this mailbox.
func (mb *mbox) purge() error {
mb.messages = mb.messages[:0]
return mb.writeIndex()
}
// readIndex loads the mailbox index data from disk
func (mb *mbox) readIndex() error {
// Clear message slice, open index
mb.messages = mb.messages[:0]
// Lock for reading
indexMx.RLock()
defer indexMx.RUnlock()
// Check if index exists
if _, err := os.Stat(mb.indexPath); err != nil {
// Does not exist, but that's not an error in our world
log.Tracef("Index %v does not exist (yet)", mb.indexPath)
mb.indexLoaded = true
return nil
}
file, err := os.Open(mb.indexPath)
if err != nil {
return err
}
defer func() {
if err := file.Close(); err != nil {
log.Errorf("Failed to close %q: %v", mb.indexPath, err)
}
}()
// Decode gob data
dec := gob.NewDecoder(bufio.NewReader(file))
name := ""
if err = dec.Decode(&name); err != nil {
return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err)
}
mb.name = name
for {
// Load messages until EOF
msg := &Message{}
if err = dec.Decode(msg); err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("Corrupt mailbox %q: %v", mb.indexPath, err)
}
msg.mailbox = mb
mb.messages = append(mb.messages, msg)
}
mb.indexLoaded = true
return nil
}
// writeIndex overwrites the index on disk with the current mailbox data
func (mb *mbox) writeIndex() error {
// Lock for writing
indexMx.Lock()
defer indexMx.Unlock()
if len(mb.messages) > 0 {
// Ensure mailbox directory exists
if err := mb.createDir(); err != nil {
return err
}
// Open index for writing
file, err := os.Create(mb.indexPath)
if err != nil {
return err
}
writer := bufio.NewWriter(file)
// Write each message and then flush
enc := gob.NewEncoder(writer)
if err = enc.Encode(mb.name); err != nil {
_ = file.Close()
return err
}
for _, m := range mb.messages {
if err = enc.Encode(m); err != nil {
_ = file.Close()
return err
}
}
if err := writer.Flush(); err != nil {
_ = file.Close()
return err
}
if err := file.Close(); err != nil {
log.Errorf("Failed to close %q: %v", mb.indexPath, err)
return err
}
} else {
// No messages, delete index+maildir
log.Tracef("Removing mailbox %v", mb.path)
return mb.removeDir()
}
return nil
}
// createDir checks for the presence of the path for this mailbox, creates it if needed
func (mb *mbox) createDir() error {
dirMx.Lock()
defer dirMx.Unlock()
if _, err := os.Stat(mb.path); err != nil {
if err := os.MkdirAll(mb.path, 0770); err != nil {
log.Errorf("Failed to create directory %v, %v", mb.path, err)
return err
}
}
return nil
}
// removeDir removes the mailbox, plus empty higher level directories
func (mb *mbox) removeDir() error {
dirMx.Lock()
defer dirMx.Unlock()
// remove mailbox dir, including index file
if err := os.RemoveAll(mb.path); err != nil {
return err
}
// remove parents if empty
dir := filepath.Dir(mb.path)
if removeDirIfEmpty(dir) {
removeDirIfEmpty(filepath.Dir(dir))
}
return nil
}
// removeDirIfEmpty will remove the specified directory if it contains no files or directories.
// Caller should hold dirMx. Returns true if dir was removed.
func removeDirIfEmpty(path string) (removed bool) {
f, err := os.Open(path)
if err != nil {
return false
}
files, err := f.Readdirnames(0)
_ = f.Close()
if err != nil {
return false
}
if len(files) > 0 {
// Dir not empty
return false
}
log.Tracef("Removing dir %v", path)
err = os.Remove(path)
if err != nil {
log.Errorf("Failed to remove %q: %v", path, err)
return false
}
return true
}

View File

@@ -5,7 +5,6 @@ import (
"errors" "errors"
"io" "io"
"net/mail" "net/mail"
"sync"
"time" "time"
) )
@@ -26,8 +25,6 @@ type Store interface {
PurgeMessages(mailbox string) error PurgeMessages(mailbox string) error
RemoveMessage(mailbox, id string) error RemoveMessage(mailbox, id string) error
VisitMailboxes(f func([]StoreMessage) (cont bool)) error VisitMailboxes(f func([]StoreMessage) (cont bool)) error
// LockFor is a temporary hack to fix #77 until Datastore revamp
LockFor(emailAddress string) (*sync.RWMutex, error)
} }
// StoreMessage represents a message to be stored, or returned from a storage implementation. // StoreMessage represents a message to be stored, or returned from a storage implementation.

View File

@@ -2,7 +2,6 @@ package test
import ( import (
"errors" "errors"
"sync"
"github.com/jhillyerd/inbucket/pkg/storage" "github.com/jhillyerd/inbucket/pkg/storage"
) )
@@ -82,12 +81,6 @@ func (s *StoreStub) VisitMailboxes(f func([]storage.StoreMessage) (cont bool)) e
return nil return nil
} }
// LockFor mock function returns a new RWMutex, never errors.
// TODO(#69) remove
func (s *StoreStub) LockFor(name string) (*sync.RWMutex, error) {
return &sync.RWMutex{}, nil
}
// MessageDeleted returns true if the specified message was deleted // MessageDeleted returns true if the specified message was deleted
func (s *StoreStub) MessageDeleted(m storage.StoreMessage) bool { func (s *StoreStub) MessageDeleted(m storage.StoreMessage) bool {
_, ok := s.deleted[m] _, ok := s.deleted[m]