sched.go

package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"reflect"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/envconfig"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
	"golang.org/x/exp/slices"
)
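
// LlmRequest represents a single pending request for a loaded model runner.
// The successCh and errCh channels report the outcome back to the caller.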
type LlmRequest struct {
	ctx             context.Context //nolint:containedctx
	model           *Model
	opts            api.Options
	sessionDuration time.Duration
	successCh       chan *runnerRef
	errCh           chan error
}
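
// Scheduler serializes model loading and unloading. It tracks the currently
// loaded runners and brokers GPU capacity between pending requests.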
type Scheduler struct {
	pendingReqCh  chan *LlmRequest
	finishedReqCh chan *LlmRequest
	expiredCh     chan *runnerRef
	unloadedCh    chan interface{}

	loaded   map[string]*runnerRef
	loadedMu sync.Mutex

	loadFn      func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
	newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
	getGpuFn    func() gpu.GpuInfoList
}
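
// ErrMaxQueue is returned when the pending request queue is already full.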
var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
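
// InitScheduler constructs a Scheduler with its queues sized from the
// environment configuration. Run must be called to start its worker loops.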
func InitScheduler(ctx context.Context) *Scheduler {
	sched := &Scheduler{
		pendingReqCh:  make(chan *LlmRequest, envconfig.MaxQueuedRequests),
		finishedReqCh: make(chan *LlmRequest, envconfig.MaxQueuedRequests),
		expiredCh:     make(chan *runnerRef, envconfig.MaxQueuedRequests),
		unloadedCh:    make(chan interface{}, envconfig.MaxQueuedRequests),
		loaded:        make(map[string]*runnerRef),
		newServerFn:   llm.NewLlamaServer,
		getGpuFn:      gpu.GetGPUInfo,
	}
	sched.loadFn = sched.load
	return sched
}

// context must be canceled to decrement ref count and release the runner
func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
	// allocate a large enough kv cache for all parallel requests
	if opts.NumCtx < 4 {
		opts.NumCtx = 4
	}
	opts.NumCtx = opts.NumCtx * envconfig.NumParallel

	req := &LlmRequest{
		ctx:             c,
		model:           model,
		opts:            opts,
		sessionDuration: sessionDuration,
		successCh:       make(chan *runnerRef),
		errCh:           make(chan error, 1),
	}

	select {
	case s.pendingReqCh <- req:
	default:
		req.errCh <- ErrMaxQueue
	}
	return req.successCh, req.errCh
}
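
// A minimal caller-side sketch (hypothetical variable names; the real call
// sites live elsewhere in the server package), assuming a cancelable request
// context so the ref count is released when the caller is done:
//
//	successCh, errCh := s.GetRunner(ctx, model, opts, sessionDuration)
//	select {
//	case runner := <-successCh:
//		_ = runner // use the runner; cancel ctx when finished
//	case err := <-errCh:
//		return err // ErrMaxQueue or a load failure
//	}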
// Run returns immediately and spawns goroutines for the scheduler, which will
// shut down when ctx is done.
func (s *Scheduler) Run(ctx context.Context) {
	slog.Debug("starting llm scheduler")
	go func() {
		s.processPending(ctx)
	}()

	go func() {
		s.processCompleted(ctx)
	}()
}
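
// processPending is the scheduler's main loop. It pulls pending requests off
// the queue and either reuses an already loaded runner, loads the model onto
// the best-fitting GPUs, or expires an existing runner to make room.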
func (s *Scheduler) processPending(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler pending loop")
			return
		case pending := <-s.pendingReqCh:
			// Block other requests until we get this pending request running
			if pending.ctx.Err() != nil {
				slog.Debug("pending request cancelled or timed out, skipping scheduling")
				continue
			}

			for {
				var runnerToExpire *runnerRef
				s.loadedMu.Lock()
				runner := s.loaded[pending.model.ModelPath]
				loadedCount := len(s.loaded)
				s.loadedMu.Unlock()
				if runner != nil {
					if runner.needsReload(ctx, pending) {
						runnerToExpire = runner
					} else {
						// Runner is usable, return it
						pending.useLoadedRunner(runner, s.finishedReqCh)
						break
					}
				} else if envconfig.MaxRunners > 0 && loadedCount >= envconfig.MaxRunners {
					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
					runnerToExpire = s.findRunnerToUnload()
				} else {
					// Either no models are loaded or below envconfig.MaxRunners
					// Get a refreshed GPU list
					gpus := s.getGpuFn()

					// Load model for fitting
					ggml, err := llm.LoadModel(pending.model.ModelPath)
					if err != nil {
						pending.errCh <- err
						break
					}

					// If we're in CPU-only mode, just limit by envconfig.MaxRunners above
					// TODO handle system memory exhaustion
					if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 {
						slog.Debug("cpu mode with existing models, loading")
						s.loadFn(pending, ggml, gpus)
						break
					}

					// No models loaded. Load the model but prefer the best fit.
					if loadedCount == 0 {
						slog.Debug("loading first model", "model", pending.model.ModelPath)
						g := pickBestFitGPUs(pending, ggml, gpus)
						if g != nil {
							gpus = g
						}
						s.loadFn(pending, ggml, gpus)
						break
					}

					// At least one model is already loaded, so see if the new one also fits
					// Update free memory from currently loaded models
					s.updateFreeSpace(gpus)
					gpus = pickBestFitGPUs(pending, ggml, gpus)
					if gpus != nil {
						slog.Debug("new model fits with existing models, loading")
						s.loadFn(pending, ggml, gpus)
						break
					}
					runnerToExpire = s.findRunnerToUnload()
				}

				if runnerToExpire == nil {
					// Shouldn't happen
					slog.Error("runner to expire was nil!")
					continue
				}

				// Trigger an expiration to unload once it's done
				runnerToExpire.refMu.Lock()
				slog.Debug("resetting model to expire immediately to make room", "modelPath", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount)
				if runnerToExpire.expireTimer != nil {
					runnerToExpire.expireTimer.Stop()
					runnerToExpire.expireTimer = nil
				}
				runnerToExpire.sessionDuration = 0
				if runnerToExpire.refCount <= 0 {
					s.expiredCh <- runnerToExpire
				}
				runnerToExpire.refMu.Unlock()

				// Wait for the unload to happen
				// Note: at this point we're queueing up all incoming requests, even if they were for
				// a different model that's loaded and not scheduled to be removed.
				slog.Debug("waiting for pending requests to complete and unload to occur", "modelPath", runnerToExpire.modelPath)
				select {
				case <-ctx.Done():
					slog.Debug("shutting down scheduler pending loop")
					return
				case <-s.unloadedCh:
					slog.Debug("unload completed", "modelPath", runnerToExpire.modelPath)
					continue
				}
			}
		case <-s.unloadedCh:
			// An unload event when there are no pending requests can be ignored
			slog.Debug("ignoring unload event with no pending requests")
		}
	}
}
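
// processCompleted handles finished requests, expired session timers, and the
// final unload of idle runners.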
func (s *Scheduler) processCompleted(ctx context.Context) {
	// Process completed requests, expired timers, and unloading models
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler completed loop")
			return
		case finished := <-s.finishedReqCh:
			s.loadedMu.Lock()
			runner := s.loaded[finished.model.ModelPath]
			s.loadedMu.Unlock()
			if runner == nil {
				slog.Error("finished request signal received after model unloaded", "modelPath", finished.model.ModelPath)
				continue
			}
			runner.refMu.Lock()
			runner.refCount--
			if runner.refCount <= 0 {
				if runner.sessionDuration <= 0 {
					slog.Debug("runner with zero duration has gone idle, expiring to unload", "modelPath", runner.modelPath)
					if runner.expireTimer != nil {
						runner.expireTimer.Stop()
						runner.expireTimer = nil
					}
					s.expiredCh <- runner
				} else if runner.expireTimer == nil {
					slog.Debug("runner with non-zero duration has gone idle, adding timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
						slog.Debug("timer expired, expiring to unload", "modelPath", runner.modelPath)
						runner.refMu.Lock()
						defer runner.refMu.Unlock()
						if runner.expireTimer != nil {
							runner.expireTimer.Stop()
							runner.expireTimer = nil
						}
						s.expiredCh <- runner
					})
					runner.expiresAt = time.Now().Add(runner.sessionDuration)
				} else {
					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
					runner.expireTimer.Reset(runner.sessionDuration)
					runner.expiresAt = time.Now().Add(runner.sessionDuration)
				}
			}
			slog.Debug("after processing request finished event", "modelPath", runner.modelPath, "refCount", runner.refCount)
			runner.refMu.Unlock()
		case runner := <-s.expiredCh:
			slog.Debug("runner expired event received", "modelPath", runner.modelPath)
			runner.refMu.Lock()
			if runner.refCount > 0 {
				// Shouldn't happen, but safeguard to ensure no leaked runners
				slog.Debug("expired event with positive ref count, retrying", "modelPath", runner.modelPath, "refCount", runner.refCount)
				go func(runner *runnerRef) {
					// We can't unload yet, but want to as soon as the current request completes
					// So queue up another expired event
					time.Sleep(10 * time.Millisecond)
					s.expiredCh <- runner
				}(runner)
				runner.refMu.Unlock()
				continue
			}

			s.loadedMu.Lock()
			slog.Debug("got lock to unload", "modelPath", runner.modelPath)
			finished := runner.waitForVRAMRecovery()
			runner.unload()
			delete(s.loaded, runner.modelPath)
			s.loadedMu.Unlock()
			slog.Debug("runner released", "modelPath", runner.modelPath)
			runner.refMu.Unlock()

			<-finished
			slog.Debug("sending an unloaded event", "modelPath", runner.modelPath)
			s.unloadedCh <- struct{}{}
		}
	}
}

// Complete the pending request and send the runner back to the requester
// Wires up a finished event after the request context is completed
// Updates session duration, and resets expiration timer
func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *LlmRequest) {
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	runner.refCount++
	if runner.expireTimer != nil {
		runner.expireTimer.Stop()
		runner.expireTimer = nil
	}
	runner.sessionDuration = pending.sessionDuration
	pending.successCh <- runner
	go func() {
		<-pending.ctx.Done()
		slog.Debug("context for request finished")
		finished <- pending
	}()
}
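
// load spawns a new llama server for the request on the given GPUs and
// registers the runner. Errors are reported on the request's error channel.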
func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
	if err != nil {
		// some older models are not compatible with newer versions of llama.cpp
		// show a generalized compatibility error until there is a better way to
		// check for model compatibility
		if errors.Is(err, llm.ErrUnsupportedFormat) || strings.Contains(err.Error(), "failed to load model") {
			err = fmt.Errorf("%v: this model may be incompatible with your version of Ollama. If you previously pulled this model, try updating it by running `ollama pull %s`", err, req.model.ShortName)
		}
		slog.Info("NewLlamaServer failed", "model", req.model.ModelPath, "error", err)
		req.errCh <- err
		return
	}

	runner := &runnerRef{
		model:           req.model,
		modelPath:       req.model.ModelPath,
		llama:           llama,
		Options:         &req.opts,
		sessionDuration: req.sessionDuration,
		gpus:            gpus,
		estimatedVRAM:   llama.EstimatedVRAM(),
		estimatedTotal:  llama.EstimatedTotal(),
		loading:         true,
		refCount:        1,
	}
	runner.refMu.Lock()

	s.loadedMu.Lock()
	s.loaded[req.model.ModelPath] = runner
	slog.Info("loaded runners", "count", len(s.loaded))
	s.loadedMu.Unlock()

	go func() {
		defer runner.refMu.Unlock()
		if err = llama.WaitUntilRunning(req.ctx); err != nil {
			slog.Error("error loading llama server", "error", err)
			runner.refCount--
			req.errCh <- err
			slog.Debug("triggering expiration for failed load", "model", runner.modelPath)
			s.expiredCh <- runner
			return
		}
		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
		runner.loading = false
		go func() {
			<-req.ctx.Done()
			slog.Debug("context for request finished")
			s.finishedReqCh <- req
		}()
		req.successCh <- runner
	}()
}
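
// updateFreeSpace adjusts the reported free memory on each GPU by subtracting
// the VRAM already claimed by loaded runners, so placement decisions don't
// over-commit a device whose driver-reported free memory is stale.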
func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
	type predKey struct {
		Library string
		ID      string
	}
	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
	s.loadedMu.Lock()
	for _, r := range s.loaded {
		r.refMu.Lock()
		gpuIDs := make([]string, 0, len(r.gpus))
		if r.llama != nil {
			// TODO this should be broken down by GPU instead of assuming uniform spread
			estimatedVRAMPerGPU := r.llama.EstimatedVRAM() / uint64(len(r.gpus))
			for _, gpu := range r.gpus {
				gpuIDs = append(gpuIDs, gpu.ID)
			}
			for _, gpu := range allGpus {
				if slices.Contains(gpuIDs, gpu.ID) {
					predMap[predKey{gpu.Library, gpu.ID}] += estimatedVRAMPerGPU
				}
			}
		} else {
			slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
		}
		r.refMu.Unlock()
	}
	s.loadedMu.Unlock()

	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
	for i := range allGpus {
		if p, ok := predMap[predKey{allGpus[i].Library, allGpus[i].ID}]; ok {
			slog.Debug("gpu reported", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "available", format.HumanBytes2(allGpus[i].FreeMemory))
			if p > allGpus[i].TotalMemory {
				// Shouldn't happen
				slog.Warn("predicted usage exceeds VRAM", "gpu", allGpus[i].ID, "totalMemory", allGpus[i].TotalMemory, "predicted", p)
				allGpus[i].FreeMemory = 0
			} else if (allGpus[i].TotalMemory - p) < allGpus[i].FreeMemory { // predicted free is smaller than reported free, use it
				// TODO maybe we should just always trust our numbers, since cuda's free memory reporting is laggy
				// and we might unload models we didn't actually need to. The risk is if some other GPU intensive app is loaded
				// after we start our first runner, then we'll never account for that, so picking the smallest free value seems prudent.
				allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
			}
			slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
		}
	}
}
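
// runnerRef tracks a loaded llama server along with its reference count,
// GPU placement, memory estimates, and idle-expiration state. refMu guards
// the mutable fields.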
type runnerRef struct {
	refMu sync.Mutex
	// refCond   sync.Cond // Signaled on transition from 1 -> 0 refCount
	refCount uint // prevent unloading if > 0
	// unloading bool      // set to true when we are trying to unload the runner

	llama   llm.LlamaServer
	loading bool            // True only during initial load, then false forever
	gpus    gpu.GpuInfoList // Recorded at time of provisioning

	estimatedVRAM  uint64
	estimatedTotal uint64

	sessionDuration time.Duration
	expireTimer     *time.Timer
	expiresAt       time.Time

	model     *Model
	modelPath string
	*api.Options
}

// The refMu must already be held when calling unload
func (runner *runnerRef) unload() {
	if runner.expireTimer != nil {
		runner.expireTimer.Stop()
		runner.expireTimer = nil
	}
	if runner.llama != nil {
		runner.llama.Close()
	}
	runner.model = nil
	runner.llama = nil
	runner.Options = nil
	runner.gpus = nil
}
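
// needsReload reports whether the loaded runner can serve the request as-is,
// or whether the model must be reloaded because its adapters, projectors, or
// runner options changed, or the server is no longer responding.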
func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool {
	slog.Debug("evaluating already loaded", "model", req.model.ModelPath)
	runner.refMu.Lock()
	defer runner.refMu.Unlock()

	timeout := 10 * time.Second
	if runner.loading {
		timeout = 2 * time.Minute // Initial load can take a long time for big models on slow systems...
	}

	if runner.Options == nil {
		return true
	}

	// Don't reload runner if num_gpu=-1 was provided
	optsExisting := runner.Options.Runner
	optsNew := req.opts.Runner
	if optsNew.NumGPU < 0 {
		optsExisting.NumGPU = -1
		optsNew.NumGPU = -1
	}

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if !reflect.DeepEqual(runner.model.AdapterPaths, req.model.AdapterPaths) || // have the adapters changed?
		!reflect.DeepEqual(runner.model.ProjectorPaths, req.model.ProjectorPaths) || // have the projectors changed?
		!reflect.DeepEqual(optsExisting, optsNew) || // have the runner options changed?
		runner.llama.Ping(ctx) != nil {
		return true
	}

	return false
}

// Free memory reporting on GPUs can lag for a while even after the runner
// exits, so we have to keep checking until we see the available memory recover,
// otherwise subsequent model loads will get far fewer layers loaded or, in the
// worst case, may fall back to CPU mode entirely.
// This routine must be called before the runner unloads so it can establish
// a before and after GPU memory allocation. The returned channel
// will be notified when we're done waiting, or have timed out and should
// proceed anyway
func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
	finished := make(chan interface{}, 1)

	// CPU or Metal don't need checking, so no waiting required. Windows can page
	// VRAM, and the APIs we query tend to be optimistic on free space.
	if (len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) || runtime.GOOS == "windows" {
		finished <- struct{}{}
		return finished
	}
	start := time.Now()

	// Establish a baseline before we unload
	gpusBefore := gpu.GetGPUInfo()
	var totalMemoryBefore, freeMemoryBefore uint64
	for _, gpu := range gpusBefore {
		totalMemoryBefore += gpu.TotalMemory
		freeMemoryBefore += gpu.FreeMemory
	}
	go func() {
		expiresAt := start.Add(5 * time.Second) // typical convergence is 0.5-1.5s
		ticker := time.NewTicker(250 * time.Millisecond)
		defer ticker.Stop()
		for {
			<-ticker.C
			if time.Now().After(expiresAt) {
				slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds())
				finished <- struct{}{}
				// stop polling once we've given up, otherwise this goroutine leaks
				return
			}

			// Query GPUs, look for free to go back up
			gpusNow := gpu.GetGPUInfo()
			var totalMemoryNow, freeMemoryNow uint64
			for _, gpu := range gpusNow {
				totalMemoryNow += gpu.TotalMemory
				freeMemoryNow += gpu.FreeMemory
			}
			// If we're within ~80% of the estimated memory usage recovered, bail out
			if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 {
				slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()))
				finished <- struct{}{}
				return
			}
		}
	}()
	return finished
}
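
// ByDuration sorts runners by session duration so the one expiring soonest is
// considered first when making room.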
type ByDuration []*runnerRef

func (a ByDuration) Len() int      { return len(a) }
func (a ByDuration) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDuration) Less(i, j int) bool {
	// uint64 to turn negative time (never unload) to largest
	return uint64(a[i].sessionDuration) < uint64(a[j].sessionDuration)
}

// TODO - future consideration to pick runners based on size
// type BySize []*runnerRef
// func (a BySize) Len() int           { return len(a) }
// func (a BySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
// func (a BySize) Less(i, j int) bool { return a[i].estimatedVRAM < a[j].estimatedVRAM }

// pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
// If the model cannot fully fit within the available GPU(s), nil is returned
func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
	var estimatedVRAM uint64
	for _, gl := range gpus.ByLibrary() {
		var ok bool
		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)

		// TODO - potentially sort by performance capability, existing models loaded, etc.
		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))

		// First attempt to fit the model into a single GPU
		for _, g := range sgl {
			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
				slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
				return []gpu.GpuInfo{g}
			}
		}

		// TODO future refinements
		// - if multiple Libraries, see if any single GPU in any Library will fit
		// - try subsets of GPUs instead of just falling back to 1 or all in a family

		// Now try all the GPUs
		if ok, estimatedVRAM = llm.PredictServerFit(sgl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", sgl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
			return sgl
		}
	}
	return nil
}

// findRunnerToUnload finds a runner to unload to make room for a new model
func (s *Scheduler) findRunnerToUnload() *runnerRef {
	s.loadedMu.Lock()
	runnerList := make([]*runnerRef, 0, len(s.loaded))
	for _, r := range s.loaded {
		runnerList = append(runnerList, r)
	}
	s.loadedMu.Unlock()

	// In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload
	// e.g., if we have multiple options, will one make room for the request?
	sort.Sort(ByDuration(runnerList))

	// First try to find a runner that's already idle
	for _, runner := range runnerList {
		runner.refMu.Lock()
		rc := runner.refCount
		runner.refMu.Unlock()
		if rc == 0 {
			slog.Debug("found an idle runner to unload")
			return runner
		}
	}
	// None appear idle, just wait for the one with the shortest duration
	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
	return runnerList[0]
}
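
// unloadAllRunners closes every loaded llama server.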
func (s *Scheduler) unloadAllRunners() {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	for model, runner := range s.loaded {
		if runner.llama != nil {
			slog.Debug("shutting down runner", "model", model)
			runner.llama.Close()
		}
	}
}