images.go 31 KB


  1. package server
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha256"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "html/template"
  10. "io"
  11. "log"
  12. "math"
  13. "net/http"
  14. "os"
  15. "path"
  16. "path/filepath"
  17. "reflect"
  18. "strconv"
  19. "strings"
  20. "github.com/jmorganca/ollama/api"
  21. "github.com/jmorganca/ollama/llama"
  22. "github.com/jmorganca/ollama/parser"
  23. "github.com/jmorganca/ollama/vector"
  24. "gonum.org/v1/gonum/mat"
  25. )
  26. type RegistryOptions struct {
  27. Insecure bool
  28. Username string
  29. Password string
  30. }
  31. type Model struct {
  32. Name string `json:"name"`
  33. ModelPath string
  34. Template string
  35. System string
  36. Digest string
  37. Options map[string]interface{}
  38. Embeddings []vector.Embedding
  39. }
  40. func (m *Model) Prompt(request api.GenerateRequest) (string, error) {
  41. t := m.Template
  42. if request.Template != "" {
  43. t = request.Template
  44. }
  45. tmpl, err := template.New("").Parse(t)
  46. if err != nil {
  47. return "", err
  48. }
  49. var vars struct {
  50. First bool
  51. System string
  52. Prompt string
  53. Embed string
  54. // deprecated: versions <= 0.0.7 used this to omit the system prompt
  55. Context []int
  56. }
  57. vars.First = len(request.Context) == 0
  58. vars.System = m.System
  59. vars.Prompt = request.Prompt
  60. vars.Context = request.Context
  61. if request.System != "" {
  62. vars.System = request.System
  63. }
  64. if len(m.Embeddings) > 0 {
  65. promptEmbed, err := loaded.llm.Embedding(request.Prompt)
  66. if err != nil {
  67. return "", fmt.Errorf("failed to get embedding for prompt: %v", err)
  68. }
  69. // TODO: set embed_top from specified parameters in modelfile
  70. embed_top := 3
  71. embed := vector.TopK(embed_top, mat.NewVecDense(len(promptEmbed), promptEmbed), loaded.Embeddings)
  72. toEmbed := ""
  73. for _, e := range embed {
  74. toEmbed = fmt.Sprintf("%s %s", toEmbed, e.Embedding.Data)
  75. }
  76. vars.Embed = toEmbed
  77. }
  78. var sb strings.Builder
  79. if err := tmpl.Execute(&sb, vars); err != nil {
  80. return "", err
  81. }
  82. return sb.String(), nil
  83. }
  84. type ManifestV2 struct {
  85. SchemaVersion int `json:"schemaVersion"`
  86. MediaType string `json:"mediaType"`
  87. Config Layer `json:"config"`
  88. Layers []*Layer `json:"layers"`
  89. }
  90. type Layer struct {
  91. MediaType string `json:"mediaType"`
  92. Digest string `json:"digest"`
  93. Size int `json:"size"`
  94. }
  95. type LayerReader struct {
  96. Layer
  97. io.Reader
  98. }
  99. type ConfigV2 struct {
  100. Architecture string `json:"architecture"`
  101. OS string `json:"os"`
  102. RootFS RootFS `json:"rootfs"`
  103. }
  104. type RootFS struct {
  105. Type string `json:"type"`
  106. DiffIDs []string `json:"diff_ids"`
  107. }
  108. func (m *ManifestV2) GetTotalSize() int {
  109. var total int
  110. for _, layer := range m.Layers {
  111. total += layer.Size
  112. }
  113. total += m.Config.Size
  114. return total
  115. }
  116. func GetManifest(mp ModelPath) (*ManifestV2, error) {
  117. fp, err := mp.GetManifestPath(false)
  118. if err != nil {
  119. return nil, err
  120. }
  121. if _, err = os.Stat(fp); err != nil {
  122. return nil, err
  123. }
  124. var manifest *ManifestV2
  125. bts, err := os.ReadFile(fp)
  126. if err != nil {
  127. return nil, fmt.Errorf("couldn't open file '%s'", fp)
  128. }
  129. if err := json.Unmarshal(bts, &manifest); err != nil {
  130. return nil, err
  131. }
  132. return manifest, nil
  133. }
  134. func GetModel(name string) (*Model, error) {
  135. mp := ParseModelPath(name)
  136. manifest, err := GetManifest(mp)
  137. if err != nil {
  138. return nil, err
  139. }
  140. model := &Model{
  141. Name: mp.GetFullTagname(),
  142. Digest: manifest.Config.Digest,
  143. }
  144. for _, layer := range manifest.Layers {
  145. filename, err := GetBlobsPath(layer.Digest)
  146. if err != nil {
  147. return nil, err
  148. }
  149. switch layer.MediaType {
  150. case "application/vnd.ollama.image.model":
  151. model.ModelPath = filename
  152. case "application/vnd.ollama.image.embed":
  153. file, err := os.Open(filename)
  154. if err != nil {
  155. return nil, fmt.Errorf("failed to open file: %s", filename)
  156. }
  157. defer file.Close()
  158. if err = json.NewDecoder(file).Decode(&model.Embeddings); err != nil {
  159. return nil, err
  160. }
  161. case "application/vnd.ollama.image.template":
  162. bts, err := os.ReadFile(filename)
  163. if err != nil {
  164. return nil, err
  165. }
  166. model.Template = string(bts)
  167. case "application/vnd.ollama.image.system":
  168. bts, err := os.ReadFile(filename)
  169. if err != nil {
  170. return nil, err
  171. }
  172. model.System = string(bts)
  173. case "application/vnd.ollama.image.prompt":
  174. bts, err := os.ReadFile(filename)
  175. if err != nil {
  176. return nil, err
  177. }
  178. model.Template = string(bts)
  179. case "application/vnd.ollama.image.params":
  180. params, err := os.Open(filename)
  181. if err != nil {
  182. return nil, err
  183. }
  184. defer params.Close()
  185. // parse model options parameters into a map so that we can see which fields have been specified explicitly
  186. if err = json.NewDecoder(params).Decode(&model.Options); err != nil {
  187. return nil, err
  188. }
  189. }
  190. }
  191. return model, nil
  192. }
  193. func filenameWithPath(path, f string) (string, error) {
  194. // if filePath starts with ~/, replace it with the user's home directory.
  195. if strings.HasPrefix(f, "~/") {
  196. parts := strings.Split(f, "/")
  197. home, err := os.UserHomeDir()
  198. if err != nil {
  199. return "", fmt.Errorf("failed to open file: %v", err)
  200. }
  201. f = filepath.Join(home, filepath.Join(parts[1:]...))
  202. }
  203. // if filePath is not an absolute path, make it relative to the modelfile path
  204. if !filepath.IsAbs(f) {
  205. f = filepath.Join(filepath.Dir(path), f)
  206. }
  207. return f, nil
  208. }
  209. func CreateModel(name string, path string, fn func(resp api.ProgressResponse)) error {
  210. mf, err := os.Open(path)
  211. if err != nil {
  212. fn(api.ProgressResponse{Status: fmt.Sprintf("couldn't open modelfile '%s'", path)})
  213. return fmt.Errorf("failed to open file: %w", err)
  214. }
  215. defer mf.Close()
  216. fn(api.ProgressResponse{Status: "parsing modelfile"})
  217. commands, err := parser.Parse(mf)
  218. if err != nil {
  219. return err
  220. }
  221. var layers []*LayerReader
  222. params := make(map[string][]string)
  223. embed := EmbeddingParams{fn: fn, opts: api.DefaultOptions()}
  224. for _, c := range commands {
  225. log.Printf("[%s] - %s\n", c.Name, c.Args)
  226. switch c.Name {
  227. case "model":
  228. fn(api.ProgressResponse{Status: "looking for model"})
  229. embed.model = c.Args
  230. mf, err := GetManifest(ParseModelPath(c.Args))
  231. if err != nil {
  232. modelFile, err := filenameWithPath(path, c.Args)
  233. if err != nil {
  234. return err
  235. }
  236. if _, err := os.Stat(modelFile); err != nil {
  237. // the model file does not exist, try pulling it
  238. if errors.Is(err, os.ErrNotExist) {
  239. fn(api.ProgressResponse{Status: "pulling model file"})
  240. if err := PullModel(c.Args, &RegistryOptions{}, fn); err != nil {
  241. return err
  242. }
  243. mf, err = GetManifest(ParseModelPath(modelFile))
  244. if err != nil {
  245. return fmt.Errorf("failed to open file after pull: %v", err)
  246. }
  247. } else {
  248. return err
  249. }
  250. } else {
  251. // create a model from this specified file
  252. fn(api.ProgressResponse{Status: "creating model layer"})
  253. file, err := os.Open(modelFile)
  254. if err != nil {
  255. return fmt.Errorf("failed to open file: %v", err)
  256. }
  257. defer file.Close()
  258. l, err := CreateLayer(file)
  259. if err != nil {
  260. return fmt.Errorf("failed to create layer: %v", err)
  261. }
  262. l.MediaType = "application/vnd.ollama.image.model"
  263. layers = append(layers, l)
  264. }
  265. }
  266. if mf != nil {
  267. log.Printf("manifest = %#v", mf)
  268. for _, l := range mf.Layers {
  269. newLayer, err := GetLayerWithBufferFromLayer(l)
  270. if err != nil {
  271. return err
  272. }
  273. layers = append(layers, newLayer)
  274. }
  275. }
  276. case "embed":
  277. // TODO: support entire directories here
  278. embedFilePath, err := filenameWithPath(path, c.Args)
  279. if err != nil {
  280. return err
  281. }
  282. embed.files = append(embed.files, embedFilePath)
  283. case "license", "template", "system", "prompt":
  284. fn(api.ProgressResponse{Status: fmt.Sprintf("creating model %s layer", c.Name)})
  285. // remove the prompt layer if one exists
  286. mediaType := fmt.Sprintf("application/vnd.ollama.image.%s", c.Name)
  287. layers = removeLayerFromLayers(layers, mediaType)
  288. layer, err := CreateLayer(strings.NewReader(c.Args))
  289. if err != nil {
  290. return err
  291. }
  292. layer.MediaType = mediaType
  293. layers = append(layers, layer)
  294. default:
  295. // runtime parameters, build a list of args for each parameter to allow multiple values to be specified (ex: multiple stop tokens)
  296. params[c.Name] = append(params[c.Name], c.Args)
  297. }
  298. }
  299. // Create a single layer for the parameters
  300. if len(params) > 0 {
  301. fn(api.ProgressResponse{Status: "creating parameter layer"})
  302. layers = removeLayerFromLayers(layers, "application/vnd.ollama.image.params")
  303. formattedParams, err := formatParams(params)
  304. if err != nil {
  305. return fmt.Errorf("couldn't create params json: %v", err)
  306. }
  307. bts, err := json.Marshal(formattedParams)
  308. if err != nil {
  309. return err
  310. }
  311. l, err := CreateLayer(bytes.NewReader(bts))
  312. if err != nil {
  313. return fmt.Errorf("failed to create layer: %v", err)
  314. }
  315. l.MediaType = "application/vnd.ollama.image.params"
  316. layers = append(layers, l)
  317. // apply these parameters to the embedding options, in case embeddings need to be generated using this model
  318. embed.opts = api.DefaultOptions()
  319. embed.opts.FromMap(formattedParams)
  320. }
  321. // generate the embedding layers
  322. embeddingLayers, err := embeddingLayers(embed)
  323. if err != nil {
  324. return err
  325. }
  326. layers = append(layers, embeddingLayers...)
  327. digests, err := getLayerDigests(layers)
  328. if err != nil {
  329. return err
  330. }
  331. var manifestLayers []*Layer
  332. for _, l := range layers {
  333. manifestLayers = append(manifestLayers, &l.Layer)
  334. }
  335. // Create a layer for the config object
  336. fn(api.ProgressResponse{Status: "creating config layer"})
  337. cfg, err := createConfigLayer(digests)
  338. if err != nil {
  339. return err
  340. }
  341. layers = append(layers, cfg)
  342. err = SaveLayers(layers, fn, false)
  343. if err != nil {
  344. return err
  345. }
  346. // Create the manifest
  347. fn(api.ProgressResponse{Status: "writing manifest"})
  348. err = CreateManifest(name, cfg, manifestLayers)
  349. if err != nil {
  350. return err
  351. }
  352. fn(api.ProgressResponse{Status: "success"})
  353. return nil
  354. }
  355. type EmbeddingParams struct {
  356. model string
  357. opts api.Options
  358. files []string // paths to files to embed
  359. fn func(resp api.ProgressResponse)
  360. }
  361. // embeddingLayers loads the associated LLM and generates the embeddings to be stored from an input file
  362. func embeddingLayers(e EmbeddingParams) ([]*LayerReader, error) {
  363. layers := []*LayerReader{}
  364. if len(e.files) > 0 {
  365. model, err := GetModel(e.model)
  366. if err != nil {
  367. return nil, fmt.Errorf("failed to get model to generate embeddings: %v", err)
  368. }
  369. e.opts.EmbeddingOnly = true
  370. llm, err := llama.New(model.ModelPath, e.opts)
  371. if err != nil {
  372. return nil, fmt.Errorf("load model to generate embeddings: %v", err)
  373. }
  374. for _, filePath := range e.files {
  375. // TODO: check if txt file type
  376. f, err := os.Open(filePath)
  377. if err != nil {
  378. return nil, fmt.Errorf("could not open embed file: %w", err)
  379. }
  380. scanner := bufio.NewScanner(f)
  381. scanner.Split(bufio.ScanLines)
  382. data := []string{}
  383. for scanner.Scan() {
  384. data = append(data, scanner.Text())
  385. }
  386. f.Close()
  387. // the digest of the file is set here so that the client knows a new operation is in progress
  388. fileDigest, _ := GetSHA256Digest(bytes.NewReader([]byte(filePath)))
  389. embeddings := []vector.Embedding{}
  390. for i, d := range data {
  391. if strings.TrimSpace(d) == "" {
  392. continue
  393. }
  394. e.fn(api.ProgressResponse{
  395. Status: fmt.Sprintf("creating embeddings for file %s", filePath),
  396. Digest: fileDigest,
  397. Total: len(data) - 1,
  398. Completed: i,
  399. })
  400. retry := 0
  401. generate:
  402. if retry > 3 {
  403. log.Printf("failed to generate embedding for '%s' line %d: %v", filePath, i+1, err)
  404. continue
  405. }
  406. embed, err := llm.Embedding(d)
  407. if err != nil {
  408. log.Printf("retrying embedding generation for '%s' line %d: %v", filePath, i+1, err)
  409. retry++
  410. goto generate
  411. }
  412. // Check for NaN and Inf in the embedding, which can't be stored
  413. for _, value := range embed {
  414. if math.IsNaN(value) || math.IsInf(value, 0) {
  415. log.Printf("reloading model, embedding contains NaN or Inf")
  416. // reload the model to get a new embedding
  417. llm, err = llama.New(model.ModelPath, e.opts)
  418. if err != nil {
  419. return nil, fmt.Errorf("load model to generate embeddings: %v", err)
  420. }
  421. retry++
  422. goto generate
  423. }
  424. }
  425. embeddings = append(embeddings, vector.Embedding{Data: d, Vector: embed})
  426. }
  427. b, err := json.Marshal(embeddings)
  428. if err != nil {
  429. return nil, fmt.Errorf("failed to encode embeddings: %w", err)
  430. }
  431. r := bytes.NewReader(b)
  432. digest, size := GetSHA256Digest(r)
  433. // Reset the position of the reader after calculating the digest
  434. if _, err := r.Seek(0, 0); err != nil {
  435. return nil, fmt.Errorf("could not reset embed reader: %w", err)
  436. }
  437. layer := &LayerReader{
  438. Layer: Layer{
  439. MediaType: "application/vnd.ollama.image.embed",
  440. Digest: digest,
  441. Size: size,
  442. },
  443. Reader: r,
  444. }
  445. layers = append(layers, layer)
  446. }
  447. }
  448. return layers, nil
  449. }
  450. func removeLayerFromLayers(layers []*LayerReader, mediaType string) []*LayerReader {
  451. j := 0
  452. for _, l := range layers {
  453. if l.MediaType != mediaType {
  454. layers[j] = l
  455. j++
  456. }
  457. }
  458. return layers[:j]
  459. }
  460. func SaveLayers(layers []*LayerReader, fn func(resp api.ProgressResponse), force bool) error {
  461. // Write each of the layers to disk
  462. for _, layer := range layers {
  463. fp, err := GetBlobsPath(layer.Digest)
  464. if err != nil {
  465. return err
  466. }
  467. _, err = os.Stat(fp)
  468. if os.IsNotExist(err) || force {
  469. fn(api.ProgressResponse{Status: fmt.Sprintf("writing layer %s", layer.Digest)})
  470. out, err := os.Create(fp)
  471. if err != nil {
  472. log.Printf("couldn't create %s", fp)
  473. return err
  474. }
  475. defer out.Close()
  476. if _, err = io.Copy(out, layer.Reader); err != nil {
  477. return err
  478. }
  479. } else {
  480. fn(api.ProgressResponse{Status: fmt.Sprintf("using already created layer %s", layer.Digest)})
  481. }
  482. }
  483. return nil
  484. }
  485. func CreateManifest(name string, cfg *LayerReader, layers []*Layer) error {
  486. mp := ParseModelPath(name)
  487. manifest := ManifestV2{
  488. SchemaVersion: 2,
  489. MediaType: "application/vnd.docker.distribution.manifest.v2+json",
  490. Config: Layer{
  491. MediaType: cfg.MediaType,
  492. Size: cfg.Size,
  493. Digest: cfg.Digest,
  494. },
  495. Layers: layers,
  496. }
  497. manifestJSON, err := json.Marshal(manifest)
  498. if err != nil {
  499. return err
  500. }
  501. fp, err := mp.GetManifestPath(true)
  502. if err != nil {
  503. return err
  504. }
  505. return os.WriteFile(fp, manifestJSON, 0o644)
  506. }
  507. func GetLayerWithBufferFromLayer(layer *Layer) (*LayerReader, error) {
  508. fp, err := GetBlobsPath(layer.Digest)
  509. if err != nil {
  510. return nil, err
  511. }
  512. file, err := os.Open(fp)
  513. if err != nil {
  514. return nil, fmt.Errorf("could not open blob: %w", err)
  515. }
  516. defer file.Close()
  517. newLayer, err := CreateLayer(file)
  518. if err != nil {
  519. return nil, err
  520. }
  521. newLayer.MediaType = layer.MediaType
  522. return newLayer, nil
  523. }
  524. // formatParams converts specified parameter options to their correct types
  525. func formatParams(params map[string][]string) (map[string]interface{}, error) {
  526. opts := api.Options{}
  527. valueOpts := reflect.ValueOf(&opts).Elem() // names of the fields in the options struct
  528. typeOpts := reflect.TypeOf(opts) // types of the fields in the options struct
  529. // build map of json struct tags to their types
  530. jsonOpts := make(map[string]reflect.StructField)
  531. for _, field := range reflect.VisibleFields(typeOpts) {
  532. jsonTag := strings.Split(field.Tag.Get("json"), ",")[0]
  533. if jsonTag != "" {
  534. jsonOpts[jsonTag] = field
  535. }
  536. }
  537. out := make(map[string]interface{})
  538. // iterate params and set values based on json struct tags
  539. for key, vals := range params {
  540. if opt, ok := jsonOpts[key]; ok {
  541. field := valueOpts.FieldByName(opt.Name)
  542. if field.IsValid() && field.CanSet() {
  543. switch field.Kind() {
  544. case reflect.Float32:
  545. floatVal, err := strconv.ParseFloat(vals[0], 32)
  546. if err != nil {
  547. return nil, fmt.Errorf("invalid float value %s", vals)
  548. }
  549. out[key] = floatVal
  550. case reflect.Int:
  551. intVal, err := strconv.ParseInt(vals[0], 10, 0)
  552. if err != nil {
  553. return nil, fmt.Errorf("invalid int value %s", vals)
  554. }
  555. out[key] = intVal
  556. case reflect.Bool:
  557. boolVal, err := strconv.ParseBool(vals[0])
  558. if err != nil {
  559. return nil, fmt.Errorf("invalid bool value %s", vals)
  560. }
  561. out[key] = boolVal
  562. case reflect.String:
  563. out[key] = vals[0]
  564. case reflect.Slice:
  565. // TODO: only string slices are supported right now
  566. out[key] = vals
  567. default:
  568. return nil, fmt.Errorf("unknown type %s for %s", field.Kind(), key)
  569. }
  570. }
  571. }
  572. }
  573. return out, nil
  574. }
  575. func getLayerDigests(layers []*LayerReader) ([]string, error) {
  576. var digests []string
  577. for _, l := range layers {
  578. if l.Digest == "" {
  579. return nil, fmt.Errorf("layer is missing a digest")
  580. }
  581. digests = append(digests, l.Digest)
  582. }
  583. return digests, nil
  584. }
  585. // CreateLayer creates a Layer object from a given file
  586. func CreateLayer(f io.ReadSeeker) (*LayerReader, error) {
  587. digest, size := GetSHA256Digest(f)
  588. f.Seek(0, 0)
  589. layer := &LayerReader{
  590. Layer: Layer{
  591. MediaType: "application/vnd.docker.image.rootfs.diff.tar",
  592. Digest: digest,
  593. Size: size,
  594. },
  595. Reader: f,
  596. }
  597. return layer, nil
  598. }
  599. func CopyModel(src, dest string) error {
  600. srcPath, err := ParseModelPath(src).GetManifestPath(false)
  601. if err != nil {
  602. return err
  603. }
  604. destPath, err := ParseModelPath(dest).GetManifestPath(true)
  605. if err != nil {
  606. return err
  607. }
  608. // copy the file
  609. input, err := os.ReadFile(srcPath)
  610. if err != nil {
  611. fmt.Println("Error reading file:", err)
  612. return err
  613. }
  614. err = os.WriteFile(destPath, input, 0o644)
  615. if err != nil {
  616. fmt.Println("Error reading file:", err)
  617. return err
  618. }
  619. return nil
  620. }
  621. func DeleteModel(name string) error {
  622. mp := ParseModelPath(name)
  623. manifest, err := GetManifest(mp)
  624. if err != nil {
  625. return err
  626. }
  627. deleteMap := make(map[string]bool)
  628. for _, layer := range manifest.Layers {
  629. deleteMap[layer.Digest] = true
  630. }
  631. deleteMap[manifest.Config.Digest] = true
  632. fp, err := GetManifestPath()
  633. if err != nil {
  634. return err
  635. }
  636. err = filepath.Walk(fp, func(path string, info os.FileInfo, err error) error {
  637. if err != nil {
  638. return err
  639. }
  640. if !info.IsDir() {
  641. path := path[len(fp)+1:]
  642. slashIndex := strings.LastIndex(path, "/")
  643. if slashIndex == -1 {
  644. return nil
  645. }
  646. tag := path[:slashIndex] + ":" + path[slashIndex+1:]
  647. fmp := ParseModelPath(tag)
  648. // skip the manifest we're trying to delete
  649. if mp.GetFullTagname() == fmp.GetFullTagname() {
  650. return nil
  651. }
  652. // save (i.e. delete from the deleteMap) any files used in other manifests
  653. manifest, err := GetManifest(fmp)
  654. if err != nil {
  655. log.Printf("skipping file: %s", fp)
  656. return nil
  657. }
  658. for _, layer := range manifest.Layers {
  659. delete(deleteMap, layer.Digest)
  660. }
  661. delete(deleteMap, manifest.Config.Digest)
  662. }
  663. return nil
  664. })
  665. if err != nil {
  666. return err
  667. }
  668. if err != nil {
  669. return err
  670. }
  671. // only delete the files which are still in the deleteMap
  672. for k, v := range deleteMap {
  673. if v {
  674. fp, err := GetBlobsPath(k)
  675. if err != nil {
  676. log.Printf("couldn't get file path for '%s': %v", k, err)
  677. continue
  678. }
  679. if err := os.Remove(fp); err != nil {
  680. log.Printf("couldn't remove file '%s': %v", fp, err)
  681. continue
  682. }
  683. }
  684. }
  685. fp, err = mp.GetManifestPath(false)
  686. if err != nil {
  687. return err
  688. }
  689. err = os.Remove(fp)
  690. if err != nil {
  691. log.Printf("couldn't remove manifest file '%s': %v", fp, err)
  692. return err
  693. }
  694. return nil
  695. }
  696. func PushModel(name string, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error {
  697. mp := ParseModelPath(name)
  698. fn(api.ProgressResponse{Status: "retrieving manifest"})
  699. manifest, err := GetManifest(mp)
  700. if err != nil {
  701. fn(api.ProgressResponse{Status: "couldn't retrieve manifest"})
  702. return err
  703. }
  704. var layers []*Layer
  705. layers = append(layers, manifest.Layers...)
  706. layers = append(layers, &manifest.Config)
  707. for _, layer := range layers {
  708. exists, err := checkBlobExistence(mp, layer.Digest, regOpts)
  709. if err != nil {
  710. return err
  711. }
  712. if exists {
  713. fn(api.ProgressResponse{
  714. Status: "using existing layer",
  715. Digest: layer.Digest,
  716. Total: layer.Size,
  717. Completed: layer.Size,
  718. })
  719. log.Printf("Layer %s already exists", layer.Digest)
  720. continue
  721. }
  722. fn(api.ProgressResponse{
  723. Status: "starting upload",
  724. Digest: layer.Digest,
  725. Total: layer.Size,
  726. })
  727. location, err := startUpload(mp, regOpts)
  728. if err != nil {
  729. log.Printf("couldn't start upload: %v", err)
  730. return err
  731. }
  732. err = uploadBlobChunked(mp, location, layer, regOpts, fn)
  733. if err != nil {
  734. log.Printf("error uploading blob: %v", err)
  735. return err
  736. }
  737. }
  738. fn(api.ProgressResponse{Status: "pushing manifest"})
  739. url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag)
  740. headers := map[string]string{
  741. "Content-Type": "application/vnd.docker.distribution.manifest.v2+json",
  742. }
  743. manifestJSON, err := json.Marshal(manifest)
  744. if err != nil {
  745. return err
  746. }
  747. resp, err := makeRequest("PUT", url, headers, bytes.NewReader(manifestJSON), regOpts)
  748. if err != nil {
  749. return err
  750. }
  751. defer resp.Body.Close()
  752. // Check for success: For a successful upload, the Docker registry will respond with a 201 Created
  753. if resp.StatusCode != http.StatusCreated {
  754. body, _ := io.ReadAll(resp.Body)
  755. return fmt.Errorf("on push registry responded with code %d: %v", resp.StatusCode, string(body))
  756. }
  757. fn(api.ProgressResponse{Status: "success"})
  758. return nil
  759. }
  760. func PullModel(name string, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error {
  761. mp := ParseModelPath(name)
  762. fn(api.ProgressResponse{Status: "pulling manifest"})
  763. manifest, err := pullModelManifest(mp, regOpts)
  764. if err != nil {
  765. return fmt.Errorf("pull model manifest: %s", err)
  766. }
  767. var layers []*Layer
  768. layers = append(layers, manifest.Layers...)
  769. layers = append(layers, &manifest.Config)
  770. for _, layer := range layers {
  771. if err := downloadBlob(mp, layer.Digest, regOpts, fn); err != nil {
  772. return err
  773. }
  774. }
  775. fn(api.ProgressResponse{Status: "verifying sha256 digest"})
  776. for _, layer := range layers {
  777. if err := verifyBlob(layer.Digest); err != nil {
  778. if errors.Is(err, errDigestMismatch) {
  779. // something went wrong, delete the blob
  780. fp, err := GetBlobsPath(layer.Digest)
  781. if err != nil {
  782. return err
  783. }
  784. if err := os.Remove(fp); err != nil {
  785. // log this, but return the original error
  786. log.Printf("couldn't remove file with digest mismatch '%s': %v", fp, err)
  787. }
  788. }
  789. return err
  790. }
  791. }
  792. fn(api.ProgressResponse{Status: "writing manifest"})
  793. manifestJSON, err := json.Marshal(manifest)
  794. if err != nil {
  795. return err
  796. }
  797. fp, err := mp.GetManifestPath(true)
  798. if err != nil {
  799. return err
  800. }
  801. err = os.WriteFile(fp, manifestJSON, 0o644)
  802. if err != nil {
  803. log.Printf("couldn't write to %s", fp)
  804. return err
  805. }
  806. fn(api.ProgressResponse{Status: "success"})
  807. return nil
  808. }
  809. func pullModelManifest(mp ModelPath, regOpts *RegistryOptions) (*ManifestV2, error) {
  810. url := fmt.Sprintf("%s/v2/%s/manifests/%s", mp.Registry, mp.GetNamespaceRepository(), mp.Tag)
  811. headers := map[string]string{
  812. "Accept": "application/vnd.docker.distribution.manifest.v2+json",
  813. }
  814. resp, err := makeRequest("GET", url, headers, nil, regOpts)
  815. if err != nil {
  816. log.Printf("couldn't get manifest: %v", err)
  817. return nil, err
  818. }
  819. defer resp.Body.Close()
  820. // Check for success: For a successful upload, the Docker registry will respond with a 201 Created
  821. if resp.StatusCode != http.StatusOK {
  822. if resp.StatusCode == http.StatusNotFound {
  823. return nil, fmt.Errorf("model not found")
  824. }
  825. body, _ := io.ReadAll(resp.Body)
  826. return nil, fmt.Errorf("on pull registry responded with code %d: %s", resp.StatusCode, body)
  827. }
  828. var m *ManifestV2
  829. if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
  830. return nil, err
  831. }
  832. return m, err
  833. }
  834. func createConfigLayer(layers []string) (*LayerReader, error) {
  835. // TODO change architecture and OS
  836. config := ConfigV2{
  837. Architecture: "arm64",
  838. OS: "linux",
  839. RootFS: RootFS{
  840. Type: "layers",
  841. DiffIDs: layers,
  842. },
  843. }
  844. configJSON, err := json.Marshal(config)
  845. if err != nil {
  846. return nil, err
  847. }
  848. digest, size := GetSHA256Digest(bytes.NewBuffer(configJSON))
  849. layer := &LayerReader{
  850. Layer: Layer{
  851. MediaType: "application/vnd.docker.container.image.v1+json",
  852. Digest: digest,
  853. Size: size,
  854. },
  855. Reader: bytes.NewBuffer(configJSON),
  856. }
  857. return layer, nil
  858. }
  859. // GetSHA256Digest returns the SHA256 hash of a given buffer and returns it, and the size of buffer
  860. func GetSHA256Digest(r io.Reader) (string, int) {
  861. h := sha256.New()
  862. n, err := io.Copy(h, r)
  863. if err != nil {
  864. log.Fatal(err)
  865. }
  866. return fmt.Sprintf("sha256:%x", h.Sum(nil)), int(n)
  867. }
  868. func startUpload(mp ModelPath, regOpts *RegistryOptions) (string, error) {
  869. url := fmt.Sprintf("%s/v2/%s/blobs/uploads/", mp.Registry, mp.GetNamespaceRepository())
  870. resp, err := makeRequest("POST", url, nil, nil, regOpts)
  871. if err != nil {
  872. log.Printf("couldn't start upload: %v", err)
  873. return "", err
  874. }
  875. defer resp.Body.Close()
  876. // Check for success
  877. if resp.StatusCode != http.StatusAccepted {
  878. body, _ := io.ReadAll(resp.Body)
  879. return "", fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body)
  880. }
  881. // Extract UUID location from header
  882. location := resp.Header.Get("Location")
  883. if location == "" {
  884. return "", fmt.Errorf("location header is missing in response")
  885. }
  886. return location, nil
  887. }
  888. // Function to check if a blob already exists in the Docker registry
  889. func checkBlobExistence(mp ModelPath, digest string, regOpts *RegistryOptions) (bool, error) {
  890. url := fmt.Sprintf("%s/v2/%s/blobs/%s", mp.Registry, mp.GetNamespaceRepository(), digest)
  891. resp, err := makeRequest("HEAD", url, nil, nil, regOpts)
  892. if err != nil {
  893. log.Printf("couldn't check for blob: %v", err)
  894. return false, err
  895. }
  896. defer resp.Body.Close()
  897. // Check for success: If the blob exists, the Docker registry will respond with a 200 OK
  898. return resp.StatusCode == http.StatusOK, nil
  899. }
  900. func uploadBlobChunked(mp ModelPath, url string, layer *Layer, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error {
  901. // TODO allow resumability
  902. // TODO allow canceling uploads via DELETE
  903. // TODO allow cross repo blob mount
  904. fp, err := GetBlobsPath(layer.Digest)
  905. if err != nil {
  906. return err
  907. }
  908. f, err := os.Open(fp)
  909. if err != nil {
  910. return err
  911. }
  912. totalUploaded := 0
  913. r, w := io.Pipe()
  914. defer r.Close()
  915. go func() {
  916. defer w.Close()
  917. for {
  918. n, err := io.CopyN(w, f, 1024*1024)
  919. if err != nil && !errors.Is(err, io.EOF) {
  920. fn(api.ProgressResponse{
  921. Status: fmt.Sprintf("error copying pipe: %v", err),
  922. Digest: layer.Digest,
  923. Total: layer.Size,
  924. Completed: totalUploaded,
  925. })
  926. return
  927. }
  928. totalUploaded += int(n)
  929. fn(api.ProgressResponse{
  930. Status: fmt.Sprintf("uploading %s", layer.Digest),
  931. Digest: layer.Digest,
  932. Total: layer.Size,
  933. Completed: totalUploaded,
  934. })
  935. if totalUploaded >= layer.Size {
  936. return
  937. }
  938. }
  939. }()
  940. url = fmt.Sprintf("%s&digest=%s", url, layer.Digest)
  941. headers := make(map[string]string)
  942. headers["Content-Type"] = "application/octet-stream"
  943. headers["Content-Range"] = fmt.Sprintf("0-%d", layer.Size-1)
  944. headers["Content-Length"] = strconv.Itoa(int(layer.Size))
  945. // finish the upload
  946. resp, err := makeRequest("PUT", url, headers, r, regOpts)
  947. if err != nil {
  948. log.Printf("couldn't finish upload: %v", err)
  949. return err
  950. }
  951. defer resp.Body.Close()
  952. if resp.StatusCode != http.StatusCreated {
  953. body, _ := io.ReadAll(resp.Body)
  954. return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
  955. }
  956. return nil
  957. }
  958. func downloadBlob(mp ModelPath, digest string, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error {
  959. fp, err := GetBlobsPath(digest)
  960. if err != nil {
  961. return err
  962. }
  963. if fi, _ := os.Stat(fp); fi != nil {
  964. // we already have the file, so return
  965. fn(api.ProgressResponse{
  966. Digest: digest,
  967. Total: int(fi.Size()),
  968. Completed: int(fi.Size()),
  969. })
  970. return nil
  971. }
  972. var size int64
  973. chunkSize := 1024 * 1024 // 1 MiB in bytes
  974. fi, err := os.Stat(fp + "-partial")
  975. switch {
  976. case errors.Is(err, os.ErrNotExist):
  977. // noop, file doesn't exist so create it
  978. case err != nil:
  979. return fmt.Errorf("stat: %w", err)
  980. default:
  981. size = fi.Size()
  982. // Ensure the size is divisible by the chunk size by removing excess bytes
  983. size -= size % int64(chunkSize)
  984. err := os.Truncate(fp+"-partial", size)
  985. if err != nil {
  986. return fmt.Errorf("truncate: %w", err)
  987. }
  988. }
  989. url := fmt.Sprintf("%s/v2/%s/blobs/%s", mp.Registry, mp.GetNamespaceRepository(), digest)
  990. headers := map[string]string{
  991. "Range": fmt.Sprintf("bytes=%d-", size),
  992. }
  993. resp, err := makeRequest("GET", url, headers, nil, regOpts)
  994. if err != nil {
  995. log.Printf("couldn't download blob: %v", err)
  996. return err
  997. }
  998. defer resp.Body.Close()
  999. if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
  1000. body, _ := io.ReadAll(resp.Body)
  1001. return fmt.Errorf("on download registry responded with code %d: %v", resp.StatusCode, string(body))
  1002. }
  1003. err = os.MkdirAll(path.Dir(fp), 0o700)
  1004. if err != nil {
  1005. return fmt.Errorf("make blobs directory: %w", err)
  1006. }
  1007. out, err := os.OpenFile(fp+"-partial", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
  1008. if err != nil {
  1009. return fmt.Errorf("open file: %w", err)
  1010. }
  1011. defer out.Close()
  1012. remaining, _ := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
  1013. completed := size
  1014. total := remaining + completed
  1015. for {
  1016. fn(api.ProgressResponse{
  1017. Status: fmt.Sprintf("pulling %s...", digest[7:19]),
  1018. Digest: digest,
  1019. Total: int(total),
  1020. Completed: int(completed),
  1021. })
  1022. if completed >= total {
  1023. if err := out.Close(); err != nil {
  1024. return err
  1025. }
  1026. if err := os.Rename(fp+"-partial", fp); err != nil {
  1027. fn(api.ProgressResponse{
  1028. Status: fmt.Sprintf("error renaming file: %v", err),
  1029. Digest: digest,
  1030. Total: int(total),
  1031. Completed: int(completed),
  1032. })
  1033. return err
  1034. }
  1035. break
  1036. }
  1037. n, err := io.CopyN(out, resp.Body, int64(chunkSize))
  1038. if err != nil && !errors.Is(err, io.EOF) {
  1039. return err
  1040. }
  1041. completed += n
  1042. }
  1043. log.Printf("success getting %s\n", digest)
  1044. return nil
  1045. }
  1046. func makeRequest(method, url string, headers map[string]string, body io.Reader, regOpts *RegistryOptions) (*http.Response, error) {
  1047. if !strings.HasPrefix(url, "http") {
  1048. if regOpts.Insecure {
  1049. url = "http://" + url
  1050. } else {
  1051. url = "https://" + url
  1052. }
  1053. }
  1054. req, err := http.NewRequest(method, url, body)
  1055. if err != nil {
  1056. return nil, err
  1057. }
  1058. for k, v := range headers {
  1059. req.Header.Set(k, v)
  1060. }
  1061. // TODO: better auth
  1062. if regOpts.Username != "" && regOpts.Password != "" {
  1063. req.SetBasicAuth(regOpts.Username, regOpts.Password)
  1064. }
  1065. client := &http.Client{
  1066. CheckRedirect: func(req *http.Request, via []*http.Request) error {
  1067. if len(via) >= 10 {
  1068. return fmt.Errorf("too many redirects")
  1069. }
  1070. log.Printf("redirected to: %s\n", req.URL)
  1071. return nil
  1072. },
  1073. }
  1074. resp, err := client.Do(req)
  1075. if err != nil {
  1076. return nil, err
  1077. }
  1078. return resp, nil
  1079. }
  1080. var errDigestMismatch = fmt.Errorf("digest mismatch, file must be downloaded again")
  1081. func verifyBlob(digest string) error {
  1082. fp, err := GetBlobsPath(digest)
  1083. if err != nil {
  1084. return err
  1085. }
  1086. f, err := os.Open(fp)
  1087. if err != nil {
  1088. return err
  1089. }
  1090. defer f.Close()
  1091. fileDigest, _ := GetSHA256Digest(f)
  1092. if digest != fileDigest {
  1093. return fmt.Errorf("%w: want %s, got %s", errDigestMismatch, digest, fileDigest)
  1094. }
  1095. return nil
  1096. }