From 831ef131325a6aa3ed9be9f9dec10e02c9bf43c3 Mon Sep 17 00:00:00 2001 From: Alberto Bertogli Date: Tue, 19 Jul 2016 23:02:42 +0100 Subject: [PATCH] queue: Add a mutex to protect item's results The item results get accessed in various places concurrently, so this patch adds a mutex to protect it. --- internal/queue/queue.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 14ed2b1..6dc29a0 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -134,6 +134,14 @@ type Item struct { // Map of recipient -> last result of sending it. Results map[string]error + mu sync.Mutex +} + +func (item *Item) resultsFor(to string) (error, bool) { + item.mu.Lock() + defer item.mu.Unlock() + value, ok := item.Results[to] + return value, ok } func (item *Item) SendLoop(q *Queue) { @@ -149,7 +157,7 @@ func (item *Item) SendLoop(q *Queue) { // Send to all recipients that are still pending. var wg sync.WaitGroup for _, to := range item.To { - if err, ok := item.Results[to]; ok && err == nil { + if err, ok := item.resultsFor(to); ok && err == nil { // Successful send for this recipient, nothing to do. continue } @@ -160,7 +168,9 @@ func (item *Item) SendLoop(q *Queue) { tr.LazyPrintf("%s sending", to) err = q.courier.Deliver(item.From, to, item.Data) + item.mu.Lock() item.Results[to] = err + item.mu.Unlock() if err != nil { tr.LazyPrintf("error: %v", err) @@ -175,7 +185,7 @@ func (item *Item) SendLoop(q *Queue) { successful := 0 for _, to := range item.To { - if err, ok := item.Results[to]; ok && err == nil { + if err, ok := item.resultsFor(to); ok && err == nil { successful++ } }