Remove data races
This commit is contained in:
50
ring.go
50
ring.go
@@ -3,6 +3,8 @@ Package ring provides a simple implementation of a ring buffer.
|
|||||||
*/
|
*/
|
||||||
package ring
|
package ring
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
The DefaultCapacity of an uninitialized Ring buffer.
|
The DefaultCapacity of an uninitialized Ring buffer.
|
||||||
|
|
||||||
@@ -15,6 +17,7 @@ Type Ring implements a Circular Buffer.
|
|||||||
The default value of the Ring struct is a valid (empty) Ring buffer with capacity DefaultCapacify.
|
The default value of the Ring struct is a valid (empty) Ring buffer with capacity DefaultCapacify.
|
||||||
*/
|
*/
|
||||||
type Ring struct {
|
type Ring struct {
|
||||||
|
sync.Mutex
|
||||||
head int // the most recent value written
|
head int // the most recent value written
|
||||||
tail int // the least recent value written
|
tail int // the least recent value written
|
||||||
buff []interface{}
|
buff []interface{}
|
||||||
@@ -24,6 +27,9 @@ type Ring struct {
|
|||||||
Set the maximum size of the ring buffer.
|
Set the maximum size of the ring buffer.
|
||||||
*/
|
*/
|
||||||
func (r *Ring) SetCapacity(size int) {
|
func (r *Ring) SetCapacity(size int) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
r.checkInit()
|
r.checkInit()
|
||||||
r.extend(size)
|
r.extend(size)
|
||||||
}
|
}
|
||||||
@@ -31,14 +37,20 @@ func (r *Ring) SetCapacity(size int) {
|
|||||||
/*
|
/*
|
||||||
Capacity returns the current capacity of the ring buffer.
|
Capacity returns the current capacity of the ring buffer.
|
||||||
*/
|
*/
|
||||||
func (r Ring) Capacity() int {
|
func (r *Ring) Capacity() int {
|
||||||
return len(r.buff)
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
|
return r.capacity()
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
ContentSize returns the current number of elements inside the ring buffer.
|
ContentSize returns the current number of elements inside the ring buffer.
|
||||||
*/
|
*/
|
||||||
func (r *Ring) ContentSize() int {
|
func (r *Ring) ContentSize() int {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
if r.head == -1 {
|
if r.head == -1 {
|
||||||
return 0
|
return 0
|
||||||
} else {
|
} else {
|
||||||
@@ -54,6 +66,9 @@ func (r *Ring) ContentSize() int {
|
|||||||
Enqueue a value into the Ring buffer.
|
Enqueue a value into the Ring buffer.
|
||||||
*/
|
*/
|
||||||
func (r *Ring) Enqueue(i interface{}) {
|
func (r *Ring) Enqueue(i interface{}) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
r.checkInit()
|
r.checkInit()
|
||||||
r.set(r.head+1, i)
|
r.set(r.head+1, i)
|
||||||
old := r.head
|
old := r.head
|
||||||
@@ -69,6 +84,9 @@ Dequeue a value from the Ring buffer.
|
|||||||
Returns nil if the ring buffer is empty.
|
Returns nil if the ring buffer is empty.
|
||||||
*/
|
*/
|
||||||
func (r *Ring) Dequeue() interface{} {
|
func (r *Ring) Dequeue() interface{} {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
r.checkInit()
|
r.checkInit()
|
||||||
if r.head == -1 {
|
if r.head == -1 {
|
||||||
return nil
|
return nil
|
||||||
@@ -89,6 +107,9 @@ Read the value that Dequeue would have dequeued without actually dequeuing it.
|
|||||||
Returns nil if the ring buffer is empty.
|
Returns nil if the ring buffer is empty.
|
||||||
*/
|
*/
|
||||||
func (r *Ring) Peek() interface{} {
|
func (r *Ring) Peek() interface{} {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
r.checkInit()
|
r.checkInit()
|
||||||
if r.head == -1 {
|
if r.head == -1 {
|
||||||
return nil
|
return nil
|
||||||
@@ -102,11 +123,14 @@ The returned slice can be modified independently of the circular buffer. However
|
|||||||
are shared between the slice and circular buffer.
|
are shared between the slice and circular buffer.
|
||||||
*/
|
*/
|
||||||
func (r *Ring) Values() []interface{} {
|
func (r *Ring) Values() []interface{} {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
|
||||||
if r.head == -1 {
|
if r.head == -1 {
|
||||||
return []interface{}{}
|
return []interface{}{}
|
||||||
}
|
}
|
||||||
arr := make([]interface{}, 0, r.Capacity())
|
arr := make([]interface{}, 0, r.capacity())
|
||||||
for i := 0; i < r.Capacity(); i++ {
|
for i := 0; i < r.capacity(); i++ {
|
||||||
idx := r.mod(i + r.tail)
|
idx := r.mod(i + r.tail)
|
||||||
arr = append(arr, r.get(idx))
|
arr = append(arr, r.get(idx))
|
||||||
if idx == r.head {
|
if idx == r.head {
|
||||||
@@ -120,6 +144,10 @@ func (r *Ring) Values() []interface{} {
|
|||||||
*** Unexported methods beyond this point.
|
*** Unexported methods beyond this point.
|
||||||
**/
|
**/
|
||||||
|
|
||||||
|
func (r *Ring) capacity() int {
|
||||||
|
return len(r.buff)
|
||||||
|
}
|
||||||
|
|
||||||
// sets a value at the given unmodified index and returns the modified index of the value
|
// sets a value at the given unmodified index and returns the modified index of the value
|
||||||
func (r *Ring) set(p int, v interface{}) {
|
func (r *Ring) set(p int, v interface{}) {
|
||||||
r.buff[r.mod(p)] = v
|
r.buff[r.mod(p)] = v
|
||||||
@@ -136,13 +164,15 @@ func (r *Ring) mod(p int) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Ring) checkInit() {
|
func (r *Ring) checkInit() {
|
||||||
if r.buff == nil {
|
if r.buff != nil {
|
||||||
r.buff = make([]interface{}, DefaultCapacity)
|
return
|
||||||
for i := range r.buff {
|
|
||||||
r.buff[i] = nil
|
|
||||||
}
|
|
||||||
r.head, r.tail = -1, 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.buff = make([]interface{}, DefaultCapacity)
|
||||||
|
for i := range r.buff {
|
||||||
|
r.buff[i] = nil
|
||||||
|
}
|
||||||
|
r.head, r.tail = -1, 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Ring) extend(size int) {
|
func (r *Ring) extend(size int) {
|
||||||
|
|||||||
47
ring_test.go
47
ring_test.go
@@ -1,7 +1,7 @@
|
|||||||
package ring
|
package ring
|
||||||
|
|
||||||
import (
|
import (
|
||||||
// "fmt"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -114,3 +114,48 @@ func TestConstructsArr(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrency(t *testing.T) {
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
|
r := Ring{}
|
||||||
|
r.SetCapacity(128)
|
||||||
|
|
||||||
|
for i := 0; i < 2048; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
for x := 0; x < 100; x++ {
|
||||||
|
r.Enqueue(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
if i%10 == 0 {
|
||||||
|
r.ContentSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i+1)%10 == 0 {
|
||||||
|
r.Capacity()
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i+2)%10 == 0 {
|
||||||
|
r.Peek()
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i+3)%10 == 0 {
|
||||||
|
r.Values()
|
||||||
|
}
|
||||||
|
|
||||||
|
if i%10 == 0 {
|
||||||
|
r.SetCapacity(r.Capacity() + 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
for x := 0; x < 125; x++ {
|
||||||
|
r.Dequeue()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Done()
|
||||||
|
}(i)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user