diff --git a/README.md b/README.md index 1d6b79f8..c7e62c0e 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ integration with Apache Pulsar, allowing users to take full advantage of its rob executing code across different hardware architectures seamlessly. It provides compatibility and portability, allowing developers to run their code on various platforms without concerns about underlying hardware dependencies. -### Architecture and Components +## Architecture and Components Function Stream is composed of three main components: the WebAssembly runtime engine, the Pulsar client, and the Function Stream service. The following figure shows the overview of the Function Stream architecture. @@ -64,6 +64,29 @@ and the processing guarantees of the messages. **The Function Stream service** is responsible for managing the lifecycle and coordination of the WebAssembly instances. +## Directory Structure + +The Function Stream project is organized as follows: +. +├── LICENSE # The license for Function Stream +├── Makefile # Contains build automation and commands +├── README.md # README file for the project +├── benchmark # Contains benchmarking tools or results +├── bin # Contains compiled binary files +├── cmd # Contains the command line executable source files +├── common # Contains common utilities and libraries used across the project +├── docs # Documentation for the project +├── examples # Example configurations, scripts, and other reference materials +├── go.mod # Defines the module's module path and its dependency requirements +├── go.sum # Contains the expected cryptographic checksums of the content of specific module versions +├── fs # Core library files for Function Stream +├── license-checker # Tools related to checking license compliance +├── openapi.yaml # API definition file +├── perf # Performance testing scripts +├── restclient # REST client library +├── server # Server-side application source files +└── tests # Contains test scripts and test data + ## Building Instructions To compile Function Stream, use this command: diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index cde4ca38..ab9e1cab 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -21,8 +21,8 @@ import ( "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/functionstream/functionstream/common" - "github.com/functionstream/functionstream/lib" - "github.com/functionstream/functionstream/lib/contube" + "github.com/functionstream/functionstream/fs" + "github.com/functionstream/functionstream/fs/contube" "github.com/functionstream/functionstream/perf" "github.com/functionstream/functionstream/restclient" "github.com/functionstream/functionstream/server" @@ -103,9 +103,9 @@ func BenchmarkStressForBasicFunc(b *testing.B) { func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { memoryQueueFactory := contube.NewMemoryQueueFactory(context.Background()) - svrConf := &lib.Config{ + svrConf := &fs.Config{ ListenAddr: common.DefaultAddr, - QueueBuilder: func(ctx context.Context, config *lib.Config) (contube.TubeFactory, error) { + QueueBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) { return memoryQueueFactory, nil }, } @@ -130,7 +130,7 @@ func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) { Output: outputTopic, Replicas: &replicas, }, - QueueBuilder: func(ctx context.Context, c *lib.Config) (contube.TubeFactory, error) { + QueueBuilder: func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { return memoryQueueFactory, nil }, } diff --git a/lib/config.go b/fs/config.go similarity index 92% rename from lib/config.go rename to fs/config.go index 854b36a5..18b0a114 100644 --- a/lib/config.go +++ b/fs/config.go @@ -14,11 +14,11 @@ * limitations under the License. */ -package lib +package fs import ( "context" - "github.com/functionstream/functionstream/lib/contube" + "github.com/functionstream/functionstream/fs/contube" ) type QueueBuilder func(ctx context.Context, config *Config) (contube.TubeFactory, error) diff --git a/lib/contube/event_tube.go b/fs/contube/event_tube.go similarity index 100% rename from lib/contube/event_tube.go rename to fs/contube/event_tube.go diff --git a/lib/contube/memory_tube.go b/fs/contube/memory_tube.go similarity index 100% rename from lib/contube/memory_tube.go rename to fs/contube/memory_tube.go diff --git a/lib/contube/pulsar_tube.go b/fs/contube/pulsar_tube.go similarity index 100% rename from lib/contube/pulsar_tube.go rename to fs/contube/pulsar_tube.go diff --git a/lib/instance.go b/fs/instance.go similarity index 98% rename from lib/instance.go rename to fs/instance.go index beeb278f..a5e1d1fb 100644 --- a/lib/instance.go +++ b/fs/instance.go @@ -14,14 +14,14 @@ * limitations under the License. */ -package lib +package fs import ( "context" "fmt" "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" - "github.com/functionstream/functionstream/lib/contube" + "github.com/functionstream/functionstream/fs/contube" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/tetratelabs/wazero" diff --git a/lib/manager.go b/fs/manager.go similarity index 97% rename from lib/manager.go rename to fs/manager.go index 3a764437..5efe79aa 100644 --- a/lib/manager.go +++ b/fs/manager.go @@ -14,13 +14,13 @@ * limitations under the License. */ -package lib +package fs import ( "context" "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" - "github.com/functionstream/functionstream/lib/contube" + "github.com/functionstream/functionstream/fs/contube" "log/slog" "math/rand" "strconv" diff --git a/perf/perf.go b/perf/perf.go index f45d0b52..9f40205a 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -22,8 +22,8 @@ import ( "fmt" "github.com/bmizerany/perks/quantile" "github.com/functionstream/functionstream/common" - "github.com/functionstream/functionstream/lib" - "github.com/functionstream/functionstream/lib/contube" + "github.com/functionstream/functionstream/fs" + "github.com/functionstream/functionstream/fs/contube" "github.com/functionstream/functionstream/restclient" "golang.org/x/time/rate" "log/slog" @@ -38,7 +38,7 @@ type Config struct { PulsarURL string RequestRate float64 Func *restclient.Function - QueueBuilder lib.QueueBuilder + QueueBuilder fs.QueueBuilder } type Perf interface { @@ -49,7 +49,7 @@ type perf struct { config *Config input chan<- contube.Record output <-chan contube.Record - queueBuilder lib.QueueBuilder + queueBuilder fs.QueueBuilder } func New(config *Config) Perf { @@ -57,7 +57,7 @@ func New(config *Config) Perf { config: config, } if config.QueueBuilder == nil { - p.queueBuilder = func(ctx context.Context, c *lib.Config) (contube.TubeFactory, error) { + p.queueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{ PulsarURL: config.PulsarURL, }).ToConfigMap()) @@ -92,7 +92,7 @@ func (p *perf) Run(ctx context.Context) { } } - config := &lib.Config{ + config := &fs.Config{ PulsarURL: p.config.PulsarURL, } diff --git a/server/config_loader.go b/server/config_loader.go index 1cd81615..b1ff4d75 100644 --- a/server/config_loader.go +++ b/server/config_loader.go @@ -19,27 +19,27 @@ package server import ( "context" "github.com/functionstream/functionstream/common" - "github.com/functionstream/functionstream/lib/contube" + "github.com/functionstream/functionstream/fs/contube" "log/slog" "os" "sync" - "github.com/functionstream/functionstream/lib" + "github.com/functionstream/functionstream/fs" ) -var loadedConfig *lib.Config +var loadedConfig *fs.Config var initConfig = sync.Once{} -func LoadConfigFromEnv() *lib.Config { +func LoadConfigFromEnv() *fs.Config { initConfig.Do(func() { - loadedConfig = &lib.Config{ + loadedConfig = &fs.Config{ ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), PulsarURL: getEnvWithDefault("PULSAR_URL", common.DefaultPulsarURL), } queueType := getEnvWithDefault("QUEUE_TYPE", common.DefaultQueueType) switch queueType { case common.PulsarQueueType: - loadedConfig.QueueBuilder = func(ctx context.Context, c *lib.Config) (contube.TubeFactory, error) { + loadedConfig.QueueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { return contube.NewPulsarEventQueueFactory(ctx, (&contube.PulsarTubeFactoryConfig{ PulsarURL: c.PulsarURL, }).ToConfigMap()) @@ -49,12 +49,12 @@ func LoadConfigFromEnv() *lib.Config { return loadedConfig } -func LoadStandaloneConfigFromEnv() *lib.Config { +func LoadStandaloneConfigFromEnv() *fs.Config { initConfig.Do(func() { - loadedConfig = &lib.Config{ + loadedConfig = &fs.Config{ ListenAddr: getEnvWithDefault("LISTEN_ADDR", common.DefaultAddr), } - loadedConfig.QueueBuilder = func(ctx context.Context, c *lib.Config) (contube.TubeFactory, error) { + loadedConfig.QueueBuilder = func(ctx context.Context, c *fs.Config) (contube.TubeFactory, error) { return contube.NewMemoryQueueFactory(ctx), nil } }) diff --git a/server/server.go b/server/server.go index 6244b7f1..647448bf 100644 --- a/server/server.go +++ b/server/server.go @@ -22,8 +22,8 @@ import ( "fmt" "github.com/functionstream/functionstream/common" "github.com/functionstream/functionstream/common/model" - "github.com/functionstream/functionstream/lib" - "github.com/functionstream/functionstream/lib/contube" + "github.com/functionstream/functionstream/fs" + "github.com/functionstream/functionstream/fs/contube" "github.com/functionstream/functionstream/restclient" "github.com/gorilla/mux" "github.com/pkg/errors" @@ -36,13 +36,13 @@ import ( ) type Server struct { - manager *lib.FunctionManager - config *lib.Config + manager *fs.FunctionManager + config *fs.Config httpSvr atomic.Pointer[http.Server] } -func New(config *lib.Config) *Server { - manager, err := lib.NewFunctionManager(config) +func New(config *fs.Config) *Server { + manager, err := fs.NewFunctionManager(config) if err != nil { slog.Error("Error creating function manager", err) } diff --git a/server/server_test.go b/server/server_test.go index 0262d86d..7528aa08 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -20,8 +20,8 @@ import ( "context" "encoding/json" "github.com/functionstream/functionstream/common/model" - "github.com/functionstream/functionstream/lib" - "github.com/functionstream/functionstream/lib/contube" + "github.com/functionstream/functionstream/fs" + "github.com/functionstream/functionstream/fs/contube" "github.com/functionstream/functionstream/tests" "math/rand" "strconv" @@ -30,9 +30,9 @@ import ( func TestStandaloneBasicFunction(t *testing.T) { - conf := &lib.Config{ + conf := &fs.Config{ ListenAddr: "localhost:7301", - QueueBuilder: func(ctx context.Context, config *lib.Config) (contube.TubeFactory, error) { + QueueBuilder: func(ctx context.Context, config *fs.Config) (contube.TubeFactory, error) { return contube.NewMemoryQueueFactory(ctx), nil }, }