1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-22 03:57:02 +00:00

Merge branch 'feature/zerolog' into develop, closes #90

This commit is contained in:
James Hillyerd
2018-03-31 16:17:04 -07:00
26 changed files with 693 additions and 858 deletions

View File

@@ -9,7 +9,7 @@ before_script:
- make deps
go:
- "1.10"
- "1.10.x"
deploy:
provider: script

View File

@@ -26,6 +26,13 @@ This project adheres to [Semantic Versioning](http://semver.org/).
- Uses the same default ports as other builds; smtp:2500 http:9000 pop3:1100
- Uses volume `/config` for `greeting.html`
- Uses volume `/storage` for mail storage
- Log output is now structured, and will be output as JSON with the `-logjson`
flag; which is enabled by default for the Docker container.
- SMTP and POP3 network tracing is no longer logged regardless of level, but can
be sent to stdout via `-netdebug` flag.
### Removed
- Support for SIGHUP and log file rotation.
## [v1.3.1] - 2018-03-10

View File

@@ -25,7 +25,8 @@ VOLUME /config
VOLUME /storage
WORKDIR $INBUCKET_HOME
ENTRYPOINT "/start-inbucket.sh"
ENTRYPOINT ["/start-inbucket.sh"]
CMD ["-logjson"]
# Build Inbucket
COPY . $INBUCKET_SRC/

View File

@@ -15,6 +15,7 @@ $(commands): %: cmd/%
clean:
go clean $(PKGS)
rm -f $(commands)
rm -rf dist
deps:
go get -t ./...

View File

@@ -2,18 +2,20 @@
package main
import (
"bufio"
"context"
"expvar"
"flag"
"fmt"
"io"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/message"
"github.com/jhillyerd/inbucket/pkg/msghub"
"github.com/jhillyerd/inbucket/pkg/policy"
@@ -25,6 +27,8 @@ import (
"github.com/jhillyerd/inbucket/pkg/storage/file"
"github.com/jhillyerd/inbucket/pkg/storage/mem"
"github.com/jhillyerd/inbucket/pkg/webui"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
var (
@@ -57,6 +61,8 @@ func main() {
help := flag.Bool("help", false, "Displays help on flags and env variables.")
pidfile := flag.String("pidfile", "", "Write our PID into the specified file.")
logfile := flag.String("logfile", "stderr", "Write out log into the specified file.")
logjson := flag.Bool("logjson", false, "Logs are written in JSON format.")
netdebug := flag.Bool("netdebug", false, "Dump SMTP & POP3 network traffic to stdout.")
flag.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage: inbucket [options]")
flag.PrintDefaults()
@@ -76,27 +82,32 @@ func main() {
fmt.Fprintf(os.Stderr, "Configuration error: %v\n", err)
os.Exit(1)
}
// Setup signal handler.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
// Initialize logging.
log.SetLogLevel(conf.LogLevel)
if err := log.Initialize(*logfile); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
if *netdebug {
conf.POP3.Debug = true
conf.SMTP.Debug = true
}
// Logger setup.
closeLog, err := openLog(conf.LogLevel, *logfile, *logjson)
if err != nil {
fmt.Fprintf(os.Stderr, "Log error: %v\n", err)
os.Exit(1)
}
defer log.Close()
log.Infof("Inbucket %v (%v) starting...", config.Version, config.BuildDate)
startupLog := log.With().Str("phase", "startup").Logger()
// Setup signal handler.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
// Initialize logging.
startupLog.Info().Str("version", config.Version).Str("buildDate", config.BuildDate).
Msg("Inbucket starting")
// Write pidfile if requested.
if *pidfile != "" {
pidf, err := os.Create(*pidfile)
if err != nil {
log.Errorf("Failed to create %q: %v", *pidfile, err)
os.Exit(1)
startupLog.Fatal().Err(err).Str("path", *pidfile).Msg("Failed to create pidfile")
}
fmt.Fprintf(pidf, "%v\n", os.Getpid())
if err := pidf.Close(); err != nil {
log.Errorf("Failed to close PID file %q: %v", *pidfile, err)
startupLog.Fatal().Err(err).Str("path", *pidfile).Msg("Failed to close pidfile")
}
}
// Configure internal services.
@@ -104,9 +115,8 @@ func main() {
shutdownChan := make(chan bool)
store, err := storage.FromConfig(conf.Storage)
if err != nil {
log.Errorf("Fatal storage error: %v", err)
removePIDFile(*pidfile)
os.Exit(1)
startupLog.Fatal().Err(err).Str("module", "storage").Msg("Fatal storage error")
}
msgHub := msghub.New(rootCtx, conf.Web.MonitorHistory)
addrPolicy := &policy.Addressing{Config: conf.SMTP}
@@ -131,16 +141,15 @@ signalLoop:
select {
case sig := <-sigChan:
switch sig {
case syscall.SIGHUP:
log.Infof("Recieved SIGHUP, cycling logfile")
log.Rotate()
case syscall.SIGINT:
// Shutdown requested
log.Infof("Received SIGINT, shutting down")
log.Info().Str("phase", "shutdown").Str("signal", "SIGINT").
Msg("Received SIGINT, shutting down")
close(shutdownChan)
case syscall.SIGTERM:
// Shutdown requested
log.Infof("Received SIGTERM, shutting down")
log.Info().Str("phase", "shutdown").Str("signal", "SIGTERM").
Msg("Received SIGTERM, shutting down")
close(shutdownChan)
}
case <-shutdownChan:
@@ -154,13 +163,62 @@ signalLoop:
pop3Server.Drain()
retentionScanner.Join()
removePIDFile(*pidfile)
closeLog()
}
// openLog configures zerolog output, returns func to close logfile.
func openLog(level string, logfile string, json bool) (close func(), err error) {
switch strings.ToUpper(level) {
case "DEBUG":
zerolog.SetGlobalLevel(zerolog.DebugLevel)
case "INFO":
zerolog.SetGlobalLevel(zerolog.InfoLevel)
case "WARN":
zerolog.SetGlobalLevel(zerolog.WarnLevel)
case "ERROR":
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
default:
return nil, fmt.Errorf("Log level %q not one of: DEBUG, INFO, WARN, ERROR", level)
}
close = func() {}
var w io.Writer
color := true
switch logfile {
case "stderr":
w = os.Stderr
case "stdout":
w = os.Stdout
default:
logf, err := os.OpenFile(logfile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return nil, err
}
bw := bufio.NewWriter(logf)
w = bw
color = false
close = func() {
_ = bw.Flush()
_ = logf.Close()
}
}
w = zerolog.SyncWriter(w)
if json {
log.Logger = log.Output(w)
return close, nil
}
log.Logger = log.Output(zerolog.ConsoleWriter{
Out: w,
NoColor: !color,
})
return close, nil
}
// removePIDFile removes the PID file if created.
func removePIDFile(pidfile string) {
if pidfile != "" {
if err := os.Remove(pidfile); err != nil {
log.Errorf("Failed to remove %q: %v", pidfile, err)
log.Error().Str("phase", "shutdown").Err(err).Str("path", pidfile).
Msg("Failed to remove pidfile")
}
}
}
@@ -168,7 +226,7 @@ func removePIDFile(pidfile string) {
// timedExit is called as a goroutine during shutdown, it will force an exit after 15 seconds.
func timedExit(pidfile string) {
time.Sleep(15 * time.Second)
log.Errorf("Clean shutdown took too long, forcing exit")
removePIDFile(pidfile)
log.Error().Str("phase", "shutdown").Msg("Clean shutdown took too long, forcing exit")
os.Exit(0)
}

View File

@@ -8,7 +8,7 @@ Running `inbucket -help` will yield a condensed summary of the environment
variables it supports:
KEY DEFAULT DESCRIPTION
INBUCKET_LOGLEVEL INFO TRACE, INFO, WARN, or ERROR
INBUCKET_LOGLEVEL INFO DEBUG, INFO, WARN, or ERROR
INBUCKET_SMTP_ADDR 0.0.0.0:2500 SMTP server IP4 host:port
INBUCKET_SMTP_DOMAIN inbucket HELO domain
INBUCKET_SMTP_DOMAINNOSTORE Load testing domain
@@ -47,7 +47,7 @@ should probably select INFO, but a busy shared installation would be better off
with WARN or ERROR.
- Default: `INFO`
- Values: one of `TRACE`, `INFO`, `WARN`, or `ERROR`
- Values: one of `DEBUG`, `INFO`, `WARN`, or `ERROR`
## SMTP

View File

@@ -29,7 +29,7 @@ var (
// Root wraps all other configurations.
type Root struct {
LogLevel string `required:"true" default:"INFO" desc:"TRACE, INFO, WARN, or ERROR"`
LogLevel string `required:"true" default:"INFO" desc:"DEBUG, INFO, WARN, or ERROR"`
SMTP SMTP
POP3 POP3
Web Web
@@ -45,6 +45,7 @@ type SMTP struct {
MaxMessageBytes int `required:"true" default:"10240000" desc:"Maximum message size"`
StoreMessages bool `required:"true" default:"true" desc:"Store incoming mail?"`
Timeout time.Duration `required:"true" default:"300s" desc:"Idle network timeout"`
Debug bool `ignored:"true"`
}
// POP3 contains the POP3 server configuration.
@@ -52,6 +53,7 @@ type POP3 struct {
Addr string `required:"true" default:"0.0.0.0:1100" desc:"POP3 server IP4 host:port"`
Domain string `required:"true" default:"inbucket" desc:"HELLO domain"`
Timeout time.Duration `required:"true" default:"600s" desc:"Idle network timeout"`
Debug bool `ignored:"true"`
}
// Web contains the HTTP server configuration.

View File

@@ -1,162 +0,0 @@
package log
import (
"fmt"
golog "log"
"os"
"strings"
"sync"
)
// Level is used to indicate the severity of a log entry
type Level int
const (
// ERROR indicates a significant problem was encountered
ERROR Level = iota
// WARN indicates something that may be a problem
WARN
// INFO indicates a purely informational log entry
INFO
// TRACE entries are meant for development purposes only
TRACE
)
var (
// MaxLevel is the highest Level we will log (max TRACE, min ERROR)
MaxLevel = TRACE
// logfname is the name of the logfile
logfname string
// logf is the file we send log output to, will be nil for stderr or stdout
logf *os.File
mu sync.RWMutex
)
// Initialize logging. If logfile is equal to "stderr" or "stdout", then
// we will log to that output stream. Otherwise the specificed file will
// opened for writing, and all log data will be placed in it.
func Initialize(logfile string) error {
mu.Lock()
defer mu.Unlock()
if logfile != "stderr" {
// stderr is the go logging default
if logfile == "stdout" {
// set to stdout
golog.SetOutput(os.Stdout)
} else {
logfname = logfile
if err := openLogFile(); err != nil {
return err
}
// Platform specific
closeStdin()
}
}
return nil
}
// SetLogLevel sets MaxLevel based on the provided string
func SetLogLevel(level string) (ok bool) {
mu.Lock()
defer mu.Unlock()
switch strings.ToUpper(level) {
case "ERROR":
MaxLevel = ERROR
case "WARN":
MaxLevel = WARN
case "INFO":
MaxLevel = INFO
case "TRACE":
MaxLevel = TRACE
default:
golog.Print("Error, unknown log level requested: " + level)
return false
}
return true
}
// Errorf logs a message to the 'standard' Logger (always), accepts format strings
func Errorf(msg string, args ...interface{}) {
mu.RLock()
defer mu.RUnlock()
msg = "[ERROR] " + msg
golog.Printf(msg, args...)
}
// Warnf logs a message to the 'standard' Logger if MaxLevel is >= WARN, accepts format strings
func Warnf(msg string, args ...interface{}) {
mu.RLock()
defer mu.RUnlock()
if MaxLevel >= WARN {
msg = "[WARN ] " + msg
golog.Printf(msg, args...)
}
}
// Infof logs a message to the 'standard' Logger if MaxLevel is >= INFO, accepts format strings
func Infof(msg string, args ...interface{}) {
mu.RLock()
defer mu.RUnlock()
if MaxLevel >= INFO {
msg = "[INFO ] " + msg
golog.Printf(msg, args...)
}
}
// Tracef logs a message to the 'standard' Logger if MaxLevel is >= TRACE, accepts format strings
func Tracef(msg string, args ...interface{}) {
mu.RLock()
defer mu.RUnlock()
if MaxLevel >= TRACE {
msg = "[TRACE] " + msg
golog.Printf(msg, args...)
}
}
// Rotate closes the current log file, then reopens it. This gives an external
// log rotation system the opportunity to move the existing log file out of the
// way and have Inbucket create a new one.
func Rotate() {
mu.Lock()
defer mu.Unlock()
// Rotate logs if configured
if logf != nil {
closeLogFile()
// There is nothing we can do if the log open fails
_ = openLogFile()
} else {
Infof("Ignoring SIGHUP, logfile not configured")
}
}
// Close the log file if we have one open
func Close() {
mu.Lock()
defer mu.Unlock()
if logf != nil {
closeLogFile()
}
}
// openLogFile creates or appends to the logfile passed on commandline
func openLogFile() error {
// use specified log file
var err error
logf, err = os.OpenFile(logfname, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return fmt.Errorf("failed to create %v: %v", logfname, err)
}
golog.SetOutput(logf)
// Platform specific
reassignStdout()
return nil
}
// closeLogFile closes the current logfile
func closeLogFile() {
// We are never in a situation where we can do anything about failing to close
_ = logf.Close()
}

View File

@@ -1,32 +0,0 @@
// +build !windows
package log
import (
"log"
"os"
"golang.org/x/sys/unix"
)
// closeStdin will close stdin on Unix platforms - this is standard practice
// for daemons
func closeStdin() {
if err := os.Stdin.Close(); err != nil {
// Not a fatal error
log.Printf("Failed to close os.Stdin during log setup")
}
}
// reassignStdout points stdout/stderr to our logfile on systems that support
// the Dup2 syscall per https://github.com/golang/go/issues/325
func reassignStdout() {
if err := unix.Dup2(int(logf.Fd()), 1); err != nil {
// Not considered fatal
log.Printf("Failed to re-assign stdout to logfile: %v", err)
}
if err := unix.Dup2(int(logf.Fd()), 2); err != nil {
// Not considered fatal
log.Printf("Failed to re-assign stderr to logfile: %v", err)
}
}

View File

@@ -1,37 +0,0 @@
// +build windows
package log
import (
"log"
"os"
)
var stdOutsClosed = false
// closeStdin does nothing on Windows, it would always fail
func closeStdin() {
// Nop
}
// reassignStdout points stdout/stderr to our logfile on systems that do not
// support the Dup2 syscall
func reassignStdout() {
if !stdOutsClosed {
// Close std* streams to prevent accidental output, they will be redirected to
// our logfile below
// Warning: this will hide panic() output, sorry Windows users
if err := os.Stderr.Close(); err != nil {
// Not considered fatal
log.Printf("Failed to close os.Stderr during log setup")
}
if err := os.Stdin.Close(); err != nil {
// Not considered fatal
log.Printf("Failed to close os.Stdin during log setup")
}
os.Stdout = logf
os.Stderr = logf
stdOutsClosed = true
}
}

View File

@@ -1,4 +1,4 @@
package log
package metric
import (
"container/list"
@@ -7,7 +7,7 @@ import (
"time"
)
// TickerFunc is the type of metrics function accepted by AddTickerFunc
// TickerFunc is the function signature accepted by AddTickerFunc, will be called once per minute.
type TickerFunc func()
var tickerFuncChan = make(chan TickerFunc)
@@ -22,10 +22,10 @@ func AddTickerFunc(f TickerFunc) {
tickerFuncChan <- f
}
// PushMetric adds the metric to the end of the list and returns a comma separated string of the
// Push adds the metric to the end of the list and returns a comma separated string of the
// previous 61 entries. We return 61 instead of 60 (an hour) because the chart on the client
// tracks deltas between these values - there is nothing to compare the first value against.
func PushMetric(history *list.List, ev expvar.Var) string {
func Push(history *list.List, ev expvar.Var) string {
history.PushBack(ev.String())
if history.Len() > 61 {
history.Remove(history.Front())
@@ -33,18 +33,7 @@ func PushMetric(history *list.List, ev expvar.Var) string {
return joinStringList(history)
}
// joinStringList joins a List containing strings by commas
func joinStringList(listOfStrings *list.List) string {
if listOfStrings.Len() == 0 {
return ""
}
s := make([]string, 0, listOfStrings.Len())
for e := listOfStrings.Front(); e != nil; e = e.Next() {
s = append(s, e.Value.(string))
}
return strings.Join(s, ",")
}
// metricsTicker calls the current list of TickerFuncs once per minute.
func metricsTicker() {
funcs := make([]TickerFunc, 0)
ticker := time.NewTicker(time.Minute)
@@ -60,3 +49,15 @@ func metricsTicker() {
}
}
}
// joinStringList joins a List containing strings by commas.
func joinStringList(listOfStrings *list.List) string {
if listOfStrings.Len() == 0 {
return ""
}
s := make([]string, 0, listOfStrings.Len())
for e := listOfStrings.Front(); e != nil; e = e.Next() {
s = append(s, e.Value.(string))
}
return strings.Join(s, ",")
}

View File

@@ -9,7 +9,6 @@ import (
"encoding/hex"
"strconv"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/rest/model"
"github.com/jhillyerd/inbucket/pkg/server/web"
"github.com/jhillyerd/inbucket/pkg/storage"
@@ -28,8 +27,6 @@ func MailboxListV1(w http.ResponseWriter, req *http.Request, ctx *web.Context) (
// This doesn't indicate empty, likely an IO error
return fmt.Errorf("Failed to get messages for %v: %v", name, err)
}
log.Tracef("Got %v messsages", len(messages))
jmessages := make([]*model.JSONMessageHeaderV1, len(messages))
for i, msg := range messages {
jmessages[i] = &model.JSONMessageHeaderV1{
@@ -62,7 +59,6 @@ func MailboxShowV1(w http.ResponseWriter, req *http.Request, ctx *web.Context) (
// This doesn't indicate empty, likely an IO error
return fmt.Errorf("GetMessage(%q) failed: %v", id, err)
}
attachParts := msg.Attachments()
attachments := make([]*model.JSONMessageAttachmentV1, len(attachParts))
for i, part := range attachParts {
@@ -78,7 +74,6 @@ func MailboxShowV1(w http.ResponseWriter, req *http.Request, ctx *web.Context) (
MD5: hex.EncodeToString(checksum[:]),
}
}
return web.RenderJSON(w,
&model.JSONMessageV1{
Mailbox: name,
@@ -109,8 +104,6 @@ func MailboxPurgeV1(w http.ResponseWriter, req *http.Request, ctx *web.Context)
if err != nil {
return fmt.Errorf("Mailbox(%q) purge failed: %v", name, err)
}
log.Tracef("HTTP purged mailbox for %q", name)
return web.RenderJSON(w, "OK")
}
@@ -122,7 +115,6 @@ func MailboxSourceV1(w http.ResponseWriter, req *http.Request, ctx *web.Context)
if err != nil {
return err
}
r, err := ctx.Manager.SourceReader(name, id)
if err == storage.ErrNotExist {
http.NotFound(w, req)
@@ -155,6 +147,5 @@ func MailboxDeleteV1(w http.ResponseWriter, req *http.Request, ctx *web.Context)
// This doesn't indicate missing, likely an IO error
return fmt.Errorf("RemoveMessage(%q) failed: %v", id, err)
}
return web.RenderJSON(w, "OK")
}

View File

@@ -5,10 +5,10 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/msghub"
"github.com/jhillyerd/inbucket/pkg/rest/model"
"github.com/jhillyerd/inbucket/pkg/server/web"
"github.com/rs/zerolog/log"
)
const (
@@ -62,11 +62,13 @@ func (ml *msgListener) Receive(msg msghub.Message) error {
// WSReader makes sure the websocket client is still connected, discards any messages from client
func (ml *msgListener) WSReader(conn *websocket.Conn) {
slog := log.With().Str("module", "rest").Str("proto", "WebSocket").
Str("remote", conn.RemoteAddr().String()).Logger()
defer ml.Close()
conn.SetReadLimit(maxMessageSize)
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
log.Tracef("HTTP[%v] Got WebSocket pong", conn.RemoteAddr())
slog.Debug().Msg("Got pong")
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
@@ -80,9 +82,9 @@ func (ml *msgListener) WSReader(conn *websocket.Conn) {
websocket.CloseNoStatusReceived,
) {
// Unexpected close code
log.Warnf("HTTP[%v] WebSocket error: %v", conn.RemoteAddr(), err)
slog.Warn().Err(err).Msg("Socket error")
} else {
log.Tracef("HTTP[%v] Closing WebSocket", conn.RemoteAddr())
slog.Debug().Msg("Closing socket")
}
break
}
@@ -127,7 +129,8 @@ func (ml *msgListener) WSWriter(conn *websocket.Conn) {
// Write error
return
}
log.Tracef("HTTP[%v] Sent WebSocket ping", conn.RemoteAddr())
log.Debug().Str("module", "rest").Str("proto", "WebSocket").
Str("remote", conn.RemoteAddr().String()).Msg("Sent ping")
}
}
}
@@ -147,7 +150,7 @@ func (ml *msgListener) Close() {
// the client of all messages received.
func MonitorAllMessagesV1(
w http.ResponseWriter, req *http.Request, ctx *web.Context) (err error) {
// Upgrade to Websocket
// Upgrade to Websocket.
conn, err := upgrader.Upgrade(w, req, nil)
if err != nil {
return err
@@ -157,14 +160,12 @@ func MonitorAllMessagesV1(
_ = conn.Close()
web.ExpWebSocketConnectsCurrent.Add(-1)
}()
log.Tracef("HTTP[%v] Upgraded to websocket", req.RemoteAddr)
// Create, register listener; then interact with conn
log.Debug().Str("module", "rest").Str("proto", "WebSocket").
Str("remote", conn.RemoteAddr().String()).Msg("Upgraded to WebSocket")
// Create, register listener; then interact with conn.
ml := newMsgListener(ctx.MsgHub, "")
go ml.WSWriter(conn)
ml.WSReader(conn)
return nil
}
@@ -176,7 +177,7 @@ func MonitorMailboxMessagesV1(
if err != nil {
return err
}
// Upgrade to Websocket
// Upgrade to Websocket.
conn, err := upgrader.Upgrade(w, req, nil)
if err != nil {
return err
@@ -186,13 +187,11 @@ func MonitorMailboxMessagesV1(
_ = conn.Close()
web.ExpWebSocketConnectsCurrent.Add(-1)
}()
log.Tracef("HTTP[%v] Upgraded to websocket", req.RemoteAddr)
// Create, register listener; then interact with conn
log.Debug().Str("module", "rest").Str("proto", "WebSocket").
Str("remote", conn.RemoteAddr().String()).Msg("Upgraded to WebSocket")
// Create, register listener; then interact with conn.
ml := newMsgListener(ctx.MsgHub, name)
go ml.WSWriter(conn)
ml.WSReader(conn)
return nil
}

View File

@@ -2,7 +2,6 @@ package pop3
import (
"bufio"
"bytes"
"fmt"
"io"
"net"
@@ -11,8 +10,9 @@ import (
"strings"
"time"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/storage"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// State tracks the current mode of our POP3 state machine
@@ -57,29 +57,39 @@ var commands = map[string]bool{
// Session defines an active POP3 session
type Session struct {
server *Server // Reference to the server we belong to
id int // Session ID number
conn net.Conn // Our network connection
remoteHost string // IP address of client
sendError error // Used to bail out of read loop on send error
state State // Current session state
reader *bufio.Reader // Buffered reader for our net conn
user string // Mailbox name
messages []storage.Message // Slice of messages in mailbox
retain []bool // Messages to retain upon UPDATE (true=retain)
msgCount int // Number of undeleted messages
server *Server // Reference to the server we belong to.
id int // Session ID number.
conn net.Conn // Our network connection.
remoteHost string // IP address of client.
sendError error // Used to bail out of read loop on send error.
state State // Current session state.
reader *bufio.Reader // Buffered reader for our net conn.
user string // Mailbox name.
messages []storage.Message // Slice of messages in mailbox.
retain []bool // Messages to retain upon UPDATE (true=retain).
msgCount int // Number of undeleted messages.
logger zerolog.Logger // Session specific logger.
debug bool // Print network traffic to stdout.
}
// NewSession creates a new POP3 session
func NewSession(server *Server, id int, conn net.Conn) *Session {
func NewSession(server *Server, id int, conn net.Conn, logger zerolog.Logger) *Session {
reader := bufio.NewReader(conn)
host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
return &Session{server: server, id: id, conn: conn, state: AUTHORIZATION,
reader: reader, remoteHost: host}
return &Session{
server: server,
id: id,
conn: conn,
state: AUTHORIZATION,
reader: reader,
remoteHost: host,
logger: logger,
debug: server.config.Debug,
}
}
func (ses *Session) String() string {
return fmt.Sprintf("Session{id: %v, state: %v}", ses.id, ses.state)
func (s *Session) String() string {
return fmt.Sprintf("Session{id: %v, state: %v}", s.id, s.state)
}
/* Session flow:
@@ -90,33 +100,33 @@ func (ses *Session) String() string {
* 5. Goto 2
*/
func (s *Server) startSession(id int, conn net.Conn) {
log.Infof("POP3 connection from %v, starting session <%v>", conn.RemoteAddr(), id)
//expConnectsCurrent.Add(1)
logger := log.With().Str("module", "pop3").Str("remote", conn.RemoteAddr().String()).
Int("session", id).Logger()
logger.Info().Msg("Starting POP3 session")
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Error closing POP3 connection for <%v>: %v", id, err)
logger.Warn().Err(err).Msg("Closing connection")
}
s.waitgroup.Done()
//expConnectsCurrent.Add(-1)
}()
ses := NewSession(s, id, conn)
ses.send(fmt.Sprintf("+OK Inbucket POP3 server ready <%v.%v@%v>", os.Getpid(),
ssn := NewSession(s, id, conn, logger)
ssn.send(fmt.Sprintf("+OK Inbucket POP3 server ready <%v.%v@%v>", os.Getpid(),
time.Now().Unix(), s.domain))
// This is our command reading loop
for ses.state != QUIT && ses.sendError == nil {
line, err := ses.readLine()
for ssn.state != QUIT && ssn.sendError == nil {
line, err := ssn.readLine()
if err == nil {
if cmd, arg, ok := ses.parseCmd(line); ok {
if cmd, arg, ok := ssn.parseCmd(line); ok {
// Check against valid SMTP commands
if cmd == "" {
ses.send("-ERR Speak up")
ssn.send("-ERR Speak up")
continue
}
if !commands[cmd] {
ses.send(fmt.Sprintf("-ERR Syntax error, %v command unrecognized", cmd))
ses.logWarn("Unrecognized command: %v", cmd)
ssn.send(fmt.Sprintf("-ERR Syntax error, %v command unrecognized", cmd))
ssn.logger.Warn().Msgf("Unrecognized command: %v", cmd)
continue
}
@@ -124,307 +134,307 @@ func (s *Server) startSession(id int, conn net.Conn) {
switch cmd {
case "CAPA":
// List our capabilities per RFC2449
ses.send("+OK Capability list follows")
ses.send("TOP")
ses.send("USER")
ses.send("UIDL")
ses.send("IMPLEMENTATION Inbucket")
ses.send(".")
ssn.send("+OK Capability list follows")
ssn.send("TOP")
ssn.send("USER")
ssn.send("UIDL")
ssn.send("IMPLEMENTATION Inbucket")
ssn.send(".")
continue
}
// Send command to handler for current state
switch ses.state {
switch ssn.state {
case AUTHORIZATION:
ses.authorizationHandler(cmd, arg)
ssn.authorizationHandler(cmd, arg)
continue
case TRANSACTION:
ses.transactionHandler(cmd, arg)
ssn.transactionHandler(cmd, arg)
continue
}
ses.logError("Session entered unexpected state %v", ses.state)
ssn.logger.Error().Msgf("Session entered unexpected state %v", ssn.state)
break
} else {
ses.send("-ERR Syntax error, command garbled")
ssn.send("-ERR Syntax error, command garbled")
}
} else {
// readLine() returned an error
if err == io.EOF {
switch ses.state {
switch ssn.state {
case AUTHORIZATION:
// EOF is common here
ses.logInfo("Client closed connection (state %v)", ses.state)
ssn.logger.Info().Msgf("Client closed connection (state %v)", ssn.state)
default:
ses.logWarn("Got EOF while in state %v", ses.state)
ssn.logger.Warn().Msgf("Got EOF while in state %v", ssn.state)
}
break
}
// not an EOF
ses.logWarn("Connection error: %v", err)
ssn.logger.Warn().Msgf("Connection error: %v", err)
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
ses.send("-ERR Idle timeout, bye bye")
ssn.send("-ERR Idle timeout, bye bye")
break
}
}
ses.send("-ERR Connection error, sorry")
ssn.send("-ERR Connection error, sorry")
break
}
}
if ses.sendError != nil {
ses.logWarn("Network send error: %v", ses.sendError)
if ssn.sendError != nil {
ssn.logger.Warn().Msgf("Network send error: %v", ssn.sendError)
}
ses.logInfo("Closing connection")
ssn.logger.Info().Msgf("Closing connection")
}
// AUTHORIZATION state
func (ses *Session) authorizationHandler(cmd string, args []string) {
func (s *Session) authorizationHandler(cmd string, args []string) {
switch cmd {
case "QUIT":
ses.send("+OK Goodnight and good luck")
ses.enterState(QUIT)
s.send("+OK Goodnight and good luck")
s.enterState(QUIT)
case "USER":
if len(args) > 0 {
ses.user = args[0]
ses.send(fmt.Sprintf("+OK Hello %v, welcome to Inbucket", ses.user))
s.user = args[0]
s.send(fmt.Sprintf("+OK Hello %v, welcome to Inbucket", s.user))
} else {
ses.send("-ERR Missing username argument")
s.send("-ERR Missing username argument")
}
case "PASS":
if ses.user == "" {
ses.ooSeq(cmd)
if s.user == "" {
s.ooSeq(cmd)
} else {
ses.loadMailbox()
ses.send(fmt.Sprintf("+OK Found %v messages for %v", ses.msgCount, ses.user))
ses.enterState(TRANSACTION)
s.loadMailbox()
s.send(fmt.Sprintf("+OK Found %v messages for %v", s.msgCount, s.user))
s.enterState(TRANSACTION)
}
case "APOP":
if len(args) != 2 {
ses.logWarn("Expected two arguments for APOP")
ses.send("-ERR APOP requires two arguments")
s.logger.Warn().Msgf("Expected two arguments for APOP")
s.send("-ERR APOP requires two arguments")
return
}
ses.user = args[0]
ses.loadMailbox()
ses.send(fmt.Sprintf("+OK Found %v messages for %v", ses.msgCount, ses.user))
ses.enterState(TRANSACTION)
s.user = args[0]
s.loadMailbox()
s.send(fmt.Sprintf("+OK Found %v messages for %v", s.msgCount, s.user))
s.enterState(TRANSACTION)
default:
ses.ooSeq(cmd)
s.ooSeq(cmd)
}
}
// TRANSACTION state
func (ses *Session) transactionHandler(cmd string, args []string) {
func (s *Session) transactionHandler(cmd string, args []string) {
switch cmd {
case "STAT":
if len(args) != 0 {
ses.logWarn("STAT got an unexpected argument")
ses.send("-ERR STAT command must have no arguments")
s.logger.Warn().Msgf("STAT got an unexpected argument")
s.send("-ERR STAT command must have no arguments")
return
}
var count int
var size int64
for i, msg := range ses.messages {
if ses.retain[i] {
for i, msg := range s.messages {
if s.retain[i] {
count++
size += msg.Size()
}
}
ses.send(fmt.Sprintf("+OK %v %v", count, size))
s.send(fmt.Sprintf("+OK %v %v", count, size))
case "LIST":
if len(args) > 1 {
ses.logWarn("LIST command had more than 1 argument")
ses.send("-ERR LIST command must have zero or one argument")
s.logger.Warn().Msgf("LIST command had more than 1 argument")
s.send("-ERR LIST command must have zero or one argument")
return
}
if len(args) == 1 {
msgNum, err := strconv.ParseInt(args[0], 10, 32)
if err != nil {
ses.logWarn("LIST command argument was not an integer")
ses.send("-ERR LIST command requires an integer argument")
s.logger.Warn().Msgf("LIST command argument was not an integer")
s.send("-ERR LIST command requires an integer argument")
return
}
if msgNum < 1 {
ses.logWarn("LIST command argument was less than 1")
ses.send("-ERR LIST argument must be greater than 0")
s.logger.Warn().Msgf("LIST command argument was less than 1")
s.send("-ERR LIST argument must be greater than 0")
return
}
if int(msgNum) > len(ses.messages) {
ses.logWarn("LIST command argument was greater than number of messages")
ses.send("-ERR LIST argument must not exceed the number of messages")
if int(msgNum) > len(s.messages) {
s.logger.Warn().Msgf("LIST command argument was greater than number of messages")
s.send("-ERR LIST argument must not exceed the number of messages")
return
}
if !ses.retain[msgNum-1] {
ses.logWarn("Client tried to LIST a message it had deleted")
ses.send(fmt.Sprintf("-ERR You deleted message %v", msgNum))
if !s.retain[msgNum-1] {
s.logger.Warn().Msgf("Client tried to LIST a message it had deleted")
s.send(fmt.Sprintf("-ERR You deleted message %v", msgNum))
return
}
ses.send(fmt.Sprintf("+OK %v %v", msgNum, ses.messages[msgNum-1].Size()))
s.send(fmt.Sprintf("+OK %v %v", msgNum, s.messages[msgNum-1].Size()))
} else {
ses.send(fmt.Sprintf("+OK Listing %v messages", ses.msgCount))
for i, msg := range ses.messages {
if ses.retain[i] {
ses.send(fmt.Sprintf("%v %v", i+1, msg.Size()))
s.send(fmt.Sprintf("+OK Listing %v messages", s.msgCount))
for i, msg := range s.messages {
if s.retain[i] {
s.send(fmt.Sprintf("%v %v", i+1, msg.Size()))
}
}
ses.send(".")
s.send(".")
}
case "UIDL":
if len(args) > 1 {
ses.logWarn("UIDL command had more than 1 argument")
ses.send("-ERR UIDL command must have zero or one argument")
s.logger.Warn().Msgf("UIDL command had more than 1 argument")
s.send("-ERR UIDL command must have zero or one argument")
return
}
if len(args) == 1 {
msgNum, err := strconv.ParseInt(args[0], 10, 32)
if err != nil {
ses.logWarn("UIDL command argument was not an integer")
ses.send("-ERR UIDL command requires an integer argument")
s.logger.Warn().Msgf("UIDL command argument was not an integer")
s.send("-ERR UIDL command requires an integer argument")
return
}
if msgNum < 1 {
ses.logWarn("UIDL command argument was less than 1")
ses.send("-ERR UIDL argument must be greater than 0")
s.logger.Warn().Msgf("UIDL command argument was less than 1")
s.send("-ERR UIDL argument must be greater than 0")
return
}
if int(msgNum) > len(ses.messages) {
ses.logWarn("UIDL command argument was greater than number of messages")
ses.send("-ERR UIDL argument must not exceed the number of messages")
if int(msgNum) > len(s.messages) {
s.logger.Warn().Msgf("UIDL command argument was greater than number of messages")
s.send("-ERR UIDL argument must not exceed the number of messages")
return
}
if !ses.retain[msgNum-1] {
ses.logWarn("Client tried to UIDL a message it had deleted")
ses.send(fmt.Sprintf("-ERR You deleted message %v", msgNum))
if !s.retain[msgNum-1] {
s.logger.Warn().Msgf("Client tried to UIDL a message it had deleted")
s.send(fmt.Sprintf("-ERR You deleted message %v", msgNum))
return
}
ses.send(fmt.Sprintf("+OK %v %v", msgNum, ses.messages[msgNum-1].ID()))
s.send(fmt.Sprintf("+OK %v %v", msgNum, s.messages[msgNum-1].ID()))
} else {
ses.send(fmt.Sprintf("+OK Listing %v messages", ses.msgCount))
for i, msg := range ses.messages {
if ses.retain[i] {
ses.send(fmt.Sprintf("%v %v", i+1, msg.ID()))
s.send(fmt.Sprintf("+OK Listing %v messages", s.msgCount))
for i, msg := range s.messages {
if s.retain[i] {
s.send(fmt.Sprintf("%v %v", i+1, msg.ID()))
}
}
ses.send(".")
s.send(".")
}
case "DELE":
if len(args) != 1 {
ses.logWarn("DELE command had invalid number of arguments")
ses.send("-ERR DELE command requires a single argument")
s.logger.Warn().Msgf("DELE command had invalid number of arguments")
s.send("-ERR DELE command requires a single argument")
return
}
msgNum, err := strconv.ParseInt(args[0], 10, 32)
if err != nil {
ses.logWarn("DELE command argument was not an integer")
ses.send("-ERR DELE command requires an integer argument")
s.logger.Warn().Msgf("DELE command argument was not an integer")
s.send("-ERR DELE command requires an integer argument")
return
}
if msgNum < 1 {
ses.logWarn("DELE command argument was less than 1")
ses.send("-ERR DELE argument must be greater than 0")
s.logger.Warn().Msgf("DELE command argument was less than 1")
s.send("-ERR DELE argument must be greater than 0")
return
}
if int(msgNum) > len(ses.messages) {
ses.logWarn("DELE command argument was greater than number of messages")
ses.send("-ERR DELE argument must not exceed the number of messages")
if int(msgNum) > len(s.messages) {
s.logger.Warn().Msgf("DELE command argument was greater than number of messages")
s.send("-ERR DELE argument must not exceed the number of messages")
return
}
if ses.retain[msgNum-1] {
ses.retain[msgNum-1] = false
ses.msgCount--
ses.send(fmt.Sprintf("+OK Deleted message %v", msgNum))
if s.retain[msgNum-1] {
s.retain[msgNum-1] = false
s.msgCount--
s.send(fmt.Sprintf("+OK Deleted message %v", msgNum))
} else {
ses.logWarn("Client tried to DELE an already deleted message")
ses.send(fmt.Sprintf("-ERR Message %v has already been deleted", msgNum))
s.logger.Warn().Msgf("Client tried to DELE an already deleted message")
s.send(fmt.Sprintf("-ERR Message %v has already been deleted", msgNum))
}
case "RETR":
if len(args) != 1 {
ses.logWarn("RETR command had invalid number of arguments")
ses.send("-ERR RETR command requires a single argument")
s.logger.Warn().Msgf("RETR command had invalid number of arguments")
s.send("-ERR RETR command requires a single argument")
return
}
msgNum, err := strconv.ParseInt(args[0], 10, 32)
if err != nil {
ses.logWarn("RETR command argument was not an integer")
ses.send("-ERR RETR command requires an integer argument")
s.logger.Warn().Msgf("RETR command argument was not an integer")
s.send("-ERR RETR command requires an integer argument")
return
}
if msgNum < 1 {
ses.logWarn("RETR command argument was less than 1")
ses.send("-ERR RETR argument must be greater than 0")
s.logger.Warn().Msgf("RETR command argument was less than 1")
s.send("-ERR RETR argument must be greater than 0")
return
}
if int(msgNum) > len(ses.messages) {
ses.logWarn("RETR command argument was greater than number of messages")
ses.send("-ERR RETR argument must not exceed the number of messages")
if int(msgNum) > len(s.messages) {
s.logger.Warn().Msgf("RETR command argument was greater than number of messages")
s.send("-ERR RETR argument must not exceed the number of messages")
return
}
ses.send(fmt.Sprintf("+OK %v bytes follows", ses.messages[msgNum-1].Size()))
ses.sendMessage(ses.messages[msgNum-1])
s.send(fmt.Sprintf("+OK %v bytes follows", s.messages[msgNum-1].Size()))
s.sendMessage(s.messages[msgNum-1])
case "TOP":
if len(args) != 2 {
ses.logWarn("TOP command had invalid number of arguments")
ses.send("-ERR TOP command requires two arguments")
s.logger.Warn().Msgf("TOP command had invalid number of arguments")
s.send("-ERR TOP command requires two arguments")
return
}
msgNum, err := strconv.ParseInt(args[0], 10, 32)
if err != nil {
ses.logWarn("TOP command first argument was not an integer")
ses.send("-ERR TOP command requires an integer argument")
s.logger.Warn().Msgf("TOP command first argument was not an integer")
s.send("-ERR TOP command requires an integer argument")
return
}
if msgNum < 1 {
ses.logWarn("TOP command first argument was less than 1")
ses.send("-ERR TOP first argument must be greater than 0")
s.logger.Warn().Msgf("TOP command first argument was less than 1")
s.send("-ERR TOP first argument must be greater than 0")
return
}
if int(msgNum) > len(ses.messages) {
ses.logWarn("TOP command first argument was greater than number of messages")
ses.send("-ERR TOP first argument must not exceed the number of messages")
if int(msgNum) > len(s.messages) {
s.logger.Warn().Msgf("TOP command first argument was greater than number of messages")
s.send("-ERR TOP first argument must not exceed the number of messages")
return
}
var lines int64
lines, err = strconv.ParseInt(args[1], 10, 32)
if err != nil {
ses.logWarn("TOP command second argument was not an integer")
ses.send("-ERR TOP command requires an integer argument")
s.logger.Warn().Msgf("TOP command second argument was not an integer")
s.send("-ERR TOP command requires an integer argument")
return
}
if lines < 0 {
ses.logWarn("TOP command second argument was negative")
ses.send("-ERR TOP second argument must be non-negative")
s.logger.Warn().Msgf("TOP command second argument was negative")
s.send("-ERR TOP second argument must be non-negative")
return
}
ses.send("+OK Top of message follows")
ses.sendMessageTop(ses.messages[msgNum-1], int(lines))
s.send("+OK Top of message follows")
s.sendMessageTop(s.messages[msgNum-1], int(lines))
case "QUIT":
ses.send("+OK We will process your deletes")
ses.processDeletes()
ses.enterState(QUIT)
s.send("+OK We will process your deletes")
s.processDeletes()
s.enterState(QUIT)
case "NOOP":
ses.send("+OK I have sucessfully done nothing")
s.send("+OK I have sucessfully done nothing")
case "RSET":
// Reset session, don't actually delete anything I told you to
ses.logTrace("Resetting session state on RSET request")
ses.reset()
ses.send("+OK Session reset")
s.logger.Debug().Msgf("Resetting session state on RSET request")
s.reset()
s.send("+OK Session reset")
default:
ses.ooSeq(cmd)
s.ooSeq(cmd)
}
}
// Send the contents of the message to the client
func (ses *Session) sendMessage(msg storage.Message) {
func (s *Session) sendMessage(msg storage.Message) {
reader, err := msg.Source()
if err != nil {
ses.logError("Failed to read message for RETR command")
ses.send("-ERR Failed to RETR that message, internal error")
s.logger.Error().Msgf("Failed to read message for RETR command")
s.send("-ERR Failed to RETR that message, internal error")
return
}
defer func() {
if err := reader.Close(); err != nil {
ses.logError("Failed to close message: %v", err)
s.logger.Error().Msgf("Failed to close message: %v", err)
}
}()
@@ -435,29 +445,29 @@ func (ses *Session) sendMessage(msg storage.Message) {
if strings.HasPrefix(line, ".") {
line = "." + line
}
ses.send(line)
s.send(line)
}
if err = scanner.Err(); err != nil {
ses.logError("Failed to read message for RETR command")
ses.send(".")
ses.send("-ERR Failed to RETR that message, internal error")
s.logger.Error().Msgf("Failed to read message for RETR command")
s.send(".")
s.send("-ERR Failed to RETR that message, internal error")
return
}
ses.send(".")
s.send(".")
}
// Send the headers plus the top N lines to the client
func (ses *Session) sendMessageTop(msg storage.Message, lineCount int) {
func (s *Session) sendMessageTop(msg storage.Message, lineCount int) {
reader, err := msg.Source()
if err != nil {
ses.logError("Failed to read message for RETR command")
ses.send("-ERR Failed to RETR that message, internal error")
s.logger.Error().Msgf("Failed to read message for RETR command")
s.send("-ERR Failed to RETR that message, internal error")
return
}
defer func() {
if err := reader.Close(); err != nil {
ses.logError("Failed to close message: %v", err)
s.logger.Error().Msgf("Failed to close message: %v", err)
}
}()
@@ -482,122 +492,96 @@ func (ses *Session) sendMessageTop(msg storage.Message, lineCount int) {
inBody = true
}
}
ses.send(line)
s.send(line)
}
if err = scanner.Err(); err != nil {
ses.logError("Failed to read message for RETR command")
ses.send(".")
ses.send("-ERR Failed to RETR that message, internal error")
s.logger.Error().Msgf("Failed to read message for RETR command")
s.send(".")
s.send("-ERR Failed to RETR that message, internal error")
return
}
ses.send(".")
s.send(".")
}
// Load the users mailbox
func (ses *Session) loadMailbox() {
m, err := ses.server.store.GetMessages(ses.user)
func (s *Session) loadMailbox() {
s.logger = s.logger.With().Str("mailbox", s.user).Logger()
m, err := s.server.store.GetMessages(s.user)
if err != nil {
ses.logError("Failed to load messages for %v: %v", ses.user, err)
s.logger.Error().Msgf("Failed to load messages for %v: %v", s.user, err)
}
ses.messages = m
ses.retainAll()
s.messages = m
s.retainAll()
}
// Reset retain flag to true for all messages
func (ses *Session) retainAll() {
ses.retain = make([]bool, len(ses.messages))
for i := range ses.retain {
ses.retain[i] = true
func (s *Session) retainAll() {
s.retain = make([]bool, len(s.messages))
for i := range s.retain {
s.retain[i] = true
}
ses.msgCount = len(ses.messages)
s.msgCount = len(s.messages)
}
// This would be considered the "UPDATE" state in the RFC, but it does not fit
// with our state-machine design here, since no commands are accepted - it just
// indicates that the session was closed cleanly and that deletes should be
// processed.
func (ses *Session) processDeletes() {
ses.logInfo("Processing deletes")
for i, msg := range ses.messages {
if !ses.retain[i] {
ses.logTrace("Deleting %v", msg)
if err := ses.server.store.RemoveMessage(ses.user, msg.ID()); err != nil {
ses.logWarn("Error deleting %v: %v", msg, err)
func (s *Session) processDeletes() {
s.logger.Info().Msgf("Processing deletes")
for i, msg := range s.messages {
if !s.retain[i] {
s.logger.Debug().Str("id", msg.ID()).Msg("Deleting message")
if err := s.server.store.RemoveMessage(s.user, msg.ID()); err != nil {
s.logger.Warn().Str("id", msg.ID()).Err(err).Msg("Error deleting message")
}
}
}
}
func (ses *Session) enterState(state State) {
ses.state = state
ses.logTrace("Entering state %v", state)
func (s *Session) enterState(state State) {
s.state = state
s.logger.Debug().Msgf("Entering state %v", state)
}
// Calculate the next read or write deadline based on maxIdleSeconds
func (ses *Session) nextDeadline() time.Time {
return time.Now().Add(ses.server.timeout)
func (s *Session) nextDeadline() time.Time {
return time.Now().Add(s.server.timeout)
}
// Send requested message, store errors in Session.sendError
func (ses *Session) send(msg string) {
if err := ses.conn.SetWriteDeadline(ses.nextDeadline()); err != nil {
ses.sendError = err
func (s *Session) send(msg string) {
if err := s.conn.SetWriteDeadline(s.nextDeadline()); err != nil {
s.sendError = err
return
}
if _, err := fmt.Fprint(ses.conn, msg+"\r\n"); err != nil {
ses.sendError = err
ses.logWarn("Failed to send: '%v'", msg)
if _, err := fmt.Fprint(s.conn, msg+"\r\n"); err != nil {
s.sendError = err
s.logger.Warn().Msgf("Failed to send: %q", msg)
return
}
ses.logTrace(">> %v >>", msg)
}
// readByteLine reads a line of input into the provided buffer. Does
// not reset the Buffer - please do so prior to calling.
func (ses *Session) readByteLine(buf *bytes.Buffer) error {
if err := ses.conn.SetReadDeadline(ses.nextDeadline()); err != nil {
return err
if s.debug {
fmt.Printf("%04d > %v\n", s.id, msg)
}
for {
line, err := ses.reader.ReadBytes('\r')
if err != nil {
return err
}
if _, err = buf.Write(line); err != nil {
return err
}
// Read the next byte looking for '\n'
c, err := ses.reader.ReadByte()
if err != nil {
return err
}
if err := buf.WriteByte(c); err != nil {
return err
}
if c == '\n' {
// We've reached the end of the line, return
return nil
}
// Else, keep looking
}
// Should be unreachable
}
// Reads a line of input
func (ses *Session) readLine() (line string, err error) {
if err = ses.conn.SetReadDeadline(ses.nextDeadline()); err != nil {
func (s *Session) readLine() (line string, err error) {
if err = s.conn.SetReadDeadline(s.nextDeadline()); err != nil {
return "", err
}
line, err = ses.reader.ReadString('\n')
line, err = s.reader.ReadString('\n')
if err != nil {
return "", err
}
ses.logTrace("<< %v <<", strings.TrimRight(line, "\r\n"))
if s.debug {
fmt.Printf("%04d %v\n", s.id, strings.TrimRight(line, "\r\n"))
}
return line, nil
}
func (ses *Session) parseCmd(line string) (cmd string, args []string, ok bool) {
func (s *Session) parseCmd(line string) (cmd string, args []string, ok bool) {
line = strings.TrimRight(line, "\r\n")
if line == "" {
return "", nil, true
@@ -607,32 +591,11 @@ func (ses *Session) parseCmd(line string) (cmd string, args []string, ok bool) {
return strings.ToUpper(words[0]), words[1:], true
}
func (ses *Session) reset() {
ses.retainAll()
func (s *Session) reset() {
s.retainAll()
}
func (ses *Session) ooSeq(cmd string) {
ses.send(fmt.Sprintf("-ERR Command %v is out of sequence", cmd))
ses.logWarn("Wasn't expecting %v here", cmd)
}
// Session specific logging methods
func (ses *Session) logTrace(msg string, args ...interface{}) {
log.Tracef("POP3[%v]<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...))
}
func (ses *Session) logInfo(msg string, args ...interface{}) {
log.Infof("POP3[%v]<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...))
}
func (ses *Session) logWarn(msg string, args ...interface{}) {
// Update metrics
//expWarnsTotal.Add(1)
log.Warnf("POP3[%v]<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...))
}
func (ses *Session) logError(msg string, args ...interface{}) {
// Update metrics
//expErrorsTotal.Add(1)
log.Errorf("POP3[%v]<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...))
func (s *Session) ooSeq(cmd string) {
s.send(fmt.Sprintf("-ERR Command %v is out of sequence", cmd))
s.logger.Warn().Msgf("Wasn't expecting %v here", cmd)
}

View File

@@ -7,12 +7,14 @@ import (
"time"
"github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/storage"
"github.com/rs/zerolog/log"
)
// Server defines an instance of our POP3 server
type Server struct {
// TODO(#91) Refactor config items out of this struct
config config.POP3
host string
domain string
timeout time.Duration
@@ -25,6 +27,7 @@ type Server struct {
// New creates a new Server struct
func New(cfg config.POP3, shutdownChan chan bool, store storage.Store) *Server {
return &Server{
config: cfg,
host: cfg.Addr,
domain: cfg.Domain,
store: store,
@@ -36,44 +39,42 @@ func New(cfg config.POP3, shutdownChan chan bool, store storage.Store) *Server {
// Start the server and listen for connections
func (s *Server) Start(ctx context.Context) {
slog := log.With().Str("module", "pop3").Str("phase", "startup").Logger()
addr, err := net.ResolveTCPAddr("tcp4", s.host)
if err != nil {
log.Errorf("POP3 Failed to build tcp4 address: %v", err)
slog.Error().Err(err).Msg("Failed to build tcp4 address")
s.emergencyShutdown()
return
}
log.Infof("POP3 listening on TCP4 %v", addr)
slog.Info().Str("addr", addr.String()).Msg("POP3 listening on tcp4")
s.listener, err = net.ListenTCP("tcp4", addr)
if err != nil {
log.Errorf("POP3 failed to start tcp4 listener: %v", err)
slog.Error().Err(err).Msg("Failed to start tcp4 listener")
s.emergencyShutdown()
return
}
// Listener go routine
// Listener go routine.
go s.serve(ctx)
// Wait for shutdown
// Wait for shutdown.
select {
case _ = <-ctx.Done():
}
log.Tracef("POP3 shutdown requested, connections will be drained")
// Closing the listener will cause the serve() go routine to exit
slog = log.With().Str("module", "pop3").Str("phase", "shutdown").Logger()
slog.Debug().Msg("POP3 shutdown requested, connections will be drained")
// Closing the listener will cause the serve() go routine to exit.
if err := s.listener.Close(); err != nil {
log.Errorf("Error closing POP3 listener: %v", err)
slog.Error().Err(err).Msg("Failed to close POP3 listener")
}
}
// serve is the listen/accept loop
// serve is the listen/accept loop.
func (s *Server) serve(ctx context.Context) {
// Handle incoming connections
// Handle incoming connections.
var tempDelay time.Duration
for sid := 1; ; sid++ {
if conn, err := s.listener.Accept(); err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
// Temporary error, sleep for a bit and try again
// Temporary error, sleep for a bit and try again.
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
@@ -82,17 +83,18 @@ func (s *Server) serve(ctx context.Context) {
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
log.Errorf("POP3 accept error: %v; retrying in %v", err, tempDelay)
log.Error().Str("module", "pop3").Err(err).
Msgf("POP3 accept error; retrying in %v", tempDelay)
time.Sleep(tempDelay)
continue
} else {
// Permanent error
// Permanent error.
select {
case <-ctx.Done():
// POP3 is shutting down
// POP3 is shutting down.
return
default:
// Something went wrong
// Something went wrong.
s.emergencyShutdown()
return
}
@@ -118,5 +120,5 @@ func (s *Server) emergencyShutdown() {
func (s *Server) Drain() {
// Wait for sessions to close
s.waitgroup.Wait()
log.Tracef("POP3 connections have drained")
log.Debug().Str("module", "pop3").Str("phase", "shutdown").Msg("POP3 connections have drained")
}

View File

@@ -11,8 +11,9 @@ import (
"strings"
"time"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/policy"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// State tracks the current mode of our SMTP state machine
@@ -79,10 +80,12 @@ type Session struct {
reader *bufio.Reader
from string
recipients []*policy.Recipient
logger zerolog.Logger // Session specific logger.
debug bool // Print network traffic to stdout.
}
// NewSession creates a new Session for the given connection
func NewSession(server *Server, id int, conn net.Conn) *Session {
func NewSession(server *Server, id int, conn net.Conn, logger zerolog.Logger) *Session {
reader := bufio.NewReader(conn)
host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
return &Session{
@@ -93,11 +96,13 @@ func NewSession(server *Server, id int, conn net.Conn) *Session {
reader: reader,
remoteHost: host,
recipients: make([]*policy.Recipient, 0),
logger: logger,
debug: server.config.Debug,
}
}
func (ss *Session) String() string {
return fmt.Sprintf("Session{id: %v, state: %v}", ss.id, ss.state)
func (s *Session) String() string {
return fmt.Sprintf("Session{id: %v, state: %v}", s.id, s.state)
}
/* Session flow:
@@ -108,37 +113,41 @@ func (ss *Session) String() string {
* 5. Goto 2
*/
func (s *Server) startSession(id int, conn net.Conn) {
log.Infof("SMTP Connection from %v, starting session <%v>", conn.RemoteAddr(), id)
logger := log.Hook(logHook{}).With().
Str("module", "smtp").
Str("remote", conn.RemoteAddr().String()).
Int("session", id).Logger()
logger.Info().Msg("Starting SMTP session")
expConnectsCurrent.Add(1)
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Error closing connection for <%v>: %v", id, err)
logger.Warn().Err(err).Msg("Closing connection")
}
s.waitgroup.Done()
expConnectsCurrent.Add(-1)
}()
ss := NewSession(s, id, conn)
ss.greet()
ssn := NewSession(s, id, conn, logger)
ssn.greet()
// This is our command reading loop
for ss.state != QUIT && ss.sendError == nil {
if ss.state == DATA {
for ssn.state != QUIT && ssn.sendError == nil {
if ssn.state == DATA {
// Special case, does not use SMTP command format
ss.dataHandler()
ssn.dataHandler()
continue
}
line, err := ss.readLine()
line, err := ssn.readLine()
if err == nil {
if cmd, arg, ok := ss.parseCmd(line); ok {
if cmd, arg, ok := ssn.parseCmd(line); ok {
// Check against valid SMTP commands
if cmd == "" {
ss.send("500 Speak up")
ssn.send("500 Speak up")
continue
}
if !commands[cmd] {
ss.send(fmt.Sprintf("500 Syntax error, %v command unrecognized", cmd))
ss.logWarn("Unrecognized command: %v", cmd)
ssn.send(fmt.Sprintf("500 Syntax error, %v command unrecognized", cmd))
ssn.logger.Warn().Msgf("Unrecognized command: %v", cmd)
continue
}
@@ -146,99 +155,99 @@ func (s *Server) startSession(id int, conn net.Conn) {
switch cmd {
case "SEND", "SOML", "SAML", "EXPN", "HELP", "TURN":
// These commands are not implemented in any state
ss.send(fmt.Sprintf("502 %v command not implemented", cmd))
ss.logWarn("Command %v not implemented by Inbucket", cmd)
ssn.send(fmt.Sprintf("502 %v command not implemented", cmd))
ssn.logger.Warn().Msgf("Command %v not implemented by Inbucket", cmd)
continue
case "VRFY":
ss.send("252 Cannot VRFY user, but will accept message")
ssn.send("252 Cannot VRFY user, but will accept message")
continue
case "NOOP":
ss.send("250 I have sucessfully done nothing")
ssn.send("250 I have sucessfully done nothing")
continue
case "RSET":
// Reset session
ss.logTrace("Resetting session state on RSET request")
ss.reset()
ss.send("250 Session reset")
ssn.logger.Debug().Msgf("Resetting session state on RSET request")
ssn.reset()
ssn.send("250 Session reset")
continue
case "QUIT":
ss.send("221 Goodnight and good luck")
ss.enterState(QUIT)
ssn.send("221 Goodnight and good luck")
ssn.enterState(QUIT)
continue
}
// Send command to handler for current state
switch ss.state {
switch ssn.state {
case GREET:
ss.greetHandler(cmd, arg)
ssn.greetHandler(cmd, arg)
continue
case READY:
ss.readyHandler(cmd, arg)
ssn.readyHandler(cmd, arg)
continue
case MAIL:
ss.mailHandler(cmd, arg)
ssn.mailHandler(cmd, arg)
continue
}
ss.logError("Session entered unexpected state %v", ss.state)
ssn.logger.Error().Msgf("Session entered unexpected state %v", ssn.state)
break
} else {
ss.send("500 Syntax error, command garbled")
ssn.send("500 Syntax error, command garbled")
}
} else {
// readLine() returned an error
if err == io.EOF {
switch ss.state {
switch ssn.state {
case GREET, READY:
// EOF is common here
ss.logInfo("Client closed connection (state %v)", ss.state)
ssn.logger.Info().Msgf("Client closed connection (state %v)", ssn.state)
default:
ss.logWarn("Got EOF while in state %v", ss.state)
ssn.logger.Warn().Msgf("Got EOF while in state %v", ssn.state)
}
break
}
// not an EOF
ss.logWarn("Connection error: %v", err)
ssn.logger.Warn().Msgf("Connection error: %v", err)
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
ss.send("221 Idle timeout, bye bye")
ssn.send("221 Idle timeout, bye bye")
break
}
}
ss.send("221 Connection error, sorry")
ssn.send("221 Connection error, sorry")
break
}
}
if ss.sendError != nil {
ss.logWarn("Network send error: %v", ss.sendError)
if ssn.sendError != nil {
ssn.logger.Warn().Msgf("Network send error: %v", ssn.sendError)
}
ss.logInfo("Closing connection")
ssn.logger.Info().Msgf("Closing connection")
}
// GREET state -> waiting for HELO
func (ss *Session) greetHandler(cmd string, arg string) {
func (s *Session) greetHandler(cmd string, arg string) {
switch cmd {
case "HELO":
domain, err := parseHelloArgument(arg)
if err != nil {
ss.send("501 Domain/address argument required for HELO")
s.send("501 Domain/address argument required for HELO")
return
}
ss.remoteDomain = domain
ss.send("250 Great, let's get this show on the road")
ss.enterState(READY)
s.remoteDomain = domain
s.send("250 Great, let's get this show on the road")
s.enterState(READY)
case "EHLO":
domain, err := parseHelloArgument(arg)
if err != nil {
ss.send("501 Domain/address argument required for EHLO")
s.send("501 Domain/address argument required for EHLO")
return
}
ss.remoteDomain = domain
ss.send("250-Great, let's get this show on the road")
ss.send("250-8BITMIME")
ss.send(fmt.Sprintf("250 SIZE %v", ss.server.maxMessageBytes))
ss.enterState(READY)
s.remoteDomain = domain
s.send("250-Great, let's get this show on the road")
s.send("250-8BITMIME")
s.send(fmt.Sprintf("250 SIZE %v", s.server.maxMessageBytes))
s.enterState(READY)
default:
ss.ooSeq(cmd)
s.ooSeq(cmd)
}
}
@@ -254,139 +263,139 @@ func parseHelloArgument(arg string) (string, error) {
}
// READY state -> waiting for MAIL
func (ss *Session) readyHandler(cmd string, arg string) {
func (s *Session) readyHandler(cmd string, arg string) {
if cmd == "MAIL" {
// Match FROM, while accepting '>' as quoted pair and in double quoted strings
// (?i) makes the regex case insensitive, (?:) is non-grouping sub-match
re := regexp.MustCompile("(?i)^FROM:\\s*<((?:\\\\>|[^>])+|\"[^\"]+\"@[^>]+)>( [\\w= ]+)?$")
m := re.FindStringSubmatch(arg)
if m == nil {
ss.send("501 Was expecting MAIL arg syntax of FROM:<address>")
ss.logWarn("Bad MAIL argument: %q", arg)
s.send("501 Was expecting MAIL arg syntax of FROM:<address>")
s.logger.Warn().Msgf("Bad MAIL argument: %q", arg)
return
}
from := m[1]
if _, _, err := policy.ParseEmailAddress(from); err != nil {
ss.send("501 Bad sender address syntax")
ss.logWarn("Bad address as MAIL arg: %q, %s", from, err)
s.send("501 Bad sender address syntax")
s.logger.Warn().Msgf("Bad address as MAIL arg: %q, %s", from, err)
return
}
// This is where the client may put BODY=8BITMIME, but we already
// read the DATA as bytes, so it does not effect our processing.
if m[2] != "" {
args, ok := ss.parseArgs(m[2])
args, ok := s.parseArgs(m[2])
if !ok {
ss.send("501 Unable to parse MAIL ESMTP parameters")
ss.logWarn("Bad MAIL argument: %q", arg)
s.send("501 Unable to parse MAIL ESMTP parameters")
s.logger.Warn().Msgf("Bad MAIL argument: %q", arg)
return
}
if args["SIZE"] != "" {
size, err := strconv.ParseInt(args["SIZE"], 10, 32)
if err != nil {
ss.send("501 Unable to parse SIZE as an integer")
ss.logWarn("Unable to parse SIZE %q as an integer", args["SIZE"])
s.send("501 Unable to parse SIZE as an integer")
s.logger.Warn().Msgf("Unable to parse SIZE %q as an integer", args["SIZE"])
return
}
if int(size) > ss.server.maxMessageBytes {
ss.send("552 Max message size exceeded")
ss.logWarn("Client wanted to send oversized message: %v", args["SIZE"])
if int(size) > s.server.maxMessageBytes {
s.send("552 Max message size exceeded")
s.logger.Warn().Msgf("Client wanted to send oversized message: %v", args["SIZE"])
return
}
}
}
ss.from = from
ss.logInfo("Mail from: %v", from)
ss.send(fmt.Sprintf("250 Roger, accepting mail from <%v>", from))
ss.enterState(MAIL)
s.from = from
s.logger.Info().Msgf("Mail from: %v", from)
s.send(fmt.Sprintf("250 Roger, accepting mail from <%v>", from))
s.enterState(MAIL)
} else {
ss.ooSeq(cmd)
s.ooSeq(cmd)
}
}
// MAIL state -> waiting for RCPTs followed by DATA
func (ss *Session) mailHandler(cmd string, arg string) {
func (s *Session) mailHandler(cmd string, arg string) {
switch cmd {
case "RCPT":
if (len(arg) < 4) || (strings.ToUpper(arg[0:3]) != "TO:") {
ss.send("501 Was expecting RCPT arg syntax of TO:<address>")
ss.logWarn("Bad RCPT argument: %q", arg)
s.send("501 Was expecting RCPT arg syntax of TO:<address>")
s.logger.Warn().Msgf("Bad RCPT argument: %q", arg)
return
}
// This trim is probably too forgiving
addr := strings.Trim(arg[3:], "<> ")
recip, err := ss.server.apolicy.NewRecipient(addr)
recip, err := s.server.apolicy.NewRecipient(addr)
if err != nil {
ss.send("501 Bad recipient address syntax")
ss.logWarn("Bad address as RCPT arg: %q, %s", addr, err)
s.send("501 Bad recipient address syntax")
s.logger.Warn().Msgf("Bad address as RCPT arg: %q, %s", addr, err)
return
}
if len(ss.recipients) >= ss.server.maxRecips {
ss.logWarn("Maximum limit of %v recipients reached", ss.server.maxRecips)
ss.send(fmt.Sprintf("552 Maximum limit of %v recipients reached", ss.server.maxRecips))
if len(s.recipients) >= s.server.maxRecips {
s.logger.Warn().Msgf("Maximum limit of %v recipients reached", s.server.maxRecips)
s.send(fmt.Sprintf("552 Maximum limit of %v recipients reached", s.server.maxRecips))
return
}
ss.recipients = append(ss.recipients, recip)
ss.logInfo("Recipient: %v", addr)
ss.send(fmt.Sprintf("250 I'll make sure <%v> gets this", addr))
s.recipients = append(s.recipients, recip)
s.logger.Info().Msgf("Recipient: %v", addr)
s.send(fmt.Sprintf("250 I'll make sure <%v> gets this", addr))
return
case "DATA":
if arg != "" {
ss.send("501 DATA command should not have any arguments")
ss.logWarn("Got unexpected args on DATA: %q", arg)
s.send("501 DATA command should not have any arguments")
s.logger.Warn().Msgf("Got unexpected args on DATA: %q", arg)
return
}
if len(ss.recipients) > 0 {
if len(s.recipients) > 0 {
// We have recipients, go to accept data
ss.enterState(DATA)
s.enterState(DATA)
return
}
// DATA out of sequence
ss.ooSeq(cmd)
s.ooSeq(cmd)
return
}
ss.ooSeq(cmd)
s.ooSeq(cmd)
}
// DATA
func (ss *Session) dataHandler() {
ss.send("354 Start mail input; end with <CRLF>.<CRLF>")
func (s *Session) dataHandler() {
s.send("354 Start mail input; end with <CRLF>.<CRLF>")
msgBuf := &bytes.Buffer{}
for {
lineBuf, err := ss.readByteLine()
lineBuf, err := s.readByteLine()
if err != nil {
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
ss.send("221 Idle timeout, bye bye")
s.send("221 Idle timeout, bye bye")
}
}
ss.logWarn("Error: %v while reading", err)
ss.enterState(QUIT)
s.logger.Warn().Msgf("Error: %v while reading", err)
s.enterState(QUIT)
return
}
if bytes.Equal(lineBuf, []byte(".\r\n")) || bytes.Equal(lineBuf, []byte(".\n")) {
// Mail data complete.
tstamp := time.Now().Format(timeStampFormat)
for _, recip := range ss.recipients {
for _, recip := range s.recipients {
if recip.ShouldStore() {
// Generate Received header.
prefix := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
ss.remoteDomain, ss.remoteHost, ss.server.domain, recip.Address.Address,
s.remoteDomain, s.remoteHost, s.server.domain, recip.Address.Address,
tstamp)
// Deliver message.
_, err := ss.server.manager.Deliver(
recip, ss.from, ss.recipients, prefix, msgBuf.Bytes())
_, err := s.server.manager.Deliver(
recip, s.from, s.recipients, prefix, msgBuf.Bytes())
if err != nil {
ss.logError("delivery for %v: %v", recip.LocalPart, err)
ss.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart))
ss.reset()
s.logger.Error().Msgf("delivery for %v: %v", recip.LocalPart, err)
s.send(fmt.Sprintf("451 Failed to store message for %v", recip.LocalPart))
s.reset()
return
}
}
expReceivedTotal.Add(1)
}
ss.send("250 Mail accepted for delivery")
ss.logInfo("Message size %v bytes", msgBuf.Len())
ss.reset()
s.send("250 Mail accepted for delivery")
s.logger.Info().Msgf("Message size %v bytes", msgBuf.Len())
s.reset()
return
}
// RFC: remove leading periods from DATA.
@@ -394,84 +403,92 @@ func (ss *Session) dataHandler() {
lineBuf = lineBuf[1:]
}
msgBuf.Write(lineBuf)
if msgBuf.Len() > ss.server.maxMessageBytes {
ss.send("552 Maximum message size exceeded")
ss.logWarn("Max message size exceeded while in DATA")
ss.reset()
if msgBuf.Len() > s.server.maxMessageBytes {
s.send("552 Maximum message size exceeded")
s.logger.Warn().Msgf("Max message size exceeded while in DATA")
s.reset()
return
}
}
}
func (ss *Session) enterState(state State) {
ss.state = state
ss.logTrace("Entering state %v", state)
func (s *Session) enterState(state State) {
s.state = state
s.logger.Debug().Msgf("Entering state %v", state)
}
func (ss *Session) greet() {
ss.send(fmt.Sprintf("220 %v Inbucket SMTP ready", ss.server.domain))
func (s *Session) greet() {
s.send(fmt.Sprintf("220 %v Inbucket SMTP ready", s.server.domain))
}
// Calculate the next read or write deadline based on maxIdle
func (ss *Session) nextDeadline() time.Time {
return time.Now().Add(ss.server.timeout)
func (s *Session) nextDeadline() time.Time {
return time.Now().Add(s.server.timeout)
}
// Send requested message, store errors in Session.sendError
func (ss *Session) send(msg string) {
if err := ss.conn.SetWriteDeadline(ss.nextDeadline()); err != nil {
ss.sendError = err
func (s *Session) send(msg string) {
if err := s.conn.SetWriteDeadline(s.nextDeadline()); err != nil {
s.sendError = err
return
}
if _, err := fmt.Fprint(ss.conn, msg+"\r\n"); err != nil {
ss.sendError = err
ss.logWarn("Failed to send: %q", msg)
if _, err := fmt.Fprint(s.conn, msg+"\r\n"); err != nil {
s.sendError = err
s.logger.Warn().Msgf("Failed to send: %q", msg)
return
}
ss.logTrace(">> %v >>", msg)
if s.debug {
fmt.Printf("%04d > %v\n", s.id, msg)
}
}
// readByteLine reads a line of input, returns byte slice.
func (ss *Session) readByteLine() ([]byte, error) {
if err := ss.conn.SetReadDeadline(ss.nextDeadline()); err != nil {
func (s *Session) readByteLine() ([]byte, error) {
if err := s.conn.SetReadDeadline(s.nextDeadline()); err != nil {
return nil, err
}
return ss.reader.ReadBytes('\n')
b, err := s.reader.ReadBytes('\n')
if err == nil && s.debug {
fmt.Printf("%04d %s\n", s.id, bytes.TrimRight(b, "\r\n"))
}
return b, err
}
// Reads a line of input
func (ss *Session) readLine() (line string, err error) {
if err = ss.conn.SetReadDeadline(ss.nextDeadline()); err != nil {
func (s *Session) readLine() (line string, err error) {
if err = s.conn.SetReadDeadline(s.nextDeadline()); err != nil {
return "", err
}
line, err = ss.reader.ReadString('\n')
line, err = s.reader.ReadString('\n')
if err != nil {
return "", err
}
ss.logTrace("<< %v <<", strings.TrimRight(line, "\r\n"))
if s.debug {
fmt.Printf("%04d %v\n", s.id, strings.TrimRight(line, "\r\n"))
}
return line, nil
}
func (ss *Session) parseCmd(line string) (cmd string, arg string, ok bool) {
func (s *Session) parseCmd(line string) (cmd string, arg string, ok bool) {
line = strings.TrimRight(line, "\r\n")
l := len(line)
switch {
case l == 0:
return "", "", true
case l < 4:
ss.logWarn("Command too short: %q", line)
s.logger.Warn().Msgf("Command too short: %q", line)
return "", "", false
case l == 4:
return strings.ToUpper(line), "", true
case l == 5:
// Too long to be only command, too short to have args
ss.logWarn("Mangled command: %q", line)
s.logger.Warn().Msgf("Mangled command: %q", line)
return "", "", false
}
// If we made it here, command is long enough to have args
if line[4] != ' ' {
// There wasn't a space after the command?
ss.logWarn("Mangled command: %q", line)
s.logger.Warn().Msgf("Mangled command: %q", line)
return "", "", false
}
// I'm not sure if we should trim the args or not, but we will for now
@@ -483,49 +500,28 @@ func (ss *Session) parseCmd(line string) (cmd string, arg string, ok bool) {
// string:
// " BODY=8BITMIME SIZE=1024"
// The leading space is mandatory.
func (ss *Session) parseArgs(arg string) (args map[string]string, ok bool) {
func (s *Session) parseArgs(arg string) (args map[string]string, ok bool) {
args = make(map[string]string)
re := regexp.MustCompile(` (\w+)=(\w+)`)
pm := re.FindAllStringSubmatch(arg, -1)
if pm == nil {
ss.logWarn("Failed to parse arg string: %q")
s.logger.Warn().Msgf("Failed to parse arg string: %q")
return nil, false
}
for _, m := range pm {
args[strings.ToUpper(m[1])] = m[2]
}
ss.logTrace("ESMTP params: %v", args)
s.logger.Debug().Msgf("ESMTP params: %v", args)
return args, true
}
func (ss *Session) reset() {
ss.enterState(READY)
ss.from = ""
ss.recipients = nil
func (s *Session) reset() {
s.enterState(READY)
s.from = ""
s.recipients = nil
}
func (ss *Session) ooSeq(cmd string) {
ss.send(fmt.Sprintf("503 Command %v is out of sequence", cmd))
ss.logWarn("Wasn't expecting %v here", cmd)
}
// Session specific logging methods
func (ss *Session) logTrace(msg string, args ...interface{}) {
log.Tracef("SMTP[%v]<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...))
}
func (ss *Session) logInfo(msg string, args ...interface{}) {
log.Infof("SMTP[%v]<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...))
}
func (ss *Session) logWarn(msg string, args ...interface{}) {
// Update metrics
expWarnsTotal.Add(1)
log.Warnf("SMTP[%v]<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...))
}
func (ss *Session) logError(msg string, args ...interface{}) {
// Update metrics
expErrorsTotal.Add(1)
log.Errorf("SMTP[%v]<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...))
func (s *Session) ooSeq(cmd string) {
s.send(fmt.Sprintf("503 Command %v is out of sequence", cmd))
s.logger.Warn().Msgf("Wasn't expecting %v here", cmd)
}

View File

@@ -10,9 +10,10 @@ import (
"time"
"github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/message"
"github.com/jhillyerd/inbucket/pkg/metric"
"github.com/jhillyerd/inbucket/pkg/policy"
"github.com/rs/zerolog/log"
)
func init() {
@@ -27,35 +28,14 @@ func init() {
m.Set("WarnsTotal", expWarnsTotal)
m.Set("WarnsHist", expWarnsHist)
log.AddTickerFunc(func() {
expReceivedHist.Set(log.PushMetric(deliveredHist, expReceivedTotal))
expConnectsHist.Set(log.PushMetric(connectsHist, expConnectsTotal))
expErrorsHist.Set(log.PushMetric(errorsHist, expErrorsTotal))
expWarnsHist.Set(log.PushMetric(warnsHist, expWarnsTotal))
metric.AddTickerFunc(func() {
expReceivedHist.Set(metric.Push(deliveredHist, expReceivedTotal))
expConnectsHist.Set(metric.Push(connectsHist, expConnectsTotal))
expErrorsHist.Set(metric.Push(errorsHist, expErrorsTotal))
expWarnsHist.Set(metric.Push(warnsHist, expWarnsTotal))
})
}
// Server holds the configuration and state of our SMTP server
type Server struct {
// Configuration
host string
domain string
domainNoStore string
maxRecips int
maxMessageBytes int
storeMessages bool
timeout time.Duration
// Dependencies
apolicy *policy.Addressing // Address policy.
globalShutdown chan bool // Shuts down Inbucket.
manager message.Manager // Used to deliver messages.
// State
listener net.Listener // Incoming network connections
waitgroup *sync.WaitGroup // Waitgroup tracks individual sessions
}
var (
// Raw stat collectors
expConnectsTotal = new(expvar.Int)
@@ -77,6 +57,29 @@ var (
expWarnsHist = new(expvar.String)
)
// Server holds the configuration and state of our SMTP server
type Server struct {
// TODO(#91) Refactor config items out of this struct
config config.SMTP
// Configuration
host string
domain string
domainNoStore string
maxRecips int
maxMessageBytes int
storeMessages bool
timeout time.Duration
// Dependencies
apolicy *policy.Addressing // Address policy.
globalShutdown chan bool // Shuts down Inbucket.
manager message.Manager // Used to deliver messages.
// State
listener net.Listener // Incoming network connections
waitgroup *sync.WaitGroup // Waitgroup tracks individual sessions
}
// NewServer creates a new Server instance with the specificed config
func NewServer(
cfg config.SMTP,
@@ -85,6 +88,7 @@ func NewServer(
apolicy *policy.Addressing,
) *Server {
return &Server{
config: cfg,
host: cfg.Addr,
domain: cfg.Domain,
domainNoStore: strings.ToLower(cfg.DomainNoStore),
@@ -99,51 +103,48 @@ func NewServer(
}
}
// Start the listener and handle incoming connections
// Start the listener and handle incoming connections.
func (s *Server) Start(ctx context.Context) {
slog := log.With().Str("module", "smtp").Str("phase", "startup").Logger()
addr, err := net.ResolveTCPAddr("tcp4", s.host)
if err != nil {
log.Errorf("Failed to build tcp4 address: %v", err)
slog.Error().Err(err).Msg("Failed to build tcp4 address")
s.emergencyShutdown()
return
}
log.Infof("SMTP listening on TCP4 %v", addr)
slog.Info().Str("addr", addr.String()).Msg("SMTP listening on tcp4")
s.listener, err = net.ListenTCP("tcp4", addr)
if err != nil {
log.Errorf("SMTP failed to start tcp4 listener: %v", err)
slog.Error().Err(err).Msg("Failed to start tcp4 listener")
s.emergencyShutdown()
return
}
if !s.storeMessages {
log.Infof("Load test mode active, messages will not be stored")
slog.Info().Msg("Load test mode active, messages will not be stored")
} else if s.domainNoStore != "" {
log.Infof("Messages sent to domain '%v' will be discarded", s.domainNoStore)
slog.Info().Msgf("Messages sent to domain '%v' will be discarded", s.domainNoStore)
}
// Listener go routine
// Listener go routine.
go s.serve(ctx)
// Wait for shutdown
// Wait for shutdown.
<-ctx.Done()
log.Tracef("SMTP shutdown requested, connections will be drained")
// Closing the listener will cause the serve() go routine to exit
slog = log.With().Str("module", "smtp").Str("phase", "shutdown").Logger()
slog.Debug().Msg("SMTP shutdown requested, connections will be drained")
// Closing the listener will cause the serve() go routine to exit.
if err := s.listener.Close(); err != nil {
log.Errorf("Failed to close SMTP listener: %v", err)
slog.Error().Err(err).Msg("Failed to close SMTP listener")
}
}
// serve is the listen/accept loop
// serve is the listen/accept loop.
func (s *Server) serve(ctx context.Context) {
// Handle incoming connections
// Handle incoming connections.
var tempDelay time.Duration
for sessionID := 1; ; sessionID++ {
if conn, err := s.listener.Accept(); err != nil {
// There was an error accepting the connection
// There was an error accepting the connection.
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
// Temporary error, sleep for a bit and try again
// Temporary error, sleep for a bit and try again.
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
@@ -152,17 +153,18 @@ func (s *Server) serve(ctx context.Context) {
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
log.Errorf("SMTP accept error: %v; retrying in %v", err, tempDelay)
log.Error().Str("module", "smtp").Err(err).
Msgf("SMTP accept error; retrying in %v", tempDelay)
time.Sleep(tempDelay)
continue
} else {
// Permanent error
// Permanent error.
select {
case <-ctx.Done():
// SMTP is shutting down
// SMTP is shutting down.
return
default:
// Something went wrong
// Something went wrong.
s.emergencyShutdown()
return
}
@@ -177,7 +179,7 @@ func (s *Server) serve(ctx context.Context) {
}
func (s *Server) emergencyShutdown() {
// Shutdown Inbucket
// Shutdown Inbucket.
select {
case <-s.globalShutdown:
default:
@@ -187,7 +189,7 @@ func (s *Server) emergencyShutdown() {
// Drain causes the caller to block until all active SMTP sessions have finished
func (s *Server) Drain() {
// Wait for sessions to close
// Wait for sessions to close.
s.waitgroup.Wait()
log.Tracef("SMTP connections have drained")
log.Debug().Str("module", "smtp").Str("phase", "shutdown").Msg("SMTP connections have drained")
}

View File

@@ -0,0 +1,15 @@
package smtp
import "github.com/rs/zerolog"
type logHook struct{}
// Run implements a zerolog hook that updates the SMTP warning/error expvars.
func (h logHook) Run(e *zerolog.Event, level zerolog.Level, msg string) {
switch level {
case zerolog.WarnLevel:
expWarnsTotal.Add(1)
case zerolog.ErrorLevel:
expErrorsTotal.Add(1)
}
}

View File

@@ -8,7 +8,7 @@ import (
"strings"
"time"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/rs/zerolog/log"
)
// TemplateFuncs declares functions made available to all templates (including partials)
@@ -42,7 +42,8 @@ func Reverse(name string, things ...interface{}) string {
// Grab the route
u, err := Router.Get(name).URL(strs...)
if err != nil {
log.Errorf("Failed to reverse route: %v", err)
log.Error().Str("module", "web").Str("name", name).Err(err).
Msg("Failed to reverse route")
return "/ROUTE-ERROR"
}
return u.Path

View File

@@ -13,9 +13,9 @@ import (
"github.com/gorilla/securecookie"
"github.com/gorilla/sessions"
"github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/message"
"github.com/jhillyerd/inbucket/pkg/msghub"
"github.com/rs/zerolog/log"
)
// Handler is a function type that handles an HTTP request in Inbucket
@@ -66,17 +66,20 @@ func Initialize(
// Content Paths
staticPath := filepath.Join(conf.Web.UIDir, staticDir)
log.Infof("Web UI content mapped to path: %s", conf.Web.UIDir)
log.Info().Str("module", "web").Str("phase", "startup").Str("path", conf.Web.UIDir).
Msg("Web UI content mapped")
Router.PathPrefix("/public/").Handler(http.StripPrefix("/public/",
http.FileServer(http.Dir(staticPath))))
http.Handle("/", Router)
// Session cookie setup
if conf.Web.CookieAuthKey == "" {
log.Infof("HTTP generating random cookie.auth.key")
log.Info().Str("module", "web").Str("phase", "startup").
Msg("Generating random cookie.auth.key")
sessionStore = sessions.NewCookieStore(securecookie.GenerateRandomKey(64))
} else {
log.Tracef("HTTP using configured cookie.auth.key")
log.Info().Str("module", "web").Str("phase", "startup").
Msg("Using configured cookie.auth.key")
sessionStore = sessions.NewCookieStore([]byte(conf.Web.CookieAuthKey))
}
}
@@ -91,11 +94,13 @@ func Start(ctx context.Context) {
}
// We don't use ListenAndServe because it lacks a way to close the listener
log.Infof("HTTP listening on TCP4 %v", server.Addr)
log.Info().Str("module", "web").Str("phase", "startup").Str("addr", server.Addr).
Msg("HTTP listening on tcp4")
var err error
listener, err = net.Listen("tcp", server.Addr)
if err != nil {
log.Errorf("HTTP failed to start TCP4 listener: %v", err)
log.Error().Str("module", "web").Str("phase", "startup").Err(err).
Msg("HTTP failed to start TCP4 listener")
emergencyShutdown()
return
}
@@ -106,12 +111,14 @@ func Start(ctx context.Context) {
// Wait for shutdown
select {
case _ = <-ctx.Done():
log.Tracef("HTTP server shutting down on request")
log.Debug().Str("module", "web").Str("phase", "shutdown").
Msg("HTTP server shutting down on request")
}
// Closing the listener will cause the serve() go routine to exit
if err := listener.Close(); err != nil {
log.Errorf("Failed to close HTTP listener: %v", err)
log.Debug().Str("module", "web").Str("phase", "shutdown").Err(err).
Msg("Failed to close HTTP listener")
}
}
@@ -124,7 +131,8 @@ func serve(ctx context.Context) {
case _ = <-ctx.Done():
// Nop
default:
log.Errorf("HTTP server failed: %v", err)
log.Error().Str("module", "web").Str("phase", "startup").Err(err).
Msg("HTTP server failed")
emergencyShutdown()
return
}
@@ -135,17 +143,19 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Create the context
ctx, err := NewContext(req)
if err != nil {
log.Errorf("HTTP failed to create context: %v", err)
log.Error().Str("module", "web").Err(err).Msg("HTTP failed to create context")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer ctx.Close()
// Run the handler, grab the error, and report it
log.Tracef("HTTP[%v] %v %v %q", req.RemoteAddr, req.Proto, req.Method, req.RequestURI)
log.Debug().Str("module", "web").Str("remote", req.RemoteAddr).Str("proto", req.Proto).
Str("method", req.Method).Str("path", req.RequestURI).Msg("Request")
err = h(w, req, ctx)
if err != nil {
log.Errorf("HTTP error handling %q: %v", req.RequestURI, err)
log.Error().Str("module", "web").Str("path", req.RequestURI).Err(err).
Msg("Error handling request")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

View File

@@ -7,7 +7,7 @@ import (
"path/filepath"
"sync"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/rs/zerolog/log"
)
var cachedMutex sync.Mutex
@@ -19,7 +19,8 @@ var cachedPartials = map[string]*template.Template{}
func RenderTemplate(name string, w http.ResponseWriter, data interface{}) error {
t, err := ParseTemplate(name, false)
if err != nil {
log.Errorf("Error in template '%v': %v", name, err)
log.Error().Str("module", "web").Str("path", name).Err(err).
Msg("Error in template")
return err
}
w.Header().Set("Expires", "-1")
@@ -31,7 +32,8 @@ func RenderTemplate(name string, w http.ResponseWriter, data interface{}) error
func RenderPartial(name string, w http.ResponseWriter, data interface{}) error {
t, err := ParseTemplate(name, true)
if err != nil {
log.Errorf("Error in template '%v': %v", name, err)
log.Error().Str("module", "web").Str("path", name).Err(err).
Msg("Error in template")
return err
}
w.Header().Set("Expires", "-1")
@@ -49,7 +51,7 @@ func ParseTemplate(name string, partial bool) (*template.Template, error) {
}
tempFile := filepath.Join(rootConfig.Web.UIDir, templateDir, filepath.FromSlash(name))
log.Tracef("Parsing template %v", tempFile)
log.Debug().Str("module", "web").Str("path", name).Msg("Parsing template")
var err error
var t *template.Template
@@ -70,10 +72,10 @@ func ParseTemplate(name string, partial bool) (*template.Template, error) {
// Allows us to disable caching for theme development
if rootConfig.Web.TemplateCache {
if partial {
log.Tracef("Caching partial %v", name)
log.Debug().Str("module", "web").Str("path", name).Msg("Caching partial")
cachedTemplates[name] = t
} else {
log.Tracef("Caching template %v", name)
log.Debug().Str("module", "web").Str("path", name).Msg("Caching template")
cachedTemplates[name] = t
}
}

View File

@@ -7,7 +7,7 @@ import (
"path/filepath"
"time"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/rs/zerolog/log"
)
// Message implements Message and contains a little bit of data about a
@@ -35,9 +35,12 @@ func (mb *mbox) newMessage() (*Message, error) {
// Delete old messages over messageCap
if mb.store.messageCap > 0 {
for len(mb.messages) >= mb.store.messageCap {
log.Infof("Mailbox %q over configured message cap", mb.name)
if err := mb.removeMessage(mb.messages[0].ID()); err != nil {
log.Errorf("Error deleting message: %s", err)
log.Info().Str("module", "storage").Str("mailbox", mb.name).
Msg("Mailbox over message cap")
id := mb.messages[0].ID()
if err := mb.removeMessage(id); err != nil {
log.Error().Str("module", "storage").Str("mailbox", mb.name).Str("id", id).
Err(err).Msg("Unable to delete message")
}
}
}

View File

@@ -10,10 +10,10 @@ import (
"time"
"github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/policy"
"github.com/jhillyerd/inbucket/pkg/storage"
"github.com/jhillyerd/inbucket/pkg/stringutil"
"github.com/rs/zerolog/log"
)
// Name of index file in each mailbox
@@ -57,7 +57,8 @@ func New(cfg config.Storage) (storage.Store, error) {
if _, err := os.Stat(mailPath); err != nil {
// Mail datastore does not yet exist
if err = os.MkdirAll(mailPath, 0770); err != nil {
log.Errorf("Error creating dir %q: %v", mailPath, err)
log.Error().Str("module", "storage").Str("path", mailPath).Err(err).
Msg("Error creating dir")
}
}
return &Store{path: path, mailPath: mailPath, messageCap: cfg.MailboxMsgCap}, nil

View File

@@ -9,8 +9,8 @@ import (
"path/filepath"
"sync"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/storage"
"github.com/rs/zerolog/log"
)
// mbox manages the mail for a specific user and correlates to a particular directory on disk.
@@ -87,7 +87,7 @@ func (mb *mbox) removeMessage(id string) error {
return nil
}
// There are still messages in the index
log.Tracef("Deleting %v", msg.rawPath())
log.Debug().Str("module", "storage").Str("path", msg.rawPath()).Msg("Deleting file")
return os.Remove(msg.rawPath())
}
@@ -104,7 +104,8 @@ func (mb *mbox) readIndex() error {
// Check if index exists
if _, err := os.Stat(mb.indexPath); err != nil {
// Does not exist, but that's not an error in our world
log.Tracef("Index %v does not exist (yet)", mb.indexPath)
log.Debug().Str("module", "storage").Str("path", mb.indexPath).
Msg("Index does not yet exist")
mb.indexLoaded = true
return nil
}
@@ -114,7 +115,8 @@ func (mb *mbox) readIndex() error {
}
defer func() {
if err := file.Close(); err != nil {
log.Errorf("Failed to close %q: %v", mb.indexPath, err)
log.Error().Str("module", "storage").Str("path", mb.indexPath).Err(err).
Msg("Failed to close")
}
}()
// Decode gob data
@@ -171,12 +173,13 @@ func (mb *mbox) writeIndex() error {
return err
}
if err := file.Close(); err != nil {
log.Errorf("Failed to close %q: %v", mb.indexPath, err)
log.Error().Str("module", "storage").Str("path", mb.indexPath).Err(err).
Msg("Failed to close")
return err
}
} else {
// No messages, delete index+maildir
log.Tracef("Removing mailbox %v", mb.path)
log.Debug().Str("module", "storage").Str("path", mb.path).Msg("Removing mailbox")
return mb.removeDir()
}
return nil
@@ -186,7 +189,8 @@ func (mb *mbox) writeIndex() error {
func (mb *mbox) createDir() error {
if _, err := os.Stat(mb.path); err != nil {
if err := os.MkdirAll(mb.path, 0770); err != nil {
log.Errorf("Failed to create directory %v, %v", mb.path, err)
log.Error().Str("module", "storage").Str("path", mb.path).Err(err).
Msg("Failed to create directory")
return err
}
}
@@ -223,10 +227,10 @@ func removeDirIfEmpty(path string) (removed bool) {
// Dir not empty
return false
}
log.Tracef("Removing dir %v", path)
log.Debug().Str("module", "storage").Str("path", path).Msg("Removing dir")
err = os.Remove(path)
if err != nil {
log.Errorf("Failed to remove %q: %v", path, err)
log.Error().Str("module", "storage").Str("path", path).Err(err).Msg("Failed to remove")
return false
}
return true

View File

@@ -7,7 +7,8 @@ import (
"time"
"github.com/jhillyerd/inbucket/pkg/config"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/metric"
"github.com/rs/zerolog/log"
)
var (
@@ -42,10 +43,10 @@ func init() {
rm.Set("RetainedSize", expRetainedSize)
rm.Set("SizeHist", expSizeHist)
log.AddTickerFunc(func() {
expRetentionDeletesHist.Set(log.PushMetric(retentionDeletesHist, expRetentionDeletesTotal))
expRetainedHist.Set(log.PushMetric(retainedHist, expRetainedCurrent))
expSizeHist.Set(log.PushMetric(sizeHist, expRetainedSize))
metric.AddTickerFunc(func() {
expRetentionDeletesHist.Set(metric.Push(retentionDeletesHist, expRetentionDeletesTotal))
expRetainedHist.Set(metric.Push(retainedHist, expRetainedCurrent))
expSizeHist.Set(metric.Push(sizeHist, expRetainedSize))
})
}
@@ -79,16 +80,18 @@ func NewRetentionScanner(
// Start up the retention scanner if retention period > 0
func (rs *RetentionScanner) Start() {
if rs.retentionPeriod <= 0 {
log.Infof("Retention scanner disabled")
log.Info().Str("phase", "startup").Str("module", "storage").Msg("Retention scanner disabled")
close(rs.retentionShutdown)
return
}
log.Infof("Retention configured for %v", rs.retentionPeriod)
log.Info().Str("phase", "startup").Str("module", "storage").
Msgf("Retention configured for %v", rs.retentionPeriod)
go rs.run()
}
// run loops to kick off the scanner on the correct schedule
func (rs *RetentionScanner) run() {
slog := log.With().Str("module", "storage").Logger()
start := time.Now()
retentionLoop:
for {
@@ -96,7 +99,7 @@ retentionLoop:
since := time.Since(start)
if since < time.Minute {
dur := time.Minute - since
log.Tracef("Retention scanner sleeping for %v", dur)
slog.Debug().Msgf("Retention scanner sleeping for %v", dur)
select {
case <-rs.globalShutdown:
break retentionLoop
@@ -106,7 +109,7 @@ retentionLoop:
// Kickoff scan
start = time.Now()
if err := rs.DoScan(); err != nil {
log.Errorf("Error during retention scan: %v", err)
slog.Error().Err(err).Msg("Error during retention scan")
}
// Check for global shutdown
select {
@@ -115,13 +118,14 @@ retentionLoop:
default:
}
}
log.Tracef("Retention scanner shut down")
slog.Debug().Str("phase", "shutdown").Msg("Retention scanner shut down")
close(rs.retentionShutdown)
}
// DoScan does a single pass of all mailboxes looking for messages that can be purged.
func (rs *RetentionScanner) DoScan() error {
log.Tracef("Starting retention scan")
slog := log.With().Str("module", "storage").Logger()
slog.Debug().Msg("Starting retention scan")
cutoff := time.Now().Add(-1 * rs.retentionPeriod)
retained := 0
storeSize := int64(0)
@@ -129,9 +133,11 @@ func (rs *RetentionScanner) DoScan() error {
err := rs.ds.VisitMailboxes(func(messages []Message) bool {
for _, msg := range messages {
if msg.Date().Before(cutoff) {
log.Tracef("Purging expired message %v/%v", msg.Mailbox(), msg.ID())
slog.Debug().Str("mailbox", msg.Mailbox()).
Msgf("Purging expired message %v", msg.ID())
if err := rs.ds.RemoveMessage(msg.Mailbox(), msg.ID()); err != nil {
log.Errorf("Failed to purge message %v: %v", msg.ID(), err)
slog.Error().Str("mailbox", msg.Mailbox()).Err(err).
Msgf("Failed to purge message %v", msg.ID())
} else {
expRetentionDeletesTotal.Add(1)
}
@@ -142,7 +148,7 @@ func (rs *RetentionScanner) DoScan() error {
}
select {
case <-rs.globalShutdown:
log.Tracef("Retention scan aborted due to shutdown")
slog.Debug().Str("phase", "shutdown").Msg("Retention scan aborted due to shutdown")
return false
case <-time.After(rs.retentionSleep):
// Reduce disk thrashing

View File

@@ -7,10 +7,10 @@ import (
"net/http"
"strconv"
"github.com/jhillyerd/inbucket/pkg/log"
"github.com/jhillyerd/inbucket/pkg/server/web"
"github.com/jhillyerd/inbucket/pkg/storage"
"github.com/jhillyerd/inbucket/pkg/webui/sanitize"
"github.com/rs/zerolog/log"
)
// MailboxIndex renders the index page for a particular mailbox
@@ -76,7 +76,6 @@ func MailboxList(w http.ResponseWriter, req *http.Request, ctx *web.Context) (er
// This doesn't indicate empty, likely an IO error
return fmt.Errorf("Failed to get messages for %v: %v", name, err)
}
log.Tracef("Got %v messsages", len(messages))
// Render partial template
return web.RenderPartial("mailbox/_list.html", w, map[string]interface{}{
"ctx": ctx,
@@ -109,7 +108,9 @@ func MailboxShow(w http.ResponseWriter, req *http.Request, ctx *web.Context) (er
if str, err := sanitize.HTML(msg.HTML()); err == nil {
htmlBody = template.HTML(str)
} else {
log.Warnf("HTML sanitizer failed: %s", err)
// Soft failure, render empty tab.
log.Warn().Str("module", "webui").Str("mailbox", name).Str("id", id).Err(err).
Msg("HTML sanitizer failed")
}
}
// Render partial template