sched.go

package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
	"golang.org/x/exp/slices"
)

type LlmRequest struct {
	ctx             context.Context //nolint:containedctx
	model           *Model
	opts            api.Options
	sessionDuration time.Duration
	successCh       chan *runnerRef
	errCh           chan error
}

type Scheduler struct {
	pendingReqCh  chan *LlmRequest
	finishedReqCh chan *LlmRequest
	expiredCh     chan *runnerRef
	unloadedCh    chan interface{}
	loaded        map[string]*runnerRef
	loadedMu      sync.Mutex
	loadFn        func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
	newServerFn   func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
	getGpuFn      func() gpu.GpuInfoList
}

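// How the pieces fit together: GetRunner enqueues requests on pendingReqCh.
// processPending either reuses an already loaded runner, loads a new one, or
// marks an existing runner for immediate expiration to make room.
// processCompleted handles finished requests, expiration timers, and unloads,
// and signals unloadedCh so processPending can retry a request that was
// blocked waiting for room.
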
// TODO set this to zero after a release or two, to enable multiple models by default
var loadedMax = 1          // Maximum runners; < 1 maps to as many as will fit in VRAM (unlimited for CPU runners)
var maxQueuedRequests = 10 // TODO configurable

func InitScheduler(ctx context.Context) *Scheduler {
	maxRunners := os.Getenv("OLLAMA_MAX_LOADED_MODELS")
	if maxRunners != "" {
		m, err := strconv.Atoi(maxRunners)
		if err != nil {
			slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
		} else {
			loadedMax = m
		}
	}

	sched := &Scheduler{
		pendingReqCh:  make(chan *LlmRequest, maxQueuedRequests),
		finishedReqCh: make(chan *LlmRequest, maxQueuedRequests),
		expiredCh:     make(chan *runnerRef, maxQueuedRequests),
		unloadedCh:    make(chan interface{}, maxQueuedRequests),
		loaded:        make(map[string]*runnerRef),
		newServerFn:   llm.NewLlamaServer,
		getGpuFn:      gpu.GetGPUInfo,
	}
	sched.loadFn = sched.load
	return sched
}

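// For example, to allow up to three models to stay loaded at once (illustrative;
// set the variable however your environment launches the server):
//
//	OLLAMA_MAX_LOADED_MODELS=3 ollama serve
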
// context must be canceled to decrement ref count and release the runner
func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
	req := &LlmRequest{
		ctx:             c,
		model:           model,
		opts:            opts,
		sessionDuration: sessionDuration,
		successCh:       make(chan *runnerRef),
		errCh:           make(chan error, 1),
	}
	select {
	case s.pendingReqCh <- req:
	default:
		req.errCh <- fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
	}
	return req.successCh, req.errCh
}

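// A minimal caller sketch (illustrative only; sched, model, opts, and r are
// assumed to be in scope, and the real request handler may differ):
//
//	ctx, cancel := context.WithCancel(r.Context())
//	successCh, errCh := sched.GetRunner(ctx, model, opts, 5*time.Minute)
//	select {
//	case runner := <-successCh:
//		// ... run the completion against runner.llama ...
//		cancel() // canceling ctx drops the ref count via the finished event
//	case err := <-errCh:
//		cancel()
//		// report err to the client
//	}
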
// Returns immediately; spawns goroutines for the scheduler which will shut down when ctx is done
func (s *Scheduler) Run(ctx context.Context) {
	slog.Debug("starting llm scheduler")
	go func() {
		s.processPending(ctx)
	}()
	go func() {
		s.processCompleted(ctx)
	}()
}

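// processPending is the scheduling loop: it pulls requests off pendingReqCh and,
// for each one, either reuses an already loaded runner, loads the model onto the
// best fitting GPUs, or expires an existing runner and retries once it unloads.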
func (s *Scheduler) processPending(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler pending loop")
			return
		case pending := <-s.pendingReqCh:
			// Block other requests until we get this pending request running
			for {
				var runnerToExpire *runnerRef
				s.loadedMu.Lock()
				runner := s.loaded[pending.model.ModelPath]
				loadedCount := len(s.loaded)
				s.loadedMu.Unlock()
				if runner != nil {
					if runner.needsReload(ctx, pending) {
						runnerToExpire = runner
					} else {
						// Runner is usable, return it
						pending.useLoadedRunner(runner, s.finishedReqCh)
						break
					}
				} else if loadedCount == 0 {
					slog.Debug("loading first model", "model", pending.model.ModelPath)
					gpus := s.getGpuFn()
					ggml, err := llm.LoadModel(pending.model.ModelPath)
					if err != nil {
						pending.errCh <- err
						break
					}
					g := pickBestFitGPUs(pending, ggml, gpus)
					if g != nil {
						gpus = g
					}
					s.loadFn(pending, ggml, gpus)
					break
				} else if loadedMax > 0 && loadedCount >= loadedMax {
					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
					runnerToExpire = s.findRunnerToUnload(pending)
				} else {
					// More than one loaded model, so we have to see if the new one fits
					// Get a refreshed GPU list
					gpus := s.getGpuFn()
					// Update free memory from currently loaded models
					s.updateFreeSpace(gpus)
					ggml, err := llm.LoadModel(pending.model.ModelPath)
					if err != nil {
						pending.errCh <- err
						break
					}
					gpus = pickBestFitGPUs(pending, ggml, gpus)
					if gpus != nil {
						slog.Debug("new model fits with existing models, loading")
						s.loadFn(pending, ggml, gpus)
						break
					}
					runnerToExpire = s.findRunnerToUnload(pending)
				}

				if runnerToExpire == nil {
					// Shouldn't happen
					slog.Error("runner to expire was nil!")
					continue
				}
				// Trigger an expiration to unload once it's done
				runnerToExpire.refMu.Lock()
				slog.Debug("resetting model to expire immediately to make room", "model", runnerToExpire.model, "refCount", runnerToExpire.refCount)
				if runnerToExpire.expireTimer != nil {
					runnerToExpire.expireTimer.Stop()
					runnerToExpire.expireTimer = nil
				}
				runnerToExpire.sessionDuration = 0
				if runnerToExpire.refCount <= 0 {
					s.expiredCh <- runnerToExpire
				}
				runnerToExpire.refMu.Unlock()
				// Wait for the unload to happen
				// Note: at this point we're queueing up all incoming requests, even if they were for
				// a different model that's loaded and not scheduled to be removed.
				slog.Debug("waiting for pending requests to complete and unload to occur", "model", runnerToExpire.model)
				select {
				case <-ctx.Done():
					slog.Debug("shutting down scheduler pending loop")
					return
				case <-s.unloadedCh:
					slog.Debug("unload completed", "model", runnerToExpire.model)
					continue
				}
			}
		case <-s.unloadedCh:
			// An unload request when there are no pending requests can be ignored
			slog.Debug("ignoring unload event with no pending requests")
		}
	}
}

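// processCompleted is the other half of that handshake: it drops ref counts for
// finished requests, manages expiration timers, performs unloads for expired
// runners, and signals unloadedCh so a blocked pending request can be retried.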
func (s *Scheduler) processCompleted(ctx context.Context) {
	// Process completed requests, expired timers, and unloading models
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler completed loop")
			return
		case finished := <-s.finishedReqCh:
			s.loadedMu.Lock()
			runner := s.loaded[finished.model.ModelPath]
			s.loadedMu.Unlock()
			if runner == nil {
				slog.Error("finished request signal received after model unloaded", "model", finished.model.ModelPath)
				continue
			}
			runner.refMu.Lock()
			runner.refCount--
			if runner.refCount <= 0 {
				if runner.sessionDuration <= 0 {
					slog.Debug("runner with zero duration has gone idle, expiring to unload", "model", runner.model)
					if runner.expireTimer != nil {
						runner.expireTimer.Stop()
						runner.expireTimer = nil
					}
					s.expiredCh <- runner
				} else if runner.expireTimer == nil {
					slog.Debug("runner with non-zero duration has gone idle, adding timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
						slog.Debug("timer expired, expiring to unload", "model", runner.model)
						runner.refMu.Lock()
						defer runner.refMu.Unlock()
						if runner.expireTimer != nil {
							runner.expireTimer.Stop()
						}
						s.expiredCh <- runner
					})
				} else {
					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer.Reset(runner.sessionDuration)
				}
			}
			slog.Debug("after processing request finished event", "model", runner.model, "refCount", runner.refCount)
			runner.refMu.Unlock()
		case runner := <-s.expiredCh:
			slog.Debug("runner expired event received", "model", runner.model)
			runner.refMu.Lock()
			if runner.refCount > 0 {
				// Shouldn't happen, but safeguard to ensure no leaked runners
				slog.Debug("expired event with positive ref count, retrying", "model", runner.model, "refCount", runner.refCount)
				go func(runner *runnerRef) {
					// We can't unload yet, but want to as soon as the current request completes
					// So queue up another expired event
					time.Sleep(10 * time.Millisecond)
					s.expiredCh <- runner
				}(runner)
				runner.refMu.Unlock()
				continue
			}

			slog.Debug("got lock to unload", "model", runner.model)
			runner.unload()
			s.loadedMu.Lock()
			delete(s.loaded, runner.model)
			s.loadedMu.Unlock()
			slog.Debug("runner released", "model", runner.model)
			runner.refMu.Unlock()

			slog.Debug("sending an unloaded event", "model", runner.model)
			s.unloadedCh <- struct{}{}
		}
	}
}

// Complete the pending request and send the runner back to the requester
// Wires up a finished event after the request context is completed
// Updates session duration, and resets expiration timer
func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *LlmRequest) {
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	runner.refCount++
	runner.sessionDuration = pending.sessionDuration
	pending.successCh <- runner
	go func() {
		<-pending.ctx.Done()
		slog.Debug("context for request finished")
		finished <- pending
	}()
}

func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
	if err != nil {
		// some older models are not compatible with newer versions of llama.cpp
		// show a generalized compatibility error until there is a better way to
		// check for model compatibility
		if errors.Is(err, llm.ErrUnsupportedFormat) || strings.Contains(err.Error(), "failed to load model") {
			err = fmt.Errorf("%v: this model may be incompatible with your version of Ollama. If you previously pulled this model, try updating it by running `ollama pull %s`", err, req.model.ShortName)
		}
		slog.Info("NewLlamaServer failed", "model", req.model.ModelPath, "error", err)
		req.errCh <- err
		return
	}

	runner := &runnerRef{}
	runner.model = req.model.ModelPath
	runner.adapters = req.model.AdapterPaths
	runner.projectors = req.model.ProjectorPaths
	runner.llama = llama
	runner.Options = &req.opts
	runner.sessionDuration = req.sessionDuration
	runner.gpus = gpus
	runner.estimatedVRAM = llama.EstimatedVRAM()
	runner.loading = true
	runner.refCount = 1
	runner.refMu.Lock()

	s.loadedMu.Lock()
	s.loaded[req.model.ModelPath] = runner
	slog.Info("loaded runners", "count", len(s.loaded))
	s.loadedMu.Unlock()

	go func() {
		defer runner.refMu.Unlock()
		if err = llama.WaitUntilRunning(req.ctx); err != nil {
			slog.Error("error loading llama server", "error", err)
			runner.refCount--
			req.errCh <- err
			slog.Debug("triggering expiration for failed load", "model", runner.model)
			s.expiredCh <- runner
			return
		}
		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
		runner.loading = false
		go func() {
			<-req.ctx.Done()
			slog.Debug("context for request finished")
			s.finishedReqCh <- req
		}()
		req.successCh <- runner
	}()
}

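// updateFreeSpace subtracts the VRAM that loaded runners are predicted to use
// from each GPU's reported free memory, so placement decisions do not rely
// solely on the driver's (sometimes laggy) free-memory numbers.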
func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
	type predKey struct {
		Library string
		ID      string
	}
	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
	s.loadedMu.Lock()
	for _, r := range s.loaded {
		r.refMu.Lock()
		gpuIDs := make([]string, 0, len(r.gpus))
		if r.llama != nil {
			// TODO this should be broken down by GPU instead of assuming uniform spread
			estimatedVRAMPerGPU := r.llama.EstimatedVRAM() / uint64(len(r.gpus))
			for _, gpu := range r.gpus {
				gpuIDs = append(gpuIDs, gpu.ID)
			}
			for _, gpu := range allGpus {
				if slices.Contains(gpuIDs, gpu.ID) {
					predMap[predKey{gpu.Library, gpu.ID}] += estimatedVRAMPerGPU
				}
			}
		} else {
			slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
		}
		r.refMu.Unlock()
	}
	s.loadedMu.Unlock()

	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
	for i := range allGpus {
		if p, ok := predMap[predKey{allGpus[i].Library, allGpus[i].ID}]; ok {
			slog.Debug("gpu reported", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "available", format.HumanBytes2(allGpus[i].FreeMemory))
			if p > allGpus[i].TotalMemory {
				// Shouldn't happen
				slog.Warn("predicted usage exceeds VRAM", "gpu", allGpus[i].ID, "totalMemory", allGpus[i].TotalMemory, "predicted", p)
				allGpus[i].FreeMemory = 0
			} else if (allGpus[i].TotalMemory - p) < allGpus[i].FreeMemory { // predicted free is smaller than reported free, use it
				// TODO maybe we should just always trust our numbers, since cuda's free memory reporting is laggy
				// and we might unload models we didn't actually need to. The risk is if some other GPU intensive app is loaded
				// after we start our first runner, then we'll never account for that, so picking the smallest free value seems prudent.
				allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
			}
			slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
		}
	}
}

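// A worked example with made-up numbers: if a GPU reports 24 GiB total and
// 16 GiB free, and loaded runners are predicted to use 10 GiB on it, then
// total - predicted = 14 GiB is smaller than the reported 16 GiB free, so
// FreeMemory is lowered to 14 GiB before the next placement decision.
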
type runnerRef struct {
	refMu sync.Mutex
	// refCond sync.Cond // Signaled on transition from 1 -> 0 refCount
	refCount uint // prevent unloading if > 0
	// unloading bool // set to true when we are trying to unload the runner
	llama           llm.LlamaServer
	loading         bool            // True only during initial load, then false forever
	gpus            gpu.GpuInfoList // Recorded at time of provisioning
	estimatedVRAM   uint64
	sessionDuration time.Duration
	expireTimer     *time.Timer
	model           string
	adapters        []string
	projectors      []string
	*api.Options
}

// The refMu must already be held when calling unload
func (runner *runnerRef) unload() {
	if runner.llama != nil {
		runner.llama.Close()
	}
	runner.llama = nil
	runner.adapters = nil
	runner.projectors = nil
	runner.Options = nil
	runner.gpus = nil
}

func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool {
	slog.Debug("evaluating already loaded", "model", req.model.ModelPath)
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	// Ignore the NumGPU settings for comparison
	optsExisting := runner.Options.Runner
	optsExisting.NumGPU = -1
	optsNew := req.opts.Runner
	optsNew.NumGPU = -1
	timeout := 10 * time.Second
	if runner.loading {
		timeout = 2 * time.Minute // Initial load can take a long time for big models on slow systems...
	}
	ctx, cancel := context.WithTimeout(ctx, timeout) // BUG -
	defer cancel()
	if !reflect.DeepEqual(runner.adapters, req.model.AdapterPaths) || // have the adapters changed?
		!reflect.DeepEqual(runner.projectors, req.model.ProjectorPaths) || // have the projectors changed?
		!reflect.DeepEqual(optsExisting, optsNew) || // have the runner options changed?
		runner.llama.Ping(ctx) != nil {
		return true
	}
	return false
}

type ByDuration []*runnerRef

func (a ByDuration) Len() int      { return len(a) }
func (a ByDuration) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDuration) Less(i, j int) bool {
	// uint64 to turn negative time (never unload) to largest
	return uint64(a[i].sessionDuration) < uint64(a[j].sessionDuration)
}

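// Note: the uint64 conversion above means a negative sessionDuration (used to
// mean "never unload") wraps around to a very large value, so such runners sort
// last and are the last candidates for eviction in findRunnerToUnload.
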
// TODO - future consideration to pick runners based on size
// type BySize []*runnerRef
// func (a BySize) Len() int           { return len(a) }
// func (a BySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
// func (a BySize) Less(i, j int) bool { return a[i].estimatedVRAM < a[j].estimatedVRAM }

// pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
// If the model cannot be fully fit within the available GPU(s), nil is returned
func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
	var estimatedVRAM uint64
	for _, gl := range gpus.ByLibrary() {
		var ok bool
		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)

		// TODO - potentially sort by performance capability, existing models loaded, etc.
		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))

		// First attempt to fit the model into a single GPU
		for _, g := range sgl {
			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
				slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
				return []gpu.GpuInfo{g}
			}
		}

		// TODO future refinements
		// - if multiple Libraries, see if any single GPU in any Library will fit
		// - try subsets of GPUs instead of just falling back to 1 or all in a family

		// Now try all the GPUs
		if ok, estimatedVRAM = llm.PredictServerFit(gl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
			return gl
		}
	}
	return nil
}

// findRunnerToUnload finds a runner to unload to make room for a new model
func (s *Scheduler) findRunnerToUnload(req *LlmRequest) *runnerRef {
	s.loadedMu.Lock()
	runnerList := make([]*runnerRef, 0, len(s.loaded))
	for _, r := range s.loaded {
		runnerList = append(runnerList, r)
	}
	s.loadedMu.Unlock()

	// In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload
	// e.g., if we have multiple options, will one make room for the request?
	sort.Sort(ByDuration(runnerList))

	// First try to find a runner that's already idle
	for _, runner := range runnerList {
		runner.refMu.Lock()
		rc := runner.refCount
		runner.refMu.Unlock()
		if rc == 0 {
			slog.Debug("found an idle runner to unload")
			return runner
		}
	}
	// None appear idle, just wait for the one with the shortest duration
	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
	return runnerList[0]
}

func (s *Scheduler) unloadAllRunners() {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	for model, runner := range s.loaded {
		if runner.llama != nil {
			slog.Debug("shutting down runner", "model", model)
			runner.llama.Close()
		}
	}
}