浏览代码

Merge pull request #993 from jmorganca/mxyng/cleanup

cleanup upload and download errors
Michael Yang 1 年之前
父节点
当前提交
146072113d
共有 2 个文件被更改,包括 34 次插入31 次删除
  1. 3 2
      server/download.go
  2. 31 29
      server/upload.go

+ 3 - 2
server/download.go

@@ -149,9 +149,10 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
 
 		i := i
 		g.Go(func() error {
+			var err error
 			for try := 0; try < maxRetries; try++ {
 				w := io.NewOffsetWriter(file, part.StartsAt())
-				err := b.downloadChunk(inner, requestURL, w, part, opts)
+				err = b.downloadChunk(inner, requestURL, w, part, opts)
 				switch {
 				case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
 					// return immediately if the context is canceled or the device is out of space
@@ -164,7 +165,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
 				}
 			}
 
-			return errMaxRetriesExceeded
+			return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
 		})
 	}
 

+ 31 - 29
server/upload.go

@@ -40,14 +40,6 @@ type blobUpload struct {
 	references atomic.Int32
 }
 
-type blobUploadPart struct {
-	// N is the part number
-	N      int
-	Offset int64
-	Size   int64
-	hash.Hash
-}
-
 const (
 	numUploadParts          = 64
 	minUploadPartSize int64 = 95 * 1000 * 1000
@@ -100,7 +92,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg
 		}
 
 		// set part.N to the current number of parts
-		b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size, Hash: md5.New()})
+		b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size})
 		offset += size
 	}
 
@@ -143,9 +135,10 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
 		case <-inner.Done():
 		case requestURL := <-b.nextURL:
 			g.Go(func() error {
+				var err error
 				for try := 0; try < maxRetries; try++ {
-					r := io.NewSectionReader(f, part.Offset, part.Size)
-					err := b.uploadChunk(inner, http.MethodPatch, requestURL, r, part, opts)
+					part.ReadSeeker = io.NewSectionReader(f, part.Offset, part.Size)
+					err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts)
 					switch {
 					case errors.Is(err, context.Canceled):
 						return err
@@ -159,7 +152,7 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
 					return nil
 				}
 
-				return errMaxRetriesExceeded
+				return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
 			})
 		}
 	}
@@ -197,7 +190,9 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
 	b.done = true
 }
 
-func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, rs io.ReadSeeker, part *blobUploadPart, opts *RegistryOptions) error {
+func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {
+	part.Reset()
+
 	headers := make(http.Header)
 	headers.Set("Content-Type", "application/octet-stream")
 	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
@@ -207,8 +202,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
 		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
 	}
 
-	buw := blobUploadWriter{blobUpload: b}
-	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(rs, io.MultiWriter(&buw, part.Hash)), opts)
+	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(part.ReadSeeker, io.MultiWriter(part, part.Hash)), opts)
 	if err != nil {
 		return err
 	}
@@ -234,11 +228,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
 		}
 
 		for try := 0; try < maxRetries; try++ {
-			rs.Seek(0, io.SeekStart)
-			b.Completed.Add(-buw.written)
-			buw.written = 0
-			part.Hash = md5.New()
-			err := b.uploadChunk(ctx, http.MethodPut, redirectURL, rs, part, nil)
+			err = b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil)
 			switch {
 			case errors.Is(err, context.Canceled):
 				return err
@@ -252,7 +242,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
 			return nil
 		}
 
-		return errMaxRetriesExceeded
+		return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
 
 	case resp.StatusCode == http.StatusUnauthorized:
 		auth := resp.Header.Get("www-authenticate")
@@ -270,9 +260,6 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
 			return err
 		}
 
-		rs.Seek(0, io.SeekStart)
-		b.Completed.Add(-buw.written)
-		buw.written = 0
 		return fmt.Errorf("http status %d %s: %s", resp.StatusCode, resp.Status, body)
 	}
 
@@ -318,18 +305,33 @@ func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) er
 	}
 }
 
-type blobUploadWriter struct {
+type blobUploadPart struct {
+	// N is the part number
+	N      int
+	Offset int64
+	Size   int64
+	hash.Hash
+
 	written int64
+
+	io.ReadSeeker
 	*blobUpload
 }
 
-func (b *blobUploadWriter) Write(p []byte) (n int, err error) {
-	n = len(p)
-	b.written += int64(n)
-	b.Completed.Add(int64(n))
+func (p *blobUploadPart) Write(b []byte) (n int, err error) {
+	n = len(b)
+	p.written += int64(n)
+	p.Completed.Add(int64(n))
 	return n, nil
 }
 
+func (p *blobUploadPart) Reset() {
+	p.Seek(0, io.SeekStart)
+	p.Completed.Add(-int64(p.written))
+	p.written = 0
+	p.Hash = md5.New()
+}
+
 func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
 	requestURL := mp.BaseURL()
 	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)