Commit fdd95d1

feat: allow to run parallel requests (#1290)
* feat: allow to run parallel requests

Signed-off-by: Ettore Di Giacinto <[email protected]>

* fixup

Signed-off-by: Ettore Di Giacinto <[email protected]>

---------

Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent 66a558f commit fdd95d1

9 files changed: +91 additions, −44 deletions


.env

Lines changed: 4 additions & 1 deletion

@@ -69,4 +69,7 @@ MODELS_PATH=/models
 # PYTHON_GRPC_MAX_WORKERS=1
 
 ### Define the number of parallel LLAMA.cpp workers (Defaults to 1)
-# LLAMACPP_PARALLEL=1
+# LLAMACPP_PARALLEL=1
+
+### Enable to run parallel requests
+# PARALLEL_REQUESTS=true
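
Taken together, LLAMACPP_PARALLEL sizes the llama.cpp worker pool while PARALLEL_REQUESTS opts the loader into handing out a dedicated gRPC client per request. A hypothetical invocation combining the two (the binary name, worker count, and models path are illustrative, not part of this diff):

PARALLEL_REQUESTS=true LLAMACPP_PARALLEL=4 ./local-ai --models-path /models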

api/backend/options.go

Lines changed: 4 additions & 0 deletions

@@ -16,6 +16,10 @@ func modelOpts(c config.Config, o *options.Option, opts []model.Option) []model.Option {
 		opts = append(opts, model.WithSingleActiveBackend())
 	}
 
+	if o.ParallelBackendRequests {
+		opts = append(opts, model.EnableParallelRequests)
+	}
+
 	if c.GRPC.Attempts != 0 {
 		opts = append(opts, model.WithGRPCAttempts(c.GRPC.Attempts))
 	}

api/localai/backend_monitor.go

Lines changed: 2 additions & 2 deletions

@@ -125,11 +125,11 @@ func BackendMonitorEndpoint(bm BackendMonitor) func(c *fiber.Ctx) error {
 
 		client := bm.options.Loader.CheckIsLoaded(backendId)
 
-		if client == nil {
+		if client == "" {
 			return fmt.Errorf("backend %s is not currently loaded", backendId)
 		}
 
-		status, rpcErr := client.Status(context.TODO())
+		status, rpcErr := client.GRPC().Status(context.TODO())
 		if rpcErr != nil {
 			log.Warn().Msgf("backend %s experienced an error retrieving status info: %s", backendId, rpcErr.Error())
 			val, slbErr := bm.SampleLocalBackendProcess(backendId)
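
Because CheckIsLoaded now returns a ModelAddress (a named string type) rather than a *grpc.Client, the "not loaded" sentinel becomes the empty string, and the monitor dials on demand via GRPC(). A minimal, self-contained sketch of that sentinel pattern (Address and lookup are hypothetical stand-ins, not LocalAI APIs):

package main

import "fmt"

// Address mirrors ModelAddress: a string whose zero value ""
// doubles as the "not loaded" sentinel, replacing a nil pointer.
type Address string

// lookup stands in for CheckIsLoaded: a map miss yields the zero value.
func lookup(loaded map[string]Address, id string) Address {
	return loaded[id]
}

func main() {
	loaded := map[string]Address{"llama": "127.0.0.1:50051"}
	for _, id := range []string{"llama", "bert"} {
		if addr := lookup(loaded, id); addr == "" {
			fmt.Printf("backend %s is not currently loaded\n", id)
		} else {
			fmt.Printf("backend %s is at %s\n", id, addr)
		}
	}
}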

api/options/options.go

Lines changed: 7 additions & 2 deletions

@@ -5,9 +5,9 @@ import (
 	"embed"
 	"encoding/json"
 
+	"github.com/go-skynet/LocalAI/metrics"
 	"github.com/go-skynet/LocalAI/pkg/gallery"
 	model "github.com/go-skynet/LocalAI/pkg/model"
-	"github.com/go-skynet/LocalAI/metrics"
 	"github.com/rs/zerolog/log"
 )
 
@@ -36,7 +36,8 @@ type Option struct {
 
 	AutoloadGalleries bool
 
-	SingleBackend bool
+	SingleBackend           bool
+	ParallelBackendRequests bool
 }
 
 type AppOption func(*Option)
@@ -66,6 +67,10 @@ var EnableSingleBackend = func(o *Option) {
 	o.SingleBackend = true
 }
 
+var EnableParallelBackendRequests = func(o *Option) {
+	o.ParallelBackendRequests = true
+}
+
 var EnableGalleriesAutoload = func(o *Option) {
 	o.AutoloadGalleries = true
 }

main.go

Lines changed: 9 additions & 2 deletions

@@ -16,9 +16,9 @@ import (
 	config "github.com/go-skynet/LocalAI/api/config"
 	"github.com/go-skynet/LocalAI/api/options"
 	"github.com/go-skynet/LocalAI/internal"
+	"github.com/go-skynet/LocalAI/metrics"
 	"github.com/go-skynet/LocalAI/pkg/gallery"
 	model "github.com/go-skynet/LocalAI/pkg/model"
-	"github.com/go-skynet/LocalAI/metrics"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
 	progressbar "github.com/schollz/progressbar/v3"
@@ -63,6 +63,11 @@ func main() {
 			EnvVars: []string{"SINGLE_ACTIVE_BACKEND"},
 			Usage:   "Allow only one backend to be running.",
 		},
+		&cli.BoolFlag{
+			Name:    "parallel-requests",
+			EnvVars: []string{"PARALLEL_REQUESTS"},
+			Usage:   "Enable backends to handle multiple requests in parallel. This is for backends that supports multiple requests in parallel, like llama.cpp or vllm",
+		},
 		&cli.BoolFlag{
 			Name:    "cors",
 			EnvVars: []string{"CORS"},
@@ -193,7 +198,9 @@ For a list of compatible model, check out: https://localai.io/model-compatibilit
 			options.WithUploadLimitMB(ctx.Int("upload-limit")),
 			options.WithApiKeys(ctx.StringSlice("api-keys")),
 		}
-
+		if ctx.Bool("parallel-requests") {
+			opts = append(opts, options.EnableParallelBackendRequests)
+		}
 		if ctx.Bool("single-active-backend") {
 			opts = append(opts, options.EnableSingleBackend)
 		}
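
The flag follows the same urfave/cli shape as the existing SINGLE_ACTIVE_BACKEND switch: a BoolFlag that can be set on the command line or through its environment alias. A minimal sketch of that wiring, assuming github.com/urfave/cli/v2 (the Action body is illustrative, not LocalAI's):

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Flags: []cli.Flag{
			// Same shape as the diff: settable as --parallel-requests
			// or via PARALLEL_REQUESTS=true in the environment.
			&cli.BoolFlag{
				Name:    "parallel-requests",
				EnvVars: []string{"PARALLEL_REQUESTS"},
			},
		},
		Action: func(ctx *cli.Context) error {
			fmt.Println("parallel-requests:", ctx.Bool("parallel-requests"))
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}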

pkg/model/initializers.go

Lines changed: 42 additions & 27 deletions

@@ -61,11 +61,11 @@ var AutoLoadBackends []string = []string{
 
 // starts the grpcModelProcess for the backend, and returns a grpc client
 // It also loads the model
-func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (*grpc.Client, error) {
-	return func(modelName, modelFile string) (*grpc.Client, error) {
+func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (ModelAddress, error) {
+	return func(modelName, modelFile string) (ModelAddress, error) {
 		log.Debug().Msgf("Loading Model %s with gRPC (file: %s) (backend: %s): %+v", modelName, modelFile, backend, *o)
 
-		var client *grpc.Client
+		var client ModelAddress
 
 		getFreeAddress := func() (string, error) {
 			port, err := freeport.GetFreePort()
@@ -82,46 +82,46 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (*grpc.Client, error) {
 			if _, err := os.Stat(uri); err == nil {
 				serverAddress, err := getFreeAddress()
 				if err != nil {
-					return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
+					return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
 				}
 				// Make sure the process is executable
 				if err := ml.startProcess(uri, o.model, serverAddress); err != nil {
-					return nil, err
+					return "", err
 				}
 
 				log.Debug().Msgf("GRPC Service Started")
 
-				client = grpc.NewClient(serverAddress)
+				client = ModelAddress(serverAddress)
 			} else {
 				// address
-				client = grpc.NewClient(uri)
+				client = ModelAddress(uri)
 			}
 		} else {
 			grpcProcess := filepath.Join(o.assetDir, "backend-assets", "grpc", backend)
 			// Check if the file exists
 			if _, err := os.Stat(grpcProcess); os.IsNotExist(err) {
-				return nil, fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
+				return "", fmt.Errorf("grpc process not found: %s. some backends(stablediffusion, tts) require LocalAI compiled with GO_TAGS", grpcProcess)
 			}
 
 			serverAddress, err := getFreeAddress()
 			if err != nil {
-				return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
+				return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
 			}
 
 			// Make sure the process is executable
 			if err := ml.startProcess(grpcProcess, o.model, serverAddress); err != nil {
-				return nil, err
+				return "", err
 			}
 
 			log.Debug().Msgf("GRPC Service Started")
 
-			client = grpc.NewClient(serverAddress)
+			client = ModelAddress(serverAddress)
 		}
 
 		// Wait for the service to start up
 		ready := false
 		for i := 0; i < o.grpcAttempts; i++ {
-			if client.HealthCheck(context.Background()) {
+			if client.GRPC().HealthCheck(context.Background()) {
 				log.Debug().Msgf("GRPC Service Ready")
 				ready = true
 				break
@@ -131,7 +131,7 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (*grpc.Client, error) {
 
 		if !ready {
 			log.Debug().Msgf("GRPC Service NOT ready")
-			return nil, fmt.Errorf("grpc service not ready")
+			return "", fmt.Errorf("grpc service not ready")
 		}
 
 		options := *o.gRPCOptions
@@ -140,19 +140,30 @@ func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string) (*grpc.Client, error) {
 
 		log.Debug().Msgf("GRPC: Loading model with options: %+v", options)
 
-		res, err := client.LoadModel(o.context, &options)
+		res, err := client.GRPC().LoadModel(o.context, &options)
 		if err != nil {
-			return nil, fmt.Errorf("could not load model: %w", err)
+			return "", fmt.Errorf("could not load model: %w", err)
 		}
 		if !res.Success {
-			return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
+			return "", fmt.Errorf("could not load model (no success): %s", res.Message)
 		}
 
 		return client, nil
 	}
 }
 
-func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err error) {
+func (ml *ModelLoader) resolveAddress(addr ModelAddress, parallel bool) (*grpc.Client, error) {
+	if parallel {
+		return addr.GRPC(), nil
+	}
+
+	if _, ok := ml.grpcClients[string(addr)]; !ok {
+		ml.grpcClients[string(addr)] = addr.GRPC()
+	}
+	return ml.grpcClients[string(addr)], nil
+}
+
+func (ml *ModelLoader) BackendLoader(opts ...Option) (client *grpc.Client, err error) {
 	o := NewOptions(opts...)
 
 	log.Debug().Msgf("Loading model %s from %s", o.backendString, o.model)
@@ -166,22 +177,25 @@ func (ml *ModelLoader) BackendLoader(opts ...Option) (model *grpc.Client, err error) {
 		ml.mu.Unlock()
 	}
 
-	// if an external backend is provided, use it
-	_, externalBackendExists := o.externalBackends[backend]
-	if externalBackendExists {
-		return ml.LoadModel(o.model, ml.grpcModel(backend, o))
-	}
+	var backendToConsume string
 
 	switch backend {
 	case Gpt4AllLlamaBackend, Gpt4AllMptBackend, Gpt4AllJBackend, Gpt4All:
 		o.gRPCOptions.LibrarySearchPath = filepath.Join(o.assetDir, "backend-assets", "gpt4all")
-		return ml.LoadModel(o.model, ml.grpcModel(Gpt4All, o))
+		backendToConsume = Gpt4All
 	case PiperBackend:
 		o.gRPCOptions.LibrarySearchPath = filepath.Join(o.assetDir, "backend-assets", "espeak-ng-data")
-		return ml.LoadModel(o.model, ml.grpcModel(PiperBackend, o))
+		backendToConsume = PiperBackend
 	default:
-		return ml.LoadModel(o.model, ml.grpcModel(backend, o))
+		backendToConsume = backend
+	}
+
+	addr, err := ml.LoadModel(o.model, ml.grpcModel(backendToConsume, o))
+	if err != nil {
+		return nil, err
 	}
+
+	return ml.resolveAddress(addr, o.parallelRequests)
 }
 
 func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
@@ -190,10 +204,11 @@ func (ml *ModelLoader) GreedyLoader(opts ...Option) (*grpc.Client, error) {
 	ml.mu.Lock()
 	// Return earlier if we have a model already loaded
 	// (avoid looping through all the backends)
-	if m := ml.CheckIsLoaded(o.model); m != nil {
+	if m := ml.CheckIsLoaded(o.model); m != "" {
 		log.Debug().Msgf("Model '%s' already loaded", o.model)
 		ml.mu.Unlock()
-		return m, nil
+
+		return ml.resolveAddress(m, o.parallelRequests)
 	}
 	// If we can have only one backend active, kill all the others (except external backends)
 	if o.singleActiveBackend {
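
resolveAddress is the heart of the change: in parallel mode every caller gets a freshly dialed client, so concurrent requests are not funneled through one connection; otherwise one client per address is memoized in grpcClients and shared, preserving the old single-client behavior. A self-contained sketch of that decision (Client and dial are hypothetical stand-ins for *grpc.Client and grpc.NewClient):

package main

import "fmt"

// Client stands in for *grpc.Client; dial for grpc.NewClient.
type Client struct{ addr string }

func dial(addr string) *Client { return &Client{addr: addr} }

// resolver mirrors ml.resolveAddress: in parallel mode every call
// dials a fresh connection; otherwise one client per address is cached.
type resolver struct{ cache map[string]*Client }

func (r *resolver) resolve(addr string, parallel bool) *Client {
	if parallel {
		return dial(addr)
	}
	if _, ok := r.cache[addr]; !ok {
		r.cache[addr] = dial(addr)
	}
	return r.cache[addr]
}

func main() {
	r := &resolver{cache: map[string]*Client{}}
	a, b := r.resolve("127.0.0.1:50051", false), r.resolve("127.0.0.1:50051", false)
	fmt.Println("shared client reused:", a == b) // true
	c, d := r.resolve("127.0.0.1:50051", true), r.resolve("127.0.0.1:50051", true)
	fmt.Println("parallel clients distinct:", c != d) // true
}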

pkg/model/loader.go

Lines changed: 17 additions & 9 deletions

@@ -59,15 +59,23 @@ type ModelLoader struct {
 	ModelPath string
 	mu        sync.Mutex
 	// TODO: this needs generics
-	models map[string]*grpc.Client
+	grpcClients   map[string]*grpc.Client
+	models        map[string]ModelAddress
 	grpcProcesses map[string]*process.Process
 	templates     map[TemplateType]map[string]*template.Template
 }
 
+type ModelAddress string
+
+func (m ModelAddress) GRPC() *grpc.Client {
+	return grpc.NewClient(string(m))
+}
+
 func NewModelLoader(modelPath string) *ModelLoader {
 	nml := &ModelLoader{
 		ModelPath:     modelPath,
-		models:        make(map[string]*grpc.Client),
+		grpcClients:   make(map[string]*grpc.Client),
+		models:        make(map[string]ModelAddress),
 		templates:     make(map[TemplateType]map[string]*template.Template),
 		grpcProcesses: make(map[string]*process.Process),
 	}
@@ -98,12 +106,12 @@ func (ml *ModelLoader) ListModels() ([]string, error) {
 	return models, nil
 }
 
-func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (*grpc.Client, error)) (*grpc.Client, error) {
+func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (ModelAddress, error)) (ModelAddress, error) {
 	ml.mu.Lock()
 	defer ml.mu.Unlock()
 
 	// Check if we already have a loaded model
-	if model := ml.CheckIsLoaded(modelName); model != nil {
+	if model := ml.CheckIsLoaded(modelName); model != "" {
 		return model, nil
 	}
 
@@ -113,7 +121,7 @@ func (ml *ModelLoader) LoadModel(modelName string, loader func(string, string) (ModelAddress, error)) (ModelAddress, error) {
 
 	model, err := loader(modelName, modelFile)
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 
 	// TODO: Add a helper method to iterate all prompt templates associated with a config if and only if it's YAML?
@@ -138,24 +146,24 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error {
 	return ml.deleteProcess(modelName)
 }
 
-func (ml *ModelLoader) CheckIsLoaded(s string) *grpc.Client {
+func (ml *ModelLoader) CheckIsLoaded(s string) ModelAddress {
 	if m, ok := ml.models[s]; ok {
 		log.Debug().Msgf("Model already loaded in memory: %s", s)
 
-		if !m.HealthCheck(context.Background()) {
+		if !m.GRPC().HealthCheck(context.Background()) {
 			log.Debug().Msgf("GRPC Model not responding: %s", s)
 			if !ml.grpcProcesses[s].IsAlive() {
 				log.Debug().Msgf("GRPC Process is not responding: %s", s)
 				// stop and delete the process, this forces to re-load the model and re-create again the service
 				ml.deleteProcess(s)
-				return nil
+				return ""
 			}
 		}
 
 		return m
 	}
 
-	return nil
+	return ""
}

 func (ml *ModelLoader) EvaluateTemplateForPrompt(templateType TemplateType, templateName string, in PromptTemplateData) (string, error) {
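
Defining ModelAddress as a named string type lets the loader keep a lightweight address in its bookkeeping maps and mint a gRPC client only when one is actually needed. A sketch of the method-on-string idiom (Addr and Conn are hypothetical stand-ins, not LocalAI types):

package main

import "fmt"

// Conn stands in for *grpc.Client.
type Conn struct{ target string }

// Addr mirrors ModelAddress: methods can hang off a named string type.
type Addr string

// GRPC mirrors ModelAddress.GRPC: each call constructs a new client
// for the address; any caching is left to the caller (resolveAddress).
func (a Addr) GRPC() *Conn { return &Conn{target: string(a)} }

func main() {
	models := map[string]Addr{"llama": "127.0.0.1:50051"}
	fmt.Println(models["llama"].GRPC().target) // 127.0.0.1:50051
}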

pkg/model/options.go

Lines changed: 5 additions & 0 deletions

@@ -20,10 +20,15 @@ type Options struct {
 	grpcAttempts        int
 	grpcAttemptsDelay   int
 	singleActiveBackend bool
+	parallelRequests    bool
 }
 
 type Option func(*Options)
 
+var EnableParallelRequests = func(o *Options) {
+	o.parallelRequests = true
+}
+
 func WithExternalBackend(name string, uri string) Option {
 	return func(o *Options) {
 		if o.externalBackends == nil {
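
EnableParallelRequests follows the functional-options pattern already used throughout pkg/model: since it takes no arguments, it is declared as a package-level Option value rather than a constructor. A compact sketch of how such options are consumed (NewOptions here is a simplified stand-in for the package's real constructor):

package main

import "fmt"

// Options mirrors pkg/model's private options struct.
type Options struct {
	singleActiveBackend bool
	parallelRequests    bool
}

// Option is the functional-option type the loader consumes.
type Option func(*Options)

// EnableParallelRequests matches the diff: a package-level value,
// not a constructor, since it needs no arguments.
var EnableParallelRequests = func(o *Options) {
	o.parallelRequests = true
}

// NewOptions applies each option to a zero-valued Options.
func NewOptions(opts ...Option) *Options {
	o := &Options{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := NewOptions(EnableParallelRequests)
	fmt.Println(o.parallelRequests) // true
}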

pkg/model/process.go

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@ import (
 func (ml *ModelLoader) StopAllExcept(s string) {
 	ml.StopGRPC(func(id string, p *process.Process) bool {
 		if id != s {
-			for ml.models[id].IsBusy() {
+			for ml.models[id].GRPC().IsBusy() {
 				log.Debug().Msgf("%s busy. Waiting.", id)
 				time.Sleep(2 * time.Second)
 			}
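
StopAllExcept now reaches IsBusy through GRPC(), but the drain logic is unchanged: poll until the backend reports idle, then stop it. A tiny runnable sketch of that poll-then-stop loop (worker and its deadline are hypothetical):

package main

import (
	"fmt"
	"time"
)

// worker stands in for a backend; IsBusy mirrors the gRPC client's
// busy check consulted before a process is stopped.
type worker struct{ busyUntil time.Time }

func (w *worker) IsBusy() bool { return time.Now().Before(w.busyUntil) }

func main() {
	w := &worker{busyUntil: time.Now().Add(3 * time.Second)}
	for w.IsBusy() {
		fmt.Println("busy. Waiting.")
		time.Sleep(1 * time.Second) // the diff waits 2s between polls
	}
	fmt.Println("idle; safe to stop")
}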
