浏览代码

Merge pull request #4218 from dhiltgen/auto_parallel

Enable concurrency by default
Daniel Hiltgen 10 月之前
父节点
当前提交
3518aaef33
共有 7 个文件被更改,包括 175 次插入73 次删除
  1. 8 8
      envconfig/config.go
  2. 3 2
      gpu/amd_windows.go
  3. 5 0
      gpu/types.go
  4. 3 10
      llm/server.go
  5. 5 0
      server/routes.go
  6. 100 24
      server/sched.go
  7. 51 29
      server/sched_test.go

+ 8 - 8
envconfig/config.go

@@ -85,13 +85,13 @@ func AsMap() map[string]EnvVar {
 		"OLLAMA_HOST":              {"OLLAMA_HOST", Host, "IP Address for the ollama server (default 127.0.0.1:11434)"},
 		"OLLAMA_HOST":              {"OLLAMA_HOST", Host, "IP Address for the ollama server (default 127.0.0.1:11434)"},
 		"OLLAMA_KEEP_ALIVE":        {"OLLAMA_KEEP_ALIVE", KeepAlive, "The duration that models stay loaded in memory (default \"5m\")"},
 		"OLLAMA_KEEP_ALIVE":        {"OLLAMA_KEEP_ALIVE", KeepAlive, "The duration that models stay loaded in memory (default \"5m\")"},
 		"OLLAMA_LLM_LIBRARY":       {"OLLAMA_LLM_LIBRARY", LLMLibrary, "Set LLM library to bypass autodetection"},
 		"OLLAMA_LLM_LIBRARY":       {"OLLAMA_LLM_LIBRARY", LLMLibrary, "Set LLM library to bypass autodetection"},
-		"OLLAMA_MAX_LOADED_MODELS": {"OLLAMA_MAX_LOADED_MODELS", MaxRunners, "Maximum number of loaded models (default 1)"},
+		"OLLAMA_MAX_LOADED_MODELS": {"OLLAMA_MAX_LOADED_MODELS", MaxRunners, "Maximum number of loaded models per GPU (default auto)"},
 		"OLLAMA_MAX_QUEUE":         {"OLLAMA_MAX_QUEUE", MaxQueuedRequests, "Maximum number of queued requests"},
 		"OLLAMA_MAX_QUEUE":         {"OLLAMA_MAX_QUEUE", MaxQueuedRequests, "Maximum number of queued requests"},
 		"OLLAMA_MAX_VRAM":          {"OLLAMA_MAX_VRAM", MaxVRAM, "Maximum VRAM"},
 		"OLLAMA_MAX_VRAM":          {"OLLAMA_MAX_VRAM", MaxVRAM, "Maximum VRAM"},
 		"OLLAMA_MODELS":            {"OLLAMA_MODELS", ModelsDir, "The path to the models directory"},
 		"OLLAMA_MODELS":            {"OLLAMA_MODELS", ModelsDir, "The path to the models directory"},
 		"OLLAMA_NOHISTORY":         {"OLLAMA_NOHISTORY", NoHistory, "Do not preserve readline history"},
 		"OLLAMA_NOHISTORY":         {"OLLAMA_NOHISTORY", NoHistory, "Do not preserve readline history"},
 		"OLLAMA_NOPRUNE":           {"OLLAMA_NOPRUNE", NoPrune, "Do not prune model blobs on startup"},
 		"OLLAMA_NOPRUNE":           {"OLLAMA_NOPRUNE", NoPrune, "Do not prune model blobs on startup"},
-		"OLLAMA_NUM_PARALLEL":      {"OLLAMA_NUM_PARALLEL", NumParallel, "Maximum number of parallel requests (default 1)"},
+		"OLLAMA_NUM_PARALLEL":      {"OLLAMA_NUM_PARALLEL", NumParallel, "Maximum number of parallel requests (default auto)"},
 		"OLLAMA_ORIGINS":           {"OLLAMA_ORIGINS", AllowOrigins, "A comma separated list of allowed origins"},
 		"OLLAMA_ORIGINS":           {"OLLAMA_ORIGINS", AllowOrigins, "A comma separated list of allowed origins"},
 		"OLLAMA_RUNNERS_DIR":       {"OLLAMA_RUNNERS_DIR", RunnersDir, "Location for runners"},
 		"OLLAMA_RUNNERS_DIR":       {"OLLAMA_RUNNERS_DIR", RunnersDir, "Location for runners"},
 		"OLLAMA_SCHED_SPREAD":      {"OLLAMA_SCHED_SPREAD", SchedSpread, "Always schedule model across all GPUs"},
 		"OLLAMA_SCHED_SPREAD":      {"OLLAMA_SCHED_SPREAD", SchedSpread, "Always schedule model across all GPUs"},
@@ -129,8 +129,8 @@ func clean(key string) string {
 
 
 func init() {
 func init() {
 	// default values
 	// default values
-	NumParallel = 1
-	MaxRunners = 1
+	NumParallel = 0 // Autoselect
+	MaxRunners = 0  // Autoselect
 	MaxQueuedRequests = 512
 	MaxQueuedRequests = 512
 
 
 	LoadConfig()
 	LoadConfig()
@@ -205,8 +205,8 @@ func LoadConfig() {
 
 
 	if onp := clean("OLLAMA_NUM_PARALLEL"); onp != "" {
 	if onp := clean("OLLAMA_NUM_PARALLEL"); onp != "" {
 		val, err := strconv.Atoi(onp)
 		val, err := strconv.Atoi(onp)
-		if err != nil || val <= 0 {
-			slog.Error("invalid setting must be greater than zero", "OLLAMA_NUM_PARALLEL", onp, "error", err)
+		if err != nil {
+			slog.Error("invalid setting, ignoring", "OLLAMA_NUM_PARALLEL", onp, "error", err)
 		} else {
 		} else {
 			NumParallel = val
 			NumParallel = val
 		}
 		}
@@ -251,7 +251,7 @@ func LoadConfig() {
 	if maxRunners != "" {
 	if maxRunners != "" {
 		m, err := strconv.Atoi(maxRunners)
 		m, err := strconv.Atoi(maxRunners)
 		if err != nil {
 		if err != nil {
-			slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
+			slog.Error("invalid setting, ignoring", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
 		} else {
 		} else {
 			MaxRunners = m
 			MaxRunners = m
 		}
 		}
@@ -260,7 +260,7 @@ func LoadConfig() {
 	if onp := os.Getenv("OLLAMA_MAX_QUEUE"); onp != "" {
 	if onp := os.Getenv("OLLAMA_MAX_QUEUE"); onp != "" {
 		p, err := strconv.Atoi(onp)
 		p, err := strconv.Atoi(onp)
 		if err != nil || p <= 0 {
 		if err != nil || p <= 0 {
-			slog.Error("invalid setting", "OLLAMA_MAX_QUEUE", onp, "error", err)
+			slog.Error("invalid setting, ignoring", "OLLAMA_MAX_QUEUE", onp, "error", err)
 		} else {
 		} else {
 			MaxQueuedRequests = p
 			MaxQueuedRequests = p
 		}
 		}

+ 3 - 2
gpu/amd_windows.go

@@ -115,8 +115,6 @@ func AMDGetGPUInfo() []RocmGPUInfo {
 			continue
 			continue
 		}
 		}
 
 
-		// TODO revisit this once ROCm v6 is available on windows.
-		// v5.7 only reports VRAM used by this process, so it's completely wrong and unusable
 		slog.Debug("amdgpu memory", "gpu", i, "total", format.HumanBytes2(totalMemory))
 		slog.Debug("amdgpu memory", "gpu", i, "total", format.HumanBytes2(totalMemory))
 		slog.Debug("amdgpu memory", "gpu", i, "available", format.HumanBytes2(freeMemory))
 		slog.Debug("amdgpu memory", "gpu", i, "available", format.HumanBytes2(freeMemory))
 		gpuInfo := RocmGPUInfo{
 		gpuInfo := RocmGPUInfo{
@@ -126,6 +124,9 @@ func AMDGetGPUInfo() []RocmGPUInfo {
 					TotalMemory: totalMemory,
 					TotalMemory: totalMemory,
 					FreeMemory:  freeMemory,
 					FreeMemory:  freeMemory,
 				},
 				},
+				// Free memory reporting on Windows is not reliable until we bump to ROCm v6.2
+				UnreliableFreeMemory: true,
+
 				ID:             strconv.Itoa(i), // TODO this is probably wrong if we specify visible devices
 				ID:             strconv.Itoa(i), // TODO this is probably wrong if we specify visible devices
 				DependencyPath: libDir,
 				DependencyPath: libDir,
 				MinimumMemory:  rocmMinimumMemory,
 				MinimumMemory:  rocmMinimumMemory,

+ 5 - 0
gpu/types.go

@@ -29,6 +29,11 @@ type GpuInfo struct {
 	// Extra environment variables specific to the GPU as list of [key,value]
 	// Extra environment variables specific to the GPU as list of [key,value]
 	EnvWorkarounds [][2]string `json:"envs,omitempty"`
 	EnvWorkarounds [][2]string `json:"envs,omitempty"`
 
 
+	// Set to true if we can NOT reliably discover FreeMemory.  A value of true indicates
+	// the FreeMemory is best effort, and may over or under report actual memory usage
+	// False indicates FreeMemory can generally be trusted on this GPU
+	UnreliableFreeMemory bool
+
 	// GPU information
 	// GPU information
 	ID      string `json:"gpu_id"`  // string to use for selection of this specific GPU
 	ID      string `json:"gpu_id"`  // string to use for selection of this specific GPU
 	Name    string `json:"name"`    // user friendly name if available
 	Name    string `json:"name"`    // user friendly name if available

+ 3 - 10
llm/server.go

@@ -82,7 +82,7 @@ func LoadModel(model string, maxArraySize int) (*GGML, error) {
 
 
 // NewLlamaServer will run a server for the given GPUs
 // NewLlamaServer will run a server for the given GPUs
 // The gpu list must be a single family.
 // The gpu list must be a single family.
-func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, projectors []string, opts api.Options) (LlamaServer, error) {
+func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, projectors []string, opts api.Options, numParallel int) (LlamaServer, error) {
 	var err error
 	var err error
 	var cpuRunner string
 	var cpuRunner string
 	var estimate MemoryEstimate
 	var estimate MemoryEstimate
@@ -218,8 +218,10 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 
 
 	// Windows CUDA should not use mmap for best performance
 	// Windows CUDA should not use mmap for best performance
 	// Linux  with a model larger than free space, mmap leads to thrashing
 	// Linux  with a model larger than free space, mmap leads to thrashing
+	// For CPU loads we want the memory to be allocated, not FS cache
 	if (runtime.GOOS == "windows" && gpus[0].Library == "cuda" && opts.UseMMap == api.TriStateUndefined) ||
 	if (runtime.GOOS == "windows" && gpus[0].Library == "cuda" && opts.UseMMap == api.TriStateUndefined) ||
 		(runtime.GOOS == "linux" && systemFreeMemory < estimate.TotalSize && opts.UseMMap == api.TriStateUndefined) ||
 		(runtime.GOOS == "linux" && systemFreeMemory < estimate.TotalSize && opts.UseMMap == api.TriStateUndefined) ||
+		(gpus[0].Library == "cpu" && opts.UseMMap == api.TriStateUndefined) ||
 		opts.UseMMap == api.TriStateFalse {
 		opts.UseMMap == api.TriStateFalse {
 		params = append(params, "--no-mmap")
 		params = append(params, "--no-mmap")
 	}
 	}
@@ -232,15 +234,6 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
 		params = append(params, "--numa")
 		params = append(params, "--numa")
 	}
 	}
 
 
-	numParallel := envconfig.NumParallel
-
-	// TODO (jmorganca): multimodal models don't support parallel yet
-	// see https://github.com/ollama/ollama/issues/4165
-	if len(projectors) > 0 {
-		numParallel = 1
-		slog.Warn("multimodal models don't support parallel requests yet")
-	}
-
 	params = append(params, "--parallel", fmt.Sprintf("%d", numParallel))
 	params = append(params, "--parallel", fmt.Sprintf("%d", numParallel))
 
 
 	if estimate.TensorSplit != "" {
 	if estimate.TensorSplit != "" {

+ 5 - 0
server/routes.go

@@ -1237,6 +1237,11 @@ func (s *Server) ProcessHandler(c *gin.Context) {
 		models = append(models, mr)
 		models = append(models, mr)
 	}
 	}
 
 
+	slices.SortStableFunc(models, func(i, j api.ProcessModelResponse) int {
+		// longest duration remaining listed first
+		return cmp.Compare(j.ExpiresAt.Unix(), i.ExpiresAt.Unix())
+	})
+
 	c.JSON(http.StatusOK, api.ProcessResponse{Models: models})
 	c.JSON(http.StatusOK, api.ProcessResponse{Models: models})
 }
 }
 
 

+ 100 - 24
server/sched.go

@@ -23,6 +23,7 @@ type LlmRequest struct {
 	ctx             context.Context //nolint:containedctx
 	ctx             context.Context //nolint:containedctx
 	model           *Model
 	model           *Model
 	opts            api.Options
 	opts            api.Options
+	origNumCTX      int // Track the initial ctx request
 	sessionDuration time.Duration
 	sessionDuration time.Duration
 	successCh       chan *runnerRef
 	successCh       chan *runnerRef
 	errCh           chan error
 	errCh           chan error
@@ -38,13 +39,23 @@ type Scheduler struct {
 	loaded   map[string]*runnerRef
 	loaded   map[string]*runnerRef
 	loadedMu sync.Mutex
 	loadedMu sync.Mutex
 
 
-	loadFn       func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
-	newServerFn  func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
+	loadFn       func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel int)
+	newServerFn  func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error)
 	getGpuFn     func() gpu.GpuInfoList
 	getGpuFn     func() gpu.GpuInfoList
 	getCpuFn     func() gpu.GpuInfoList
 	getCpuFn     func() gpu.GpuInfoList
 	reschedDelay time.Duration
 	reschedDelay time.Duration
 }
 }
 
 
+// Default automatic value for number of models we allow per GPU
+// Model will still need to fit in VRAM, but loading many small models
+// on a large GPU can cause stalling
+var defaultModelsPerGPU = 3
+
+// Default automatic value for parallel setting
+// Model will still need to fit in VRAM.  If this setting wont fit
+// we'll back off down to 1 to try to get it to fit
+var defaultParallel = 4
+
 var ErrMaxQueue = fmt.Errorf("server busy, please try again.  maximum pending requests exceeded")
 var ErrMaxQueue = fmt.Errorf("server busy, please try again.  maximum pending requests exceeded")
 
 
 func InitScheduler(ctx context.Context) *Scheduler {
 func InitScheduler(ctx context.Context) *Scheduler {
@@ -65,13 +76,10 @@ func InitScheduler(ctx context.Context) *Scheduler {
 
 
 // context must be canceled to decrement ref count and release the runner
 // context must be canceled to decrement ref count and release the runner
 func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
 func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
-	// allocate a large enough kv cache for all parallel requests
 	if opts.NumCtx < 4 {
 	if opts.NumCtx < 4 {
 		opts.NumCtx = 4
 		opts.NumCtx = 4
 	}
 	}
 
 
-	opts.NumCtx *= envconfig.NumParallel
-
 	req := &LlmRequest{
 	req := &LlmRequest{
 		ctx:             c,
 		ctx:             c,
 		model:           model,
 		model:           model,
@@ -110,11 +118,25 @@ func (s *Scheduler) processPending(ctx context.Context) {
 		case pending := <-s.pendingReqCh:
 		case pending := <-s.pendingReqCh:
 			// Block other requests until we get this pending request running
 			// Block other requests until we get this pending request running
 			pending.schedAttempts++
 			pending.schedAttempts++
+			if pending.origNumCTX == 0 {
+				pending.origNumCTX = pending.opts.NumCtx
+			}
 
 
 			if pending.ctx.Err() != nil {
 			if pending.ctx.Err() != nil {
 				slog.Debug("pending request cancelled or timed out, skipping scheduling")
 				slog.Debug("pending request cancelled or timed out, skipping scheduling")
 				continue
 				continue
 			}
 			}
+			numParallel := envconfig.NumParallel
+			// TODO (jmorganca): multimodal models don't support parallel yet
+			// see https://github.com/ollama/ollama/issues/4165
+			if len(pending.model.ProjectorPaths) > 0 && numParallel != 1 {
+				numParallel = 1
+				slog.Warn("multimodal models don't support parallel requests yet")
+			}
+			// Keep NumCtx and numParallel in sync
+			if numParallel > 1 {
+				pending.opts.NumCtx = pending.origNumCTX * numParallel
+			}
 
 
 			for {
 			for {
 				var runnerToExpire *runnerRef
 				var runnerToExpire *runnerRef
@@ -143,6 +165,26 @@ func (s *Scheduler) processPending(ctx context.Context) {
 						gpus = s.getGpuFn()
 						gpus = s.getGpuFn()
 					}
 					}
 
 
+					if envconfig.MaxRunners <= 0 {
+						// No user specified MaxRunners, so figure out what automatic setting to use
+						// If all GPUs have reliable free memory reporting, defaultModelsPerGPU * the number of GPUs
+						// if any GPU has unreliable free memory reporting, 1x the number of GPUs
+						allReliable := true
+						for _, gpu := range gpus {
+							if gpu.UnreliableFreeMemory {
+								allReliable = false
+								break
+							}
+						}
+						if allReliable {
+							envconfig.MaxRunners = defaultModelsPerGPU * len(gpus)
+							slog.Debug("updating default concurrency", "OLLAMA_MAX_LOADED_MODELS", envconfig.MaxRunners, "gpu_count", len(gpus))
+						} else {
+							slog.Info("one or more GPUs detected that are unable to accurately report free memory - disabling default concurrency")
+							envconfig.MaxRunners = len(gpus)
+						}
+					}
+
 					// Load model for fitting
 					// Load model for fitting
 					ggml, err := llm.LoadModel(pending.model.ModelPath, 0)
 					ggml, err := llm.LoadModel(pending.model.ModelPath, 0)
 					if err != nil {
 					if err != nil {
@@ -152,26 +194,32 @@ func (s *Scheduler) processPending(ctx context.Context) {
 
 
 					// Evaluate if the model will fit in the available system memory, or if we should unload a model first
 					// Evaluate if the model will fit in the available system memory, or if we should unload a model first
 					if len(gpus) == 1 && gpus[0].Library == "cpu" {
 					if len(gpus) == 1 && gpus[0].Library == "cpu" {
+						// simplifying assumption of defaultParallel when in CPU mode
+						if numParallel <= 0 {
+							numParallel = defaultParallel
+							pending.opts.NumCtx = pending.origNumCTX * numParallel
+						}
+
 						if loadedCount == 0 {
 						if loadedCount == 0 {
 							slog.Debug("cpu mode with first model, loading")
 							slog.Debug("cpu mode with first model, loading")
-							s.loadFn(pending, ggml, gpus)
+							s.loadFn(pending, ggml, gpus, numParallel)
 							break
 							break
 						}
 						}
 						runnerToExpire = s.maybeFindCPURunnerToUnload(pending, ggml, gpus)
 						runnerToExpire = s.maybeFindCPURunnerToUnload(pending, ggml, gpus)
 						if runnerToExpire == nil {
 						if runnerToExpire == nil {
 							slog.Debug("cpu mode with available system memory or first model, loading")
 							slog.Debug("cpu mode with available system memory or first model, loading")
-							s.loadFn(pending, ggml, gpus)
+							s.loadFn(pending, ggml, gpus, numParallel)
 							break
 							break
 						}
 						}
 						// else we need to expire a runner
 						// else we need to expire a runner
 					} else if loadedCount == 0 {
 					} else if loadedCount == 0 {
 						// No models loaded. Load the model but prefer the best fit.
 						// No models loaded. Load the model but prefer the best fit.
 						slog.Debug("loading first model", "model", pending.model.ModelPath)
 						slog.Debug("loading first model", "model", pending.model.ModelPath)
-						g := pickBestFitGPUs(pending, ggml, gpus)
+						g := pickBestFitGPUs(pending, ggml, gpus, &numParallel)
 						if g != nil {
 						if g != nil {
 							gpus = g
 							gpus = g
 						}
 						}
-						s.loadFn(pending, ggml, gpus)
+						s.loadFn(pending, ggml, gpus, numParallel)
 						break
 						break
 					}
 					}
 
 
@@ -186,10 +234,10 @@ func (s *Scheduler) processPending(ctx context.Context) {
 
 
 						// Update free memory from currently loaded models
 						// Update free memory from currently loaded models
 						s.updateFreeSpace(availGpus)
 						s.updateFreeSpace(availGpus)
-						fitGpus := pickBestFitGPUs(pending, ggml, availGpus)
+						fitGpus := pickBestFitGPUs(pending, ggml, availGpus, &numParallel)
 						if fitGpus != nil {
 						if fitGpus != nil {
 							slog.Debug("new model fits with existing models, loading")
 							slog.Debug("new model fits with existing models, loading")
-							s.loadFn(pending, ggml, fitGpus)
+							s.loadFn(pending, ggml, fitGpus, numParallel)
 							break
 							break
 						}
 						}
 
 
@@ -350,8 +398,11 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm
 	}()
 	}()
 }
 }
 
 
-func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
-	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
+func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel int) {
+	if numParallel < 1 {
+		numParallel = 1
+	}
+	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts, numParallel)
 	if err != nil {
 	if err != nil {
 		// some older models are not compatible with newer versions of llama.cpp
 		// some older models are not compatible with newer versions of llama.cpp
 		// show a generalized compatibility error until there is a better way to
 		// show a generalized compatibility error until there is a better way to
@@ -375,6 +426,7 @@ func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
 		loading:         true,
 		loading:         true,
 		refCount:        1,
 		refCount:        1,
 	}
 	}
+	runner.numParallel = numParallel
 	runner.refMu.Lock()
 	runner.refMu.Lock()
 
 
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
@@ -483,8 +535,9 @@ type runnerRef struct {
 	expireTimer     *time.Timer
 	expireTimer     *time.Timer
 	expiresAt       time.Time
 	expiresAt       time.Time
 
 
-	model     *Model
-	modelPath string
+	model       *Model
+	modelPath   string
+	numParallel int
 	*api.Options
 	*api.Options
 }
 }
 
 
@@ -525,6 +578,9 @@ func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool
 		optsNew.NumGPU = -1
 		optsNew.NumGPU = -1
 	}
 	}
 
 
+	// Normalize the NumCtx for parallelism
+	optsExisting.NumCtx = optsExisting.NumCtx / runner.numParallel
+
 	ctx, cancel := context.WithTimeout(ctx, timeout)
 	ctx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
 	defer cancel()
 	if !reflect.DeepEqual(runner.model.AdapterPaths, req.model.AdapterPaths) || // have the adapters changed?
 	if !reflect.DeepEqual(runner.model.AdapterPaths, req.model.AdapterPaths) || // have the adapters changed?
@@ -611,22 +667,38 @@ func (a ByDuration) Less(i, j int) bool {
 
 
 // pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
 // pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
 // If the model can not be fit fully within the available GPU(s) nil is returned
 // If the model can not be fit fully within the available GPU(s) nil is returned
-func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
+// If numParallel is <= 0, this will attempt try to optimize parallism based on available VRAM, and adjust
+// opts.NumCtx accordingly
+func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList, numParallel *int) gpu.GpuInfoList {
 	var estimatedVRAM uint64
 	var estimatedVRAM uint64
+
+	var numParallelToTry []int
+	if *numParallel <= 0 {
+		// If no specific parallel setting was provided, try larger then smaller, always end with 1
+		numParallelToTry = append(numParallelToTry, defaultParallel, 1)
+	} else {
+		numParallelToTry = []int{*numParallel}
+	}
+
 	for _, gl := range gpus.ByLibrary() {
 	for _, gl := range gpus.ByLibrary() {
 		var ok bool
 		var ok bool
 		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)
 		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)
 
 
 		// TODO - potentially sort by performance capability, existing models loaded, etc.
 		// TODO - potentially sort by performance capability, existing models loaded, etc.
+		// TODO - Eliminate any GPUs that already have envconfig.MaxRunners loaded on them
 		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
 		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
 		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))
 		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))
 
 
 		// First attempt to fit the model into a single GPU
 		// First attempt to fit the model into a single GPU
-		if !envconfig.SchedSpread {
-			for _, g := range sgl {
-				if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
-					slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
-					return []gpu.GpuInfo{g}
+		for _, p := range numParallelToTry {
+			req.opts.NumCtx = req.origNumCTX * p
+			if !envconfig.SchedSpread {
+				for _, g := range sgl {
+					if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
+						slog.Info("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "parallel", p, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
+						*numParallel = p
+						return []gpu.GpuInfo{g}
+					}
 				}
 				}
 			}
 			}
 		}
 		}
@@ -636,9 +708,13 @@ func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.
 		// - try subsets of GPUs instead of just falling back to 1 or all in a family
 		// - try subsets of GPUs instead of just falling back to 1 or all in a family
 
 
 		// Now try all the GPUs
 		// Now try all the GPUs
-		if ok, estimatedVRAM = llm.PredictServerFit(sgl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
-			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", sgl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
-			return sgl
+		for _, p := range numParallelToTry {
+			req.opts.NumCtx = req.origNumCTX * p
+			if ok, estimatedVRAM = llm.PredictServerFit(sgl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
+				slog.Info("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", sgl[0].Library, "parallel", p, "required", format.HumanBytes2(estimatedVRAM))
+				*numParallel = p
+				return sgl
+			}
 		}
 		}
 	}
 	}
 	return nil
 	return nil

+ 51 - 29
server/sched_test.go

@@ -47,11 +47,11 @@ func TestLoad(t *testing.T) {
 		sessionDuration: 2,
 		sessionDuration: 2,
 	}
 	}
 	// Fail to load model first
 	// Fail to load model first
-	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 		return nil, fmt.Errorf("something failed to load model blah")
 		return nil, fmt.Errorf("something failed to load model blah")
 	}
 	}
 	gpus := gpu.GpuInfoList{}
 	gpus := gpu.GpuInfoList{}
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	require.Empty(t, req.successCh)
 	require.Empty(t, req.successCh)
 	require.Len(t, req.errCh, 1)
 	require.Len(t, req.errCh, 1)
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
@@ -61,10 +61,10 @@ func TestLoad(t *testing.T) {
 	require.Contains(t, err.Error(), "this model may be incompatible")
 	require.Contains(t, err.Error(), "this model may be incompatible")
 
 
 	server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}}
 	server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}}
-	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 		return server, nil
 		return server, nil
 	}
 	}
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	select {
 	select {
 	case err := <-req.errCh:
 	case err := <-req.errCh:
 		require.NoError(t, err)
 		require.NoError(t, err)
@@ -78,12 +78,12 @@ func TestLoad(t *testing.T) {
 
 
 	req.model.ModelPath = "dummy_model_path"
 	req.model.ModelPath = "dummy_model_path"
 	server.waitResp = fmt.Errorf("wait failure")
 	server.waitResp = fmt.Errorf("wait failure")
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	select {
 	select {
 	case err := <-req.errCh:
 	case err := <-req.errCh:
 		require.Contains(t, err.Error(), "wait failure")
 		require.Contains(t, err.Error(), "wait failure")
 	case resp := <-req.successCh:
 	case resp := <-req.successCh:
-		t.Errorf("unexpected success %v", resp)
+		t.Fatalf("unexpected success %v", resp)
 	}
 	}
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
 	runner := s.loaded["dummy_model_path"]
 	runner := s.loaded["dummy_model_path"]
@@ -102,7 +102,7 @@ type bundle struct {
 	ggml    *llm.GGML
 	ggml    *llm.GGML
 }
 }
 
 
-func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 	return scenario.srv, nil
 	return scenario.srv, nil
 }
 }
 
 
@@ -200,7 +200,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario1a.req.errCh)
 		require.Empty(t, scenario1a.req.errCh)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 
 
 	// Same runner as first request due to not needing a reload
 	// Same runner as first request due to not needing a reload
@@ -213,7 +213,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario1b.req.errCh)
 		require.Empty(t, scenario1b.req.errCh)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 
 
 	// Trigger a reload
 	// Trigger a reload
@@ -231,7 +231,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario2a.req.errCh)
 		require.Empty(t, scenario2a.req.errCh)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 
 
 	envconfig.MaxRunners = 1
 	envconfig.MaxRunners = 1
@@ -247,7 +247,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3a.req.errCh)
 		require.Empty(t, scenario3a.req.errCh)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
 	require.Len(t, s.loaded, 1)
@@ -263,7 +263,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3b.req.errCh)
 		require.Empty(t, scenario3b.req.errCh)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
 	require.Len(t, s.loaded, 2)
@@ -279,7 +279,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3c.req.errCh)
 		require.Empty(t, scenario3c.req.errCh)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 3)
 	require.Len(t, s.loaded, 3)
@@ -306,7 +306,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3d.req.errCh)
 		require.Empty(t, scenario3d.req.errCh)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
 	require.Len(t, s.loaded, 2)
@@ -349,7 +349,7 @@ func TestGetRunner(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, errCh1a)
 		require.Empty(t, errCh1a)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 	scenario1a.ctxDone()
 	scenario1a.ctxDone()
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
@@ -400,7 +400,7 @@ func TestPrematureExpired(t *testing.T) {
 		slog.Info("sending premature expired event now")
 		slog.Info("sending premature expired event now")
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 	time.Sleep(scenario1a.req.sessionDuration)
 	time.Sleep(scenario1a.req.sessionDuration)
 	scenario1a.ctxDone()
 	scenario1a.ctxDone()
@@ -427,7 +427,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	}
 	}
 	finished := make(chan *LlmRequest)
 	finished := make(chan *LlmRequest)
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
-	r1 := &runnerRef{llama: llm1, sessionDuration: 1}
+	r1 := &runnerRef{llama: llm1, sessionDuration: 1, numParallel: 1}
 	req.useLoadedRunner(r1, finished)
 	req.useLoadedRunner(r1, finished)
 	require.Equal(t, uint(1), r1.refCount)
 	require.Equal(t, uint(1), r1.refCount)
 	require.Equal(t, time.Duration(2), r1.sessionDuration)
 	require.Equal(t, time.Duration(2), r1.sessionDuration)
@@ -435,7 +435,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	case success := <-req.successCh:
 	case success := <-req.successCh:
 		require.Equal(t, r1, success)
 		require.Equal(t, r1, success)
 	case <-ctx.Done():
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	}
 	done()
 	done()
 	fin := <-finished
 	fin := <-finished
@@ -461,8 +461,8 @@ func TestUpdateFreeSpace(t *testing.T) {
 	gpus[1].FreeMemory = 1900
 	gpus[1].FreeMemory = 1900
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}}
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}}
 	llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}}
 	llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}}
-	r1 := &runnerRef{llama: llm1, gpus: gpus}
-	r2 := &runnerRef{llama: llm2, gpus: gpus}
+	r1 := &runnerRef{llama: llm1, gpus: gpus, numParallel: 1}
+	r2 := &runnerRef{llama: llm2, gpus: gpus, numParallel: 1}
 
 
 	s := InitScheduler(ctx)
 	s := InitScheduler(ctx)
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
@@ -513,8 +513,8 @@ func TestFindRunnerToUnload(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer done()
 	defer done()
 
 
-	r1 := &runnerRef{refCount: 1, sessionDuration: 1}
-	r2 := &runnerRef{sessionDuration: 2}
+	r1 := &runnerRef{refCount: 1, sessionDuration: 1, numParallel: 1}
+	r2 := &runnerRef{sessionDuration: 2, numParallel: 1}
 
 
 	s := InitScheduler(ctx)
 	s := InitScheduler(ctx)
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
@@ -536,9 +536,13 @@ func TestNeedsReload(t *testing.T) {
 	llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
 	llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
 	do := api.DefaultOptions()
 	do := api.DefaultOptions()
 	runner := &runnerRef{
 	runner := &runnerRef{
-		model:   &Model{AdapterPaths: []string{"adapter1"}, ProjectorPaths: []string{"projector1"}},
-		Options: &do,
-		llama:   llm,
+		model: &Model{
+			AdapterPaths:   []string{"adapter1"},
+			ProjectorPaths: []string{"projector1"},
+		},
+		Options:     &do,
+		llama:       llm,
+		numParallel: 1,
 	}
 	}
 	req := &LlmRequest{
 	req := &LlmRequest{
 		model: &Model{
 		model: &Model{
@@ -581,8 +585,8 @@ func TestUnloadAllRunners(t *testing.T) {
 	s := InitScheduler(ctx)
 	s := InitScheduler(ctx)
 	s.unloadAllRunners()
 	s.unloadAllRunners()
 
 
-	r1 := &runnerRef{llama: llm1}
-	r2 := &runnerRef{llama: llm2}
+	r1 := &runnerRef{llama: llm1, numParallel: 1}
+	r2 := &runnerRef{llama: llm2, numParallel: 1}
 
 
 	s.loadedMu.Lock()
 	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["a"] = r1
@@ -596,14 +600,32 @@ func TestUnloadAllRunners(t *testing.T) {
 
 
 func TestUnload(t *testing.T) {
 func TestUnload(t *testing.T) {
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
-	r1 := &runnerRef{llama: llm1}
-	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}}
+	r1 := &runnerRef{llama: llm1, numParallel: 1}
+	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}, numParallel: 1}
 	r1.unload()
 	r1.unload()
 	require.True(t, llm1.closeCalled)
 	require.True(t, llm1.closeCalled)
 	r2.unload()
 	r2.unload()
 	require.Nil(t, r2.model)
 	require.Nil(t, r2.model)
 }
 }
 
 
+func TestAlreadyCanceled(t *testing.T) {
+	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer done()
+	dctx, done2 := context.WithCancel(ctx)
+	done2()
+	scenario1a := newScenario(t, dctx, "ollama-model-1", 10)
+	scenario1a.req.sessionDuration = 0
+	s := InitScheduler(ctx)
+	slog.Info("scenario1a")
+	s.pendingReqCh <- scenario1a.req
+	require.Len(t, s.pendingReqCh, 1)
+	s.Run(ctx)
+	time.Sleep(5 * time.Millisecond)
+	require.Empty(t, s.pendingReqCh)
+	require.Empty(t, scenario1a.req.errCh)
+	require.Empty(t, scenario1a.req.successCh)
+}
+
 type mockLlm struct {
 type mockLlm struct {
 	pingResp           error
 	pingResp           error
 	waitResp           error
 	waitResp           error