From 45e1995d178f37ec25dc402f147145f48a2e6eaf Mon Sep 17 00:00:00 2001 From: James Hillyerd Date: Thu, 25 Oct 2012 18:06:29 -0700 Subject: [PATCH] Begin work on message retention - Refactor datastore such that we have a FileDataStore that implements the DataStore interface. - Add in missing SMTP configuration options: max recips, max idle, max message size - Add retention options to config --- README.md | 2 +- config/config.go | 149 +++++++++++++++++++++++++++++++++------------ etc/devel.conf | 21 +++++++ etc/inbucket.conf | 21 +++++++ smtpd/datastore.go | 129 ++++++++++++++++++++++++++++----------- smtpd/handler.go | 4 +- smtpd/listener.go | 10 +-- web/context.go | 4 +- 8 files changed, 255 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index 4d4c045..cbe77a8 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Screenshots *Viewing an email in Inbucket.* ![Metrics](http://cloud.github.com/downloads/jhillyerd/inbucket/inbucket-ss2.png) -*Monitoring metrics while Inbucket receives approximately 4,500 messages per minute.* +*Watching metrics while Inbucket handles over 4,000 messages per minute.* Development Status ------------------ diff --git a/config/config.go b/config/config.go index f04a962..56a45cf 100644 --- a/config/config.go +++ b/config/config.go @@ -12,9 +12,12 @@ import ( // SmtpConfig houses the SMTP server configuration - not using pointers // so that I can pass around copies of the object safely. type SmtpConfig struct { - Ip4address net.IP - Ip4port int - Domain string + Ip4address net.IP + Ip4port int + Domain string + MaxRecipients int + MaxIdleSeconds int + MaxMessageBytes int } type WebConfig struct { @@ -25,11 +28,21 @@ type WebConfig struct { PublicDir string } -var smtpConfig *SmtpConfig +type DataStoreConfig struct { + Path string + RetentionMinutes int + RetentionSleep int +} -var webConfig *WebConfig +var ( + // Global goconfig object + Config *config.Config -var Config *config.Config + // Parsed specific configs + smtpConfig *SmtpConfig + webConfig *WebConfig + dataStoreConfig *DataStoreConfig +) // GetSmtpConfig returns a copy of the SmtpConfig object func GetSmtpConfig() SmtpConfig { @@ -41,6 +54,11 @@ func GetWebConfig() WebConfig { return *webConfig } +// GetDataStoreConfig returns a copy of the DataStoreConfig object +func GetDataStoreConfig() DataStoreConfig { + return *dataStoreConfig +} + // LoadConfig loads the specified configuration file into inbucket.Config // and performs validations on it. func LoadConfig(filename string) error { @@ -70,12 +88,19 @@ func LoadConfig(filename string) error { requireOption(messages, "smtp", "ip4.address") requireOption(messages, "smtp", "ip4.port") requireOption(messages, "smtp", "domain") + requireOption(messages, "smtp", "max.recipients") + requireOption(messages, "smtp", "max.idle.seconds") + requireOption(messages, "smtp", "max.message.bytes") requireOption(messages, "web", "ip4.address") requireOption(messages, "web", "ip4.port") requireOption(messages, "web", "template.dir") requireOption(messages, "web", "template.cache") requireOption(messages, "web", "public.dir") requireOption(messages, "datastore", "path") + requireOption(messages, "datastore", "retention.minutes") + requireOption(messages, "datastore", "retention.sleep.millis") + + // Return error if validations failed if messages.Len() > 0 { fmt.Fprintln(os.Stderr, "Error(s) validating configuration:") for e := messages.Front(); e != nil; e = e.Next() { @@ -96,15 +121,17 @@ func LoadConfig(filename string) error { // parseLoggingConfig trying to catch config errors early func parseLoggingConfig() error { - option := "[logging]level" - str, err := Config.String("logging", "level") + section := "logging" + + option := "level" + str, err := Config.String(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } switch strings.ToUpper(str) { case "TRACE", "INFO", "WARN", "ERROR": default: - return fmt.Errorf("Invalid value provided for %v: %v", option, str) + return fmt.Errorf("Invalid value provided for [%v]%v: '%v'", section, option, str) } return nil } @@ -112,89 +139,135 @@ func parseLoggingConfig() error { // parseSmtpConfig trying to catch config errors early func parseSmtpConfig() error { smtpConfig = new(SmtpConfig) + section := "smtp" // Parse IP4 address only, error on IP6. - option := "[smtp]ip4.address" - str, err := Config.String("smtp", "ip4.address") + option := "ip4.address" + str, err := Config.String(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } addr := net.ParseIP(str) if addr == nil { - return fmt.Errorf("Failed to parse %v '%v'", option, str) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } addr = addr.To4() if addr == nil { - return fmt.Errorf("Failed to parse %v '%v' not IPv4!", option, str) + return fmt.Errorf("Failed to parse [%v]%v: '%v' not IPv4!", section, option, err) } smtpConfig.Ip4address = addr - option = "[smtp]ip4.port" - smtpConfig.Ip4port, err = Config.Int("smtp", "ip4.port") + option = "ip4.port" + smtpConfig.Ip4port, err = Config.Int(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } - option = "[smtp]domain" - str, err = Config.String("smtp", "domain") + option = "domain" + str, err = Config.String(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } smtpConfig.Domain = str + option = "max.recipients" + smtpConfig.MaxRecipients, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + option = "max.idle.seconds" + smtpConfig.MaxIdleSeconds, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + option = "max.message.bytes" + smtpConfig.MaxMessageBytes, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + return nil } // parseWebConfig trying to catch config errors early func parseWebConfig() error { webConfig = new(WebConfig) + section := "web" // Parse IP4 address only, error on IP6. - option := "[web]ip4.address" - str, err := Config.String("web", "ip4.address") + option := "ip4.address" + str, err := Config.String(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } addr := net.ParseIP(str) if addr == nil { - return fmt.Errorf("Failed to parse %v '%v'", option, str) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } addr = addr.To4() if addr == nil { - return fmt.Errorf("Failed to parse %v '%v' not IPv4!", option, str) + return fmt.Errorf("Failed to parse [%v]%v: '%v' not IPv4!", section, option, err) } webConfig.Ip4address = addr - option = "[web]ip4.port" - webConfig.Ip4port, err = Config.Int("web", "ip4.port") + option = "ip4.port" + webConfig.Ip4port, err = Config.Int(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } - option = "[web]template.dir" - str, err = Config.String("web", "template.dir") + option = "template.dir" + str, err = Config.String(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } webConfig.TemplateDir = str - option = "[web]template.cache" - flag, err := Config.Bool("web", "template.cache") + option = "template.cache" + flag, err := Config.Bool(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } webConfig.TemplateCache = flag - option = "[web]public.dir" - str, err = Config.String("web", "public.dir") + option = "public.dir" + str, err = Config.String(section, option) if err != nil { - return fmt.Errorf("Failed to parse %v: %v", option, err) + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) } webConfig.PublicDir = str return nil } +// parseDataStoreConfig trying to catch config errors early +func parseDataStoreConfig() error { + dataStoreConfig = new(DataStoreConfig) + section := "datastore" + + option := "path" + str, err := Config.String(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + dataStoreConfig.Path = str + + option = "retention.minutes" + dataStoreConfig.RetentionMinutes, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + option = "retention.sleep.millis" + dataStoreConfig.RetentionSleep, err = Config.Int(section, option) + if err != nil { + return fmt.Errorf("Failed to parse [%v]%v: '%v'", section, option, err) + } + + return nil +} + // requireSection checks that a [section] is defined in the configuration file, // appending a message if not. func requireSection(messages *list.List, section string) { diff --git a/etc/devel.conf b/etc/devel.conf index 0dbe942..4f2e360 100644 --- a/etc/devel.conf +++ b/etc/devel.conf @@ -25,6 +25,17 @@ ip4.port=2500 # used in SMTP greeting domain=inbucket.local +# Maximum number of RCPT TO: addresses we allow from clients, the SMTP +# RFC recommends this be at least 100. +max.recipients=100 + +# How long we allow a network connection to be idle before hanging up on the +# client, SMTP RFC recommends at least 5 minutes (300 seconds). +max.idle.seconds=30 + +# Maximum allowable size of message body in bytes (including attachments) +max.message.bytes=2048000 + ############################################################################# [web] @@ -51,3 +62,13 @@ public.dir=%(install.dir)s/themes/%(theme)s/public # Path to the datastore, mail will be written into subdirectories path=/tmp/inbucket + +# How many minutes after receipt should a message be stored until it's +# automatically purged. To retain messages until manually deleted, set this +# to 0 +retention.minutes=1 + +# How many milliseconds to sleep after purging messages from a mailbox. +# This should help reduce disk I/O when there are a large number of messages +# to purge. +retention.sleep.millis=100 diff --git a/etc/inbucket.conf b/etc/inbucket.conf index 54ef3bc..3c95310 100644 --- a/etc/inbucket.conf +++ b/etc/inbucket.conf @@ -25,6 +25,17 @@ ip4.port=2500 # used in SMTP greeting domain=inbucket.local +# Maximum number of RCPT TO: addresses we allow from clients, the SMTP +# RFC recommends this be at least 100. +max.recipients=100 + +# How long we allow a network connection to be idle before hanging up on the +# client, SMTP RFC recommends at least 5 minutes (300 seconds). +max.idle.seconds=300 + +# Maximum allowable size of message body in bytes (including attachments) +max.message.bytes=2048000 + ############################################################################# [web] @@ -51,3 +62,13 @@ public.dir=%(install.dir)s/themes/%(theme)s/public # Path to the datastore, mail will be written into subdirectories path=/tmp/inbucket + +# How many minutes after receipt should a message be stored until it's +# automatically purged. To retain messages until manually deleted, set this +# to 0 +retention.minutes=240 + +# How many milliseconds to sleep after purging messages from a mailbox. +# This should help reduce disk I/O when there are a large number of messages +# to purge. +retention.sleep.millis=100 diff --git a/smtpd/datastore.go b/smtpd/datastore.go index 590f3fd..79bde52 100644 --- a/smtpd/datastore.go +++ b/smtpd/datastore.go @@ -15,6 +15,31 @@ import ( "time" ) +type DataStore interface { + MailboxFor(emailAddress string) (Mailbox, error) +} + +type Mailbox interface { + GetMessages() ([]Message, error) + GetMessage(id string) (Message, error) + NewMessage() Message + String() string +} + +type Message interface { + Id() string + From() string + Date() time.Time + Subject() string + ReadHeader() (msg *mail.Message, err error) + ReadBody() (msg *mail.Message, body *MIMEBody, err error) + ReadRaw() (raw *string, err error) + Append(data []byte) error + Close() error + Delete() error + String() string +} + var ErrNotWritable = errors.New("Message not writable") // Global because we only want one regardless of the number of DataStore objects @@ -34,14 +59,14 @@ func countGenerator(c chan int) { // A DataStore is the root of the mail storage hiearchy. It provides access to // Mailbox objects -type DataStore struct { +type FileDataStore struct { path string mailPath string } // NewDataStore creates a new DataStore object. It uses the inbucket.Config object to // construct it's path. -func NewDataStore() *DataStore { +func NewFileDataStore() DataStore { path, err := config.Config.String("datastore", "path") if err != nil { log.Error("Error getting datastore path: %v", err) @@ -52,12 +77,12 @@ func NewDataStore() *DataStore { return nil } mailPath := filepath.Join(path, "mail") - return &DataStore{path: path, mailPath: mailPath} + return &FileDataStore{path: path, mailPath: mailPath} } // Retrieves the Mailbox object for a specified email address, if the mailbox // does not exist, it will attempt to create it. -func (ds *DataStore) MailboxFor(emailAddress string) (*Mailbox, error) { +func (ds *FileDataStore) MailboxFor(emailAddress string) (Mailbox, error) { name := ParseMailboxName(emailAddress) dir := HashMailboxName(name) s1 := dir[0:3] @@ -67,32 +92,32 @@ func (ds *DataStore) MailboxFor(emailAddress string) (*Mailbox, error) { log.Error("Failed to create directory %v, %v", path, err) return nil, err } - return &Mailbox{store: ds, name: name, dirName: dir, path: path}, nil + return &FileMailbox{store: ds, name: name, dirName: dir, path: path}, nil } // A Mailbox manages the mail for a specific user and correlates to a particular // directory on disk. -type Mailbox struct { - store *DataStore +type FileMailbox struct { + store *FileDataStore name string dirName string path string } -func (mb *Mailbox) String() string { +func (mb *FileMailbox) String() string { return mb.name + "[" + mb.dirName + "]" } // GetMessages scans the mailbox directory for .gob files and decodes them into // a slice of Message objects. -func (mb *Mailbox) GetMessages() ([]*Message, error) { +func (mb *FileMailbox) GetMessages() ([]Message, error) { files, err := ioutil.ReadDir(mb.path) if err != nil { return nil, err } log.Trace("Scanning %v files for %v", len(files), mb) - messages := make([]*Message, 0, len(files)) + messages := make([]Message, 0, len(files)) for _, f := range files { if (!f.IsDir()) && strings.HasSuffix(strings.ToLower(f.Name()), ".gob") { // We have a gob file @@ -101,7 +126,7 @@ func (mb *Mailbox) GetMessages() ([]*Message, error) { return nil, err } dec := gob.NewDecoder(bufio.NewReader(file)) - msg := new(Message) + msg := new(FileMessage) if err = dec.Decode(msg); err != nil { return nil, fmt.Errorf("While decoding message: %v", err) } @@ -115,14 +140,14 @@ func (mb *Mailbox) GetMessages() ([]*Message, error) { } // GetMessage decodes a single message by Id and returns a Message object -func (mb *Mailbox) GetMessage(id string) (*Message, error) { +func (mb *FileMailbox) GetMessage(id string) (Message, error) { file, err := os.Open(filepath.Join(mb.path, id+".gob")) if err != nil { return nil, err } dec := gob.NewDecoder(bufio.NewReader(file)) - msg := new(Message) + msg := new(FileMessage) if err = dec.Decode(msg); err != nil { return nil, err } @@ -135,12 +160,13 @@ func (mb *Mailbox) GetMessage(id string) (*Message, error) { // Message contains a little bit of data about a particular email message, and // methods to retrieve the rest of it from disk. -type Message struct { - mailbox *Mailbox - Id string - Date time.Time - From string - Subject string +type FileMessage struct { + mailbox *FileMailbox + // Stored in GOB + Fid string + Fdate time.Time + Ffrom string + Fsubject string // These are for creating new messages only writable bool writerFile *os.File @@ -148,27 +174,43 @@ type Message struct { } // NewMessage creates a new Message object and sets the Date and Id fields. -func (mb *Mailbox) NewMessage() *Message { +func (mb *FileMailbox) NewMessage() Message { date := time.Now() - id := date.Format("20060102T150405") + "-" + fmt.Sprintf("%04d", <-countChannel) + id := generateId(date) - return &Message{mailbox: mb, Id: id, Date: date, writable: true} + return &FileMessage{mailbox: mb, Fid: id, Fdate: date, writable: true} } -func (m *Message) String() string { - return fmt.Sprintf("\"%v\" from %v", m.Subject, m.From) +func (m *FileMessage) Id() string { + return m.Fid } -func (m *Message) gobPath() string { - return filepath.Join(m.mailbox.path, m.Id+".gob") +func (m *FileMessage) Date() time.Time { + return m.Fdate } -func (m *Message) rawPath() string { - return filepath.Join(m.mailbox.path, m.Id+".raw") +func (m *FileMessage) From() string { + return m.Ffrom +} + +func (m *FileMessage) Subject() string { + return m.Fsubject +} + +func (m *FileMessage) String() string { + return fmt.Sprintf("\"%v\" from %v", m.Fsubject, m.Ffrom) +} + +func (m *FileMessage) gobPath() string { + return filepath.Join(m.mailbox.path, m.Fid+".gob") +} + +func (m *FileMessage) rawPath() string { + return filepath.Join(m.mailbox.path, m.Fid+".raw") } // ReadHeader opens the .raw portion of a Message and returns a standard Go mail.Message object -func (m *Message) ReadHeader() (msg *mail.Message, err error) { +func (m *FileMessage) ReadHeader() (msg *mail.Message, err error) { file, err := os.Open(m.rawPath()) defer file.Close() if err != nil { @@ -182,7 +224,7 @@ func (m *Message) ReadHeader() (msg *mail.Message, err error) { // ReadBody opens the .raw portion of a Message and returns a MIMEBody object, along // with a free mail.Message containing the Headers, since we had to make one of those // anyway. -func (m *Message) ReadBody() (msg *mail.Message, body *MIMEBody, err error) { +func (m *FileMessage) ReadBody() (msg *mail.Message, body *MIMEBody, err error) { file, err := os.Open(m.rawPath()) defer file.Close() if err != nil { @@ -201,7 +243,7 @@ func (m *Message) ReadBody() (msg *mail.Message, body *MIMEBody, err error) { } // ReadRaw opens the .raw portion of a Message and returns it as a string -func (m *Message) ReadRaw() (raw *string, err error) { +func (m *FileMessage) ReadRaw() (raw *string, err error) { file, err := os.Open(m.rawPath()) defer file.Close() if err != nil { @@ -218,7 +260,7 @@ func (m *Message) ReadRaw() (raw *string, err error) { // Append data to a newly opened Message, this will fail on a pre-existing Message and // after Close() is called. -func (m *Message) Append(data []byte) error { +func (m *FileMessage) Append(data []byte) error { // Prevent Appending to a pre-existing Message if !m.writable { return ErrNotWritable @@ -240,7 +282,7 @@ func (m *Message) Append(data []byte) error { // Close this Message for writing - no more data may be Appended. Close() will also // trigger the creation of the .gob file. -func (m *Message) Close() error { +func (m *FileMessage) Close() error { // nil out the writer fields so they can't be used writer := m.writer writerFile := m.writerFile @@ -268,7 +310,7 @@ func (m *Message) Close() error { } // Delete this Message from disk by removing both the gob and raw files -func (m *Message) Delete() error { +func (m *FileMessage) Delete() error { log.Trace("Deleting %v", m.gobPath()) err := os.Remove(m.gobPath()) if err != nil { @@ -280,7 +322,7 @@ func (m *Message) Delete() error { // createGob reads the .raw file to grab the From and Subject header entries, // then creates the .gob file. -func (m *Message) createGob() error { +func (m *FileMessage) createGob() error { // Open gob for writing file, err := os.Create(m.gobPath()) defer file.Close() @@ -296,8 +338,8 @@ func (m *Message) createGob() error { } // Only public fields are stored in gob - m.From = msg.Header.Get("From") - m.Subject = msg.Header.Get("Subject") + m.Ffrom = msg.Header.Get("From") + m.Fsubject = msg.Header.Get("Subject") // Write & flush enc := gob.NewEncoder(writer) @@ -308,3 +350,16 @@ func (m *Message) createGob() error { writer.Flush() return nil } + +// generatePrefix converts a Time object into the ISO style format we use +// as a prefix for message files. Note: It is used directly by unit +// tests. +func generatePrefix(date time.Time) string { + return date.Format("20060102T150405") +} + +// generateId adds a 4-digit unique number onto the end of the string +// returned by generatePrefix() +func generateId(date time.Time) string { + return generatePrefix(date) + "-" + fmt.Sprintf("%04d", <-countChannel) +} diff --git a/smtpd/handler.go b/smtpd/handler.go index d09c305..dfb67c7 100644 --- a/smtpd/handler.go +++ b/smtpd/handler.go @@ -294,8 +294,8 @@ func (ss *Session) dataHandler() { msgSize := 0 // Get a Mailbox and a new Message for each recipient - mailboxes := make([]*Mailbox, ss.recipients.Len()) - messages := make([]*Message, ss.recipients.Len()) + mailboxes := make([]Mailbox, ss.recipients.Len()) + messages := make([]Message, ss.recipients.Len()) i := 0 for e := ss.recipients.Front(); e != nil; e = e.Next() { recip := e.Value.(string) diff --git a/smtpd/listener.go b/smtpd/listener.go index f0d22af..72f348d 100644 --- a/smtpd/listener.go +++ b/smtpd/listener.go @@ -16,7 +16,7 @@ type Server struct { maxRecips int maxIdleSeconds int maxMessageBytes int - dataStore *DataStore + dataStore DataStore } // Raw stat collectors @@ -40,10 +40,10 @@ var expWarnsHist = new(expvar.String) // Init a new Server object func New() *Server { - ds := NewDataStore() - // TODO Make more of these configurable - return &Server{domain: config.GetSmtpConfig().Domain, maxRecips: 100, maxIdleSeconds: 300, - dataStore: ds, maxMessageBytes: 2048000} + ds := NewFileDataStore() + cfg := config.GetSmtpConfig() + return &Server{dataStore: ds, domain: cfg.Domain, maxRecips: cfg.MaxRecipients, + maxIdleSeconds: cfg.MaxIdleSeconds, maxMessageBytes: cfg.MaxMessageBytes} } // Main listener loop diff --git a/web/context.go b/web/context.go index 5585f37..7946cdc 100644 --- a/web/context.go +++ b/web/context.go @@ -10,7 +10,7 @@ import ( type Context struct { Vars map[string]string Session *sessions.Session - DataStore *smtpd.DataStore + DataStore smtpd.DataStore } func (c *Context) Close() { @@ -20,7 +20,7 @@ func (c *Context) Close() { func NewContext(req *http.Request) (*Context, error) { vars := mux.Vars(req) sess, err := sessionStore.Get(req, "inbucket") - ds := smtpd.NewDataStore() + ds := smtpd.NewFileDataStore() ctx := &Context{ Vars: vars, Session: sess,