sched.go

package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
	"golang.org/x/exp/slices"
)
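
// LlmRequest represents a single request for a runner: the requesting context,
// the model and options to run it with, and the channels used to report the result.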
type LlmRequest struct {
	ctx             context.Context //nolint:containedctx
	model           *Model
	opts            api.Options
	sessionDuration time.Duration
	successCh       chan *runnerRef
	errCh           chan error
}
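
// Scheduler serializes model loads and unloads, tracking the currently loaded
// runners and matching pending requests against them.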
type Scheduler struct {
	pendingReqCh  chan *LlmRequest
	finishedReqCh chan *LlmRequest
	expiredCh     chan *runnerRef
	unloadedCh    chan interface{}
	loaded        map[string]*runnerRef
	loadedMu      sync.Mutex
	loadFn        func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
	newServerFn   func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
	getGpuFn      func() gpu.GpuInfoList
}

// TODO set this to zero after a release or two, to enable multiple models by default
var loadedMax = 1          // Maximum runners; < 1 maps to as many as will fit in VRAM (unlimited for CPU runners)
var maxQueuedRequests = 10 // TODO configurable
var numParallel = 1
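
// InitScheduler applies the OLLAMA_MAX_LOADED_MODELS and OLLAMA_NUM_PARALLEL
// environment variables and returns a Scheduler wired to the default load,
// server creation, and GPU discovery functions.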
func InitScheduler(ctx context.Context) *Scheduler {
	maxRunners := os.Getenv("OLLAMA_MAX_LOADED_MODELS")
	if maxRunners != "" {
		m, err := strconv.Atoi(maxRunners)
		if err != nil {
			slog.Error("invalid setting", "OLLAMA_MAX_LOADED_MODELS", maxRunners, "error", err)
		} else {
			loadedMax = m
		}
	}
	if onp := os.Getenv("OLLAMA_NUM_PARALLEL"); onp != "" {
		p, err := strconv.Atoi(onp)
		if err != nil || p <= 0 {
			slog.Error("invalid parallel setting, must be greater than zero", "OLLAMA_NUM_PARALLEL", onp, "error", err)
		} else {
			numParallel = p
		}
	}
	sched := &Scheduler{
		pendingReqCh:  make(chan *LlmRequest, maxQueuedRequests),
		finishedReqCh: make(chan *LlmRequest, maxQueuedRequests),
		expiredCh:     make(chan *runnerRef, maxQueuedRequests),
		unloadedCh:    make(chan interface{}, maxQueuedRequests),
		loaded:        make(map[string]*runnerRef),
		newServerFn:   llm.NewLlamaServer,
		getGpuFn:      gpu.GetGPUInfo,
	}
	sched.loadFn = sched.load
	return sched
}

// context must be canceled to decrement ref count and release the runner
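//
// Illustrative usage sketch (not part of the original file; caller-side names
// such as sched, model, and opts are assumed): a caller selects on both
// returned channels and cancels its context once finished so the runner's
// ref count is released:
//
//	runnerCh, errCh := sched.GetRunner(ctx, model, opts, sessionDuration)
//	select {
//	case runner := <-runnerCh:
//		// use runner, then cancel ctx to release it
//		_ = runner
//	case err := <-errCh:
//		// "server busy" or a load failure
//		_ = err
//	}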
func (s *Scheduler) GetRunner(c context.Context, model *Model, opts api.Options, sessionDuration time.Duration) (chan *runnerRef, chan error) {
	// context split across parallel threads; scale NumCtx before it is copied
	// into the request so the adjustment actually takes effect
	opts.NumCtx = opts.NumCtx * numParallel
	req := &LlmRequest{
		ctx:             c,
		model:           model,
		opts:            opts,
		sessionDuration: sessionDuration,
		successCh:       make(chan *runnerRef),
		errCh:           make(chan error, 1),
	}
	select {
	case s.pendingReqCh <- req:
	default:
		req.errCh <- fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
	}
	return req.successCh, req.errCh
}

// Run returns immediately and spawns the scheduler goroutines, which shut down
// when ctx is done.
func (s *Scheduler) Run(ctx context.Context) {
	slog.Debug("starting llm scheduler")
	go func() {
		s.processPending(ctx)
	}()
	go func() {
		s.processCompleted(ctx)
	}()
}
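
// processPending drains pendingReqCh and, for each request, either reuses an
// already loaded runner, loads the model onto the best-fitting GPUs, or expires
// an existing runner to make room, blocking further requests until resolved.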
func (s *Scheduler) processPending(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler pending loop")
			return
		case pending := <-s.pendingReqCh:
			// Block other requests until we get this pending request running
			for {
				var runnerToExpire *runnerRef
				s.loadedMu.Lock()
				runner := s.loaded[pending.model.ModelPath]
				loadedCount := len(s.loaded)
				s.loadedMu.Unlock()
				if runner != nil {
					if runner.needsReload(ctx, pending) {
						runnerToExpire = runner
					} else {
						// Runner is usable, return it
						pending.useLoadedRunner(runner, s.finishedReqCh)
						break
					}
				} else if loadedMax > 0 && loadedCount >= loadedMax {
					slog.Debug("max runners achieved, unloading one to make room", "runner_count", loadedCount)
					runnerToExpire = s.findRunnerToUnload(pending)
				} else {
					// Either no models are loaded or below loadedMax
					// Get a refreshed GPU list
					gpus := s.getGpuFn()
					// Load model for fitting
					ggml, err := llm.LoadModel(pending.model.ModelPath)
					if err != nil {
						pending.errCh <- err
						break
					}
					// No models loaded. Load the model but prefer the best fit.
					if loadedCount == 0 {
						slog.Debug("loading first model", "model", pending.model.ModelPath)
						g := pickBestFitGPUs(pending, ggml, gpus)
						if g != nil {
							gpus = g
						}
						s.loadFn(pending, ggml, gpus)
						break
					}
					// One or more models already loaded, so we have to see if the new one fits
					// Update free memory from currently loaded models
					s.updateFreeSpace(gpus)
					gpus = pickBestFitGPUs(pending, ggml, gpus)
					if gpus != nil {
						slog.Debug("new model fits with existing models, loading")
						s.loadFn(pending, ggml, gpus)
						break
					}
					runnerToExpire = s.findRunnerToUnload(pending)
				}
				if runnerToExpire == nil {
					// Shouldn't happen
					slog.Error("runner to expire was nil!")
					continue
				}
				// Trigger an expiration to unload once it's done
				runnerToExpire.refMu.Lock()
				slog.Debug("resetting model to expire immediately to make room", "model", runnerToExpire.model, "refCount", runnerToExpire.refCount)
				if runnerToExpire.expireTimer != nil {
					runnerToExpire.expireTimer.Stop()
					runnerToExpire.expireTimer = nil
				}
				runnerToExpire.sessionDuration = 0
				if runnerToExpire.refCount <= 0 {
					s.expiredCh <- runnerToExpire
				}
				runnerToExpire.refMu.Unlock()
				// Wait for the unload to happen
				// Note: at this point we're queueing up all incoming requests, even if they were for
				// a different model that's loaded and not scheduled to be removed.
				slog.Debug("waiting for pending requests to complete and unload to occur", "model", runnerToExpire.model)
				select {
				case <-ctx.Done():
					slog.Debug("shutting down scheduler pending loop")
					return
				case <-s.unloadedCh:
					slog.Debug("unload completed", "model", runnerToExpire.model)
					continue
				}
			}
		case <-s.unloadedCh:
			// An unload event when there are no pending requests can be ignored
			slog.Debug("ignoring unload event with no pending requests")
		}
	}
}

func (s *Scheduler) processCompleted(ctx context.Context) {
	// Process completed requests, expired timers, and unloading models
	for {
		select {
		case <-ctx.Done():
			slog.Debug("shutting down scheduler completed loop")
			return
		case finished := <-s.finishedReqCh:
			s.loadedMu.Lock()
			runner := s.loaded[finished.model.ModelPath]
			s.loadedMu.Unlock()
			if runner == nil {
				slog.Error("finished request signal received after model unloaded", "model", finished.model.ModelPath)
				continue
			}
			runner.refMu.Lock()
			runner.refCount--
			if runner.refCount <= 0 {
				if runner.sessionDuration <= 0 {
					slog.Debug("runner with zero duration has gone idle, expiring to unload", "model", runner.model)
					if runner.expireTimer != nil {
						runner.expireTimer.Stop()
						runner.expireTimer = nil
					}
					s.expiredCh <- runner
				} else if runner.expireTimer == nil {
					slog.Debug("runner with non-zero duration has gone idle, adding timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
						slog.Debug("timer expired, expiring to unload", "model", runner.model)
						runner.refMu.Lock()
						defer runner.refMu.Unlock()
						if runner.expireTimer != nil {
							runner.expireTimer.Stop()
						}
						s.expiredCh <- runner
					})
				} else {
					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "model", runner.model, "duration", runner.sessionDuration)
					runner.expireTimer.Reset(runner.sessionDuration)
				}
			}
			slog.Debug("after processing request finished event", "model", runner.model, "refCount", runner.refCount)
			runner.refMu.Unlock()
		case runner := <-s.expiredCh:
			slog.Debug("runner expired event received", "model", runner.model)
			runner.refMu.Lock()
			if runner.refCount > 0 {
				// Shouldn't happen, but safeguard to ensure no leaked runners
				slog.Debug("expired event with positive ref count, retrying", "model", runner.model, "refCount", runner.refCount)
				go func(runner *runnerRef) {
					// We can't unload yet, but want to as soon as the current request completes
					// So queue up another expired event
					time.Sleep(10 * time.Millisecond)
					s.expiredCh <- runner
				}(runner)
				runner.refMu.Unlock()
				continue
			}
			slog.Debug("got lock to unload", "model", runner.model)
			runner.unload()
			s.loadedMu.Lock()
			delete(s.loaded, runner.model)
			s.loadedMu.Unlock()
			slog.Debug("runner released", "model", runner.model)
			runner.refMu.Unlock()
			slog.Debug("sending an unloaded event", "model", runner.model)
			s.unloadedCh <- struct{}{}
		}
	}
}

// Complete the pending request and send the runner back to the requester
// Wires up a finished event after the request context is completed
// Updates session duration, and resets expiration timer
func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *LlmRequest) {
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	runner.refCount++
	runner.sessionDuration = pending.sessionDuration
	pending.successCh <- runner
	go func() {
		<-pending.ctx.Done()
		slog.Debug("context for request finished")
		finished <- pending
	}()
}
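
// load starts a new llama server for the request, registers the resulting
// runner in the loaded map, and hands it to the requester once the server
// reports it is running.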
func (s *Scheduler) load(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) {
	llama, err := s.newServerFn(gpus, req.model.ModelPath, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts)
	if err != nil {
		// some older models are not compatible with newer versions of llama.cpp
		// show a generalized compatibility error until there is a better way to
		// check for model compatibility
		if errors.Is(err, llm.ErrUnsupportedFormat) || strings.Contains(err.Error(), "failed to load model") {
			err = fmt.Errorf("%v: this model may be incompatible with your version of Ollama. If you previously pulled this model, try updating it by running `ollama pull %s`", err, req.model.ShortName)
		}
		slog.Info("NewLlamaServer failed", "model", req.model.ModelPath, "error", err)
		req.errCh <- err
		return
	}
	runner := &runnerRef{}
	runner.model = req.model.ModelPath
	runner.adapters = req.model.AdapterPaths
	runner.projectors = req.model.ProjectorPaths
	runner.llama = llama
	runner.Options = &req.opts
	runner.sessionDuration = req.sessionDuration
	runner.gpus = gpus
	runner.estimatedVRAM = llama.EstimatedVRAM()
	runner.loading = true
	runner.refCount = 1
	runner.refMu.Lock()
	s.loadedMu.Lock()
	s.loaded[req.model.ModelPath] = runner
	slog.Info("loaded runners", "count", len(s.loaded))
	s.loadedMu.Unlock()
	go func() {
		defer runner.refMu.Unlock()
		if err = llama.WaitUntilRunning(req.ctx); err != nil {
			slog.Error("error loading llama server", "error", err)
			runner.refCount--
			req.errCh <- err
			slog.Debug("triggering expiration for failed load", "model", runner.model)
			s.expiredCh <- runner
			return
		}
		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
		runner.loading = false
		go func() {
			<-req.ctx.Done()
			slog.Debug("context for request finished")
			s.finishedReqCh <- req
		}()
		req.successCh <- runner
	}()
}
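
// updateFreeSpace subtracts the predicted VRAM usage of every loaded runner
// from the reported free memory of each GPU in allGpus.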
func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
	type predKey struct {
		Library string
		ID      string
	}
	predMap := map[predKey]uint64{} // Sum up the total predicted usage per GPU for all runners
	s.loadedMu.Lock()
	for _, r := range s.loaded {
		r.refMu.Lock()
		gpuIDs := make([]string, 0, len(r.gpus))
		if r.llama != nil {
			// TODO this should be broken down by GPU instead of assuming uniform spread
			estimatedVRAMPerGPU := r.llama.EstimatedVRAM() / uint64(len(r.gpus))
			for _, gpu := range r.gpus {
				gpuIDs = append(gpuIDs, gpu.ID)
			}
			for _, gpu := range allGpus {
				if slices.Contains(gpuIDs, gpu.ID) {
					predMap[predKey{gpu.Library, gpu.ID}] += estimatedVRAMPerGPU
				}
			}
		} else {
			slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
		}
		r.refMu.Unlock()
	}
	s.loadedMu.Unlock()

	// Now that we've summed up all the GPU usage predictions across all the loaded runners, update the gpu list
	for i := range allGpus {
		if p, ok := predMap[predKey{allGpus[i].Library, allGpus[i].ID}]; ok {
			slog.Debug("gpu reported", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "available", format.HumanBytes2(allGpus[i].FreeMemory))
			if p > allGpus[i].TotalMemory {
				// Shouldn't happen
				slog.Warn("predicted usage exceeds VRAM", "gpu", allGpus[i].ID, "totalMemory", allGpus[i].TotalMemory, "predicted", p)
				allGpus[i].FreeMemory = 0
			} else if (allGpus[i].TotalMemory - p) < allGpus[i].FreeMemory { // predicted free is smaller than reported free, use it
				// TODO maybe we should just always trust our numbers, since cuda's free memory reporting is laggy
				// and we might unload models we didn't actually need to. The risk is if some other GPU intensive app is loaded
				// after we start our first runner, then we'll never account for that, so picking the smallest free value seems prudent.
				allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
			}
			slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
		}
	}
}
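
// runnerRef tracks one loaded llama server: its reference count, expiration
// state, options, and the GPUs it was provisioned on.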
type runnerRef struct {
	refMu sync.Mutex
	// refCond sync.Cond // Signaled on transition from 1 -> 0 refCount
	refCount uint // prevent unloading if > 0
	// unloading bool // set to true when we are trying to unload the runner
	llama           llm.LlamaServer
	loading         bool            // True only during initial load, then false forever
	gpus            gpu.GpuInfoList // Recorded at time of provisioning
	estimatedVRAM   uint64
	sessionDuration time.Duration
	expireTimer     *time.Timer
	model           string
	adapters        []string
	projectors      []string
	*api.Options
}

// The refMu must already be held when calling unload
func (runner *runnerRef) unload() {
	if runner.llama != nil {
		runner.llama.Close()
	}
	runner.llama = nil
	runner.adapters = nil
	runner.projectors = nil
	runner.Options = nil
	runner.gpus = nil
}
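
// needsReload reports whether the loaded runner must be restarted to serve the
// request: when adapters, projectors, or runner options differ, or the server
// no longer responds to a ping.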
func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool {
	slog.Debug("evaluating already loaded", "model", req.model.ModelPath)
	runner.refMu.Lock()
	defer runner.refMu.Unlock()
	timeout := 10 * time.Second
	if runner.loading {
		timeout = 2 * time.Minute // Initial load can take a long time for big models on slow systems...
	}
	// Don't reload runner if num_gpu=-1 was provided
	optsExisting := runner.Options.Runner
	optsNew := req.opts.Runner
	if optsNew.NumGPU < 0 {
		optsExisting.NumGPU = -1
		optsNew.NumGPU = -1
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if !reflect.DeepEqual(runner.adapters, req.model.AdapterPaths) || // have the adapters changed?
		!reflect.DeepEqual(runner.projectors, req.model.ProjectorPaths) || // have the projectors changed?
		!reflect.DeepEqual(optsExisting, optsNew) || // have the runner options changed?
		runner.llama.Ping(ctx) != nil {
		return true
	}
	return false
}
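
// ByDuration orders runners by session duration, shortest first; a negative
// duration (never unload) becomes the largest value via the uint64 conversion.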
type ByDuration []*runnerRef

func (a ByDuration) Len() int      { return len(a) }
func (a ByDuration) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDuration) Less(i, j int) bool {
	// uint64 to turn negative time (never unload) to largest
	return uint64(a[i].sessionDuration) < uint64(a[j].sessionDuration)
}

// TODO - future consideration to pick runners based on size
// type BySize []*runnerRef
// func (a BySize) Len() int           { return len(a) }
// func (a BySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
// func (a BySize) Less(i, j int) bool { return a[i].estimatedVRAM < a[j].estimatedVRAM }

// pickBestFitGPUs will try to find the optimal placement of the model in the available GPUs where the model fully fits
// If the model cannot fit fully within the available GPU(s), nil is returned
func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.GpuInfoList {
	var estimatedVRAM uint64
	for _, gl := range gpus.ByLibrary() {
		var ok bool
		sgl := append(make(gpu.GpuInfoList, 0, len(gl)), gl...)
		// TODO - potentially sort by performance capability, existing models loaded, etc.
		// Note: at present, this will favor more VRAM over faster GPU speed in mixed setups
		sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))
		// First attempt to fit the model into a single GPU
		for _, g := range sgl {
			if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
				slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", format.HumanBytes2(g.FreeMemory), "required", format.HumanBytes2(estimatedVRAM))
				return []gpu.GpuInfo{g}
			}
		}
		// TODO future refinements
		// - if multiple Libraries, see if any single GPU in any Library will fit
		// - try subsets of GPUs instead of just falling back to 1 or all in a family
		// Now try all the GPUs
		if ok, estimatedVRAM = llm.PredictServerFit(gl, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
			slog.Debug("new model will fit in available VRAM, loading", "model", req.model.ModelPath, "library", gl[0].Library, "required", format.HumanBytes2(estimatedVRAM))
			return gl
		}
	}
	return nil
}

// findRunnerToUnload finds a runner to unload to make room for a new model
func (s *Scheduler) findRunnerToUnload(req *LlmRequest) *runnerRef {
	s.loadedMu.Lock()
	runnerList := make([]*runnerRef, 0, len(s.loaded))
	for _, r := range s.loaded {
		runnerList = append(runnerList, r)
	}
	s.loadedMu.Unlock()

	// In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload
	// e.g., if we have multiple options, will one make room for the request?
	sort.Sort(ByDuration(runnerList))

	// First try to find a runner that's already idle
	for _, runner := range runnerList {
		runner.refMu.Lock()
		rc := runner.refCount
		runner.refMu.Unlock()
		if rc == 0 {
			slog.Debug("found an idle runner to unload")
			return runner
		}
	}
	// None appear idle, just wait for the one with the shortest duration
	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
	return runnerList[0]
}
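
// unloadAllRunners shuts down the llama server of every loaded runner.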
func (s *Scheduler) unloadAllRunners() {
	s.loadedMu.Lock()
	defer s.loadedMu.Unlock()
	for model, runner := range s.loaded {
		if runner.llama != nil {
			slog.Debug("shutting down runner", "model", model)
			runner.llama.Close()
		}
	}
}