@@ -5,10 +5,8 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
-	"os"
 	"reflect"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -17,6 +15,7 @@ import (
 	"github.com/ollama/ollama/format"
 	"github.com/ollama/ollama/gpu"
 	"github.com/ollama/ollama/llm"
+	"github.com/ollama/ollama/server/envconfig"
 	"golang.org/x/exp/slices"
 )
 
@@ -43,46 +42,14 @@ type Scheduler struct {
 	getGpuFn func() gpu.GpuInfoList
 }
 
-var (
-	// TODO set this to zero after a release or two, to enable multiple models by default
-	loadedMax         = 1 // Maximum runners; < 1 maps to as many as will fit in VRAM (unlimited for CPU runners)
-	maxQueuedRequests = 512
-	numParallel       = 1
-	ErrMaxQueue       = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
-)
+var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
 
 func InitScheduler(ctx context.Context) *Scheduler {
-	maxRunners := os.Getenv("OLLAMA_MAX_LOADED_MODELS")
-	if maxRunners != "" {
-		m, err := strconv.Atoi(maxRunners)
-		if err != nil {
-			slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
-		} else {
-			loadedMax = m
-		}
-	}
-	if onp := os.Getenv("OLLAMA_NUM_PARALLEL"); onp != "" {
-		p, err := strconv.Atoi(onp)
-		if err != nil || p <= 0 {
-			slog.Error("invalid parallel setting, must be greater than zero", "OLLAMA_NUM_PARALLEL", onp, "error", err)
-		} else {
-			numParallel = p
-		}
-	}
-	if onp := os.Getenv("OLLAMA_MAX_QUEUE"); onp != "" {
-		p, err := strconv.Atoi(onp)
-		if err != nil || p <= 0 {
-			slog.Error("invalid setting", "OLLAMA_MAX_QUEUE", onp, "error", err)
-		} else {
-			maxQueuedRequests = p
-		}
-	}
-
 	sched := &Scheduler{
-		pendingReqCh:  make(chan *LlmRequest, maxQueuedRequests),
-		finishedReqCh: make(chan *LlmRequest, maxQueuedRequests),
-		expiredCh:     make(chan *runnerRef, maxQueuedRequests),
-		unloadedCh:    make(chan interface{}, maxQueuedRequests),
+		pendingReqCh:  make(chan *LlmRequest, envconfig.MaxQueuedRequests),
+		finishedReqCh: make(chan *LlmRequest, envconfig.MaxQueuedRequests),
+		expiredCh:     make(chan *runnerRef, envconfig.MaxQueuedRequests),
+		unloadedCh:    make(chan interface{}, envconfig.MaxQueuedRequests),
 		loaded:        make(map[string]*runnerRef),
 		newServerFn:   llm.NewLlamaServer,
 		getGpuFn:      gpu.GetGPUInfo,
@@ -94,7 +61,7 @@ func InitScheduler(ctx context.Context) *Scheduler {
 // context must be canceled to decrement ref count and release the runner
 func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
 	// allocate a large enough kv cache for all parallel requests
-	opts.NumCtx = opts.NumCtx * numParallel
+	opts.NumCtx = opts.NumCtx * envconfig.NumParallel
 
 	req := &LlmRequest{
 		ctx:             c,
@@ -147,11 +114,11 @@ func (s *Scheduler) processPending(ctx context.Context) {
 						pending.useLoadedRunner(runner, s.finishedReqCh)
 						break
 					}
-				} else if loadedMax > 0 && loadedCount >= loadedMax {
+				} else if envconfig.MaxRunners > 0 && loadedCount >= envconfig.MaxRunners {
 					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
 					runnerToExpire = s.findRunnerToUnload(pending)
 				} else {
-					// Either no models are loaded or below loadedMax
+					// Either no models are loaded or below envconfig.MaxRunners
 					// Get a refreshed GPU list
 					gpus := s.getGpuFn()
 
@@ -162,7 +129,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
 						break
 					}
 
-					// If we're CPU only mode, just limit by loadedMax above
+					// If we're CPU only mode, just limit by envconfig.MaxRunners above
 					// TODO handle system memory exhaustion
 					if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 {
 						slog.Debug("cpu mode with existing models, loading")
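
The scheduler no longer parses environment variables itself; that responsibility moves to the envconfig package imported above. Below is a minimal sketch of what that side might look like, for context only: the exported names (MaxRunners, MaxQueuedRequests, NumParallel), the OLLAMA_* variables, and the defaults are taken from the lines removed in this diff, while the LoadConfig entry point and all other details are assumptions, not the actual package contents.

// A minimal sketch of the envconfig side this change assumes; the real
// github.com/ollama/ollama/server/envconfig package may differ. Defaults
// and env var names mirror the logic removed from sched.go.
package envconfig

import (
	"log/slog"
	"os"
	"strconv"
)

var (
	// MaxRunners limits loaded runners; < 1 means as many as fit in VRAM
	// (unlimited for CPU runners).
	MaxRunners = 1
	// MaxQueuedRequests caps the scheduler's pending request queue.
	MaxQueuedRequests = 512
	// NumParallel is the number of parallel requests each runner serves.
	NumParallel = 1
)

// LoadConfig (hypothetical name) applies OLLAMA_* overrides, keeping the
// defaults above when a value is missing or invalid.
func LoadConfig() {
	if v := os.Getenv("OLLAMA_MAX_LOADED_MODELS"); v != "" {
		if n, err := strconv.Atoi(v); err == nil {
			MaxRunners = n
		} else {
			slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", v, "error", err)
		}
	}
	if v := os.Getenv("OLLAMA_NUM_PARALLEL"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			NumParallel = n
		} else {
			slog.Error("invalid setting, must be greater than zero", "OLLAMA_NUM_PARALLEL", v, "error", err)
		}
	}
	if v := os.Getenv("OLLAMA_MAX_QUEUE"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			MaxQueuedRequests = n
		} else {
			slog.Error("invalid setting", "OLLAMA_MAX_QUEUE", v, "error", err)
		}
	}
}

With something like this in place the existing overrides keep working, for example OLLAMA_MAX_LOADED_MODELS=2 OLLAMA_NUM_PARALLEL=4 ollama serve, except that parsing and defaults live in one package instead of inside InitScheduler.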