// max_queue_test.go
  1. //go:build integration
  2. package integration
  3. import (
  4. "context"
  5. "errors"
  6. "fmt"
  7. "log/slog"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. "github.com/ollama/ollama/api"
  15. "github.com/stretchr/testify/require"
  16. )
  17. func TestMaxQueue(t *testing.T) {
  18. // Note: This test can be quite slow when running in CPU mode, so keep the threadCount low unless your on GPU
  19. // Also note that by default Darwin can't sustain > ~128 connections without adjusting limits
  20. threadCount := 32
  21. mq := os.Getenv("OLLAMA_MAX_QUEUE")
  22. if mq != "" {
  23. var err error
  24. threadCount, err = strconv.Atoi(mq)
  25. require.NoError(t, err)
  26. } else {
  27. os.Setenv("OLLAMA_MAX_QUEUE", fmt.Sprintf("%d", threadCount))
  28. }
  29. req := api.GenerateRequest{
  30. Model: "orca-mini",
  31. Prompt: "write a long historical fiction story about christopher columbus. use at least 10 facts from his actual journey",
  32. Options: map[string]interface{}{
  33. "seed": 42,
  34. "temperature": 0.0,
  35. },
  36. }
  37. resp := []string{"explore", "discover", "ocean"}
  38. // CPU mode takes much longer at the limit with a large queue setting
  39. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  40. defer cancel()
  41. client, _, cleanup := InitServerConnection(ctx, t)
  42. defer cleanup()
  43. require.NoError(t, PullIfMissing(ctx, client, req.Model))
  44. // Context for the worker threads so we can shut them down
  45. // embedCtx, embedCancel := context.WithCancel(ctx)
  46. embedCtx := ctx
  47. var genwg sync.WaitGroup
  48. go func() {
  49. genwg.Add(1)
  50. defer genwg.Done()
  51. slog.Info("Starting generate request")
  52. DoGenerate(ctx, t, client, req, resp, 45*time.Second, 5*time.Second)
  53. slog.Info("generate completed")
  54. }()
  55. // Give the generate a chance to get started before we start hammering on embed requests
  56. time.Sleep(5 * time.Millisecond)
  57. threadCount += 10 // Add a few extra to ensure we push the queue past its limit
  58. busyCount := 0
  59. resetByPeerCount := 0
  60. canceledCount := 0
  61. succesCount := 0
  62. counterMu := sync.Mutex{}
  63. var embedwg sync.WaitGroup
  64. for i := 0; i < threadCount; i++ {
  65. go func(i int) {
  66. embedwg.Add(1)
  67. defer embedwg.Done()
  68. slog.Info("embed started", "id", i)
  69. embedReq := api.EmbeddingRequest{
  70. Model: req.Model,
  71. Prompt: req.Prompt,
  72. Options: req.Options,
  73. }
  74. // Fresh client for every request
  75. client, _ = GetTestEndpoint()
  76. resp, genErr := client.Embeddings(embedCtx, &embedReq)
  77. counterMu.Lock()
  78. defer counterMu.Unlock()
  79. switch {
  80. case genErr == nil:
  81. succesCount++
  82. require.Greater(t, len(resp.Embedding), 5) // somewhat arbitrary, but sufficient to be reasonable
  83. case errors.Is(genErr, context.Canceled):
  84. canceledCount++
  85. case strings.Contains(genErr.Error(), "busy"):
  86. busyCount++
  87. case strings.Contains(genErr.Error(), "connection reset by peer"):
  88. resetByPeerCount++
  89. default:
  90. require.NoError(t, genErr, "%d request failed", i)
  91. }
  92. slog.Info("embed finished", "id", i)
  93. }(i)
  94. }
  95. genwg.Wait()
  96. slog.Info("generate done, waiting for embeds")
  97. embedwg.Wait()
  98. require.Equal(t, resetByPeerCount, 0, "Connections reset by peer, have you updated your fd and socket limits?")
  99. require.True(t, busyCount > 0, "no requests hit busy error but some should have")
  100. require.True(t, canceledCount == 0, "no requests should have been canceled due to timeout")
  101. slog.Info("embeds completed", "success", succesCount, "busy", busyCount, "reset", resetByPeerCount, "canceled", canceledCount)
  102. }