浏览代码

Merge pull request #5506 from dhiltgen/sched_tests

Refine scheduler unit tests for reliability
Daniel Hiltgen 9 月之前
父节点
当前提交
06e5d74e34
共有 1 个文件被更改,包括 195 次插入130 次删除
  1. 195 130
      server/sched_test.go

+ 195 - 130
server/sched_test.go

@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"log/slog"
 	"os"
+	"runtime"
 	"testing"
 	"time"
 
@@ -94,7 +95,7 @@ func TestLoad(t *testing.T) {
 	require.Len(t, s.expiredCh, 1)
 }
 
-type bundle struct {
+type reqBundle struct {
 	ctx     context.Context //nolint:containedctx
 	ctxDone func()
 	srv     *mockLlm
@@ -102,13 +103,13 @@ type bundle struct {
 	ggml    *llm.GGML
 }
 
-func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
+func (scenario *reqBundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 	return scenario.srv, nil
 }
 
-func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedVRAM uint64) *bundle {
-	scenario := &bundle{}
-	scenario.ctx, scenario.ctxDone = context.WithCancel(ctx)
+func newScenarioRequest(t *testing.T, ctx context.Context, modelName string, estimatedVRAM uint64, duration *api.Duration) *reqBundle {
+	b := &reqBundle{}
+	b.ctx, b.ctxDone = context.WithCancel(ctx)
 	t.Helper()
 
 	f, err := os.CreateTemp(t.TempDir(), modelName)
@@ -135,124 +136,154 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
 
 	fname := f.Name()
 	model := &Model{Name: modelName, ModelPath: fname}
-	scenario.ggml, err = llm.LoadModel(model.ModelPath, 0)
+	b.ggml, err = llm.LoadModel(model.ModelPath, 0)
 	require.NoError(t, err)
 
-	scenario.req = &LlmRequest{
-		ctx:             scenario.ctx,
+	if duration == nil {
+		duration = &api.Duration{Duration: 5 * time.Millisecond}
+	}
+	b.req = &LlmRequest{
+		ctx:             b.ctx,
 		model:           model,
 		opts:            api.DefaultOptions(),
-		sessionDuration: &api.Duration{Duration: 5 * time.Millisecond},
+		sessionDuration: duration,
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
 	}
-	scenario.srv = &mockLlm{estimatedVRAM: estimatedVRAM, estimatedVRAMByGPU: map[string]uint64{"": estimatedVRAM}}
-	return scenario
+	b.srv = &mockLlm{estimatedVRAM: estimatedVRAM, estimatedVRAMByGPU: map[string]uint64{"": estimatedVRAM}}
+	return b
 }
 
-func TestRequests(t *testing.T) {
-	ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
-	defer done()
-
-	// Same model, same request
-	scenario1a := newScenario(t, ctx, "ollama-model-1", 10)
-	scenario1a.req.sessionDuration = &api.Duration{Duration: 5 * time.Millisecond}
-	scenario1b := newScenario(t, ctx, "ollama-model-1", 11)
-	scenario1b.req.model = scenario1a.req.model
-	scenario1b.ggml = scenario1a.ggml
-	scenario1b.req.sessionDuration = &api.Duration{Duration: 0}
-
-	// simple reload of same model
-	scenario2a := newScenario(t, ctx, "ollama-model-1", 20)
-	tmpModel := *scenario1a.req.model
-	scenario2a.req.model = &tmpModel
-	scenario2a.ggml = scenario1a.ggml
-	scenario2a.req.sessionDuration = &api.Duration{Duration: 5 * time.Millisecond}
+func getGpuFn() gpu.GpuInfoList {
+	g := gpu.GpuInfo{Library: "metal"}
+	g.TotalMemory = 24 * format.GigaByte
+	g.FreeMemory = 12 * format.GigaByte
+	return []gpu.GpuInfo{g}
+}
 
-	// Multiple loaded models
-	scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
-	scenario3b := newScenario(t, ctx, "ollama-model-3b", 24*format.GigaByte)
-	scenario3c := newScenario(t, ctx, "ollama-model-4a", 30)
-	scenario3c.req.opts.NumGPU = 0                           // CPU load, will be allowed
-	scenario3d := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded
+func getCpuFn() gpu.GpuInfoList {
+	g := gpu.GpuInfo{Library: "cpu"}
+	g.TotalMemory = 32 * format.GigaByte
+	g.FreeMemory = 26 * format.GigaByte
+	return []gpu.GpuInfo{g}
+}
 
+func TestRequestsSameModelSameRequest(t *testing.T) {
+	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer done()
 	s := InitScheduler(ctx)
-	s.getGpuFn = func() gpu.GpuInfoList {
-		g := gpu.GpuInfo{Library: "metal"}
-		g.TotalMemory = 24 * format.GigaByte
-		g.FreeMemory = 12 * format.GigaByte
-		return []gpu.GpuInfo{g}
-	}
-	s.getCpuFn = func() gpu.GpuInfoList {
-		g := gpu.GpuInfo{Library: "cpu"}
-		g.TotalMemory = 32 * format.GigaByte
-		g.FreeMemory = 26 * format.GigaByte
-		return []gpu.GpuInfo{g}
-	}
-	s.newServerFn = scenario1a.newServer
-	slog.Info("scenario1a")
-	s.pendingReqCh <- scenario1a.req
+	s.getGpuFn = getGpuFn
+	s.getCpuFn = getCpuFn
+	a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond})
+	b := newScenarioRequest(t, ctx, "ollama-model-1", 11, &api.Duration{Duration: 0})
+	b.req.model = a.req.model
+	b.ggml = a.ggml
+
+	s.newServerFn = a.newServer
+	slog.Info("a")
+	s.pendingReqCh <- a.req
 	require.Len(t, s.pendingReqCh, 1)
 	s.Run(ctx)
 	select {
-	case resp := <-scenario1a.req.successCh:
-		require.Equal(t, resp.llama, scenario1a.srv)
+	case resp := <-a.req.successCh:
+		require.Equal(t, resp.llama, a.srv)
 		require.Empty(t, s.pendingReqCh)
-		require.Empty(t, scenario1a.req.errCh)
-	case err := <-scenario1a.req.errCh:
+		require.Empty(t, a.req.errCh)
+	case err := <-a.req.errCh:
 		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
 	}
 
 	// Same runner as first request due to not needing a reload
-	s.newServerFn = scenario1b.newServer
-	slog.Info("scenario1b")
-	s.pendingReqCh <- scenario1b.req
+	s.newServerFn = b.newServer
+	slog.Info("b")
+	s.pendingReqCh <- b.req
 	select {
-	case resp := <-scenario1b.req.successCh:
-		require.Equal(t, resp.llama, scenario1a.srv)
+	case resp := <-b.req.successCh:
+		require.Equal(t, resp.llama, a.srv)
 		require.Empty(t, s.pendingReqCh)
-		require.Empty(t, scenario1b.req.errCh)
-	case err := <-scenario1b.req.errCh:
+		require.Empty(t, b.req.errCh)
+	case err := <-b.req.errCh:
+		t.Fatal(err.Error())
+	case <-ctx.Done():
+		t.Fatal("timeout")
+	}
+}
+
+func TestRequestsSimpleReloadSameModel(t *testing.T) {
+	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer done()
+	s := InitScheduler(ctx)
+	s.getGpuFn = getGpuFn
+	s.getCpuFn = getCpuFn
+	a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond})
+	b := newScenarioRequest(t, ctx, "ollama-model-1", 20, &api.Duration{Duration: 5 * time.Millisecond})
+	tmpModel := *a.req.model
+	b.req.model = &tmpModel
+	b.ggml = a.ggml
+
+	s.newServerFn = a.newServer
+	slog.Info("a")
+	s.pendingReqCh <- a.req
+	require.Len(t, s.pendingReqCh, 1)
+	s.Run(ctx)
+	select {
+	case resp := <-a.req.successCh:
+		require.Equal(t, resp.llama, a.srv)
+		require.Empty(t, s.pendingReqCh)
+		require.Empty(t, a.req.errCh)
+	case err := <-a.req.errCh:
 		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
 	}
 
 	// Trigger a reload
-	s.newServerFn = scenario2a.newServer
-	scenario2a.req.model.AdapterPaths = []string{"new"}
-	slog.Info("scenario2a")
-	s.pendingReqCh <- scenario2a.req
+	s.newServerFn = b.newServer
+	b.req.model.AdapterPaths = []string{"new"}
+	slog.Info("b")
+	s.pendingReqCh <- b.req
 	// finish first two requests, so model can reload
 	time.Sleep(1 * time.Millisecond)
-	scenario1a.ctxDone()
-	scenario1b.ctxDone()
+	a.ctxDone()
 	select {
-	case resp := <-scenario2a.req.successCh:
-		require.Equal(t, resp.llama, scenario2a.srv)
+	case resp := <-b.req.successCh:
+		require.Equal(t, resp.llama, b.srv)
 		require.Empty(t, s.pendingReqCh)
-		require.Empty(t, scenario2a.req.errCh)
-	case err := <-scenario2a.req.errCh:
+		require.Empty(t, b.req.errCh)
+	case err := <-b.req.errCh:
 		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
 	}
+}
+
+func TestRequestsMultipleLoadedModels(t *testing.T) {
+	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer done()
+	s := InitScheduler(ctx)
+	s.getGpuFn = getGpuFn
+	s.getCpuFn = getCpuFn
+
+	// Multiple loaded models
+	a := newScenarioRequest(t, ctx, "ollama-model-3a", 1*format.GigaByte, nil)
+	b := newScenarioRequest(t, ctx, "ollama-model-3b", 24*format.GigaByte, nil)
+	c := newScenarioRequest(t, ctx, "ollama-model-4a", 30, nil)
+	c.req.opts.NumGPU = 0                                       // CPU load, will be allowed
+	d := newScenarioRequest(t, ctx, "ollama-model-3c", 30, nil) // Needs prior unloaded
 
 	envconfig.MaxRunners = 1
-	s.newServerFn = scenario3a.newServer
-	slog.Info("scenario3a")
-	s.pendingReqCh <- scenario3a.req
-	// finish prior request, so new model can load
-	time.Sleep(1 * time.Millisecond)
-	scenario2a.ctxDone()
+	s.newServerFn = a.newServer
+	slog.Info("a")
+	s.pendingReqCh <- a.req
+	s.Run(ctx)
 	select {
-	case resp := <-scenario3a.req.successCh:
-		require.Equal(t, resp.llama, scenario3a.srv)
+	case resp := <-a.req.successCh:
+		require.Equal(t, resp.llama, a.srv)
 		require.Empty(t, s.pendingReqCh)
-		require.Empty(t, scenario3a.req.errCh)
-	case err := <-scenario3a.req.errCh:
+		require.Empty(t, a.req.errCh)
+	case err := <-a.req.errCh:
 		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
@@ -262,15 +293,15 @@ func TestRequests(t *testing.T) {
 	s.loadedMu.Unlock()
 
 	envconfig.MaxRunners = 0
-	s.newServerFn = scenario3b.newServer
-	slog.Info("scenario3b")
-	s.pendingReqCh <- scenario3b.req
+	s.newServerFn = b.newServer
+	slog.Info("b")
+	s.pendingReqCh <- b.req
 	select {
-	case resp := <-scenario3b.req.successCh:
-		require.Equal(t, resp.llama, scenario3b.srv)
+	case resp := <-b.req.successCh:
+		require.Equal(t, resp.llama, b.srv)
 		require.Empty(t, s.pendingReqCh)
-		require.Empty(t, scenario3b.req.errCh)
-	case err := <-scenario3b.req.errCh:
+		require.Empty(t, b.req.errCh)
+	case err := <-b.req.errCh:
 		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
@@ -280,15 +311,15 @@ func TestRequests(t *testing.T) {
 	s.loadedMu.Unlock()
 
 	// This is a CPU load with NumGPU = 0 so it should load
-	s.newServerFn = scenario3c.newServer
-	slog.Info("scenario3c")
-	s.pendingReqCh <- scenario3c.req
+	s.newServerFn = c.newServer
+	slog.Info("c")
+	s.pendingReqCh <- c.req
 	select {
-	case resp := <-scenario3c.req.successCh:
-		require.Equal(t, resp.llama, scenario3c.srv)
+	case resp := <-c.req.successCh:
+		require.Equal(t, resp.llama, c.srv)
 		require.Empty(t, s.pendingReqCh)
-		require.Empty(t, scenario3c.req.errCh)
-	case err := <-scenario3c.req.errCh:
+		require.Empty(t, c.req.errCh)
+	case err := <-c.req.errCh:
 		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
@@ -298,25 +329,25 @@ func TestRequests(t *testing.T) {
 	s.loadedMu.Unlock()
 
 	// Try to load a model that wont fit
-	s.newServerFn = scenario3d.newServer
-	slog.Info("scenario3d")
+	s.newServerFn = d.newServer
+	slog.Info("d")
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 3)
 	s.loadedMu.Unlock()
-	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
+	a.ctxDone() // Won't help since this one isn't big enough to make room
 	time.Sleep(2 * time.Millisecond)
-	s.pendingReqCh <- scenario3d.req
+	s.pendingReqCh <- d.req
 	// finish prior request, so new model can load
 	time.Sleep(6 * time.Millisecond)
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
 	s.loadedMu.Unlock()
-	scenario3b.ctxDone()
+	b.ctxDone()
 	select {
-	case resp := <-scenario3d.req.successCh:
-		require.Equal(t, resp.llama, scenario3d.srv)
+	case resp := <-d.req.successCh:
+		require.Equal(t, resp.llama, d.srv)
 		require.Empty(t, s.pendingReqCh)
-		require.Empty(t, scenario3d.req.errCh)
+		require.Empty(t, d.req.errCh)
 	case <-ctx.Done():
 		t.Fatal("timeout")
 	}
@@ -325,30 +356,59 @@ func TestRequests(t *testing.T) {
 	s.loadedMu.Unlock()
 }
 
-func TestGetRunner(t *testing.T) {
-	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
+func TestRequestsModelTooBigForSystem(t *testing.T) {
+	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
 	defer done()
-
-	scenario1a := newScenario(t, ctx, "ollama-model-1a", 10)
-	scenario1a.req.sessionDuration = &api.Duration{Duration: 0}
-	scenario1b := newScenario(t, ctx, "ollama-model-1b", 10)
-	scenario1b.req.sessionDuration = &api.Duration{Duration: 0}
-	scenario1c := newScenario(t, ctx, "ollama-model-1c", 10)
-	scenario1c.req.sessionDuration = &api.Duration{Duration: 0}
-	envconfig.MaxQueuedRequests = 1
 	s := InitScheduler(ctx)
 	s.getGpuFn = func() gpu.GpuInfoList {
 		g := gpu.GpuInfo{Library: "metal"}
-		g.TotalMemory = 24 * format.GigaByte
-		g.FreeMemory = 12 * format.GigaByte
+		g.TotalMemory = 4 * format.MebiByte
+		g.FreeMemory = 3 * format.MebiByte
 		return []gpu.GpuInfo{g}
 	}
-	s.newServerFn = scenario1a.newServer
-	slog.Info("scenario1a")
-	successCh1a, errCh1a := s.GetRunner(scenario1a.ctx, scenario1a.req.model, scenario1a.req.opts, scenario1a.req.sessionDuration)
+
+	s.getCpuFn = func() gpu.GpuInfoList {
+		g := gpu.GpuInfo{Library: "cpu"}
+		g.TotalMemory = 4 * format.MebiByte
+		g.FreeMemory = 2 * format.MebiByte
+		return []gpu.GpuInfo{g}
+	}
+	a := newScenarioRequest(t, ctx, "ollama-model-1", 10, &api.Duration{Duration: 5 * time.Millisecond})
+
+	s.newServerFn = a.newServer
+	slog.Info("a")
+	s.pendingReqCh <- a.req
 	require.Len(t, s.pendingReqCh, 1)
-	slog.Info("scenario1b")
-	successCh1b, errCh1b := s.GetRunner(scenario1b.ctx, scenario1b.req.model, scenario1b.req.opts, scenario1b.req.sessionDuration)
+	s.Run(ctx)
+	select {
+	case <-a.req.successCh:
+		if runtime.GOOS == "linux" {
+			t.Fatal("request should have been rejected with out of space")
+		}
+		// else - Darwin and Windows don't reject right now
+	case err := <-a.req.errCh:
+		require.Contains(t, err.Error(), "too large")
+	case <-ctx.Done():
+		t.Fatal("timeout")
+	}
+}
+func TestGetRunner(t *testing.T) {
+	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	defer done()
+
+	a := newScenarioRequest(t, ctx, "ollama-model-1a", 10, &api.Duration{Duration: 2 * time.Millisecond})
+	b := newScenarioRequest(t, ctx, "ollama-model-1b", 10, &api.Duration{Duration: 2 * time.Millisecond})
+	c := newScenarioRequest(t, ctx, "ollama-model-1c", 10, &api.Duration{Duration: 2 * time.Millisecond})
+	envconfig.MaxQueuedRequests = 1
+	s := InitScheduler(ctx)
+	s.getGpuFn = getGpuFn
+	s.getCpuFn = getCpuFn
+	s.newServerFn = a.newServer
+	slog.Info("a")
+	successCh1a, errCh1a := s.GetRunner(a.ctx, a.req.model, a.req.opts, a.req.sessionDuration)
+	require.Len(t, s.pendingReqCh, 1)
+	slog.Info("b")
+	successCh1b, errCh1b := s.GetRunner(b.ctx, b.req.model, b.req.opts, b.req.sessionDuration)
 	require.Len(t, s.pendingReqCh, 1)
 	require.Empty(t, successCh1b)
 	require.Len(t, errCh1b, 1)
@@ -357,22 +417,24 @@ func TestGetRunner(t *testing.T) {
 	s.Run(ctx)
 	select {
 	case resp := <-successCh1a:
-		require.Equal(t, resp.llama, scenario1a.srv)
+		require.Equal(t, resp.llama, a.srv)
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, errCh1a)
+	case err := <-errCh1a:
+		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
 	}
-	scenario1a.ctxDone()
+	a.ctxDone() // Set "a" model to idle so it can unload
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
 	s.loadedMu.Unlock()
 
-	scenario1c.req.model.ModelPath = "bad path"
-	slog.Info("scenario1c")
-	successCh1c, errCh1c := s.GetRunner(scenario1c.ctx, scenario1c.req.model, scenario1c.req.opts, scenario1c.req.sessionDuration)
+	c.req.model.ModelPath = "bad path"
+	slog.Info("c")
+	successCh1c, errCh1c := s.GetRunner(c.ctx, c.req.model, c.req.opts, c.req.sessionDuration)
 	// Starts in pending channel, then should be quickly processsed to return an error
-	time.Sleep(5 * time.Millisecond)
+	time.Sleep(20 * time.Millisecond) // Long enough for the "a" model to expire and unload
 	require.Empty(t, successCh1c)
 	s.loadedMu.Lock()
 	require.Empty(t, s.loaded)
@@ -380,7 +442,7 @@ func TestGetRunner(t *testing.T) {
 	require.Len(t, errCh1c, 1)
 	err = <-errCh1c
 	require.Contains(t, err.Error(), "bad path")
-	scenario1b.ctxDone()
+	b.ctxDone()
 }
 
 // TODO - add one scenario that triggers the bogus finished event with positive ref count
@@ -389,7 +451,7 @@ func TestPrematureExpired(t *testing.T) {
 	defer done()
 
 	// Same model, same request
-	scenario1a := newScenario(t, ctx, "ollama-model-1a", 10)
+	scenario1a := newScenarioRequest(t, ctx, "ollama-model-1a", 10, nil)
 	s := InitScheduler(ctx)
 	s.getGpuFn = func() gpu.GpuInfoList {
 		g := gpu.GpuInfo{Library: "metal"}
@@ -411,6 +473,8 @@ func TestPrematureExpired(t *testing.T) {
 		s.loadedMu.Unlock()
 		slog.Info("sending premature expired event now")
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
+	case err := <-errCh1a:
+		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
 	}
@@ -446,6 +510,8 @@ func TestUseLoadedRunner(t *testing.T) {
 	select {
 	case success := <-req.successCh:
 		require.Equal(t, r1, success)
+	case err := <-req.errCh:
+		t.Fatal(err.Error())
 	case <-ctx.Done():
 		t.Fatal("timeout")
 	}
@@ -625,8 +691,7 @@ func TestAlreadyCanceled(t *testing.T) {
 	defer done()
 	dctx, done2 := context.WithCancel(ctx)
 	done2()
-	scenario1a := newScenario(t, dctx, "ollama-model-1", 10)
-	scenario1a.req.sessionDuration = &api.Duration{Duration: 0}
+	scenario1a := newScenarioRequest(t, dctx, "ollama-model-1", 10, &api.Duration{Duration: 0})
 	s := InitScheduler(ctx)
 	slog.Info("scenario1a")
 	s.pendingReqCh <- scenario1a.req