瀏覽代碼

use LimitGroup for uploads

Michael Yang 1 年之前
父節點
當前提交
bea007deb7
共有 1 個文件被更改,包括 31 次插入3 次删除
  1. 31 3
      server/upload.go

+ 31 - 3
server/upload.go

@@ -18,7 +18,6 @@ import (
 
 	"github.com/jmorganca/ollama/api"
 	"github.com/jmorganca/ollama/format"
-	"golang.org/x/sync/errgroup"
 )
 
 var blobUploadManager sync.Map
@@ -137,8 +136,37 @@ func (b *blobUpload) Run(ctx context.Context, opts *registryOptions) {
 	}
 	defer b.file.Close()
 
-	g, inner := errgroup.WithContext(ctx)
-	g.SetLimit(numUploadParts)
+	g, inner := NewLimitGroup(ctx, numUploadParts)
+
+	go func() {
+		ticker := time.NewTicker(time.Second)
+		var n int64 = 1
+		var maxDelta float64
+		var buckets []int64
+		for {
+			select {
+			case <-ticker.C:
+				buckets = append(buckets, b.Completed.Load())
+				if len(buckets) < 2 {
+					continue
+				} else if len(buckets) > 10 {
+					buckets = buckets[1:]
+				}
+
+				delta := float64((buckets[len(buckets)-1] - buckets[0])) / float64(len(buckets))
+				slog.Debug(fmt.Sprintf("delta: %s/s max_delta: %s/s", format.HumanBytes(int64(delta)), format.HumanBytes(int64(maxDelta))))
+				if delta > maxDelta*1.5 {
+					maxDelta = delta
+					g.SetLimit(n)
+					n++
+				}
+
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
 	for i := range b.Parts {
 		part := &b.Parts[i]
 		select {