diff --git a/pkg/msghub/hub.go b/pkg/msghub/hub.go index ba87df2..07bbefe 100644 --- a/pkg/msghub/hub.go +++ b/pkg/msghub/hub.go @@ -96,7 +96,7 @@ func (hub *Hub) Delete(mailbox string, id string) { for { if next, ok := p.Next().Value.(event.MessageMetadata); ok { if mailbox == next.Mailbox && id == next.ID { - p.Unlink(1) // Remove next node. + p.Next().Value = nil break } } diff --git a/pkg/msghub/hub_test.go b/pkg/msghub/hub_test.go index d357cf3..8304a6d 100644 --- a/pkg/msghub/hub_test.go +++ b/pkg/msghub/hub_test.go @@ -9,6 +9,8 @@ import ( "github.com/inbucket/inbucket/pkg/extension" "github.com/inbucket/inbucket/pkg/extension/event" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // testListener implements the Listener interface, mock for unit tests @@ -302,6 +304,59 @@ func TestHubHistoryReplayWrap(t *testing.T) { } } +func TestHubHistoryReplayWrapAfterDelete(t *testing.T) { + bufferSize := 5 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hub := New(bufferSize, extension.NewHost()) + go hub.Start(ctx) + + waitForMessages := func(n int) { + l := newTestListener(n) + hub.AddListener(l) + + select { + case <-l.done: + case <-time.After(time.Second): + t.Fatal("Timeout:", l) + } + } + + // Broadcast more messages than the hub can hold. + msgs := make([]event.MessageMetadata, 10) + for i := 0; i < len(msgs); i++ { + msgs[i] = event.MessageMetadata{ + Mailbox: "first", + ID: strconv.Itoa(i), + Subject: fmt.Sprintf("subj %v", i), + } + hub.Dispatch(msgs[i]) + } + waitForMessages(bufferSize) + + // Buffer must be configured size. + require.Equal(t, bufferSize, hub.history.Len()) + + // Delete a message still present in buffer. + hub.Delete("first", "7") + + // Broadcast another set of messages. + for i := 0; i < len(msgs); i++ { + msgs[i] = event.MessageMetadata{ + Mailbox: "second", + ID: strconv.Itoa(i), + Subject: fmt.Sprintf("subj %v", i), + } + hub.Dispatch(msgs[i]) + } + waitForMessages(bufferSize) + + // Ensure the buffer did not shrink after delete. + got := hub.history.Len() + assert.Equal(t, bufferSize, got, "got buffer size %d, wanted %d", got, bufferSize) +} + func TestHubContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) hub := New(5, extension.NewHost())