소스 검색

runner.go: Fix deadlock with many concurrent requests

If there are no available slots for new sequences then a request
will not be added to the processing queue but will continue on
to wait for a response that never comes. Besides never giving a
response to the request, this prevents the model from being
unloaded due to the outstanding request.

To prevent this, there are semaphores that prevent more requests
from being processed than there are slots - one in the Ollama
server and one in the runner.
 - The Ollama server's semaphore works but it is not designed to
protect the runner's internal data structures, and the runner can
return a final response before clearing its data structures.
 - The runner's internal semaphore has similar behavior: it can
 release the semaphore when it issues a response. This is
 wrong - it should only release the semaphore after it has
 cleared the data structure.

In addition, we should return an error if a slot is not found
rather than deadlocking in the event we ever get to this spot.

Fixes #7779
Jesse Gross 5 달 전
부모
커밋
3478b2cf14
1개의 변경된 파일에 17개의 추가작업 그리고 4개의 삭제
  1. 17 4
      llama/runner/runner.go

+ 17 - 4
llama/runner/runner.go

@@ -300,6 +300,7 @@ func (s *Server) removeSequence(seqIndex int, reason string) {
 	close(seq.embedding)
 	close(seq.embedding)
 	seq.cache.InUse = false
 	seq.cache.InUse = false
 	s.seqs[seqIndex] = nil
 	s.seqs[seqIndex] = nil
+	s.seqsSem.Release(1)
 }
 }
 
 
 func (s *Server) run(ctx context.Context) {
 func (s *Server) run(ctx context.Context) {
@@ -649,7 +650,7 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 
 
-	// Ensure that a place to put the sequence is available
+	// Ensure there is a place to put the sequence, released when removed from s.seqs
 	if err := s.seqsSem.Acquire(r.Context(), 1); err != nil {
 	if err := s.seqsSem.Acquire(r.Context(), 1); err != nil {
 		if errors.Is(err, context.Canceled) {
 		if errors.Is(err, context.Canceled) {
 			slog.Info("aborting completion request due to client closing the connection")
 			slog.Info("aborting completion request due to client closing the connection")
@@ -658,9 +659,9 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
 		}
 		}
 		return
 		return
 	}
 	}
-	defer s.seqsSem.Release(1)
 
 
 	s.mu.Lock()
 	s.mu.Lock()
+	found := false
 	for i, sq := range s.seqs {
 	for i, sq := range s.seqs {
 		if sq == nil {
 		if sq == nil {
 			seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, req.CachePrompt)
 			seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, req.CachePrompt)
@@ -674,11 +675,17 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
 
 
 			s.seqs[i] = seq
 			s.seqs[i] = seq
 			s.cond.Signal()
 			s.cond.Signal()
+			found = true
 			break
 			break
 		}
 		}
 	}
 	}
 	s.mu.Unlock()
 	s.mu.Unlock()
 
 
+	if !found {
+		http.Error(w, "could not find an available sequence", http.StatusInternalServerError)
+		return
+	}
+
 	for {
 	for {
 		select {
 		select {
 		case <-r.Context().Done():
 		case <-r.Context().Done():
@@ -742,7 +749,7 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 
 
-	// Ensure that a place to put the sequence is available
+	// Ensure there is a place to put the sequence, released when removed from s.seqs
 	if err := s.seqsSem.Acquire(r.Context(), 1); err != nil {
 	if err := s.seqsSem.Acquire(r.Context(), 1); err != nil {
 		if errors.Is(err, context.Canceled) {
 		if errors.Is(err, context.Canceled) {
 			slog.Info("aborting embeddings request due to client closing the connection")
 			slog.Info("aborting embeddings request due to client closing the connection")
@@ -751,9 +758,9 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) {
 		}
 		}
 		return
 		return
 	}
 	}
-	defer s.seqsSem.Release(1)
 
 
 	s.mu.Lock()
 	s.mu.Lock()
+	found := false
 	for i, sq := range s.seqs {
 	for i, sq := range s.seqs {
 		if sq == nil {
 		if sq == nil {
 			seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, req.CachePrompt)
 			seq.cache, seq.inputs, err = s.cache.LoadCacheSlot(seq.inputs, req.CachePrompt)
@@ -764,11 +771,17 @@ func (s *Server) embeddings(w http.ResponseWriter, r *http.Request) {
 			}
 			}
 			s.seqs[i] = seq
 			s.seqs[i] = seq
 			s.cond.Signal()
 			s.cond.Signal()
+			found = true
 			break
 			break
 		}
 		}
 	}
 	}
 	s.mu.Unlock()
 	s.mu.Unlock()
 
 
+	if !found {
+		http.Error(w, "could not find an available sequence", http.StatusInternalServerError)
+		return
+	}
+
 	embedding := <-seq.embedding
 	embedding := <-seq.embedding
 
 
 	if err := json.NewEncoder(w).Encode(&EmbeddingResponse{
 	if err := json.NewEncoder(w).Encode(&EmbeddingResponse{