@@ -47,11 +47,11 @@ func TestLoad(t *testing.T) {
 		sessionDuration: 2,
 	}
 	// Fail to load model first
-	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 		return nil, fmt.Errorf("something failed to load model blah")
 	}
 	gpus := gpu.GpuInfoList{}
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	require.Empty(t, req.successCh)
 	require.Len(t, req.errCh, 1)
 	s.loadedMu.Lock()
@@ -61,10 +61,10 @@ func TestLoad(t *testing.T) {
 	require.Contains(t, err.Error(), "this model may be incompatible")
 
 	server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}}
-	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 		return server, nil
 	}
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	select {
 	case err := <-req.errCh:
 		require.NoError(t, err)
@@ -78,12 +78,12 @@ func TestLoad(t *testing.T) {
 
 	req.model.ModelPath = "dummy_model_path"
 	server.waitResp = fmt.Errorf("wait failure")
-	s.load(req, ggml, gpus)
+	s.load(req, ggml, gpus, 0)
 	select {
 	case err := <-req.errCh:
 		require.Contains(t, err.Error(), "wait failure")
 	case resp := <-req.successCh:
-		t.Errorf("unexpected success %v", resp)
+		t.Fatalf("unexpected success %v", resp)
 	}
 	s.loadedMu.Lock()
 	runner := s.loaded["dummy_model_path"]
@@ -102,7 +102,7 @@ type bundle struct {
 	ggml    *llm.GGML
 }
 
-func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
+func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
 	return scenario.srv, nil
 }
 
@@ -128,14 +128,14 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
 		"tokenizer.ggml.scores":     []float32{0},
 		"tokenizer.ggml.token_type": []int32{0},
 	}, []llm.Tensor{
-		{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
-		{Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
+		{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))},
+		{Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))},
 	})
 	require.NoError(t, err)
 
 	fname := f.Name()
 	model := &Model{Name: modelName, ModelPath: fname}
-	scenario.ggml, err = llm.LoadModel(model.ModelPath)
+	scenario.ggml, err = llm.LoadModel(model.ModelPath, 0)
 	require.NoError(t, err)
 
 	scenario.req = &LlmRequest{
@@ -200,7 +200,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario1a.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 
 	// Same runner as first request due to not needing a reload
@@ -213,7 +213,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario1b.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 
 	// Trigger a reload
@@ -231,7 +231,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario2a.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 
 	envconfig.MaxRunners = 1
@@ -247,7 +247,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3a.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
@@ -263,7 +263,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3b.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
@@ -279,7 +279,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3c.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 3)
@@ -306,7 +306,7 @@ func TestRequests(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, scenario3d.req.errCh)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
@@ -349,7 +349,7 @@ func TestGetRunner(t *testing.T) {
 		require.Empty(t, s.pendingReqCh)
 		require.Empty(t, errCh1a)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	scenario1a.ctxDone()
 	s.loadedMu.Lock()
@@ -400,7 +400,7 @@ func TestPrematureExpired(t *testing.T) {
 		slog.Info("sending premature expired event now")
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	time.Sleep(scenario1a.req.sessionDuration)
 	scenario1a.ctxDone()
@@ -427,7 +427,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	}
 	finished := make(chan *LlmRequest)
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
-	r1 := &runnerRef{llama: llm1, sessionDuration: 1}
+	r1 := &runnerRef{llama: llm1, sessionDuration: 1, numParallel: 1}
 	req.useLoadedRunner(r1, finished)
 	require.Equal(t, uint(1), r1.refCount)
 	require.Equal(t, time.Duration(2), r1.sessionDuration)
@@ -435,7 +435,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	case success := <-req.successCh:
 		require.Equal(t, r1, success)
 	case <-ctx.Done():
-		t.Errorf("timeout")
+		t.Fatal("timeout")
 	}
 	done()
 	fin := <-finished
@@ -461,8 +461,8 @@ func TestUpdateFreeSpace(t *testing.T) {
 	gpus[1].FreeMemory = 1900
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}}
 	llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}}
-	r1 := &runnerRef{llama: llm1, gpus: gpus}
-	r2 := &runnerRef{llama: llm2, gpus: gpus}
+	r1 := &runnerRef{llama: llm1, gpus: gpus, numParallel: 1}
+	r2 := &runnerRef{llama: llm2, gpus: gpus, numParallel: 1}
 
 	s := InitScheduler(ctx)
 	s.loadedMu.Lock()
@@ -513,8 +513,8 @@ func TestFindRunnerToUnload(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer done()
 
-	r1 := &runnerRef{refCount: 1, sessionDuration: 1}
-	r2 := &runnerRef{sessionDuration: 2}
+	r1 := &runnerRef{refCount: 1, sessionDuration: 1, numParallel: 1}
+	r2 := &runnerRef{sessionDuration: 2, numParallel: 1}
 
 	s := InitScheduler(ctx)
 	s.loadedMu.Lock()
@@ -536,9 +536,13 @@ func TestNeedsReload(t *testing.T) {
 	llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
 	do := api.DefaultOptions()
 	runner := &runnerRef{
-		model:   &Model{AdapterPaths: []string{"adapter1"}, ProjectorPaths: []string{"projector1"}},
-		Options: &do,
-		llama:   llm,
+		model: &Model{
+			AdapterPaths:   []string{"adapter1"},
+			ProjectorPaths: []string{"projector1"},
+		},
+		Options:     &do,
+		llama:       llm,
+		numParallel: 1,
 	}
 	req := &LlmRequest{
 		model: &Model{
@@ -581,8 +585,8 @@ func TestUnloadAllRunners(t *testing.T) {
 	s := InitScheduler(ctx)
 	s.unloadAllRunners()
 
-	r1 := &runnerRef{llama: llm1}
-	r2 := &runnerRef{llama: llm2}
+	r1 := &runnerRef{llama: llm1, numParallel: 1}
+	r2 := &runnerRef{llama: llm2, numParallel: 1}
 
 	s.loadedMu.Lock()
 	s.loaded["a"] = r1
@@ -596,14 +600,32 @@ func TestUnloadAllRunners(t *testing.T) {
 
 func TestUnload(t *testing.T) {
 	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
-	r1 := &runnerRef{llama: llm1}
-	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}}
+	r1 := &runnerRef{llama: llm1, numParallel: 1}
+	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}, numParallel: 1}
 	r1.unload()
 	require.True(t, llm1.closeCalled)
 	r2.unload()
 	require.Nil(t, r2.model)
 }
 
+func TestAlreadyCanceled(t *testing.T) {
+	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer done()
+	dctx, done2 := context.WithCancel(ctx)
+	done2()
+	scenario1a := newScenario(t, dctx, "ollama-model-1", 10)
+	scenario1a.req.sessionDuration = 0
+	s := InitScheduler(ctx)
+	slog.Info("scenario1a")
+	s.pendingReqCh <- scenario1a.req
+	require.Len(t, s.pendingReqCh, 1)
+	s.Run(ctx)
+	time.Sleep(5 * time.Millisecond)
+	require.Empty(t, s.pendingReqCh)
+	require.Empty(t, scenario1a.req.errCh)
+	require.Empty(t, scenario1a.req.successCh)
+}
+
 type mockLlm struct {
 	pingResp           error
 	waitResp           error