From 09d3c73f6cb8b6acd7bbbeddf980ce6fae6cf9f1 Mon Sep 17 00:00:00 2001 From: Alberto Bertogli Date: Wed, 12 Oct 2016 23:54:49 +0100 Subject: [PATCH] queue: Simplify sending loop This patch simplifies the sending loop code: - Move the recipient sending function from a closure to a method. - Simplify the status update logic: we now update and write unconditionally (as we should have been doing). - Create a function for counting recipients in a given status. It also adds a test for the removal of completed items from the queue, which was not covered before and came up during development. --- internal/queue/queue.go | 123 ++++++++++++++++------------------- internal/queue/queue_test.go | 11 ++++ 2 files changed, 66 insertions(+), 68 deletions(-) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index dd043d7..d1dc9d4 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -305,60 +305,17 @@ func (item *Item) SendLoop(q *Queue) { // Send to all recipients that are still pending. var wg sync.WaitGroup for _, rcpt := range item.Rcpt { - item.Lock() - status := rcpt.Status - item.Unlock() - - if status != Recipient_PENDING { + if rcpt.Status != Recipient_PENDING { continue } wg.Add(1) - go func(rcpt *Recipient, oldStatus Recipient_Status) { - defer wg.Done() - to := rcpt.Address - tr.Debugf("%s sending", to) - - err, permanent := item.deliver(q, rcpt) - - if err != nil { - if permanent { - tr.Errorf("%s permanent error: %v", to, err) - status = Recipient_FAILED - } else { - tr.Printf("%s temporary error: %v", to, err) - } - } else { - tr.Printf("%s sent", to) - status = Recipient_SENT - } - - // Update + write on status change. - if oldStatus != status { - item.Lock() - rcpt.Status = status - if err != nil { - rcpt.LastFailureMessage = err.Error() - } - item.Unlock() - - err = item.WriteTo(q.path) - if err != nil { - tr.Errorf("failed to write: %v", err) - } - } - }(rcpt, status) + go item.sendOneRcpt(&wg, tr, q, rcpt) } wg.Wait() // If they're all done, no need to wait. - pending := 0 - for _, rcpt := range item.Rcpt { - if rcpt.Status == Recipient_PENDING { - pending++ - } - } - if pending == 0 { + if item.countRcpt(Recipient_PENDING) == 0 { break } @@ -371,41 +328,41 @@ func (item *Item) SendLoop(q *Queue) { } // Completed to all recipients (some may not have succeeded). - - failed := 0 - for _, rcpt := range item.Rcpt { - if rcpt.Status == Recipient_FAILED { - failed++ - } - } - - if failed > 0 && item.From != "<>" { + if item.countRcpt(Recipient_FAILED) > 0 && item.From != "<>" { sendDSN(tr, q, item) } tr.Printf("all done") q.Remove(item.ID) - - return } -func sendDSN(tr *trace.Trace, q *Queue, item *Item) { - tr.Debugf("sending DSN") +// sendOneRcpt, and update it with the results. +func (item *Item) sendOneRcpt(wg *sync.WaitGroup, tr *trace.Trace, q *Queue, rcpt *Recipient) { + defer wg.Done() + to := rcpt.Address + tr.Debugf("%s sending", to) - msg, err := deliveryStatusNotification(item) + err, permanent := item.deliver(q, rcpt) + + item.Lock() if err != nil { - tr.Errorf("failed to build DSN: %v", err) - return + rcpt.LastFailureMessage = err.Error() + if permanent { + tr.Errorf("%s permanent error: %v", to, err) + rcpt.Status = Recipient_FAILED + } else { + tr.Printf("%s temporary error: %v", to, err) + } + } else { + tr.Printf("%s sent", to) + rcpt.Status = Recipient_SENT } + item.Unlock() - id, err := q.Put(item.Hostname, "<>", []string{item.From}, msg) + err = item.WriteTo(q.path) if err != nil { - tr.Errorf("failed to queue DSN: %v", err) - return + tr.Errorf("failed to write: %v", err) } - - tr.Printf("queued DSN: %s", id) - dsnQueued.Add(1) } // deliver the item to the given recipient, using the couriers from the queue. @@ -452,6 +409,36 @@ func (item *Item) deliver(q *Queue, rcpt *Recipient) (err error, permanent bool) } } +// countRcpt counts how many recipients are in the given status. +func (item *Item) countRcpt(status Recipient_Status) int { + c := 0 + for _, rcpt := range item.Rcpt { + if rcpt.Status == status { + c++ + } + } + return c +} + +func sendDSN(tr *trace.Trace, q *Queue, item *Item) { + tr.Debugf("sending DSN") + + msg, err := deliveryStatusNotification(item) + if err != nil { + tr.Errorf("failed to build DSN: %v", err) + return + } + + id, err := q.Put(item.Hostname, "<>", []string{item.From}, msg) + if err != nil { + tr.Errorf("failed to queue DSN: %v", err) + return + } + + tr.Printf("queued DSN: %s", id) + dsnQueued.Add(1) +} + func nextDelay(last time.Duration) time.Duration { switch { case last < 1*time.Minute: diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index b77ebe1..f729f1c 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -80,6 +80,17 @@ func TestBasic(t *testing.T) { localC.wg.Wait() remoteC.wg.Wait() + // Make sure the delivered items leave the queue. + for d := time.Now().Add(2 * time.Second); time.Now().Before(d); { + if q.Len() == 0 { + break + } + time.Sleep(20 * time.Millisecond) + } + if q.Len() != 0 { + t.Fatalf("%d items not removed from the queue after delivery", q.Len()) + } + cases := []struct { courier *TestCourier expectedTo string