Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c91c01e
Method of modifying ctx to add value
wyyolo Feb 17, 2024
70a8d4f
Method of modifying ctx to add value
wyyolo Feb 17, 2024
70e45b8
Method of modifying ctx to add value
wyyolo Feb 17, 2024
a4a36ec
Remove type conversions and modify function names
wyyolo Feb 18, 2024
7a4273e
Merge branch 'FunctionStream:main' into main
wyyolo Feb 18, 2024
3cc2a76
Merge branch 'FunctionStream:main' into mymain
wyyolo Feb 26, 2024
e6572f7
Merge branch 'FunctionStream:main' into mymain
wyyolo Feb 27, 2024
1e35053
Merge branch 'FunctionStream:main' into mymain
wyyolo Mar 3, 2024
5b4b055
fix NewSinkTube in memory
wyyolo Mar 3, 2024
1a8cf24
fix NewSinkTube in memory
wyyolo Mar 3, 2024
766ffaf
fix NewSinkTube in memory
wyyolo Mar 3, 2024
1301137
fix NewSinkTube in memory
wyyolo Mar 3, 2024
16878cf
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 4, 2024
80593e6
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 4, 2024
c709c8b
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 4, 2024
9960d20
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 4, 2024
c28d228
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 4, 2024
0a7988e
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 4, 2024
bee573c
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 6, 2024
c61f8e5
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 6, 2024
ccb6f81
fix NewSinkTube in memory, add unit test for NewSourceTube and NewSin…
wyyolo Mar 6, 2024
3a3c08f
fix NewSinkTube in memory, add unit test
wyyolo Mar 9, 2024
5a15018
fix NewSinkTube in memory, add unit test
wyyolo Mar 10, 2024
2f5ef13
fix NewSinkTube in memory, add unit test
wyyolo Mar 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion fs/contube/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,19 @@ func (f *MemoryQueueFactory) release(name string) {
func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) {
config := NewSourceQueueConfig(configMap)
result := make(chan Record)

var wg sync.WaitGroup
for _, topic := range config.Topics {
t := topic
wg.Add(1)
go func() {
<-ctx.Done()
f.release(t)
}()

go func() {
defer wg.Done()
c := f.getOrCreateChan(t)
defer close(result)
for {
select {
case <-ctx.Done():
Expand All @@ -99,6 +103,12 @@ func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap Config
}
}()
}

go func() {
wg.Wait()
close(result)
}()

return result, nil
}

Expand Down
85 changes: 85 additions & 0 deletions fs/contube/memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024 Function Stream Org.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package contube

import (
"context"
"math/rand"
"strconv"
"sync"
"testing"
"time"
)

func TestMemoryTube(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
tubeFactory := NewMemoryQueueFactory(ctx)
memoryQueueFactory := tubeFactory.(*MemoryQueueFactory)

var wg sync.WaitGroup
var events []Record

topics := []string{"topic1", "topic2", "topic3"}
source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap())
if err != nil {
t.Fatal(err)
}

for i, v := range topics {
wg.Add(1)
sink, err := memoryQueueFactory.NewSinkTube(ctx, (&SinkQueueConfig{Topic: v}).ToConfigMap())
if err != nil {
t.Fatal(err)
}
go func(i int) {
defer wg.Done()
defer close(sink)
sink <- NewRecordImpl([]byte{byte(i + 1)}, func() {})
}(i)
}

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case event := <-source:
events = append(events, event)
if len(events) == len(topics) {
return
}
default:
continue
}
}
}()

wg.Wait()
cancel()

// Give enough time to ensure that the goroutine execution within NewSource Tube and NewSinkTube is complete and the released queue is successful.
time.Sleep(100 * time.Millisecond)

// assert the memoryQueueFactory.queues is empty.
memoryQueueFactory.mu.Lock()
if len(memoryQueueFactory.queues) != 0 {
t.Fatal("MemoryQueueFactory.queues is not empty")
}
memoryQueueFactory.mu.Unlock()

}