diff --git a/httpd/context.go b/httpd/context.go index 9e5d3e8..318430e 100644 --- a/httpd/context.go +++ b/httpd/context.go @@ -6,6 +6,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/sessions" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -14,6 +15,7 @@ type Context struct { Vars map[string]string Session *sessions.Session DataStore smtpd.DataStore + MsgHub *msghub.Hub IsJSON bool } @@ -56,6 +58,7 @@ func NewContext(req *http.Request) (*Context, error) { Vars: vars, Session: sess, DataStore: DataStore, + MsgHub: msgHub, IsJSON: headerMatch(req, "Accept", "application/json"), } return ctx, err diff --git a/httpd/server.go b/httpd/server.go index f8c88ac..9b4a338 100644 --- a/httpd/server.go +++ b/httpd/server.go @@ -13,6 +13,7 @@ import ( "github.com/gorilla/sessions" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -23,6 +24,9 @@ var ( // DataStore is where all the mailboxes and messages live DataStore smtpd.DataStore + // msgHub holds a reference to the message pub/sub system + msgHub *msghub.Hub + // Router is shared between httpd, webui and rest packages. It sends // incoming requests to the correct handler function Router = mux.NewRouter() @@ -35,12 +39,18 @@ var ( ) // Initialize sets up things for unit tests or the Start() method -func Initialize(cfg config.WebConfig, ds smtpd.DataStore, shutdownChan chan bool) { +func Initialize( + cfg config.WebConfig, + shutdownChan chan bool, + ds smtpd.DataStore, + mh *msghub.Hub) { + webConfig = cfg globalShutdown = shutdownChan // NewContext() will use this DataStore for the web handlers DataStore = ds + msgHub = mh // Content Paths log.Infof("HTTP templates mapped to %q", cfg.TemplateDir) diff --git a/inbucket.go b/inbucket.go index 333408a..9934084 100644 --- a/inbucket.go +++ b/inbucket.go @@ -103,7 +103,7 @@ func main() { ds := smtpd.DefaultFileDataStore() // Start HTTP server - httpd.Initialize(config.GetWebConfig(), ds, shutdownChan) + httpd.Initialize(config.GetWebConfig(), shutdownChan, ds, msgHub) webui.SetupRoutes(httpd.Router) rest.SetupRoutes(httpd.Router) go httpd.Start(rootCtx) diff --git a/rest/routes.go b/rest/routes.go index d474f28..146e53d 100644 --- a/rest/routes.go +++ b/rest/routes.go @@ -6,9 +6,16 @@ import "github.com/jhillyerd/inbucket/httpd" // SetupRoutes populates the routes for the REST interface func SetupRoutes(r *mux.Router) { // API v1 - r.Path("/api/v1/mailbox/{name}").Handler(httpd.Handler(MailboxListV1)).Name("MailboxListV1").Methods("GET") - r.Path("/api/v1/mailbox/{name}").Handler(httpd.Handler(MailboxPurgeV1)).Name("MailboxPurgeV1").Methods("DELETE") - r.Path("/api/v1/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxShowV1)).Name("MailboxShowV1").Methods("GET") - r.Path("/api/v1/mailbox/{name}/{id}").Handler(httpd.Handler(MailboxDeleteV1)).Name("MailboxDeleteV1").Methods("DELETE") - r.Path("/api/v1/mailbox/{name}/{id}/source").Handler(httpd.Handler(MailboxSourceV1)).Name("MailboxSourceV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}").Handler( + httpd.Handler(MailboxListV1)).Name("MailboxListV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}").Handler( + httpd.Handler(MailboxPurgeV1)).Name("MailboxPurgeV1").Methods("DELETE") + r.Path("/api/v1/mailbox/{name}/{id}").Handler( + httpd.Handler(MailboxShowV1)).Name("MailboxShowV1").Methods("GET") + r.Path("/api/v1/mailbox/{name}/{id}").Handler( + httpd.Handler(MailboxDeleteV1)).Name("MailboxDeleteV1").Methods("DELETE") + r.Path("/api/v1/mailbox/{name}/{id}/source").Handler( + httpd.Handler(MailboxSourceV1)).Name("MailboxSourceV1").Methods("GET") + r.Path("/api/v1/monitor/all/messages").Handler( + httpd.Handler(MonitorAllMessagesV1)).Name("MonitorAllMessagesV1").Methods("GET") } diff --git a/rest/socketv1_controller.go b/rest/socketv1_controller.go new file mode 100644 index 0000000..de5245f --- /dev/null +++ b/rest/socketv1_controller.go @@ -0,0 +1,155 @@ +package rest + +import ( + "net/http" + "time" + + "github.com/gorilla/websocket" + "github.com/jhillyerd/inbucket/httpd" + "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/msghub" + "github.com/jhillyerd/inbucket/rest/model" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +// options for gorilla connection upgrader +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// msgListener handles messages from the msghub +type msgListener struct { + hub *msghub.Hub + c chan msghub.Message +} + +// newMsgListener creates a listener and registers it +func newMsgListener(hub *msghub.Hub) *msgListener { + ml := &msgListener{ + hub: hub, + c: make(chan msghub.Message, 100), + } + hub.AddListener(ml) + return ml +} + +// Receive handles an incoming message +func (ml *msgListener) Receive(msg msghub.Message) error { + ml.c <- msg + return nil +} + +// WSReader makes sure the websocket client is still connected +func (ml *msgListener) WSReader(conn *websocket.Conn) { + defer ml.Close() + conn.SetReadLimit(maxMessageSize) + conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetPongHandler(func(string) error { + log.Tracef("HTTP[%v] Got WebSocket pong", conn.RemoteAddr()) + conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + if _, _, err := conn.ReadMessage(); err != nil { + if websocket.IsUnexpectedCloseError( + err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseNoStatusReceived, + ) { + // Unexpected close code + log.Warnf("HTTP[%v] WebSocket error: %v", conn.RemoteAddr(), err) + } else { + log.Tracef("HTTP[%v] Closing WebSocket", conn.RemoteAddr()) + } + break + } + } +} + +// WSWriter makes sure the websocket client is still connected +func (ml *msgListener) WSWriter(conn *websocket.Conn) { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + ml.Close() + }() + + // Handle messages from hub until msgListener is closed + for { + select { + case msg, ok := <-ml.c: + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // msgListener closed, exit + conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + header := &model.JSONMessageHeaderV1{ + Mailbox: msg.Mailbox, + ID: msg.ID, + From: msg.From, + To: msg.To, + Subject: msg.Subject, + Date: msg.Date, + Size: msg.Size, + } + if conn.WriteJSON(header) != nil { + // Write failed + return + } + case <-ticker.C: + // Send ping + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if conn.WriteMessage(websocket.PingMessage, []byte{}) != nil { + // Write error + return + } + log.Tracef("HTTP[%v] Sent WebSocket ping", conn.RemoteAddr()) + } + } +} + +// Close removes the listener registration +func (ml *msgListener) Close() { + select { + case <-ml.c: + // Already closed + default: + ml.hub.RemoveListener(ml) + close(ml.c) + } +} + +func MonitorAllMessagesV1( + w http.ResponseWriter, req *http.Request, ctx *httpd.Context) (err error) { + // Upgrade to Websocket + conn, err := upgrader.Upgrade(w, req, nil) + if err != nil { + return err + } + defer conn.Close() + log.Tracef("HTTP[%v] Upgraded to websocket", req.RemoteAddr) + + // Create, register listener; then interact with conn + ml := newMsgListener(ctx.MsgHub) + go ml.WSWriter(conn) + ml.WSReader(conn) + + return nil +} diff --git a/rest/testutils_test.go b/rest/testutils_test.go index efc00ed..7cfd223 100644 --- a/rest/testutils_test.go +++ b/rest/testutils_test.go @@ -12,6 +12,7 @@ import ( "github.com/jhillyerd/enmime" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/httpd" + "github.com/jhillyerd/inbucket/msghub" "github.com/jhillyerd/inbucket/smtpd" ) @@ -199,7 +200,7 @@ func setupWebServer(ds smtpd.DataStore) *bytes.Buffer { PublicDir: "../themes/bootstrap/public", } shutdownChan := make(chan bool) - httpd.Initialize(cfg, ds, shutdownChan) + httpd.Initialize(cfg, shutdownChan, ds, &msghub.Hub{}) SetupRoutes(httpd.Router) return buf