sched.go

package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"reflect"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
	"github.com/ollama/ollama/server/envconfig"
	"golang.org/x/exp/slices"
)
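
// LlmRequest describes a single request for a loaded model runner; the
// requester receives the runner on successCh or an error on errCh.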
type LlmRequest struct {
	ctx             context.Context //nolint:containedctx
	model           *Model
	opts            api.Options
	sessionDuration time.Duration
	successCh       chan *runnerRef
	errCh           chan error
}
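
// Scheduler serializes model loading and unloading. New requests arrive on
// pendingReqCh; finished requests, expirations, and unloads flow over the
// other channels, and loaded tracks the currently running servers. The
// loadFn, newServerFn, and getGpuFn fields indirect the default
// implementations so they can be overridden (e.g., in tests).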
type Scheduler struct {
	pendingReqCh  chan *LlmRequest
	finishedReqCh chan *LlmRequest
	expiredCh     chan *runnerRef
	unloadedCh    chan interface{}

	loaded   map[string]*runnerRef
	loadedMu sync.Mutex

	loadFn      func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
	newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
	getGpuFn    func() gpu.GpuInfoList
}
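
// ErrMaxQueue is reported to the requester when pendingReqCh is full and the
// request cannot be queued.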
var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
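
// InitScheduler constructs a Scheduler with channels sized by
// envconfig.MaxQueuedRequests and the default load, server, and GPU lookup
// functions. Typical startup, as a sketch (actual server wiring may differ):
//
//	sched := InitScheduler(ctx)
//	sched.Run(ctx)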
func InitScheduler(ctx context.Context) *Scheduler {
	sched := &Scheduler{
		pendingReqCh:  make(chan *LlmRequest, envconfig.MaxQueuedRequests),
		finishedReqCh: make(chan *LlmRequest, envconfig.MaxQueuedRequests),
		expiredCh:     make(chan *runnerRef, envconfig.MaxQueuedRequests),
		unloadedCh:    make(chan interface{}, envconfig.MaxQueuedRequests),
		loaded:        make(map[string]*runnerRef),
		newServerFn:   llm.NewLlamaServer,
		getGpuFn:      gpu.GetGPUInfo,
	}
	sched.loadFn = sched.load
	return sched
}
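
// GetRunner queues a request for the given model and returns the channels the
// caller should select on; one of them eventually yields a loaded runner or an
// error (ErrMaxQueue if the pending queue is full). A typical caller, as a
// sketch (real handlers may layer their own timeout handling on top):
//
//	successCh, errCh := s.GetRunner(ctx, model, opts, sessionDuration)
//	select {
//	case runner := <-successCh:
//		// use the runner, then cancel ctx to release it
//	case err := <-errCh:
//		// handle the error
//	}
//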
// context must be canceled to decrement ref count and release the runner
func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
	// allocate a large enough kv cache for all parallel requests
	opts.NumCtx = opts.NumCtx * envconfig.NumParallel
	req := &LlmRequest{
		ctx:             c,
		model:           model,
		opts:            opts,
		sessionDuration: sessionDuration,
		successCh:       make(chan *runnerRef),
		errCh:           make(chan error, 1),
	}
	select {
	case s.pendingReqCh <- req:
	default:
		req.errCh <- ErrMaxQueue
	}
	return req.successCh, req.errCh
}

// Run returns immediately and spawns goroutines for the scheduler, which shut
// down when ctx is done.
func (s *Scheduler) Run(ctx context.Context) {
	slog.Debug("starting llm scheduler")
	go func() {
		s.processPending(ctx)
	}()
	go func() {
		s.processCompleted(ctx)
	}()
}
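
// processPending services the pending request queue one request at a time:
// reuse an already loaded runner when possible, otherwise load the model onto
// the best-fitting GPUs, evicting an existing runner when there is no room.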
func (s *Scheduler) processPending(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler pending loop")
			return
		case pending := <-s.pendingReqCh:
			// Block other requests until we get this pending request running
			for {
				var runnerToExpire *runnerRef
				s.loadedMu.Lock()
				runner := s.loaded[pending.model.ModelPath]
				loadedCount := len(s.loaded)
				s.loadedMu.Unlock()
				if runner != nil {
					if runner.needsReload(ctx, pending) {
						runnerToExpire = runner
					} else {
						// Runner is usable, return it
						pending.useLoadedRunner(runner, s.finishedReqCh)
						break
					}
				} else if envconfig.MaxRunners > 0 && loadedCount >= envconfig.MaxRunners {
					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
					runnerToExpire = s.findRunnerToUnload(pending)
				} else {
					// Either no models are loaded or below envconfig.MaxRunners
					// Get a refreshed GPU list
					gpus := s.getGpuFn()

					// Load model for fitting
					ggml, err := llm.LoadModel(pending.model.ModelPath)
					if err != nil {
						pending.errCh <- err
						break
					}

					// If we're in CPU-only mode, just limit by envconfig.MaxRunners above
					// TODO handle system memory exhaustion
					if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 {
						slog.Debug("cpu mode with existing models, loading")
						s.loadFn(pending, ggml, gpus)
						break
					}

					// No models loaded. Load the model but prefer the best fit.
					if loadedCount == 0 {
						slog.Debug("loading first model", "model", pending.model.ModelPath)
						g := pickBestFitGPUs(pending, ggml, gpus)
						if g != nil {
							gpus = g
						}
						s.loadFn(pending, ggml, gpus)
						break
					}

					// One or more models are already loaded, so we have to see if the new one fits
					// Update free memory from currently loaded models
					s.updateFreeSpace(gpus)
					gpus = pickBestFitGPUs(pending, ggml, gpus)
					if gpus != nil {
						slog.Debug("new model fits with existing models, loading")
						s.loadFn(pending, ggml, gpus)
						break
					}
					runnerToExpire = s.findRunnerToUnload(pending)
				}

				if runnerToExpire == nil {
					// Shouldn't happen
					slog.Error("runner to expire was nil!")
					continue
				}
				// Trigger an expiration to unload once it's done
				runnerToExpire.refMu.Lock()
				slog.Debug("resetting model to expire immediately to make room", "model", runnerToExpire.model, "refCount", runnerToExpire.refCount)
				if runnerToExpire.expireTimer != nil {
					runnerToExpire.expireTimer.Stop()
					runnerToExpire.expireTimer = nil
				}
				runnerToExpire.sessionDuration = 0
				if runnerToExpire.refCount <= 0 {
					s.expiredCh <- runnerToExpire
				}
				runnerToExpire.refMu.Unlock()
				// Wait for the unload to happen
				// Note: at this point we're queueing up all incoming requests, even if they were for
				// a different model that's loaded and not scheduled to be removed.
				slog.Debug("waiting for pending requests to complete and unload to occur", "model", runnerToExpire.model)
				select {
				case <-ctx.Done():
					slog.Debug("shutting down scheduler pending loop")
					return
				case <-s.unloadedCh:
					slog.Debug("unload completed", "model", runnerToExpire.model)
					continue
				}
			}
		case <-s.unloadedCh:
			// An unload request when there are no pending requests can be ignored
			slog.Debug("ignoring unload event with no pending requests")
		}
	}
}
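
// processCompleted runs until ctx is done, decrementing a runner's reference
// count as its requests finish, arming the expiration timer when a runner
// goes idle, and unloading runners delivered on expiredCh.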
func (s *Scheduler) processCompleted(ctx context.Context) {
	// Process completed requests, expired timers, and unloading models
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler completed loop")
			return
		case finished := <-s.finishedReqCh:
			s.loadedMu.Lock()
			runner := s.loaded[finished.model.ModelPath]
			s.loadedMu.Unlock()
			if runner == nil {
				slog.Error("finished request signal received after model unloaded", "model", finished.model.ModelPath)
				continue
			}
			runner.refMu.Lock()
			runner.refCount--
			if runner.refCount <= 0 {
				if runner.sessionDuration <= 0 {
					slog.Debug("runner with zero duration has gone idle, expiring to unload", "model", runner.model)
					if runner.expireTimer != nil {
						runner.expireTimer.Stop()
						runner.expireTimer = nil
					}
					s.expiredCh <- runner
				} else if runner.expireTimer == nil {
					slog.Debug("runner with non-zero duration has gone idle, adding timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
						slog.Debug("timer expired, expiring to unload", "model", runner.model)
						runner.refMu.Lock()
						defer runner.refMu.Unlock()
						if runner.expireTimer != nil {
							runner.expireTimer.Stop()
							runner.expireTimer = nil
						}
						s.expiredCh <- runner
					})
				} else {
					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer.Reset(runner.sessionDuration)
				}
			}
			slog.Debug("after processing request finished event", "model", runner.model, "refCount", runner.refCount)
			runner.refMu.Unlock()
		case runner := <-s.expiredCh:
			slog.Debug("runner expired event received", "model", runner.model)
			runner.refMu.Lock()
			if runner.refCount > 0 {
				// Shouldn't happen, but safeguard to ensure no leaked runners
				slog.Debug("expired event with positive ref count, retrying", "model", runner.model, "refCount", runner.refCount)
				go func(runner *runnerRef) {
					// We can't unload yet, but want to as soon as the current request completes
					// So queue up another expired event
					time.Sleep(10 * time.Millisecond)
					s.expiredCh <- runner
				}(runner)
				runner.refMu.Unlock()
				continue
			}
			slog.Debug("got lock to unload", "model", runner.model)
			runner.unload()
			s.loadedMu.Lock()
			delete(s.loaded, runner.model)
			s.loadedMu.Unlock()
			slog.Debug("runner released", "model", runner.model)
			runner.refMu.Unlock()
			slog.Debug("sending an unloaded event", "model", runner.model)
			s.unloadedCh <- struct{}{}
		}
	}
}

// Complete the pending request and send the runner back to the requester
// Wires up a finished event after the request context is completed
// Updates session duration, and resets expiration timer
func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *LlmRequest) {
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	runner.refCount++
	if runner.expireTimer != nil {
		runner.expireTimer.Stop()
		runner.expireTimer = nil
	}
	runner.sessionDuration = pending.sessionDuration
	pending.successCh <- runner
	go func() {
		<-pending.ctx.Done()
		slog.Debug("context for request finished")
		finished <- pending
	}()
}
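
// load starts a llama server for the request on the given GPUs, registers the
// runner in s.loaded, and reports the outcome on the request's channels once
// the server comes up (or fails to).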
func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
	if err != nil {
		// some older models are not compatible with newer versions of llama.cpp
		// show a generalized compatibility error until there is a better way to
		// check for model compatibility
		if errors.Is(err, llm.ErrUnsupportedFormat) || strings.Contains(err.Error(), "failed to load model") {
			err = fmt.Errorf("%v: this model may be incompatible with your version of Ollama. If you previously pulled this model, try updating it by running `ollama pull %s`", err, req.model.ShortName)
		}
		slog.Info("NewLlamaServer failed", "model", req.model.ModelPath, "error", err)
		req.errCh <- err
		return
	}

	runner := &runnerRef{}
	runner.model = req.model.ModelPath
	runner.adapters = req.model.AdapterPaths
	runner.projectors = req.model.ProjectorPaths
	runner.llama = llama
	runner.Options = &req.opts
	runner.sessionDuration = req.sessionDuration
	runner.gpus = gpus
	runner.estimatedVRAM = llama.EstimatedVRAM()
	runner.loading = true
	runner.refCount = 1
	runner.refMu.Lock()

	s.loadedMu.Lock()
	s.loaded[req.model.ModelPath] = runner
	slog.Info("loaded runners", "count", len(s.loaded))
	s.loadedMu.Unlock()

	go func() {
		defer runner.refMu.Unlock()
		if err = llama.WaitUntilRunning(req.ctx); err != nil {
			slog.Error("error loading llama server", "error", err)
			runner.refCount--
			req.errCh <- err
			slog.Debug("triggering expiration for failed load", "model", runner.model)
			s.expiredCh <- runner
			return
		}
		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
		runner.loading = false
		go func() {
			<-req.ctx.Done()
			slog.Debug("context for request finished")
			s.finishedReqCh <- req
		}()
		req.successCh <- runner
	}()
}
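
// updateFreeSpace subtracts each loaded runner's estimated VRAM from the
// reported free memory of the GPUs it occupies, so placement decisions work
// from a conservative view of what is actually available.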
func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
	type predKey struct {
		Library string
		ID      string
	}
	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
	s.loadedMu.Lock()
	for _, r := range s.loaded {
		r.refMu.Lock()
		gpuIDs := make([]string, 0, len(r.gpus))
		if r.llama != nil {
			// TODO this should be broken down by GPU instead of assuming uniform spread
			estimatedVRAMPerGPU := r.llama.EstimatedVRAM() / uint64(len(r.gpus))
			for _, gpu := range r.gpus {
				gpuIDs = append(gpuIDs, gpu.ID)
			}
			for _, gpu := range allGpus {
				if slices.Contains(gpuIDs, gpu.ID) {
					predMap[predKey{gpu.Library, gpu.ID}] += estimatedVRAMPerGPU
				}
			}
		} else {
			slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
		}
		r.refMu.Unlock()
	}
	s.loadedMu.Unlock()

	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
	for i := range allGpus {
		if p, ok := predMap[predKey{allGpus[i].Library, allGpus[i].ID}]; ok {
			slog.Debug("gpu reported", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "available", format.HumanBytes2(allGpus[i].FreeMemory))
			if p > allGpus[i].TotalMemory {
				// Shouldn't happen
				slog.Warn("predicted usage exceeds VRAM", "gpu", allGpus[i].ID, "totalMemory", allGpus[i].TotalMemory, "predicted", p)
				allGpus[i].FreeMemory = 0
			} else if (allGpus[i].TotalMemory - p) < allGpus[i].FreeMemory { // predicted free is smaller than reported free, use it
				// TODO maybe we should just always trust our numbers, since cuda's free memory reporting is laggy
				// and we might unload models we didn't actually need to. The risk is if some other GPU intensive app is loaded
				// after we start our first runner, then we'll never account for that, so picking the smallest free value seems prudent.
				allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
			}
			slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
		}
	}
}
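
// runnerRef tracks a loaded llama server, the GPUs it occupies, and the
// bookkeeping (reference count, expiration timer) the scheduler uses to
// decide when it can be unloaded.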
type runnerRef struct {
	refMu sync.Mutex
	// refCond sync.Cond // Signaled on transition from 1 -> 0 refCount
	refCount uint // prevent unloading if > 0
	// unloading bool // set to true when we are trying to unload the runner

	llama         llm.LlamaServer
	loading       bool            // True only during initial load, then false forever
	gpus          gpu.GpuInfoList // Recorded at time of provisioning
	estimatedVRAM uint64

	sessionDuration time.Duration
	expireTimer     *time.Timer

	model      string
	adapters   []string
	projectors []string
	*api.Options
}

// The refMu must already be held when calling unload
func (runner *runnerRef) unload() {
	if runner.expireTimer != nil {
		runner.expireTimer.Stop()
		runner.expireTimer = nil
	}
	if runner.llama != nil {
		runner.llama.Close()
	}
	runner.llama = nil
	runner.adapters = nil
	runner.projectors = nil
	runner.Options = nil
	runner.gpus = nil
}
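
// needsReload reports whether the loaded runner cannot serve req as-is: a
// change in adapters, projectors, or runner options, or a failed Ping, forces
// a reload.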
func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool {
	slog.Debug("evaluating already loaded", "model", req.model.ModelPath)
	runner.refMu.Lock()
	defer runner.refMu.Unlock()

	timeout := 10 * time.Second
	if runner.loading {
		timeout = 2 * time.Minute // Initial load can take a long time for big models on slow systems...
	}

	// Don't reload runner if num_gpu=-1 was provided
	optsExisting := runner.Options.Runner
	optsNew := req.opts.Runner
	if optsNew.NumGPU < 0 {
		optsExisting.NumGPU = -1
		optsNew.NumGPU = -1
	}

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if !reflect.DeepEqual(runner.adapters, req.model.AdapterPaths) || // have the adapters changed?
		!reflect.DeepEqual(runner.projectors, req.model.ProjectorPaths) || // have the projectors changed?
		!reflect.DeepEqual(optsExisting, optsNew) || // have the runner options changed?
		runner.llama.Ping(ctx) != nil {
		return true
	}
	return false
}
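
// ByDuration sorts runners so the one with the shortest session duration
// comes first; negative (never unload) durations sort last via the uint64
// conversion in Less.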
type ByDuration []*runnerRef

func (a ByDuration) Len() int      { return len(a) }
func (a ByDuration) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDuration) Less(i, j int) bool {
	// uint64 to turn negative time (never unload) to largest
	return uint64(a[i].sessionDuration) < uint64(a[j].sessionDuration)
}

// TODO - future consideration to pick runners based on size
// type BySize []*runnerRef
// func (a BySize) Len() int           { return len(a) }
// func (a BySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
// func (a BySize) Less(i, j int) bool { return a[i].estimatedVRAM < a[j].estimatedVRAM }

// pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
// If the model cannot fully fit within the available GPU(s), nil is returned
func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
	var estimatedVRAM uint64
	for _, gl := range gpus.ByLibrary() {
		var ok bool
		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)

		// TODO - potentially sort by performance capability, existing models loaded, etc.
		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))

		// First attempt to fit the model into a single GPU
		for _, g := range sgl {
			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
				slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
				return []gpu.GpuInfo{g}
			}
		}

		// TODO future refinements
		// - if multiple Libraries, see if any single GPU in any Library will fit
		// - try subsets of GPUs instead of just falling back to 1 or all in a family

		// Now try all the GPUs
		if ok, estimatedVRAM = llm.PredictServerFit(gl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
			return gl
		}
	}
	return nil
}

// findRunnerToUnload finds a runner to unload to make room for a new model
func (s *Scheduler) findRunnerToUnload(req *LlmRequest) *runnerRef {
	s.loadedMu.Lock()
	runnerList := make([]*runnerRef, 0, len(s.loaded))
	for _, r := range s.loaded {
		runnerList = append(runnerList, r)
	}
	s.loadedMu.Unlock()

	// In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload
	// e.g., if we have multiple options, will one make room for the request?
	sort.Sort(ByDuration(runnerList))

	// First try to find a runner that's already idle
	for _, runner := range runnerList {
		runner.refMu.Lock()
		rc := runner.refCount
		runner.refMu.Unlock()
		if rc == 0 {
			slog.Debug("found an idle runner to unload")
			return runner
		}
	}
	// None appear idle, just wait for the one with the shortest duration
	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
	return runnerList[0]
}
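
// unloadAllRunners closes the llama server for every loaded runner.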
func (s *Scheduler) unloadAllRunners() {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	for model, runner := range s.loaded {
		if runner.llama != nil {
			slog.Debug("shutting down runner", "model", model)
			runner.llama.Close()
		}
	}
}