mirror of
https://blitiri.com.ar/repos/chasquid
synced 2026-01-03 17:17:29 +00:00
Make the max queue size and give up time configurable
Today, the maximum number of items in the queue, as well as how long we keep attempting to send each item, is hard-coded and not changed by end users. While they are totally adequate for chasquid's main use cases, it can still be useful for some users to change them. So this patch adds two new configuration options for those settings. They're marked experimental for now, so we can adjust them if needed after they get more exposure. Thanks to Lewis Ross-Jones <lewis_r_j@hotmail.com> for suggesting this improvement, and help with testing it.
This commit is contained in:
@@ -7,6 +7,7 @@ package config
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"blitiri.com.ar/go/log"
|
||||
|
||||
@@ -30,6 +31,9 @@ var defaultConfig = &Config{
|
||||
DropCharacters: proto.String("."),
|
||||
|
||||
MailLogPath: "<syslog>",
|
||||
|
||||
MaxQueueItems: 200,
|
||||
GiveUpSendAfter: "20h",
|
||||
}
|
||||
|
||||
// Load the config from the given file, with the given overrides.
|
||||
@@ -67,6 +71,12 @@ func Load(path, overrides string) (*Config, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Validate the GiveUpSendAfter value.
|
||||
if _, err := time.ParseDuration(c.GiveUpSendAfter); err != nil {
|
||||
return nil, fmt.Errorf(
|
||||
"invalid give_up_send_after value %q: %v", c.GiveUpSendAfter, err)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@@ -126,6 +136,13 @@ func override(c, o *Config) {
|
||||
if o.HaproxyIncoming {
|
||||
c.HaproxyIncoming = true
|
||||
}
|
||||
|
||||
if o.MaxQueueItems > 0 {
|
||||
c.MaxQueueItems = o.MaxQueueItems
|
||||
}
|
||||
if o.GiveUpSendAfter != "" {
|
||||
c.GiveUpSendAfter = o.GiveUpSendAfter
|
||||
}
|
||||
}
|
||||
|
||||
// LogConfig logs the given configuration, in a human-friendly way.
|
||||
@@ -153,4 +170,13 @@ func LogConfig(c *Config) {
|
||||
log.Infof(" Dovecot auth: %v (%q, %q)",
|
||||
c.DovecotAuth, c.DovecotUserdbPath, c.DovecotClientPath)
|
||||
log.Infof(" HAProxy incoming: %v", c.HaproxyIncoming)
|
||||
log.Infof(" Max queue items: %d", c.MaxQueueItems)
|
||||
log.Infof(" Give up send after: %s", c.GiveUpSendAfterDuration())
|
||||
}
|
||||
|
||||
func (c *Config) GiveUpSendAfterDuration() time.Duration {
|
||||
// We validate the string value at config load time, so we know it is well
|
||||
// formed.
|
||||
d, _ := time.ParseDuration(c.GiveUpSendAfter)
|
||||
return d
|
||||
}
|
||||
|
||||
@@ -109,6 +109,18 @@ type Config struct {
|
||||
// This allows deploying chasquid behind a HAProxy server, as the
|
||||
// address information is preserved.
|
||||
HaproxyIncoming bool `protobuf:"varint,16,opt,name=haproxy_incoming,json=haproxyIncoming,proto3" json:"haproxy_incoming,omitempty"`
|
||||
// Maximum number of items in the queue.
|
||||
// If we have this many items in the queue, we reject new incoming
|
||||
// email. Be careful when increasing this, as we keep all items in
|
||||
// memory.
|
||||
// Default: 200 (but may change in the future).
|
||||
MaxQueueItems uint32 `protobuf:"varint,17,opt,name=max_queue_items,json=maxQueueItems,proto3" json:"max_queue_items,omitempty"`
|
||||
// How long do we keep retrying sending an email before we give up.
|
||||
// Once we give up, a DSN will be sent back to the sender.
|
||||
// The format is a Go duration string (e.g. "48h" or "360m"; note days
|
||||
// are not a supported unit).
|
||||
// Default: "20h" (but may change in the future).
|
||||
GiveUpSendAfter string `protobuf:"bytes,18,opt,name=give_up_send_after,json=giveUpSendAfter,proto3" json:"give_up_send_after,omitempty"`
|
||||
}
|
||||
|
||||
func (x *Config) Reset() {
|
||||
@@ -255,11 +267,25 @@ func (x *Config) GetHaproxyIncoming() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *Config) GetMaxQueueItems() uint32 {
|
||||
if x != nil {
|
||||
return x.MaxQueueItems
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Config) GetGiveUpSendAfter() string {
|
||||
if x != nil {
|
||||
return x.GiveUpSendAfter
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
var File_config_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_config_proto_rawDesc = []byte{
|
||||
0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xf4,
|
||||
0x05, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73,
|
||||
0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc9,
|
||||
0x06, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73,
|
||||
0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x73,
|
||||
0x74, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x27, 0x0a, 0x10, 0x6d, 0x61, 0x78, 0x5f, 0x64, 0x61, 0x74,
|
||||
0x61, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6d, 0x62, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52,
|
||||
@@ -303,13 +329,18 @@ var file_config_proto_rawDesc = []byte{
|
||||
0x6f, 0x76, 0x65, 0x63, 0x6f, 0x74, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68,
|
||||
0x12, 0x29, 0x0a, 0x10, 0x68, 0x61, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x5f, 0x69, 0x6e, 0x63, 0x6f,
|
||||
0x6d, 0x69, 0x6e, 0x67, 0x18, 0x10, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x68, 0x61, 0x70, 0x72,
|
||||
0x6f, 0x78, 0x79, 0x49, 0x6e, 0x63, 0x6f, 0x6d, 0x69, 0x6e, 0x67, 0x42, 0x14, 0x0a, 0x12, 0x5f,
|
||||
0x73, 0x75, 0x66, 0x66, 0x69, 0x78, 0x5f, 0x73, 0x65, 0x70, 0x61, 0x72, 0x61, 0x74, 0x6f, 0x72,
|
||||
0x73, 0x42, 0x12, 0x0a, 0x10, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x5f, 0x63, 0x68, 0x61, 0x72, 0x61,
|
||||
0x63, 0x74, 0x65, 0x72, 0x73, 0x42, 0x2c, 0x5a, 0x2a, 0x62, 0x6c, 0x69, 0x74, 0x69, 0x72, 0x69,
|
||||
0x2e, 0x63, 0x6f, 0x6d, 0x2e, 0x61, 0x72, 0x2f, 0x67, 0x6f, 0x2f, 0x63, 0x68, 0x61, 0x73, 0x71,
|
||||
0x75, 0x69, 0x64, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x63, 0x6f, 0x6e,
|
||||
0x66, 0x69, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x6f, 0x78, 0x79, 0x49, 0x6e, 0x63, 0x6f, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x26, 0x0a, 0x0f, 0x6d,
|
||||
0x61, 0x78, 0x5f, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x18, 0x11,
|
||||
0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x6d, 0x61, 0x78, 0x51, 0x75, 0x65, 0x75, 0x65, 0x49, 0x74,
|
||||
0x65, 0x6d, 0x73, 0x12, 0x2b, 0x0a, 0x12, 0x67, 0x69, 0x76, 0x65, 0x5f, 0x75, 0x70, 0x5f, 0x73,
|
||||
0x65, 0x6e, 0x64, 0x5f, 0x61, 0x66, 0x74, 0x65, 0x72, 0x18, 0x12, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x0f, 0x67, 0x69, 0x76, 0x65, 0x55, 0x70, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72,
|
||||
0x42, 0x14, 0x0a, 0x12, 0x5f, 0x73, 0x75, 0x66, 0x66, 0x69, 0x78, 0x5f, 0x73, 0x65, 0x70, 0x61,
|
||||
0x72, 0x61, 0x74, 0x6f, 0x72, 0x73, 0x42, 0x12, 0x0a, 0x10, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x5f,
|
||||
0x63, 0x68, 0x61, 0x72, 0x61, 0x63, 0x74, 0x65, 0x72, 0x73, 0x42, 0x2c, 0x5a, 0x2a, 0x62, 0x6c,
|
||||
0x69, 0x74, 0x69, 0x72, 0x69, 0x2e, 0x63, 0x6f, 0x6d, 0x2e, 0x61, 0x72, 0x2f, 0x67, 0x6f, 0x2f,
|
||||
0x63, 0x68, 0x61, 0x73, 0x71, 0x75, 0x69, 0x64, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61,
|
||||
0x6c, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@@ -101,4 +101,18 @@ message Config {
|
||||
// This allows deploying chasquid behind a HAProxy server, as the
|
||||
// address information is preserved.
|
||||
bool haproxy_incoming = 16;
|
||||
|
||||
// Maximum number of items in the queue.
|
||||
// If we have this many items in the queue, we reject new incoming
|
||||
// email. Be careful when increasing this, as we keep all items in
|
||||
// memory.
|
||||
// Default: 200 (but may change in the future).
|
||||
uint32 max_queue_items = 17;
|
||||
|
||||
// How long do we keep retrying sending an email before we give up.
|
||||
// Once we give up, a DSN will be sent back to the sender.
|
||||
// The format is a Go duration string (e.g. "48h" or "360m"; note days
|
||||
// are not a supported unit).
|
||||
// Default: "20h" (but may change in the future).
|
||||
string give_up_send_after = 18;
|
||||
}
|
||||
|
||||
@@ -58,6 +58,7 @@ func TestFullConfig(t *testing.T) {
|
||||
monitoring_address: ":1111"
|
||||
max_data_size_mb: 26
|
||||
suffix_separators: ""
|
||||
max_queue_items: 345
|
||||
`
|
||||
|
||||
tmpDir, path := mustCreateConfig(t, confStr)
|
||||
@@ -68,6 +69,7 @@ func TestFullConfig(t *testing.T) {
|
||||
submission_address: ":999"
|
||||
dovecot_auth: true
|
||||
drop_characters: ""
|
||||
give_up_send_after: "7h"
|
||||
`
|
||||
|
||||
expected := &Config{
|
||||
@@ -90,6 +92,9 @@ func TestFullConfig(t *testing.T) {
|
||||
MailLogPath: "<syslog>",
|
||||
|
||||
DovecotAuth: true,
|
||||
|
||||
MaxQueueItems: 345,
|
||||
GiveUpSendAfter: "7h",
|
||||
}
|
||||
|
||||
c, err := Load(path, overrideStr)
|
||||
@@ -134,6 +139,17 @@ func TestBrokenOverride(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvalidGiveUpSendingAfter(t *testing.T) {
|
||||
tmpDir, path := mustCreateConfig(
|
||||
t, `give_up_send_after: "10"`)
|
||||
defer testlib.RemoveIfOk(t, tmpDir)
|
||||
|
||||
c, err := Load(path, "")
|
||||
if err == nil {
|
||||
t.Fatalf("loaded an invalid config: %v", c)
|
||||
}
|
||||
}
|
||||
|
||||
// Run LogConfig, overriding the default logger first. This exercises the
|
||||
// code, we don't yet validate the output, but it is an useful sanity check.
|
||||
func testLogConfig(c *Config) {
|
||||
|
||||
@@ -33,12 +33,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// Maximum size of the queue; we reject emails when we hit this.
|
||||
maxQueueSize = 200
|
||||
|
||||
// Give up sending attempts after this duration.
|
||||
giveUpAfter = 20 * time.Hour
|
||||
|
||||
// Prefix for item file names.
|
||||
// This is for convenience, versioning, and to be able to tell them apart
|
||||
// temporary files and other cruft.
|
||||
@@ -83,12 +77,6 @@ func init() {
|
||||
|
||||
// Queue that keeps mail waiting for delivery.
|
||||
type Queue struct {
|
||||
// Items in the queue. Map of id -> Item.
|
||||
q map[string]*Item
|
||||
|
||||
// Mutex protecting q.
|
||||
mu sync.RWMutex
|
||||
|
||||
// Couriers to use to deliver mail.
|
||||
localC courier.Courier
|
||||
remoteC courier.Courier
|
||||
@@ -101,6 +89,18 @@ type Queue struct {
|
||||
|
||||
// Aliases resolver.
|
||||
aliases *aliases.Resolver
|
||||
|
||||
// The maximum number of items in the queue.
|
||||
MaxItems int
|
||||
|
||||
// Give up sending attempts after this long.
|
||||
GiveUpAfter time.Duration
|
||||
|
||||
// Mutex protecting q.
|
||||
mu sync.RWMutex
|
||||
|
||||
// Items in the queue. Map of id -> Item.
|
||||
q map[string]*Item
|
||||
}
|
||||
|
||||
// New creates a new Queue instance.
|
||||
@@ -115,6 +115,16 @@ func New(path string, localDomains *set.String, aliases *aliases.Resolver,
|
||||
localDomains: localDomains,
|
||||
path: path,
|
||||
aliases: aliases,
|
||||
|
||||
// We reject emails when we hit this.
|
||||
// Note the actual default used in the daemon is set in the config. We
|
||||
// put a non-zero value here just to be safe.
|
||||
MaxItems: 100,
|
||||
|
||||
// We give up sending (and return a DSN) after this long.
|
||||
// Note the actual default used in the daemon is set in the config. We
|
||||
// put a non-zero value here just to be safe.
|
||||
GiveUpAfter: 20 * time.Hour,
|
||||
}
|
||||
return q, err
|
||||
}
|
||||
@@ -155,8 +165,8 @@ func (q *Queue) Put(tr *trace.Trace, from string, to []string, data []byte) (str
|
||||
tr = tr.NewChild("Queue.Put", from)
|
||||
defer tr.Finish()
|
||||
|
||||
if q.Len() >= maxQueueSize {
|
||||
tr.Errorf("queue full")
|
||||
if nItems := q.Len(); nItems >= q.MaxItems {
|
||||
tr.Errorf("queue full (%d items)", nItems)
|
||||
return "", errQueueFull
|
||||
}
|
||||
putCount.Add(1)
|
||||
@@ -305,7 +315,7 @@ func (item *Item) SendLoop(q *Queue) {
|
||||
defer tr.Finish()
|
||||
tr.Printf("from %s", item.From)
|
||||
|
||||
for time.Since(item.CreatedAt) < giveUpAfter {
|
||||
for time.Since(item.CreatedAt) < q.GiveUpAfter {
|
||||
// Send to all recipients that are still pending.
|
||||
var wg sync.WaitGroup
|
||||
for _, rcpt := range item.Rcpt {
|
||||
|
||||
@@ -197,9 +197,9 @@ func TestFullQueue(t *testing.T) {
|
||||
tr := trace.New("test", "TestFullQueue")
|
||||
defer tr.Finish()
|
||||
|
||||
// Force-insert maxQueueSize items in the queue.
|
||||
// Force-insert as many items in the queue as it supports.
|
||||
oneID := ""
|
||||
for i := 0; i < maxQueueSize; i++ {
|
||||
for i := 0; i < q.MaxItems; i++ {
|
||||
item := &Item{
|
||||
Message: Message{
|
||||
ID: <-newID,
|
||||
|
||||
@@ -243,6 +243,15 @@ func (s *Server) InitQueue(path string, localC, remoteC courier.Courier) {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) SetQueueLimits(maxItems uint32, giveUpAfter time.Duration) {
|
||||
if maxItems > 0 {
|
||||
s.queue.MaxItems = int(maxItems)
|
||||
}
|
||||
if giveUpAfter > 0 {
|
||||
s.queue.GiveUpAfter = giveUpAfter
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) aliasResolveRPC(tr *trace.Trace, req url.Values) (url.Values, error) {
|
||||
rcpts, err := s.aliasesR.Resolve(tr, req.Get("Address"))
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user