diff --git a/config/config.go b/config/config.go index 9d1bdf1..9e8c270 100644 --- a/config/config.go +++ b/config/config.go @@ -33,13 +33,15 @@ type POP3Config struct { // WebConfig contains the HTTP server configuration type WebConfig struct { - IP4address net.IP - IP4port int - TemplateDir string - TemplateCache bool - PublicDir string - GreetingFile string - CookieAuthKey string + IP4address net.IP + IP4port int + TemplateDir string + TemplateCache bool + PublicDir string + GreetingFile string + CookieAuthKey string + MonitorVisible bool + MonitorHistory int } // DataStoreConfig contains the mail store configuration @@ -130,6 +132,8 @@ func LoadConfig(filename string) error { requireOption(messages, "web", "template.dir") requireOption(messages, "web", "template.cache") requireOption(messages, "web", "public.dir") + requireOption(messages, "web", "monitor.visible") + requireOption(messages, "web", "monitor.history") requireOption(messages, "datastore", "path") requireOption(messages, "datastore", "retention.minutes") requireOption(messages, "datastore", "retention.sleep.millis") @@ -349,6 +353,19 @@ func parseWebConfig() error { } webConfig.GreetingFile = str + option = "monitor.visible" + flag, err = Config.Bool(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + webConfig.MonitorVisible = flag + + option = "monitor.history" + webConfig.MonitorHistory, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + option = "cookie.auth.key" if Config.HasOption(section, option) { str, err = Config.String(section, option) diff --git a/etc/devel.conf b/etc/devel.conf index 2549705..5e7e536 100644 --- a/etc/devel.conf +++ b/etc/devel.conf @@ -91,6 +91,18 @@ greeting.file=%(install.dir)s/themes/greeting.html # and previous sessions will be invalidated. cookie.auth.key=secret-inbucket-session-cookie-key +# Enable or disable the live message monitor tab for the web UI. This will let +# anybody see all messages delivered to Inbucket. This setting has no impact +# on the availability of the underlying WebSocket. +monitor.visible=true + +# How many historical message headers should be cached for display by new +# monitor connections. It does not limit the number of messages displayed by +# the browser once the monitor is open; all freshly received messages will be +# appended to the on screen list. This setting also affects the underlying +# API/WebSocket. +monitor.history=30 + ############################################################################# [datastore] diff --git a/etc/docker/defaults/inbucket.conf b/etc/docker/defaults/inbucket.conf index 5fb6192..a7aafed 100644 --- a/etc/docker/defaults/inbucket.conf +++ b/etc/docker/defaults/inbucket.conf @@ -93,6 +93,18 @@ greeting.file=/con/configuration/greeting.html # and previous sessions will be invalidated. #cookie.auth.key=secret-inbucket-session-cookie-key +# Enable or disable the live message monitor tab for the web UI. This will let +# anybody see all messages delivered to Inbucket. This setting has no impact +# on the availability of the underlying WebSocket. +monitor.visible=true + +# How many historical message headers should be cached for display by new +# monitor connections. It does not limit the number of messages displayed by +# the browser once the monitor is open; all freshly received messages will be +# appended to the on screen list. This setting also affects the underlying +# API/WebSocket. +monitor.history=30 + ############################################################################# [datastore] diff --git a/etc/homebrew/inbucket.conf b/etc/homebrew/inbucket.conf index 6281583..dd09fef 100644 --- a/etc/homebrew/inbucket.conf +++ b/etc/homebrew/inbucket.conf @@ -93,6 +93,18 @@ greeting.file=%(themes.dir)s/greeting.html # and previous sessions will be invalidated. cookie.auth.key=secret-inbucket-session-cookie-key +# Enable or disable the live message monitor tab for the web UI. This will let +# anybody see all messages delivered to Inbucket. This setting has no impact +# on the availability of the underlying WebSocket. +monitor.visible=true + +# How many historical message headers should be cached for display by new +# monitor connections. It does not limit the number of messages displayed by +# the browser once the monitor is open; all freshly received messages will be +# appended to the on screen list. This setting also affects the underlying +# API/WebSocket. +monitor.history=30 + ############################################################################# [datastore] diff --git a/etc/inbucket.conf b/etc/inbucket.conf index e086f87..6918d9e 100644 --- a/etc/inbucket.conf +++ b/etc/inbucket.conf @@ -91,6 +91,18 @@ greeting.file=%(install.dir)s/themes/greeting.html # and previous sessions will be invalidated. #cookie.auth.key=secret-inbucket-session-cookie-key +# Enable or disable the live message monitor tab for the web UI. This will let +# anybody see all messages delivered to Inbucket. This setting has no impact +# on the availability of the underlying WebSocket. +monitor.visible=true + +# How many historical message headers should be cached for display by new +# monitor connections. It does not limit the number of messages displayed by +# the browser once the monitor is open; all freshly received messages will be +# appended to the on screen list. This setting also affects the underlying +# API/WebSocket. +monitor.history=30 + ############################################################################# [datastore] diff --git a/etc/unix-sample.conf b/etc/unix-sample.conf index 07b8144..cdb21c3 100644 --- a/etc/unix-sample.conf +++ b/etc/unix-sample.conf @@ -91,6 +91,18 @@ greeting.file=%(install.dir)s/themes/greeting.html # and previous sessions will be invalidated. #cookie.auth.key=secret-inbucket-session-cookie-key +# Enable or disable the live message monitor tab for the web UI. This will let +# anybody see all messages delivered to Inbucket. This setting has no impact +# on the availability of the underlying WebSocket. +monitor.visible=true + +# How many historical message headers should be cached for display by new +# monitor connections. It does not limit the number of messages displayed by +# the browser once the monitor is open; all freshly received messages will be +# appended to the on screen list. This setting also affects the underlying +# API/WebSocket. +monitor.history=30 + ############################################################################# [datastore] diff --git a/etc/win-sample.conf b/etc/win-sample.conf index fbbfbe2..0227176 100644 --- a/etc/win-sample.conf +++ b/etc/win-sample.conf @@ -91,6 +91,18 @@ greeting.file=%(install.dir)s\themes\greeting.html # and previous sessions will be invalidated. #cookie.auth.key=secret-inbucket-session-cookie-key +# Enable or disable the live message monitor tab for the web UI. This will let +# anybody see all messages delivered to Inbucket. This setting has no impact +# on the availability of the underlying WebSocket. +monitor.visible=true + +# How many historical message headers should be cached for display by new +# monitor connections. It does not limit the number of messages displayed by +# the browser once the monitor is open; all freshly received messages will be +# appended to the on screen list. This setting also affects the underlying +# API/WebSocket. +monitor.history=30 + ############################################################################# [datastore] diff --git a/httpd/context.go b/httpd/context.go index 9e5d3e8..52abfc6 100644 --- a/httpd/context.go +++ b/httpd/context.go @@ -6,6 +6,8 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/sessions" + "github.com/jhillyerd/inbucket/config" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -14,6 +16,8 @@ type Context struct { Vars map[string]string Session *sessions.Session DataStore smtpd.DataStore + MsgHub *msghub.Hub + WebConfig config.WebConfig IsJSON bool } @@ -56,6 +60,8 @@ func NewContext(req *http.Request) (*Context, error) { Vars: vars, Session: sess, DataStore: DataStore, + MsgHub: msgHub, + WebConfig: webConfig, IsJSON: headerMatch(req, "Accept", "application/json"), } return ctx, err diff --git a/httpd/server.go b/httpd/server.go index 769d7d6..103e0f1 100644 --- a/httpd/server.go +++ b/httpd/server.go @@ -3,17 +3,18 @@ package httpd import ( "context" + "expvar" "fmt" "net" "net/http" "time" - "github.com/goods/httpbuf" "github.com/gorilla/mux" "github.com/gorilla/securecookie" "github.com/gorilla/sessions" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -24,6 +25,9 @@ var ( // DataStore is where all the mailboxes and messages live DataStore smtpd.DataStore + // msgHub holds a reference to the message pub/sub system + msgHub *msghub.Hub + // Router is shared between httpd, webui and rest packages. It sends // incoming requests to the correct handler function Router = mux.NewRouter() @@ -33,15 +37,29 @@ var ( listener net.Listener sessionStore sessions.Store globalShutdown chan bool + + // ExpWebSocketConnectsCurrent tracks the number of open WebSockets + ExpWebSocketConnectsCurrent = new(expvar.Int) ) +func init() { + m := expvar.NewMap("http") + m.Set("WebSocketConnectsCurrent", ExpWebSocketConnectsCurrent) +} + // Initialize sets up things for unit tests or the Start() method -func Initialize(cfg config.WebConfig, ds smtpd.DataStore, shutdownChan chan bool) { +func Initialize( + cfg config.WebConfig, + shutdownChan chan bool, + ds smtpd.DataStore, + mh *msghub.Hub) { + webConfig = cfg globalShutdown = shutdownChan // NewContext() will use this DataStore for the web handlers DataStore = ds + msgHub = mh // Content Paths log.Infof("HTTP templates mapped to %q", cfg.TemplateDir) @@ -122,26 +140,13 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer ctx.Close() // Run the handler, grab the error, and report it - buf := new(httpbuf.Buffer) log.Tracef("HTTP[%v] %v %v %q", req.RemoteAddr, req.Proto, req.Method, req.RequestURI) - err = h(buf, req, ctx) + err = h(w, req, ctx) if err != nil { log.Errorf("HTTP error handling %q: %v", req.RequestURI, err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - - // Save the session - if err = ctx.Session.Save(req, buf); err != nil { - log.Errorf("HTTP failed to save session: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // Apply the buffered response to the writer - if _, err = buf.Apply(w); err != nil { - log.Errorf("HTTP failed to write response: %v", err) - } } func emergencyShutdown() { diff --git a/inbucket.go b/inbucket.go index 09eeae0..d737e5d 100644 --- a/inbucket.go +++ b/inbucket.go @@ -14,6 +14,7 @@ import ( "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/httpd" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/pop3d" "github.com/jhillyerd/inbucket/rest" "github.com/jhillyerd/inbucket/smtpd" @@ -95,11 +96,14 @@ func main() { } } + // Create message hub + msgHub := msghub.New(rootCtx, config.GetWebConfig().MonitorHistory) + // Grab our datastore ds := smtpd.DefaultFileDataStore() // Start HTTP server - httpd.Initialize(config.GetWebConfig(), ds, shutdownChan) + httpd.Initialize(config.GetWebConfig(), shutdownChan, ds, msgHub) webui.SetupRoutes(httpd.Router) rest.SetupRoutes(httpd.Router) go httpd.Start(rootCtx) @@ -110,7 +114,7 @@ func main() { go pop3Server.Start(rootCtx) // Startup SMTP server - smtpServer = smtpd.NewServer(config.GetSMTPConfig(), ds, shutdownChan) + smtpServer = smtpd.NewServer(config.GetSMTPConfig(), shutdownChan, ds, msgHub) go smtpServer.Start(rootCtx) // Loop forever waiting for signals or shutdown channel diff --git a/msghub/hub.go b/msghub/hub.go new file mode 100644 index 0000000..0f43077 --- /dev/null +++ b/msghub/hub.go @@ -0,0 +1,116 @@ +package msghub + +import ( + "container/ring" + "context" + "sync" + "time" +) + +// Message contains the basic header data for a message +type Message struct { + Mailbox string + ID string + From string + To []string + Subject string + Date time.Time + Size int64 +} + +// Listener receives the contents of the log, followed by new messages +type Listener interface { + Receive(msg Message) error +} + +// Hub relays messages on to its listeners +type Hub struct { + // log stores history, points next spot to write. First non-nil entry is oldest Message + log *ring.Ring + logMx sync.RWMutex + + // listeners interested in new messages + listeners map[Listener]struct{} + listenersMx sync.RWMutex + + // broadcast receives new messages + broadcast chan Message +} + +// New constructs a new Hub which will cache logSize messages in memory for playback to future +// listeners. A goroutine is created to handle incoming messages; it will run until the provided +// context is canceled. +func New(ctx context.Context, logSize int) *Hub { + h := &Hub{ + log: ring.New(logSize), + listeners: make(map[Listener]struct{}), + broadcast: make(chan Message, 100), + } + + go func() { + for { + select { + case <-ctx.Done(): + // Shutdown + close(h.broadcast) + h.broadcast = nil + return + case msg := <-h.broadcast: + // Log message + h.logMx.Lock() + h.log.Value = msg + h.log = h.log.Next() + h.logMx.Unlock() + // Deliver message to listeners + h.deliver(msg) + } + } + }() + + return h +} + +// Broadcast queues a message for processing by the hub. The message will be placed into the +// in-memory log and relayed to all registered listeners. +func (h *Hub) Broadcast(msg Message) { + if h.broadcast != nil { + h.broadcast <- msg + } +} + +// AddListener registers a listener to receive broadcasted messages. +func (h *Hub) AddListener(l Listener) { + // Playback log + h.logMx.RLock() + h.log.Do(func(v interface{}) { + if v != nil { + l.Receive(v.(Message)) + } + }) + h.logMx.RUnlock() + + // Add to listeners + h.listenersMx.Lock() + h.listeners[l] = struct{}{} + h.listenersMx.Unlock() +} + +// RemoveListener deletes a listener registration, it will cease to receive messages. +func (h *Hub) RemoveListener(l Listener) { + h.listenersMx.Lock() + defer h.listenersMx.Unlock() + if _, ok := h.listeners[l]; ok { + delete(h.listeners, l) + } +} + +// deliver message to all listeners, removing listeners if they return an error +func (h *Hub) deliver(msg Message) { + h.listenersMx.RLock() + defer h.listenersMx.RUnlock() + for l := range h.listeners { + if err := l.Receive(msg); err != nil { + h.RemoveListener(l) + } + } +} diff --git a/msghub/hub_test.go b/msghub/hub_test.go new file mode 100644 index 0000000..f6c131e --- /dev/null +++ b/msghub/hub_test.go @@ -0,0 +1,243 @@ +package msghub + +import ( + "context" + "fmt" + "testing" + "time" +) + +// testListener implements the Listener interface, mock for unit tests +type testListener struct { + messages []*Message // received messages + wantMessages int // how many messages this listener wants to receive + errorAfter int // when != 0, messages until Receive() begins returning error + + done chan struct{} // closed once we have received wantMessages + overflow chan struct{} // closed if we receive wantMessages+1 +} + +func newTestListener(want int) *testListener { + l := &testListener{ + messages: make([]*Message, 0, want*2), + wantMessages: want, + done: make(chan struct{}), + overflow: make(chan struct{}), + } + if want == 0 { + close(l.done) + } + return l +} + +// Receive a Message, store it in the messages slice, close applicable channels, and return an error +// if instructed +func (l *testListener) Receive(msg Message) error { + l.messages = append(l.messages, &msg) + if len(l.messages) == l.wantMessages { + close(l.done) + } + if len(l.messages) == l.wantMessages+1 { + close(l.overflow) + } + if l.errorAfter > 0 && len(l.messages) > l.errorAfter { + return fmt.Errorf("Too many messages") + } + return nil +} + +// String formats the got vs wanted message counts +func (l *testListener) String() string { + return fmt.Sprintf("got %v messages, wanted %v", len(l.messages), l.wantMessages) +} + +func TestHubNew(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + if hub == nil { + t.Fatal("New() == nil, expected a new Hub") + } +} + +func TestHubZeroListeners(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + m := Message{} + for i := 0; i < 100; i++ { + hub.Broadcast(m) + } + // Just making sure Hub doesn't panic +} + +func TestHubOneListener(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + m := Message{} + l := newTestListener(1) + + hub.AddListener(l) + hub.Broadcast(m) + + // Wait for messages + select { + case <-l.done: + case <-time.After(time.Second): + t.Error("Timeout:", l) + } +} + +func TestHubRemoveListener(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + m := Message{} + l := newTestListener(1) + + hub.AddListener(l) + hub.Broadcast(m) + hub.RemoveListener(l) + hub.Broadcast(m) + + // Wait for messages + select { + case <-l.overflow: + t.Error(l) + case <-time.After(250 * time.Millisecond): + // Expected result, no overflow + } +} + +func TestHubRemoveListenerOnError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + m := Message{} + + // error after 1 means listener should receive 2 messages before being removed + l := newTestListener(2) + l.errorAfter = 1 + + hub.AddListener(l) + hub.Broadcast(m) + hub.Broadcast(m) + hub.Broadcast(m) + hub.Broadcast(m) + + // Wait for messages + select { + case <-l.overflow: + t.Error(l) + case <-time.After(250 * time.Millisecond): + // Expected result, no overflow + } +} + +func TestHubLogReplay(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 100) + l1 := newTestListener(3) + hub.AddListener(l1) + + // Broadcast 3 messages with no listeners + msgs := make([]Message, 3) + for i := 0; i < len(msgs); i++ { + msgs[i] = Message{ + Subject: fmt.Sprintf("subj %v", i), + } + hub.Broadcast(msgs[i]) + } + + // Wait for messages (live) + select { + case <-l1.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l1) + } + + // Add a new listener + l2 := newTestListener(3) + hub.AddListener(l2) + + // Wait for messages (log) + select { + case <-l2.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l2) + } + + for i := 0; i < len(msgs); i++ { + got := l2.messages[i].Subject + want := msgs[i].Subject + if got != want { + t.Errorf("msg[%v].Subject == %q, want %q", i, got, want) + } + } +} + +func TestHubLogReplayWrap(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(ctx, 5) + l1 := newTestListener(20) + hub.AddListener(l1) + + // Broadcast more messages than the hub can hold + msgs := make([]Message, 20) + for i := 0; i < len(msgs); i++ { + msgs[i] = Message{ + Subject: fmt.Sprintf("subj %v", i), + } + hub.Broadcast(msgs[i]) + } + + // Wait for messages (live) + select { + case <-l1.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l1) + } + + // Add a new listener + l2 := newTestListener(5) + hub.AddListener(l2) + + // Wait for messages (log) + select { + case <-l2.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l2) + } + + for i := 0; i < 5; i++ { + got := l2.messages[i].Subject + want := msgs[i+15].Subject + if got != want { + t.Errorf("msg[%v].Subject == %q, want %q", i, got, want) + } + } +} + +func TestHubContextCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + hub := New(ctx, 5) + m := Message{} + l := newTestListener(1) + + hub.AddListener(l) + hub.Broadcast(m) + cancel() + time.Sleep(50 * time.Millisecond) + hub.Broadcast(m) + + // Wait for messages + select { + case <-l.overflow: + t.Error(l) + case <-time.After(250 * time.Millisecond): + // Expected result, no overflow + } +} diff --git a/rest/routes.go b/rest/routes.go index d474f28..146e53d 100644 --- a/rest/routes.go +++ b/rest/routes.go @@ -6,9 +6,16 @@ import "github.com/jhillyerd/inbucket/httpd" // SetupRoutes populates the routes for the REST interface func SetupRoutes(r *mux.Router) { // API v1 - r.Path("/api/v1/mailbox/{name}").Handler(httpd.Handler(MailboxListV1)).Name("MailboxListV1").Methods("GET") - r.Path("/api/v1/mailbox/{name}").Handler(httpd.Handler(MailboxPurgeV1)).Name("MailboxPurgeV1").Methods("DELETE") - r.Path("/api/v1/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxShowV1)).Name("MailboxShowV1").Methods("GET") - r.Path("/api/v1/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxDeleteV1)).Name("MailboxDeleteV1").Methods("DELETE") - r.Path("/api/v1/mailbox/{name}/{id}/source").Handler(httpd.Handler(MailboxSourceV1)).Name("MailboxSourceV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}").Handler( + httpd.Handler(MailboxListV1)).Name("MailboxListV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}").Handler( + httpd.Handler(MailboxPurgeV1)).Name("MailboxPurgeV1").Methods("DELETE") + r.Path("/api/v1/mailbox/{name}/{id}").Handler( + httpd.Handler(MailboxShowV1)).Name("MailboxShowV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}/{id}").Handler( + httpd.Handler(MailboxDeleteV1)).Name("MailboxDeleteV1").Methods("DELETE") + r.Path("/api/v1/mailbox/{name}/{id}/source").Handler( + httpd.Handler(MailboxSourceV1)).Name("MailboxSourceV1").Methods("GET") + r.Path("/api/v1/monitor/all/messages").Handler( + httpd.Handler(MonitorAllMessagesV1)).Name("MonitorAllMessagesV1").Methods("GET") } diff --git a/rest/socketv1_controller.go b/rest/socketv1_controller.go new file mode 100644 index 0000000..5bab16d --- /dev/null +++ b/rest/socketv1_controller.go @@ -0,0 +1,160 @@ +package rest + +import ( + "net/http" + "time" + + "github.com/gorilla/websocket" + "github.com/jhillyerd/inbucket/httpd" + "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" + "github.com/jhillyerd/inbucket/rest/model" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +// options for gorilla connection upgrader +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// msgListener handles messages from the msghub +type msgListener struct { + hub *msghub.Hub + c chan msghub.Message +} + +// newMsgListener creates a listener and registers it +func newMsgListener(hub *msghub.Hub) *msgListener { + ml := &msgListener{ + hub: hub, + c: make(chan msghub.Message, 100), + } + hub.AddListener(ml) + return ml +} + +// Receive handles an incoming message +func (ml *msgListener) Receive(msg msghub.Message) error { + ml.c <- msg + return nil +} + +// WSReader makes sure the websocket client is still connected +func (ml *msgListener) WSReader(conn *websocket.Conn) { + defer ml.Close() + conn.SetReadLimit(maxMessageSize) + conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetPongHandler(func(string) error { + log.Tracef("HTTP[%v] Got WebSocket pong", conn.RemoteAddr()) + conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + if _, _, err := conn.ReadMessage(); err != nil { + if websocket.IsUnexpectedCloseError( + err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived, + ) { + // Unexpected close code + log.Warnf("HTTP[%v] WebSocket error: %v", conn.RemoteAddr(), err) + } else { + log.Tracef("HTTP[%v] Closing WebSocket", conn.RemoteAddr()) + } + break + } + } +} + +// WSWriter makes sure the websocket client is still connected +func (ml *msgListener) WSWriter(conn *websocket.Conn) { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + ml.Close() + }() + + // Handle messages from hub until msgListener is closed + for { + select { + case msg, ok := <-ml.c: + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // msgListener closed, exit + conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + header := &model.JSONMessageHeaderV1{ + Mailbox: msg.Mailbox, + ID: msg.ID, + From: msg.From, + To: msg.To, + Subject: msg.Subject, + Date: msg.Date, + Size: msg.Size, + } + if conn.WriteJSON(header) != nil { + // Write failed + return + } + case <-ticker.C: + // Send ping + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if conn.WriteMessage(websocket.PingMessage, []byte{}) != nil { + // Write error + return + } + log.Tracef("HTTP[%v] Sent WebSocket ping", conn.RemoteAddr()) + } + } +} + +// Close removes the listener registration +func (ml *msgListener) Close() { + select { + case <-ml.c: + // Already closed + default: + ml.hub.RemoveListener(ml) + close(ml.c) + } +} + +func MonitorAllMessagesV1( + w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (err error) { + // Upgrade to Websocket + conn, err := upgrader.Upgrade(w, req, nil) + if err != nil { + return err + } + httpd.ExpWebSocketConnectsCurrent.Add(1) + defer func() { + _ = conn.Close() + httpd.ExpWebSocketConnectsCurrent.Add(-1) + }() + + log.Tracef("HTTP[%v] Upgraded to websocket", req.RemoteAddr) + + // Create, register listener; then interact with conn + ml := newMsgListener(ctx.MsgHub) + go ml.WSWriter(conn) + ml.WSReader(conn) + + return nil +} diff --git a/rest/testmocks_test.go b/rest/testmocks_test.go index a671165..bc720fc 100644 --- a/rest/testmocks_test.go +++ b/rest/testmocks_test.go @@ -50,6 +50,11 @@ func (m *MockMailbox) NewMessage() (smtpd.Message, error) { return args.Get(0).(smtpd.Message), args.Error(1) } +func (m *MockMailbox) Name() string { + args := m.Called() + return args.String(0) +} + func (m *MockMailbox) String() string { args := m.Called() return args.String(0) diff --git a/rest/testutils_test.go b/rest/testutils_test.go index efc00ed..7cfd223 100644 --- a/rest/testutils_test.go +++ b/rest/testutils_test.go @@ -12,6 +12,7 @@ import ( "github.com/jhillyerd/enmime" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/httpd" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -199,7 +200,7 @@ func setupWebServer(ds smtpd.DataStore) *bytes.Buffer { PublicDir: "../themes/bootstrap/public", } shutdownChan := make(chan bool) - httpd.Initialize(cfg, ds, shutdownChan) + httpd.Initialize(cfg, shutdownChan, ds, &msghub.Hub{}) SetupRoutes(httpd.Router) return buf diff --git a/smtpd/datastore.go b/smtpd/datastore.go index c06ad6a..180a3ec 100644 --- a/smtpd/datastore.go +++ b/smtpd/datastore.go @@ -29,6 +29,7 @@ type Mailbox interface { GetMessage(id string) (Message, error) Purge() error NewMessage() (Message, error) + Name() string String() string } diff --git a/smtpd/filestore.go b/smtpd/filestore.go index f6208ed..5f4d4f0 100644 --- a/smtpd/filestore.go +++ b/smtpd/filestore.go @@ -148,6 +148,10 @@ type FileMailbox struct { messages []*FileMessage } +func (mb *FileMailbox) Name() string { + return mb.name +} + func (mb *FileMailbox) String() string { return mb.name + "[" + mb.dirName + "]" } diff --git a/smtpd/handler.go b/smtpd/handler.go index a9b49a1..e267d11 100644 --- a/smtpd/handler.go +++ b/smtpd/handler.go @@ -13,6 +13,7 @@ import ( "time" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" ) // State tracks the current mode of our SMTP state machine @@ -67,6 +68,12 @@ var commands = map[string]bool{ "TURN": true, } +// recipientDetails for message delivery +type recipientDetails struct { + address, localPart, domainPart string + mailbox Mailbox +} + // Session holds the state of an SMTP session type Session struct { server *Server @@ -341,13 +348,7 @@ func (ss *Session) mailHandler(cmd string, arg string) { // DATA func (ss *Session) dataHandler() { - type RecipientDetails struct { - address, localPart, domainPart string - mailbox Mailbox - } - recipients := make([]RecipientDetails, 0, ss.recipients.Len()) - // Timestamp for Received header - stamp := time.Now().Format(timeStampFormat) + recipients := make([]recipientDetails, 0, ss.recipients.Len()) // Get a Mailbox and a new Message for each recipient msgSize := 0 if ss.server.storeMessages { @@ -369,7 +370,7 @@ func (ss *Session) dataHandler() { ss.reset() return } - recipients = append(recipients, RecipientDetails{recip, local, domain, mb}) + recipients = append(recipients, recipientDetails{recip, local, domain, mb}) } else { log.Tracef("Not storing message for %q", recip) } @@ -399,39 +400,15 @@ func (ss *Session) dataHandler() { if ss.server.storeMessages { // Create a message for each valid recipient for _, r := range recipients { - msg, err := r.mailbox.NewMessage() - if err != nil { - ss.logError("Failed to create message for %q: %s", r.localPart, err) - ss.send(fmt.Sprintf("451 Failed to create message for %v", r.localPart)) + if ok := ss.deliverMessage(r, msgBuf); ok { + expReceivedTotal.Add(1) + } else { + // Delivery failure + ss.send(fmt.Sprintf("451 Failed to store message for %v", r.localPart)) ss.reset() return } - - // Generate Received header - recd := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n", - ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp) - if err := msg.Append([]byte(recd)); err != nil { - ss.logError("Failed to write received header for %q: %s", r.localPart, err) - ss.send(fmt.Sprintf("451 Failed to create message for %v", r.localPart)) - ss.reset() - return - } - // Append lines from msgBuf - for _, line = range msgBuf { - if err := msg.Append(line); err != nil { - ss.logError("Failed to append to mailbox %v: %v", - r.mailbox, err) - ss.send("554 Something went wrong") - ss.reset() - // Should really cleanup the crap on filesystem - return - } - } - if err := msg.Close(); err != nil { - ss.logError("Error: %v while writing message", err) - } - expReceivedTotal.Add(1) - } // end for + } } else { expReceivedTotal.Add(1) } @@ -458,6 +435,51 @@ func (ss *Session) dataHandler() { } // end for } +// deliverMessage creates and populates a new Message for the specified recipient +func (ss *Session) deliverMessage(r recipientDetails, msgBuf [][]byte) (ok bool) { + msg, err := r.mailbox.NewMessage() + if err != nil { + ss.logError("Failed to create message for %q: %s", r.localPart, err) + return false + } + + // Generate Received header + stamp := time.Now().Format(timeStampFormat) + recd := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n", + ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp) + if err := msg.Append([]byte(recd)); err != nil { + ss.logError("Failed to write received header for %q: %s", r.localPart, err) + return false + } + + // Append lines from msgBuf + for _, line := range msgBuf { + if err := msg.Append(line); err != nil { + ss.logError("Failed to append to mailbox %v: %v", r.mailbox, err) + // Should really cleanup the crap on filesystem + return false + } + } + if err := msg.Close(); err != nil { + ss.logError("Error while closing message for %v: %v", r.mailbox, err) + return false + } + + // Broadcast message information + broadcast := msghub.Message{ + Mailbox: r.mailbox.Name(), + ID: msg.ID(), + From: msg.From(), + To: msg.To(), + Subject: msg.Subject(), + Date: msg.Date(), + Size: msg.Size(), + } + ss.server.msgHub.Broadcast(broadcast) + + return true +} + func (ss *Session) enterState(state State) { ss.state = state ss.logTrace("Entering state %v", state) diff --git a/smtpd/handler_test.go b/smtpd/handler_test.go index 34814c7..3f4dc82 100644 --- a/smtpd/handler_test.go +++ b/smtpd/handler_test.go @@ -5,13 +5,15 @@ import ( "fmt" "io" - "github.com/jhillyerd/inbucket/config" "log" "net" "net/textproto" "os" "testing" "time" + + "github.com/jhillyerd/inbucket/config" + "github.com/jhillyerd/inbucket/msghub" ) type scriptStep struct { @@ -153,6 +155,13 @@ func TestMailState(t *testing.T) { msg1 := &MockMessage{} mds.On("MailboxFor").Return(mb1, nil) mb1.On("NewMessage").Return(msg1, nil) + mb1.On("Name").Return("u1") + msg1.On("ID").Return("") + msg1.On("From").Return("") + msg1.On("To").Return(make([]string, 0)) + msg1.On("Date").Return(time.Time{}) + msg1.On("Subject").Return("") + msg1.On("Size").Return(0) msg1.On("Close").Return(nil) server, logbuf := setupSMTPServer(mds) @@ -263,6 +272,13 @@ func TestDataState(t *testing.T) { msg1 := &MockMessage{} mds.On("MailboxFor").Return(mb1, nil) mb1.On("NewMessage").Return(msg1, nil) + mb1.On("Name").Return("u1") + msg1.On("ID").Return("") + msg1.On("From").Return("") + msg1.On("To").Return(make([]string, 0)) + msg1.On("Date").Return(time.Time{}) + msg1.On("Subject").Return("") + msg1.On("Size").Return(0) msg1.On("Close").Return(nil) server, logbuf := setupSMTPServer(mds) @@ -378,7 +394,7 @@ func setupSMTPServer(ds DataStore) (*Server, *bytes.Buffer) { // Create a server, don't start it shutdownChan := make(chan bool) - return NewServer(cfg, ds, shutdownChan), buf + return NewServer(cfg, shutdownChan, ds, &msghub.Hub{}), buf } var sessionNum int diff --git a/smtpd/listener.go b/smtpd/listener.go index ba4fca0..c2d3905 100644 --- a/smtpd/listener.go +++ b/smtpd/listener.go @@ -12,24 +12,27 @@ import ( "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" ) // Server holds the configuration and state of our SMTP server type Server struct { + // Configuration domain string domainNoStore string maxRecips int maxIdleSeconds int maxMessageBytes int - dataStore DataStore storeMessages bool - listener net.Listener - // globalShutdown is the signal Inbucket needs to shut down - globalShutdown chan bool + // Dependencies + dataStore DataStore // Mailbox/message store + globalShutdown chan bool // Shuts down Inbucket + msgHub *msghub.Hub // Pub/sub for message info - // waitgroup tracks individual sessions - waitgroup *sync.WaitGroup + // State + listener net.Listener // Incoming network connections + waitgroup *sync.WaitGroup // Waitgroup tracks individual sessions } var ( @@ -54,17 +57,22 @@ var ( ) // NewServer creates a new Server instance with the specificed config -func NewServer(cfg config.SMTPConfig, ds DataStore, globalShutdown chan bool) *Server { +func NewServer( + cfg config.SMTPConfig, + globalShutdown chan bool, + ds DataStore, + msgHub *msghub.Hub) *Server { return &Server{ - dataStore: ds, domain: cfg.Domain, + domainNoStore: strings.ToLower(cfg.DomainNoStore), maxRecips: cfg.MaxRecipients, maxIdleSeconds: cfg.MaxIdleSeconds, maxMessageBytes: cfg.MaxMessageBytes, storeMessages: cfg.StoreMessages, - domainNoStore: strings.ToLower(cfg.DomainNoStore), - waitgroup: new(sync.WaitGroup), globalShutdown: globalShutdown, + dataStore: ds, + msgHub: msgHub, + waitgroup: new(sync.WaitGroup), } } diff --git a/smtpd/retention_test.go b/smtpd/retention_test.go index 95b17c1..a08458b 100644 --- a/smtpd/retention_test.go +++ b/smtpd/retention_test.go @@ -106,6 +106,11 @@ func (m *MockMailbox) NewMessage() (Message, error) { return args.Get(0).(Message), args.Error(1) } +func (m *MockMailbox) Name() string { + args := m.Called() + return args.String(0) +} + func (m *MockMailbox) String() string { args := m.Called() return args.String(0) diff --git a/swaks-tests/run-tests.sh b/swaks-tests/run-tests.sh index f7c3564..e747dde 100755 --- a/swaks-tests/run-tests.sh +++ b/swaks-tests/run-tests.sh @@ -31,7 +31,7 @@ export SWAKS_OPT_to="$to@inbucket.local" swaks $* --h-Subject: "Swaks Plain Text" --body text.txt # Multi-recipient test -swaks $* --to="$to@inbucket.local,Alt User " --h-Subject: "Swaks Multi-Recipient" \ +swaks $* --to="$to@inbucket.local,alternate@inbucket.local" --h-Subject: "Swaks Multi-Recipient" \ --body text.txt # HTML test diff --git a/themes/bootstrap/public/inbucket.css b/themes/bootstrap/public/inbucket.css index 0da5a2f..c3c9f49 100644 --- a/themes/bootstrap/public/inbucket.css +++ b/themes/bootstrap/public/inbucket.css @@ -81,3 +81,12 @@ table.metrics { width: 200px; } +/* Monitor */ +#monitor-message-list td { + cursor: pointer; + font-size: 12px; +} + +#conn-status { + font-style: italic; +} diff --git a/themes/bootstrap/public/metrics.js b/themes/bootstrap/public/metrics.js index ff01448..c6512bc 100644 --- a/themes/bootstrap/public/metrics.js +++ b/themes/bootstrap/public/metrics.js @@ -104,6 +104,7 @@ function displayMetrics(data, textStatus, jqXHR) { metric('memstatsHeapSys', data.memstats.HeapSys, sizeFilter, true); metric('memstatsHeapObjects', data.memstats.HeapObjects, numberFilter, true); metric('smtpConnectsCurrent', data.smtp.ConnectsCurrent, numberFilter, true); + metric('httpWebSocketConnectsCurrent', data.http.WebSocketConnectsCurrent, numberFilter, true); // Server-side history metric('smtpReceivedTotal', data.smtp.ReceivedTotal, numberFilter, false); diff --git a/themes/bootstrap/public/monitor.js b/themes/bootstrap/public/monitor.js new file mode 100644 index 0000000..e7716d2 --- /dev/null +++ b/themes/bootstrap/public/monitor.js @@ -0,0 +1,45 @@ +var baseURL = window.location.protocol + '//' + window.location.host; + +function startMonitor() { + $.addTemplateFormatter({ + "date": function(value, template) { + return moment(value).calendar(); + }, + "subject": function(value, template) { + if (value == null || value.length == 0) { + return "(No Subject)"; + } + return value; + } + }); + + var uri = '/api/v1/monitor/all/messages' + var l = window.location; + var url = ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + uri + var ws = new WebSocket(url); + + ws.addEventListener('open', function (e) { + $('#conn-status').text('Connected.'); + }); + ws.addEventListener('message', function (e) { + var msg = JSON.parse(e.data); + msg['href'] = '/mailbox?name=' + msg.mailbox + '&id=' + msg.id; + $('#monitor-message-list').loadTemplate( + $('#message-template'), + msg, + { append: true }); + }); + ws.addEventListener('close', function (e) { + $('#conn-status').text('Disconnected!'); + }); +} + +function messageClick(node) { + var href = node.attributes['href'].value; + var url = baseURL + href; + window.location.assign(url); +} + +function clearClick() { + $('#monitor-message-list').empty(); +} diff --git a/themes/bootstrap/templates/_base.html b/themes/bootstrap/templates/_base.html index 1fecc90..114e1ec 100644 --- a/themes/bootstrap/templates/_base.html +++ b/themes/bootstrap/templates/_base.html @@ -48,7 +48,10 @@ {{end}} - + {{if .ctx.WebConfig.MonitorVisible}} + + {{end}} +