|
@@ -963,18 +963,12 @@ func PushModel(ctx context.Context, name string, regOpts *RegistryOptions, fn fu
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
|
|
- resp, err := makeRequest(ctx, "PUT", url, headers, bytes.NewReader(manifestJSON), regOpts)
|
|
|
|
|
|
+ resp, err := makeRequestWithRetry(ctx, "PUT", url, headers, bytes.NewReader(manifestJSON), regOpts)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
defer resp.Body.Close()
|
|
defer resp.Body.Close()
|
|
|
|
|
|
- // Check for success: For a successful upload, the Docker registry will respond with a 201 Created
|
|
|
|
- if resp.StatusCode != http.StatusCreated {
|
|
|
|
- body, _ := io.ReadAll(resp.Body)
|
|
|
|
- return fmt.Errorf("on push registry responded with code %d: %v", resp.StatusCode, string(body))
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
fn(api.ProgressResponse{Status: "success"})
|
|
fn(api.ProgressResponse{Status: "success"})
|
|
|
|
|
|
return nil
|
|
return nil
|
|
@@ -1116,43 +1110,18 @@ func GetSHA256Digest(r io.Reader) (string, int) {
|
|
type requestContextKey string
|
|
type requestContextKey string
|
|
|
|
|
|
func startUpload(ctx context.Context, mp ModelPath, layer *Layer, regOpts *RegistryOptions) (string, error) {
|
|
func startUpload(ctx context.Context, mp ModelPath, layer *Layer, regOpts *RegistryOptions) (string, error) {
|
|
- retry, _ := ctx.Value(requestContextKey("retry")).(int)
|
|
|
|
-
|
|
|
|
url := fmt.Sprintf("%s/v2/%s/blobs/uploads/", mp.Registry, mp.GetNamespaceRepository())
|
|
url := fmt.Sprintf("%s/v2/%s/blobs/uploads/", mp.Registry, mp.GetNamespaceRepository())
|
|
if layer.From != "" {
|
|
if layer.From != "" {
|
|
url = fmt.Sprintf("%s/v2/%s/blobs/uploads/?mount=%s&from=%s", mp.Registry, mp.GetNamespaceRepository(), layer.Digest, layer.From)
|
|
url = fmt.Sprintf("%s/v2/%s/blobs/uploads/?mount=%s&from=%s", mp.Registry, mp.GetNamespaceRepository(), layer.Digest, layer.From)
|
|
}
|
|
}
|
|
|
|
|
|
- resp, err := makeRequest(ctx, "POST", url, nil, nil, regOpts)
|
|
|
|
|
|
+ resp, err := makeRequestWithRetry(ctx, "POST", url, nil, nil, regOpts)
|
|
if err != nil {
|
|
if err != nil {
|
|
log.Printf("couldn't start upload: %v", err)
|
|
log.Printf("couldn't start upload: %v", err)
|
|
return "", err
|
|
return "", err
|
|
}
|
|
}
|
|
defer resp.Body.Close()
|
|
defer resp.Body.Close()
|
|
|
|
|
|
- switch resp.StatusCode {
|
|
|
|
- case http.StatusAccepted, http.StatusCreated:
|
|
|
|
- // noop
|
|
|
|
- case http.StatusUnauthorized:
|
|
|
|
- if retry > MaxRetries {
|
|
|
|
- return "", fmt.Errorf("max retries exceeded: %s", resp.Status)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- auth := resp.Header.Get("www-authenticate")
|
|
|
|
- authRedir := ParseAuthRedirectString(auth)
|
|
|
|
- token, err := getAuthToken(ctx, authRedir, regOpts)
|
|
|
|
- if err != nil {
|
|
|
|
- return "", err
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- regOpts.Token = token
|
|
|
|
- ctx = context.WithValue(ctx, requestContextKey("retry"), retry+1)
|
|
|
|
- return startUpload(ctx, mp, layer, regOpts)
|
|
|
|
- default:
|
|
|
|
- body, _ := io.ReadAll(resp.Body)
|
|
|
|
- return "", fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
// Extract UUID location from header
|
|
// Extract UUID location from header
|
|
location := resp.Header.Get("Location")
|
|
location := resp.Header.Get("Location")
|
|
if location == "" {
|
|
if location == "" {
|
|
@@ -1277,6 +1246,45 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, url string, layer *Lay
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func makeRequestWithRetry(ctx context.Context, method, url string, headers map[string]string, body io.ReadSeeker, regOpts *RegistryOptions) (*http.Response, error) {
|
|
|
|
+ var status string
|
|
|
|
+ for try := 0; try < MaxRetries; try++ {
|
|
|
|
+ resp, err := makeRequest(ctx, method, url, headers, body, regOpts)
|
|
|
|
+ if err != nil {
|
|
|
|
+ log.Printf("couldn't start upload: %v", err)
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ status = resp.Status
|
|
|
|
+
|
|
|
|
+ switch resp.StatusCode {
|
|
|
|
+ case http.StatusAccepted, http.StatusCreated:
|
|
|
|
+ return resp, nil
|
|
|
|
+ case http.StatusUnauthorized:
|
|
|
|
+ auth := resp.Header.Get("www-authenticate")
|
|
|
|
+ authRedir := ParseAuthRedirectString(auth)
|
|
|
|
+ token, err := getAuthToken(ctx, authRedir, regOpts)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ regOpts.Token = token
|
|
|
|
+ if body != nil {
|
|
|
|
+ if _, err := body.Seek(0, io.SeekStart); err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ continue
|
|
|
|
+ default:
|
|
|
|
+ body, _ := io.ReadAll(resp.Body)
|
|
|
|
+ return nil, fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil, fmt.Errorf("max retry exceeded: %v", status)
|
|
|
|
+}
|
|
|
|
+
|
|
func makeRequest(ctx context.Context, method, url string, headers map[string]string, body io.Reader, regOpts *RegistryOptions) (*http.Response, error) {
|
|
func makeRequest(ctx context.Context, method, url string, headers map[string]string, body io.Reader, regOpts *RegistryOptions) (*http.Response, error) {
|
|
if !strings.HasPrefix(url, "http") {
|
|
if !strings.HasPrefix(url, "http") {
|
|
if regOpts.Insecure {
|
|
if regOpts.Insecure {
|