|
@@ -2,13 +2,16 @@ package server
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
+ "crypto/md5"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ "hash"
|
|
|
"io"
|
|
|
"log"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
"os"
|
|
|
+ "strings"
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
@@ -42,6 +45,7 @@ type blobUploadPart struct {
|
|
|
N int
|
|
|
Offset int64
|
|
|
Size int64
|
|
|
+ hash.Hash
|
|
|
}
|
|
|
|
|
|
const (
|
|
@@ -96,7 +100,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg
|
|
|
}
|
|
|
|
|
|
// set part.N to the current number of parts
|
|
|
- b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size})
|
|
|
+ b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size, Hash: md5.New()})
|
|
|
offset += size
|
|
|
}
|
|
|
|
|
@@ -167,8 +171,16 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
|
|
|
|
|
|
requestURL := <-b.nextURL
|
|
|
|
|
|
+ var sb strings.Builder
|
|
|
+ for _, part := range b.Parts {
|
|
|
+ sb.Write(part.Sum(nil))
|
|
|
+ }
|
|
|
+
|
|
|
+ md5sum := md5.Sum([]byte(sb.String()))
|
|
|
+
|
|
|
values := requestURL.Query()
|
|
|
values.Add("digest", b.Digest)
|
|
|
+ values.Add("etag", fmt.Sprintf("%x-%d", md5sum, len(b.Parts)))
|
|
|
requestURL.RawQuery = values.Encode()
|
|
|
|
|
|
headers := make(http.Header)
|
|
@@ -196,7 +208,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
|
|
|
}
|
|
|
|
|
|
buw := blobUploadWriter{blobUpload: b}
|
|
|
- resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(rs, &buw), opts)
|
|
|
+ resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(rs, io.MultiWriter(&buw, part.Hash)), opts)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -225,6 +237,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
|
|
|
rs.Seek(0, io.SeekStart)
|
|
|
b.Completed.Add(-buw.written)
|
|
|
buw.written = 0
|
|
|
+ part.Hash = md5.New()
|
|
|
err := b.uploadChunk(ctx, http.MethodPut, redirectURL, rs, part, nil)
|
|
|
switch {
|
|
|
case errors.Is(err, context.Canceled):
|