Skip to content

Commit 7a2ef18

Browse files
committed
feat(p2p): allow to run multiple clusters in the same network
Allow to specify a network ID via CLI which allows to run multiple clusters, logically separated within the same network (by using the same shared token). Note: This segregation is not "secure" by any means, anyone having the network token can see the services available in all the network, however, this provides a way to separate the inference endpoints. This allows for instance to have a node which is both federated and having attached a set of llama.cpp workers. Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent f15a93b commit 7a2ef18

File tree

8 files changed

+50
-26
lines changed

8 files changed

+50
-26
lines changed

core/cli/federated.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ import (
88
)
99

1010
type FederatedCLI struct {
11-
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
12-
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
13-
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
11+
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
12+
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
13+
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
14+
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode" group:"p2p"`
1415
}
1516

1617
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
1718

18-
fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken, f.LoadBalanced)
19+
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced)
1920

2021
return fs.Start(context.Background())
2122
}

core/cli/run.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type RunCMD struct {
5454
OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"hardening"`
5555
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
5656
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
57+
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode" group:"p2p"`
5758
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
5859
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
5960
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
@@ -94,6 +95,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
9495
config.WithModelsURL(append(r.Models, r.ModelArgs...)...),
9596
config.WithOpaqueErrors(r.OpaqueErrors),
9697
config.WithEnforcedPredownloadScans(!r.DisablePredownloadScan),
98+
config.WithP2PNetworkID(r.Peer2PeerNetworkID),
9799
}
98100

99101
token := ""
@@ -119,9 +121,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
119121
}
120122

121123
log.Info().Msg("Starting P2P server discovery...")
122-
if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func(serviceID string, node p2p.NodeData) {
124+
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, ""), func(serviceID string, node p2p.NodeData) {
123125
var tunnelAddresses []string
124-
for _, v := range p2p.GetAvailableNodes("") {
126+
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, "")) {
125127
if v.IsOnline() {
126128
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
127129
} else {
@@ -142,14 +144,15 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
142144
if err != nil {
143145
return err
144146
}
145-
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.FederatedID); err != nil {
147+
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID)); err != nil {
146148
return err
147149
}
148150
node, err := p2p.NewNode(token)
149151
if err != nil {
150152
return err
151153
}
152-
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.FederatedID, nil); err != nil {
154+
155+
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil); err != nil {
153156
return err
154157
}
155158
}

core/cli/worker/worker_p2p.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ import (
1919
)
2020

2121
type P2P struct {
22-
WorkerFlags `embed:""`
23-
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
24-
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
25-
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
26-
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
27-
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
22+
WorkerFlags `embed:""`
23+
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
24+
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
25+
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
26+
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
27+
ExtraLLamaCPPArgs []string `env:"LOCALAI_EXTRA_LLAMA_CPP_ARGS,EXTRA_LLAMA_CPP_ARGS" help:"Extra arguments to pass to llama-cpp-rpc-server"`
28+
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode" group:"p2p"`
2829
}
2930

3031
func (r *P2P) Run(ctx *cliContext.Context) error {
@@ -59,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
5960
p = r.RunnerPort
6061
}
6162

62-
err = p2p.ExposeService(context.Background(), address, p, r.Token, "")
63+
err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
6364
if err != nil {
6465
return err
6566
}
@@ -99,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
99100
}
100101
}()
101102

102-
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, "")
103+
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
103104
if err != nil {
104105
return err
105106
}

core/config/application_config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type ApplicationConfig struct {
3434
EnforcePredownloadScans bool
3535
OpaqueErrors bool
3636
P2PToken string
37+
P2PNetworkID string
3738

3839
ModelLibraryURL string
3940

@@ -91,6 +92,12 @@ func WithCors(b bool) AppOption {
9192
}
9293
}
9394

95+
func WithP2PNetworkID(s string) AppOption {
96+
return func(o *ApplicationConfig) {
97+
o.P2PNetworkID = s
98+
}
99+
}
100+
94101
func WithCsrf(b bool) AppOption {
95102
return func(o *ApplicationConfig) {
96103
o.CSRF = b

core/http/endpoints/localai/p2p.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ import (
1111
// @Summary Returns available P2P nodes
1212
// @Success 200 {object} []schema.P2PNodesResponse "Response"
1313
// @Router /api/p2p [get]
14-
func ShowP2PNodes(c *fiber.Ctx) error {
14+
func ShowP2PNodes(appConfig *config.ApplicationConfig) func(*fiber.Ctx) error {
1515
// Render index
16-
return c.JSON(schema.P2PNodesResponse{
17-
Nodes: p2p.GetAvailableNodes(""),
18-
FederatedNodes: p2p.GetAvailableNodes(p2p.FederatedID),
19-
})
16+
return func(c *fiber.Ctx) error {
17+
return c.JSON(schema.P2PNodesResponse{
18+
Nodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")),
19+
FederatedNodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)),
20+
})
21+
}
2022
}
2123

2224
// ShowP2PToken returns the P2P token

core/http/routes/localai.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func RegisterLocalAIRoutes(app *fiber.App,
5959

6060
// p2p
6161
if p2p.IsP2PEnabled() {
62-
app.Get("/api/p2p", auth, localai.ShowP2PNodes)
62+
app.Get("/api/p2p", auth, localai.ShowP2PNodes(appConfig))
6363
app.Get("/api/p2p/token", auth, localai.ShowP2PToken(appConfig))
6464
}
6565

core/http/routes/ui.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func RegisterUIRoutes(app *fiber.App,
9696
//"FederatedNodes": p2p.GetAvailableNodes(p2p.FederatedID),
9797
"IsP2PEnabled": p2p.IsP2PEnabled(),
9898
"P2PToken": appConfig.P2PToken,
99+
"NetworkID": appConfig.P2PNetworkID,
99100
}
100101

101102
// Render index
@@ -104,17 +105,17 @@ func RegisterUIRoutes(app *fiber.App,
104105

105106
/* show nodes live! */
106107
app.Get("/p2p/ui/workers", auth, func(c *fiber.Ctx) error {
107-
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes("")))
108+
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, ""))))
108109
})
109110
app.Get("/p2p/ui/workers-federation", auth, func(c *fiber.Ctx) error {
110-
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.FederatedID)))
111+
return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID))))
111112
})
112113

113114
app.Get("/p2p/ui/workers-stats", auth, func(c *fiber.Ctx) error {
114-
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes("")))
115+
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, ""))))
115116
})
116117
app.Get("/p2p/ui/workers-federation-stats", auth, func(c *fiber.Ctx) error {
117-
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.FederatedID)))
118+
return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID))))
118119
})
119120
}
120121

core/p2p/federated.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
11
package p2p
22

3+
import "fmt"
4+
35
const FederatedID = "federated"
46

7+
func NetworkID(networkID, serviceID string) string {
8+
if networkID != "" {
9+
return fmt.Sprintf("%s_%s", networkID, serviceID)
10+
}
11+
return serviceID
12+
}
13+
514
type FederatedServer struct {
615
listenAddr, service, p2ptoken string
716
requestTable map[string]int

0 commit comments

Comments
 (0)