From b3db619db909235a0bf668b878ab8d38b1dc2b03 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 16 Jan 2017 13:09:50 -0800 Subject: [PATCH] Broadcast deliveries into msghub for #44 --- inbucket.go | 6 +++++- rest/testmocks_test.go | 5 +++++ smtpd/datastore.go | 1 + smtpd/filestore.go | 4 ++++ smtpd/handler.go | 13 +++++++++++++ smtpd/handler_test.go | 20 ++++++++++++++++++-- smtpd/listener.go | 28 ++++++++++++++++++---------- smtpd/retention_test.go | 5 +++++ 8 files changed, 69 insertions(+), 13 deletions(-) diff --git a/inbucket.go b/inbucket.go index 09eeae0..333408a 100644 --- a/inbucket.go +++ b/inbucket.go @@ -14,6 +14,7 @@ import ( "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/httpd" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/pop3d" "github.com/jhillyerd/inbucket/rest" "github.com/jhillyerd/inbucket/smtpd" @@ -95,6 +96,9 @@ func main() { } } + // Create message hub + msgHub := msghub.New(rootCtx, 100) + // Grab our datastore ds := smtpd.DefaultFileDataStore() @@ -110,7 +114,7 @@ func main() { go pop3Server.Start(rootCtx) // Startup SMTP server - smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds, shutdownChan) + smtpServer = smtpd.NewServer(config.GetSMTPConfig(), shutdownChan, ds, msgHub) go smtpServer.Start(rootCtx) // Loop forever waiting for signals or shutdown channel diff --git a/rest/testmocks_test.go b/rest/testmocks_test.go index a671165..bc720fc 100644 --- a/rest/testmocks_test.go +++ b/rest/testmocks_test.go @@ -50,6 +50,11 @@ func (m *MockMailbox) NewMessage() (smtpd.Message, error) { return args.Get(0).(smtpd.Message), args.Error(1) } +func (m *MockMailbox) Name() string { + args := m.Called() + return args.String(0) +} + func (m *MockMailbox) String() string { args := m.Called() return args.String(0) diff --git a/smtpd/datastore.go b/smtpd/datastore.go index c06ad6a..180a3ec 100644 --- a/smtpd/datastore.go +++ b/smtpd/datastore.go @@ -29,6 +29,7 @@ type Mailbox interface { GetMessage(id string) (Message, error) Purge() error NewMessage() (Message, error) + Name() string String() string } diff --git a/smtpd/filestore.go b/smtpd/filestore.go index f6208ed..5f4d4f0 100644 --- a/smtpd/filestore.go +++ b/smtpd/filestore.go @@ -148,6 +148,10 @@ type FileMailbox struct { messages []*FileMessage } +func (mb *FileMailbox) Name() string { + return mb.name +} + func (mb *FileMailbox) String() string { return mb.name + "[" + mb.dirName + "]" } diff --git a/smtpd/handler.go b/smtpd/handler.go index 0f0e3d9..e267d11 100644 --- a/smtpd/handler.go +++ b/smtpd/handler.go @@ -13,6 +13,7 @@ import ( "time" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" ) // State tracks the current mode of our SMTP state machine @@ -464,6 +465,18 @@ func (ss *Session) deliverMessage(r recipientDetails, msgBuf [][]byte) (ok bool) return false } + // Broadcast message information + broadcast := msghub.Message{ + Mailbox: r.mailbox.Name(), + ID: msg.ID(), + From: msg.From(), + To: msg.To(), + Subject: msg.Subject(), + Date: msg.Date(), + Size: msg.Size(), + } + ss.server.msgHub.Broadcast(broadcast) + return true } diff --git a/smtpd/handler_test.go b/smtpd/handler_test.go index 34814c7..3f4dc82 100644 --- a/smtpd/handler_test.go +++ b/smtpd/handler_test.go @@ -5,13 +5,15 @@ import ( "fmt" "io" - "github.com/jhillyerd/inbucket/config" "log" "net" "net/textproto" "os" "testing" "time" + + "github.com/jhillyerd/inbucket/config" + "github.com/jhillyerd/inbucket/msghub" ) type scriptStep struct { @@ -153,6 +155,13 @@ func TestMailState(t *testing.T) { msg1 := &MockMessage{} mds.On("MailboxFor").Return(mb1, nil) mb1.On("NewMessage").Return(msg1, nil) + mb1.On("Name").Return("u1") + msg1.On("ID").Return("") + msg1.On("From").Return("") + msg1.On("To").Return(make([]string, 0)) + msg1.On("Date").Return(time.Time{}) + msg1.On("Subject").Return("") + msg1.On("Size").Return(0) msg1.On("Close").Return(nil) server, logbuf := setupSMTPServer(mds) @@ -263,6 +272,13 @@ func TestDataState(t *testing.T) { msg1 := &MockMessage{} mds.On("MailboxFor").Return(mb1, nil) mb1.On("NewMessage").Return(msg1, nil) + mb1.On("Name").Return("u1") + msg1.On("ID").Return("") + msg1.On("From").Return("") + msg1.On("To").Return(make([]string, 0)) + msg1.On("Date").Return(time.Time{}) + msg1.On("Subject").Return("") + msg1.On("Size").Return(0) msg1.On("Close").Return(nil) server, logbuf := setupSMTPServer(mds) @@ -378,7 +394,7 @@ func setupSMTPServer(ds DataStore) (*Server, *bytes.Buffer) { // Create a server, don't start it shutdownChan := make(chan bool) - return NewServer(cfg, ds, shutdownChan), buf + return NewServer(cfg, shutdownChan, ds, &msghub.Hub{}), buf } var sessionNum int diff --git a/smtpd/listener.go b/smtpd/listener.go index ba4fca0..c2d3905 100644 --- a/smtpd/listener.go +++ b/smtpd/listener.go @@ -12,24 +12,27 @@ import ( "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" ) // Server holds the configuration and state of our SMTP server type Server struct { + // Configuration domain string domainNoStore string maxRecips int maxIdleSeconds int maxMessageBytes int - dataStore DataStore storeMessages bool - listener net.Listener - // globalShutdown is the signal Inbucket needs to shut down - globalShutdown chan bool + // Dependencies + dataStore DataStore // Mailbox/message store + globalShutdown chan bool // Shuts down Inbucket + msgHub *msghub.Hub // Pub/sub for message info - // waitgroup tracks individual sessions - waitgroup *sync.WaitGroup + // State + listener net.Listener // Incoming network connections + waitgroup *sync.WaitGroup // Waitgroup tracks individual sessions } var ( @@ -54,17 +57,22 @@ var ( ) // NewServer creates a new Server instance with the specificed config -func NewServer(cfg config.SMTPConfig, ds DataStore, globalShutdown chan bool) *Server { +func NewServer( + cfg config.SMTPConfig, + globalShutdown chan bool, + ds DataStore, + msgHub *msghub.Hub) *Server { return &Server{ - dataStore: ds, domain: cfg.Domain, + domainNoStore: strings.ToLower(cfg.DomainNoStore), maxRecips: cfg.MaxRecipients, maxIdleSeconds: cfg.MaxIdleSeconds, maxMessageBytes: cfg.MaxMessageBytes, storeMessages: cfg.StoreMessages, - domainNoStore: strings.ToLower(cfg.DomainNoStore), - waitgroup: new(sync.WaitGroup), globalShutdown: globalShutdown, + dataStore: ds, + msgHub: msgHub, + waitgroup: new(sync.WaitGroup), } } diff --git a/smtpd/retention_test.go b/smtpd/retention_test.go index 95b17c1..a08458b 100644 --- a/smtpd/retention_test.go +++ b/smtpd/retention_test.go @@ -106,6 +106,11 @@ func (m *MockMailbox) NewMessage() (Message, error) { return args.Get(0).(Message), args.Error(1) } +func (m *MockMailbox) Name() string { + args := m.Called() + return args.String(0) +} + func (m *MockMailbox) String() string { args := m.Called() return args.String(0)