Skip to content

Commit 05f7004

Browse files
authored by Ettore Di Giacinto (mudler)
fix: race during stop of active backends (#5106)
* chore: drop double call to stop all backends, refactors

  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix: take the lock when cycling through models to delete

  Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
1 parent 2f9203c commit 05f7004

File tree

4 files changed

+44
-36
lines changed

4 files changed

+44
-36
lines changed

.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
## Enable/Disable single backend (useful if only one GPU is available)
3030
# LOCALAI_SINGLE_ACTIVE_BACKEND=true
3131

32+
# Forces shutdown of the backends if busy (only if LOCALAI_SINGLE_ACTIVE_BACKEND is set)
33+
# LOCALAI_FORCE_BACKEND_SHUTDOWN=true
34+
3235
## Specify a build type. Available: cublas, openblas, clblas.
3336
## cuBLAS: This is a GPU-accelerated version of the complete standard BLAS (Basic Linear Algebra Subprograms) library. It's provided by Nvidia and is part of their CUDA toolkit.
3437
## OpenBLAS: This is an open-source implementation of the BLAS library that aims to provide highly optimized code for various platforms. It includes support for multi-threading and can be compiled to use hardware-specific features for additional performance. OpenBLAS can run on many kinds of hardware, including CPUs from Intel, AMD, and ARM.

pkg/model/initializers.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -473,8 +473,6 @@ func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err e
473473
backend = realBackend
474474
}
475475

476-
ml.stopActiveBackends(o.modelID, o.singleActiveBackend)
477-
478476
var backendToConsume string
479477

480478
switch backend {
@@ -497,13 +495,17 @@ func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err e
497495
}
498496

499497
func (ml *ModelLoader) stopActiveBackends(modelID string, singleActiveBackend bool) {
498+
if !singleActiveBackend {
499+
return
500+
}
501+
500502
// If we can have only one backend active, kill all the others (except external backends)
501-
if singleActiveBackend {
502-
log.Debug().Msgf("Stopping all backends except '%s'", modelID)
503-
err := ml.StopGRPC(allExcept(modelID))
504-
if err != nil {
505-
log.Error().Err(err).Str("keptModel", modelID).Msg("error while shutting down all backends except for the keptModel - greedyloader continuing")
506-
}
503+
504+
// Stop all backends except the one we are going to load
505+
log.Debug().Msgf("Stopping all backends except '%s'", modelID)
506+
err := ml.StopGRPC(allExcept(modelID))
507+
if err != nil {
508+
log.Error().Err(err).Str("keptModel", modelID).Msg("error while shutting down all backends except for the keptModel - greedyloader continuing")
507509
}
508510
}
509511

@@ -520,10 +522,12 @@ func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
520522

521523
ml.stopActiveBackends(o.modelID, o.singleActiveBackend)
522524

525+
// if a backend is defined, return the loader directly
523526
if o.backendString != "" {
524527
return ml.backendLoader(opts...)
525528
}
526529

530+
// Otherwise scan for backends in the asset directory
527531
var err error
528532

529533
// get backends embedded in the binary

pkg/model/loader.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -142,26 +142,6 @@ func (ml *ModelLoader) LoadModel(modelID, modelName string, loader func(string,
142142
func (ml *ModelLoader) ShutdownModel(modelName string) error {
143143
ml.mu.Lock()
144144
defer ml.mu.Unlock()
145-
model, ok := ml.models[modelName]
146-
if !ok {
147-
return fmt.Errorf("model %s not found", modelName)
148-
}
149-
150-
retries := 1
151-
for model.GRPC(false, ml.wd).IsBusy() {
152-
log.Debug().Msgf("%s busy. Waiting.", modelName)
153-
dur := time.Duration(retries*2) * time.Second
154-
if dur > retryTimeout {
155-
dur = retryTimeout
156-
}
157-
time.Sleep(dur)
158-
retries++
159-
160-
if retries > 10 && os.Getenv("LOCALAI_FORCE_BACKEND_SHUTDOWN") == "true" {
161-
log.Warn().Msgf("Model %s is still busy after %d retries. Forcing shutdown.", modelName, retries)
162-
break
163-
}
164-
}
165145

166146
return ml.deleteProcess(modelName)
167147
}

pkg/model/process.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,43 @@ import (
99
"strconv"
1010
"strings"
1111
"syscall"
12+
"time"
1213

1314
"github.com/hpcloud/tail"
1415
process "github.com/mudler/go-processmanager"
1516
"github.com/rs/zerolog/log"
1617
)
1718

19+
var forceBackendShutdown bool = os.Getenv("LOCALAI_FORCE_BACKEND_SHUTDOWN") == "true"
20+
1821
func (ml *ModelLoader) deleteProcess(s string) error {
22+
model, ok := ml.models[s]
23+
if !ok {
24+
log.Debug().Msgf("Model %s not found", s)
25+
return fmt.Errorf("model %s not found", s)
26+
}
27+
1928
defer delete(ml.models, s)
2029

21-
log.Debug().Msgf("Deleting process %s", s)
30+
retries := 1
31+
for model.GRPC(false, ml.wd).IsBusy() {
32+
log.Debug().Msgf("%s busy. Waiting.", s)
33+
dur := time.Duration(retries*2) * time.Second
34+
if dur > retryTimeout {
35+
dur = retryTimeout
36+
}
37+
time.Sleep(dur)
38+
retries++
2239

23-
m, exists := ml.models[s]
24-
if !exists {
25-
log.Error().Msgf("Model does not exist %s", s)
26-
// Nothing to do
27-
return nil
40+
if retries > 10 && forceBackendShutdown {
41+
log.Warn().Msgf("Model %s is still busy after %d retries. Forcing shutdown.", s, retries)
42+
break
43+
}
2844
}
2945

30-
process := m.Process()
46+
log.Debug().Msgf("Deleting process %s", s)
47+
48+
process := model.Process()
3149
if process == nil {
3250
log.Error().Msgf("No process for %s", s)
3351
// Nothing to do as there is no process
@@ -44,9 +62,12 @@ func (ml *ModelLoader) deleteProcess(s string) error {
4462

4563
func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error {
4664
var err error = nil
65+
ml.mu.Lock()
66+
defer ml.mu.Unlock()
67+
4768
for k, m := range ml.models {
4869
if filter(k, m.Process()) {
49-
e := ml.ShutdownModel(k)
70+
e := ml.deleteProcess(k)
5071
err = errors.Join(err, e)
5172
}
5273
}

0 commit comments

Comments (0)