1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00

Cache message in memory during receipt, closes #23

This commit is contained in:
James Hillyerd
2016-09-18 16:35:13 -07:00
parent f84b36039e
commit a939605d4a
2 changed files with 56 additions and 52 deletions

View File

@@ -4,9 +4,16 @@ Change Log
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/). This project adheres to [Semantic Versioning](http://semver.org/).
[1.1.0] - 2016-09-03 [Unreleased]
------------ ------------
### Fixed
- We should no longer run out of file handles when dealing with a large number
of recipients on a single message.
[1.1.0] - 2016-09-03
--------------------
### Added ### Added
- Homebrew inbucket.conf and formula (see README) - Homebrew inbucket.conf and formula (see README)

View File

@@ -341,14 +341,16 @@ func (ss *Session) mailHandler(cmd string, arg string) {
// DATA // DATA
func (ss *Session) dataHandler() { func (ss *Session) dataHandler() {
type RecipientDetails struct {
address, localPart, domainPart string
mailbox Mailbox
}
recipients := make([]RecipientDetails, 0, ss.recipients.Len())
// Timestamp for Received header // Timestamp for Received header
stamp := time.Now().Format(timeStampFormat) stamp := time.Now().Format(timeStampFormat)
// Get a Mailbox and a new Message for each recipient // Get a Mailbox and a new Message for each recipient
mailboxes := make([]Mailbox, ss.recipients.Len())
messages := make([]Message, ss.recipients.Len())
msgSize := 0 msgSize := 0
if ss.server.storeMessages { if ss.server.storeMessages {
i := 0
for e := ss.recipients.Front(); e != nil; e = e.Next() { for e := ss.recipients.Front(); e != nil; e = e.Next() {
recip := e.Value.(string) recip := e.Value.(string)
local, domain, err := ParseEmailAddress(recip) local, domain, err := ParseEmailAddress(recip)
@@ -367,35 +369,19 @@ func (ss *Session) dataHandler() {
ss.reset() ss.reset()
return return
} }
mailboxes[i] = mb recipients = append(recipients, RecipientDetails{recip, local, domain, mb})
if messages[i], err = mb.NewMessage(); err != nil {
ss.logError("Failed to create message for %q: %s", local, err)
ss.send(fmt.Sprintf("451 Failed to create message for %v", local))
ss.reset()
return
}
// Generate Received header
recd := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
ss.remoteDomain, ss.remoteHost, ss.server.domain, recip, stamp)
if err := messages[i].Append([]byte(recd)); err != nil {
ss.logError("Failed to write received header for %q: %s", local, err)
ss.send(fmt.Sprintf("451 Failed to create message for %v", local))
ss.reset()
return
}
} else { } else {
log.Tracef("Not storing message for %q", recip) log.Tracef("Not storing message for %q", recip)
} }
i++
} }
} }
ss.send("354 Start mail input; end with <CRLF>.<CRLF>") ss.send("354 Start mail input; end with <CRLF>.<CRLF>")
var buf bytes.Buffer var lineBuf bytes.Buffer
msgBuf := make([][]byte, 0, 1024)
for { for {
buf.Reset() lineBuf.Reset()
err := ss.readByteLine(&buf) err := ss.readByteLine(&lineBuf)
if err != nil { if err != nil {
if netErr, ok := err.(net.Error); ok { if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() { if netErr.Timeout() {
@@ -406,22 +392,45 @@ func (ss *Session) dataHandler() {
ss.enterState(QUIT) ss.enterState(QUIT)
return return
} }
line := buf.Bytes() line := lineBuf.Bytes()
if string(line) == ".\r\n" { if string(line) == ".\r\n" {
// Mail data complete // Mail data complete
if ss.server.storeMessages { if ss.server.storeMessages {
for _, m := range messages { // Create a message for each valid recipient
if m != nil { for _, r := range recipients {
if err := m.Close(); err != nil { msg, err := r.mailbox.NewMessage()
// This logic should be updated to report failures if err != nil {
// writing the initial message file to the client ss.logError("Failed to create message for %q: %s", r.localPart, err)
// after we implement a single-store system (issue ss.send(fmt.Sprintf("451 Failed to create message for %v", r.localPart))
// #23) ss.reset()
return
}
// Generate Received header
recd := fmt.Sprintf("Received: from %s ([%s]) by %s\r\n for <%s>; %s\r\n",
ss.remoteDomain, ss.remoteHost, ss.server.domain, r.address, stamp)
if err := msg.Append([]byte(recd)); err != nil {
ss.logError("Failed to write received header for %q: %s", r.localPart, err)
ss.send(fmt.Sprintf("451 Failed to create message for %v", r.localPart))
ss.reset()
return
}
// Append lines from msgBuf
for _, line = range msgBuf {
if err := msg.Append(line); err != nil {
ss.logError("Failed to append to mailbox %v: %v",
r.mailbox, err)
ss.send("554 Something went wrong")
ss.reset()
// Should really cleanup the crap on filesystem
return
}
}
if err := msg.Close(); err != nil {
ss.logError("Error: %v while writing message", err) ss.logError("Error: %v while writing message", err)
} }
expReceivedTotal.Add(1) expReceivedTotal.Add(1)
} } // end for
}
} else { } else {
expReceivedTotal.Add(1) expReceivedTotal.Add(1)
} }
@@ -434,6 +443,8 @@ func (ss *Session) dataHandler() {
if len(line) > 0 && line[0] == '.' { if len(line) > 0 && line[0] == '.' {
line = line[1:] line = line[1:]
} }
// Second append copies line/lineBuf so we can reuse it
msgBuf = append(msgBuf, append([]byte{}, line...))
msgSize += len(line) msgSize += len(line)
if msgSize > ss.server.maxMessageBytes { if msgSize > ss.server.maxMessageBytes {
// Max message size exceeded // Max message size exceeded
@@ -443,21 +454,7 @@ func (ss *Session) dataHandler() {
// Should really cleanup the crap on filesystem (after issue #23) // Should really cleanup the crap on filesystem (after issue #23)
return return
} }
// Append to message objects } // end for
if ss.server.storeMessages {
for i, m := range messages {
if m != nil {
if err := m.Append(line); err != nil {
ss.logError("Failed to append to mailbox %v: %v", mailboxes[i], err)
ss.send("554 Something went wrong")
ss.reset()
// Should really cleanup the crap on filesystem (after issue #23)
return
}
}
}
}
}
} }
func (ss *Session) enterState(state State) { func (ss *Session) enterState(state State) {