From c00648b4ae3e1ed4458ebdfce3879682cd9cc449 Mon Sep 17 00:00:00 2001 From: Alberto Bertogli Date: Mon, 17 Oct 2016 23:08:02 +0100 Subject: [PATCH] queue: Calculate next delay based on creation time Calculating the next delay based on the previous delay causes daemon restarts to start from scratch, as we don't persist it. This can cause a few server restarts to generate many unnecessary sends. This patch changes the next delay calculation to use the creation time instead, and also adds a <=1m random perturbation to avoid all queued emails to be retried at the exact same time after a restart. --- internal/queue/queue.go | 28 ++++++++++++++++++---------- internal/queue/queue_test.go | 22 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 7338933..74635f7 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -11,6 +11,7 @@ import ( "encoding/base64" "expvar" "fmt" + mathrand "math/rand" "os" "os/exec" "path/filepath" @@ -305,7 +306,6 @@ func (item *Item) SendLoop(q *Queue) { defer tr.Finish() tr.Printf("from %s", item.From) - var delay time.Duration for time.Since(item.CreatedAt) < giveUpAfter { // Send to all recipients that are still pending. var wg sync.WaitGroup @@ -327,7 +327,7 @@ func (item *Item) SendLoop(q *Queue) { // TODO: Consider sending a non-final notification after 30m or so, // that some of the messages have been delayed. - delay = nextDelay(delay) + delay := nextDelay(item.CreatedAt) tr.Printf("waiting for %v", delay) maillog.QueueLoop(item.ID, delay) time.Sleep(delay) @@ -452,17 +452,25 @@ func sendDSN(tr *trace.Trace, q *Queue, item *Item) { dsnQueued.Add(1) } -func nextDelay(last time.Duration) time.Duration { +func nextDelay(createdAt time.Time) time.Duration { + var delay time.Duration + + since := time.Since(createdAt) switch { - case last < 1*time.Minute: - return 1 * time.Minute - case last < 5*time.Minute: - return 5 * time.Minute - case last < 10*time.Minute: - return 10 * time.Minute + case since < 1*time.Minute: + delay = 1 * time.Minute + case since < 5*time.Minute: + delay = 5 * time.Minute + case since < 10*time.Minute: + delay = 10 * time.Minute default: - return 20 * time.Minute + delay = 20 * time.Minute } + + // Perturb the delay, to avoid all queued emails to be retried at the + // exact same time after a restart. + delay += time.Duration(mathrand.Intn(60)) * time.Second + return delay } func timestampNow() *timestamp.Timestamp { diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index 0815a25..5a37cd5 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -263,3 +263,25 @@ func TestPipes(t *testing.T) { t.Errorf("pipe delivery failed: %v", err) } } + +func TestNextDelay(t *testing.T) { + cases := []struct{ since, min time.Duration }{ + {10 * time.Second, 1 * time.Minute}, + {3 * time.Minute, 5 * time.Minute}, + {7 * time.Minute, 10 * time.Minute}, + {15 * time.Minute, 20 * time.Minute}, + {30 * time.Minute, 20 * time.Minute}, + } + for _, c := range cases { + // Repeat each case a few times to exercise the perturbation a bit. + for i := 0; i < 10; i++ { + delay := nextDelay(time.Now().Add(-c.since)) + + max := c.min + 1*time.Minute + if delay < c.min || delay > max { + t.Errorf("since:%v expected [%v, %v], got %v", + c.since, c.min, max, delay) + } + } + } +}