diff --git a/config/config.go b/config/config.go index 2cdfd9b..62c7f69 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,12 @@ type SmtpConfig struct { StoreMessages bool } +type Pop3Config struct { + Ip4address net.IP + Ip4port int + MaxIdleSeconds int +} + type WebConfig struct { Ip4address net.IP Ip4port int @@ -42,6 +48,7 @@ var ( // Parsed specific configs smtpConfig *SmtpConfig + pop3Config *Pop3Config webConfig *WebConfig dataStoreConfig *DataStoreConfig ) @@ -51,6 +58,11 @@ func GetSmtpConfig() SmtpConfig { return *smtpConfig } +// GetPop3Config returns a copy of the Pop3Config object +func GetPop3Config() Pop3Config { + return *pop3Config +} + // GetWebConfig returns a copy of the WebConfig object func GetWebConfig() WebConfig { return *webConfig @@ -75,6 +87,7 @@ func LoadConfig(filename string) error { // Validate sections requireSection(messages, "logging") requireSection(messages, "smtp") + requireSection(messages, "pop3") requireSection(messages, "web") requireSection(messages, "datastore") if messages.Len() > 0 { @@ -94,6 +107,9 @@ func LoadConfig(filename string) error { requireOption(messages, "smtp", "max.idle.seconds") requireOption(messages, "smtp", "max.message.bytes") requireOption(messages, "smtp", "store.messages") + requireOption(messages, "pop3", "ip4.address") + requireOption(messages, "pop3", "ip4.port") + requireOption(messages, "pop3", "max.idle.seconds") requireOption(messages, "web", "ip4.address") requireOption(messages, "web", "ip4.port") requireOption(messages, "web", "template.dir") @@ -116,6 +132,10 @@ func LoadConfig(filename string) error { return err } + if err = parsePop3Config(); err != nil { + return err + } + if err = parseWebConfig(); err != nil { return err } @@ -215,6 +235,42 @@ func parseSmtpConfig() error { return nil } +// parsePop3Config trying to catch config errors early +func parsePop3Config() error { + pop3Config = new(Pop3Config) + section := "pop3" + + // Parse IP4 address only, error on IP6. + option := "ip4.address" + str, err := Config.String(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + addr := net.ParseIP(str) + if addr == nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + addr = addr.To4() + if addr == nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v' not IPv4!", section, option, err) + } + pop3Config.Ip4address = addr + + option = "ip4.port" + pop3Config.Ip4port, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + option = "max.idle.seconds" + pop3Config.MaxIdleSeconds, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + return nil +} + // parseWebConfig trying to catch config errors early func parseWebConfig() error { webConfig = new(WebConfig) diff --git a/etc/devel.conf b/etc/devel.conf index 5d888b6..bcca491 100644 --- a/etc/devel.conf +++ b/etc/devel.conf @@ -37,13 +37,26 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=30 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=20480000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=1100 + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/inbucket.go b/inbucket.go index fb5f262..674b265 100644 --- a/inbucket.go +++ b/inbucket.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/pop3d" "github.com/jhillyerd/inbucket/smtpd" "github.com/jhillyerd/inbucket/web" golog "log" @@ -30,6 +31,7 @@ var startTime = time.Now() var logf *os.File var smtpServer *smtpd.Server +var pop3Server *pop3d.Server func main() { flag.Parse() @@ -95,11 +97,17 @@ func main() { // Start HTTP server go web.Start() + // Start POP3 server + pop3Server = pop3d.New() + go pop3Server.Start() + // Startup SMTP server, block until it exits smtpServer = smtpd.New() smtpServer.Start() + // Wait for active connections to finish smtpServer.Drain() + pop3Server.Drain() } // openLogFile creates or appends to the logfile passed on commandline diff --git a/pop3d/handler.go b/pop3d/handler.go new file mode 100644 index 0000000..66166a7 --- /dev/null +++ b/pop3d/handler.go @@ -0,0 +1,305 @@ +package pop3d + +import ( + "bufio" + "bytes" + //"container/list" + "fmt" + "github.com/jhillyerd/inbucket/log" + "io" + "net" + //"strconv" + "strings" + "time" +) + +type State int + +const ( + AUTHORIZATION State = iota // The client must now identify and authenticate + TRANSACTION // Mailbox open, client may now issue commands + UPDATE // Purge deleted messages, cleanup + QUIT +) + +func (s State) String() string { + switch s { + case AUTHORIZATION: + return "AUTHORIZATION" + case TRANSACTION: + return "TRANSACTION" + case UPDATE: + return "UPDATE" + case QUIT: + return "QUIT" + } + return "Unknown" +} + +var commands = map[string]bool{ + "QUIT": true, + "STAT": true, + "LIST": true, + "RETR": true, + "DELE": true, + "NOOP": true, + "RSET": true, + "TOP": true, + "UIDL": true, + "USER": true, + "PASS": true, + "APOP": true, +} + +type Session struct { + server *Server + id int + conn net.Conn + remoteHost string + sendError error + state State + reader *bufio.Reader + user string +} + +func NewSession(server *Server, id int, conn net.Conn) *Session { + reader := bufio.NewReader(conn) + host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + return &Session{server: server, id: id, conn: conn, state: AUTHORIZATION, + reader: reader, remoteHost: host} +} + +func (ses *Session) String() string { + return fmt.Sprintf("Session{id: %v, state: %v}", ses.id, ses.state) +} + +/* Session flow: + * 1. Send initial greeting + * 2. Receive cmd + * 3. If good cmd, respond, optionally change state + * 4. If bad cmd, respond error + * 5. Goto 2 + */ +func (s *Server) startSession(id int, conn net.Conn) { + log.Info("Connection from %v, starting session <%v>", conn.RemoteAddr(), id) + //expConnectsCurrent.Add(1) + defer func() { + conn.Close() + s.waitgroup.Done() + //expConnectsCurrent.Add(-1) + }() + + ses := NewSession(s, id, conn) + ses.greet() + + // This is our command reading loop + for ses.state != QUIT && ses.sendError == nil { + line, err := ses.readLine() + if err == nil { + if cmd, arg, ok := ses.parseCmd(line); ok { + // Check against valid SMTP commands + if cmd == "" { + ses.send("-ERR Speak up") + continue + } + if !commands[cmd] { + ses.send(fmt.Sprintf("-ERR Syntax error, %v command unrecognized", cmd)) + ses.warn("Unrecognized command: %v", cmd) + continue + } + + // Commands we handle in any state + switch cmd { + case "APOP", "TOP": + // These commands are not implemented in any state + ses.send(fmt.Sprintf("-ERR %v command not implemented", cmd)) + ses.warn("Command %v not implemented by Inbucket", cmd) + continue + case "NOOP": + // TODO move to transaction state + ses.send("+OK I have sucessfully done nothing") + continue + case "RSET": + // TODO move to transaction state + // Reset session + ses.trace("Resetting session state on RSET request") + ses.reset() + ses.send("+OK Session reset") + continue + case "QUIT": + // TODO should be handled differently by transaciton + ses.send("+OK Goodnight and good luck") + ses.enterState(QUIT) + continue + } + + // Send command to handler for current state + switch ses.state { + case AUTHORIZATION: + ses.authorizationHandler(cmd, arg) + continue + case TRANSACTION: + //ses.transactionHandler(cmd, arg) + continue + case UPDATE: + //ses.updateHandler(cmd, arg) + continue + } + ses.error("Session entered unexpected state %v", ses.state) + break + } else { + ses.send("-ERR Syntax error, command garbled") + } + } else { + // readLine() returned an error + if err == io.EOF { + switch ses.state { + case AUTHORIZATION: + // EOF is common here + ses.info("Client closed connection (state %v)", ses.state) + default: + ses.warn("Got EOF while in state %v", ses.state) + } + break + } + // not an EOF + ses.warn("Connection error: %v", err) + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + ses.send("-ERR Idle timeout, bye bye") + break + } + } + ses.send("-ERR Connection error, sorry") + break + } + } + if ses.sendError != nil { + ses.warn("Network send error: %v", ses.sendError) + } + ses.info("Closing connection") +} + +// AUTHORIZATION state +func (ses *Session) authorizationHandler(cmd string, arg []string) { + switch cmd { + case "HELO": + ses.send("250 Great, let's get this show on the road") + //ses.enterState(READY) + case "EHLO": + ses.send("250-Great, let's get this show on the road") + ses.send("250-8BITMIME") + //ses.enterState(READY) + default: + ses.ooSeq(cmd) + } +} + +func (ses *Session) enterState(state State) { + ses.state = state + ses.trace("Entering state %v", state) +} + +func (ses *Session) greet() { + ses.send("+OK Inbucket POP3 server ready") +} + +// Calculate the next read or write deadline based on maxIdleSeconds +func (ses *Session) nextDeadline() time.Time { + return time.Now().Add(time.Duration(ses.server.maxIdleSeconds) * time.Second) +} + +// Send requested message, store errors in Session.sendError +func (ses *Session) send(msg string) { + if err := ses.conn.SetWriteDeadline(ses.nextDeadline()); err != nil { + ses.sendError = err + return + } + if _, err := fmt.Fprint(ses.conn, msg+"\r\n"); err != nil { + ses.sendError = err + ses.warn("Failed to send: '%v'", msg) + return + } + ses.trace(">> %v >>", msg) +} + +// readByteLine reads a line of input into the provided buffer. Does +// not reset the Buffer - please do so prior to calling. +func (ses *Session) readByteLine(buf *bytes.Buffer) error { + if err := ses.conn.SetReadDeadline(ses.nextDeadline()); err != nil { + return err + } + for { + line, err := ses.reader.ReadBytes('\r') + if err != nil { + return err + } + buf.Write(line) + // Read the next byte looking for '\n' + c, err := ses.reader.ReadByte() + if err != nil { + return err + } + buf.WriteByte(c) + if c == '\n' { + // We've reached the end of the line, return + return nil + } + // Else, keep looking + } + // Should be unreachable + return nil +} + +// Reads a line of input +func (ses *Session) readLine() (line string, err error) { + if err = ses.conn.SetReadDeadline(ses.nextDeadline()); err != nil { + return "", err + } + line, err = ses.reader.ReadString('\n') + if err != nil { + return "", err + } + ses.trace("<< %v <<", strings.TrimRight(line, "\r\n")) + return line, nil +} + +func (ses *Session) parseCmd(line string) (cmd string, args []string, ok bool) { + line = strings.TrimRight(line, "\r\n") + if len(line) == 0 { + return "", nil, true + } + + words := strings.Split(line, " ") + return strings.ToUpper(words[0]), words[1:], true +} + +func (ses *Session) reset() { + //ses.enterState(READY) +} + +func (ses *Session) ooSeq(cmd string) { + ses.send(fmt.Sprintf("-ERR Command %v is out of sequence", cmd)) + ses.warn("Wasn't expecting %v here", cmd) +} + +// Session specific logging methods +func (ses *Session) trace(msg string, args ...interface{}) { + log.Trace("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) info(msg string, args ...interface{}) { + log.Info("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) warn(msg string, args ...interface{}) { + // Update metrics + //expWarnsTotal.Add(1) + log.Warn("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) error(msg string, args ...interface{}) { + // Update metrics + //expErrorsTotal.Add(1) + log.Error("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) +} diff --git a/pop3d/listener.go b/pop3d/listener.go new file mode 100644 index 0000000..5a9377d --- /dev/null +++ b/pop3d/listener.go @@ -0,0 +1,95 @@ +package pop3d + +import ( + "fmt" + "github.com/jhillyerd/inbucket/config" + "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/smtpd" + "net" + "sync" + "time" +) + +// Real server code starts here +type Server struct { + maxIdleSeconds int + dataStore smtpd.DataStore + listener net.Listener + shutdown bool + waitgroup *sync.WaitGroup +} + +// Init a new Server object +func New() *Server { + // TODO is two filestores better/worse than sharing w/ smtpd? + ds := smtpd.NewFileDataStore() + cfg := config.GetPop3Config() + return &Server{dataStore: ds, maxIdleSeconds: cfg.MaxIdleSeconds, + waitgroup: new(sync.WaitGroup)} +} + +// Main listener loop +func (s *Server) Start() { + cfg := config.GetPop3Config() + addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%v:%v", + cfg.Ip4address, cfg.Ip4port)) + if err != nil { + log.Error("Failed to build tcp4 address: %v", err) + // TODO More graceful early-shutdown procedure + panic(err) + } + + log.Info("POP3 listening on TCP4 %v", addr) + s.listener, err = net.ListenTCP("tcp4", addr) + if err != nil { + log.Error("POP3 failed to start tcp4 listener: %v", err) + // TODO More graceful early-shutdown procedure + panic(err) + } + + // Handle incoming connections + var tempDelay time.Duration + for sid := 1; ; sid++ { + if conn, err := s.listener.Accept(); err != nil { + if nerr, ok := err.(net.Error); ok && nerr.Temporary() { + // Temporary error, sleep for a bit and try again + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + log.Error("POP3 accept error: %v; retrying in %v", err, tempDelay) + time.Sleep(tempDelay) + continue + } else { + if s.shutdown { + log.Trace("POP3 listener shutting down on request") + return + } + // TODO Implement a max error counter before shutdown? + // or maybe attempt to restart POP3 + panic(err) + } + } else { + tempDelay = 0 + s.waitgroup.Add(1) + go s.startSession(sid, conn) + } + } +} + +// Stop requests the POP3 server closes it's listener +func (s *Server) Stop() { + log.Trace("POP3 shutdown requested, connections will be drained") + s.shutdown = true + s.listener.Close() +} + +// Drain causes the caller to block until all active POP3 sessions have finished +func (s *Server) Drain() { + s.waitgroup.Wait() + log.Trace("POP3 connections drained") +}