Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/cli/federated.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
type FederatedCLI struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
}

func (f *FederatedCLI) Run(ctx *cliContext.Context) error {

fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced)
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)

return fs.Start(context.Background())
}
68 changes: 65 additions & 3 deletions core/p2p/federated.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package p2p

import "fmt"
import (
"fmt"
"math/rand/v2"
"sync"

"github.com/rs/zerolog/log"
)

const FederatedID = "federated"

Expand All @@ -12,22 +18,70 @@ func NetworkID(networkID, serviceID string) string {
}

type FederatedServer struct {
sync.Mutex
listenAddr, service, p2ptoken string
requestTable map[string]int
loadBalanced bool
workerTarget string
}

func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool) *FederatedServer {
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer {
return &FederatedServer{
listenAddr: listenAddr,
service: service,
p2ptoken: p2pToken,
requestTable: map[string]int{},
loadBalanced: loadBalanced,
workerTarget: workerTarget,
}
}

func (fs *FederatedServer) RandomServer() string {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
delete(fs.requestTable, v.TunnelAddress) // make sure it's not tracked
log.Info().Msgf("Node %s is offline", v.ID)
}
}

if len(tunnelAddresses) == 0 {
return ""
}

return tunnelAddresses[rand.IntN(len(tunnelAddresses))]

Check failure

Code scanning / gosec

Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand)

Use of weak random number generator (math/rand or math/rand/v2 instead of crypto/rand)
}

func (fs *FederatedServer) syncTableStatus() {
fs.Lock()
defer fs.Unlock()
currentTunnels := make(map[string]struct{})

for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
fs.ensureRecordExist(v.TunnelAddress)
currentTunnels[v.TunnelAddress] = struct{}{}
}
}

// delete tunnels that don't exist anymore
for t := range fs.requestTable {
if _, ok := currentTunnels[t]; !ok {
delete(fs.requestTable, t)
}
}
}

func (fs *FederatedServer) SelectLeastUsedServer() string {
fs.syncTableStatus()

fs.Lock()
defer fs.Unlock()

log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")

// cycle over requestTable and find the entry with the lower number
// if there are multiple entries with the same number, select one randomly
// if there are no entries, return an empty string
Expand All @@ -39,18 +93,26 @@ func (fs *FederatedServer) SelectLeastUsedServer() string {
minKey = k
}
}
log.Debug().Any("requests_served", min).Msgf("Selected tunnel %s", minKey)

return minKey
}

func (fs *FederatedServer) RecordRequest(nodeID string) {
fs.Lock()
defer fs.Unlock()
// increment the counter for the nodeID in the requestTable
fs.requestTable[nodeID]++

log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
}

func (fs *FederatedServer) EnsureRecordExist(nodeID string) {
func (fs *FederatedServer) ensureRecordExist(nodeID string) {
// if the nodeID is not in the requestTable, add it with a counter of 0
_, ok := fs.requestTable[nodeID]
if !ok {
fs.requestTable[nodeID] = 0
}

log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
}
47 changes: 22 additions & 25 deletions core/p2p/federated_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"net"
"time"

"math/rand/v2"

"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
Expand Down Expand Up @@ -76,7 +74,7 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Debug().Msg("New for connection")
log.Debug().Msgf("New connection from %s", l.Addr().String())
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
Expand All @@ -86,38 +84,34 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {

// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}

if len(tunnelAddresses) == 0 {
log.Error().Msg("No available nodes yet")
return
}

tunnelAddr := ""

if fs.loadBalanced {
for _, t := range tunnelAddresses {
fs.EnsureRecordExist(t)
if fs.workerTarget != "" {
for _, v := range GetAvailableNodes(fs.service) {
if v.ID == fs.workerTarget {
tunnelAddr = v.TunnelAddress
break
}
}
} else if fs.loadBalanced {
log.Debug().Msgf("Load balancing request")

tunnelAddr = fs.SelectLeastUsedServer()
log.Debug().Msgf("Selected tunnel %s", tunnelAddr)
if tunnelAddr == "" {
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
tunnelAddr = fs.RandomServer()
}

fs.RecordRequest(tunnelAddr)
} else {
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
tunnelAddr = fs.RandomServer()
}

if tunnelAddr == "" {
log.Error().Msg("No available nodes yet")
return
}

log.Debug().Msgf("Selected tunnel %s", tunnelAddr)

tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
Expand All @@ -132,7 +126,10 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {

tunnelConn.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())

if fs.loadBalanced {
fs.RecordRequest(tunnelAddr)
}
}()
}
}
Expand Down
4 changes: 3 additions & 1 deletion core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
if err != nil {
return nil, fmt.Errorf("creating a new node: %w", err)
}

// get new services, allocate and return to the channel

// TODO:
Expand All @@ -201,6 +200,9 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
zlog.Debug().Msg("Searching for workers")

data := ledger.LastBlock().Storage[servicesID]

zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")

for k, v := range data {
zlog.Info().Msgf("Found worker %s", k)
nd := &NodeData{}
Expand Down