浏览代码

change push to chunked uploads from monolithic

Patrick Devine 1 年之前
父节点
当前提交
402babdad0
共有 2 个文件被更改,包括 94 次插入52 次删除
  1. 17 1
      cmd/cmd.go
  2. 77 51
      server/images.go

+ 17 - 1
cmd/cmd.go

@@ -94,9 +94,25 @@ func PushHandler(cmd *cobra.Command, args []string) error {
 		return err
 	}
 
+	var currentDigest string
+	var bar *progressbar.ProgressBar
+
 	request := api.PushRequest{Name: args[0], Insecure: insecure}
 	fn := func(resp api.ProgressResponse) error {
-		fmt.Println(resp.Status)
+		if resp.Digest != currentDigest && resp.Digest != "" {
+			currentDigest = resp.Digest
+			bar = progressbar.DefaultBytes(
+				int64(resp.Total),
+				fmt.Sprintf("pushing %s...", resp.Digest[7:19]),
+			)
+
+			bar.Set(resp.Completed)
+		} else if resp.Digest == currentDigest && resp.Digest != "" {
+			bar.Set(resp.Completed)
+		} else {
+			currentDigest = ""
+			fmt.Println(resp.Status)
+		}
 		return nil
 	}
 

+ 77 - 51
server/images.go

@@ -582,14 +582,10 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon
 	}
 
 	var layers []*Layer
-	var total int
-	var completed int
 	for _, layer := range manifest.Layers {
 		layers = append(layers, layer)
-		total += layer.Size
 	}
 	layers = append(layers, &manifest.Config)
-	total += manifest.Config.Size
 
 	for _, layer := range layers {
 		exists, err := checkBlobExistence(mp, layer.Digest, regOpts)
@@ -598,21 +594,20 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon
 		}
 
 		if exists {
-			completed += layer.Size
 			fn(api.ProgressResponse{
 				Status:    "using existing layer",
 				Digest:    layer.Digest,
-				Total:     total,
-				Completed: completed,
+				Total:     layer.Size,
+				Completed: layer.Size,
 			})
+			log.Printf("Layer %s already exists", layer.Digest)
 			continue
 		}
 
 		fn(api.ProgressResponse{
-			Status:    "starting upload",
-			Digest:    layer.Digest,
-			Total:     total,
-			Completed: completed,
+			Status: "starting upload",
+			Digest: layer.Digest,
+			Total:  layer.Size,
 		})
 
 		location, err := startUpload(mp, regOpts)
@@ -621,25 +616,14 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon
 			return err
 		}
 
-		err = uploadBlob(location, layer, regOpts)
+		err = uploadBlobChunked(mp, location, layer, regOpts, fn)
 		if err != nil {
 			log.Printf("error uploading blob: %v", err)
 			return err
 		}
-		completed += layer.Size
-		fn(api.ProgressResponse{
-			Status:    "upload complete",
-			Digest:    layer.Digest,
-			Total:     total,
-			Completed: completed,
-		})
 	}
 
-	fn(api.ProgressResponse{
-		Status:    "pushing manifest",
-		Total:     total,
-		Completed: completed,
-	})
+	fn(api.ProgressResponse{Status: "pushing manifest"})
 	url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag)
 	headers := map[string]string{
 		"Content-Type": "application/vnd.docker.distribution.manifest.v2+json",
@@ -662,11 +646,7 @@ func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressRespon
 		return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
 	}
 
-	fn(api.ProgressResponse{
-		Status:    "success",
-		Total:     total,
-		Completed: completed,
-	})
+	fn(api.ProgressResponse{Status: "success"})
 
 	return nil
 }
@@ -828,19 +808,14 @@ func checkBlobExistence(mp ModelPath, digest string, regOpts *RegistryOptions) (
 	return resp.StatusCode == http.StatusOK, nil
 }
 
-func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error {
-	// Create URL
-	url := fmt.Sprintf("%s&digest=%s", location, layer.Digest)
-
-	headers := make(map[string]string)
-	headers["Content-Length"] = fmt.Sprintf("%d", layer.Size)
-	headers["Content-Type"] = "application/octet-stream"
-
-	// TODO change from monolithic uploads to chunked uploads
+func uploadBlobChunked(mp ModelPath, location string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error {
 	// TODO allow resumability
 	// TODO allow canceling uploads via DELETE
 	// TODO allow cross repo blob mount
 
+	// Create URL
+	url := fmt.Sprintf("%s", location)
+
 	fp, err := GetBlobsPath(layer.Digest)
 	if err != nil {
 		return err
@@ -851,19 +826,72 @@ func uploadBlob(location string, layer *Layer, regOpts *RegistryOptions) error {
 		return err
 	}
 
-	resp, err := makeRequest("PUT", url, headers, f, regOpts)
-	if err != nil {
-		log.Printf("couldn't upload blob: %v", err)
-		return err
-	}
-	defer resp.Body.Close()
+	headers := make(map[string]string)
+	headers["Content-Type"] = "application/octet-stream"
 
-	// Check for success: For a successful upload, the Docker registry will respond with a 201 Created
-	if resp.StatusCode != http.StatusCreated {
-		body, _ := io.ReadAll(resp.Body)
-		return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
-	}
+	chunkSize := 1 << 20
+	buf := make([]byte, chunkSize)
+	var totalUploaded int
 
+	for {
+		n, err := f.Read(buf)
+		if err != nil {
+			return err
+		}
+
+		headers["Content-Length"] = fmt.Sprintf("%d", n)
+		headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1)
+
+		fn(api.ProgressResponse{
+			Status:    fmt.Sprintf("uploading %s", layer.Digest),
+			Digest:    layer.Digest,
+			Total:     int(layer.Size),
+			Completed: int(totalUploaded),
+		})
+
+		// change the buffersize for the last chunk
+		if n < chunkSize {
+			buf = buf[:n]
+		}
+		resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts)
+		if err != nil {
+			log.Printf("couldn't upload blob: %v", err)
+			return err
+		}
+		defer resp.Body.Close()
+		url = resp.Header.Get("Location")
+
+		// Check for success: For a successful upload, the Docker registry will respond with a 201 Created
+		if resp.StatusCode != http.StatusAccepted {
+			fn(api.ProgressResponse{
+				Status:    fmt.Sprintf("error uploading layer"),
+				Digest:    layer.Digest,
+				Total:     int(layer.Size),
+				Completed: int(totalUploaded),
+			})
+			body, _ := io.ReadAll(resp.Body)
+			return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
+		}
+
+		totalUploaded += n
+		if totalUploaded >= layer.Size {
+			url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
+
+			// finish the upload
+			resp, err := makeRequest("PUT", url, nil, nil, regOpts)
+			if err != nil {
+				log.Printf("couldn't finish upload: %v", err)
+				return err
+			}
+			defer resp.Body.Close()
+
+			if resp.StatusCode != http.StatusCreated {
+				body, _ := io.ReadAll(resp.Body)
+				return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
+			}
+			break
+		}
+	}
 	return nil
 }
 
@@ -974,8 +1002,6 @@ func makeRequest(method, url string, headers map[string]string, body io.Reader,
 		}
 	}
 
-	log.Printf("url = %s", url)
-
 	req, err := http.NewRequest(method, url, body)
 	if err != nil {
 		return nil, err