mirror of
https://blitiri.com.ar/repos/chasquid
synced 2025-12-17 14:37:02 +00:00
queue: Add a mutex to protect item's results
The item results get accessed in various places concurrently, so this patch adds a mutex to protect it.
This commit is contained in:
@@ -134,6 +134,14 @@ type Item struct {
|
||||
|
||||
// Map of recipient -> last result of sending it.
|
||||
Results map[string]error
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (item *Item) resultsFor(to string) (error, bool) {
|
||||
item.mu.Lock()
|
||||
defer item.mu.Unlock()
|
||||
value, ok := item.Results[to]
|
||||
return value, ok
|
||||
}
|
||||
|
||||
func (item *Item) SendLoop(q *Queue) {
|
||||
@@ -149,7 +157,7 @@ func (item *Item) SendLoop(q *Queue) {
|
||||
// Send to all recipients that are still pending.
|
||||
var wg sync.WaitGroup
|
||||
for _, to := range item.To {
|
||||
if err, ok := item.Results[to]; ok && err == nil {
|
||||
if err, ok := item.resultsFor(to); ok && err == nil {
|
||||
// Successful send for this recipient, nothing to do.
|
||||
continue
|
||||
}
|
||||
@@ -160,7 +168,9 @@ func (item *Item) SendLoop(q *Queue) {
|
||||
tr.LazyPrintf("%s sending", to)
|
||||
|
||||
err = q.courier.Deliver(item.From, to, item.Data)
|
||||
item.mu.Lock()
|
||||
item.Results[to] = err
|
||||
item.mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
tr.LazyPrintf("error: %v", err)
|
||||
@@ -175,7 +185,7 @@ func (item *Item) SendLoop(q *Queue) {
|
||||
|
||||
successful := 0
|
||||
for _, to := range item.To {
|
||||
if err, ok := item.Results[to]; ok && err == nil {
|
||||
if err, ok := item.resultsFor(to); ok && err == nil {
|
||||
successful++
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user