diff --git a/config/config.go b/config/config.go index 2cdfd9b..bfa1e5f 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,13 @@ type SmtpConfig struct { StoreMessages bool } +type Pop3Config struct { + Ip4address net.IP + Ip4port int + Domain string + MaxIdleSeconds int +} + type WebConfig struct { Ip4address net.IP Ip4port int @@ -42,6 +49,7 @@ var ( // Parsed specific configs smtpConfig *SmtpConfig + pop3Config *Pop3Config webConfig *WebConfig dataStoreConfig *DataStoreConfig ) @@ -51,6 +59,11 @@ func GetSmtpConfig() SmtpConfig { return *smtpConfig } +// GetPop3Config returns a copy of the Pop3Config object +func GetPop3Config() Pop3Config { + return *pop3Config +} + // GetWebConfig returns a copy of the WebConfig object func GetWebConfig() WebConfig { return *webConfig @@ -75,6 +88,7 @@ func LoadConfig(filename string) error { // Validate sections requireSection(messages, "logging") requireSection(messages, "smtp") + requireSection(messages, "pop3") requireSection(messages, "web") requireSection(messages, "datastore") if messages.Len() > 0 { @@ -94,6 +108,10 @@ func LoadConfig(filename string) error { requireOption(messages, "smtp", "max.idle.seconds") requireOption(messages, "smtp", "max.message.bytes") requireOption(messages, "smtp", "store.messages") + requireOption(messages, "pop3", "ip4.address") + requireOption(messages, "pop3", "ip4.port") + requireOption(messages, "pop3", "domain") + requireOption(messages, "pop3", "max.idle.seconds") requireOption(messages, "web", "ip4.address") requireOption(messages, "web", "ip4.port") requireOption(messages, "web", "template.dir") @@ -116,6 +134,10 @@ func LoadConfig(filename string) error { return err } + if err = parsePop3Config(); err != nil { + return err + } + if err = parseWebConfig(); err != nil { return err } @@ -215,6 +237,49 @@ func parseSmtpConfig() error { return nil } +// parsePop3Config trying to catch config errors early +func parsePop3Config() error { + pop3Config = new(Pop3Config) + section := "pop3" + + // Parse IP4 address only, error on IP6. + option := "ip4.address" + str, err := Config.String(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + addr := net.ParseIP(str) + if addr == nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + addr = addr.To4() + if addr == nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v' not IPv4!", section, option, err) + } + pop3Config.Ip4address = addr + + option = "ip4.port" + pop3Config.Ip4port, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + option = "domain" + str, err = Config.String(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + pop3Config.Domain = str + + option = "max.idle.seconds" + pop3Config.MaxIdleSeconds, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + return nil +} + // parseWebConfig trying to catch config errors early func parseWebConfig() error { webConfig = new(WebConfig) diff --git a/etc/devel.conf b/etc/devel.conf index 5d888b6..2ce5c4b 100644 --- a/etc/devel.conf +++ b/etc/devel.conf @@ -37,13 +37,29 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=30 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=20480000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=1100 + +# used in POP3 greeting +domain=inbucket.local + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/etc/inbucket.conf b/etc/inbucket.conf index f8ca8b7..a6c9c81 100644 --- a/etc/inbucket.conf +++ b/etc/inbucket.conf @@ -37,13 +37,29 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=300 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=2048000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=1100 + +# used in POP3 greeting +domain=inbucket.local + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/etc/unix-sample.conf b/etc/unix-sample.conf index f42c2f2..17b7b9d 100644 --- a/etc/unix-sample.conf +++ b/etc/unix-sample.conf @@ -37,13 +37,29 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=300 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=2048000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=110 + +# used in POP3 greeting +domain=inbucket.local + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/etc/win-sample.conf b/etc/win-sample.conf index a6ea92b..b0fcdcf 100644 --- a/etc/win-sample.conf +++ b/etc/win-sample.conf @@ -37,13 +37,29 @@ max.recipients=100 # client, SMTP RFC recommends at least 5 minutes (300 seconds). max.idle.seconds=300 -# Maximum allowable size of message body in bytes (including attachments) +# Maximum allowable size of message body in bytes (including attachments) max.message.bytes=2048000 # Should we place messages into the datastore, or just throw them away # (for load testing): true or false store.messages=true +############################################################################# +[pop3] + +# IPv4 address to listen for POP3 connections on. +ip4.address=0.0.0.0 + +# IPv4 port to listen for POP3 connections on. +ip4.port=1100 + +# used in POP3 greeting +domain=inbucket.local + +# How long we allow a network connection to be idle before hanging up on the +# client, POP3 RFC requires at least 10 minutes (600 seconds). +max.idle.seconds=600 + ############################################################################# [web] diff --git a/inbucket.go b/inbucket.go index fb5f262..674b265 100644 --- a/inbucket.go +++ b/inbucket.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/pop3d" "github.com/jhillyerd/inbucket/smtpd" "github.com/jhillyerd/inbucket/web" golog "log" @@ -30,6 +31,7 @@ var startTime = time.Now() var logf *os.File var smtpServer *smtpd.Server +var pop3Server *pop3d.Server func main() { flag.Parse() @@ -95,11 +97,17 @@ func main() { // Start HTTP server go web.Start() + // Start POP3 server + pop3Server = pop3d.New() + go pop3Server.Start() + // Startup SMTP server, block until it exits smtpServer = smtpd.New() smtpServer.Start() + // Wait for active connections to finish smtpServer.Drain() + pop3Server.Drain() } // openLogFile creates or appends to the logfile passed on commandline diff --git a/pop3d/handler.go b/pop3d/handler.go new file mode 100644 index 0000000..3bacd51 --- /dev/null +++ b/pop3d/handler.go @@ -0,0 +1,632 @@ +package pop3d + +import ( + "bufio" + "bytes" + "fmt" + "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/smtpd" + "io" + "net" + "os" + "strconv" + "strings" + "time" +) + +type State int + +const ( + AUTHORIZATION State = iota // The client must now identify and authenticate + TRANSACTION // Mailbox open, client may now issue commands + QUIT +) + +func (s State) String() string { + switch s { + case AUTHORIZATION: + return "AUTHORIZATION" + case TRANSACTION: + return "TRANSACTION" + case QUIT: + return "QUIT" + } + return "Unknown" +} + +var commands = map[string]bool{ + "QUIT": true, + "STAT": true, + "LIST": true, + "RETR": true, + "DELE": true, + "NOOP": true, + "RSET": true, + "TOP": true, + "UIDL": true, + "USER": true, + "PASS": true, + "APOP": true, + "CAPA": true, +} + +type Session struct { + server *Server // Reference to the server we belong to + id int // Session ID number + conn net.Conn // Our network connection + remoteHost string // IP address of client + sendError error // Used to bail out of read loop on send error + state State // Current session state + reader *bufio.Reader // Buffered reader for our net conn + user string // Mailbox name + mailbox smtpd.Mailbox // Mailbox instance + messages []smtpd.Message // Slice of messages in mailbox + retain []bool // Messages to retain upon UPDATE (true=retain) + msgCount int // Number of undeleted messages +} + +func NewSession(server *Server, id int, conn net.Conn) *Session { + reader := bufio.NewReader(conn) + host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + return &Session{server: server, id: id, conn: conn, state: AUTHORIZATION, + reader: reader, remoteHost: host} +} + +func (ses *Session) String() string { + return fmt.Sprintf("Session{id: %v, state: %v}", ses.id, ses.state) +} + +/* Session flow: + * 1. Send initial greeting + * 2. Receive cmd + * 3. If good cmd, respond, optionally change state + * 4. If bad cmd, respond error + * 5. Goto 2 + */ +func (s *Server) startSession(id int, conn net.Conn) { + log.Info("POP3 connection from %v, starting session <%v>", conn.RemoteAddr(), id) + //expConnectsCurrent.Add(1) + defer func() { + conn.Close() + s.waitgroup.Done() + //expConnectsCurrent.Add(-1) + }() + + ses := NewSession(s, id, conn) + ses.send(fmt.Sprintf("+OK Inbucket POP3 server ready <%v.%v@%v>", os.Getpid(), + time.Now().Unix(), s.domain)) + + // This is our command reading loop + for ses.state != QUIT && ses.sendError == nil { + line, err := ses.readLine() + if err == nil { + if cmd, arg, ok := ses.parseCmd(line); ok { + // Check against valid SMTP commands + if cmd == "" { + ses.send("-ERR Speak up") + continue + } + if !commands[cmd] { + ses.send(fmt.Sprintf("-ERR Syntax error, %v command unrecognized", cmd)) + ses.warn("Unrecognized command: %v", cmd) + continue + } + + // Commands we handle in any state + switch cmd { + case "CAPA": + // List our capabilities per RFC2449 + ses.send("+OK Capability list follows") + ses.send("TOP") + ses.send("USER") + ses.send("UIDL") + ses.send("IMPLEMENTATION Inbucket") + ses.send(".") + continue + } + + // Send command to handler for current state + switch ses.state { + case AUTHORIZATION: + ses.authorizationHandler(cmd, arg) + continue + case TRANSACTION: + ses.transactionHandler(cmd, arg) + continue + } + ses.error("Session entered unexpected state %v", ses.state) + break + } else { + ses.send("-ERR Syntax error, command garbled") + } + } else { + // readLine() returned an error + if err == io.EOF { + switch ses.state { + case AUTHORIZATION: + // EOF is common here + ses.info("Client closed connection (state %v)", ses.state) + default: + ses.warn("Got EOF while in state %v", ses.state) + } + break + } + // not an EOF + ses.warn("Connection error: %v", err) + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + ses.send("-ERR Idle timeout, bye bye") + break + } + } + ses.send("-ERR Connection error, sorry") + break + } + } + if ses.sendError != nil { + ses.warn("Network send error: %v", ses.sendError) + } + ses.info("Closing connection") +} + +// AUTHORIZATION state +func (ses *Session) authorizationHandler(cmd string, args []string) { + switch cmd { + case "QUIT": + ses.send("+OK Goodnight and good luck") + ses.enterState(QUIT) + case "USER": + if len(args) > 0 { + ses.user = args[0] + ses.send(fmt.Sprintf("+OK Hello %v, welcome to Inbucket", ses.user)) + } else { + ses.send("-ERR Missing username argument") + } + case "PASS": + if ses.user == "" { + ses.ooSeq(cmd) + } else { + var err error + ses.mailbox, err = ses.server.dataStore.MailboxFor(ses.user) + if err != nil { + ses.error("Failed to open mailbox for %v", ses.user) + ses.send(fmt.Sprintf("-ERR Failed to open mailbox for %v", ses.user)) + ses.enterState(QUIT) + return + } + ses.loadMailbox() + ses.send(fmt.Sprintf("+OK Found %v messages for %v", ses.msgCount, ses.user)) + ses.enterState(TRANSACTION) + } + case "APOP": + if len(args) != 2 { + ses.warn("Expected two arguments for APOP") + ses.send("-ERR APOP requires two arguments") + return + } + ses.user = args[0] + var err error + ses.mailbox, err = ses.server.dataStore.MailboxFor(ses.user) + if err != nil { + ses.error("Failed to open mailbox for %v", ses.user) + ses.send(fmt.Sprintf("-ERR Failed to open mailbox for %v", ses.user)) + ses.enterState(QUIT) + return + } + ses.loadMailbox() + ses.send(fmt.Sprintf("+OK Found %v messages for %v", ses.msgCount, ses.user)) + ses.enterState(TRANSACTION) + default: + ses.ooSeq(cmd) + } +} + +// TRANSACTION state +func (ses *Session) transactionHandler(cmd string, args []string) { + switch cmd { + case "STAT": + if len(args) != 0 { + ses.warn("STAT got an unexpected argument") + ses.send("-ERR STAT command must have no arguments") + return + } + var count int + var size int64 + for i, msg := range ses.messages { + if ses.retain[i] { + count += 1 + size += msg.Size() + } + } + ses.send(fmt.Sprintf("+OK %v %v", count, size)) + case "LIST": + if len(args) > 1 { + ses.warn("LIST command had more than 1 argument") + ses.send("-ERR LIST command must have zero or one argument") + return + } + if len(args) == 1 { + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("LIST command argument was not an integer") + ses.send("-ERR LIST command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("LIST command argument was less than 1") + ses.send("-ERR LIST argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("LIST command argument was greater than number of messages") + ses.send("-ERR LIST argument must not exceed the number of messages") + return + } + if !ses.retain[msgNum-1] { + ses.warn("Client tried to LIST a message it had deleted") + ses.send(fmt.Sprintf("-ERR You deleted message %v", msgNum)) + return + } + ses.send(fmt.Sprintf("+OK %v %v", msgNum, ses.messages[msgNum-1].Size())) + } else { + ses.send(fmt.Sprintf("+OK Listing %v messages", ses.msgCount)) + for i, msg := range ses.messages { + if ses.retain[i] { + ses.send(fmt.Sprintf("%v %v", i+1, msg.Size())) + } + } + ses.send(".") + } + case "UIDL": + if len(args) > 1 { + ses.warn("UIDL command had more than 1 argument") + ses.send("-ERR UIDL command must have zero or one argument") + return + } + if len(args) == 1 { + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("UIDL command argument was not an integer") + ses.send("-ERR UIDL command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("UIDL command argument was less than 1") + ses.send("-ERR UIDL argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("UIDL command argument was greater than number of messages") + ses.send("-ERR UIDL argument must not exceed the number of messages") + return + } + if !ses.retain[msgNum-1] { + ses.warn("Client tried to UIDL a message it had deleted") + ses.send(fmt.Sprintf("-ERR You deleted message %v", msgNum)) + return + } + ses.send(fmt.Sprintf("+OK %v %v", msgNum, ses.messages[msgNum-1].Id())) + } else { + ses.send(fmt.Sprintf("+OK Listing %v messages", ses.msgCount)) + for i, msg := range ses.messages { + if ses.retain[i] { + ses.send(fmt.Sprintf("%v %v", i+1, msg.Id())) + } + } + ses.send(".") + } + case "DELE": + if len(args) != 1 { + ses.warn("DELE command had invalid number of arguments") + ses.send("-ERR DELE command requires a single argument") + return + } + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("DELE command argument was not an integer") + ses.send("-ERR DELE command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("DELE command argument was less than 1") + ses.send("-ERR DELE argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("DELE command argument was greater than number of messages") + ses.send("-ERR DELE argument must not exceed the number of messages") + return + } + if ses.retain[msgNum-1] { + ses.retain[msgNum-1] = false + ses.msgCount -= 1 + ses.send(fmt.Sprintf("+OK Deleted message %v", msgNum)) + } else { + ses.warn("Client tried to DELE an already deleted message") + ses.send(fmt.Sprintf("-ERR Message %v has already been deleted", msgNum)) + } + case "RETR": + if len(args) != 1 { + ses.warn("RETR command had invalid number of arguments") + ses.send("-ERR RETR command requires a single argument") + return + } + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("RETR command argument was not an integer") + ses.send("-ERR RETR command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("RETR command argument was less than 1") + ses.send("-ERR RETR argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("RETR command argument was greater than number of messages") + ses.send("-ERR RETR argument must not exceed the number of messages") + return + } + ses.send(fmt.Sprintf("+OK %v bytes follows", ses.messages[msgNum-1].Size())) + ses.sendMessage(ses.messages[msgNum-1]) + case "TOP": + if len(args) != 2 { + ses.warn("TOP command had invalid number of arguments") + ses.send("-ERR TOP command requires two arguments") + return + } + msgNum, err := strconv.ParseInt(args[0], 10, 32) + if err != nil { + ses.warn("TOP command first argument was not an integer") + ses.send("-ERR TOP command requires an integer argument") + return + } + if msgNum < 1 { + ses.warn("TOP command first argument was less than 1") + ses.send("-ERR TOP first argument must be greater than 0") + return + } + if int(msgNum) > len(ses.messages) { + ses.warn("TOP command first argument was greater than number of messages") + ses.send("-ERR TOP first argument must not exceed the number of messages") + return + } + + var lines int64 + lines, err = strconv.ParseInt(args[1], 10, 32) + if err != nil { + ses.warn("TOP command second argument was not an integer") + ses.send("-ERR TOP command requires an integer argument") + return + } + if lines < 0 { + ses.warn("TOP command second argument was negative") + ses.send("-ERR TOP second argument must be non-negative") + return + } + ses.send("+OK Top of message follows") + ses.sendMessageTop(ses.messages[msgNum-1], int(lines)) + case "QUIT": + ses.send("+OK We will process your deletes") + ses.processDeletes() + ses.enterState(QUIT) + case "NOOP": + ses.send("+OK I have sucessfully done nothing") + case "RSET": + // Reset session, don't actually delete anything I told you to + ses.trace("Resetting session state on RSET request") + ses.reset() + ses.send("+OK Session reset") + default: + ses.ooSeq(cmd) + } +} + +// Send the contents of the message to the client +func (ses *Session) sendMessage(msg smtpd.Message) { + reader, err := msg.RawReader() + defer reader.Close() + if err != nil { + ses.error("Failed to read message for RETR command") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + // Lines starting with . must be prefixed with another . + if strings.HasPrefix(line, ".") { + line = "." + line + } + ses.send(line) + } + + if err = scanner.Err(); err != nil { + ses.error("Failed to read message for RETR command") + ses.send(".") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + ses.send(".") +} + +// Send the headers plus the top N lines to the client +func (ses *Session) sendMessageTop(msg smtpd.Message, lineCount int) { + reader, err := msg.RawReader() + defer reader.Close() + if err != nil { + ses.error("Failed to read message for RETR command") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + scanner := bufio.NewScanner(reader) + inBody := false + for scanner.Scan() { + line := scanner.Text() + // Lines starting with . must be prefixed with another . + if strings.HasPrefix(line, ".") { + line = "." + line + } + if inBody { + // Check if we need to send anymore lines + if lineCount < 1 { + break + } else { + lineCount -= 1 + } + } else { + if line == "" { + // We've hit the end of the header + inBody = true + } + } + ses.send(line) + } + + if err = scanner.Err(); err != nil { + ses.error("Failed to read message for RETR command") + ses.send(".") + ses.send("-ERR Failed to RETR that message, internal error") + return + } + ses.send(".") +} + +// Load the users mailbox +func (ses *Session) loadMailbox() { + var err error + ses.messages, err = ses.mailbox.GetMessages() + if err != nil { + ses.error("Failed to load messages for %v", ses.user) + } + + ses.retainAll() +} + +// Reset retain flag to true for all messages +func (ses *Session) retainAll() { + ses.retain = make([]bool, len(ses.messages)) + for i, _ := range ses.retain { + ses.retain[i] = true + } + ses.msgCount = len(ses.messages) +} + +// This would be considered the "UPDATE" state in the RFC, but it does not fit +// with our state-machine design here, since no commands are accepted - it just +// indicates that the session was closed cleanly and that deletes should be +// processed. +func (ses *Session) processDeletes() { + ses.info("Processing deletes") + for i, msg := range ses.messages { + if !ses.retain[i] { + ses.trace("Deleting %v", msg) + msg.Delete() + } + } +} + +func (ses *Session) enterState(state State) { + ses.state = state + ses.trace("Entering state %v", state) +} + +// Calculate the next read or write deadline based on maxIdleSeconds +func (ses *Session) nextDeadline() time.Time { + return time.Now().Add(time.Duration(ses.server.maxIdleSeconds) * time.Second) +} + +// Send requested message, store errors in Session.sendError +func (ses *Session) send(msg string) { + if err := ses.conn.SetWriteDeadline(ses.nextDeadline()); err != nil { + ses.sendError = err + return + } + if _, err := fmt.Fprint(ses.conn, msg+"\r\n"); err != nil { + ses.sendError = err + ses.warn("Failed to send: '%v'", msg) + return + } + ses.trace(">> %v >>", msg) +} + +// readByteLine reads a line of input into the provided buffer. Does +// not reset the Buffer - please do so prior to calling. +func (ses *Session) readByteLine(buf *bytes.Buffer) error { + if err := ses.conn.SetReadDeadline(ses.nextDeadline()); err != nil { + return err + } + for { + line, err := ses.reader.ReadBytes('\r') + if err != nil { + return err + } + buf.Write(line) + // Read the next byte looking for '\n' + c, err := ses.reader.ReadByte() + if err != nil { + return err + } + buf.WriteByte(c) + if c == '\n' { + // We've reached the end of the line, return + return nil + } + // Else, keep looking + } + // Should be unreachable + return nil +} + +// Reads a line of input +func (ses *Session) readLine() (line string, err error) { + if err = ses.conn.SetReadDeadline(ses.nextDeadline()); err != nil { + return "", err + } + line, err = ses.reader.ReadString('\n') + if err != nil { + return "", err + } + ses.trace("<< %v <<", strings.TrimRight(line, "\r\n")) + return line, nil +} + +func (ses *Session) parseCmd(line string) (cmd string, args []string, ok bool) { + line = strings.TrimRight(line, "\r\n") + if line == "" { + return "", nil, true + } + + words := strings.Split(line, " ") + return strings.ToUpper(words[0]), words[1:], true +} + +func (ses *Session) reset() { + ses.retainAll() +} + +func (ses *Session) ooSeq(cmd string) { + ses.send(fmt.Sprintf("-ERR Command %v is out of sequence", cmd)) + ses.warn("Wasn't expecting %v here", cmd) +} + +// Session specific logging methods +func (ses *Session) trace(msg string, args ...interface{}) { + log.Trace("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) info(msg string, args ...interface{}) { + log.Info("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) warn(msg string, args ...interface{}) { + // Update metrics + //expWarnsTotal.Add(1) + log.Warn("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) +} + +func (ses *Session) error(msg string, args ...interface{}) { + // Update metrics + //expErrorsTotal.Add(1) + log.Error("POP3<%v> %v", ses.id, fmt.Sprintf(msg, args...)) +} diff --git a/pop3d/listener.go b/pop3d/listener.go new file mode 100644 index 0000000..3bbfb73 --- /dev/null +++ b/pop3d/listener.go @@ -0,0 +1,96 @@ +package pop3d + +import ( + "fmt" + "github.com/jhillyerd/inbucket/config" + "github.com/jhillyerd/inbucket/log" + "github.com/jhillyerd/inbucket/smtpd" + "net" + "sync" + "time" +) + +// Real server code starts here +type Server struct { + domain string + maxIdleSeconds int + dataStore smtpd.DataStore + listener net.Listener + shutdown bool + waitgroup *sync.WaitGroup +} + +// Init a new Server object +func New() *Server { + // TODO is two filestores better/worse than sharing w/ smtpd? + ds := smtpd.NewFileDataStore() + cfg := config.GetPop3Config() + return &Server{domain: cfg.Domain, dataStore: ds, maxIdleSeconds: cfg.MaxIdleSeconds, + waitgroup: new(sync.WaitGroup)} +} + +// Main listener loop +func (s *Server) Start() { + cfg := config.GetPop3Config() + addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%v:%v", + cfg.Ip4address, cfg.Ip4port)) + if err != nil { + log.Error("POP3 Failed to build tcp4 address: %v", err) + // TODO More graceful early-shutdown procedure + panic(err) + } + + log.Info("POP3 listening on TCP4 %v", addr) + s.listener, err = net.ListenTCP("tcp4", addr) + if err != nil { + log.Error("POP3 failed to start tcp4 listener: %v", err) + // TODO More graceful early-shutdown procedure + panic(err) + } + + // Handle incoming connections + var tempDelay time.Duration + for sid := 1; ; sid++ { + if conn, err := s.listener.Accept(); err != nil { + if nerr, ok := err.(net.Error); ok && nerr.Temporary() { + // Temporary error, sleep for a bit and try again + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + log.Error("POP3 accept error: %v; retrying in %v", err, tempDelay) + time.Sleep(tempDelay) + continue + } else { + if s.shutdown { + log.Trace("POP3 listener shutting down on request") + return + } + // TODO Implement a max error counter before shutdown? + // or maybe attempt to restart POP3 + panic(err) + } + } else { + tempDelay = 0 + s.waitgroup.Add(1) + go s.startSession(sid, conn) + } + } +} + +// Stop requests the POP3 server closes it's listener +func (s *Server) Stop() { + log.Trace("POP3 shutdown requested, connections will be drained") + s.shutdown = true + s.listener.Close() +} + +// Drain causes the caller to block until all active POP3 sessions have finished +func (s *Server) Drain() { + s.waitgroup.Wait() + log.Trace("POP3 connections drained") +} diff --git a/smtpd/datastore.go b/smtpd/datastore.go index cd770de..40a2b12 100644 --- a/smtpd/datastore.go +++ b/smtpd/datastore.go @@ -2,6 +2,7 @@ package smtpd import ( "github.com/jhillyerd/go.enmime" + "io" "net/mail" "time" ) @@ -23,6 +24,7 @@ type Message interface { From() string Date() time.Time Subject() string + RawReader() (reader io.ReadCloser, err error) ReadHeader() (msg *mail.Message, err error) ReadBody() (msg *mail.Message, body *enmime.MIMEBody, err error) ReadRaw() (raw *string, err error) @@ -30,4 +32,5 @@ type Message interface { Close() error Delete() error String() string + Size() int64 } diff --git a/smtpd/filestore.go b/smtpd/filestore.go index 67e8db2..b38d4f8 100644 --- a/smtpd/filestore.go +++ b/smtpd/filestore.go @@ -8,6 +8,7 @@ import ( "github.com/jhillyerd/go.enmime" "github.com/jhillyerd/inbucket/config" "github.com/jhillyerd/inbucket/log" + "io" "io/ioutil" "net/mail" "os" @@ -217,6 +218,14 @@ func (m *FileMessage) String() string { return fmt.Sprintf("\"%v\" from %v", m.Fsubject, m.Ffrom) } +func (m *FileMessage) Size() int64 { + fi, err := os.Stat(m.rawPath()) + if err != nil { + return 0 + } + return fi.Size() +} + func (m *FileMessage) gobPath() string { return filepath.Join(m.mailbox.path, m.Fid+".gob") } @@ -258,15 +267,23 @@ func (m *FileMessage) ReadBody() (msg *mail.Message, body *enmime.MIMEBody, err return msg, mime, err } -// ReadRaw opens the .raw portion of a Message and returns it as a string -func (m *FileMessage) ReadRaw() (raw *string, err error) { +// RawReader opens the .raw portion of a Message as an io.ReadCloser +func (m *FileMessage) RawReader() (reader io.ReadCloser, err error) { file, err := os.Open(m.rawPath()) - defer file.Close() if err != nil { return nil, err } - reader := bufio.NewReader(file) - bodyBytes, err := ioutil.ReadAll(reader) + return file, nil +} + +// ReadRaw opens the .raw portion of a Message and returns it as a string +func (m *FileMessage) ReadRaw() (raw *string, err error) { + reader, err := m.RawReader() + defer reader.Close() + if err != nil { + return nil, err + } + bodyBytes, err := ioutil.ReadAll(bufio.NewReader(reader)) if err != nil { return nil, err } diff --git a/smtpd/handler.go b/smtpd/handler.go index 2986fb2..918df0a 100644 --- a/smtpd/handler.go +++ b/smtpd/handler.go @@ -83,12 +83,12 @@ func (ss *Session) String() string { /* Session flow: * 1. Send initial greeting * 2. Receive cmd - * 3. If good cmd, respond, optionally change state + * 3. If good cmd, respond, optionally change state * 4. If bad cmd, respond error * 5. Goto 2 */ func (s *Server) startSession(id int, conn net.Conn) { - log.Info("Connection from %v, starting session <%v>", conn.RemoteAddr(), id) + log.Info("SMTP Connection from %v, starting session <%v>", conn.RemoteAddr(), id) expConnectsCurrent.Add(1) defer func() { conn.Close() @@ -512,21 +512,21 @@ func (ss *Session) ooSeq(cmd string) { // Session specific logging methods func (ss *Session) trace(msg string, args ...interface{}) { - log.Trace("%v<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...)) + log.Trace("SMTP<%v> %v", ss.id, fmt.Sprintf(msg, args...)) } func (ss *Session) info(msg string, args ...interface{}) { - log.Info("%v<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...)) + log.Info("SMTP<%v> %v", ss.id, fmt.Sprintf(msg, args...)) } func (ss *Session) warn(msg string, args ...interface{}) { // Update metrics expWarnsTotal.Add(1) - log.Warn("%v<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...)) + log.Warn("SMTP<%v> %v", ss.id, fmt.Sprintf(msg, args...)) } func (ss *Session) error(msg string, args ...interface{}) { // Update metrics expErrorsTotal.Add(1) - log.Error("%v<%v> %v", ss.remoteHost, ss.id, fmt.Sprintf(msg, args...)) + log.Error("SMTP<%v> %v", ss.id, fmt.Sprintf(msg, args...)) }