mirror of
https://github.com/jhillyerd/inbucket.git
synced 2025-12-18 18:17:03 +00:00
datastore: Concurrency fix, closes #77
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/mail"
|
"net/mail"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jhillyerd/enmime"
|
"github.com/jhillyerd/enmime"
|
||||||
@@ -22,6 +23,8 @@ var (
|
|||||||
type DataStore interface {
|
type DataStore interface {
|
||||||
MailboxFor(emailAddress string) (Mailbox, error)
|
MailboxFor(emailAddress string) (Mailbox, error)
|
||||||
AllMailboxes() ([]Mailbox, error)
|
AllMailboxes() ([]Mailbox, error)
|
||||||
|
// LockFor is a temporary hack to fix #77 until Datastore revamp
|
||||||
|
LockFor(emailAddress string) (*sync.RWMutex, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mailbox is an interface to get and manipulate messages in a DataStore
|
// Mailbox is an interface to get and manipulate messages in a DataStore
|
||||||
|
|||||||
19
datastore/lock.go
Normal file
19
datastore/lock.go
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package datastore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type HashLock [4096]sync.RWMutex
|
||||||
|
|
||||||
|
func (h *HashLock) Get(hash string) *sync.RWMutex {
|
||||||
|
if len(hash) < 3 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
i, err := strconv.ParseInt(hash[0:3], 16, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &h[i]
|
||||||
|
}
|
||||||
61
datastore/lock_test.go
Normal file
61
datastore/lock_test.go
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
package datastore_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/jhillyerd/inbucket/datastore"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHashLock(t *testing.T) {
|
||||||
|
hl := &datastore.HashLock{}
|
||||||
|
|
||||||
|
// Invalid hashes
|
||||||
|
testCases := []struct {
|
||||||
|
name, input string
|
||||||
|
}{
|
||||||
|
{"empty", ""},
|
||||||
|
{"short", "a0"},
|
||||||
|
{"badhex", "zzzzzzzzzzzzzzzzzzzzzzz"},
|
||||||
|
}
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.input, func(t *testing.T) {
|
||||||
|
l := hl.Get(tc.input)
|
||||||
|
if l != nil {
|
||||||
|
t.Errorf("Expected nil lock for %s %q, got %v", tc.name, tc.input, l)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Valid hashes
|
||||||
|
testStrings := []string{
|
||||||
|
"deadbeef",
|
||||||
|
"00000000",
|
||||||
|
"ffffffff",
|
||||||
|
}
|
||||||
|
for _, ts := range testStrings {
|
||||||
|
t.Run(ts, func(t *testing.T) {
|
||||||
|
l := hl.Get(ts)
|
||||||
|
if l == nil {
|
||||||
|
t.Errorf("Expeced non-nil lock for hex string %q", ts)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
a := hl.Get("deadbeef")
|
||||||
|
b := hl.Get("deadbeef")
|
||||||
|
if a != b {
|
||||||
|
t.Errorf("Expected identical locks for identical hashes, got: %p != %p", a, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
a = hl.Get("deadbeef")
|
||||||
|
b = hl.Get("d3adb33f")
|
||||||
|
if a == b {
|
||||||
|
t.Errorf("Expected different locks for different hashes, got: %p == %p", a, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
a = hl.Get("deadbeef")
|
||||||
|
b = hl.Get("deadb33f")
|
||||||
|
if a != b {
|
||||||
|
t.Errorf("Expected identical locks for identical leading hashes, got: %p != %p", a, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,6 +3,7 @@ package datastore
|
|||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
"net/mail"
|
"net/mail"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jhillyerd/enmime"
|
"github.com/jhillyerd/enmime"
|
||||||
@@ -26,6 +27,10 @@ func (m *MockDataStore) AllMailboxes() ([]Mailbox, error) {
|
|||||||
return args.Get(0).([]Mailbox), args.Error(1)
|
return args.Get(0).([]Mailbox), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockDataStore) LockFor(name string) (*sync.RWMutex, error) {
|
||||||
|
return &sync.RWMutex{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// MockMailbox is a shared mock for unit testing
|
// MockMailbox is a shared mock for unit testing
|
||||||
type MockMailbox struct {
|
type MockMailbox struct {
|
||||||
mock.Mock
|
mock.Mock
|
||||||
|
|||||||
@@ -51,6 +51,7 @@ func countGenerator(c chan int) {
|
|||||||
// FileDataStore implements DataStore aand is the root of the mail storage
|
// FileDataStore implements DataStore aand is the root of the mail storage
|
||||||
// hiearchy. It provides access to Mailbox objects
|
// hiearchy. It provides access to Mailbox objects
|
||||||
type FileDataStore struct {
|
type FileDataStore struct {
|
||||||
|
hashLock datastore.HashLock
|
||||||
path string
|
path string
|
||||||
mailPath string
|
mailPath string
|
||||||
messageCap int
|
messageCap int
|
||||||
@@ -139,6 +140,15 @@ func (ds *FileDataStore) AllMailboxes() ([]datastore.Mailbox, error) {
|
|||||||
return mailboxes, nil
|
return mailboxes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ds *FileDataStore) LockFor(emailAddress string) (*sync.RWMutex, error) {
|
||||||
|
name, err := stringutil.ParseMailboxName(emailAddress)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
hash := stringutil.HashMailboxName(name)
|
||||||
|
return ds.hashLock.Get(hash), nil
|
||||||
|
}
|
||||||
|
|
||||||
// FileMailbox implements Mailbox, manages the mail for a specific user and
|
// FileMailbox implements Mailbox, manages the mail for a specific user and
|
||||||
// correlates to a particular directory on disk.
|
// correlates to a particular directory on disk.
|
||||||
type FileMailbox struct {
|
type FileMailbox struct {
|
||||||
|
|||||||
@@ -402,7 +402,19 @@ func (ss *Session) dataHandler() {
|
|||||||
if ss.server.storeMessages {
|
if ss.server.storeMessages {
|
||||||
// Create a message for each valid recipient
|
// Create a message for each valid recipient
|
||||||
for _, r := range recipients {
|
for _, r := range recipients {
|
||||||
if ok := ss.deliverMessage(r, msgBuf); ok {
|
// TODO temporary hack to fix #77 until datastore revamp
|
||||||
|
mu, err := ss.server.dataStore.LockFor(r.localPart)
|
||||||
|
if err != nil {
|
||||||
|
ss.logError("Failed to get lock for %q: %s", r.localPart, err)
|
||||||
|
// Delivery failure
|
||||||
|
ss.send(fmt.Sprintf("451 Failed to store message for %v", r.localPart))
|
||||||
|
ss.reset()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mu.Lock()
|
||||||
|
ok := ss.deliverMessage(r, msgBuf)
|
||||||
|
mu.Unlock()
|
||||||
|
if ok {
|
||||||
expReceivedTotal.Add(1)
|
expReceivedTotal.Add(1)
|
||||||
} else {
|
} else {
|
||||||
// Delivery failure
|
// Delivery failure
|
||||||
|
|||||||
Reference in New Issue
Block a user