1
0
mirror of https://github.com/jhillyerd/inbucket.git synced 2025-12-17 17:47:03 +00:00
Files
go-inbucket/pkg/msghub/hub.go
2023-02-16 16:17:06 -08:00

162 lines
3.9 KiB
Go

package msghub
import (
"container/ring"
"context"
"github.com/inbucket/inbucket/pkg/extension"
"github.com/inbucket/inbucket/pkg/extension/event"
"github.com/rs/zerolog/log"
)
// opChanLen is the buffer capacity of the Hub operation channel. Once this many operations are
// queued and not yet processed, further senders block until the processing loop catches up.
const opChanLen = 100
// Listener is implemented by consumers of Hub traffic. A listener receives the contents of the
// history buffer when it is added, followed by new messages as they are dispatched.
type Listener interface {
// Receive is called with each replayed or newly dispatched message. Returning an error while
// a message is being dispatched removes the listener from the Hub.
Receive(msg event.MessageMetadata) error
// Delete is called when the message identified by mailbox and id has been deleted. Returning
// an error removes the listener from the Hub.
Delete(mailbox string, id string) error
}
// Hub relays messages on to its listeners. Work is expressed as closures queued on opChan and
// executed one at a time by the Start loop.
type Hub struct {
// history is a ring of recent messages. It points at the slot that will be written next;
// walking forward from there, the first non-nil entry is the oldest message. It is nil when
// the Hub was constructed with a history length that ring.New turns into an empty ring.
history *ring.Ring
listeners map[Listener]struct{} // set of listeners interested in new messages
opChan chan func(h *Hub) // operations queued for this actor, buffered to opChanLen
}
// New builds a Hub whose history ring has room for historyLen messages, which are played back
// to listeners added later. It also subscribes the Hub to the extension host's
// AfterMessageStored and AfterMessageDeleted events under the name "msghub", forwarding them to
// Dispatch and Delete respectively.
//
// New does not start any processing itself; call Start to run the operation loop.
func New(historyLen int, extHost *extension.Host) *Hub {
	hub := new(Hub)
	hub.history = ring.New(historyLen)
	hub.listeners = make(map[Listener]struct{})
	hub.opChan = make(chan func(h *Hub), opChanLen)

	// Stored messages are broadcast through the hub.
	extHost.Events.AfterMessageStored.AddListener("msghub", hub.Dispatch)

	// Deleted messages are purged from the hub, identified by mailbox and ID.
	onDeleted := func(msg event.MessageMetadata) {
		hub.Delete(msg.Mailbox, msg.ID)
	}
	extHost.Events.AfterMessageDeleted.AddListener("msghub", onDeleted)

	return hub
}
// Start runs the Hub processing loop on the calling goroutine, executing queued operations one
// at a time until ctx is canceled.
//
// On cancellation the operation channel is closed and Start returns; operations still queued at
// that point are not executed. Because the channel is closed, any method that queues an
// operation afterwards will panic on the send.
func (hub *Hub) Start(ctx context.Context) {
	canceled := ctx.Done()
	for {
		select {
		case op := <-hub.opChan:
			hub.runOp(op)
		case <-canceled:
			// Shutdown
			close(hub.opChan)
			return
		}
	}
}
// Dispatch queues msg for broadcast by the hub. When the queued operation runs, the message is
// written into the history buffer and then handed to every registered listener; a listener whose
// Receive returns an error is unregistered. If the hub has no history ring, the operation does
// nothing at all.
func (hub *Hub) Dispatch(msg event.MessageMetadata) {
	broadcast := func(h *Hub) {
		if h.history == nil {
			return
		}

		// Store the message in the slot under the write cursor, then advance the cursor.
		slot := h.history
		slot.Value = msg
		h.history = slot.Next()

		// Fan out to listeners; deleting from a map while ranging over it is safe in Go.
		for listener := range h.listeners {
			err := listener.Receive(msg)
			if err == nil {
				continue
			}
			delete(h.listeners, listener)
		}
	}
	hub.opChan <- broadcast
}
// Delete queues removal of the message identified by mailbox and id. When the queued operation
// runs, the first matching entry is cleared from the history buffer and every registered
// listener is told to delete it as well; a listener whose Delete returns an error is
// unregistered. If the hub has no history ring, the operation does nothing at all.
//
// The cleared slot is recycled as the next write position, so the history buffer keeps its full
// capacity and the relative order of the remaining messages is unchanged.
func (hub *Hub) Delete(mailbox string, id string) {
	hub.opChan <- func(h *Hub) {
		if h.history == nil {
			return
		}

		// Scan the ring exactly once, starting just after the write position and finishing on
		// the write position itself (which holds the oldest message when the ring is full).
		node, found := h.history, false
		for remaining := h.history.Len(); remaining > 0; remaining-- {
			node = node.Next()
			if m, ok := node.Value.(event.MessageMetadata); ok && m.Mailbox == mailbox && m.ID == id {
				found = true
				break
			}
		}

		if found {
			if node != h.history {
				// Move the slot directly in front of the write position and make it the new
				// write position. Simply unlinking it would shrink the ring on every delete.
				node.Prev().Unlink(1)
				h.history.Prev().Link(node)
				h.history = node
			}
			// When node is already the write position, clearing it in place is enough;
			// unlinking it would leave h.history pointing at a detached one-element ring.
			node.Value = nil
		}

		// Relay event to all listeners, removing listeners if they return an error.
		for l := range h.listeners {
			if err := l.Delete(mailbox, id); err != nil {
				delete(h.listeners, l)
			}
		}
	}
}
// AddListener queues registration of l. When the queued operation runs, the non-empty entries of
// the history buffer are played back to l in ring order, and l is then registered to receive
// subsequent broadcasts.
//
// If l returns an error during playback, playback stops and l is not registered. This matches
// Dispatch and Delete, which unregister a listener as soon as it returns an error.
func (hub *Hub) AddListener(l Listener) {
	hub.opChan <- func(h *Hub) {
		// Playback log. Ring.Do cannot be interrupted, so remember the first failure and skip
		// the remaining entries. Do is a no-op on a nil ring.
		var playbackErr error
		h.history.Do(func(v interface{}) {
			if v == nil || playbackErr != nil {
				return
			}
			playbackErr = l.Receive(v.(event.MessageMetadata))
		})
		if playbackErr != nil {
			return
		}

		// Add to listeners
		h.listeners[l] = struct{}{}
	}
}
// RemoveListener queues removal of l from the set of registered listeners. Once the queued
// operation has run, l no longer receives broadcasts. Removing a listener that is not registered
// has no effect.
func (hub *Hub) RemoveListener(l Listener) {
	unregister := func(h *Hub) {
		delete(h.listeners, l)
	}
	hub.opChan <- unregister
}
// Sync queues a marker operation and blocks until the hub has executed it, which means every
// operation queued before the call has been processed. Useful for unit tests.
func (hub *Hub) Sync() {
	processed := make(chan struct{})
	marker := func(*Hub) {
		close(processed)
	}
	hub.opChan <- marker
	<-processed
}
// runOp executes op against the hub. A panic raised by op is recovered and logged rather than
// propagated, so a misbehaving operation does not terminate the caller.
func (hub *Hub) runOp(op func(*Hub)) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		if err, ok := r.(error); ok {
			log.Error().Str("module", "msghub").Err(err).Msg("Operation panicked")
			return
		}
		// The panic value is not an error, so there is nothing to attach with Err; format it
		// with %v because it may be of any type.
		log.Error().Str("module", "msghub").Msgf("Operation panicked: %v", r)
	}()
	op(hub)
}