diff --git a/.gitignore b/.gitignore index a878ec0..6a35ff2 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,7 @@ *.env cmd/example -cmd/dipdup-gen/*.json \ No newline at end of file +cmd/dipdup-gen/*.json + +# IDE and other 3th party tools +.idea diff --git a/pkg/modules/input.go b/pkg/modules/input.go index fd443f7..7b32625 100644 --- a/pkg/modules/input.go +++ b/pkg/modules/input.go @@ -1,15 +1,5 @@ package modules -import ( - "errors" -) - -// errors -var ( - ErrUnknownInput = errors.New("unknown input") - ErrUnknownOutput = errors.New("unknown output") -) - // Input - type Input struct { data chan any diff --git a/pkg/modules/module.go b/pkg/modules/module.go index c5ce722..319b44f 100644 --- a/pkg/modules/module.go +++ b/pkg/modules/module.go @@ -2,10 +2,17 @@ package modules import ( "context" + "github.com/pkg/errors" "io" ) -// Module is the interface which modules has to implement. +// errors +var ( + ErrUnknownInput = errors.New("unknown input") + ErrUnknownOutput = errors.New("unknown output") +) + +// Module is the interface which modules have to implement. type Module interface { io.Closer diff --git a/pkg/modules/output.go b/pkg/modules/output.go index abe450a..8ae04b8 100644 --- a/pkg/modules/output.go +++ b/pkg/modules/output.go @@ -53,9 +53,9 @@ func (output *Output) Name() string { // Connect - func Connect(outputModule, inputModule Module, outputName, inputName string) error { - everySecond, err := inputModule.Input(inputName) + input, err := inputModule.Input(inputName) if err != nil { log.Panic(err) } - return outputModule.AttachTo(outputName, everySecond) + return outputModule.AttachTo(outputName, input) } diff --git a/pkg/sync/map.go b/pkg/sync/map.go new file mode 100644 index 0000000..71275fe --- /dev/null +++ b/pkg/sync/map.go @@ -0,0 +1,69 @@ +package sync + +import "sync" + +type Map[K comparable, V any] struct { + m map[K]V + mx *sync.RWMutex +} + +func NewMap[K comparable, V any]() Map[K, V] { + return Map[K, V]{ + m: make(map[K]V), + mx: new(sync.RWMutex), + } +} + +func (m Map[K, V]) Get(key K) (V, bool) { + m.mx.RLock() + val, ok := m.m[key] + m.mx.RUnlock() + return val, ok +} + +func (m Map[K, V]) Delete(key K) { + m.mx.Lock() + delete(m.m, key) + m.mx.Unlock() +} + +func (m Map[K, V]) Set(key K, value V) { + m.mx.Lock() + m.m[key] = value + m.mx.Unlock() +} + +// Range (WARN) does not support nested ranges with Delete in them. +func (m Map[K, V]) Range(handler func(key K, value V) (error, bool)) error { + if handler == nil { + return nil + } + m.mx.RLock() + defer m.mx.RUnlock() + + for k, v := range m.m { + err, br := handler(k, v) + if err != nil { + return err + } + if br { + return nil + } + } + return nil +} + +func (m Map[K, V]) Clear() { + m.mx.Lock() + // clear(m.m) TODO: rewrite on go 1.21 + for k := range m.m { + delete(m.m, k) + } + m.mx.Unlock() +} + +func (m Map[K, V]) Len() int { + m.mx.RLock() + defer m.mx.RUnlock() + return len(m.m) +} diff --git a/pkg/sync/map_test.go b/pkg/sync/map_test.go new file mode 100644 index 0000000..165ebf7 --- /dev/null +++ b/pkg/sync/map_test.go @@ -0,0 +1,172 @@ +package sync + +import ( + "math/rand" + "runtime" + "sync" + "testing" +) + +func TestMap_Get(t *testing.T) { + m := NewMap[int, string]() + m.Set(10, "hello sdk sync map based on RWMutex") + + value, ok := m.Get(10) + if !ok { + t.Fatal("existing key was not found") + } + + if value != "hello sdk sync map based on RWMutex" { + t.Fatal("found value is incorrect") + } +} + +func TestMap_Delete(t *testing.T) { + m := NewMap[int, string]() + m.Set(10, "hello sdk sync map based on RWMutex") + m.Set(11, "this value and key will be deleted") + + m.Delete(11) + + _, ok := m.Get(11) + if ok { + t.Fatal("non-existing key was found") + } +} + +func TestMap_Range(t *testing.T) { + m := NewMap[int, string]() + m.Set(10, "hello sdk sync map based on RWMutex") + m.Set(11, "second value") + + checkData := map[int]*struct { + checked bool + value string + }{ + 10: {value: "hello sdk sync map based on RWMutex"}, + 11: {value: "second value"}, + } + + handler := func(k int, v string) (error, bool) { + toCheck, ok := checkData[k] + if !ok { + t.Fatal("found non-existing key") + return nil, true + } + + if v != toCheck.value { + t.Fatalf("found value is incorrect for key=%d with value=%s, looking for value=%s", k, v, toCheck.value) + return nil, true + } + + toCheck.checked = true + return nil, false + } + + if err := m.Range(handler); err != nil { + t.Fatalf("error occured in Range %+v", err) + } + + for k, v := range checkData { + if !v.checked { + t.Fatalf("key %d was not applied in Range", k) + } + } +} + +func TestMap_ConcurrentRange(t *testing.T) { + const mapSize = 1 << 10 + + m := NewMap[int64, int64]() + for n := int64(1); n <= mapSize; n++ { + m.Set(n, n) + } + + done := make(chan struct{}) + var wg sync.WaitGroup + defer func() { + close(done) + wg.Wait() + }() + + for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- { + r := rand.New(rand.NewSource(g)) + wg.Add(1) + go func(g int64) { + defer wg.Done() + for i := int64(0); ; i++ { + select { + case <-done: + return + default: + } + for n := int64(1); n < mapSize; n++ { + if r.Int63n(mapSize) == 0 { + m.Set(n, n*i*g) + } else { + m.Get(n) + } + } + } + }(g) + } + + for n := 16; n > 0; n-- { + seen := make(map[int64]bool, mapSize) + + err := m.Range(func(k, v int64) (error, bool) { + if v%k != 0 { + t.Fatalf("while Setting multiples of %v, Range saw value %v", k, v) + } + if seen[k] { + t.Fatalf("Range visited key %v twice", k) + } + seen[k] = true + return nil, false + }) + + if len(seen) != mapSize { + t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize) + } + + if err != nil { + t.Fatalf("error occured in Range %+v", err) + } + } +} + +func TestMap_Clear(t *testing.T) { + m := NewMap[int, string]() + for i, v := range [3]string{"clear", "sync", "map"} { + m.Set(i, v) + } + + m.Clear() + + length := 0 + err := m.Range(func(key int, value string) (error, bool) { + length++ + return nil, false + }) + + if err != nil { + t.Fatalf("error occured in checking length of Range %+v", err) + } + + if length != 0 { + t.Fatalf("unexpected map size, got %v want %v", length, 0) + } +} + +func TestMap_Len(t *testing.T) { + m := NewMap[int, string]() + for i, v := range [3]string{"len", "sync", "map"} { + m.Set(i, v) + } + + length := m.Len() + + if length != 3 { + t.Fatalf("unexpected map size, got %v want %v", length, 3) + } +}