From 88ccb193600fdcb6e4cbdf19661c0aae7b4491ff Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Sun, 15 Jan 2017 20:01:20 -0800 Subject: [PATCH 01/10] Implement pub/sub message hub as a base for #44 --- msghub/hub.go | 116 ++++++++++++++++++++++ msghub/hub_test.go | 243 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 359 insertions(+) create mode 100644 msghub/hub.go create mode 100644 msghub/hub_test.go diff --git a/msghub/hub.go b/msghub/hub.go new file mode 100644 index 0000000..0f43077 --- /dev/null +++ b/msghub/hub.go @@ -0,0 +1,116 @@ +package msghub + +import ( + "container/ring" + "context" + "sync" + "time" +) + +// Message contains the basic header data for a message +type Message struct { + Mailbox string + ID string + From string + To []string + Subject string + Date time.Time + Size int64 +} + +// Listener receives the contents of the log, followed by new messages +type Listener interface { + Receive(msg Message) error +} + +// Hub relays messages on to its listeners +type Hub struct { + // log stores history, points next spot to write. First non-nil entry is oldest Message + log *ring.Ring + logMx sync.RWMutex + + // listeners interested in new messages + listeners map[Listener]struct{} + listenersMx sync.RWMutex + + // broadcast receives new messages + broadcast chan Message +} + +// New constructs a new Hub which will cache logSize messages in memory for playback to future +// listeners. A goroutine is created to handle incoming messages; it will run until the provided +// context is canceled. +func New(ctx context.Context, logSize int) *Hub { + h := &Hub{ + log: ring.New(logSize), + listeners: make(map[Listener]struct{}), + broadcast: make(chan Message, 100), + } + + go func() { + for { + select { + case <-ctx.Done(): + // Shutdown + close(h.broadcast) + h.broadcast = nil + return + case msg := <-h.broadcast: + // Log message + h.logMx.Lock() + h.log.Value = msg + h.log = h.log.Next() + h.logMx.Unlock() + // Deliver message to listeners + h.deliver(msg) + } + } + }() + + return h +} + +// Broadcast queues a message for processing by the hub. The message will be placed into the +// in-memory log and relayed to all registered listeners. +func (h *Hub) Broadcast(msg Message) { + if h.broadcast != nil { + h.broadcast <- msg + } +} + +// AddListener registers a listener to receive broadcasted messages. +func (h *Hub) AddListener(l Listener) { + // Playback log + h.logMx.RLock() + h.log.Do(func(v interface{}) { + if v != nil { + l.Receive(v.(Message)) + } + }) + h.logMx.RUnlock() + + // Add to listeners + h.listenersMx.Lock() + h.listeners[l] = struct{}{} + h.listenersMx.Unlock() +} + +// RemoveListener deletes a listener registration, it will cease to receive messages. +func (h *Hub) RemoveListener(l Listener) { + h.listenersMx.Lock() + defer h.listenersMx.Unlock() + if _, ok := h.listeners[l]; ok { + delete(h.listeners, l) + } +} + +// deliver message to all listeners, removing listeners if they return an error +func (h *Hub) deliver(msg Message) { + h.listenersMx.RLock() + defer h.listenersMx.RUnlock() + for l := range h.listeners { + if err := l.Receive(msg); err != nil { + h.RemoveListener(l) + } + } +} diff --git a/msghub/hub_test.go b/msghub/hub_test.go new file mode 100644 index 0000000..f6c131e --- /dev/null +++ b/msghub/hub_test.go @@ -0,0 +1,243 @@ +package msghub + +import ( + "context" + "fmt" + "testing" + "time" +) + +// testListener implements the Listener interface, mock for unit tests +type testListener struct { + messages []*Message // received messages + wantMessages int // how many messages this listener wants to receive + errorAfter int // when != 0, messages until Receive() begins returning error + + done chan struct{} // closed once we have received wantMessages + overflow chan struct{} // closed if we receive wantMessages+1 +} + +func newTestListener(want int) *testListener { + l := &testListener{ + messages: make([]*Message, 0, want*2), + wantMessages: want, + done: make(chan struct{}), + overflow: make(chan struct{}), + } + if want == 0 { + close(l.done) + } + return l +} + +// Receive a Message, store it in the messages slice, close applicable channels, and return an error +// if instructed +func (l *testListener) Receive(msg Message) error { + l.messages = append(l.messages, &msg) + if len(l.messages) == l.wantMessages { + close(l.done) + } + if len(l.messages) == l.wantMessages+1 { + close(l.overflow) + } + if l.errorAfter > 0 && len(l.messages) > l.errorAfter { + return fmt.Errorf("Too many messages") + } + return nil +} + +// String formats the got vs wanted message counts +func (l *testListener) String() string { + return fmt.Sprintf("got %v messages, wanted %v", len(l.messages), l.wantMessages) +} + +func TestHubNew(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + if hub == nil { + t.Fatal("New() == nil, expected a new Hub") + } +} + +func TestHubZeroListeners(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + m := Message{} + for i := 0; i < 100; i++ { + hub.Broadcast(m) + } + // Just making sure Hub doesn't panic +} + +func TestHubOneListener(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + m := Message{} + l := newTestListener(1) + + hub.AddListener(l) + hub.Broadcast(m) + + // Wait for messages + select { + case <-l.done: + case <-time.After(time.Second): + t.Error("Timeout:", l) + } +} + +func TestHubRemoveListener(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + m := Message{} + l := newTestListener(1) + + hub.AddListener(l) + hub.Broadcast(m) + hub.RemoveListener(l) + hub.Broadcast(m) + + // Wait for messages + select { + case <-l.overflow: + t.Error(l) + case <-time.After(250 * time.Millisecond): + // Expected result, no overflow + } +} + +func TestHubRemoveListenerOnError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + m := Message{} + + // error after 1 means listener should receive 2 messages before being removed + l := newTestListener(2) + l.errorAfter = 1 + + hub.AddListener(l) + hub.Broadcast(m) + hub.Broadcast(m) + hub.Broadcast(m) + hub.Broadcast(m) + + // Wait for messages + select { + case <-l.overflow: + t.Error(l) + case <-time.After(250 * time.Millisecond): + // Expected result, no overflow + } +} + +func TestHubLogReplay(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 100) + l1 := newTestListener(3) + hub.AddListener(l1) + + // Broadcast 3 messages with no listeners + msgs := make([]Message, 3) + for i := 0; i < len(msgs); i++ { + msgs[i] = Message{ + Subject: fmt.Sprintf("subj %v", i), + } + hub.Broadcast(msgs[i]) + } + + // Wait for messages (live) + select { + case <-l1.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l1) + } + + // Add a new listener + l2 := newTestListener(3) + hub.AddListener(l2) + + // Wait for messages (log) + select { + case <-l2.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l2) + } + + for i := 0; i < len(msgs); i++ { + got := l2.messages[i].Subject + want := msgs[i].Subject + if got != want { + t.Errorf("msg[%v].Subject == %q, want %q", i, got, want) + } + } +} + +func TestHubLogReplayWrap(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + l1 := newTestListener(20) + hub.AddListener(l1) + + // Broadcast more messages than the hub can hold + msgs := make([]Message, 20) + for i := 0; i < len(msgs); i++ { + msgs[i] = Message{ + Subject: fmt.Sprintf("subj %v", i), + } + hub.Broadcast(msgs[i]) + } + + // Wait for messages (live) + select { + case <-l1.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l1) + } + + // Add a new listener + l2 := newTestListener(5) + hub.AddListener(l2) + + // Wait for messages (log) + select { + case <-l2.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l2) + } + + for i := 0; i < 5; i++ { + got := l2.messages[i].Subject + want := msgs[i+15].Subject + if got != want { + t.Errorf("msg[%v].Subject == %q, want %q", i, got, want) + } + } +} + +func TestHubContextCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + hub := New(ctx, 5) + m := Message{} + l := newTestListener(1) + + hub.AddListener(l) + hub.Broadcast(m) + cancel() + time.Sleep(50 * time.Millisecond) + hub.Broadcast(m) + + // Wait for messages + select { + case <-l.overflow: + t.Error(l) + case <-time.After(250 * time.Millisecond): + // Expected result, no overflow + } +} From 6ca2c2774757662663d229f1cb61b57e26f21318 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 16 Jan 2017 10:50:28 -0800 Subject: [PATCH 02/10] Pull message delivery into its own method --- smtpd/handler.go | 85 ++++++++++++++++++++++------------------ swaks-tests/run-tests.sh | 2 +- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/smtpd/handler.go b/smtpd/handler.go index a9b49a1..0f0e3d9 100644 --- a/smtpd/handler.go +++ b/smtpd/handler.go @@ -67,6 +67,12 @@ var commands = map[string]bool{ "TURN": true, } +// recipientDetails for message delivery +type recipientDetails struct { + address, localPart, domainPart string + mailbox Mailbox +} + // Session holds the state of an SMTP session type Session struct { server *Server @@ -341,13 +347,7 @@ func (ss *Session) mailHandler(cmd string, arg string) { // DATA func (ss *Session) dataHandler() { - type RecipientDetails struct { - address, localPart, domainPart string - mailbox Mailbox - } - recipients := make([]RecipientDetails, 0, ss.recipients.Len()) - // Timestamp for Received header - stamp := time.Now().Format(timeStampFormat) + recipients := make([]recipientDetails, 0, ss.recipients.Len()) // Get a Mailbox and a new Message for each recipient msgSize := 0 if ss.server.storeMessages { @@ -369,7 +369,7 @@ func (ss *Session) dataHandler() { ss.reset() return } - recipients = append(recipients, RecipientDetails{recip, local, domain, mb}) + recipients = append(recipients, recipientDetails{recip, local, domain, mb}) } else { log.Tracef("Not storing message for %q", recip) } @@ -399,39 +399,15 @@ func (ss *Session) dataHandler() { if ss.server.storeMessages { // Create a message for each valid recipient for _, r := range recipients { - msg, err := r.mailbox.NewMessage() - if err != nil { - ss.logError("Failed to create message for %q: %s", r.localPart, err) - ss.send(fmt.Sprintf("451 Failed to create message for %v", r.localPart)) + if ok := ss.deliverMessage(r, msgBuf); ok { + expReceivedTotal.Add(1) + } else { + // Delivery failure + ss.send(fmt.Sprintf("451 Failed to store message for %v", r.localPart)) ss.reset() return } - - // Generate Received header - recd := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n", - ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp) - if err := msg.Append([]byte(recd)); err != nil { - ss.logError("Failed to write received header for %q: %s", r.localPart, err) - ss.send(fmt.Sprintf("451 Failed to create message for %v", r.localPart)) - ss.reset() - return - } - // Append lines from msgBuf - for _, line = range msgBuf { - if err := msg.Append(line); err != nil { - ss.logError("Failed to append to mailbox %v: %v", - r.mailbox, err) - ss.send("554 Something went wrong") - ss.reset() - // Should really cleanup the crap on filesystem - return - } - } - if err := msg.Close(); err != nil { - ss.logError("Error: %v while writing message", err) - } - expReceivedTotal.Add(1) - } // end for + } } else { expReceivedTotal.Add(1) } @@ -458,6 +434,39 @@ func (ss *Session) dataHandler() { } // end for } +// deliverMessage creates and populates a new Message for the specified recipient +func (ss *Session) deliverMessage(r recipientDetails, msgBuf [][]byte) (ok bool) { + msg, err := r.mailbox.NewMessage() + if err != nil { + ss.logError("Failed to create message for %q: %s", r.localPart, err) + return false + } + + // Generate Received header + stamp := time.Now().Format(timeStampFormat) + recd := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n", + ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp) + if err := msg.Append([]byte(recd)); err != nil { + ss.logError("Failed to write received header for %q: %s", r.localPart, err) + return false + } + + // Append lines from msgBuf + for _, line := range msgBuf { + if err := msg.Append(line); err != nil { + ss.logError("Failed to append to mailbox %v: %v", r.mailbox, err) + // Should really cleanup the crap on filesystem + return false + } + } + if err := msg.Close(); err != nil { + ss.logError("Error while closing message for %v: %v", r.mailbox, err) + return false + } + + return true +} + func (ss *Session) enterState(state State) { ss.state = state ss.logTrace("Entering state %v", state) diff --git a/swaks-tests/run-tests.sh b/swaks-tests/run-tests.sh index f7c3564..e747dde 100755 --- a/swaks-tests/run-tests.sh +++ b/swaks-tests/run-tests.sh @@ -31,7 +31,7 @@ export SWAKS_OPT_to="$to@inbucket.local" swaks $* --h-Subject: "Swaks Plain Text" --body text.txt # Multi-recipient test -swaks $* --to="$to@inbucket.local,Alt User " --h-Subject: "Swaks Multi-Recipient" \ +swaks $* --to="$to@inbucket.local,alternate@inbucket.local" --h-Subject: "Swaks Multi-Recipient" \ --body text.txt # HTML test From b3db619db909235a0bf668b878ab8d38b1dc2b03 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 16 Jan 2017 13:09:50 -0800 Subject: [PATCH 03/10] Broadcast deliveries into msghub for #44 --- inbucket.go | 6 +++++- rest/testmocks_test.go | 5 +++++ smtpd/datastore.go | 1 + smtpd/filestore.go | 4 ++++ smtpd/handler.go | 13 +++++++++++++ smtpd/handler_test.go | 20 ++++++++++++++++++-- smtpd/listener.go | 28 ++++++++++++++++++---------- smtpd/retention_test.go | 5 +++++ 8 files changed, 69 insertions(+), 13 deletions(-) diff --git a/inbucket.go b/inbucket.go index 09eeae0..333408a 100644 --- a/inbucket.go +++ b/inbucket.go @@ -14,6 +14,7 @@ import ( "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/httpd" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/pop3d" "github.com/jhillyerd/inbucket/rest" "github.com/jhillyerd/inbucket/smtpd" @@ -95,6 +96,9 @@ func main() { } } + // Create message hub + msgHub := msghub.New(rootCtx, 100) + // Grab our datastore ds := smtpd.DefaultFileDataStore() @@ -110,7 +114,7 @@ func main() { go pop3Server.Start(rootCtx) // Startup SMTP server - smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds, shutdownChan) + smtpServer = smtpd.NewServer(config.GetSMTPConfig(), shutdownChan, ds, msgHub) go smtpServer.Start(rootCtx) // Loop forever waiting for signals or shutdown channel diff --git a/rest/testmocks_test.go b/rest/testmocks_test.go index a671165..bc720fc 100644 --- a/rest/testmocks_test.go +++ b/rest/testmocks_test.go @@ -50,6 +50,11 @@ func (m *MockMailbox) NewMessage() (smtpd.Message, error) { return args.Get(0).(smtpd.Message), args.Error(1) } +func (m *MockMailbox) Name() string { + args := m.Called() + return args.String(0) +} + func (m *MockMailbox) String() string { args := m.Called() return args.String(0) diff --git a/smtpd/datastore.go b/smtpd/datastore.go index c06ad6a..180a3ec 100644 --- a/smtpd/datastore.go +++ b/smtpd/datastore.go @@ -29,6 +29,7 @@ type Mailbox interface { GetMessage(id string) (Message, error) Purge() error NewMessage() (Message, error) + Name() string String() string } diff --git a/smtpd/filestore.go b/smtpd/filestore.go index f6208ed..5f4d4f0 100644 --- a/smtpd/filestore.go +++ b/smtpd/filestore.go @@ -148,6 +148,10 @@ type FileMailbox struct { messages []*FileMessage } +func (mb *FileMailbox) Name() string { + return mb.name +} + func (mb *FileMailbox) String() string { return mb.name + "[" + mb.dirName + "]" } diff --git a/smtpd/handler.go b/smtpd/handler.go index 0f0e3d9..e267d11 100644 --- a/smtpd/handler.go +++ b/smtpd/handler.go @@ -13,6 +13,7 @@ import ( "time" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" ) // State tracks the current mode of our SMTP state machine @@ -464,6 +465,18 @@ func (ss *Session) deliverMessage(r recipientDetails, msgBuf [][]byte) (ok bool) return false } + // Broadcast message information + broadcast := msghub.Message{ + Mailbox: r.mailbox.Name(), + ID: msg.ID(), + From: msg.From(), + To: msg.To(), + Subject: msg.Subject(), + Date: msg.Date(), + Size: msg.Size(), + } + ss.server.msgHub.Broadcast(broadcast) + return true } diff --git a/smtpd/handler_test.go b/smtpd/handler_test.go index 34814c7..3f4dc82 100644 --- a/smtpd/handler_test.go +++ b/smtpd/handler_test.go @@ -5,13 +5,15 @@ import ( "fmt" "io" - "github.com/jhillyerd/inbucket/config" "log" "net" "net/textproto" "os" "testing" "time" + + "github.com/jhillyerd/inbucket/config" + "github.com/jhillyerd/inbucket/msghub" ) type scriptStep struct { @@ -153,6 +155,13 @@ func TestMailState(t *testing.T) { msg1 := &MockMessage{} mds.On("MailboxFor").Return(mb1, nil) mb1.On("NewMessage").Return(msg1, nil) + mb1.On("Name").Return("u1") + msg1.On("ID").Return("") + msg1.On("From").Return("") + msg1.On("To").Return(make([]string, 0)) + msg1.On("Date").Return(time.Time{}) + msg1.On("Subject").Return("") + msg1.On("Size").Return(0) msg1.On("Close").Return(nil) server, logbuf := setupSMTPServer(mds) @@ -263,6 +272,13 @@ func TestDataState(t *testing.T) { msg1 := &MockMessage{} mds.On("MailboxFor").Return(mb1, nil) mb1.On("NewMessage").Return(msg1, nil) + mb1.On("Name").Return("u1") + msg1.On("ID").Return("") + msg1.On("From").Return("") + msg1.On("To").Return(make([]string, 0)) + msg1.On("Date").Return(time.Time{}) + msg1.On("Subject").Return("") + msg1.On("Size").Return(0) msg1.On("Close").Return(nil) server, logbuf := setupSMTPServer(mds) @@ -378,7 +394,7 @@ func setupSMTPServer(ds DataStore) (*Server, *bytes.Buffer) { // Create a server, don't start it shutdownChan := make(chan bool) - return NewServer(cfg, ds, shutdownChan), buf + return NewServer(cfg, shutdownChan, ds, &msghub.Hub{}), buf } var sessionNum int diff --git a/smtpd/listener.go b/smtpd/listener.go index ba4fca0..c2d3905 100644 --- a/smtpd/listener.go +++ b/smtpd/listener.go @@ -12,24 +12,27 @@ import ( "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" ) // Server holds the configuration and state of our SMTP server type Server struct { + // Configuration domain string domainNoStore string maxRecips int maxIdleSeconds int maxMessageBytes int - dataStore DataStore storeMessages bool - listener net.Listener - // globalShutdown is the signal Inbucket needs to shut down - globalShutdown chan bool + // Dependencies + dataStore DataStore // Mailbox/message store + globalShutdown chan bool // Shuts down Inbucket + msgHub *msghub.Hub // Pub/sub for message info - // waitgroup tracks individual sessions - waitgroup *sync.WaitGroup + // State + listener net.Listener // Incoming network connections + waitgroup *sync.WaitGroup // Waitgroup tracks individual sessions } var ( @@ -54,17 +57,22 @@ var ( ) // NewServer creates a new Server instance with the specificed config -func NewServer(cfg config.SMTPConfig, ds DataStore, globalShutdown chan bool) *Server { +func NewServer( + cfg config.SMTPConfig, + globalShutdown chan bool, + ds DataStore, + msgHub *msghub.Hub) *Server { return &Server{ - dataStore: ds, domain: cfg.Domain, + domainNoStore: strings.ToLower(cfg.DomainNoStore), maxRecips: cfg.MaxRecipients, maxIdleSeconds: cfg.MaxIdleSeconds, maxMessageBytes: cfg.MaxMessageBytes, storeMessages: cfg.StoreMessages, - domainNoStore: strings.ToLower(cfg.DomainNoStore), - waitgroup: new(sync.WaitGroup), globalShutdown: globalShutdown, + dataStore: ds, + msgHub: msgHub, + waitgroup: new(sync.WaitGroup), } } diff --git a/smtpd/retention_test.go b/smtpd/retention_test.go index 95b17c1..a08458b 100644 --- a/smtpd/retention_test.go +++ b/smtpd/retention_test.go @@ -106,6 +106,11 @@ func (m *MockMailbox) NewMessage() (Message, error) { return args.Get(0).(Message), args.Error(1) } +func (m *MockMailbox) Name() string { + args := m.Called() + return args.String(0) +} + func (m *MockMailbox) String() string { args := m.Called() return args.String(0) From e32e6d00d6027c2ba77826645efa74b460fe9b63 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 16 Jan 2017 16:10:06 -0800 Subject: [PATCH 04/10] Remove httpbuf, manually save sessions - httpbuf prevents connections being upgraded to websockets --- httpd/server.go | 16 +--------------- webui/mailbox_controller.go | 14 +++++++++++++- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/httpd/server.go b/httpd/server.go index 769d7d6..f8c88ac 100644 --- a/httpd/server.go +++ b/httpd/server.go @@ -8,7 +8,6 @@ import ( "net/http" "time" - "github.com/goods/httpbuf" "github.com/gorilla/mux" "github.com/gorilla/securecookie" "github.com/gorilla/sessions" @@ -122,26 +121,13 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer ctx.Close() // Run the handler, grab the error, and report it - buf := new(httpbuf.Buffer) log.Tracef("HTTP[%v] %v %v %q", req.RemoteAddr, req.Proto, req.Method, req.RequestURI) - err = h(buf, req, ctx) + err = h(w, req, ctx) if err != nil { log.Errorf("HTTP error handling %q: %v", req.RequestURI, err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - - // Save the session - if err = ctx.Session.Save(req, buf); err != nil { - log.Errorf("HTTP failed to save session: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // Apply the buffered response to the writer - if _, err = buf.Apply(w); err != nil { - log.Errorf("HTTP failed to write response: %v", err) - } } func emergencyShutdown() { diff --git a/webui/mailbox_controller.go b/webui/mailbox_controller.go index 318194a..a1273c1 100644 --- a/webui/mailbox_controller.go +++ b/webui/mailbox_controller.go @@ -20,6 +20,7 @@ func MailboxIndex(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) if len(name) == 0 { ctx.Session.AddFlash("Account name is required", "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } @@ -27,12 +28,16 @@ func MailboxIndex(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) name, err = smtpd.ParseMailboxName(name) if err != nil { ctx.Session.AddFlash(err.Error(), "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } // Remember this mailbox was visited RememberMailbox(ctx, name) + if err = ctx.Session.Save(req, w); err != nil { + return err + } return httpd.RenderTemplate("mailbox/index.html", w, map[string]interface{}{ "ctx": ctx, @@ -48,6 +53,7 @@ func MailboxLink(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) ( name, err := smtpd.ParseMailboxName(ctx.Vars["name"]) if err != nil { ctx.Session.AddFlash(err.Error(), "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } @@ -57,7 +63,7 @@ func MailboxLink(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) ( return nil } -// MailboxList renders a list of messages in a mailbox. Renders JSON or a partial +// MailboxList renders a list of messages in a mailbox. Renders a partial func MailboxList(w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (err error) { // Don't have to validate these aren't empty, Gorilla returns 404 name, err := smtpd.ParseMailboxName(ctx.Vars["name"]) @@ -203,6 +209,7 @@ func MailboxDownloadAttach(w http.ResponseWriter, req *http.Request, ctx *httpd. name, err := smtpd.ParseMailboxName(ctx.Vars["name"]) if err != nil { ctx.Session.AddFlash(err.Error(), "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } @@ -210,6 +217,7 @@ func MailboxDownloadAttach(w http.ResponseWriter, req *http.Request, ctx *httpd. num, err := strconv.ParseUint(numStr, 10, 32) if err != nil { ctx.Session.AddFlash("Attachment number must be unsigned numeric", "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } @@ -234,6 +242,7 @@ func MailboxDownloadAttach(w http.ResponseWriter, req *http.Request, ctx *httpd. } if int(num) >= len(body.Attachments) { ctx.Session.AddFlash("Attachment number too high", "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } @@ -253,6 +262,7 @@ func MailboxViewAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.Cont name, err := smtpd.ParseMailboxName(ctx.Vars["name"]) if err != nil { ctx.Session.AddFlash(err.Error(), "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } @@ -261,6 +271,7 @@ func MailboxViewAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.Cont num, err := strconv.ParseUint(numStr, 10, 32) if err != nil { ctx.Session.AddFlash("Attachment number must be unsigned numeric", "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } @@ -285,6 +296,7 @@ func MailboxViewAttach(w http.ResponseWriter, req *http.Request, ctx *httpd.Cont } if int(num) >= len(body.Attachments) { ctx.Session.AddFlash("Attachment number too high", "errors") + _ = ctx.Session.Save(req, w) http.Redirect(w, req, httpd.Reverse("RootIndex"), http.StatusSeeOther) return nil } From e5aad9f5d01019a8089c7166750e4160d378882b Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 16 Jan 2017 18:12:27 -0800 Subject: [PATCH 05/10] Implement server side of message monitor for #44 --- httpd/context.go | 3 + httpd/server.go | 12 ++- inbucket.go | 2 +- rest/routes.go | 17 ++-- rest/socketv1_controller.go | 155 ++++++++++++++++++++++++++++++++++++ rest/testutils_test.go | 3 +- 6 files changed, 184 insertions(+), 8 deletions(-) create mode 100644 rest/socketv1_controller.go diff --git a/httpd/context.go b/httpd/context.go index 9e5d3e8..318430e 100644 --- a/httpd/context.go +++ b/httpd/context.go @@ -6,6 +6,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/sessions" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -14,6 +15,7 @@ type Context struct { Vars map[string]string Session *sessions.Session DataStore smtpd.DataStore + MsgHub *msghub.Hub IsJSON bool } @@ -56,6 +58,7 @@ func NewContext(req *http.Request) (*Context, error) { Vars: vars, Session: sess, DataStore: DataStore, + MsgHub: msgHub, IsJSON: headerMatch(req, "Accept", "application/json"), } return ctx, err diff --git a/httpd/server.go b/httpd/server.go index f8c88ac..9b4a338 100644 --- a/httpd/server.go +++ b/httpd/server.go @@ -13,6 +13,7 @@ import ( "github.com/gorilla/sessions" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -23,6 +24,9 @@ var ( // DataStore is where all the mailboxes and messages live DataStore smtpd.DataStore + // msgHub holds a reference to the message pub/sub system + msgHub *msghub.Hub + // Router is shared between httpd, webui and rest packages. It sends // incoming requests to the correct handler function Router = mux.NewRouter() @@ -35,12 +39,18 @@ var ( ) // Initialize sets up things for unit tests or the Start() method -func Initialize(cfg config.WebConfig, ds smtpd.DataStore, shutdownChan chan bool) { +func Initialize( + cfg config.WebConfig, + shutdownChan chan bool, + ds smtpd.DataStore, + mh *msghub.Hub) { + webConfig = cfg globalShutdown = shutdownChan // NewContext() will use this DataStore for the web handlers DataStore = ds + msgHub = mh // Content Paths log.Infof("HTTP templates mapped to %q", cfg.TemplateDir) diff --git a/inbucket.go b/inbucket.go index 333408a..9934084 100644 --- a/inbucket.go +++ b/inbucket.go @@ -103,7 +103,7 @@ func main() { ds := smtpd.DefaultFileDataStore() // Start HTTP server - httpd.Initialize(config.GetWebConfig(), ds, shutdownChan) + httpd.Initialize(config.GetWebConfig(), shutdownChan, ds, msgHub) webui.SetupRoutes(httpd.Router) rest.SetupRoutes(httpd.Router) go httpd.Start(rootCtx) diff --git a/rest/routes.go b/rest/routes.go index d474f28..146e53d 100644 --- a/rest/routes.go +++ b/rest/routes.go @@ -6,9 +6,16 @@ import "github.com/jhillyerd/inbucket/httpd" // SetupRoutes populates the routes for the REST interface func SetupRoutes(r *mux.Router) { // API v1 - r.Path("/api/v1/mailbox/{name}").Handler(httpd.Handler(MailboxListV1)).Name("MailboxListV1").Methods("GET") - r.Path("/api/v1/mailbox/{name}").Handler(httpd.Handler(MailboxPurgeV1)).Name("MailboxPurgeV1").Methods("DELETE") - r.Path("/api/v1/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxShowV1)).Name("MailboxShowV1").Methods("GET") - r.Path("/api/v1/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxDeleteV1)).Name("MailboxDeleteV1").Methods("DELETE") - r.Path("/api/v1/mailbox/{name}/{id}/source").Handler(httpd.Handler(MailboxSourceV1)).Name("MailboxSourceV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}").Handler( + httpd.Handler(MailboxListV1)).Name("MailboxListV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}").Handler( + httpd.Handler(MailboxPurgeV1)).Name("MailboxPurgeV1").Methods("DELETE") + r.Path("/api/v1/mailbox/{name}/{id}").Handler( + httpd.Handler(MailboxShowV1)).Name("MailboxShowV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}/{id}").Handler( + httpd.Handler(MailboxDeleteV1)).Name("MailboxDeleteV1").Methods("DELETE") + r.Path("/api/v1/mailbox/{name}/{id}/source").Handler( + httpd.Handler(MailboxSourceV1)).Name("MailboxSourceV1").Methods("GET") + r.Path("/api/v1/monitor/all/messages").Handler( + httpd.Handler(MonitorAllMessagesV1)).Name("MonitorAllMessagesV1").Methods("GET") } diff --git a/rest/socketv1_controller.go b/rest/socketv1_controller.go new file mode 100644 index 0000000..de5245f --- /dev/null +++ b/rest/socketv1_controller.go @@ -0,0 +1,155 @@ +package rest + +import ( + "net/http" + "time" + + "github.com/gorilla/websocket" + "github.com/jhillyerd/inbucket/httpd" + "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" + "github.com/jhillyerd/inbucket/rest/model" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +// options for gorilla connection upgrader +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// msgListener handles messages from the msghub +type msgListener struct { + hub *msghub.Hub + c chan msghub.Message +} + +// newMsgListener creates a listener and registers it +func newMsgListener(hub *msghub.Hub) *msgListener { + ml := &msgListener{ + hub: hub, + c: make(chan msghub.Message, 100), + } + hub.AddListener(ml) + return ml +} + +// Receive handles an incoming message +func (ml *msgListener) Receive(msg msghub.Message) error { + ml.c <- msg + return nil +} + +// WSReader makes sure the websocket client is still connected +func (ml *msgListener) WSReader(conn *websocket.Conn) { + defer ml.Close() + conn.SetReadLimit(maxMessageSize) + conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetPongHandler(func(string) error { + log.Tracef("HTTP[%v] Got WebSocket pong", conn.RemoteAddr()) + conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + if _, _, err := conn.ReadMessage(); err != nil { + if websocket.IsUnexpectedCloseError( + err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived, + ) { + // Unexpected close code + log.Warnf("HTTP[%v] WebSocket error: %v", conn.RemoteAddr(), err) + } else { + log.Tracef("HTTP[%v] Closing WebSocket", conn.RemoteAddr()) + } + break + } + } +} + +// WSWriter makes sure the websocket client is still connected +func (ml *msgListener) WSWriter(conn *websocket.Conn) { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + ml.Close() + }() + + // Handle messages from hub until msgListener is closed + for { + select { + case msg, ok := <-ml.c: + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // msgListener closed, exit + conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + header := &model.JSONMessageHeaderV1{ + Mailbox: msg.Mailbox, + ID: msg.ID, + From: msg.From, + To: msg.To, + Subject: msg.Subject, + Date: msg.Date, + Size: msg.Size, + } + if conn.WriteJSON(header) != nil { + // Write failed + return + } + case <-ticker.C: + // Send ping + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if conn.WriteMessage(websocket.PingMessage, []byte{}) != nil { + // Write error + return + } + log.Tracef("HTTP[%v] Sent WebSocket ping", conn.RemoteAddr()) + } + } +} + +// Close removes the listener registration +func (ml *msgListener) Close() { + select { + case <-ml.c: + // Already closed + default: + ml.hub.RemoveListener(ml) + close(ml.c) + } +} + +func MonitorAllMessagesV1( + w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (err error) { + // Upgrade to Websocket + conn, err := upgrader.Upgrade(w, req, nil) + if err != nil { + return err + } + defer conn.Close() + log.Tracef("HTTP[%v] Upgraded to websocket", req.RemoteAddr) + + // Create, register listener; then interact with conn + ml := newMsgListener(ctx.MsgHub) + go ml.WSWriter(conn) + ml.WSReader(conn) + + return nil +} diff --git a/rest/testutils_test.go b/rest/testutils_test.go index efc00ed..7cfd223 100644 --- a/rest/testutils_test.go +++ b/rest/testutils_test.go @@ -12,6 +12,7 @@ import ( "github.com/jhillyerd/enmime" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/httpd" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -199,7 +200,7 @@ func setupWebServer(ds smtpd.DataStore) *bytes.Buffer { PublicDir: "../themes/bootstrap/public", } shutdownChan := make(chan bool) - httpd.Initialize(cfg, ds, shutdownChan) + httpd.Initialize(cfg, shutdownChan, ds, &msghub.Hub{}) SetupRoutes(httpd.Router) return buf From 86365a047ca3e827a95e53e3ad743a7ec1f80529 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 16 Jan 2017 18:53:24 -0800 Subject: [PATCH 06/10] Add open WebSockets to metrics --- httpd/server.go | 9 +++++++++ rest/socketv1_controller.go | 7 ++++++- themes/bootstrap/public/metrics.js | 1 + themes/bootstrap/templates/root/status.html | 6 ++++++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/httpd/server.go b/httpd/server.go index 9b4a338..103e0f1 100644 --- a/httpd/server.go +++ b/httpd/server.go @@ -3,6 +3,7 @@ package httpd import ( "context" + "expvar" "fmt" "net" "net/http" @@ -36,8 +37,16 @@ var ( listener net.Listener sessionStore sessions.Store globalShutdown chan bool + + // ExpWebSocketConnectsCurrent tracks the number of open WebSockets + ExpWebSocketConnectsCurrent = new(expvar.Int) ) +func init() { + m := expvar.NewMap("http") + m.Set("WebSocketConnectsCurrent", ExpWebSocketConnectsCurrent) +} + // Initialize sets up things for unit tests or the Start() method func Initialize( cfg config.WebConfig, diff --git a/rest/socketv1_controller.go b/rest/socketv1_controller.go index de5245f..5bab16d 100644 --- a/rest/socketv1_controller.go +++ b/rest/socketv1_controller.go @@ -143,7 +143,12 @@ func MonitorAllMessagesV1( if err != nil { return err } - defer conn.Close() + httpd.ExpWebSocketConnectsCurrent.Add(1) + defer func() { + _ = conn.Close() + httpd.ExpWebSocketConnectsCurrent.Add(-1) + }() + log.Tracef("HTTP[%v] Upgraded to websocket", req.RemoteAddr) // Create, register listener; then interact with conn diff --git a/themes/bootstrap/public/metrics.js b/themes/bootstrap/public/metrics.js index ff01448..c6512bc 100644 --- a/themes/bootstrap/public/metrics.js +++ b/themes/bootstrap/public/metrics.js @@ -104,6 +104,7 @@ function displayMetrics(data, textStatus, jqXHR) { metric('memstatsHeapSys', data.memstats.HeapSys, sizeFilter, true); metric('memstatsHeapObjects', data.memstats.HeapObjects, numberFilter, true); metric('smtpConnectsCurrent', data.smtp.ConnectsCurrent, numberFilter, true); + metric('httpWebSocketConnectsCurrent', data.http.WebSocketConnectsCurrent, numberFilter, true); // Server-side history metric('smtpReceivedTotal', data.smtp.ReceivedTotal, numberFilter, false); diff --git a/themes/bootstrap/templates/root/status.html b/themes/bootstrap/templates/root/status.html index c39d185..d5845a6 100644 --- a/themes/bootstrap/templates/root/status.html +++ b/themes/bootstrap/templates/root/status.html @@ -107,6 +107,12 @@ $(document).ready(
.
+
+
Open WebSockets:
+
.
+
.
+ +
From 63a76696bf7b824c074b98fa486ce7582b8e61fc Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 16 Jan 2017 21:30:40 -0800 Subject: [PATCH 07/10] Add user interface for monitor, #44 --- inbucket.go | 2 +- themes/bootstrap/public/inbucket.css | 5 ++ themes/bootstrap/public/monitor.js | 39 +++++++++++++++ themes/bootstrap/templates/_base.html | 3 +- themes/bootstrap/templates/root/monitor.html | 51 ++++++++++++++++++++ webui/root_controller.go | 7 +++ webui/routes.go | 32 ++++++++---- 7 files changed, 127 insertions(+), 12 deletions(-) create mode 100644 themes/bootstrap/public/monitor.js create mode 100644 themes/bootstrap/templates/root/monitor.html diff --git a/inbucket.go b/inbucket.go index 9934084..506ace4 100644 --- a/inbucket.go +++ b/inbucket.go @@ -97,7 +97,7 @@ func main() { } // Create message hub - msgHub := msghub.New(rootCtx, 100) + msgHub := msghub.New(rootCtx, 30) // Grab our datastore ds := smtpd.DefaultFileDataStore() diff --git a/themes/bootstrap/public/inbucket.css b/themes/bootstrap/public/inbucket.css index 0da5a2f..63d80bb 100644 --- a/themes/bootstrap/public/inbucket.css +++ b/themes/bootstrap/public/inbucket.css @@ -81,3 +81,8 @@ table.metrics { width: 200px; } +/* Monitor */ +#monitor-message-list td { + cursor: pointer; + font-size: 12px; +} diff --git a/themes/bootstrap/public/monitor.js b/themes/bootstrap/public/monitor.js new file mode 100644 index 0000000..6a975ca --- /dev/null +++ b/themes/bootstrap/public/monitor.js @@ -0,0 +1,39 @@ +var baseURL = window.location.protocol + '//' + window.location.host; + +function startMonitor() { + $.addTemplateFormatter({ + "date": function(value, template) { + return moment(value).calendar(); + }, + "subject": function(value, template) { + if (value == null || value.length == 0) { + return "(No Subject)"; + } + return value; + } + }); + + var uri = '/api/v1/monitor/all/messages' + var l = window.location; + var url = ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + uri + var ws = new WebSocket(url); + + ws.addEventListener('message', function (e) { + var msg = JSON.parse(e.data); + msg['href'] = '/mailbox?name=' + msg.mailbox + '&id=' + msg.id; + $('#monitor-message-list').loadTemplate( + $('#message-template'), + msg, + { append: true }); + }); +} + +function messageClick(node) { + var href = node.attributes['href'].value; + var url = baseURL + href; + window.location.assign(url); +} + +function clearClick() { + $('#monitor-message-list').empty(); +} diff --git a/themes/bootstrap/templates/_base.html b/themes/bootstrap/templates/_base.html index 1fecc90..e7be06b 100644 --- a/themes/bootstrap/templates/_base.html +++ b/themes/bootstrap/templates/_base.html @@ -48,7 +48,8 @@ {{end}} - + +