mirror of
https://github.com/jhillyerd/inbucket.git
synced 2025-12-17 09:37:02 +00:00
msghub: Clear deleted messages instead of unlinking (#348)
Signed-off-by: James Hillyerd <james@hillyerd.com>
This commit is contained in:
@@ -9,6 +9,8 @@ import (
|
||||
|
||||
"github.com/inbucket/inbucket/pkg/extension"
|
||||
"github.com/inbucket/inbucket/pkg/extension/event"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// testListener implements the Listener interface, mock for unit tests
|
||||
@@ -302,6 +304,59 @@ func TestHubHistoryReplayWrap(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubHistoryReplayWrapAfterDelete(t *testing.T) {
|
||||
bufferSize := 5
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hub := New(bufferSize, extension.NewHost())
|
||||
go hub.Start(ctx)
|
||||
|
||||
waitForMessages := func(n int) {
|
||||
l := newTestListener(n)
|
||||
hub.AddListener(l)
|
||||
|
||||
select {
|
||||
case <-l.done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Timeout:", l)
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast more messages than the hub can hold.
|
||||
msgs := make([]event.MessageMetadata, 10)
|
||||
for i := 0; i < len(msgs); i++ {
|
||||
msgs[i] = event.MessageMetadata{
|
||||
Mailbox: "first",
|
||||
ID: strconv.Itoa(i),
|
||||
Subject: fmt.Sprintf("subj %v", i),
|
||||
}
|
||||
hub.Dispatch(msgs[i])
|
||||
}
|
||||
waitForMessages(bufferSize)
|
||||
|
||||
// Buffer must be configured size.
|
||||
require.Equal(t, bufferSize, hub.history.Len())
|
||||
|
||||
// Delete a message still present in buffer.
|
||||
hub.Delete("first", "7")
|
||||
|
||||
// Broadcast another set of messages.
|
||||
for i := 0; i < len(msgs); i++ {
|
||||
msgs[i] = event.MessageMetadata{
|
||||
Mailbox: "second",
|
||||
ID: strconv.Itoa(i),
|
||||
Subject: fmt.Sprintf("subj %v", i),
|
||||
}
|
||||
hub.Dispatch(msgs[i])
|
||||
}
|
||||
waitForMessages(bufferSize)
|
||||
|
||||
// Ensure the buffer did not shrink after delete.
|
||||
got := hub.history.Len()
|
||||
assert.Equal(t, bufferSize, got, "got buffer size %d, wanted %d", got, bufferSize)
|
||||
}
|
||||
|
||||
func TestHubContextCancel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
hub := New(5, extension.NewHost())
|
||||
|
||||
Reference in New Issue
Block a user