Jelajahi Sumber

split uploadBlobChunked

Michael Yang 1 tahun lalu
induk
melakukan
e53bc57d4d
1 mengubah file dengan 54 tambahan dan 53 penghapusan
  1. 54 53
      server/upload.go

+ 54 - 53
server/upload.go

@@ -70,62 +70,23 @@ func uploadBlobChunked(ctx context.Context, requestURL *url.URL, layer *Layer, r
 			chunk = int64(chunkSize)
 		}
 
-		sectionReader := io.NewSectionReader(f, int64(offset), chunk)
-
-		var errStatus error
-		for try := 0; try < MaxRetries; try++ {
-			errStatus = nil
-
-			headers := make(http.Header)
-			headers.Set("Content-Type", "application/octet-stream")
-			headers.Set("Content-Length", strconv.Itoa(int(chunk)))
-			headers.Set("Content-Range", fmt.Sprintf("%d-%d", offset, offset+sectionReader.Size()-1))
-			resp, err := makeRequest(ctx, "PATCH", requestURL, headers, io.TeeReader(sectionReader, &pw), regOpts)
-			if err != nil && !errors.Is(err, io.EOF) {
-				fn(api.ProgressResponse{
-					Status:    fmt.Sprintf("error uploading chunk: %v", err),
-					Digest:    layer.Digest,
-					Total:     layer.Size,
-					Completed: int(offset),
-				})
-
-				return err
-			}
-			defer resp.Body.Close()
-
-			switch {
-			case resp.StatusCode == http.StatusUnauthorized:
-				errStatus = errors.New("unauthorized")
-
-				auth := resp.Header.Get("www-authenticate")
-				authRedir := ParseAuthRedirectString(auth)
-				token, err := getAuthToken(ctx, authRedir)
-				if err != nil {
-					return err
-				}
-
-				regOpts.Token = token
-
-				pw.completed = int(offset)
-				sectionReader = io.NewSectionReader(f, offset, chunk)
-				continue
-			case resp.StatusCode >= http.StatusBadRequest:
-				body, _ := io.ReadAll(resp.Body)
-				return fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body)
-			}
-
-			offset += sectionReader.Size()
-			requestURL, err = url.Parse(resp.Header.Get("Location"))
-			if err != nil {
-				return err
-			}
-
-			break
+		resp, err := uploadBlobChunk(ctx, requestURL, f, offset, chunk, regOpts, &pw)
+		if err != nil {
+			fn(api.ProgressResponse{
+				Status:    fmt.Sprintf("error uploading limit: %v", err),
+				Digest:    layer.Digest,
+				Total:     layer.Size,
+				Completed: int(offset),
+			})
 		}
 
-		if errStatus != nil {
-			return fmt.Errorf("max retries exceeded: %w", errStatus)
+		offset += chunk
+		location, err := resp.Location()
+		if err != nil {
+			return err
 		}
+
+		requestURL = location
 	}
 
 	values := requestURL.Query()
@@ -151,6 +112,46 @@ func uploadBlobChunked(ctx context.Context, requestURL *url.URL, layer *Layer, r
 	return nil
 }
 
+func uploadBlobChunk(ctx context.Context, requestURL *url.URL, r io.ReaderAt, offset, limit int64, opts *RegistryOptions, pw *ProgressWriter) (*http.Response, error) {
+	sectionReader := io.NewSectionReader(r, int64(offset), limit)
+
+	headers := make(http.Header)
+	headers.Set("Content-Type", "application/octet-stream")
+	headers.Set("Content-Length", strconv.Itoa(int(limit)))
+	headers.Set("Content-Range", fmt.Sprintf("%d-%d", offset, offset+sectionReader.Size()-1))
+
+	for try := 0; try < MaxRetries; try++ {
+		resp, err := makeRequest(ctx, "PATCH", requestURL, headers, io.TeeReader(sectionReader, pw), opts)
+		if err != nil && !errors.Is(err, io.EOF) {
+			return nil, err
+		}
+		defer resp.Body.Close()
+
+		switch {
+		case resp.StatusCode == http.StatusUnauthorized:
+			auth := resp.Header.Get("www-authenticate")
+			authRedir := ParseAuthRedirectString(auth)
+			token, err := getAuthToken(ctx, authRedir)
+			if err != nil {
+				return nil, err
+			}
+
+			opts.Token = token
+
+			pw.completed = int(offset)
+			sectionReader = io.NewSectionReader(r, offset, limit)
+			continue
+		case resp.StatusCode >= http.StatusBadRequest:
+			body, _ := io.ReadAll(resp.Body)
+			return nil, fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body)
+		}
+
+		return resp, nil
+	}
+
+	return nil, fmt.Errorf("max retries exceeded")
+}
+
 type ProgressWriter struct {
 	status    string
 	digest    string