From 338698d461a16c94ee743eea93245d25f719aca9 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Mon, 9 Sep 2013 15:51:26 -0700 Subject: [PATCH 1/6] Add empty shell of POP3 server --- config/config.go | 56 +++++++++ etc/devel.conf | 15 ++- inbucket.go | 8 ++ pop3d/handler.go | 305 ++++++++++++++++++++++++++++++++++++++++++++++ pop3d/listener.go | 95 +++++++++++++++ 5 files changed, 478 insertions(+), 1 deletion(-) create mode 100644 pop3d/handler.go create mode 100644 pop3d/listener.go diff --git a/config/config.go b/config/config.go index 2cdfd9b..62c7f69 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,12 @@ type SmtpConfig struct { StoreMessages bool } +type Pop3Config struct { + Ip4address net.IP + Ip4port int + MaxIdleSeconds int +} + type WebConfig struct { Ip4address net.IP Ip4port int @@ -42,6 +48,7 @@ var ( // Parsed specific configs smtpConfig *SmtpConfig + pop3Config *Pop3Config webConfig *WebConfig dataStoreConfig *DataStoreConfig ) @@ -51,6 +58,11 @@ func GetSmtpConfig() SmtpConfig { return *smtpConfig } +// GetPop3Config returns a copy of the Pop3Config object +func GetPop3Config() Pop3Config { + return *pop3Config +} + // GetWebConfig returns a copy of the WebConfig object func GetWebConfig() WebConfig { return *webConfig @@ -75,6 +87,7 @@ func LoadConfig(filename string) error { // Validate sections requireSection(messages, "logging") requireSection(messages, "smtp") + requireSection(messages, "pop3") requireSection(messages, "web") requireSection(messages, "datastore") if messages.Len() > 0 { @@ -94,6 +107,9 @@ func LoadConfig(filename string) error { requireOption(messages, "smtp", "max.idle.seconds") requireOption(messages, "smtp", "max.message.bytes") requireOption(messages, "smtp", "store.messages") + requireOption(messages, "pop3", "ip4.address") + requireOption(messages, "pop3", "ip4.port") + requireOption(messages, "pop3", "max.idle.seconds") requireOption(messages, "web", "ip4.address") requireOption(messages, "web", "ip4.port") requireOption(messages, "web", "template.dir") @@ -116,6 +132,10 @@ func LoadConfig(filename string) error { return err } + if err = parsePop3Config(); err != nil { + return err + } + if err = parseWebConfig(); err != nil { return err } @@ -215,6 +235,42 @@ func parseSmtpConfig() error { return nil } +// parsePop3Config trying to catch config errors early +func parsePop3Config() error { + pop3Config = new(Pop3Config) + section := "pop3" + + // Parse IP4 address only, error on IP6. + option := "ip4.address" + str, err := Config.String(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + addr := net.ParseIP(str) + if addr == nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + addr = addr.To4() + if addr == nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v' not IPv4!", section, option, err) + } + pop3Config.Ip4address = addr + + option = "ip4.port" + pop3Config.Ip4port, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + option = "max.idle.seconds" + pop3Config.MaxIdleSeconds, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + return nil +} + // parseWebConfig trying to catch config errors early func parseWebConfig() error { webConfig = new(WebConfig) diff --git a/etc/devel.conf b/etc/devel.conf index 5d888b6..bcca491 100644 --- a/etc/devel.conf +++ b/etc/devel.conf @@ -37,13 +37,26 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=30 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=20480000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=1100 + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/inbucket.go b/inbucket.go index fb5f262..674b265 100644 --- a/inbucket.go +++ b/inbucket.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/pop3d" "github.com/jhillyerd/inbucket/smtpd" "github.com/jhillyerd/inbucket/web" golog "log" @@ -30,6 +31,7 @@ var startTime = time.Now() var logf *os.File var smtpServer *smtpd.Server +var pop3Server *pop3d.Server func main() { flag.Parse() @@ -95,11 +97,17 @@ func main() { // Start HTTP server go web.Start() + // Start POP3 server + pop3Server = pop3d.New() + go pop3Server.Start() + // Startup SMTP server, block until it exits smtpServer = smtpd.New() smtpServer.Start() + // Wait for active connections to finish smtpServer.Drain() + pop3Server.Drain() } // openLogFile creates or appends to the logfile passed on commandline diff --git a/pop3d/handler.go b/pop3d/handler.go new file mode 100644 index 0000000..66166a7 --- /dev/null +++ b/pop3d/handler.go @@ -0,0 +1,305 @@ +package pop3d + +import ( + "bufio" + "bytes" + //"container/list" + "fmt" + "github.com/jhillyerd/inbucket/log" + "io" + "net" + //"strconv" + "strings" + "time" +) + +type State int + +const ( + AUTHORIZATION State = iota // The client must now identify and authenticate + TRANSACTION // Mailbox open, client may now issue commands + UPDATE // Purge deleted messages, cleanup + QUIT +) + +func (s State) String() string { + switch s { + case AUTHORIZATION: + return "AUTHORIZATION" + case TRANSACTION: + return "TRANSACTION" + case UPDATE: + return "UPDATE" + case QUIT: + return "QUIT" + } + return "Unknown" +} + +var commands = map[string]bool{ + "QUIT": true, + "STAT": true, + "LIST": true, + "RETR": true, + "DELE": true, + "NOOP": true, + "RSET": true, + "TOP": true, + "UIDL": true, + "USER": true, + "PASS": true, + "APOP": true, +} + +type Session struct { + server *Server + id int + conn net.Conn + remoteHost string + sendError error + state State + reader *bufio.Reader + user string +} + +func NewSession(server *Server, id int, conn net.Conn) *Session { + reader := bufio.NewReader(conn) + host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + return &Session{server: server, id: id, conn: conn, state: AUTHORIZATION, + reader: reader, remoteHost: host} +} + +func (ses *Session) String() string { + return fmt.Sprintf("Session{id: %v, state: %v}", ses.id, ses.state) +} + +/* Session flow: + * 1. Send initial greeting + * 2. Receive cmd + * 3. If good cmd, respond, optionally change state + * 4. If bad cmd, respond error + * 5. Goto 2 + */ +func (s *Server) startSession(id int, conn net.Conn) { + log.Info("Connection from %v, starting session <%v>", conn.RemoteAddr(), id) + //expConnectsCurrent.Add(1) + defer func() { + conn.Close() + s.waitgroup.Done() + //expConnectsCurrent.Add(-1) + }() + + ses := NewSession(s, id, conn) + ses.greet() + + // This is our command reading loop + for ses.state != QUIT && ses.sendError == nil { + line, err := ses.readLine() + if err == nil { + if cmd, arg, ok := ses.parseCmd(line); ok { + // Check against valid SMTP commands + if cmd == "" { + ses.send("-ERR Speak up") + continue + } + if !commands[cmd] { + ses.send(fmt.Sprintf("-ERR Syntax error, %v command unrecognized", cmd)) + ses.warn("Unrecognized command: %v", cmd) + continue + } + + // Commands we handle in any state + switch cmd { + case "APOP", "TOP": + // These commands are not implemented in any state + ses.send(fmt.Sprintf("-ERR %v command not implemented", cmd)) + ses.warn("Command %v not implemented by Inbucket", cmd) + continue + case "NOOP": + // TODO move to transaction state + ses.send("+OK I have sucessfully done nothing") + continue + case "RSET": + // TODO move to transaction state + // Reset session + ses.trace("Resetting session state on RSET request") + ses.reset() + ses.send("+OK Session reset") + continue + case "QUIT": + // TODO should be handled differently by transaciton + ses.send("+OK Goodnight and good luck") + ses.enterState(QUIT) + continue + } + + // Send command to handler for current state + switch ses.state { + case AUTHORIZATION: + ses.authorizationHandler(cmd, arg) + continue + case TRANSACTION: + //ses.transactionHandler(cmd, arg) + continue + case UPDATE: + //ses.updateHandler(cmd, arg) + continue + } + ses.error("Session entered unexpected state %v", ses.state) + break + } else { + ses.send("-ERR Syntax error, command garbled") + } + } else { + // readLine() returned an error + if err == io.EOF { + switch ses.state { + case AUTHORIZATION: + // EOF is common here + ses.info("Client closed connection (state %v)", ses.state) + default: + ses.warn("Got EOF while in state %v", ses.state) + } + break + } + // not an EOF + ses.warn("Connection error: %v", err) + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + ses.send("-ERR Idle timeout, bye bye") + break + } + } + ses.send("-ERR Connection error, sorry") + break + } + } + if ses.sendError != nil { + ses.warn("Network send error: %v", ses.sendError) + } + ses.info("Closing connection") +} + +// AUTHORIZATION state +func (ses *Session) authorizationHandler(cmd string, arg []string) { + switch cmd { + case "HELO": + ses.send("250 Great, let's get this show on the road") + //ses.enterState(READY) + case "EHLO": + ses.send("250-Great, let's get this show on the road") + ses.send("250-8BITMIME") + //ses.enterState(READY) + default: + ses.ooSeq(cmd) + } +} + +func (ses *Session) enterState(state State) { + ses.state = state + ses.trace("Entering state %v", state) +} + +func (ses *Session) greet() { + ses.send("+OK Inbucket POP3 server ready") +} + +// Calculate the next read or write deadline based on maxIdleSeconds +func (ses *Session) nextDeadline() time.Time { + return time.Now().Add(time.Duration(ses.server.maxIdleSeconds) * time.Second) +} + +// Send requested message, store errors in Session.sendError +func (ses *Session) send(msg string) { + if err := ses.conn.SetWriteDeadline(ses.nextDeadline()); err != nil { + ses.sendError = err + return + } + if _, err := fmt.Fprint(ses.conn, msg+"\r\n"); err != nil { + ses.sendError = err + ses.warn("Failed to send: '%v'", msg) + return + } + ses.trace(">> %v >>", msg) +} + +// readByteLine reads a line of input into the provided buffer. Does +// not reset the Buffer - please do so prior to calling. +func (ses *Session) readByteLine(buf *bytes.Buffer) error { + if err := ses.conn.SetReadDeadline(ses.nextDeadline()); err != nil { + return err + } + for { + line, err := ses.reader.ReadBytes('\r') + if err != nil { + return err + } + buf.Write(line) + // Read the next byte looking for '\n' + c, err := ses.reader.ReadByte() + if err != nil { + return err + } + buf.WriteByte(c) + if c == '\n' { + // We've reached the end of the line, return + return nil + } + // Else, keep looking + } + // Should be unreachable + return nil +} + +// Reads a line of input +func (ses *Session) readLine() (line string, err error) { + if err = ses.conn.SetReadDeadline(ses.nextDeadline()); err != nil { + return "", err + } + line, err = ses.reader.ReadString('\n') + if err != nil { + return "", err + } + ses.trace("<< %v <<", strings.TrimRight(line, "\r\n")) + return line, nil +} + +func (ses *Session) parseCmd(line string) (cmd string, args []string, ok bool) { + line = strings.TrimRight(line, "\r\n") + if len(line) == 0 { + return "", nil, true + } + + words := strings.Split(line, " ") + return strings.ToUpper(words[0]), words[1:], true +} + +func (ses *Session) reset() { + //ses.enterState(READY) +} + +func (ses *Session) ooSeq(cmd string) { + ses.send(fmt.Sprintf("-ERR Command %v is out of sequence", cmd)) + ses.warn("Wasn't expecting %v here", cmd) +} + +// Session specific logging methods +func (ses *Session) trace(msg string, args ...interface{}) { + log.Trace("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) info(msg string, args ...interface{}) { + log.Info("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) warn(msg string, args ...interface{}) { + // Update metrics + //expWarnsTotal.Add(1) + log.Warn("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) error(msg string, args ...interface{}) { + // Update metrics + //expErrorsTotal.Add(1) + log.Error("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) +} diff --git a/pop3d/listener.go b/pop3d/listener.go new file mode 100644 index 0000000..5a9377d --- /dev/null +++ b/pop3d/listener.go @@ -0,0 +1,95 @@ +package pop3d + +import ( + "fmt" + "github.com/jhillyerd/inbucket/config" + "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/smtpd" + "net" + "sync" + "time" +) + +// Real server code starts here +type Server struct { + maxIdleSeconds int + dataStore smtpd.DataStore + listener net.Listener + shutdown bool + waitgroup *sync.WaitGroup +} + +// Init a new Server object +func New() *Server { + // TODO is two filestores better/worse than sharing w/ smtpd? + ds := smtpd.NewFileDataStore() + cfg := config.GetPop3Config() + return &Server{dataStore: ds, maxIdleSeconds: cfg.MaxIdleSeconds, + waitgroup: new(sync.WaitGroup)} +} + +// Main listener loop +func (s *Server) Start() { + cfg := config.GetPop3Config() + addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%v:%v", + cfg.Ip4address, cfg.Ip4port)) + if err != nil { + log.Error("Failed to build tcp4 address: %v", err) + // TODO More graceful early-shutdown procedure + panic(err) + } + + log.Info("POP3 listening on TCP4 %v", addr) + s.listener, err = net.ListenTCP("tcp4", addr) + if err != nil { + log.Error("POP3 failed to start tcp4 listener: %v", err) + // TODO More graceful early-shutdown procedure + panic(err) + } + + // Handle incoming connections + var tempDelay time.Duration + for sid := 1; ; sid++ { + if conn, err := s.listener.Accept(); err != nil { + if nerr, ok := err.(net.Error); ok && nerr.Temporary() { + // Temporary error, sleep for a bit and try again + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + log.Error("POP3 accept error: %v; retrying in %v", err, tempDelay) + time.Sleep(tempDelay) + continue + } else { + if s.shutdown { + log.Trace("POP3 listener shutting down on request") + return + } + // TODO Implement a max error counter before shutdown? + // or maybe attempt to restart POP3 + panic(err) + } + } else { + tempDelay = 0 + s.waitgroup.Add(1) + go s.startSession(sid, conn) + } + } +} + +// Stop requests the POP3 server closes it's listener +func (s *Server) Stop() { + log.Trace("POP3 shutdown requested, connections will be drained") + s.shutdown = true + s.listener.Close() +} + +// Drain causes the caller to block until all active POP3 sessions have finished +func (s *Server) Drain() { + s.waitgroup.Wait() + log.Trace("POP3 connections drained") +} From 983b4f745a3370a008d1f3fa7db36aaae6b80b99 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Tue, 10 Sep 2013 17:56:04 -0700 Subject: [PATCH 2/6] More basic POP3 implementation Filestore/Datastore now supports a Size() method. --- pop3d/handler.go | 152 ++++++++++++++++++++++++++++++++------------- pop3d/listener.go | 2 +- smtpd/datastore.go | 1 + smtpd/filestore.go | 8 +++ 4 files changed, 118 insertions(+), 45 deletions(-) diff --git a/pop3d/handler.go b/pop3d/handler.go index 66166a7..b5ae365 100644 --- a/pop3d/handler.go +++ b/pop3d/handler.go @@ -6,9 +6,10 @@ import ( //"container/list" "fmt" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/smtpd" "io" "net" - //"strconv" + "strconv" "strings" "time" ) @@ -18,7 +19,6 @@ type State int const ( AUTHORIZATION State = iota // The client must now identify and authenticate TRANSACTION // Mailbox open, client may now issue commands - UPDATE // Purge deleted messages, cleanup QUIT ) @@ -28,8 +28,6 @@ func (s State) String() string { return "AUTHORIZATION" case TRANSACTION: return "TRANSACTION" - case UPDATE: - return "UPDATE" case QUIT: return "QUIT" } @@ -59,7 +57,9 @@ type Session struct { sendError error state State reader *bufio.Reader - user string + user string + mailbox smtpd.Mailbox + messages []smtpd.Message } func NewSession(server *Server, id int, conn net.Conn) *Session { @@ -81,7 +81,7 @@ func (ses *Session) String() string { * 5. Goto 2 */ func (s *Server) startSession(id int, conn net.Conn) { - log.Info("Connection from %v, starting session <%v>", conn.RemoteAddr(), id) + log.Info("POP3 connection from %v, starting session <%v>", conn.RemoteAddr(), id) //expConnectsCurrent.Add(1) defer func() { conn.Close() @@ -90,7 +90,7 @@ func (s *Server) startSession(id int, conn net.Conn) { }() ses := NewSession(s, id, conn) - ses.greet() + ses.send("+OK Inbucket POP3 server ready") // This is our command reading loop for ses.state != QUIT && ses.sendError == nil { @@ -115,22 +115,6 @@ func (s *Server) startSession(id int, conn net.Conn) { ses.send(fmt.Sprintf("-ERR %v command not implemented", cmd)) ses.warn("Command %v not implemented by Inbucket", cmd) continue - case "NOOP": - // TODO move to transaction state - ses.send("+OK I have sucessfully done nothing") - continue - case "RSET": - // TODO move to transaction state - // Reset session - ses.trace("Resetting session state on RSET request") - ses.reset() - ses.send("+OK Session reset") - continue - case "QUIT": - // TODO should be handled differently by transaciton - ses.send("+OK Goodnight and good luck") - ses.enterState(QUIT) - continue } // Send command to handler for current state @@ -139,10 +123,7 @@ func (s *Server) startSession(id int, conn net.Conn) { ses.authorizationHandler(cmd, arg) continue case TRANSACTION: - //ses.transactionHandler(cmd, arg) - continue - case UPDATE: - //ses.updateHandler(cmd, arg) + ses.transactionHandler(cmd, arg) continue } ses.error("Session entered unexpected state %v", ses.state) @@ -181,29 +162,112 @@ func (s *Server) startSession(id int, conn net.Conn) { } // AUTHORIZATION state -func (ses *Session) authorizationHandler(cmd string, arg []string) { +func (ses *Session) authorizationHandler(cmd string, args []string) { switch cmd { - case "HELO": - ses.send("250 Great, let's get this show on the road") - //ses.enterState(READY) - case "EHLO": - ses.send("250-Great, let's get this show on the road") - ses.send("250-8BITMIME") - //ses.enterState(READY) + case "QUIT": + ses.send("+OK Goodnight and good luck") + ses.enterState(QUIT) + case "USER": + if len(args) > 0 { + ses.user = args[0] + ses.send(fmt.Sprintf("+OK Hello %v, welcome to Inbucket", ses.user)) + } else { + ses.send("-ERR Missing username argument") + } + case "PASS": + if ses.user == "" { + ses.ooSeq(cmd) + } else { + var err error + ses.mailbox, err = ses.server.dataStore.MailboxFor(ses.user) + if err != nil { + ses.error("Failed to open mailbox for %v", ses.user) + ses.send(fmt.Sprintf("-ERR Failed to open mailbox for %v", ses.user)) + ses.enterState(QUIT) + return + } + ses.loadMailbox() + ses.send(fmt.Sprintf("+OK Found %v messages for %v", len(ses.messages), ses.user)) + ses.enterState(TRANSACTION) + } default: ses.ooSeq(cmd) } } +// TRANSACTION state +func (ses *Session) transactionHandler(cmd string, args []string) { + switch cmd { + case "LIST": + // TODO implement list argument + ses.send(fmt.Sprintf("+OK Listing %v messages", len(ses.messages))) + for i, msg := range ses.messages { + ses.send(fmt.Sprintf("%v %v", i+1, msg.Size())) + } + ses.send(".") + case "RETR": + if len(args) != 1 { + ses.warn("RETR command had invalid number of arguments") + ses.send("-ERR RETR command requires a single argument") + return + } + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("RETR command argument was not an integer") + ses.send("-ERR RETR command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("RETR command argument was less than 1") + ses.send("-ERR RETR argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("RETR command argument was greater than number of messages") + ses.send("-ERR RETR argument must not exceed the number of messages") + return + } + + // TODO actually retrieve the message... + ses.send("+OK") + case "QUIT": + ses.send("+OK We will process your deletes") + ses.processDeletes() + ses.enterState(QUIT) + case "NOOP": + ses.send("+OK I have sucessfully done nothing") + case "RSET": + // Reset session, don't actually delete anything I told you to + ses.trace("Resetting session state on RSET request") + ses.reset() + ses.send("+OK Session reset") + default: + ses.ooSeq(cmd) + } +} + +// Load the users mailbox +func (ses *Session) loadMailbox() { + var err error + ses.messages, err = ses.mailbox.GetMessages() + if err != nil { + ses.error("Failed to load messages for %v", ses.user) + } +} + +// This would be considered the "UPDATE" state in the RFC, but it does not fit +// with our state-machine design here, since no commands are accepted - it just +// indicates that the session was closed cleanly and that deletes should be +// processed. +func (ses *Session) processDeletes() { + ses.trace("Processing deletes") +} + func (ses *Session) enterState(state State) { ses.state = state ses.trace("Entering state %v", state) } -func (ses *Session) greet() { - ses.send("+OK Inbucket POP3 server ready") -} - // Calculate the next read or write deadline based on maxIdleSeconds func (ses *Session) nextDeadline() time.Time { return time.Now().Add(time.Duration(ses.server.maxIdleSeconds) * time.Second) @@ -266,7 +330,7 @@ func (ses *Session) readLine() (line string, err error) { func (ses *Session) parseCmd(line string) (cmd string, args []string, ok bool) { line = strings.TrimRight(line, "\r\n") - if len(line) == 0 { + if line == "" { return "", nil, true } @@ -285,21 +349,21 @@ func (ses *Session) ooSeq(cmd string) { // Session specific logging methods func (ses *Session) trace(msg string, args ...interface{}) { - log.Trace("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Trace("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) info(msg string, args ...interface{}) { - log.Info("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Info("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) warn(msg string, args ...interface{}) { // Update metrics //expWarnsTotal.Add(1) - log.Warn("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Warn("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) error(msg string, args ...interface{}) { // Update metrics //expErrorsTotal.Add(1) - log.Error("%v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Error("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) } diff --git a/pop3d/listener.go b/pop3d/listener.go index 5a9377d..7fc9d30 100644 --- a/pop3d/listener.go +++ b/pop3d/listener.go @@ -34,7 +34,7 @@ func (s *Server) Start() { addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%v:%v", cfg.Ip4address, cfg.Ip4port)) if err != nil { - log.Error("Failed to build tcp4 address: %v", err) + log.Error("POP3 Failed to build tcp4 address: %v", err) // TODO More graceful early-shutdown procedure panic(err) } diff --git a/smtpd/datastore.go b/smtpd/datastore.go index cd770de..09d926d 100644 --- a/smtpd/datastore.go +++ b/smtpd/datastore.go @@ -30,4 +30,5 @@ type Message interface { Close() error Delete() error String() string + Size() int64 } diff --git a/smtpd/filestore.go b/smtpd/filestore.go index 67e8db2..7e8bdea 100644 --- a/smtpd/filestore.go +++ b/smtpd/filestore.go @@ -217,6 +217,14 @@ func (m *FileMessage) String() string { return fmt.Sprintf("\"%v\" from %v", m.Fsubject, m.Ffrom) } +func (m *FileMessage) Size() int64 { + fi, err := os.Stat(m.rawPath()) + if err != nil { + return 0 + } + return fi.Size() +} + func (m *FileMessage) gobPath() string { return filepath.Join(m.mailbox.path, m.Fid+".gob") } From 4649fd0b055435851458af163dfe27fd50fb616e Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Wed, 11 Sep 2013 23:12:22 -0700 Subject: [PATCH 3/6] Started impl DELE --- pop3d/handler.go | 164 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 141 insertions(+), 23 deletions(-) diff --git a/pop3d/handler.go b/pop3d/handler.go index b5ae365..9ffd6b1 100644 --- a/pop3d/handler.go +++ b/pop3d/handler.go @@ -50,16 +50,18 @@ var commands = map[string]bool{ } type Session struct { - server *Server - id int - conn net.Conn - remoteHost string - sendError error - state State - reader *bufio.Reader - user string - mailbox smtpd.Mailbox - messages []smtpd.Message + server *Server // Reference to the server we belong to + id int // Session ID number + conn net.Conn // Our network connection + remoteHost string // IP address of client + sendError error // Used to bail out of read loop on send error + state State // Current session state + reader *bufio.Reader // Buffered reader for our net conn + user string // Mailbox name + mailbox smtpd.Mailbox // Mailbox instance + messages []smtpd.Message // Slice of messages in mailbox + retain []bool // Messages to retain upon UPDATE (true=retain) + msgCount int // Number of undeleted messages } func NewSession(server *Server, id int, conn net.Conn) *Session { @@ -187,7 +189,7 @@ func (ses *Session) authorizationHandler(cmd string, args []string) { return } ses.loadMailbox() - ses.send(fmt.Sprintf("+OK Found %v messages for %v", len(ses.messages), ses.user)) + ses.send(fmt.Sprintf("+OK Found %v messages for %v", ses.msgCount, ses.user)) ses.enterState(TRANSACTION) } default: @@ -199,12 +201,111 @@ func (ses *Session) authorizationHandler(cmd string, args []string) { func (ses *Session) transactionHandler(cmd string, args []string) { switch cmd { case "LIST": - // TODO implement list argument - ses.send(fmt.Sprintf("+OK Listing %v messages", len(ses.messages))) - for i, msg := range ses.messages { - ses.send(fmt.Sprintf("%v %v", i+1, msg.Size())) + if len(args) > 1 { + ses.warn("LIST command had more than 1 argument") + ses.send("-ERR LIST command must have zero or one argument") + return + } + if len(args) == 1 { + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("LIST command argument was not an integer") + ses.send("-ERR LIST command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("LIST command argument was less than 1") + ses.send("-ERR LIST argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("LIST command argument was greater than number of messages") + ses.send("-ERR LIST argument must not exceed the number of messages") + return + } + if !ses.retain[msgNum-1] { + ses.warn("Client tried to LIST a message it had deleted") + ses.send(fmt.Sprintf("-ERR You deleted message %v", msgNum)) + return + } + ses.send(fmt.Sprintf("+OK %v %v", msgNum, ses.messages[msgNum-1].Size())) + } else { + ses.send(fmt.Sprintf("+OK Listing %v messages", ses.msgCount)) + for i, msg := range ses.messages { + if ses.retain[i] { + ses.send(fmt.Sprintf("%v %v", i+1, msg.Size())) + } + } + ses.send(".") + } + case "UIDL": + if len(args) > 1 { + ses.warn("UIDL command had more than 1 argument") + ses.send("-ERR UIDL command must have zero or one argument") + return + } + if len(args) == 1 { + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("UIDL command argument was not an integer") + ses.send("-ERR UIDL command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("UIDL command argument was less than 1") + ses.send("-ERR UIDL argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("UIDL command argument was greater than number of messages") + ses.send("-ERR UIDL argument must not exceed the number of messages") + return + } + if !ses.retain[msgNum-1] { + ses.warn("Client tried to UIDL a message it had deleted") + ses.send(fmt.Sprintf("-ERR You deleted message %v", msgNum)) + return + } + ses.send(fmt.Sprintf("+OK %v %v", msgNum, ses.messages[msgNum-1].Id())) + } else { + ses.send(fmt.Sprintf("+OK Listing %v messages", ses.msgCount)) + for i, msg := range ses.messages { + if ses.retain[i] { + ses.send(fmt.Sprintf("%v %v", i+1, msg.Id())) + } + } + ses.send(".") + } + case "DELE": + if len(args) != 1 { + ses.warn("DELE command had invalid number of arguments") + ses.send("-ERR DELE command requires a single argument") + return + } + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("DELE command argument was not an integer") + ses.send("-ERR DELE command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("DELE command argument was less than 1") + ses.send("-ERR DELE argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("DELE command argument was greater than number of messages") + ses.send("-ERR DELE argument must not exceed the number of messages") + return + } + if ses.retain[msgNum-1] { + ses.retain[msgNum-1] = false + ses.msgCount -= 1 + ses.send(fmt.Sprintf("+OK Deleted message %v", msgNum)) + } else { + ses.warn("Client tried to DELE an already deleted message") + ses.send(fmt.Sprintf("-ERR Message %v has already been deleted", msgNum)) } - ses.send(".") case "RETR": if len(args) != 1 { ses.warn("RETR command had invalid number of arguments") @@ -228,8 +329,14 @@ func (ses *Session) transactionHandler(cmd string, args []string) { return } - // TODO actually retrieve the message... - ses.send("+OK") + raw, err := ses.messages[msgNum-1].ReadRaw() + if err != nil { + ses.error("Failed to read message for RETR command") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + ses.send(*raw) + ses.send(".") case "QUIT": ses.send("+OK We will process your deletes") ses.processDeletes() @@ -253,6 +360,17 @@ func (ses *Session) loadMailbox() { if err != nil { ses.error("Failed to load messages for %v", ses.user) } + + ses.retainAll() +} + +// Reset retain flag to true for all messages +func (ses *Session) retainAll() { + ses.retain = make([]bool, len(ses.messages)) + for i, _ := range ses.retain { + ses.retain[i] = true + } + ses.msgCount = len(ses.messages) } // This would be considered the "UPDATE" state in the RFC, but it does not fit @@ -339,7 +457,7 @@ func (ses *Session) parseCmd(line string) (cmd string, args []string, ok bool) { } func (ses *Session) reset() { - //ses.enterState(READY) + ses.retainAll() } func (ses *Session) ooSeq(cmd string) { @@ -349,21 +467,21 @@ func (ses *Session) ooSeq(cmd string) { // Session specific logging methods func (ses *Session) trace(msg string, args ...interface{}) { - log.Trace("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Trace("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) info(msg string, args ...interface{}) { - log.Info("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Info("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) warn(msg string, args ...interface{}) { // Update metrics //expWarnsTotal.Add(1) - log.Warn("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Warn("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) error(msg string, args ...interface{}) { // Update metrics //expErrorsTotal.Add(1) - log.Error("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Error("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) } From 01f6ad514b05de3023d94ed4841fc5b37fa9d725 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Wed, 11 Sep 2013 23:12:22 -0700 Subject: [PATCH 4/6] Started impl DELE --- pop3d/handler.go | 265 ++++++++++++++++++++++++++++++++++++++++----- smtpd/datastore.go | 2 + smtpd/filestore.go | 19 +++- 3 files changed, 256 insertions(+), 30 deletions(-) diff --git a/pop3d/handler.go b/pop3d/handler.go index b5ae365..a831220 100644 --- a/pop3d/handler.go +++ b/pop3d/handler.go @@ -3,7 +3,6 @@ package pop3d import ( "bufio" "bytes" - //"container/list" "fmt" "github.com/jhillyerd/inbucket/log" "github.com/jhillyerd/inbucket/smtpd" @@ -50,16 +49,18 @@ var commands = map[string]bool{ } type Session struct { - server *Server - id int - conn net.Conn - remoteHost string - sendError error - state State - reader *bufio.Reader - user string - mailbox smtpd.Mailbox - messages []smtpd.Message + server *Server // Reference to the server we belong to + id int // Session ID number + conn net.Conn // Our network connection + remoteHost string // IP address of client + sendError error // Used to bail out of read loop on send error + state State // Current session state + reader *bufio.Reader // Buffered reader for our net conn + user string // Mailbox name + mailbox smtpd.Mailbox // Mailbox instance + messages []smtpd.Message // Slice of messages in mailbox + retain []bool // Messages to retain upon UPDATE (true=retain) + msgCount int // Number of undeleted messages } func NewSession(server *Server, id int, conn net.Conn) *Session { @@ -110,7 +111,7 @@ func (s *Server) startSession(id int, conn net.Conn) { // Commands we handle in any state switch cmd { - case "APOP", "TOP": + case "APOP": // These commands are not implemented in any state ses.send(fmt.Sprintf("-ERR %v command not implemented", cmd)) ses.warn("Command %v not implemented by Inbucket", cmd) @@ -187,7 +188,7 @@ func (ses *Session) authorizationHandler(cmd string, args []string) { return } ses.loadMailbox() - ses.send(fmt.Sprintf("+OK Found %v messages for %v", len(ses.messages), ses.user)) + ses.send(fmt.Sprintf("+OK Found %v messages for %v", ses.msgCount, ses.user)) ses.enterState(TRANSACTION) } default: @@ -199,12 +200,111 @@ func (ses *Session) authorizationHandler(cmd string, args []string) { func (ses *Session) transactionHandler(cmd string, args []string) { switch cmd { case "LIST": - // TODO implement list argument - ses.send(fmt.Sprintf("+OK Listing %v messages", len(ses.messages))) - for i, msg := range ses.messages { - ses.send(fmt.Sprintf("%v %v", i+1, msg.Size())) + if len(args) > 1 { + ses.warn("LIST command had more than 1 argument") + ses.send("-ERR LIST command must have zero or one argument") + return + } + if len(args) == 1 { + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("LIST command argument was not an integer") + ses.send("-ERR LIST command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("LIST command argument was less than 1") + ses.send("-ERR LIST argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("LIST command argument was greater than number of messages") + ses.send("-ERR LIST argument must not exceed the number of messages") + return + } + if !ses.retain[msgNum-1] { + ses.warn("Client tried to LIST a message it had deleted") + ses.send(fmt.Sprintf("-ERR You deleted message %v", msgNum)) + return + } + ses.send(fmt.Sprintf("+OK %v %v", msgNum, ses.messages[msgNum-1].Size())) + } else { + ses.send(fmt.Sprintf("+OK Listing %v messages", ses.msgCount)) + for i, msg := range ses.messages { + if ses.retain[i] { + ses.send(fmt.Sprintf("%v %v", i+1, msg.Size())) + } + } + ses.send(".") + } + case "UIDL": + if len(args) > 1 { + ses.warn("UIDL command had more than 1 argument") + ses.send("-ERR UIDL command must have zero or one argument") + return + } + if len(args) == 1 { + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("UIDL command argument was not an integer") + ses.send("-ERR UIDL command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("UIDL command argument was less than 1") + ses.send("-ERR UIDL argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("UIDL command argument was greater than number of messages") + ses.send("-ERR UIDL argument must not exceed the number of messages") + return + } + if !ses.retain[msgNum-1] { + ses.warn("Client tried to UIDL a message it had deleted") + ses.send(fmt.Sprintf("-ERR You deleted message %v", msgNum)) + return + } + ses.send(fmt.Sprintf("+OK %v %v", msgNum, ses.messages[msgNum-1].Id())) + } else { + ses.send(fmt.Sprintf("+OK Listing %v messages", ses.msgCount)) + for i, msg := range ses.messages { + if ses.retain[i] { + ses.send(fmt.Sprintf("%v %v", i+1, msg.Id())) + } + } + ses.send(".") + } + case "DELE": + if len(args) != 1 { + ses.warn("DELE command had invalid number of arguments") + ses.send("-ERR DELE command requires a single argument") + return + } + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("DELE command argument was not an integer") + ses.send("-ERR DELE command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("DELE command argument was less than 1") + ses.send("-ERR DELE argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("DELE command argument was greater than number of messages") + ses.send("-ERR DELE argument must not exceed the number of messages") + return + } + if ses.retain[msgNum-1] { + ses.retain[msgNum-1] = false + ses.msgCount -= 1 + ses.send(fmt.Sprintf("+OK Deleted message %v", msgNum)) + } else { + ses.warn("Client tried to DELE an already deleted message") + ses.send(fmt.Sprintf("-ERR Message %v has already been deleted", msgNum)) } - ses.send(".") case "RETR": if len(args) != 1 { ses.warn("RETR command had invalid number of arguments") @@ -227,9 +327,43 @@ func (ses *Session) transactionHandler(cmd string, args []string) { ses.send("-ERR RETR argument must not exceed the number of messages") return } + ses.sendMessage(ses.messages[msgNum-1]) + case "TOP": + if len(args) != 2 { + ses.warn("TOP command had invalid number of arguments") + ses.send("-ERR TOP command requires two arguments") + return + } + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("TOP command first argument was not an integer") + ses.send("-ERR TOP command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("TOP command first argument was less than 1") + ses.send("-ERR TOP first argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("TOP command first argument was greater than number of messages") + ses.send("-ERR TOP first argument must not exceed the number of messages") + return + } - // TODO actually retrieve the message... - ses.send("+OK") + var lines int64 + lines, err = strconv.ParseInt(args[1], 10, 32) + if err != nil { + ses.warn("TOP command second argument was not an integer") + ses.send("-ERR TOP command requires an integer argument") + return + } + if lines < 0 { + ses.warn("TOP command second argument was negative") + ses.send("-ERR TOP second argument must be non-negative") + return + } + ses.sendMessageTop(ses.messages[msgNum-1], int(lines)) case "QUIT": ses.send("+OK We will process your deletes") ses.processDeletes() @@ -246,6 +380,76 @@ func (ses *Session) transactionHandler(cmd string, args []string) { } } +// Send the contents of the message to the client +func (ses *Session) sendMessage(msg smtpd.Message) { + reader, err := msg.RawReader() + defer reader.Close() + if err != nil { + ses.error("Failed to read message for RETR command") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + // Lines starting with . must be prefixed with another . + if strings.HasPrefix(line, ".") { + line = "." + line + } + ses.send(line) + } + + if err = scanner.Err(); err != nil { + ses.error("Failed to read message for RETR command") + ses.send(".") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + ses.send(".") +} + +// Send the headers plus the top N lines to the client +func (ses *Session) sendMessageTop(msg smtpd.Message, lineCount int) { + reader, err := msg.RawReader() + defer reader.Close() + if err != nil { + ses.error("Failed to read message for RETR command") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + scanner := bufio.NewScanner(reader) + inBody := false + for scanner.Scan() { + line := scanner.Text() + // Lines starting with . must be prefixed with another . + if strings.HasPrefix(line, ".") { + line = "." + line + } + if inBody { + // Check if we need to send anymore lines + if lineCount < 1 { + break + } else { + lineCount -= 1 + } + } else { + if line == "" { + // We've hit the end of the header + inBody = true + } + } + ses.send(line) + } + + if err = scanner.Err(); err != nil { + ses.error("Failed to read message for RETR command") + ses.send(".") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + ses.send(".") +} + // Load the users mailbox func (ses *Session) loadMailbox() { var err error @@ -253,6 +457,17 @@ func (ses *Session) loadMailbox() { if err != nil { ses.error("Failed to load messages for %v", ses.user) } + + ses.retainAll() +} + +// Reset retain flag to true for all messages +func (ses *Session) retainAll() { + ses.retain = make([]bool, len(ses.messages)) + for i, _ := range ses.retain { + ses.retain[i] = true + } + ses.msgCount = len(ses.messages) } // This would be considered the "UPDATE" state in the RFC, but it does not fit @@ -339,7 +554,7 @@ func (ses *Session) parseCmd(line string) (cmd string, args []string, ok bool) { } func (ses *Session) reset() { - //ses.enterState(READY) + ses.retainAll() } func (ses *Session) ooSeq(cmd string) { @@ -349,21 +564,21 @@ func (ses *Session) ooSeq(cmd string) { // Session specific logging methods func (ses *Session) trace(msg string, args ...interface{}) { - log.Trace("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Trace("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) info(msg string, args ...interface{}) { - log.Info("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Info("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) warn(msg string, args ...interface{}) { // Update metrics //expWarnsTotal.Add(1) - log.Warn("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Warn("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) } func (ses *Session) error(msg string, args ...interface{}) { // Update metrics //expErrorsTotal.Add(1) - log.Error("POP3 %v<%v> %v", ses.remoteHost, ses.id, fmt.Sprintf(msg, args...)) + log.Error("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) } diff --git a/smtpd/datastore.go b/smtpd/datastore.go index 09d926d..40a2b12 100644 --- a/smtpd/datastore.go +++ b/smtpd/datastore.go @@ -2,6 +2,7 @@ package smtpd import ( "github.com/jhillyerd/go.enmime" + "io" "net/mail" "time" ) @@ -23,6 +24,7 @@ type Message interface { From() string Date() time.Time Subject() string + RawReader() (reader io.ReadCloser, err error) ReadHeader() (msg *mail.Message, err error) ReadBody() (msg *mail.Message, body *enmime.MIMEBody, err error) ReadRaw() (raw *string, err error) diff --git a/smtpd/filestore.go b/smtpd/filestore.go index 7e8bdea..b38d4f8 100644 --- a/smtpd/filestore.go +++ b/smtpd/filestore.go @@ -8,6 +8,7 @@ import ( "github.com/jhillyerd/go.enmime" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "io" "io/ioutil" "net/mail" "os" @@ -266,15 +267,23 @@ func (m *FileMessage) ReadBody() (msg *mail.Message, body *enmime.MIMEBody, err return msg, mime, err } -// ReadRaw opens the .raw portion of a Message and returns it as a string -func (m *FileMessage) ReadRaw() (raw *string, err error) { +// RawReader opens the .raw portion of a Message as an io.ReadCloser +func (m *FileMessage) RawReader() (reader io.ReadCloser, err error) { file, err := os.Open(m.rawPath()) - defer file.Close() if err != nil { return nil, err } - reader := bufio.NewReader(file) - bodyBytes, err := ioutil.ReadAll(reader) + return file, nil +} + +// ReadRaw opens the .raw portion of a Message and returns it as a string +func (m *FileMessage) ReadRaw() (raw *string, err error) { + reader, err := m.RawReader() + defer reader.Close() + if err != nil { + return nil, err + } + bodyBytes, err := ioutil.ReadAll(bufio.NewReader(reader)) if err != nil { return nil, err } From 06ce860435c11d46d44274272015b9e14a2a2e36 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Thu, 12 Sep 2013 17:00:08 -0700 Subject: [PATCH 5/6] Impl deletes, update SMTP session tracing --- pop3d/handler.go | 8 +++++++- smtpd/handler.go | 12 ++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pop3d/handler.go b/pop3d/handler.go index a1a4299..01f10e5 100644 --- a/pop3d/handler.go +++ b/pop3d/handler.go @@ -492,7 +492,13 @@ func (ses *Session) retainAll() { // indicates that the session was closed cleanly and that deletes should be // processed. func (ses *Session) processDeletes() { - ses.trace("Processing deletes") + ses.info("Processing deletes") + for i, msg := range ses.messages { + if !ses.retain[i] { + ses.trace("Deleting %v", msg) + msg.Delete() + } + } } func (ses *Session) enterState(state State) { diff --git a/smtpd/handler.go b/smtpd/handler.go index 2986fb2..918df0a 100644 --- a/smtpd/handler.go +++ b/smtpd/handler.go @@ -83,12 +83,12 @@ func (ss *Session) String() string { /* Session flow: * 1. Send initial greeting * 2. Receive cmd - * 3. If good cmd, respond, optionally change state + * 3. If good cmd, respond, optionally change state * 4. If bad cmd, respond error * 5. Goto 2 */ func (s *Server) startSession(id int, conn net.Conn) { - log.Info("Connection from %v, starting session <%v>", conn.RemoteAddr(), id) + log.Info("SMTP Connection from %v, starting session <%v>", conn.RemoteAddr(), id) expConnectsCurrent.Add(1) defer func() { conn.Close() @@ -512,21 +512,21 @@ func (ss *Session) ooSeq(cmd string) { // Session specific logging methods func (ss *Session) trace(msg string, args ...interface{}) { - log.Trace("%v<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...)) + log.Trace("SMTP<%v> %v", ss.id, fmt.Sprintf(msg, args...)) } func (ss *Session) info(msg string, args ...interface{}) { - log.Info("%v<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...)) + log.Info("SMTP<%v> %v", ss.id, fmt.Sprintf(msg, args...)) } func (ss *Session) warn(msg string, args ...interface{}) { // Update metrics expWarnsTotal.Add(1) - log.Warn("%v<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...)) + log.Warn("SMTP<%v> %v", ss.id, fmt.Sprintf(msg, args...)) } func (ss *Session) error(msg string, args ...interface{}) { // Update metrics expErrorsTotal.Add(1) - log.Error("%v<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...)) + log.Error("SMTP<%v> %v", ss.id, fmt.Sprintf(msg, args...)) } From aa7b7603853bce8d8406e8b8f85bb705f22837b7 Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Thu, 12 Sep 2013 22:07:24 -0700 Subject: [PATCH 6/6] POP3 is working Added pop3.domain config option (for APOP greeting) Implemented CAPA command Implemented APOP "encrypted" authorization Updated all sample config files to include [pop3] section Closes #8 --- config/config.go | 9 +++++++++ etc/devel.conf | 3 +++ etc/inbucket.conf | 18 +++++++++++++++++- etc/unix-sample.conf | 18 +++++++++++++++++- etc/win-sample.conf | 18 +++++++++++++++++- pop3d/handler.go | 35 ++++++++++++++++++++++++++++++----- pop3d/listener.go | 13 +++++++------ 7 files changed, 100 insertions(+), 14 deletions(-) diff --git a/config/config.go b/config/config.go index 62c7f69..bfa1e5f 100644 --- a/config/config.go +++ b/config/config.go @@ -25,6 +25,7 @@ type SmtpConfig struct { type Pop3Config struct { Ip4address net.IP Ip4port int + Domain string MaxIdleSeconds int } @@ -109,6 +110,7 @@ func LoadConfig(filename string) error { requireOption(messages, "smtp", "store.messages") requireOption(messages, "pop3", "ip4.address") requireOption(messages, "pop3", "ip4.port") + requireOption(messages, "pop3", "domain") requireOption(messages, "pop3", "max.idle.seconds") requireOption(messages, "web", "ip4.address") requireOption(messages, "web", "ip4.port") @@ -262,6 +264,13 @@ func parsePop3Config() error { return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } + option = "domain" + str, err = Config.String(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + pop3Config.Domain = str + option = "max.idle.seconds" pop3Config.MaxIdleSeconds, err = Config.Int(section, option) if err != nil { diff --git a/etc/devel.conf b/etc/devel.conf index bcca491..2ce5c4b 100644 --- a/etc/devel.conf +++ b/etc/devel.conf @@ -53,6 +53,9 @@ ip4.address=0.0.0.0 # IPv4 port to listen for POP3 connections on. ip4.port=1100 +# used in POP3 greeting +domain=inbucket.local + # How long we allow a network connection to be idle before hanging up on the # client, POP3 RFC requires at least 10 minutes (600 seconds). max.idle.seconds=600 diff --git a/etc/inbucket.conf b/etc/inbucket.conf index f8ca8b7..a6c9c81 100644 --- a/etc/inbucket.conf +++ b/etc/inbucket.conf @@ -37,13 +37,29 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=300 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=2048000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=1100 + +# used in POP3 greeting +domain=inbucket.local + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/etc/unix-sample.conf b/etc/unix-sample.conf index f42c2f2..17b7b9d 100644 --- a/etc/unix-sample.conf +++ b/etc/unix-sample.conf @@ -37,13 +37,29 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=300 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=2048000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=110 + +# used in POP3 greeting +domain=inbucket.local + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/etc/win-sample.conf b/etc/win-sample.conf index a6ea92b..b0fcdcf 100644 --- a/etc/win-sample.conf +++ b/etc/win-sample.conf @@ -37,13 +37,29 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=300 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=2048000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=1100 + +# used in POP3 greeting +domain=inbucket.local + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/pop3d/handler.go b/pop3d/handler.go index 01f10e5..3bacd51 100644 --- a/pop3d/handler.go +++ b/pop3d/handler.go @@ -8,6 +8,7 @@ import ( "github.com/jhillyerd/inbucket/smtpd" "io" "net" + "os" "strconv" "strings" "time" @@ -46,6 +47,7 @@ var commands = map[string]bool{ "USER": true, "PASS": true, "APOP": true, + "CAPA": true, } type Session struct { @@ -91,7 +93,8 @@ func (s *Server) startSession(id int, conn net.Conn) { }() ses := NewSession(s, id, conn) - ses.send("+OK Inbucket POP3 server ready") + ses.send(fmt.Sprintf("+OK Inbucket POP3 server ready <%v.%v@%v>", os.Getpid(), + time.Now().Unix(), s.domain)) // This is our command reading loop for ses.state != QUIT && ses.sendError == nil { @@ -111,10 +114,14 @@ func (s *Server) startSession(id int, conn net.Conn) { // Commands we handle in any state switch cmd { - case "APOP": - // These commands are not implemented in any state - ses.send(fmt.Sprintf("-ERR %v command not implemented", cmd)) - ses.warn("Command %v not implemented by Inbucket", cmd) + case "CAPA": + // List our capabilities per RFC2449 + ses.send("+OK Capability list follows") + ses.send("TOP") + ses.send("USER") + ses.send("UIDL") + ses.send("IMPLEMENTATION Inbucket") + ses.send(".") continue } @@ -191,6 +198,24 @@ func (ses *Session) authorizationHandler(cmd string, args []string) { ses.send(fmt.Sprintf("+OK Found %v messages for %v", ses.msgCount, ses.user)) ses.enterState(TRANSACTION) } + case "APOP": + if len(args) != 2 { + ses.warn("Expected two arguments for APOP") + ses.send("-ERR APOP requires two arguments") + return + } + ses.user = args[0] + var err error + ses.mailbox, err = ses.server.dataStore.MailboxFor(ses.user) + if err != nil { + ses.error("Failed to open mailbox for %v", ses.user) + ses.send(fmt.Sprintf("-ERR Failed to open mailbox for %v", ses.user)) + ses.enterState(QUIT) + return + } + ses.loadMailbox() + ses.send(fmt.Sprintf("+OK Found %v messages for %v", ses.msgCount, ses.user)) + ses.enterState(TRANSACTION) default: ses.ooSeq(cmd) } diff --git a/pop3d/listener.go b/pop3d/listener.go index 7fc9d30..3bbfb73 100644 --- a/pop3d/listener.go +++ b/pop3d/listener.go @@ -12,11 +12,12 @@ import ( // Real server code starts here type Server struct { - maxIdleSeconds int - dataStore smtpd.DataStore - listener net.Listener - shutdown bool - waitgroup *sync.WaitGroup + domain string + maxIdleSeconds int + dataStore smtpd.DataStore + listener net.Listener + shutdown bool + waitgroup *sync.WaitGroup } // Init a new Server object @@ -24,7 +25,7 @@ func New() *Server { // TODO is two filestores better/worse than sharing w/ smtpd? ds := smtpd.NewFileDataStore() cfg := config.GetPop3Config() - return &Server{dataStore: ds, maxIdleSeconds: cfg.MaxIdleSeconds, + return &Server{domain: cfg.Domain, dataStore: ds, maxIdleSeconds: cfg.MaxIdleSeconds, waitgroup: new(sync.WaitGroup)} }