sched.go

package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
	"golang.org/x/exp/slices"
)
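
// LlmRequest is a single request for a running model. The scheduler answers
// on successCh with a usable runner, or on errCh if the request is rejected
// or the load fails.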
type LlmRequest struct {
	ctx             context.Context //nolint:containedctx
	model           *Model
	opts            api.Options
	sessionDuration time.Duration
	successCh       chan *runnerRef
	errCh           chan error
}
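
// Scheduler owns the set of loaded runners. Requests arrive on pendingReqCh
// and completions on finishedReqCh; expiredCh and unloadedCh coordinate
// evicting runners when they expire or must make room for a new model.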
type Scheduler struct {
	pendingReqCh  chan *LlmRequest
	finishedReqCh chan *LlmRequest
	expiredCh     chan *runnerRef
	unloadedCh    chan interface{}

	loaded   map[string]*runnerRef
	loadedMu sync.Mutex

	loadFn      func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
	newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
	getGpuFn    func() gpu.GpuInfoList
}

// TODO set this to zero after a release or two, to enable multiple models by default
var loadedMax = 1          // Maximum runners; < 1 maps to as many as will fit in VRAM (unlimited for CPU runners)
var maxQueuedRequests = 10 // TODO configurable

func InitScheduler(ctx context.Context) *Scheduler {
	maxRunners := os.Getenv("OLLAMA_MAX_LOADED_MODELS")
	if maxRunners != "" {
		m, err := strconv.Atoi(maxRunners)
		if err != nil {
			slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
		} else {
			loadedMax = m
		}
	}

	sched := &Scheduler{
		pendingReqCh:  make(chan *LlmRequest, maxQueuedRequests),
		finishedReqCh: make(chan *LlmRequest, maxQueuedRequests),
		expiredCh:     make(chan *runnerRef, maxQueuedRequests),
		unloadedCh:    make(chan interface{}, maxQueuedRequests),
		loaded:        make(map[string]*runnerRef),
		newServerFn:   llm.NewLlamaServer,
		getGpuFn:      gpu.GetGPUInfo,
	}
	sched.loadFn = sched.load
	return sched
}

// context must be canceled to decrement ref count and release the runner
func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
	req := &LlmRequest{
		ctx:             c,
		model:           model,
		opts:            opts,
		sessionDuration: sessionDuration,
		successCh:       make(chan *runnerRef),
		errCh:           make(chan error, 1),
	}
	select {
	case s.pendingReqCh <- req:
	default:
		req.errCh <- fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
	}
	return req.successCh, req.errCh
}
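
// A minimal sketch of a typical caller (illustrative only; the request
// context and the 5-minute keep-alive shown here are assumptions):
//
//	sched := InitScheduler(ctx)
//	sched.Run(ctx)
//	successCh, errCh := sched.GetRunner(reqCtx, model, opts, 5*time.Minute)
//	select {
//	case runner := <-successCh:
//		// use runner.llama to serve the request, then cancel reqCtx
//		// so the scheduler can decrement the ref count
//	case err := <-errCh:
//		// the queue was full or the model failed to load
//	}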

// Returns immediately, spawns goroutines for the scheduler which will shut down when ctx is done
func (s *Scheduler) Run(ctx context.Context) {
	slog.Debug("starting llm scheduler")
	go func() {
		s.processPending(ctx)
	}()

	go func() {
		s.processCompleted(ctx)
	}()
}
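
// processPending services the pending-request queue. It reuses an already
// loaded runner when possible, otherwise loads the model onto the
// best-fitting GPUs, evicting an existing runner first when at capacity.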
func (s *Scheduler) processPending(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler pending loop")
			return
		case pending := <-s.pendingReqCh:
			// Block other requests until we get this pending request running
			for {
				var runnerToExpire *runnerRef
				s.loadedMu.Lock()
				runner := s.loaded[pending.model.ModelPath]
				loadedCount := len(s.loaded)
				s.loadedMu.Unlock()
				if runner != nil {
					if runner.needsReload(ctx, pending) {
						runnerToExpire = runner
					} else {
						// Runner is usable, return it
						pending.useLoadedRunner(runner, s.finishedReqCh)
						break
					}
				} else if loadedMax > 0 && loadedCount >= loadedMax {
					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
					runnerToExpire = s.findRunnerToUnload(pending)
				} else {
					// Either no models are loaded or below loadedMax
					// Get a refreshed GPU list
					gpus := s.getGpuFn()

					// Load model for fitting
					ggml, err := llm.LoadModel(pending.model.ModelPath)
					if err != nil {
						pending.errCh <- err
						break
					}

					// No models loaded. Load the model but prefer the best fit.
					if loadedCount == 0 {
						slog.Debug("loading first model", "model", pending.model.ModelPath)
						g := pickBestFitGPUs(pending, ggml, gpus)
						if g != nil {
							gpus = g
						}
						s.loadFn(pending, ggml, gpus)
						break
					}

					// At least one model is already loaded, so we have to see if the new one fits
					// Update free memory from currently loaded models
					s.updateFreeSpace(gpus)
					gpus = pickBestFitGPUs(pending, ggml, gpus)
					if gpus != nil {
						slog.Debug("new model fits with existing models, loading")
						s.loadFn(pending, ggml, gpus)
						break
					}
					runnerToExpire = s.findRunnerToUnload(pending)
				}

				if runnerToExpire == nil {
					// Shouldn't happen
					slog.Error("runner to expire was nil!")
					continue
				}
				// Trigger an expiration to unload once it's done
				runnerToExpire.refMu.Lock()
				slog.Debug("resetting model to expire immediately to make room", "model", runnerToExpire.model, "refCount", runnerToExpire.refCount)
				if runnerToExpire.expireTimer != nil {
					runnerToExpire.expireTimer.Stop()
					runnerToExpire.expireTimer = nil
				}
				runnerToExpire.sessionDuration = 0
				if runnerToExpire.refCount <= 0 {
					s.expiredCh <- runnerToExpire
				}
				runnerToExpire.refMu.Unlock()
				// Wait for the unload to happen
				// Note: at this point we're queueing up all incoming requests, even if they were for
				// a different model that's loaded and not scheduled to be removed.
				slog.Debug("waiting for pending requests to complete and unload to occur", "model", runnerToExpire.model)
				select {
				case <-ctx.Done():
					slog.Debug("shutting down scheduler pending loop")
					return
				case <-s.unloadedCh:
					slog.Debug("unload completed", "model", runnerToExpire.model)
					continue
				}
			}
		case <-s.unloadedCh:
			// An unload event when there are no pending requests can be ignored
			slog.Debug("ignoring unload event with no pending requests")
		}
	}
}
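
// processCompleted decrements a runner's ref count when a request's context
// is done, arms or resets its expiration timer when it goes idle, and tears
// the runner down when an expiration event arrives.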
func (s *Scheduler) processCompleted(ctx context.Context) {
	// Process completed requests, expired timers, and unloading models
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler completed loop")
			return
		case finished := <-s.finishedReqCh:
			s.loadedMu.Lock()
			runner := s.loaded[finished.model.ModelPath]
			s.loadedMu.Unlock()
			if runner == nil {
				slog.Error("finished request signal received after model unloaded", "model", finished.model.ModelPath)
				continue
			}
			runner.refMu.Lock()
			runner.refCount--
			if runner.refCount <= 0 {
				if runner.sessionDuration <= 0 {
					slog.Debug("runner with zero duration has gone idle, expiring to unload", "model", runner.model)
					if runner.expireTimer != nil {
						runner.expireTimer.Stop()
						runner.expireTimer = nil
					}
					s.expiredCh <- runner
				} else if runner.expireTimer == nil {
					slog.Debug("runner with non-zero duration has gone idle, adding timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
						slog.Debug("timer expired, expiring to unload", "model", runner.model)
						runner.refMu.Lock()
						defer runner.refMu.Unlock()
						if runner.expireTimer != nil {
							runner.expireTimer.Stop()
						}
						s.expiredCh <- runner
					})
				} else {
					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer.Reset(runner.sessionDuration)
				}
			}
			slog.Debug("after processing request finished event", "model", runner.model, "refCount", runner.refCount)
			runner.refMu.Unlock()
		case runner := <-s.expiredCh:
			slog.Debug("runner expired event received", "model", runner.model)
			runner.refMu.Lock()
			if runner.refCount > 0 {
				// Shouldn't happen, but safeguard to ensure no leaked runners
				slog.Debug("expired event with positive ref count, retrying", "model", runner.model, "refCount", runner.refCount)
				go func(runner *runnerRef) {
					// We can't unload yet, but want to as soon as the current request completes
					// So queue up another expired event
					time.Sleep(10 * time.Millisecond)
					s.expiredCh <- runner
				}(runner)
				runner.refMu.Unlock()
				continue
			}

			slog.Debug("got lock to unload", "model", runner.model)
			runner.unload()
			s.loadedMu.Lock()
			delete(s.loaded, runner.model)
			s.loadedMu.Unlock()
			slog.Debug("runner released", "model", runner.model)
			runner.refMu.Unlock()
			slog.Debug("sending an unloaded event", "model", runner.model)
			s.unloadedCh <- struct{}{}
		}
	}
}

// Complete the pending request and send the runner back to the requester
// Wires up a finished event after the request context is completed
// Updates session duration, and resets expiration timer
func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *LlmRequest) {
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	runner.refCount++
	runner.sessionDuration = pending.sessionDuration
	pending.successCh <- runner
	go func() {
		<-pending.ctx.Done()
		slog.Debug("context for request finished")
		finished <- pending
	}()
}
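
// load starts a llama server for the request on the given GPUs, registers the
// runner in s.loaded, and answers the request's success or error channel once
// the server is running (or has failed to come up).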
func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
	if err != nil {
		// some older models are not compatible with newer versions of llama.cpp
		// show a generalized compatibility error until there is a better way to
		// check for model compatibility
		if errors.Is(err, llm.ErrUnsupportedFormat) || strings.Contains(err.Error(), "failed to load model") {
			err = fmt.Errorf("%v: this model may be incompatible with your version of Ollama. If you previously pulled this model, try updating it by running `ollama pull %s`", err, req.model.ShortName)
		}
		slog.Info("NewLlamaServer failed", "model", req.model.ModelPath, "error", err)
		req.errCh <- err
		return
	}

	runner := &runnerRef{}
	runner.model = req.model.ModelPath
	runner.adapters = req.model.AdapterPaths
	runner.projectors = req.model.ProjectorPaths
	runner.llama = llama
	runner.Options = &req.opts
	runner.sessionDuration = req.sessionDuration
	runner.gpus = gpus
	runner.estimatedVRAM = llama.EstimatedVRAM()
	runner.loading = true
	runner.refCount = 1
	runner.refMu.Lock()

	s.loadedMu.Lock()
	s.loaded[req.model.ModelPath] = runner
	slog.Info("loaded runners", "count", len(s.loaded))
	s.loadedMu.Unlock()

	go func() {
		defer runner.refMu.Unlock()
		if err = llama.WaitUntilRunning(req.ctx); err != nil {
			slog.Error("error loading llama server", "error", err)
			runner.refCount--
			req.errCh <- err
			slog.Debug("triggering expiration for failed load", "model", runner.model)
			s.expiredCh <- runner
			return
		}
		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
		runner.loading = false
		go func() {
			<-req.ctx.Done()
			slog.Debug("context for request finished")
			s.finishedReqCh <- req
		}()
		req.successCh <- runner
	}()
}
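
// updateFreeSpace subtracts each loaded runner's estimated VRAM (spread
// evenly across the GPUs it occupies) from the reported free memory in
// allGpus, so placement decisions account for allocations the driver may not
// have reported yet.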
func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
	type predKey struct {
		Library string
		ID      string
	}
	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
	s.loadedMu.Lock()
	for _, r := range s.loaded {
		r.refMu.Lock()
		gpuIDs := make([]string, 0, len(r.gpus))
		if r.llama != nil {
			// TODO this should be broken down by GPU instead of assuming uniform spread
			estimatedVRAMPerGPU := r.llama.EstimatedVRAM() / uint64(len(r.gpus))
			for _, gpu := range r.gpus {
				gpuIDs = append(gpuIDs, gpu.ID)
			}
			for _, gpu := range allGpus {
				if slices.Contains(gpuIDs, gpu.ID) {
					predMap[predKey{gpu.Library, gpu.ID}] += estimatedVRAMPerGPU
				}
			}
		} else {
			slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
		}
		r.refMu.Unlock()
	}
	s.loadedMu.Unlock()

	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
	for i := range allGpus {
		if p, ok := predMap[predKey{allGpus[i].Library, allGpus[i].ID}]; ok {
			slog.Debug("gpu reported", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "available", format.HumanBytes2(allGpus[i].FreeMemory))
			if p > allGpus[i].TotalMemory {
				// Shouldn't happen
				slog.Warn("predicted usage exceeds VRAM", "gpu", allGpus[i].ID, "totalMemory", allGpus[i].TotalMemory, "predicted", p)
				allGpus[i].FreeMemory = 0
			} else if (allGpus[i].TotalMemory - p) < allGpus[i].FreeMemory { // predicted free is smaller than reported free, use it
				// TODO maybe we should just always trust our numbers, since cuda's free memory reporting is laggy
				// and we might unload models we didn't actually need to. The risk is if some other GPU intensive app is loaded
				// after we start our first runner, then we'll never account for that, so picking the smallest free value seems prudent.
				allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
			}
			slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
		}
	}
}
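
// runnerRef tracks one loaded model runner. refCount is the number of
// in-flight requests using it; the runner is only unloaded once the count
// reaches zero and its session duration has expired.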
type runnerRef struct {
	refMu sync.Mutex
	// refCond sync.Cond // Signaled on transition from 1 -> 0 refCount
	refCount uint // prevent unloading if > 0
	// unloading bool // set to true when we are trying to unload the runner

	llama   llm.LlamaServer
	loading bool            // True only during initial load, then false forever
	gpus    gpu.GpuInfoList // Recorded at time of provisioning

	estimatedVRAM   uint64
	sessionDuration time.Duration
	expireTimer     *time.Timer

	model      string
	adapters   []string
	projectors []string
	*api.Options
}

// The refMu must already be held when calling unload
func (runner *runnerRef) unload() {
	if runner.llama != nil {
		runner.llama.Close()
	}
	runner.llama = nil
	runner.adapters = nil
	runner.projectors = nil
	runner.Options = nil
	runner.gpus = nil
}
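
// needsReload reports whether the loaded runner has to be restarted to serve
// req: the adapters, projectors, or runner options differ, or the server no
// longer answers a ping within the timeout.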
func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool {
	slog.Debug("evaluating already loaded", "model", req.model.ModelPath)
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	timeout := 10 * time.Second
	if runner.loading {
		timeout = 2 * time.Minute // Initial load can take a long time for big models on slow systems...
	}
	// Don't reload runner if num_gpu=-1 was provided
	optsExisting := runner.Options.Runner
	optsNew := req.opts.Runner
	if optsNew.NumGPU < 0 {
		optsExisting.NumGPU = -1
		optsNew.NumGPU = -1
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if !reflect.DeepEqual(runner.adapters, req.model.AdapterPaths) || // have the adapters changed?
		!reflect.DeepEqual(runner.projectors, req.model.ProjectorPaths) || // have the projectors changed?
		!reflect.DeepEqual(optsExisting, optsNew) || // have the runner options changed?
		runner.llama.Ping(ctx) != nil {
		return true
	}
	return false
}
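
// ByDuration orders runners by session duration so the one closest to its
// keep-alive expiry sorts first.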
type ByDuration []*runnerRef

func (a ByDuration) Len() int      { return len(a) }
func (a ByDuration) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDuration) Less(i, j int) bool {
	// uint64 to turn negative time (never unload) to largest
	return uint64(a[i].sessionDuration) < uint64(a[j].sessionDuration)
}
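
// A negative duration (keep-alive forever) wraps to a very large value under
// the uint64 cast above: uint64(time.Duration(-1)) == 1<<64 - 1, so such
// runners sort last and are the last candidates picked for unloading.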

// TODO - future consideration to pick runners based on size
// type BySize []*runnerRef
// func (a BySize) Len() int           { return len(a) }
// func (a BySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
// func (a BySize) Less(i, j int) bool { return a[i].estimatedVRAM < a[j].estimatedVRAM }

// pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
// If the model cannot be fit fully within the available GPU(s), nil is returned
func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
	var estimatedVRAM uint64
	for _, gl := range gpus.ByLibrary() {
		var ok bool
		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)

		// TODO - potentially sort by performance capability, existing models loaded, etc.
		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))

		// First attempt to fit the model into a single GPU
		for _, g := range sgl {
			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
				slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
				return []gpu.GpuInfo{g}
			}
		}

		// TODO future refinements
		// - if multiple Libraries, see if any single GPU in any Library will fit
		// - try subsets of GPUs instead of just falling back to 1 or all in a family

		// Now try all the GPUs
		if ok, estimatedVRAM = llm.PredictServerFit(gl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
			return gl
		}
	}
	return nil
}

// findRunnerToUnload finds a runner to unload to make room for a new model
func (s *Scheduler) findRunnerToUnload(req *LlmRequest) *runnerRef {
	s.loadedMu.Lock()
	runnerList := make([]*runnerRef, 0, len(s.loaded))
	for _, r := range s.loaded {
		runnerList = append(runnerList, r)
	}
	s.loadedMu.Unlock()

	// In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload
	// e.g., if we have multiple options, will one make room for the request?
	sort.Sort(ByDuration(runnerList))

	// First try to find a runner that's already idle
	for _, runner := range runnerList {
		runner.refMu.Lock()
		rc := runner.refCount
		runner.refMu.Unlock()
		if rc == 0 {
			slog.Debug("found an idle runner to unload")
			return runner
		}
	}
	// None appear idle, just wait for the one with the shortest duration
	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
	return runnerList[0]
}
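
// unloadAllRunners shuts down the llama server for every loaded runner.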
func (s *Scheduler) unloadAllRunners() {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	for model, runner := range s.loaded {
		if runner.llama != nil {
			slog.Debug("shutting down runner", "model", model)
			runner.llama.Close()
		}
	}
}