sched.go

package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
	"golang.org/x/exp/slices"
)

type LlmRequest struct {
	ctx             context.Context //nolint:containedctx
	model           *Model
	ggml            *llm.GGML // TODO - how large is this, and do we need to free it after we've finished loading?
	opts            api.Options
	sessionDuration time.Duration
	successCh       chan *runnerRef
	errCh           chan error
}

type Scheduler struct {
	pendingReqCh  chan *LlmRequest
	finishedReqCh chan *LlmRequest
	expiredCh     chan *runnerRef
	unloadedCh    chan interface{}

	loaded   map[string]*runnerRef
	loadedMu sync.Mutex

	loadFn      func(req *LlmRequest, gpus gpu.GpuInfoList)
	newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
	getGpuFn    func() gpu.GpuInfoList
}

// TODO set this to zero after a release or two, to enable multiple models by default
var loadedMax = 1          // Maximum runners; < 1 maps to as many as will fit in VRAM (unlimited for CPU runners)
var maxQueuedRequests = 10 // TODO configurable
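
// InitScheduler creates a Scheduler wired to the real llama server and GPU
// discovery functions. OLLAMA_MAX_LOADED_MODELS overrides loadedMax when set.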
func InitScheduler(ctx context.Context) *Scheduler {
	maxRunners := os.Getenv("OLLAMA_MAX_LOADED_MODELS")
	if maxRunners != "" {
		m, err := strconv.Atoi(maxRunners)
		if err != nil {
			slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
		} else {
			loadedMax = m
		}
	}

	sched := &Scheduler{
		pendingReqCh:  make(chan *LlmRequest, maxQueuedRequests),
		finishedReqCh: make(chan *LlmRequest, maxQueuedRequests),
		expiredCh:     make(chan *runnerRef, maxQueuedRequests),
		unloadedCh:    make(chan interface{}, maxQueuedRequests),
		loaded:        make(map[string]*runnerRef),
		newServerFn:   llm.NewLlamaServer,
		getGpuFn:      gpu.GetGPUInfo,
	}
	sched.loadFn = sched.load
	return sched
}

// context must be canceled to decrement ref count and release the runner
func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
	ggml, err := llm.LoadModel(model.ModelPath)
	req := &LlmRequest{
		ctx:             c,
		model:           model,
		ggml:            ggml,
		opts:            opts,
		sessionDuration: sessionDuration,
		successCh:       make(chan *runnerRef),
		errCh:           make(chan error, 1),
	}
	if err != nil {
		req.errCh <- err
		return req.successCh, req.errCh
	}
	select {
	case s.pendingReqCh <- req:
	default:
		req.errCh <- fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
	}
	return req.successCh, req.errCh
}

// Returns immediately, spawns goroutines for the scheduler which will shut down when ctx is done
func (s *Scheduler) Run(ctx context.Context) {
	slog.Debug("starting llm scheduler")
	go func() {
		s.processPending(ctx)
	}()

	go func() {
		s.processCompleted(ctx)
	}()
}
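
// processPending drains the pending request queue, reusing an already loaded
// runner when possible and otherwise loading the model, evicting an existing
// runner first if there is no room for it.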
func (s *Scheduler) processPending(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler pending loop")
			return
		case pending := <-s.pendingReqCh:
			// Block other requests until we get this pending request running
			for {
				var runnerToExpire *runnerRef
				s.loadedMu.Lock()
				runner := s.loaded[pending.model.ModelPath]
				loadedCount := len(s.loaded)
				s.loadedMu.Unlock()
				if runner != nil {
					if runner.needsReload(ctx, pending) {
						runnerToExpire = runner
					} else {
						// Runner is usable, return it
						pending.useLoadedRunner(runner, s.finishedReqCh)
						break
					}
				} else if loadedCount == 0 {
					slog.Debug("loading first model", "model", pending.model.ModelPath)
					gpus := s.getGpuFn()
					g := pickBestFitGPUs(pending, gpus)
					if g != nil {
						gpus = g
					}
					s.loadFn(pending, gpus)
					break
				} else if loadedMax > 0 && loadedCount >= loadedMax {
					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
					runnerToExpire = s.findRunnerToUnload(pending)
				} else {
					// More than one loaded model, so we have to see if the new one fits
					// Get a refreshed GPU list
					gpus := s.getGpuFn()
					// Update free memory from currently loaded models
					s.updateFreeSpace(gpus)
					gpus = pickBestFitGPUs(pending, gpus)
					if gpus != nil {
						slog.Debug("new model fits with existing models, loading")
						s.loadFn(pending, gpus)
						break
					}
					runnerToExpire = s.findRunnerToUnload(pending)
				}

				if runnerToExpire == nil {
					// Shouldn't happen
					slog.Error("runner to expire was nil!")
					continue
				}
				// Trigger an expiration to unload once it's done
				runnerToExpire.refMu.Lock()
				slog.Debug("resetting model to expire immediately to make room", "model", runnerToExpire.model, "refCount", runnerToExpire.refCount)
				if runnerToExpire.expireTimer != nil {
					runnerToExpire.expireTimer.Stop()
					runnerToExpire.expireTimer = nil
				}
				runnerToExpire.sessionDuration = 0
				if runnerToExpire.refCount <= 0 {
					s.expiredCh <- runnerToExpire
				}
				runnerToExpire.refMu.Unlock()
				// Wait for the unload to happen
				// Note: at this point we're queueing up all incoming requests, even if they were for
				// a different model that's loaded and not scheduled to be removed.
				slog.Debug("waiting for pending requests to complete and unload to occur", "model", runnerToExpire.model)
				select {
				case <-ctx.Done():
					slog.Debug("shutting down scheduler pending loop")
					return
				case <-s.unloadedCh:
					slog.Debug("unload completed", "model", runnerToExpire.model)
					continue
				}
			}
		case <-s.unloadedCh:
			// An unload event when there are no pending requests can be ignored
			slog.Debug("ignoring unload event with no pending requests")
		}
	}
}

func (s *Scheduler) processCompleted(ctx context.Context) {
	// Process completed requests, expired timers, and unloading models
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler completed loop")
			return
		case finished := <-s.finishedReqCh:
			s.loadedMu.Lock()
			runner := s.loaded[finished.model.ModelPath]
			s.loadedMu.Unlock()
			if runner == nil {
				slog.Error("finished request signal received after model unloaded", "model", finished.model.ModelPath)
				continue
			}
			runner.refMu.Lock()
			runner.refCount--
			if runner.refCount <= 0 {
				if runner.sessionDuration <= 0 {
					slog.Debug("runner with zero duration has gone idle, expiring to unload", "model", runner.model)
					if runner.expireTimer != nil {
						runner.expireTimer.Stop()
						runner.expireTimer = nil
					}
					s.expiredCh <- runner
				} else if runner.expireTimer == nil {
					slog.Debug("runner with non-zero duration has gone idle, adding timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
						slog.Debug("timer expired, expiring to unload", "model", runner.model)
						runner.refMu.Lock()
						defer runner.refMu.Unlock()
						if runner.expireTimer != nil {
							runner.expireTimer.Stop()
						}
						s.expiredCh <- runner
					})
				} else {
					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer.Reset(runner.sessionDuration)
				}
			}
			slog.Debug("after processing request finished event", "model", runner.model, "refCount", runner.refCount)
			runner.refMu.Unlock()
		case runner := <-s.expiredCh:
			slog.Debug("runner expired event received", "model", runner.model)
			runner.refMu.Lock()
			if runner.refCount > 0 {
				// Shouldn't happen, but safeguard to ensure no leaked runners
				slog.Debug("expired event with positive ref count, retrying", "model", runner.model, "refCount", runner.refCount)
				go func(runner *runnerRef) {
					// We can't unload yet, but want to as soon as the current request completes
					// So queue up another expired event
					time.Sleep(10 * time.Millisecond)
					s.expiredCh <- runner
				}(runner)
				runner.refMu.Unlock()
				continue
			}

			slog.Debug("got lock to unload", "model", runner.model)
			runner.unload()
			s.loadedMu.Lock()
			delete(s.loaded, runner.model)
			s.loadedMu.Unlock()
			slog.Debug("runner released", "model", runner.model)
			runner.refMu.Unlock()

			slog.Debug("sending an unloaded event", "model", runner.model)
			s.unloadedCh <- struct{}{}
		}
	}
}

// Complete the pending request and send the runner back to the requester
// Wires up a finished event after the request context is completed
// Updates session duration, and resets expiration timer
func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *LlmRequest) {
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	runner.refCount++
	runner.sessionDuration = pending.sessionDuration
	pending.successCh <- runner
	go func() {
		<-pending.ctx.Done()
		slog.Debug("context for request finished")
		finished <- pending
	}()
}
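
// load starts a new llama server for the request on the given GPUs and
// registers the resulting runner in the loaded map. Errors are reported on
// req.errCh; the runner is delivered on req.successCh once it is running.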
func (s *Scheduler) load(req *LlmRequest, gpus gpu.GpuInfoList) {
	llama, err := s.newServerFn(gpus, req.model.ModelPath, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
	if err != nil {
		// some older models are not compatible with newer versions of llama.cpp
		// show a generalized compatibility error until there is a better way to
		// check for model compatibility
		if errors.Is(err, llm.ErrUnsupportedFormat) || strings.Contains(err.Error(), "failed to load model") {
			err = fmt.Errorf("%v: this model may be incompatible with your version of Ollama. If you previously pulled this model, try updating it by running `ollama pull %s`", err, req.model.ShortName)
		}
		slog.Info("NewLlamaServer failed", "model", req.model.ModelPath, "error", err)
		req.errCh <- err
		return
	}
	runner := &runnerRef{}
	runner.model = req.model.ModelPath
	runner.adapters = req.model.AdapterPaths
	runner.projectors = req.model.ProjectorPaths
	runner.llama = llama
	runner.Options = &req.opts
	runner.sessionDuration = req.sessionDuration
	runner.gpus = gpus
	runner.estimatedVRAM = llama.EstimatedVRAM()
	runner.loading = true
	runner.refCount = 1
	runner.refMu.Lock()

	s.loadedMu.Lock()
	s.loaded[req.model.ModelPath] = runner
	slog.Info("loaded runners", "count", len(s.loaded))
	s.loadedMu.Unlock()

	go func() {
		defer runner.refMu.Unlock()
		if err = llama.WaitUntilRunning(req.ctx); err != nil {
			slog.Error("error loading llama server", "error", err)
			runner.refCount--
			req.errCh <- err
			slog.Debug("triggering expiration for failed load", "model", runner.model)
			s.expiredCh <- runner
			return
		}
		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
		runner.loading = false
		go func() {
			<-req.ctx.Done()
			slog.Debug("context for request finished")
			s.finishedReqCh <- req
		}()
		req.successCh <- runner
	}()
}
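
// updateFreeSpace adjusts the reported free memory of each GPU downward by the
// estimated VRAM usage of the currently loaded runners, so placement decisions
// don't rely solely on the (potentially stale) values reported by the driver.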
func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
	type predKey struct {
		Library string
		ID      string
	}
	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
	s.loadedMu.Lock()
	for _, r := range s.loaded {
		r.refMu.Lock()
		gpuIDs := make([]string, 0, len(r.gpus))
		if r.llama != nil {
			// TODO this should be broken down by GPU instead of assuming uniform spread
			estimatedVRAMPerGPU := r.llama.EstimatedVRAM() / uint64(len(r.gpus))
			for _, gpu := range r.gpus {
				gpuIDs = append(gpuIDs, gpu.ID)
			}
			for _, gpu := range allGpus {
				if slices.Contains(gpuIDs, gpu.ID) {
					predMap[predKey{gpu.Library, gpu.ID}] += estimatedVRAMPerGPU
				}
			}
		} else {
			slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
		}
		r.refMu.Unlock()
	}
	s.loadedMu.Unlock()

	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
	for i := range allGpus {
		if p, ok := predMap[predKey{allGpus[i].Library, allGpus[i].ID}]; ok {
			slog.Debug("gpu reported", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "available", format.HumanBytes2(allGpus[i].FreeMemory))
			if p > allGpus[i].TotalMemory {
				// Shouldn't happen
				slog.Warn("predicted usage exceeds VRAM", "gpu", allGpus[i].ID, "totalMemory", allGpus[i].TotalMemory, "predicted", p)
				allGpus[i].FreeMemory = 0
			} else if (allGpus[i].TotalMemory - p) < allGpus[i].FreeMemory { // predicted free is smaller than reported free, use it
				// TODO maybe we should just always trust our numbers, since cuda's free memory reporting is laggy
				// and we might unload models we didn't actually need to. The risk is if some other GPU intensive app is loaded
				// after we start our first runner, then we'll never account for that, so picking the smallest free value seems prudent.
				allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
			}
			slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
		}
	}
}
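
// runnerRef tracks a single loaded llama server along with the reference count
// and expiration state used to decide when it can be unloaded.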
type runnerRef struct {
	refMu sync.Mutex
	// refCond   sync.Cond // Signaled on transition from 1 -> 0 refCount
	refCount uint // prevent unloading if > 0
	// unloading bool      // set to true when we are trying to unload the runner

	llama         llm.LlamaServer
	loading       bool            // True only during initial load, then false forever
	gpus          gpu.GpuInfoList // Recorded at time of provisioning
	estimatedVRAM uint64

	sessionDuration time.Duration
	expireTimer     *time.Timer

	model      string
	adapters   []string
	projectors []string
	*api.Options
}

// The refMu must already be held when calling unload
func (runner *runnerRef) unload() {
	if runner.llama != nil {
		runner.llama.Close()
	}
	runner.llama = nil
	runner.adapters = nil
	runner.projectors = nil
	runner.Options = nil
	runner.gpus = nil
}
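
// needsReload reports whether the loaded runner must be torn down and reloaded
// to serve the request, i.e. the adapters, projectors, or runner options have
// changed, or the underlying server is no longer responding to pings.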
func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool {
	slog.Debug("evaluating already loaded", "model", req.model.ModelPath)
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	// Ignore the NumGPU settings for comparison
	optsExisting := runner.Options.Runner
	optsExisting.NumGPU = -1
	optsNew := req.opts.Runner
	optsNew.NumGPU = -1
	timeout := 10 * time.Second
	if runner.loading {
		timeout = 2 * time.Minute // Initial load can take a long time for big models on slow systems...
	}
	ctx, cancel := context.WithTimeout(ctx, timeout) // BUG -
	defer cancel()
	if !reflect.DeepEqual(runner.adapters, req.model.AdapterPaths) || // have the adapters changed?
		!reflect.DeepEqual(runner.projectors, req.model.ProjectorPaths) || // have the projectors changed?
		!reflect.DeepEqual(optsExisting, optsNew) || // have the runner options changed?
		runner.llama.Ping(ctx) != nil {
		return true
	}
	return false
}
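
// ByDuration sorts runners by session duration, shortest first; negative
// durations (never unload) sort last.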
type ByDuration []*runnerRef

func (a ByDuration) Len() int      { return len(a) }
func (a ByDuration) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDuration) Less(i, j int) bool {
	// uint64 to turn negative time (never unload) to largest
	return uint64(a[i].sessionDuration) < uint64(a[j].sessionDuration)
}

// TODO - future consideration to pick runners based on size
// type BySize []*runnerRef
// func (a BySize) Len() int           { return len(a) }
// func (a BySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
// func (a BySize) Less(i, j int) bool { return a[i].estimatedVRAM < a[j].estimatedVRAM }

// pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
// If the model cannot be fit fully within the available GPU(s), nil is returned
func pickBestFitGPUs(req *LlmRequest, gpus gpu.GpuInfoList) gpu.GpuInfoList {
	var estimatedVRAM uint64
	for _, gl := range gpus.ByLibrary() {
		var ok bool
		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)

		// TODO - potentially sort by performance capability, existing models loaded, etc.
		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))

		// First attempt to fit the model into a single GPU
		for _, g := range sgl {
			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
				slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
				return []gpu.GpuInfo{g}
			}
		}

		// TODO future refinements
		// - if multiple Libraries, see if any single GPU in any Library will fit
		// - try subsets of GPUs instead of just falling back to 1 or all in a family

		// Now try all the GPUs
		if ok, estimatedVRAM = llm.PredictServerFit(gl, req.ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
			return gl
		}
	}
	return nil
}

// findRunnerToUnload finds a runner to unload to make room for a new model
func (s *Scheduler) findRunnerToUnload(req *LlmRequest) *runnerRef {
	s.loadedMu.Lock()
	runnerList := make([]*runnerRef, 0, len(s.loaded))
	for _, r := range s.loaded {
		runnerList = append(runnerList, r)
	}
	s.loadedMu.Unlock()

	// In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload
	// e.g., if we have multiple options, will one make room for the request?
	sort.Sort(ByDuration(runnerList))

	// First try to find a runner that's already idle
	for _, runner := range runnerList {
		runner.refMu.Lock()
		rc := runner.refCount
		runner.refMu.Unlock()
		if rc == 0 {
			slog.Debug("found an idle runner to unload")
			return runner
		}
	}
	// None appear idle, just wait for the one with the shortest duration
	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
	return runnerList[0]
}
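
// unloadAllRunners closes every currently loaded llama server.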
func (s *Scheduler) unloadAllRunners() {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	for model, runner := range s.loaded {
		if runner.llama != nil {
			slog.Debug("shutting down runner", "model", model)
			runner.llama.Close()
		}
	}
}