sched_test.go

package server

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
	"os"
	"testing"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/app/lifecycle"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
	"github.com/stretchr/testify/require"
)

func init() {
	os.Setenv("OLLAMA_DEBUG", "1")
	lifecycle.InitLogging()
}

func TestInitScheduler(t *testing.T) {
	ctx, done := context.WithCancel(context.Background())
	defer done()
	s := InitScheduler(ctx)
	s.loadedMu.Lock()
	require.NotNil(t, s.loaded)
	s.loadedMu.Unlock()
}
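
// TestLoad drives Scheduler.load directly: first with a newServerFn that
// fails (the request should surface an error and nothing stays loaded),
// then with a mock server that succeeds, and finally with a server whose
// WaitUntilRunning fails, which leaves an idle runner and an expired event behind.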
func TestLoad(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer done()
	s := InitScheduler(ctx)
	var ggml *llm.GGML // value not used in tests
	req := &LlmRequest{
		ctx:             ctx,
		model:           &Model{ModelPath: "foo"},
		opts:            api.DefaultOptions(),
		successCh:       make(chan *runnerRef, 1),
		errCh:           make(chan error, 1),
		sessionDuration: &api.Duration{Duration: 2 * time.Second},
	}
	// Fail to load model first
	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
		return nil, fmt.Errorf("something failed to load model blah")
	}
	gpus := gpu.GpuInfoList{}
	s.load(req, ggml, gpus, 0)
	require.Empty(t, req.successCh)
	require.Len(t, req.errCh, 1)
	s.loadedMu.Lock()
	require.Empty(t, s.loaded)
	s.loadedMu.Unlock()
	err := <-req.errCh
	require.Contains(t, err.Error(), "this model may be incompatible")

	server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}}
	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
		return server, nil
	}
	s.load(req, ggml, gpus, 0)
	select {
	case err := <-req.errCh:
		require.NoError(t, err)
	case resp := <-req.successCh:
		require.Equal(t, uint64(10), resp.estimatedVRAM)
		require.Equal(t, uint(1), resp.refCount)
		s.loadedMu.Lock()
		require.Len(t, s.loaded, 1)
		s.loadedMu.Unlock()
	}

	req.model.ModelPath = "dummy_model_path"
	server.waitResp = fmt.Errorf("wait failure")
	s.load(req, ggml, gpus, 0)
	select {
	case err := <-req.errCh:
		require.Contains(t, err.Error(), "wait failure")
	case resp := <-req.successCh:
		t.Fatalf("unexpected success %v", resp)
	}
	s.loadedMu.Lock()
	runner := s.loaded["dummy_model_path"]
	s.loadedMu.Unlock()
	require.NotNil(t, runner)
	require.Equal(t, uint(0), runner.refCount)
	time.Sleep(1 * time.Millisecond)
	require.Len(t, s.expiredCh, 1)
}

type reqBundle struct {
	ctx     context.Context //nolint:containedctx
	ctxDone func()
	srv     *mockLlm
	req     *LlmRequest
	ggml    *llm.GGML
}

func (scenario *reqBundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
	return scenario.srv, nil
}
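
// newScenarioRequest writes a minimal GGUF model to a temp file, loads it,
// and returns a reqBundle wiring that model, a cancellable context, an
// LlmRequest, and a mockLlm reporting the given VRAM estimate.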
func newScenarioRequest(t *testing.T, ctx context.Context, modelName string, estimatedVRAM uint64, duration *api.Duration) *reqBundle {
	b := &reqBundle{}
	b.ctx, b.ctxDone = context.WithCancel(ctx)
	t.Helper()

	f, err := os.CreateTemp(t.TempDir(), modelName)
	require.NoError(t, err)
	defer f.Close()

	require.NoError(t, llm.WriteGGUF(f, llm.KV{
		"general.architecture":          "llama",
		"general.name":                  "name",
		"llama.context_length":          uint32(32),
		"llama.embedding_length":        uint32(4096),
		"llama.block_count":             uint32(1),
		"llama.attention.head_count":    uint32(32),
		"llama.attention.head_count_kv": uint32(32),
		"tokenizer.ggml.tokens":         []string{" "},
		"tokenizer.ggml.scores":         []float32{0},
		"tokenizer.ggml.token_type":     []int32{0},
	}, []*llm.Tensor{
		{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))},
		{Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))},
	}))
	require.NoError(t, err)

	fname := f.Name()
	model := &Model{Name: modelName, ModelPath: fname}
	b.ggml, err = llm.LoadModel(model.ModelPath, 0)
	require.NoError(t, err)

	if duration == nil {
		duration = &api.Duration{Duration: 5 * time.Millisecond}
	}
	b.req = &LlmRequest{
		ctx:             b.ctx,
		model:           model,
		opts:            api.DefaultOptions(),
		sessionDuration: duration,
		successCh:       make(chan *runnerRef, 1),
		errCh:           make(chan error, 1),
	}
	b.srv = &mockLlm{estimatedVRAM: estimatedVRAM, estimatedVRAMByGPU: map[string]uint64{"": estimatedVRAM}}
	return b
}

func getGpuFn() gpu.GpuInfoList {
	g := gpu.GpuInfo{Library: "metal"}
	g.TotalMemory = 24 * format.GigaByte
	g.FreeMemory = 12 * format.GigaByte
	return []gpu.GpuInfo{g}
}

func getCpuFn() gpu.GpuInfoList {
	g := gpu.GpuInfo{Library: "cpu"}
	g.TotalMemory = 32 * format.GigaByte
	g.FreeMemory = 26 * format.GigaByte
	return []gpu.GpuInfo{g}
}
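
// TestRequestsSameModelSameRequest verifies that a second request for an
// already-loaded model (same Model pointer, no option changes) reuses the
// existing runner instead of loading a new server.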
func TestRequestsSameModelSameRequest(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer done()
	s := InitScheduler(ctx)
	s.getGpuFn = getGpuFn
	s.getCpuFn = getCpuFn
	a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond})
	b := newScenarioRequest(t, ctx, "ollama-model-1", 11, &api.Duration{Duration: 0})
	b.req.model = a.req.model
	b.ggml = a.ggml

	s.newServerFn = a.newServer
	slog.Info("a")
	s.pendingReqCh <- a.req
	require.Len(t, s.pendingReqCh, 1)
	s.Run(ctx)
	select {
	case resp := <-a.req.successCh:
		require.Equal(t, resp.llama, a.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, a.req.errCh)
	case err := <-a.req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}

	// Same runner as first request due to not needing a reload
	s.newServerFn = b.newServer
	slog.Info("b")
	s.pendingReqCh <- b.req
	select {
	case resp := <-b.req.successCh:
		require.Equal(t, resp.llama, a.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, b.req.errCh)
	case err := <-b.req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
}
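
// TestRequestsSimpleReloadSameModel verifies that changing the adapters on a
// request for an already-loaded model forces a reload once the first
// request's context is cancelled.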
func TestRequestsSimpleReloadSameModel(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer done()
	s := InitScheduler(ctx)
	s.getGpuFn = getGpuFn
	s.getCpuFn = getCpuFn
	a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond})
	b := newScenarioRequest(t, ctx, "ollama-model-1", 20, &api.Duration{Duration: 5 * time.Millisecond})
	tmpModel := *a.req.model
	b.req.model = &tmpModel
	b.ggml = a.ggml

	s.newServerFn = a.newServer
	slog.Info("a")
	s.pendingReqCh <- a.req
	require.Len(t, s.pendingReqCh, 1)
	s.Run(ctx)
	select {
	case resp := <-a.req.successCh:
		require.Equal(t, resp.llama, a.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, a.req.errCh)
	case err := <-a.req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}

	// Trigger a reload
	s.newServerFn = b.newServer
	b.req.model.AdapterPaths = []string{"new"}
	slog.Info("b")
	s.pendingReqCh <- b.req
	// finish the first request so the model can reload
	time.Sleep(1 * time.Millisecond)
	a.ctxDone()
	select {
	case resp := <-b.req.successCh:
		require.Equal(t, resp.llama, b.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, b.req.errCh)
	case err := <-b.req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
}
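
// TestRequestsMultipleLoadedModels loads several models under different
// OLLAMA_MAX_LOADED_MODELS settings, confirms a NumGPU=0 request is placed
// on CPU alongside GPU models, and checks that a model which does not fit
// waits until enough existing runners have been unloaded.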
func TestRequestsMultipleLoadedModels(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer done()
	s := InitScheduler(ctx)
	s.getGpuFn = getGpuFn
	s.getCpuFn = getCpuFn

	// Multiple loaded models
	a := newScenarioRequest(t, ctx, "ollama-model-3a", 1*format.GigaByte, nil)
	b := newScenarioRequest(t, ctx, "ollama-model-3b", 24*format.GigaByte, nil)
	c := newScenarioRequest(t, ctx, "ollama-model-4a", 30, nil)
	c.req.opts.NumGPU = 0                                       // CPU load, will be allowed
	d := newScenarioRequest(t, ctx, "ollama-model-3c", 30, nil) // Needs prior models unloaded

	t.Setenv("OLLAMA_MAX_LOADED_MODELS", "1")
	s.newServerFn = a.newServer
	slog.Info("a")
	s.pendingReqCh <- a.req
	s.Run(ctx)
	select {
	case resp := <-a.req.successCh:
		require.Equal(t, resp.llama, a.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, a.req.errCh)
	case err := <-a.req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 1)
	s.loadedMu.Unlock()

	t.Setenv("OLLAMA_MAX_LOADED_MODELS", "0")
	s.newServerFn = b.newServer
	slog.Info("b")
	s.pendingReqCh <- b.req
	select {
	case resp := <-b.req.successCh:
		require.Equal(t, resp.llama, b.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, b.req.errCh)
	case err := <-b.req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 2)
	s.loadedMu.Unlock()

	// This is a CPU load with NumGPU = 0 so it should load
	s.newServerFn = c.newServer
	slog.Info("c")
	s.pendingReqCh <- c.req
	select {
	case resp := <-c.req.successCh:
		require.Equal(t, resp.llama, c.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, c.req.errCh)
	case err := <-c.req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 3)
	s.loadedMu.Unlock()

	// Try to load a model that won't fit
	s.newServerFn = d.newServer
	slog.Info("d")
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 3)
	s.loadedMu.Unlock()
	a.ctxDone() // Won't help since this one isn't big enough to make room
	time.Sleep(2 * time.Millisecond)
	s.pendingReqCh <- d.req
	// finish prior request, so new model can load
	time.Sleep(6 * time.Millisecond)
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 2)
	s.loadedMu.Unlock()
	b.ctxDone()
	select {
	case resp := <-d.req.successCh:
		require.Equal(t, resp.llama, d.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, d.req.errCh)
	case <-ctx.Done():
		t.Fatal("timeout")
	}
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 2)
	s.loadedMu.Unlock()
}
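
// TestGetRunner exercises the public GetRunner entry point: with
// OLLAMA_MAX_QUEUE=1 an overflow request is rejected with "server busy", a
// valid request is served, and a request with a bad model path returns an
// error after the idle runner has expired and unloaded.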
func TestGetRunner(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer done()
	a := newScenarioRequest(t, ctx, "ollama-model-1a", 10, &api.Duration{Duration: 2 * time.Millisecond})
	b := newScenarioRequest(t, ctx, "ollama-model-1b", 10, &api.Duration{Duration: 2 * time.Millisecond})
	c := newScenarioRequest(t, ctx, "ollama-model-1c", 10, &api.Duration{Duration: 2 * time.Millisecond})
	t.Setenv("OLLAMA_MAX_QUEUE", "1")
	s := InitScheduler(ctx)
	s.getGpuFn = getGpuFn
	s.getCpuFn = getCpuFn
	s.newServerFn = a.newServer
	slog.Info("a")
	successCh1a, errCh1a := s.GetRunner(a.ctx, a.req.model, a.req.opts, a.req.sessionDuration)
	require.Len(t, s.pendingReqCh, 1)
	slog.Info("b")
	successCh1b, errCh1b := s.GetRunner(b.ctx, b.req.model, b.req.opts, b.req.sessionDuration)
	require.Len(t, s.pendingReqCh, 1)
	require.Empty(t, successCh1b)
	require.Len(t, errCh1b, 1)
	err := <-errCh1b
	require.Contains(t, err.Error(), "server busy")
	s.Run(ctx)
	select {
	case resp := <-successCh1a:
		require.Equal(t, resp.llama, a.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, errCh1a)
	case err := <-errCh1a:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
	a.ctxDone() // Set "a" model to idle so it can unload
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 1)
	s.loadedMu.Unlock()

	c.req.model.ModelPath = "bad path"
	slog.Info("c")
	successCh1c, errCh1c := s.GetRunner(c.ctx, c.req.model, c.req.opts, c.req.sessionDuration)
	// Starts in pending channel, then should be quickly processed to return an error
	time.Sleep(20 * time.Millisecond) // Long enough for the "a" model to expire and unload
	require.Empty(t, successCh1c)
	s.loadedMu.Lock()
	require.Empty(t, s.loaded)
	s.loadedMu.Unlock()
	require.Len(t, errCh1c, 1)
	err = <-errCh1c
	require.Contains(t, err.Error(), "bad path")
	b.ctxDone()
}

// TODO - add one scenario that triggers the bogus finished event with positive ref count
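// TestPrematureExpired sends an expired event for a runner that still has a
// positive reference count, then a finished event for an already-unloaded
// runner, making sure the scheduler tolerates both safely.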
func TestPrematureExpired(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer done()

	// Same model, same request
	scenario1a := newScenarioRequest(t, ctx, "ollama-model-1a", 10, nil)
	s := InitScheduler(ctx)
	s.getGpuFn = func() gpu.GpuInfoList {
		g := gpu.GpuInfo{Library: "metal"}
		g.TotalMemory = 24 * format.GigaByte
		g.FreeMemory = 12 * format.GigaByte
		return []gpu.GpuInfo{g}
	}
	s.newServerFn = scenario1a.newServer
	successCh1a, errCh1a := s.GetRunner(scenario1a.ctx, scenario1a.req.model, scenario1a.req.opts, scenario1a.req.sessionDuration)
	require.Len(t, s.pendingReqCh, 1)
	s.Run(ctx)
	select {
	case resp := <-successCh1a:
		require.Equal(t, resp.llama, scenario1a.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, errCh1a)
		s.loadedMu.Lock()
		require.Len(t, s.loaded, 1)
		s.loadedMu.Unlock()
		slog.Info("sending premature expired event now")
		s.expiredCh <- resp // Shouldn't happen in real life, but make sure it's safe
	case err := <-errCh1a:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
	time.Sleep(scenario1a.req.sessionDuration.Duration)
	scenario1a.ctxDone()
	time.Sleep(20 * time.Millisecond)
	require.LessOrEqual(t, len(s.finishedReqCh), 1)
	time.Sleep(10 * time.Millisecond)
	require.Empty(t, s.finishedReqCh)
	s.loadedMu.Lock()
	require.Empty(t, s.loaded)
	s.loadedMu.Unlock()

	// also shouldn't happen in real life
	s.finishedReqCh <- scenario1a.req
	time.Sleep(5 * time.Millisecond)
}

func TestUseLoadedRunner(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	req := &LlmRequest{
		ctx:             ctx,
		opts:            api.DefaultOptions(),
		successCh:       make(chan *runnerRef, 1),
		sessionDuration: &api.Duration{Duration: 2},
	}
	finished := make(chan *LlmRequest)
	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
	r1 := &runnerRef{llama: llm1, sessionDuration: 1, numParallel: 1}
	req.useLoadedRunner(r1, finished)
	require.Equal(t, uint(1), r1.refCount)
	require.Equal(t, time.Duration(2), r1.sessionDuration)
	select {
	case success := <-req.successCh:
		require.Equal(t, r1, success)
	case err := <-req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
	done()
	fin := <-finished
	require.Equal(t, req, fin)
}
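
// TestUpdateFreeSpace verifies that updateFreeSpace accounts for the per-GPU
// VRAM estimates of all loaded runners when recomputing each GPU's free memory.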
func TestUpdateFreeSpace(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer done()
	gpus := gpu.GpuInfoList{
		{
			Library: "a",
			ID:      "1",
		},
		{
			Library: "a",
			ID:      "2",
		},
	}
	gpus[0].TotalMemory = 1000
	gpus[0].FreeMemory = 900
	gpus[1].TotalMemory = 2000
	gpus[1].FreeMemory = 1900
	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}}
	llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}}
	r1 := &runnerRef{llama: llm1, gpus: gpus, numParallel: 1}
	r2 := &runnerRef{llama: llm2, gpus: gpus, numParallel: 1}

	s := InitScheduler(ctx)
	s.loadedMu.Lock()
	s.loaded["a"] = r1
	s.loaded["b"] = r2
	s.loadedMu.Unlock()

	s.updateFreeSpace(gpus)
	require.Equal(t, uint64(1000-50-125), gpus[0].FreeMemory)
	require.Equal(t, uint64(2000-50-75), gpus[1].FreeMemory)
}

func TestFilterGPUsWithoutLoadingModels(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer done()
	gpus := gpu.GpuInfoList{
		{
			Library: "cuda",
			ID:      "0",
		},
		{
			Library: "cuda",
			ID:      "1",
		},
	}
	r1 := &runnerRef{gpus: gpu.GpuInfoList{gpus[0]}, loading: true}

	s := InitScheduler(ctx)
	s.loadedMu.Lock()
	s.loaded["a"] = r1
	s.loadedMu.Unlock()

	tmp := s.filterGPUsWithoutLoadingModels(gpus)
	require.Len(t, tmp, 1)
	require.Equal(t, "1", tmp[0].ID)

	r1.gpus = gpu.GpuInfoList{gpus[1]}
	tmp = s.filterGPUsWithoutLoadingModels(gpus)
	require.Len(t, tmp, 1)
	require.Equal(t, "0", tmp[0].ID)

	r1.gpus = gpu.GpuInfoList{}
	tmp = s.filterGPUsWithoutLoadingModels(gpus)
	require.Len(t, tmp, 2)
}

func TestFindRunnerToUnload(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer done()

	r1 := &runnerRef{refCount: 1, sessionDuration: 1, numParallel: 1}
	r2 := &runnerRef{sessionDuration: 2, numParallel: 1}

	s := InitScheduler(ctx)
	s.loadedMu.Lock()
	s.loaded["a"] = r1
	s.loaded["b"] = r2
	s.loadedMu.Unlock()

	resp := s.findRunnerToUnload()
	require.Equal(t, r2, resp)
	r2.refCount = 1
	resp = s.findRunnerToUnload()
	require.Equal(t, r1, resp)
}
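
// TestNeedsReload walks through the conditions that force a runner reload:
// mismatched adapters or projectors, changed options while the runner is
// still loading, a failing Ping, and an explicit NumGPU request, and also
// confirms the matching cases that do not require a reload.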
func TestNeedsReload(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer done()

	llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
	do := api.DefaultOptions()
	runner := &runnerRef{
		model: &Model{
			AdapterPaths:   []string{"adapter1"},
			ProjectorPaths: []string{"projector1"},
		},
		Options:     &do,
		llama:       llm,
		numParallel: 1,
	}
	req := &LlmRequest{
		model: &Model{
			AdapterPaths:   []string{"adapter2"},
			ProjectorPaths: []string{"projector2"},
		},
		opts: api.DefaultOptions(),
	}
	resp := runner.needsReload(ctx, req)
	require.True(t, resp)
	req.model.AdapterPaths = runner.model.AdapterPaths
	resp = runner.needsReload(ctx, req)
	require.True(t, resp)
	req.model.ProjectorPaths = runner.model.ProjectorPaths
	runner.loading = true
	req.opts.NumBatch = 1234
	resp = runner.needsReload(ctx, req)
	require.True(t, resp)
	req.opts.NumBatch = runner.Options.NumBatch
	llm.pingResp = fmt.Errorf("foo")
	resp = runner.needsReload(ctx, req)
	require.True(t, resp)
	llm.pingResp = nil
	resp = runner.needsReload(ctx, req)
	require.False(t, resp)
	req.opts.NumGPU = 99
	resp = runner.needsReload(ctx, req)
	require.True(t, resp)
	req.opts.NumGPU = -1
	resp = runner.needsReload(ctx, req)
	require.False(t, resp)
}

func TestUnloadAllRunners(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer done()

	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
	llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
	s := InitScheduler(ctx)
	s.unloadAllRunners()

	r1 := &runnerRef{llama: llm1, numParallel: 1}
	r2 := &runnerRef{llama: llm2, numParallel: 1}

	s.loadedMu.Lock()
	s.loaded["a"] = r1
	s.loaded["b"] = r2
	s.loadedMu.Unlock()
	s.unloadAllRunners()

	require.True(t, llm1.closeCalled)
	require.True(t, llm2.closeCalled)
}

func TestUnload(t *testing.T) {
	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
	r1 := &runnerRef{llama: llm1, numParallel: 1}
	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}, numParallel: 1}
	r1.unload()
	require.True(t, llm1.closeCalled)
	r2.unload()
	require.Nil(t, r2.model)
}

func TestAlreadyCanceled(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer done()
	dctx, done2 := context.WithCancel(ctx)
	done2()
	scenario1a := newScenarioRequest(t, dctx, "ollama-model-1", 10, &api.Duration{Duration: 0})
	s := InitScheduler(ctx)
	slog.Info("scenario1a")
	s.pendingReqCh <- scenario1a.req
	require.Len(t, s.pendingReqCh, 1)
	s.Run(ctx)
	time.Sleep(5 * time.Millisecond)
	require.Empty(t, s.pendingReqCh)
	require.Empty(t, scenario1a.req.errCh)
	require.Empty(t, scenario1a.req.successCh)
}
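
// TestHomogeneousGPUs supplies GPUs from two different libraries and expects
// the scheduler to settle on a single library for the load, so newServerFn
// is handed exactly one GPU.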
func TestHomogeneousGPUs(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer done()
	s := InitScheduler(ctx)

	s.getGpuFn = func() gpu.GpuInfoList {
		// Set memory values to require the model to be spread
		gpus := []gpu.GpuInfo{
			{Library: "cuda"},
			{Library: "rocm"},
		}
		gpus[0].TotalMemory = 1 * format.GibiByte
		gpus[0].FreeMemory = 256 * format.MebiByte
		gpus[1].TotalMemory = 1 * format.GibiByte
		gpus[1].FreeMemory = 256 * format.MebiByte
		return gpus
	}
	s.getCpuFn = getCpuFn
	a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond})
	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
		require.Len(t, gpus, 1)
		return a.newServer(gpus, model, ggml, adapters, projectors, opts, numParallel)
	}
	slog.Info("a")
	s.pendingReqCh <- a.req
	require.Len(t, s.pendingReqCh, 1)
	s.Run(ctx)
	select {
	case resp := <-a.req.successCh:
		require.Equal(t, resp.llama, a.srv)
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, a.req.errCh)
	case err := <-a.req.errCh:
		t.Fatal(err.Error())
	case <-ctx.Done():
		t.Fatal("timeout")
	}
}
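
// mockLlm is a test double for llm.LlamaServer; each method returns the
// canned response or error stored in the corresponding field.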
type mockLlm struct {
	pingResp           error
	waitResp           error
	completionResp     error
	embedResp          *llm.EmbedResponse
	embedRespErr       error
	tokenizeResp       []int
	tokenizeRespErr    error
	detokenizeResp     string
	detokenizeRespErr  error
	closeResp          error
	closeCalled        bool
	estimatedVRAM      uint64
	estimatedTotal     uint64
	estimatedVRAMByGPU map[string]uint64
}

func (s *mockLlm) Ping(ctx context.Context) error             { return s.pingResp }
func (s *mockLlm) WaitUntilRunning(ctx context.Context) error { return s.waitResp }
func (s *mockLlm) Completion(ctx context.Context, req llm.CompletionRequest, fn func(llm.CompletionResponse)) error {
	return s.completionResp
}
func (s *mockLlm) Embed(ctx context.Context, input []string) (*llm.EmbedResponse, error) {
	return s.embedResp, s.embedRespErr
}
func (s *mockLlm) Tokenize(ctx context.Context, content string) ([]int, error) {
	return s.tokenizeResp, s.tokenizeRespErr
}
func (s *mockLlm) Detokenize(ctx context.Context, tokens []int) (string, error) {
	return s.detokenizeResp, s.detokenizeRespErr
}
func (s *mockLlm) Close() error {
	s.closeCalled = true
	return s.closeResp
}
func (s *mockLlm) EstimatedVRAM() uint64                  { return s.estimatedVRAM }
func (s *mockLlm) EstimatedTotal() uint64                 { return s.estimatedTotal }
func (s *mockLlm) EstimatedVRAMByGPU(gpuid string) uint64 { return s.estimatedVRAMByGPU[gpuid] }