1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00

Reimplement msghub as an actor

This commit is contained in:
James Hillyerd
2017-01-28 19:20:06 -08:00
parent e6f95c9367
commit 9b1d28fc7d
4 changed files with 88 additions and 92 deletions

View File

@@ -3,10 +3,12 @@ package msghub
import ( import (
"container/ring" "container/ring"
"context" "context"
"sync"
"time" "time"
) )
// Length of msghub operation queue
const opChanLen = 100
// Message contains the basic header data for a message // Message contains the basic header data for a message
type Message struct { type Message struct {
Mailbox string Mailbox string
@@ -18,33 +20,27 @@ type Message struct {
Size int64 Size int64
} }
// Listener receives the contents of the log, followed by new messages // Listener receives the contents of the history buffer, followed by new messages
type Listener interface { type Listener interface {
Receive(msg Message) error Receive(msg Message) error
} }
// Hub relays messages on to its listeners // Hub relays messages on to its listeners
type Hub struct { type Hub struct {
// log stores history, points next spot to write. First non-nil entry is oldest Message // history buffer, points next Message to write. Proceeding non-nil entry is oldest Message
log *ring.Ring history *ring.Ring
logMx sync.RWMutex listeners map[Listener]struct{} // listeners interested in new messages
opChan chan func(h *Hub) // operations queued for this actor
// listeners interested in new messages
listeners map[Listener]struct{}
listenersMx sync.RWMutex
// broadcast receives new messages
broadcast chan Message
} }
// New constructs a new Hub which will cache logSize messages in memory for playback to future // New constructs a new Hub which will cache historyLen messages in memory for playback to future
// listeners. A goroutine is created to handle incoming messages; it will run until the provided // listeners. A goroutine is created to handle incoming messages; it will run until the provided
// context is canceled. // context is canceled.
func New(ctx context.Context, logSize int) *Hub { func New(ctx context.Context, historyLen int) *Hub {
h := &Hub{ h := &Hub{
log: ring.New(logSize), history: ring.New(historyLen),
listeners: make(map[Listener]struct{}), listeners: make(map[Listener]struct{}),
broadcast: make(chan Message, 100), opChan: make(chan func(h *Hub), opChanLen),
} }
go func() { go func() {
@@ -52,17 +48,10 @@ func New(ctx context.Context, logSize int) *Hub {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// Shutdown // Shutdown
close(h.broadcast) close(h.opChan)
h.broadcast = nil
return return
case msg := <-h.broadcast: case op := <-h.opChan:
// Log message op(h)
h.logMx.Lock()
h.log.Value = msg
h.log = h.log.Next()
h.logMx.Unlock()
// Deliver message to listeners
h.deliver(msg)
} }
} }
}() }()
@@ -70,47 +59,50 @@ func New(ctx context.Context, logSize int) *Hub {
return h return h
} }
// Broadcast queues a message for processing by the hub. The message will be placed into the // Dispatch queues a message for broadcast by the hub. The message will be placed into the
// in-memory log and relayed to all registered listeners. // history buffer and then relayed to all registered listeners.
func (h *Hub) Broadcast(msg Message) { func (hub *Hub) Dispatch(msg Message) {
if h.broadcast != nil { hub.opChan <- func(h *Hub) {
h.broadcast <- msg // Add to history buffer
h.history.Value = msg
h.history = h.history.Next()
// Deliver message to all listeners, removing listeners if they return an error
for l := range h.listeners {
if err := l.Receive(msg); err != nil {
delete(h.listeners, l)
}
}
} }
} }
// AddListener registers a listener to receive broadcasted messages. // AddListener registers a listener to receive broadcasted messages.
func (h *Hub) AddListener(l Listener) { func (hub *Hub) AddListener(l Listener) {
// Playback log hub.opChan <- func(h *Hub) {
h.logMx.RLock() // Playback log
h.log.Do(func(v interface{}) { h.history.Do(func(v interface{}) {
if v != nil { if v != nil {
l.Receive(v.(Message)) l.Receive(v.(Message))
} }
}) })
h.logMx.RUnlock()
// Add to listeners // Add to listeners
h.listenersMx.Lock() h.listeners[l] = struct{}{}
h.listeners[l] = struct{}{} }
h.listenersMx.Unlock()
} }
// RemoveListener deletes a listener registration, it will cease to receive messages. // RemoveListener deletes a listener registration, it will cease to receive messages.
func (h *Hub) RemoveListener(l Listener) { func (hub *Hub) RemoveListener(l Listener) {
h.listenersMx.Lock() hub.opChan <- func(h *Hub) {
defer h.listenersMx.Unlock()
if _, ok := h.listeners[l]; ok {
delete(h.listeners, l) delete(h.listeners, l)
} }
} }
// deliver message to all listeners, removing listeners if they return an error // Sync blocks until the msghub has processed its queue up to this point, useful
func (h *Hub) deliver(msg Message) { // for unit tests.
h.listenersMx.RLock() func (hub *Hub) Sync() {
defer h.listenersMx.RUnlock() done := make(chan struct{})
for l := range h.listeners { hub.opChan <- func(h *Hub) {
if err := l.Receive(msg); err != nil { close(done)
h.RemoveListener(l)
}
} }
<-done
} }

View File

@@ -66,7 +66,7 @@ func TestHubZeroListeners(t *testing.T) {
hub := New(ctx, 5) hub := New(ctx, 5)
m := Message{} m := Message{}
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
hub.Broadcast(m) hub.Dispatch(m)
} }
// Just making sure Hub doesn't panic // Just making sure Hub doesn't panic
} }
@@ -79,7 +79,7 @@ func TestHubOneListener(t *testing.T) {
l := newTestListener(1) l := newTestListener(1)
hub.AddListener(l) hub.AddListener(l)
hub.Broadcast(m) hub.Dispatch(m)
// Wait for messages // Wait for messages
select { select {
@@ -97,15 +97,16 @@ func TestHubRemoveListener(t *testing.T) {
l := newTestListener(1) l := newTestListener(1)
hub.AddListener(l) hub.AddListener(l)
hub.Broadcast(m) hub.Dispatch(m)
hub.RemoveListener(l) hub.RemoveListener(l)
hub.Broadcast(m) hub.Dispatch(m)
hub.Sync()
// Wait for messages // Wait for messages
select { select {
case <-l.overflow: case <-l.overflow:
t.Error(l) t.Error(l)
case <-time.After(250 * time.Millisecond): case <-time.After(50 * time.Millisecond):
// Expected result, no overflow // Expected result, no overflow
} }
} }
@@ -121,21 +122,22 @@ func TestHubRemoveListenerOnError(t *testing.T) {
l.errorAfter = 1 l.errorAfter = 1
hub.AddListener(l) hub.AddListener(l)
hub.Broadcast(m) hub.Dispatch(m)
hub.Broadcast(m) hub.Dispatch(m)
hub.Broadcast(m) hub.Dispatch(m)
hub.Broadcast(m) hub.Dispatch(m)
hub.Sync()
// Wait for messages // Wait for messages
select { select {
case <-l.overflow: case <-l.overflow:
t.Error(l) t.Error(l)
case <-time.After(250 * time.Millisecond): case <-time.After(50 * time.Millisecond):
// Expected result, no overflow // Expected result, no overflow
} }
} }
func TestHubLogReplay(t *testing.T) { func TestHubHistoryReplay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(ctx, 100) hub := New(ctx, 100)
@@ -148,7 +150,7 @@ func TestHubLogReplay(t *testing.T) {
msgs[i] = Message{ msgs[i] = Message{
Subject: fmt.Sprintf("subj %v", i), Subject: fmt.Sprintf("subj %v", i),
} }
hub.Broadcast(msgs[i]) hub.Dispatch(msgs[i])
} }
// Wait for messages (live) // Wait for messages (live)
@@ -162,7 +164,7 @@ func TestHubLogReplay(t *testing.T) {
l2 := newTestListener(3) l2 := newTestListener(3)
hub.AddListener(l2) hub.AddListener(l2)
// Wait for messages (log) // Wait for messages (history)
select { select {
case <-l2.done: case <-l2.done:
case <-time.After(time.Second): case <-time.After(time.Second):
@@ -178,7 +180,7 @@ func TestHubLogReplay(t *testing.T) {
} }
} }
func TestHubLogReplayWrap(t *testing.T) { func TestHubHistoryReplayWrap(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
hub := New(ctx, 5) hub := New(ctx, 5)
@@ -191,7 +193,7 @@ func TestHubLogReplayWrap(t *testing.T) {
msgs[i] = Message{ msgs[i] = Message{
Subject: fmt.Sprintf("subj %v", i), Subject: fmt.Sprintf("subj %v", i),
} }
hub.Broadcast(msgs[i]) hub.Dispatch(msgs[i])
} }
// Wait for messages (live) // Wait for messages (live)
@@ -205,7 +207,7 @@ func TestHubLogReplayWrap(t *testing.T) {
l2 := newTestListener(5) l2 := newTestListener(5)
hub.AddListener(l2) hub.AddListener(l2)
// Wait for messages (log) // Wait for messages (history)
select { select {
case <-l2.done: case <-l2.done:
case <-time.After(time.Second): case <-time.After(time.Second):
@@ -228,16 +230,15 @@ func TestHubContextCancel(t *testing.T) {
l := newTestListener(1) l := newTestListener(1)
hub.AddListener(l) hub.AddListener(l)
hub.Broadcast(m) hub.Dispatch(m)
hub.Sync()
cancel() cancel()
time.Sleep(50 * time.Millisecond)
hub.Broadcast(m)
// Wait for messages // Wait for messages
select { select {
case <-l.overflow: case <-l.overflow:
t.Error(l) t.Error(l)
case <-time.After(250 * time.Millisecond): case <-time.After(50 * time.Millisecond):
// Expected result, no overflow // Expected result, no overflow
} }
} }

View File

@@ -475,7 +475,7 @@ func (ss *Session) deliverMessage(r recipientDetails, msgBuf [][]byte) (ok bool)
Date: msg.Date(), Date: msg.Date(),
Size: msg.Size(), Size: msg.Size(),
} }
ss.server.msgHub.Broadcast(broadcast) ss.server.msgHub.Dispatch(broadcast)
return true return true
} }

View File

@@ -2,6 +2,7 @@ package smtpd
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
@@ -28,8 +29,8 @@ func TestGreetState(t *testing.T) {
mb1 := &MockMailbox{} mb1 := &MockMailbox{}
mds.On("MailboxFor").Return(mb1, nil) mds.On("MailboxFor").Return(mb1, nil)
server, logbuf := setupSMTPServer(mds) server, logbuf, teardown := setupSMTPServer(mds)
defer teardownSMTPServer(server) defer teardown()
var script []scriptStep var script []scriptStep
@@ -89,8 +90,8 @@ func TestReadyState(t *testing.T) {
mb1 := &MockMailbox{} mb1 := &MockMailbox{}
mds.On("MailboxFor").Return(mb1, nil) mds.On("MailboxFor").Return(mb1, nil)
server, logbuf := setupSMTPServer(mds) server, logbuf, teardown := setupSMTPServer(mds)
defer teardownSMTPServer(server) defer teardown()
var script []scriptStep var script []scriptStep
@@ -164,8 +165,8 @@ func TestMailState(t *testing.T) {
msg1.On("Size").Return(0) msg1.On("Size").Return(0)
msg1.On("Close").Return(nil) msg1.On("Close").Return(nil)
server, logbuf := setupSMTPServer(mds) server, logbuf, teardown := setupSMTPServer(mds)
defer teardownSMTPServer(server) defer teardown()
var script []scriptStep var script []scriptStep
@@ -281,8 +282,8 @@ func TestDataState(t *testing.T) {
msg1.On("Size").Return(0) msg1.On("Size").Return(0)
msg1.On("Close").Return(nil) msg1.On("Close").Return(nil)
server, logbuf := setupSMTPServer(mds) server, logbuf, teardown := setupSMTPServer(mds)
defer teardownSMTPServer(server) defer teardown()
var script []scriptStep var script []scriptStep
pipe := setupSMTPSession(server) pipe := setupSMTPSession(server)
@@ -375,7 +376,7 @@ func (m *mockConn) SetDeadline(t time.Time) error { return nil }
func (m *mockConn) SetReadDeadline(t time.Time) error { return nil } func (m *mockConn) SetReadDeadline(t time.Time) error { return nil }
func (m *mockConn) SetWriteDeadline(t time.Time) error { return nil } func (m *mockConn) SetWriteDeadline(t time.Time) error { return nil }
func setupSMTPServer(ds DataStore) (*Server, *bytes.Buffer) { func setupSMTPServer(ds DataStore) (s *Server, buf *bytes.Buffer, teardown func()) {
// Test Server Config // Test Server Config
cfg := config.SMTPConfig{ cfg := config.SMTPConfig{
IP4address: net.IPv4(127, 0, 0, 1), IP4address: net.IPv4(127, 0, 0, 1),
@@ -389,12 +390,18 @@ func setupSMTPServer(ds DataStore) (*Server, *bytes.Buffer) {
} }
// Capture log output // Capture log output
buf := new(bytes.Buffer) buf = new(bytes.Buffer)
log.SetOutput(buf) log.SetOutput(buf)
// Create a server, don't start it // Create a server, don't start it
shutdownChan := make(chan bool) shutdownChan := make(chan bool)
return NewServer(cfg, shutdownChan, ds, &msghub.Hub{}), buf ctx, cancel := context.WithCancel(context.Background())
teardown = func() {
close(shutdownChan)
cancel()
}
s = NewServer(cfg, shutdownChan, ds, msghub.New(ctx, 100))
return s, buf, teardown
} }
var sessionNum int var sessionNum int
@@ -409,7 +416,3 @@ func setupSMTPSession(server *Server) net.Conn {
return clientConn return clientConn
} }
func teardownSMTPServer(server *Server) {
//log.SetOutput(os.Stderr)
}