diff --git a/cmd/inbucket/main.go b/cmd/inbucket/main.go index 3b70cb3..834c05c 100644 --- a/cmd/inbucket/main.go +++ b/cmd/inbucket/main.go @@ -15,18 +15,10 @@ import ( "time" "github.com/inbucket/inbucket/pkg/config" - "github.com/inbucket/inbucket/pkg/message" - "github.com/inbucket/inbucket/pkg/msghub" - "github.com/inbucket/inbucket/pkg/policy" - "github.com/inbucket/inbucket/pkg/rest" - "github.com/inbucket/inbucket/pkg/server/pop3" - "github.com/inbucket/inbucket/pkg/server/smtp" - "github.com/inbucket/inbucket/pkg/server/web" + "github.com/inbucket/inbucket/pkg/server" "github.com/inbucket/inbucket/pkg/storage" "github.com/inbucket/inbucket/pkg/storage/file" "github.com/inbucket/inbucket/pkg/storage/mem" - "github.com/inbucket/inbucket/pkg/stringutil" - "github.com/inbucket/inbucket/pkg/webui" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -114,36 +106,16 @@ func main() { } } - // Configure internal services. - rootCtx, rootCancel := context.WithCancel(context.Background()) - shutdownChan := make(chan bool) - store, err := storage.FromConfig(conf.Storage) + // Configure and start internal services. + svcCtx, svcCancel := context.WithCancel(context.Background()) + services, err := server.FullAssembly(conf) if err != nil { + startupLog.Fatal().Err(err).Msg("Fatal error during startup") removePIDFile(*pidfile) - startupLog.Fatal().Err(err).Str("module", "storage").Msg("Fatal storage error") } - msgHub := msghub.New(rootCtx, conf.Web.MonitorHistory) - addrPolicy := &policy.Addressing{Config: conf} - mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub} - - // Start Retention scanner. - retentionScanner := storage.NewRetentionScanner(conf.Storage, store, shutdownChan) - retentionScanner.Start() - - // Configure routes and start HTTP server. - prefix := stringutil.MakePathPrefixer(conf.Web.BasePath) - webui.SetupRoutes(web.Router.PathPrefix(prefix("/serve/")).Subrouter()) - rest.SetupRoutes(web.Router.PathPrefix(prefix("/api/")).Subrouter()) - web.Initialize(conf, shutdownChan, mmanager, msgHub) - go web.Start(rootCtx) - - // Start POP3 server. - pop3Server := pop3.New(conf.POP3, shutdownChan, store) - go pop3Server.Start(rootCtx) - - // Start SMTP server. - smtpServer := smtp.NewServer(conf.SMTP, shutdownChan, mmanager, addrPolicy) - go smtpServer.Start(rootCtx) + services.Start(svcCtx, func() { + startupLog.Debug().Msg("All services report ready") + }) // Loop forever waiting for signals or shutdown channel. signalLoop: @@ -155,24 +127,27 @@ signalLoop: // Shutdown requested log.Info().Str("phase", "shutdown").Str("signal", "SIGINT"). Msg("Received SIGINT, shutting down") - close(shutdownChan) + svcCancel() + break signalLoop case syscall.SIGTERM: // Shutdown requested log.Info().Str("phase", "shutdown").Str("signal", "SIGTERM"). Msg("Received SIGTERM, shutting down") - close(shutdownChan) + svcCancel() + break signalLoop } - case <-shutdownChan: - rootCancel() + case <-services.Notify(): + log.Info().Str("phase", "shutdown").Msg("Shutting down due to service failure") + svcCancel() break signalLoop } } // Wait for active connections to finish. go timedExit(*pidfile) - smtpServer.Drain() - pop3Server.Drain() - retentionScanner.Join() + services.SMTPServer.Drain() + services.POP3Server.Drain() + services.RetentionScanner.Join() removePIDFile(*pidfile) closeLog() } diff --git a/pkg/msghub/hub.go b/pkg/msghub/hub.go index 77e06ee..59f5445 100644 --- a/pkg/msghub/hub.go +++ b/pkg/msghub/hub.go @@ -36,27 +36,26 @@ type Hub struct { // New constructs a new Hub which will cache historyLen messages in memory for playback to future // listeners. A goroutine is created to handle incoming messages; it will run until the provided // context is canceled. -func New(ctx context.Context, historyLen int) *Hub { - h := &Hub{ +func New(historyLen int) *Hub { + return &Hub{ history: ring.New(historyLen), listeners: make(map[Listener]struct{}), opChan: make(chan func(h *Hub), opChanLen), } +} - go func() { - for { - select { - case <-ctx.Done(): - // Shutdown - close(h.opChan) - return - case op := <-h.opChan: - op(h) - } +// Start Hub processing loop. +func (hub *Hub) Start(ctx context.Context) { + for { + select { + case <-ctx.Done(): + // Shutdown + close(hub.opChan) + return + case op := <-hub.opChan: + op(hub) } - }() - - return h + } } // Dispatch queues a message for broadcast by the hub. The message will be placed into the diff --git a/pkg/msghub/hub_test.go b/pkg/msghub/hub_test.go index db77c7f..5e0ec5c 100644 --- a/pkg/msghub/hub_test.go +++ b/pkg/msghub/hub_test.go @@ -52,9 +52,7 @@ func (l *testListener) String() string { } func TestHubNew(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - hub := New(ctx, 5) + hub := New(5) if hub == nil { t.Fatal("New() == nil, expected a new Hub") } @@ -63,29 +61,32 @@ func TestHubNew(t *testing.T) { func TestHubZeroLen(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(ctx, 0) + hub := New(0) + go hub.Start(ctx) m := Message{} for i := 0; i < 100; i++ { hub.Dispatch(m) } - // Just making sure Hub doesn't panic + // Ensures Hub doesn't panic } func TestHubZeroListeners(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(ctx, 5) + hub := New(5) + go hub.Start(ctx) m := Message{} for i := 0; i < 100; i++ { hub.Dispatch(m) } - // Just making sure Hub doesn't panic + // Ensures Hub doesn't panic } func TestHubOneListener(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(ctx, 5) + hub := New(5) + go hub.Start(ctx) m := Message{} l := newTestListener(1) @@ -103,7 +104,8 @@ func TestHubOneListener(t *testing.T) { func TestHubRemoveListener(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(ctx, 5) + hub := New(5) + go hub.Start(ctx) m := Message{} l := newTestListener(1) @@ -125,7 +127,8 @@ func TestHubRemoveListener(t *testing.T) { func TestHubRemoveListenerOnError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(ctx, 5) + hub := New(5) + go hub.Start(ctx) m := Message{} // error after 1 means listener should receive 2 messages before being removed @@ -151,7 +154,8 @@ func TestHubRemoveListenerOnError(t *testing.T) { func TestHubHistoryReplay(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(ctx, 100) + hub := New(100) + go hub.Start(ctx) l1 := newTestListener(3) hub.AddListener(l1) @@ -194,7 +198,8 @@ func TestHubHistoryReplay(t *testing.T) { func TestHubHistoryReplayWrap(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - hub := New(ctx, 5) + hub := New(5) + go hub.Start(ctx) l1 := newTestListener(20) hub.AddListener(l1) @@ -236,7 +241,8 @@ func TestHubHistoryReplayWrap(t *testing.T) { func TestHubContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - hub := New(ctx, 5) + hub := New(5) + go hub.Start(ctx) m := Message{} l := newTestListener(1) diff --git a/pkg/rest/testutils_test.go b/pkg/rest/testutils_test.go index 9936b52..a64f411 100644 --- a/pkg/rest/testutils_test.go +++ b/pkg/rest/testutils_test.go @@ -48,9 +48,8 @@ func setupWebServer(mm message.Manager) *bytes.Buffer { UIDir: "../ui", }, } - shutdownChan := make(chan bool) SetupRoutes(web.Router.PathPrefix("/api/").Subrouter()) - web.Initialize(cfg, shutdownChan, mm, &msghub.Hub{}) + web.NewServer(cfg, mm, &msghub.Hub{}) return buf } diff --git a/pkg/server/lifecycle.go b/pkg/server/lifecycle.go new file mode 100644 index 0000000..79b477d --- /dev/null +++ b/pkg/server/lifecycle.go @@ -0,0 +1,105 @@ +package server + +import ( + "context" + "sync" + + "github.com/inbucket/inbucket/pkg/config" + "github.com/inbucket/inbucket/pkg/message" + "github.com/inbucket/inbucket/pkg/msghub" + "github.com/inbucket/inbucket/pkg/policy" + "github.com/inbucket/inbucket/pkg/rest" + "github.com/inbucket/inbucket/pkg/server/pop3" + "github.com/inbucket/inbucket/pkg/server/smtp" + "github.com/inbucket/inbucket/pkg/server/web" + "github.com/inbucket/inbucket/pkg/storage" + "github.com/inbucket/inbucket/pkg/stringutil" + "github.com/inbucket/inbucket/pkg/webui" +) + +// Services holds the configured services. +type Services struct { + MsgHub *msghub.Hub + POP3Server *pop3.Server + RetentionScanner *storage.RetentionScanner + SMTPServer *smtp.Server + WebServer *web.Server + notify chan error // Combined notification for failed services. + ready *sync.WaitGroup // Tracks services that have not reported ready. +} + +// FullAssembly wires up a complete Inbucket environment. +func FullAssembly(conf *config.Root) (*Services, error) { + // Configure storage. + store, err := storage.FromConfig(conf.Storage) + if err != nil { + return nil, err + } + + addrPolicy := &policy.Addressing{Config: conf} + msgHub := msghub.New(conf.Web.MonitorHistory) + mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub} + + // Start Retention scanner. + retentionScanner := storage.NewRetentionScanner(conf.Storage, store) + + // Configure routes and build HTTP server. + prefix := stringutil.MakePathPrefixer(conf.Web.BasePath) + webui.SetupRoutes(web.Router.PathPrefix(prefix("/serve/")).Subrouter()) + rest.SetupRoutes(web.Router.PathPrefix(prefix("/api/")).Subrouter()) + webServer := web.NewServer(conf, mmanager, msgHub) + + pop3Server := pop3.NewServer(conf.POP3, store) + smtpServer := smtp.NewServer(conf.SMTP, mmanager, addrPolicy) + + return &Services{ + MsgHub: msgHub, + RetentionScanner: retentionScanner, + POP3Server: pop3Server, + SMTPServer: smtpServer, + WebServer: webServer, + ready: &sync.WaitGroup{}, + }, nil +} + +// Start all services, returns immediately. Callers may use Notify to detect failed services. +func (s *Services) Start(ctx context.Context, readyFunc func()) { + go s.MsgHub.Start(ctx) + go s.WebServer.Start(ctx, s.makeReadyFunc()) + go s.SMTPServer.Start(ctx, s.makeReadyFunc()) + go s.POP3Server.Start(ctx, s.makeReadyFunc()) + go s.RetentionScanner.Start(ctx) + + // Notify when all services report ready. + go func() { + s.ready.Wait() + readyFunc() + }() +} + +// Notify merges the error notification channels of all fallible services, allowing the process to +// be shutdown if needed. +func (s *Services) Notify() <-chan error { + c := make(chan error, 1) + go func() { + // TODO: What level to log failure. + select { + case err := <-s.POP3Server.Notify(): + c <- err + case err := <-s.SMTPServer.Notify(): + c <- err + case err := <-s.WebServer.Notify(): + c <- err + } + }() + + return c +} + +func (s *Services) makeReadyFunc() func() { + s.ready.Add(1) + var once sync.Once + return func() { + once.Do(s.ready.Done) + } +} diff --git a/pkg/server/pop3/listener.go b/pkg/server/pop3/listener.go index b1aa002..8216593 100644 --- a/pkg/server/pop3/listener.go +++ b/pkg/server/pop3/listener.go @@ -13,47 +13,53 @@ import ( // Server defines an instance of the POP3 server. type Server struct { - config config.POP3 // POP3 configuration. - store storage.Store // Mail store. - listener net.Listener // TCP listener. - globalShutdown chan bool // Inbucket shutdown signal. - wg *sync.WaitGroup // Waitgroup tracking sessions. + config config.POP3 // POP3 configuration. + store storage.Store // Mail store. + listener net.Listener // TCP listener. + wg *sync.WaitGroup // Waitgroup tracking sessions. + notify chan error // Notify on fatal error. } -// New creates a new Server struct. -func New(pop3Config config.POP3, shutdownChan chan bool, store storage.Store) *Server { +// NewServer creates a new, unstarted, POP3 server. +func NewServer(pop3Config config.POP3, store storage.Store) *Server { return &Server{ - config: pop3Config, - store: store, - globalShutdown: shutdownChan, - wg: new(sync.WaitGroup), + config: pop3Config, + store: store, + wg: new(sync.WaitGroup), + notify: make(chan error, 1), } } // Start the server and listen for connections -func (s *Server) Start(ctx context.Context) { +func (s *Server) Start(ctx context.Context, readyFunc func()) { slog := log.With().Str("module", "pop3").Str("phase", "startup").Logger() addr, err := net.ResolveTCPAddr("tcp4", s.config.Addr) if err != nil { slog.Error().Err(err).Msg("Failed to build tcp4 address") - s.emergencyShutdown() + s.notify <- err + close(s.notify) return } slog.Info().Str("addr", addr.String()).Msg("POP3 listening on tcp4") s.listener, err = net.ListenTCP("tcp4", addr) if err != nil { slog.Error().Err(err).Msg("Failed to start tcp4 listener") - s.emergencyShutdown() + s.notify <- err + close(s.notify) return } - // Listener go routine. + + // Start listener go routine. go s.serve(ctx) + readyFunc() + // Wait for shutdown. select { case _ = <-ctx.Done(): } slog = log.With().Str("module", "pop3").Str("phase", "shutdown").Logger() slog.Debug().Msg("POP3 shutdown requested, connections will be drained") + // Closing the listener will cause the serve() go routine to exit. if err := s.listener.Close(); err != nil { slog.Error().Err(err).Msg("Failed to close POP3 listener") @@ -88,7 +94,8 @@ func (s *Server) serve(ctx context.Context) { return default: // Something went wrong. - s.emergencyShutdown() + s.notify <- err + close(s.notify) return } } @@ -100,18 +107,14 @@ func (s *Server) serve(ctx context.Context) { } } -func (s *Server) emergencyShutdown() { - // Shutdown Inbucket - select { - case _ = <-s.globalShutdown: - default: - close(s.globalShutdown) - } -} - // Drain causes the caller to block until all active POP3 sessions have finished func (s *Server) Drain() { // Wait for sessions to close s.wg.Wait() log.Debug().Str("module", "pop3").Str("phase", "shutdown").Msg("POP3 connections have drained") } + +// Notify allows the running POP3 server to be monitored for a fatal error. +func (s *Server) Notify() <-chan error { + return s.notify +} diff --git a/pkg/server/smtp/handler_test.go b/pkg/server/smtp/handler_test.go index 6cf2cb6..f07e824 100644 --- a/pkg/server/smtp/handler_test.go +++ b/pkg/server/smtp/handler_test.go @@ -484,13 +484,11 @@ func setupSMTPServer(ds storage.Store) (s *Server, buf *bytes.Buffer, teardown f buf = new(bytes.Buffer) log.SetOutput(buf) // Create a server, don't start it. - shutdownChan := make(chan bool) - teardown = func() { - close(shutdownChan) - } + // TODO Remove teardown. + teardown = func() {} addrPolicy := &policy.Addressing{Config: cfg} manager := &message.StoreManager{Store: ds} - s = NewServer(cfg.SMTP, shutdownChan, manager, addrPolicy) + s = NewServer(cfg.SMTP, manager, addrPolicy) return s, buf, teardown } diff --git a/pkg/server/smtp/listener.go b/pkg/server/smtp/listener.go index 9ef8c4e..5a281f7 100644 --- a/pkg/server/smtp/listener.go +++ b/pkg/server/smtp/listener.go @@ -58,19 +58,18 @@ func init() { // Server holds the configuration and state of our SMTP server. type Server struct { - config config.SMTP // SMTP configuration. - addrPolicy *policy.Addressing // Address policy. - globalShutdown chan bool // Shuts down Inbucket. - manager message.Manager // Used to deliver messages. - listener net.Listener // Incoming network connections. - wg *sync.WaitGroup // Waitgroup tracks individual sessions. - tlsConfig *tls.Config + config config.SMTP // SMTP configuration. + addrPolicy *policy.Addressing // Address policy. + manager message.Manager // Used to deliver messages. + listener net.Listener // Incoming network connections. + wg *sync.WaitGroup // Waitgroup tracks individual sessions. + tlsConfig *tls.Config // TLS encryption configuration. + notify chan error // Notify on fatal error. } -// NewServer creates a new Server instance with the specificed config. +// NewServer creates a new, unstarted, SMTP server instance with the specificed config. func NewServer( smtpConfig config.SMTP, - globalShutdown chan bool, manager message.Manager, apolicy *policy.Addressing, ) *Server { @@ -90,37 +89,43 @@ func NewServer( } return &Server{ - config: smtpConfig, - globalShutdown: globalShutdown, - manager: manager, - addrPolicy: apolicy, - wg: new(sync.WaitGroup), - tlsConfig: tlsConfig, + config: smtpConfig, + manager: manager, + addrPolicy: apolicy, + wg: new(sync.WaitGroup), + tlsConfig: tlsConfig, + notify: make(chan error, 1), } } // Start the listener and handle incoming connections. -func (s *Server) Start(ctx context.Context) { +func (s *Server) Start(ctx context.Context, readyFunc func()) { slog := log.With().Str("module", "smtp").Str("phase", "startup").Logger() addr, err := net.ResolveTCPAddr("tcp4", s.config.Addr) if err != nil { slog.Error().Err(err).Msg("Failed to build tcp4 address") - s.emergencyShutdown() + s.notify <- err + close(s.notify) return } slog.Info().Str("addr", addr.String()).Msg("SMTP listening on tcp4") s.listener, err = net.ListenTCP("tcp4", addr) if err != nil { slog.Error().Err(err).Msg("Failed to start tcp4 listener") - s.emergencyShutdown() + s.notify <- err + close(s.notify) return } - // Listener go routine. + + // Start listener go routine. go s.serve(ctx) + readyFunc() + // Wait for shutdown. <-ctx.Done() slog = log.With().Str("module", "smtp").Str("phase", "shutdown").Logger() slog.Debug().Msg("SMTP shutdown requested, connections will be drained") + // Closing the listener will cause the serve() go routine to exit. if err := s.listener.Close(); err != nil { slog.Error().Err(err).Msg("Failed to close SMTP listener") @@ -156,7 +161,8 @@ func (s *Server) serve(ctx context.Context) { return default: // Something went wrong. - s.emergencyShutdown() + s.notify <- err + close(s.notify) return } } @@ -169,18 +175,14 @@ func (s *Server) serve(ctx context.Context) { } } -func (s *Server) emergencyShutdown() { - // Shutdown Inbucket. - select { - case <-s.globalShutdown: - default: - close(s.globalShutdown) - } -} - // Drain causes the caller to block until all active SMTP sessions have finished func (s *Server) Drain() { // Wait for sessions to close. s.wg.Wait() log.Debug().Str("module", "smtp").Str("phase", "shutdown").Msg("SMTP connections have drained") } + +// Notify allows the running SMTP server to be monitored for a fatal error. +func (s *Server) Notify() <-chan error { + return s.notify +} diff --git a/pkg/server/web/server.go b/pkg/server/web/server.go index b4ce078..9e0e4f4 100644 --- a/pkg/server/web/server.go +++ b/pkg/server/web/server.go @@ -31,10 +31,9 @@ var ( // incoming requests to the correct handler function Router = mux.NewRouter() - rootConfig *config.Root - server *http.Server - listener net.Listener - globalShutdown chan bool + rootConfig *config.Root + server *http.Server + listener net.Listener // ExpWebSocketConnectsCurrent tracks the number of open WebSockets ExpWebSocketConnectsCurrent = new(expvar.Int) @@ -45,15 +44,19 @@ func init() { m.Set("WebSocketConnectsCurrent", ExpWebSocketConnectsCurrent) } -// Initialize sets up things for unit tests or the Start() method. -func Initialize( +// Server defines an instance of the Web server. +type Server struct { + // TODO Migrate global vars here. + notify chan error // Notify on fatal error. +} + +// NewServer sets up things for unit tests or the Start() method. +func NewServer( conf *config.Root, - shutdownChan chan bool, mm message.Manager, - mh *msghub.Hub) { + mh *msghub.Hub) *Server { rootConfig = conf - globalShutdown = shutdownChan // NewContext() will use this DataStore for the web handlers. msgHub = mh @@ -118,10 +121,16 @@ func Initialize( http.StatusNotFound, "No route matches URI path") Router.MethodNotAllowedHandler = noMatchHandler( http.StatusMethodNotAllowed, "Method not allowed for URI path") + + s := &Server{ + notify: make(chan error, 1), + } + + return s } // Start begins listening for HTTP requests -func Start(ctx context.Context) { +func (s *Server) Start(ctx context.Context, readyFunc func()) { server = &http.Server{ Addr: rootConfig.Web.Addr, Handler: requestLoggingWrapper(Router), @@ -137,12 +146,14 @@ func Start(ctx context.Context) { if err != nil { log.Error().Str("module", "web").Str("phase", "startup").Err(err). Msg("HTTP failed to start TCP4 listener") - emergencyShutdown() + s.notify <- err + close(s.notify) return } - // Listener go routine - go serve(ctx) + // Start listener go routine + go s.serve(ctx) + readyFunc() // Wait for shutdown select { @@ -176,7 +187,7 @@ func appConfigCookie(webConfig config.Web) *http.Cookie { } // serve begins serving HTTP requests -func serve(ctx context.Context) { +func (s *Server) serve(ctx context.Context) { // server.Serve blocks until we close the listener err := server.Serve(listener) @@ -186,16 +197,13 @@ func serve(ctx context.Context) { default: log.Error().Str("module", "web").Str("phase", "startup").Err(err). Msg("HTTP server failed") - emergencyShutdown() + s.notify <- err + close(s.notify) return } } -func emergencyShutdown() { - // Shutdown Inbucket - select { - case _ = <-globalShutdown: - default: - close(globalShutdown) - } +// Notify allows the running Web server to be monitored for a fatal error. +func (s *Server) Notify() <-chan error { + return s.notify } diff --git a/pkg/storage/retention.go b/pkg/storage/retention.go index 86839c1..70ed315 100644 --- a/pkg/storage/retention.go +++ b/pkg/storage/retention.go @@ -2,6 +2,7 @@ package storage import ( "container/list" + "context" "expvar" "time" @@ -50,7 +51,6 @@ func init() { // RetentionScanner looks for messages older than the configured retention period and deletes them. type RetentionScanner struct { - globalShutdown chan bool // Closes when Inbucket needs to shut down retentionShutdown chan bool // Closed after the scanner has shut down ds Store retentionPeriod time.Duration @@ -61,10 +61,8 @@ type RetentionScanner struct { func NewRetentionScanner( cfg config.Storage, ds Store, - shutdownChannel chan bool, ) *RetentionScanner { rs := &RetentionScanner{ - globalShutdown: shutdownChannel, retentionShutdown: make(chan bool), ds: ds, retentionPeriod: cfg.RetentionPeriod, @@ -76,20 +74,16 @@ func NewRetentionScanner( } // Start up the retention scanner if retention period > 0 -func (rs *RetentionScanner) Start() { +func (rs *RetentionScanner) Start(ctx context.Context) { + slog := log.With().Str("module", "storage").Logger() + if rs.retentionPeriod <= 0 { - log.Info().Str("phase", "startup").Str("module", "storage").Msg("Retention scanner disabled") + slog.Info().Str("phase", "startup").Msg("Retention scanner disabled") close(rs.retentionShutdown) return } - log.Info().Str("phase", "startup").Str("module", "storage"). - Msgf("Retention configured for %v", rs.retentionPeriod) - go rs.run() -} + slog.Info().Str("phase", "startup").Msgf("Retention configured for %v", rs.retentionPeriod) -// run loops to kick off the scanner on the correct schedule -func (rs *RetentionScanner) run() { - slog := log.With().Str("module", "storage").Logger() start := time.Now() retentionLoop: for { @@ -99,19 +93,19 @@ retentionLoop: dur := time.Minute - since slog.Debug().Msgf("Retention scanner sleeping for %v", dur) select { - case <-rs.globalShutdown: + case <-ctx.Done(): break retentionLoop case <-time.After(dur): } } // Kickoff scan start = time.Now() - if err := rs.DoScan(); err != nil { + if err := rs.DoScan(ctx); err != nil { slog.Error().Err(err).Msg("Error during retention scan") } // Check for global shutdown select { - case <-rs.globalShutdown: + case <-ctx.Done(): break retentionLoop default: } @@ -121,13 +115,14 @@ retentionLoop: } // DoScan does a single pass of all mailboxes looking for messages that can be purged. -func (rs *RetentionScanner) DoScan() error { +func (rs *RetentionScanner) DoScan(ctx context.Context) error { slog := log.With().Str("module", "storage").Logger() slog.Debug().Msg("Starting retention scan") cutoff := time.Now().Add(-1 * rs.retentionPeriod) + + // Loop over all mailboxes. retained := 0 storeSize := int64(0) - // Loop over all mailboxes. err := rs.ds.VisitMailboxes(func(messages []Message) bool { for _, msg := range messages { if msg.Date().Before(cutoff) { @@ -145,7 +140,7 @@ func (rs *RetentionScanner) DoScan() error { } } select { - case <-rs.globalShutdown: + case <-ctx.Done(): slog.Debug().Str("phase", "shutdown").Msg("Retention scan aborted due to shutdown") return false case <-time.After(rs.retentionSleep): @@ -156,10 +151,12 @@ func (rs *RetentionScanner) DoScan() error { if err != nil { return err } + // Update metrics scanCompletedMillis.Set(time.Now().UnixNano() / 1000000) expRetainedCurrent.Set(int64(retained)) expRetainedSize.Set(storeSize) + return nil } diff --git a/pkg/storage/retention_test.go b/pkg/storage/retention_test.go index 52c9934..b9a6e57 100644 --- a/pkg/storage/retention_test.go +++ b/pkg/storage/retention_test.go @@ -1,6 +1,7 @@ package storage_test import ( + "context" "fmt" "testing" "time" @@ -13,6 +14,7 @@ import ( func TestDoRetentionScan(t *testing.T) { ds := test.NewStore() + // Mockup some different aged messages (num is in hours) new1 := stubMessage("mb1", 0) new2 := stubMessage("mb2", 1) @@ -26,22 +28,26 @@ func TestDoRetentionScan(t *testing.T) { ds.AddMessage(old3) ds.AddMessage(new2) ds.AddMessage(new3) + // Test 4 hour retention cfg := config.Storage{ RetentionPeriod: 239 * time.Minute, RetentionSleep: 0, } - shutdownChan := make(chan bool) - rs := storage.NewRetentionScanner(cfg, ds, shutdownChan) - if err := rs.DoScan(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rs := storage.NewRetentionScanner(cfg, ds) + if err := rs.DoScan(ctx); err != nil { t.Error(err) } + // Delete should not have been called on new messages for _, m := range []storage.Message{new1, new2, new3} { if ds.MessageDeleted(m) { t.Errorf("Expected %v to be present, was deleted", m.ID()) } } + // Delete should have been called once on old messages for _, m := range []storage.Message{old1, old2, old3} { if !ds.MessageDeleted(m) { diff --git a/pkg/test/integration_test.go b/pkg/test/integration_test.go index 704e493..14cf476 100644 --- a/pkg/test/integration_test.go +++ b/pkg/test/integration_test.go @@ -214,7 +214,7 @@ func formatMessage(m *client.Message) []byte { } func startServer() (func(), error) { - // TODO Refactor inbucket/main.go so we don't need to repeat all this here. + // TODO Move integration setup into lifecycle. log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: true}) storage.Constructors["memory"] = mem.New os.Clearenv() @@ -222,32 +222,34 @@ func startServer() (func(), error) { if err != nil { return nil, err } - rootCtx, rootCancel := context.WithCancel(context.Background()) - shutdownChan := make(chan bool) + svcCtx, svcCancel := context.WithCancel(context.Background()) store, err := storage.FromConfig(conf.Storage) if err != nil { - rootCancel() + svcCancel() return nil, err } - msgHub := msghub.New(rootCtx, conf.Web.MonitorHistory) + + // TODO Test should not pass with unstarted msghub. + msgHub := msghub.New(conf.Web.MonitorHistory) addrPolicy := &policy.Addressing{Config: conf} mmanager := &message.StoreManager{AddrPolicy: addrPolicy, Store: store, Hub: msgHub} + // Start HTTP server. webui.SetupRoutes(web.Router.PathPrefix("/serve/").Subrouter()) rest.SetupRoutes(web.Router.PathPrefix("/api/").Subrouter()) - web.Initialize(conf, shutdownChan, mmanager, msgHub) - go web.Start(rootCtx) - // Start SMTP server. - smtpServer := smtp.NewServer(conf.SMTP, shutdownChan, mmanager, addrPolicy) - go smtpServer.Start(rootCtx) + webServer := web.NewServer(conf, mmanager, msgHub) + go webServer.Start(svcCtx, func() {}) - // TODO Implmement an elegant way to determine server readiness. + // Start SMTP server. + smtpServer := smtp.NewServer(conf.SMTP, mmanager, addrPolicy) + go smtpServer.Start(svcCtx, func() {}) + + // TODO Use a readyFunc to determine server readiness. time.Sleep(500 * time.Millisecond) return func() { // Shut everything down. - close(shutdownChan) - rootCancel() + svcCancel() smtpServer.Drain() }, nil }