|
@@ -36,6 +36,8 @@ type blobUpload struct {
|
|
|
|
|
|
context.CancelFunc
|
|
context.CancelFunc
|
|
|
|
|
|
|
|
+ file *os.File
|
|
|
|
+
|
|
done bool
|
|
done bool
|
|
err error
|
|
err error
|
|
references atomic.Int32
|
|
references atomic.Int32
|
|
@@ -101,7 +103,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg
|
|
}
|
|
}
|
|
|
|
|
|
// set part.N to the current number of parts
|
|
// set part.N to the current number of parts
|
|
- b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size})
|
|
|
|
|
|
+ b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Hash: md5.New(), Offset: offset, Size: size})
|
|
offset += size
|
|
offset += size
|
|
}
|
|
}
|
|
|
|
|
|
@@ -123,6 +125,19 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
|
|
defer blobUploadManager.Delete(b.Digest)
|
|
defer blobUploadManager.Delete(b.Digest)
|
|
ctx, b.CancelFunc = context.WithCancel(ctx)
|
|
ctx, b.CancelFunc = context.WithCancel(ctx)
|
|
|
|
|
|
|
|
+ p, err := GetBlobsPath(b.Digest)
|
|
|
|
+ if err != nil {
|
|
|
|
+ b.err = err
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ b.file, err = os.Open(p)
|
|
|
|
+ if err != nil {
|
|
|
|
+ b.err = err
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ defer b.file.Close()
|
|
|
|
+
|
|
g, inner := errgroup.WithContext(ctx)
|
|
g, inner := errgroup.WithContext(ctx)
|
|
g.SetLimit(numUploadParts)
|
|
g.SetLimit(numUploadParts)
|
|
for i := range b.Parts {
|
|
for i := range b.Parts {
|
|
@@ -204,18 +219,8 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
|
|
headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
|
|
headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
|
|
}
|
|
}
|
|
|
|
|
|
- p, err := GetBlobsPath(b.Digest)
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- f, err := os.Open(p)
|
|
|
|
- if err != nil {
|
|
|
|
- return err
|
|
|
|
- }
|
|
|
|
- defer f.Close()
|
|
|
|
-
|
|
|
|
- resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(io.NewSectionReader(f, part.Offset, part.Size), part), opts)
|
|
|
|
|
|
+ sr := io.NewSectionReader(b.file, part.Offset, part.Size)
|
|
|
|
+ resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, part), opts)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
@@ -337,13 +342,7 @@ func (p *blobUploadPart) Write(b []byte) (n int, err error) {
|
|
n = len(b)
|
|
n = len(b)
|
|
p.written += int64(n)
|
|
p.written += int64(n)
|
|
p.Completed.Add(int64(n))
|
|
p.Completed.Add(int64(n))
|
|
-
|
|
|
|
- if p.Hash == nil {
|
|
|
|
- p.Hash = md5.New()
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
p.Hash.Write(b)
|
|
p.Hash.Write(b)
|
|
-
|
|
|
|
return n, nil
|
|
return n, nil
|
|
}
|
|
}
|
|
|
|
|