Browse Source

Merge pull request #253 from jmorganca/upload

use a pipe to push to registry with progress
Michael Yang 1 year ago
parent
commit
29b897f525
1 changed files with 41 additions and 54 deletions
  1. 41 54
      server/images.go

+ 41 - 54
server/images.go

@@ -906,71 +906,58 @@ func uploadBlobChunked(mp ModelPath, url string, layer *Layer, regOpts *Registry
 		return err
 	}
 
-	headers := make(map[string]string)
-	headers["Content-Type"] = "application/octet-stream"
+	totalUploaded := 0
 
-	chunkSize := 1 << 20
-	buf := make([]byte, chunkSize)
-	var totalUploaded int
+	r, w := io.Pipe()
+	defer r.Close()
 
-	for {
-		n, err := f.Read(buf)
-		if err != nil {
-			return err
-		}
-
-		headers["Content-Length"] = fmt.Sprintf("%d", n)
-		headers["Content-Range"] = fmt.Sprintf("%d-%d", totalUploaded, totalUploaded+n-1)
-
-		fn(api.ProgressResponse{
-			Status:    fmt.Sprintf("uploading %s", layer.Digest),
-			Digest:    layer.Digest,
-			Total:     int(layer.Size),
-			Completed: int(totalUploaded),
-		})
+	go func() {
+		defer w.Close()
+		for {
+			n, err := io.CopyN(w, f, 1024*1024)
+			if err != nil && !errors.Is(err, io.EOF) {
+				fn(api.ProgressResponse{
+					Status:    fmt.Sprintf("error copying pipe: %v", err),
+					Digest:    layer.Digest,
+					Total:     layer.Size,
+					Completed: totalUploaded,
+				})
+				return
+			}
 
-		// change the buffersize for the last chunk
-		if n < chunkSize {
-			buf = buf[:n]
-		}
-		resp, err := makeRequest("PATCH", url, headers, bytes.NewReader(buf), regOpts)
-		if err != nil {
-			log.Printf("couldn't upload blob: %v", err)
-			return err
-		}
-		defer resp.Body.Close()
-		url = resp.Header.Get("Location")
+			totalUploaded += int(n)
 
-		// Check for success: For a successful upload, the Docker registry will respond with a 201 Created
-		if resp.StatusCode != http.StatusAccepted {
 			fn(api.ProgressResponse{
-				Status:    "error uploading layer",
+				Status:    fmt.Sprintf("uploading %s", layer.Digest),
 				Digest:    layer.Digest,
-				Total:     int(layer.Size),
-				Completed: int(totalUploaded),
+				Total:     layer.Size,
+				Completed: totalUploaded,
 			})
-			body, _ := io.ReadAll(resp.Body)
-			return fmt.Errorf("on layer upload registry responded with code %d: %v", resp.StatusCode, string(body))
+
+			if totalUploaded >= layer.Size {
+				return
+			}
 		}
+	}()
 
-		totalUploaded += n
-		if totalUploaded >= layer.Size {
-			url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
+	url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
 
-			// finish the upload
-			resp, err := makeRequest("PUT", url, nil, nil, regOpts)
-			if err != nil {
-				log.Printf("couldn't finish upload: %v", err)
-				return err
-			}
-			defer resp.Body.Close()
+	headers := make(map[string]string)
+	headers["Content-Type"] = "application/octet-stream"
+	headers["Content-Range"] = fmt.Sprintf("0-%d", layer.Size-1)
+	headers["Content-Length"] = strconv.Itoa(int(layer.Size))
 
-			if resp.StatusCode != http.StatusCreated {
-				body, _ := io.ReadAll(resp.Body)
-				return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
-			}
-			break
-		}
+	// finish the upload
+	resp, err := makeRequest("PUT", url, headers, r, regOpts)
+	if err != nil {
+		log.Printf("couldn't finish upload: %v", err)
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusCreated {
+		body, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
 	}
 	return nil
 }