images.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/sha256"
  5. "encoding/hex"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "log"
  12. "net/http"
  13. "os"
  14. "path"
  15. "path/filepath"
  16. "reflect"
  17. "strconv"
  18. "strings"
  19. "github.com/jmorganca/ollama/api"
  20. "github.com/jmorganca/ollama/parser"
  21. )
  22. type Model struct {
  23. Name string `json:"name"`
  24. ModelPath string
  25. Prompt string
  26. Options api.Options
  27. }
  28. type ManifestV2 struct {
  29. SchemaVersion int `json:"schemaVersion"`
  30. MediaType string `json:"mediaType"`
  31. Config Layer `json:"config"`
  32. Layers []*Layer `json:"layers"`
  33. }
  34. type Layer struct {
  35. MediaType string `json:"mediaType"`
  36. Digest string `json:"digest"`
  37. Size int `json:"size"`
  38. }
  39. type LayerWithBuffer struct {
  40. Layer
  41. Buffer *bytes.Buffer
  42. }
  43. type ConfigV2 struct {
  44. Architecture string `json:"architecture"`
  45. OS string `json:"os"`
  46. RootFS RootFS `json:"rootfs"`
  47. }
  48. type RootFS struct {
  49. Type string `json:"type"`
  50. DiffIDs []string `json:"diff_ids"`
  51. }
  52. func (m *ManifestV2) GetTotalSize() int {
  53. var total int
  54. for _, layer := range m.Layers {
  55. total += layer.Size
  56. }
  57. total += m.Config.Size
  58. return total
  59. }
  60. func GetManifest(mp ModelPath) (*ManifestV2, error) {
  61. fp, err := mp.GetManifestPath(false)
  62. if err != nil {
  63. return nil, err
  64. }
  65. if _, err = os.Stat(fp); err != nil && !errors.Is(err, os.ErrNotExist) {
  66. return nil, fmt.Errorf("couldn't find model '%s'", mp.GetShortTagname())
  67. }
  68. var manifest *ManifestV2
  69. f, err := os.Open(fp)
  70. if err != nil {
  71. return nil, fmt.Errorf("couldn't open file '%s'", fp)
  72. }
  73. decoder := json.NewDecoder(f)
  74. err = decoder.Decode(&manifest)
  75. if err != nil {
  76. return nil, err
  77. }
  78. return manifest, nil
  79. }
  80. func GetModel(name string) (*Model, error) {
  81. mp := ParseModelPath(name)
  82. manifest, err := GetManifest(mp)
  83. if err != nil {
  84. return nil, err
  85. }
  86. model := &Model{
  87. Name: mp.GetFullTagname(),
  88. }
  89. for _, layer := range manifest.Layers {
  90. filename, err := GetBlobsPath(layer.Digest)
  91. if err != nil {
  92. return nil, err
  93. }
  94. switch layer.MediaType {
  95. case "application/vnd.ollama.image.model":
  96. model.ModelPath = filename
  97. case "application/vnd.ollama.image.prompt":
  98. data, err := os.ReadFile(filename)
  99. if err != nil {
  100. return nil, err
  101. }
  102. model.Prompt = string(data)
  103. case "application/vnd.ollama.image.params":
  104. params, err := os.Open(filename)
  105. if err != nil {
  106. return nil, err
  107. }
  108. defer params.Close()
  109. var opts api.Options
  110. if err = json.NewDecoder(params).Decode(&opts); err != nil {
  111. return nil, err
  112. }
  113. model.Options = opts
  114. }
  115. }
  116. return model, nil
  117. }
  118. func getAbsPath(fp string) (string, error) {
  119. if strings.HasPrefix(fp, "~/") {
  120. parts := strings.Split(fp, "/")
  121. home, err := os.UserHomeDir()
  122. if err != nil {
  123. return "", err
  124. }
  125. fp = filepath.Join(home, filepath.Join(parts[1:]...))
  126. }
  127. return os.ExpandEnv(fp), nil
  128. }
  129. func CreateModel(name string, mf io.Reader, fn func(status string)) error {
  130. fn("parsing modelfile")
  131. commands, err := parser.Parse(mf)
  132. if err != nil {
  133. fn(fmt.Sprintf("error: %v", err))
  134. return err
  135. }
  136. var layers []*LayerWithBuffer
  137. params := make(map[string]string)
  138. for _, c := range commands {
  139. log.Printf("[%s] - %s\n", c.Name, c.Arg)
  140. switch c.Name {
  141. case "model":
  142. fn("looking for model")
  143. mf, err := GetManifest(ParseModelPath(c.Arg))
  144. if err != nil {
  145. // if we couldn't read the manifest, try getting the bin file
  146. fp, err := getAbsPath(c.Arg)
  147. if err != nil {
  148. fn("error determing path. exiting.")
  149. return err
  150. }
  151. fn("creating model layer")
  152. file, err := os.Open(fp)
  153. if err != nil {
  154. fn(fmt.Sprintf("couldn't find model '%s'", c.Arg))
  155. return fmt.Errorf("failed to open file: %v", err)
  156. }
  157. defer file.Close()
  158. l, err := CreateLayer(file)
  159. if err != nil {
  160. fn(fmt.Sprintf("couldn't create model layer: %v", err))
  161. return fmt.Errorf("failed to create layer: %v", err)
  162. }
  163. l.MediaType = "application/vnd.ollama.image.model"
  164. layers = append(layers, l)
  165. } else {
  166. log.Printf("manifest = %#v", mf)
  167. for _, l := range mf.Layers {
  168. newLayer, err := GetLayerWithBufferFromLayer(l)
  169. if err != nil {
  170. fn(fmt.Sprintf("couldn't read layer: %v", err))
  171. return err
  172. }
  173. layers = append(layers, newLayer)
  174. }
  175. }
  176. case "prompt":
  177. fn("creating prompt layer")
  178. // remove the prompt layer if one exists
  179. layers = removeLayerFromLayers(layers, "application/vnd.ollama.image.prompt")
  180. prompt := strings.NewReader(c.Arg)
  181. l, err := CreateLayer(prompt)
  182. if err != nil {
  183. fn(fmt.Sprintf("couldn't create prompt layer: %v", err))
  184. return fmt.Errorf("failed to create layer: %v", err)
  185. }
  186. l.MediaType = "application/vnd.ollama.image.prompt"
  187. layers = append(layers, l)
  188. default:
  189. params[c.Name] = c.Arg
  190. }
  191. }
  192. // Create a single layer for the parameters
  193. if len(params) > 0 {
  194. fn("creating parameter layer")
  195. layers = removeLayerFromLayers(layers, "application/vnd.ollama.image.params")
  196. paramData, err := paramsToReader(params)
  197. if err != nil {
  198. return fmt.Errorf("couldn't create params json: %v", err)
  199. }
  200. l, err := CreateLayer(paramData)
  201. if err != nil {
  202. return fmt.Errorf("failed to create layer: %v", err)
  203. }
  204. l.MediaType = "application/vnd.ollama.image.params"
  205. layers = append(layers, l)
  206. }
  207. digests, err := getLayerDigests(layers)
  208. if err != nil {
  209. return err
  210. }
  211. var manifestLayers []*Layer
  212. for _, l := range layers {
  213. manifestLayers = append(manifestLayers, &l.Layer)
  214. }
  215. // Create a layer for the config object
  216. fn("creating config layer")
  217. cfg, err := createConfigLayer(digests)
  218. if err != nil {
  219. return err
  220. }
  221. layers = append(layers, cfg)
  222. err = SaveLayers(layers, fn, false)
  223. if err != nil {
  224. fn(fmt.Sprintf("error saving layers: %v", err))
  225. return err
  226. }
  227. // Create the manifest
  228. fn("writing manifest")
  229. err = CreateManifest(name, cfg, manifestLayers)
  230. if err != nil {
  231. fn(fmt.Sprintf("error creating manifest: %v", err))
  232. return err
  233. }
  234. fn("success")
  235. return nil
  236. }
  237. func removeLayerFromLayers(layers []*LayerWithBuffer, mediaType string) []*LayerWithBuffer {
  238. j := 0
  239. for _, l := range layers {
  240. if l.MediaType != mediaType {
  241. layers[j] = l
  242. j++
  243. }
  244. }
  245. return layers[:j]
  246. }
  247. func SaveLayers(layers []*LayerWithBuffer, fn func(status string), force bool) error {
  248. // Write each of the layers to disk
  249. for _, layer := range layers {
  250. fp, err := GetBlobsPath(layer.Digest)
  251. if err != nil {
  252. return err
  253. }
  254. _, err = os.Stat(fp)
  255. if os.IsNotExist(err) || force {
  256. fn(fmt.Sprintf("writing layer %s", layer.Digest))
  257. out, err := os.Create(fp)
  258. if err != nil {
  259. log.Printf("couldn't create %s", fp)
  260. return err
  261. }
  262. defer out.Close()
  263. _, err = io.Copy(out, layer.Buffer)
  264. if err != nil {
  265. return err
  266. }
  267. } else {
  268. fn(fmt.Sprintf("using already created layer %s", layer.Digest))
  269. }
  270. }
  271. return nil
  272. }
  273. func CreateManifest(name string, cfg *LayerWithBuffer, layers []*Layer) error {
  274. mp := ParseModelPath(name)
  275. manifest := ManifestV2{
  276. SchemaVersion: 2,
  277. MediaType: "application/vnd.docker.distribution.manifest.v2+json",
  278. Config: Layer{
  279. MediaType: cfg.MediaType,
  280. Size: cfg.Size,
  281. Digest: cfg.Digest,
  282. },
  283. Layers: layers,
  284. }
  285. manifestJSON, err := json.Marshal(manifest)
  286. if err != nil {
  287. return err
  288. }
  289. fp, err := mp.GetManifestPath(true)
  290. if err != nil {
  291. return err
  292. }
  293. return os.WriteFile(fp, manifestJSON, 0o644)
  294. }
  295. func GetLayerWithBufferFromLayer(layer *Layer) (*LayerWithBuffer, error) {
  296. fp, err := GetBlobsPath(layer.Digest)
  297. if err != nil {
  298. return nil, err
  299. }
  300. file, err := os.Open(fp)
  301. if err != nil {
  302. return nil, fmt.Errorf("could not open blob: %w", err)
  303. }
  304. defer file.Close()
  305. newLayer, err := CreateLayer(file)
  306. if err != nil {
  307. return nil, err
  308. }
  309. newLayer.MediaType = layer.MediaType
  310. return newLayer, nil
  311. }
  312. func paramsToReader(params map[string]string) (io.Reader, error) {
  313. opts := api.DefaultOptions()
  314. typeOpts := reflect.TypeOf(opts)
  315. // build map of json struct tags
  316. jsonOpts := make(map[string]reflect.StructField)
  317. for _, field := range reflect.VisibleFields(typeOpts) {
  318. jsonTag := strings.Split(field.Tag.Get("json"), ",")[0]
  319. if jsonTag != "" {
  320. jsonOpts[jsonTag] = field
  321. }
  322. }
  323. valueOpts := reflect.ValueOf(&opts).Elem()
  324. // iterate params and set values based on json struct tags
  325. for key, val := range params {
  326. if opt, ok := jsonOpts[key]; ok {
  327. field := valueOpts.FieldByName(opt.Name)
  328. if field.IsValid() && field.CanSet() {
  329. switch field.Kind() {
  330. case reflect.Float32:
  331. floatVal, err := strconv.ParseFloat(val, 32)
  332. if err != nil {
  333. return nil, fmt.Errorf("invalid float value %s", val)
  334. }
  335. field.SetFloat(floatVal)
  336. case reflect.Int:
  337. intVal, err := strconv.ParseInt(val, 10, 0)
  338. if err != nil {
  339. return nil, fmt.Errorf("invalid int value %s", val)
  340. }
  341. field.SetInt(intVal)
  342. case reflect.Bool:
  343. boolVal, err := strconv.ParseBool(val)
  344. if err != nil {
  345. return nil, fmt.Errorf("invalid bool value %s", val)
  346. }
  347. field.SetBool(boolVal)
  348. case reflect.String:
  349. field.SetString(val)
  350. default:
  351. return nil, fmt.Errorf("unknown type %s for %s", field.Kind(), key)
  352. }
  353. }
  354. }
  355. }
  356. bts, err := json.Marshal(opts)
  357. if err != nil {
  358. return nil, err
  359. }
  360. return bytes.NewReader(bts), nil
  361. }
  362. func getLayerDigests(layers []*LayerWithBuffer) ([]string, error) {
  363. var digests []string
  364. for _, l := range layers {
  365. if l.Digest == "" {
  366. return nil, fmt.Errorf("layer is missing a digest")
  367. }
  368. digests = append(digests, l.Digest)
  369. }
  370. return digests, nil
  371. }
  372. // CreateLayer creates a Layer object from a given file
  373. func CreateLayer(f io.Reader) (*LayerWithBuffer, error) {
  374. buf := new(bytes.Buffer)
  375. _, err := io.Copy(buf, f)
  376. if err != nil {
  377. return nil, err
  378. }
  379. digest, size := GetSHA256Digest(buf)
  380. layer := &LayerWithBuffer{
  381. Layer: Layer{
  382. MediaType: "application/vnd.docker.image.rootfs.diff.tar",
  383. Digest: digest,
  384. Size: size,
  385. },
  386. Buffer: buf,
  387. }
  388. return layer, nil
  389. }
  390. func PushModel(name, username, password string, fn func(status, digest string, Total, Completed int, Percent float64)) error {
  391. mp := ParseModelPath(name)
  392. fn("retrieving manifest", "", 0, 0, 0)
  393. manifest, err := GetManifest(mp)
  394. if err != nil {
  395. fn("couldn't retrieve manifest", "", 0, 0, 0)
  396. return err
  397. }
  398. var layers []*Layer
  399. var total int
  400. var completed int
  401. for _, layer := range manifest.Layers {
  402. layers = append(layers, layer)
  403. total += layer.Size
  404. }
  405. layers = append(layers, &manifest.Config)
  406. total += manifest.Config.Size
  407. for _, layer := range layers {
  408. exists, err := checkBlobExistence(mp, layer.Digest, username, password)
  409. if err != nil {
  410. return err
  411. }
  412. if exists {
  413. completed += layer.Size
  414. fn("using existing layer", layer.Digest, total, completed, float64(completed)/float64(total))
  415. continue
  416. }
  417. fn("starting upload", layer.Digest, total, completed, float64(completed)/float64(total))
  418. location, err := startUpload(mp, username, password)
  419. if err != nil {
  420. log.Printf("couldn't start upload: %v", err)
  421. return err
  422. }
  423. err = uploadBlob(location, layer, username, password)
  424. if err != nil {
  425. log.Printf("error uploading blob: %v", err)
  426. return err
  427. }
  428. completed += layer.Size
  429. fn("upload complete", layer.Digest, total, completed, float64(completed)/float64(total))
  430. }
  431. fn("pushing manifest", "", total, completed, float64(completed/total))
  432. url := fmt.Sprintf("%s://%s/v2/%s/manifests/%s", mp.ProtocolScheme, mp.Registry, mp.GetNamespaceRepository(), mp.Tag)
  433. headers := map[string]string{
  434. "Content-Type": "application/vnd.docker.distribution.manifest.v2+json",
  435. }
  436. manifestJSON, err := json.Marshal(manifest)
  437. if err != nil {
  438. return err
  439. }
  440. resp, err := makeRequest("PUT", url, headers, bytes.NewReader(manifestJSON), username, password)
  441. if err != nil {
  442. return err
  443. }
  444. defer resp.Body.Close()
  445. // Check for success: For a successful upload, the Docker registry will respond with a 201 Created
  446. if resp.StatusCode != http.StatusCreated {
  447. body, _ := io.ReadAll(resp.Body)
  448. return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
  449. }
  450. fn("success", "", total, completed, 1.0)
  451. return nil
  452. }
  453. func PullModel(name, username, password string, fn func(status, digest string, Total, Completed int, Percent float64)) error {
  454. mp := ParseModelPath(name)
  455. fn("pulling manifest", "", 0, 0, 0)
  456. manifest, err := pullModelManifest(mp, username, password)
  457. if err != nil {
  458. return fmt.Errorf("pull model manifest: %q", err)
  459. }
  460. var layers []*Layer
  461. var total int
  462. var completed int
  463. for _, layer := range manifest.Layers {
  464. layers = append(layers, layer)
  465. total += layer.Size
  466. }
  467. layers = append(layers, &manifest.Config)
  468. total += manifest.Config.Size
  469. for _, layer := range layers {
  470. fn("starting download", layer.Digest, total, completed, float64(completed)/float64(total))
  471. if err := downloadBlob(mp, layer.Digest, username, password, fn); err != nil {
  472. fn(fmt.Sprintf("error downloading: %v", err), layer.Digest, 0, 0, 0)
  473. return err
  474. }
  475. completed += layer.Size
  476. fn("download complete", layer.Digest, total, completed, float64(completed)/float64(total))
  477. }
  478. fn("writing manifest", "", total, completed, 1.0)
  479. manifestJSON, err := json.Marshal(manifest)
  480. if err != nil {
  481. return err
  482. }
  483. fp, err := mp.GetManifestPath(true)
  484. if err != nil {
  485. return err
  486. }
  487. err = os.WriteFile(fp, manifestJSON, 0644)
  488. if err != nil {
  489. log.Printf("couldn't write to %s", fp)
  490. return err
  491. }
  492. fn("success", "", total, completed, 1.0)
  493. return nil
  494. }
  495. func pullModelManifest(mp ModelPath, username, password string) (*ManifestV2, error) {
  496. url := fmt.Sprintf("%s://%s/v2/%s/manifests/%s", mp.ProtocolScheme, mp.Registry, mp.GetNamespaceRepository(), mp.Tag)
  497. headers := map[string]string{
  498. "Accept": "application/vnd.docker.distribution.manifest.v2+json",
  499. }
  500. resp, err := makeRequest("GET", url, headers, nil, username, password)
  501. if err != nil {
  502. log.Printf("couldn't get manifest: %v", err)
  503. return nil, err
  504. }
  505. defer resp.Body.Close()
  506. // Check for success: For a successful upload, the Docker registry will respond with a 201 Created
  507. if resp.StatusCode != http.StatusOK {
  508. body, _ := io.ReadAll(resp.Body)
  509. return nil, fmt.Errorf("registry responded with code %d: %s", resp.StatusCode, body)
  510. }
  511. var m *ManifestV2
  512. if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
  513. return nil, err
  514. }
  515. return m, err
  516. }
  517. func createConfigLayer(layers []string) (*LayerWithBuffer, error) {
  518. // TODO change architecture and OS
  519. config := ConfigV2{
  520. Architecture: "arm64",
  521. OS: "linux",
  522. RootFS: RootFS{
  523. Type: "layers",
  524. DiffIDs: layers,
  525. },
  526. }
  527. configJSON, err := json.Marshal(config)
  528. if err != nil {
  529. return nil, err
  530. }
  531. buf := bytes.NewBuffer(configJSON)
  532. digest, size := GetSHA256Digest(buf)
  533. layer := &LayerWithBuffer{
  534. Layer: Layer{
  535. MediaType: "application/vnd.docker.container.image.v1+json",
  536. Digest: digest,
  537. Size: size,
  538. },
  539. Buffer: buf,
  540. }
  541. return layer, nil
  542. }
  543. // GetSHA256Digest returns the SHA256 hash of a given buffer and returns it, and the size of buffer
  544. func GetSHA256Digest(data *bytes.Buffer) (string, int) {
  545. layerBytes := data.Bytes()
  546. hash := sha256.Sum256(layerBytes)
  547. return "sha256:" + hex.EncodeToString(hash[:]), len(layerBytes)
  548. }
  549. func startUpload(mp ModelPath, username string, password string) (string, error) {
  550. url := fmt.Sprintf("%s://%s/v2/%s/blobs/uploads/", mp.ProtocolScheme, mp.Registry, mp.GetNamespaceRepository())
  551. resp, err := makeRequest("POST", url, nil, nil, username, password)
  552. if err != nil {
  553. log.Printf("couldn't start upload: %v", err)
  554. return "", err
  555. }
  556. defer resp.Body.Close()
  557. // Check for success
  558. if resp.StatusCode != http.StatusAccepted {
  559. body, _ := io.ReadAll(resp.Body)
  560. return "", fmt.Errorf("registry responded with code %d: %s", resp.StatusCode, body)
  561. }
  562. // Extract UUID location from header
  563. location := resp.Header.Get("Location")
  564. if location == "" {
  565. return "", fmt.Errorf("location header is missing in response")
  566. }
  567. return location, nil
  568. }
  569. // Function to check if a blob already exists in the Docker registry
  570. func checkBlobExistence(mp ModelPath, digest string, username string, password string) (bool, error) {
  571. url := fmt.Sprintf("%s://%s/v2/%s/blobs/%s", mp.ProtocolScheme, mp.Registry, mp.GetNamespaceRepository(), digest)
  572. resp, err := makeRequest("HEAD", url, nil, nil, username, password)
  573. if err != nil {
  574. log.Printf("couldn't check for blob: %v", err)
  575. return false, err
  576. }
  577. defer resp.Body.Close()
  578. // Check for success: If the blob exists, the Docker registry will respond with a 200 OK
  579. return resp.StatusCode == http.StatusOK, nil
  580. }
  581. func uploadBlob(location string, layer *Layer, username string, password string) error {
  582. // Create URL
  583. url := fmt.Sprintf("%s&digest=%s", location, layer.Digest)
  584. headers := make(map[string]string)
  585. headers["Content-Length"] = fmt.Sprintf("%d", layer.Size)
  586. headers["Content-Type"] = "application/octet-stream"
  587. // TODO change from monolithic uploads to chunked uploads
  588. // TODO allow resumability
  589. // TODO allow canceling uploads via DELETE
  590. // TODO allow cross repo blob mount
  591. fp, err := GetBlobsPath(layer.Digest)
  592. if err != nil {
  593. return err
  594. }
  595. f, err := os.Open(fp)
  596. if err != nil {
  597. return err
  598. }
  599. resp, err := makeRequest("PUT", url, headers, f, username, password)
  600. if err != nil {
  601. log.Printf("couldn't upload blob: %v", err)
  602. return err
  603. }
  604. defer resp.Body.Close()
  605. // Check for success: For a successful upload, the Docker registry will respond with a 201 Created
  606. if resp.StatusCode != http.StatusCreated {
  607. body, _ := io.ReadAll(resp.Body)
  608. return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
  609. }
  610. return nil
  611. }
  612. func downloadBlob(mp ModelPath, digest string, username, password string, fn func(status, digest string, Total, Completed int, Percent float64)) error {
  613. fp, err := GetBlobsPath(digest)
  614. if err != nil {
  615. return err
  616. }
  617. _, err = os.Stat(fp)
  618. if !os.IsNotExist(err) {
  619. // we already have the file, so return
  620. log.Printf("already have %s\n", digest)
  621. return nil
  622. }
  623. var size int64
  624. fi, err := os.Stat(fp + "-partial")
  625. switch {
  626. case errors.Is(err, os.ErrNotExist):
  627. // noop, file doesn't exist so create it
  628. case err != nil:
  629. return fmt.Errorf("stat: %w", err)
  630. default:
  631. size = fi.Size()
  632. }
  633. url := fmt.Sprintf("%s://%s/v2/%s/blobs/%s", mp.ProtocolScheme, mp.Registry, mp.GetNamespaceRepository(), digest)
  634. headers := map[string]string{
  635. "Range": fmt.Sprintf("bytes=%d-", size),
  636. }
  637. resp, err := makeRequest("GET", url, headers, nil, username, password)
  638. if err != nil {
  639. log.Printf("couldn't download blob: %v", err)
  640. return err
  641. }
  642. defer resp.Body.Close()
  643. if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
  644. body, _ := ioutil.ReadAll(resp.Body)
  645. return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
  646. }
  647. err = os.MkdirAll(path.Dir(fp), 0o700)
  648. if err != nil {
  649. return fmt.Errorf("make blobs directory: %w", err)
  650. }
  651. out, err := os.OpenFile(fp+"-partial", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  652. if err != nil {
  653. panic(err)
  654. }
  655. defer out.Close()
  656. remaining, _ := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
  657. completed := size
  658. total := remaining + completed
  659. for {
  660. fn(fmt.Sprintf("Downloading %s", digest), digest, int(total), int(completed), float64(completed)/float64(total))
  661. if completed >= total {
  662. if err := os.Rename(fp+"-partial", fp); err != nil {
  663. fn(fmt.Sprintf("error renaming file: %v", err), digest, int(total), int(completed), 1)
  664. return err
  665. }
  666. break
  667. }
  668. n, err := io.CopyN(out, resp.Body, 8192)
  669. if err != nil && !errors.Is(err, io.EOF) {
  670. return err
  671. }
  672. completed += n
  673. }
  674. log.Printf("success getting %s\n", digest)
  675. return nil
  676. }
  677. func makeRequest(method, url string, headers map[string]string, body io.Reader, username, password string) (*http.Response, error) {
  678. req, err := http.NewRequest(method, url, body)
  679. if err != nil {
  680. return nil, err
  681. }
  682. for k, v := range headers {
  683. req.Header.Set(k, v)
  684. }
  685. // TODO: better auth
  686. if username != "" && password != "" {
  687. req.SetBasicAuth(username, password)
  688. }
  689. client := &http.Client{
  690. CheckRedirect: func(req *http.Request, via []*http.Request) error {
  691. if len(via) >= 10 {
  692. return fmt.Errorf("too many redirects")
  693. }
  694. log.Printf("redirected to: %s\n", req.URL)
  695. return nil
  696. },
  697. }
  698. resp, err := client.Do(req)
  699. if err != nil {
  700. return nil, err
  701. }
  702. return resp, nil
  703. }