|
@@ -36,7 +36,7 @@ type blobDownload struct {
|
|
|
|
|
|
done chan struct{}
|
|
done chan struct{}
|
|
context.CancelFunc
|
|
context.CancelFunc
|
|
- refCount atomic.Int32
|
|
|
|
|
|
+ references atomic.Int32
|
|
}
|
|
}
|
|
|
|
|
|
type blobDownloadPart struct {
|
|
type blobDownloadPart struct {
|
|
@@ -241,8 +241,19 @@ func (b *blobDownload) Write(p []byte) (n int, err error) {
|
|
return n, nil
|
|
return n, nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (b *blobDownload) acquire() {
|
|
|
|
+ b.references.Add(1)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (b *blobDownload) release() {
|
|
|
|
+ if b.references.Add(-1) == 0 {
|
|
|
|
+ b.CancelFunc()
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
|
|
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
|
|
- b.refCount.Add(1)
|
|
|
|
|
|
+ b.acquire()
|
|
|
|
+ defer b.release()
|
|
|
|
|
|
ticker := time.NewTicker(60 * time.Millisecond)
|
|
ticker := time.NewTicker(60 * time.Millisecond)
|
|
for {
|
|
for {
|
|
@@ -253,10 +264,6 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
|
|
}
|
|
}
|
|
case <-ticker.C:
|
|
case <-ticker.C:
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
- if b.refCount.Add(-1) == 0 {
|
|
|
|
- b.CancelFunc()
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
return ctx.Err()
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
|