Browse Source

chunked pipe

Michael Yang 1 year ago
parent
commit
865fceb73c
1 changed files with 75 additions and 35 deletions
  1. 75 35
      server/upload.go

+ 75 - 35
server/upload.go

@@ -55,49 +55,89 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l
 	}
 	}
 	defer f.Close()
 	defer f.Close()
 
 
-	var completed int64
 	// 95MB chunk size
 	// 95MB chunk size
 	chunkSize := 95 * 1024 * 1024
 	chunkSize := 95 * 1024 * 1024
 
 
-	for {
-		chunk := int64(layer.Size) - completed
+	for offset := int64(0); offset < int64(layer.Size); {
+		chunk := int64(layer.Size) - offset
 		if chunk > int64(chunkSize) {
 		if chunk > int64(chunkSize) {
 			chunk = int64(chunkSize)
 			chunk = int64(chunkSize)
 		}
 		}
 
 
-		sectionReader := io.NewSectionReader(f, int64(completed), chunk)
-
-		headers := make(http.Header)
-		headers.Set("Content-Type", "application/octet-stream")
-		headers.Set("Content-Length", strconv.Itoa(int(chunk)))
-		headers.Set("Content-Range", fmt.Sprintf("%d-%d", completed, completed+sectionReader.Size()-1))
-		resp, err := makeRequestWithRetry(ctx, "PATCH", requestURL, headers, sectionReader, regOpts)
-		if err != nil && !errors.Is(err, io.EOF) {
-			fn(api.ProgressResponse{
-				Status:    fmt.Sprintf("error uploading chunk: %v", err),
-				Digest:    layer.Digest,
-				Total:     layer.Size,
-				Completed: int(completed),
-			})
-
-			return err
-		}
-		defer resp.Body.Close()
-
-		completed += sectionReader.Size()
-		fn(api.ProgressResponse{
-			Status:    fmt.Sprintf("uploading %s", layer.Digest),
-			Digest:    layer.Digest,
-			Total:     layer.Size,
-			Completed: int(completed),
-		})
-
-		requestURL, err = url.Parse(resp.Header.Get("Location"))
-		if err != nil {
-			return err
-		}
+		sectionReader := io.NewSectionReader(f, int64(offset), chunk)
+		for try := 0; try < MaxRetries; try++ {
+			r, w := io.Pipe()
+			defer r.Close()
+			go func() {
+				defer w.Close()
+
+				for chunked := int64(0); chunked < chunk; {
+					n, err := io.CopyN(w, sectionReader, 1024*1024)
+					if err != nil && !errors.Is(err, io.EOF) {
+						fn(api.ProgressResponse{
+							Status:    fmt.Sprintf("error reading chunk: %v", err),
+							Digest:    layer.Digest,
+							Total:     layer.Size,
+							Completed: int(offset),
+						})
+
+						return
+					}
+
+					chunked += n
+					fn(api.ProgressResponse{
+						Status:    fmt.Sprintf("uploading %s", layer.Digest),
+						Digest:    layer.Digest,
+						Total:     layer.Size,
+						Completed: int(offset) + int(chunked),
+					})
+				}
+			}()
+
+			headers := make(http.Header)
+			headers.Set("Content-Type", "application/octet-stream")
+			headers.Set("Content-Length", strconv.Itoa(int(chunk)))
+			headers.Set("Content-Range", fmt.Sprintf("%d-%d", offset, offset+sectionReader.Size()-1))
+			resp, err := makeRequest(ctx, "PATCH", requestURL, headers, r, regOpts)
+			if err != nil && !errors.Is(err, io.EOF) {
+				fn(api.ProgressResponse{
+					Status:    fmt.Sprintf("error uploading chunk: %v", err),
+					Digest:    layer.Digest,
+					Total:     layer.Size,
+					Completed: int(offset),
+				})
+
+				return err
+			}
+			defer resp.Body.Close()
+
+			switch resp.StatusCode {
+			case http.StatusAccepted, http.StatusCreated:
+			case http.StatusUnauthorized:
+				auth := resp.Header.Get("www-authenticate")
+				authRedir := ParseAuthRedirectString(auth)
+				token, err := getAuthToken(ctx, authRedir, regOpts)
+				if err != nil {
+					return err
+				}
+
+				regOpts.Token = token
+				if _, err := sectionReader.Seek(0, io.SeekStart); err != nil {
+					return err
+				}
+
+				continue
+			default:
+				body, _ := io.ReadAll(resp.Body)
+				return fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body)
+			}
+
+			offset += sectionReader.Size()
+			requestURL, err = url.Parse(resp.Header.Get("Location"))
+			if err != nil {
+				return err
+			}
 
 
-		if completed >= int64(layer.Size) {
 			break
 			break
 		}
 		}
 	}
 	}