Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/cli/federated.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
)

type FederatedCLI struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
}

func (f *FederatedCLI) Run(ctx *cliContext.Context) error {

fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken, f.LoadBalanced)
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced)

return fs.Start(context.Background())
}
11 changes: 7 additions & 4 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type RunCMD struct {
OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"hardening"`
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
Expand Down Expand Up @@ -94,6 +95,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
config.WithModelsURL(append(r.Models, r.ModelArgs...)...),
config.WithOpaqueErrors(r.OpaqueErrors),
config.WithEnforcedPredownloadScans(!r.DisablePredownloadScan),
config.WithP2PNetworkID(r.Peer2PeerNetworkID),
}

token := ""
Expand All @@ -119,9 +121,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
}

log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func(serviceID string, node p2p.NodeData) {
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, ""), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes("") {
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, "")) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
Expand All @@ -142,14 +144,15 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if err != nil {
return err
}
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.FederatedID); err != nil {
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID)); err != nil {
return err
}
node, err := p2p.NewNode(token)
if err != nil {
return err
}
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.FederatedID, nil); err != nil {

if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil); err != nil {
return err
}
}
Expand Down
17 changes: 9 additions & 8 deletions core/cli/worker/worker_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
)

type P2P struct {
WorkerFlags `embed:""`
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
WorkerFlags `embed:""`
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
}

func (r *P2P) Run(ctx *cliContext.Context) error {
Expand Down Expand Up @@ -59,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort
}

err = p2p.ExposeService(context.Background(), address, p, r.Token, "")
err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
if err != nil {
return err
}
Expand Down Expand Up @@ -99,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
}
}()

err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, "")
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions core/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ApplicationConfig struct {
EnforcePredownloadScans bool
OpaqueErrors bool
P2PToken string
P2PNetworkID string

ModelLibraryURL string

Expand Down Expand Up @@ -91,6 +92,12 @@ func WithCors(b bool) AppOption {
}
}

func WithP2PNetworkID(s string) AppOption {
return func(o *ApplicationConfig) {
o.P2PNetworkID = s
}
}

func WithCsrf(b bool) AppOption {
return func(o *ApplicationConfig) {
o.CSRF = b
Expand Down
12 changes: 7 additions & 5 deletions core/http/endpoints/localai/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
// @Summary Returns available P2P nodes
// @Success 200 {object} []schema.P2PNodesResponse "Response"
// @Router /api/p2p [get]
func ShowP2PNodes(c *fiber.Ctx) error {
func ShowP2PNodes(appConfig *config.ApplicationConfig) func(*fiber.Ctx) error {
// Render index
return c.JSON(schema.P2PNodesResponse{
Nodes: p2p.GetAvailableNodes(""),
FederatedNodes: p2p.GetAvailableNodes(p2p.FederatedID),
})
return func(c *fiber.Ctx) error {
return c.JSON(schema.P2PNodesResponse{
Nodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")),
FederatedNodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)),
})
}
}

// ShowP2PToken returns the P2P token
Expand Down
2 changes: 1 addition & 1 deletion core/http/routes/localai.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func RegisterLocalAIRoutes(app *fiber.App,

// p2p
if p2p.IsP2PEnabled() {
app.Get("/api/p2p", auth, localai.ShowP2PNodes)
app.Get("/api/p2p", auth, localai.ShowP2PNodes(appConfig))
app.Get("/api/p2p/token", auth, localai.ShowP2PToken(appConfig))
}

Expand Down
9 changes: 5 additions & 4 deletions core/http/routes/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func RegisterUIRoutes(app *fiber.App,
//"FederatedNodes": p2p.GetAvailableNodes(p2p.FederatedID),
"IsP2PEnabled": p2p.IsP2PEnabled(),
"P2PToken": appConfig.P2PToken,
"NetworkID": appConfig.P2PNetworkID,
}

// Render index
Expand All @@ -104,17 +105,17 @@ func RegisterUIRoutes(app *fiber.App,

/* show nodes live! */
app.Get("/p2p/ui/workers", auth, func(c *fiber.Ctx) error {
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes("")))
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, ""))))
})
app.Get("/p2p/ui/workers-federation", auth, func(c *fiber.Ctx) error {
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.FederatedID)))
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID))))
})

app.Get("/p2p/ui/workers-stats", auth, func(c *fiber.Ctx) error {
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes("")))
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, ""))))
})
app.Get("/p2p/ui/workers-federation-stats", auth, func(c *fiber.Ctx) error {
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.FederatedID)))
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID))))
})
}

Expand Down
9 changes: 9 additions & 0 deletions core/p2p/federated.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package p2p

import "fmt"

const FederatedID = "federated"

func NetworkID(networkID, serviceID string) string {
if networkID != "" {
return fmt.Sprintf("%s_%s", networkID, serviceID)
}
return serviceID
}

type FederatedServer struct {
listenAddr, service, p2ptoken string
requestTable map[string]int
Expand Down