1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00
Files
go-inbucket/pkg/extension/async_broker.go
2023-02-16 16:17:06 -08:00

90 lines
2.2 KiB
Go

package extension
import (
"errors"
"sync"
"time"
)
// AsyncEventBroker maintains a list of listeners interested in a specific type
// of event. Events are sent in parallel to all listeners, and no result is
// returned.
type AsyncEventBroker[E any] struct {
sync.RWMutex
listenerNames []string // Ordered listener names.
listenerFuncs []func(E) // Ordered listener functions.
}
// Emit sends the provided event to each registered listener in parallel.
func (eb *AsyncEventBroker[E]) Emit(event *E) {
eb.RLock()
defer eb.RUnlock()
for _, l := range eb.listenerFuncs {
// Events are copied to minimize the risk of mutation.
go l(*event)
}
}
// AddListener registers the named listener, replacing one with a duplicate
// name if present. Listeners should be added in order of priority, most
// significant first.
func (eb *AsyncEventBroker[E]) AddListener(name string, listener func(E)) {
eb.Lock()
defer eb.Unlock()
eb.lockedRemoveListener(name)
eb.listenerNames = append(eb.listenerNames, name)
eb.listenerFuncs = append(eb.listenerFuncs, listener)
}
// RemoveListener unregisters the named listener.
func (eb *AsyncEventBroker[E]) RemoveListener(name string) {
eb.Lock()
defer eb.Unlock()
eb.lockedRemoveListener(name)
}
func (eb *AsyncEventBroker[E]) lockedRemoveListener(name string) {
for i, entry := range eb.listenerNames {
if entry == name {
eb.listenerNames = append(eb.listenerNames[:i], eb.listenerNames[i+1:]...)
eb.listenerFuncs = append(eb.listenerFuncs[:i], eb.listenerFuncs[i+1:]...)
break
}
}
}
// AsyncTestListener returns a func that will wait for an event and return it, or timeout
// with an error.
func (eb *AsyncEventBroker[E]) AsyncTestListener(name string, capacity int) func() (*E, error) {
// Send event down channel.
events := make(chan E, capacity)
eb.AddListener(name,
func(msg E) {
events <- msg
})
count := 0
return func() (*E, error) {
count++
defer func() {
if count >= capacity {
eb.RemoveListener(name)
close(events)
}
}()
select {
case event := <-events:
return &event, nil
case <-time.After(time.Second * 2):
return nil, errors.New("Timeout waiting for event")
}
}
}