Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions pkg/modules/input.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package modules

import (
"errors"
)

// errors
var (
ErrUnknownInput = errors.New("unknown input")
ErrUnknownOutput = errors.New("unknown output")
)

// Input -
type Input struct {
data chan any
Expand Down
9 changes: 8 additions & 1 deletion pkg/modules/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package modules

import (
"context"
"github.com/pkg/errors"
"io"
)

// Module is the interface which modules has to implement.
// errors
var (
ErrUnknownInput = errors.New("unknown input")
ErrUnknownOutput = errors.New("unknown output")
)

// Module is the interface which modules have to implement.
type Module interface {
io.Closer

Expand Down
4 changes: 2 additions & 2 deletions pkg/modules/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func (output *Output) Name() string {

// Connect -
func Connect(outputModule, inputModule Module, outputName, inputName string) error {
everySecond, err := inputModule.Input(inputName)
input, err := inputModule.Input(inputName)
if err != nil {
log.Panic(err)
}
return outputModule.AttachTo(outputName, everySecond)
return outputModule.AttachTo(outputName, input)
}
53 changes: 53 additions & 0 deletions pkg/sync/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package sync

import "sync"

type Map[K comparable, V any] struct {
m map[K]V
mx *sync.RWMutex
}

func NewMap[K comparable, V any]() Map[K, V] {
return Map[K, V]{
m: make(map[K]V),
mx: new(sync.RWMutex),
}
}

func (m Map[K, V]) Get(key K) (V, bool) {
m.mx.RLock()
val, ok := m.m[key]
m.mx.RUnlock()
return val, ok
}

func (m Map[K, V]) Delete(key K) {
m.mx.Lock()
delete(m.m, key)
m.mx.Unlock()
}

func (m Map[K, V]) Set(key K, value V) {
m.mx.Lock()
m.m[key] = value
m.mx.Unlock()
}

func (m Map[K, V]) Range(handler func(key K, value V) (error, bool)) error {
if handler == nil {
return nil
}
m.mx.RLock()
defer m.mx.RUnlock()

for k, v := range m.m {
err, br := handler(k, v)
if err != nil {
return err
}
if br {
return nil
}
}
return nil
}
70 changes: 70 additions & 0 deletions pkg/sync/map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package sync

import "testing"

func TestMap_Get(t *testing.T) {
m := NewMap[int, string]()
m.Set(10, "hello sdk sync map based on RWMutex")

value, ok := m.Get(10)
if !ok {
t.Fatal("existing key was not found")
}

if value != "hello sdk sync map based on RWMutex" {
t.Fatal("found value is incorrect")
}
}

func TestMap_Delete(t *testing.T) {
m := NewMap[int, string]()
m.Set(10, "hello sdk sync map based on RWMutex")
m.Set(11, "this value and key will be deleted")

m.Delete(11)

_, ok := m.Get(11)
if ok {
t.Fatal("non-existing key was found")
}
}

func TestMap_Range(t *testing.T) {
m := NewMap[int, string]()
m.Set(10, "hello sdk sync map based on RWMutex")
m.Set(11, "second value")

checkData := map[int]*struct {
checked bool
value string
}{
10: {value: "hello sdk sync map based on RWMutex"},
11: {value: "second value"},
}

handler := func(k int, v string) (error, bool) {
toCheck, ok := checkData[k]
if !ok {
t.Fatal("found non-existing key")
return nil, true
}

if v != toCheck.value {
t.Fatalf("found value is incorrect for key=%d with value=%s, looking for value=%s", k, v, toCheck.value)
return nil, true
}

toCheck.checked = true
return nil, false
}

if err := m.Range(handler); err != nil {
t.Fatalf("error occured in range %+v", err)
}

for k, v := range checkData {
if !v.checked {
t.Fatalf("key %d was not applied in range", k)
}
}
}