cache.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. // Package blob implements a content-addressable disk cache for blobs and
  2. // manifests.
  3. package blob
  4. import (
  5. "bytes"
  6. "crypto/sha256"
  7. "errors"
  8. "fmt"
  9. "hash"
  10. "io"
  11. "io/fs"
  12. "iter"
  13. "os"
  14. "path/filepath"
  15. "strings"
  16. "time"
  17. "github.com/ollama/ollama/server/internal/internal/names"
  18. )
// Entry contains metadata about a blob in the cache.
//
// Entries are produced by [DiskCache.Get], which fills them from the blob
// file's on-disk metadata.
type Entry struct {
	Digest Digest    // digest the blob was looked up by
	Size   int64     // size of the blob file in bytes
	Time   time.Time // modification time of the blob file, i.e. when it was added to the cache
}
// DiskCache caches blobs and manifests on disk.
//
// The cache is rooted at a directory, which is created if it does not exist.
//
// Blobs are stored in the "blobs" subdirectory, and manifests are stored in the
// "manifests" subdirectory. An example directory structure might look like:
//
//	<dir>/
//	  blobs/
//	    sha256-<digest> - <blob data>
//	  manifests/
//	    <host>/
//	      <namespace>/
//	        <name>/
//	          <tag> - <manifest data>
//
// Name casing is preserved in the cache, but is not significant when resolving
// names. For example, "Foo" and "foo" are considered the same name.
//
// NOTE(review): this comment previously claimed both that the cache is safe
// and that it is not safe for concurrent use. The type itself holds no lock,
// so it does not coordinate concurrent callers and does not prevent
// duplicated effort. Because blobs are immutable, duplicate writes of the
// same blob should result in the same file being written to disk. Confirm
// the intended guarantee before relying on concurrent access.
type DiskCache struct {
	// dir specifies the top-level directory where blobs and manifest
	// pointers are stored.
	dir string

	// now returns the current time. Open sets it to time.Now; it is used to
	// stamp files after they are written.
	now func() time.Time

	// testHookBeforeFinalWrite, if non-nil, is handed to the checkWriter
	// used by copyNamedFile and is called with the destination file just
	// before the final chunk of a blob is written.
	testHookBeforeFinalWrite func(f *os.File)
}
  56. // PutString is a convenience function for c.Put(d, strings.NewReader(s), int64(len(s))).
  57. func PutBytes[S string | []byte](c *DiskCache, d Digest, data S) error {
  58. return c.Put(d, bytes.NewReader([]byte(data)), int64(len(data)))
  59. }
  60. // Open opens a cache rooted at the given directory. If the directory does not
  61. // exist, it is created. If the directory is not a directory, an error is
  62. // returned.
  63. func Open(dir string) (*DiskCache, error) {
  64. if dir == "" {
  65. return nil, errors.New("blob: empty directory name")
  66. }
  67. info, err := os.Stat(dir)
  68. if err == nil && !info.IsDir() {
  69. return nil, fmt.Errorf("%q is not a directory", dir)
  70. }
  71. if err := os.MkdirAll(dir, 0o777); err != nil {
  72. return nil, err
  73. }
  74. subdirs := []string{"blobs", "manifests"}
  75. for _, subdir := range subdirs {
  76. if err := os.MkdirAll(filepath.Join(dir, subdir), 0o777); err != nil {
  77. return nil, err
  78. }
  79. }
  80. // TODO(bmizerany): support shards
  81. c := &DiskCache{
  82. dir: dir,
  83. now: time.Now,
  84. }
  85. return c, nil
  86. }
  87. func readAndSum(filename string, limit int64) (data []byte, _ Digest, err error) {
  88. f, err := os.Open(filename)
  89. if err != nil {
  90. return nil, Digest{}, err
  91. }
  92. defer f.Close()
  93. h := sha256.New()
  94. r := io.TeeReader(f, h)
  95. data, err = io.ReadAll(io.LimitReader(r, limit))
  96. if err != nil {
  97. return nil, Digest{}, err
  98. }
  99. var d Digest
  100. h.Sum(d.sum[:0])
  101. return data, d, nil
  102. }
  103. //lint:ignore U1000 used for debugging purposes as needed in tests
  104. var debug = false
  105. // debugger returns a function that can be used to add a step to the error message.
  106. // The error message will be a list of steps that were taken before the error occurred.
  107. // The steps are added in the order they are called.
  108. //
  109. // To set the error message, call the returned function with an empty string.
  110. //
  111. //lint:ignore U1000 used for debugging purposes as needed in tests
  112. func debugger(err *error) func(step string) {
  113. if !debug {
  114. return func(string) {}
  115. }
  116. var steps []string
  117. return func(step string) {
  118. if step == "" && *err != nil {
  119. *err = fmt.Errorf("%q: %w", steps, *err)
  120. return
  121. }
  122. steps = append(steps, step)
  123. if len(steps) > 100 {
  124. // shift hints in case of a bug that causes a lot of hints
  125. copy(steps, steps[1:])
  126. steps = steps[:100]
  127. }
  128. }
  129. }
  130. // Resolve resolves a name to a digest. The name is expected to
  131. // be in either of the following forms:
  132. //
  133. // @<digest>
  134. // <name>
  135. // <name>
  136. //
  137. // If a digest is provided, it is returned as is and nothing else happens.
  138. //
  139. // If a name is provided for a manifest that exists in the cache, the digest
  140. // of the manifest is returned. If there is no manifest in the cache, it
  141. // returns [fs.ErrNotExist].
  142. //
  143. // To cover the case where a manifest may change without the cache knowing
  144. // (e.g. it was reformatted or modified by hand), the manifest data read and
  145. // hashed is passed to a PutBytes call to ensure that the manifest is in the
  146. // blob store. This is done to ensure that future calls to [Get] succeed in
  147. // these cases.
  148. //
  149. // TODO(bmizerany): Move Links/Resolve/etc. out of this package.
  150. func (c *DiskCache) Resolve(name string) (Digest, error) {
  151. name, digest := splitNameDigest(name)
  152. if digest != "" {
  153. return ParseDigest(digest)
  154. }
  155. // We want to address manifests files by digest using Get. That requires
  156. // them to be blobs. This cannot be directly accomplished by looking in
  157. // the blob store because manifests can change without Ollama knowing
  158. // (e.g. a user modifies a manifests by hand then pushes it to update
  159. // their model). We also need to support the blob caches inherited from
  160. // older versions of Ollama, which do not store manifests in the blob
  161. // store, so for these cases, we need to handle adding the manifests to
  162. // the blob store, just in time.
  163. //
  164. // So now we read the manifests file, hash it, and copy it to the blob
  165. // store if it's not already there.
  166. //
  167. // This should be cheap because manifests are small, and accessed
  168. // infrequently.
  169. file, err := c.manifestPath(name)
  170. if err != nil {
  171. return Digest{}, err
  172. }
  173. data, d, err := readAndSum(file, 1<<20)
  174. if err != nil {
  175. return Digest{}, err
  176. }
  177. // Ideally we'd read the "manifest" file as a manifest to the blob file,
  178. // but we are not changing this yet, so copy the manifest to the blob
  179. // store so it can be addressed by digest subsequent calls to Get.
  180. if err := PutBytes(c, d, data); err != nil {
  181. return Digest{}, err
  182. }
  183. return d, nil
  184. }
  185. // Put writes a new blob to the cache, identified by its digest. The operation
  186. // reads content from r, which must precisely match both the specified size and
  187. // digest.
  188. //
  189. // Concurrent write safety is achieved through file locking. The implementation
  190. // guarantees write integrity by enforcing size limits and content validation
  191. // before allowing the file to reach its final state.
  192. func (c *DiskCache) Put(d Digest, r io.Reader, size int64) error {
  193. return c.copyNamedFile(c.GetFile(d), r, d, size)
  194. }
  195. // Import imports a blob from the provided reader into the cache. It reads the
  196. // entire content of the reader, calculates its digest, and stores it in the
  197. // cache.
  198. //
  199. // Import should be considered unsafe for use with untrusted data, such as data
  200. // read from a network. The caller is responsible for ensuring the integrity of
  201. // the data being imported.
  202. func (c *DiskCache) Import(r io.Reader, size int64) (Digest, error) {
  203. // users that want to change the temp dir can set TEMPDIR.
  204. f, err := os.CreateTemp("", "blob-")
  205. if err != nil {
  206. return Digest{}, err
  207. }
  208. defer os.Remove(f.Name())
  209. // Copy the blob to a temporary file.
  210. h := sha256.New()
  211. r = io.TeeReader(r, h)
  212. n, err := io.Copy(f, r)
  213. if err != nil {
  214. return Digest{}, err
  215. }
  216. if n != size {
  217. return Digest{}, fmt.Errorf("blob: expected %d bytes, got %d", size, n)
  218. }
  219. // Check the digest.
  220. var d Digest
  221. h.Sum(d.sum[:0])
  222. if err := f.Close(); err != nil {
  223. return Digest{}, err
  224. }
  225. name := c.GetFile(d)
  226. // Rename the temporary file to the final file.
  227. if err := os.Rename(f.Name(), name); err != nil {
  228. return Digest{}, err
  229. }
  230. os.Chtimes(name, c.now(), c.now()) // mainly for tests
  231. return d, nil
  232. }
  233. // Get retrieves a blob from the cache using the provided digest. The operation
  234. // fails if the digest is malformed or if any errors occur during blob
  235. // retrieval.
  236. func (c *DiskCache) Get(d Digest) (Entry, error) {
  237. name := c.GetFile(d)
  238. info, err := os.Stat(name)
  239. if err != nil {
  240. return Entry{}, err
  241. }
  242. if info.Size() == 0 {
  243. return Entry{}, fs.ErrNotExist
  244. }
  245. return Entry{
  246. Digest: d,
  247. Size: info.Size(),
  248. Time: info.ModTime(),
  249. }, nil
  250. }
  251. // Link creates a symbolic reference in the cache that maps the provided name
  252. // to a blob identified by its digest, making it retrievable by name using
  253. // [Resolve].
  254. //
  255. // It returns an error if either the name or digest is invalid, or if link
  256. // creation encounters any issues.
  257. func (c *DiskCache) Link(name string, d Digest) error {
  258. manifest, err := c.manifestPath(name)
  259. if err != nil {
  260. return err
  261. }
  262. f, err := os.OpenFile(c.GetFile(d), os.O_RDONLY, 0)
  263. if err != nil {
  264. return err
  265. }
  266. defer f.Close()
  267. // TODO(bmizerany): test this happens only if the blob was found to
  268. // avoid leaving debris
  269. if err := os.MkdirAll(filepath.Dir(manifest), 0o777); err != nil {
  270. return err
  271. }
  272. info, err := f.Stat()
  273. if err != nil {
  274. return err
  275. }
  276. // Copy manifest to cache directory.
  277. return c.copyNamedFile(manifest, f, d, info.Size())
  278. }
  279. // Unlink removes the any link for name. If the link does not exist, nothing
  280. // happens, and no error is returned.
  281. //
  282. // It returns an error if the name is invalid or if the link removal encounters
  283. // any issues.
  284. func (c *DiskCache) Unlink(name string) error {
  285. manifest, err := c.manifestPath(name)
  286. if err != nil {
  287. return err
  288. }
  289. err = os.Remove(manifest)
  290. if errors.Is(err, fs.ErrNotExist) {
  291. return nil
  292. }
  293. return err
  294. }
  295. // GetFile returns the absolute path to the file, in the cache, for the given
  296. // digest. It does not check if the file exists.
  297. //
  298. // The returned path should not be stored, used outside the lifetime of the
  299. // cache, or interpreted in any way.
  300. func (c *DiskCache) GetFile(d Digest) string {
  301. filename := fmt.Sprintf("sha256-%x", d.sum)
  302. return absJoin(c.dir, "blobs", filename)
  303. }
  304. // Links returns a sequence of links in the cache in lexical order.
  305. func (c *DiskCache) Links() iter.Seq2[string, error] {
  306. return func(yield func(string, error) bool) {
  307. for path, err := range c.links() {
  308. if err != nil {
  309. yield("", err)
  310. return
  311. }
  312. if !yield(pathToName(path), nil) {
  313. return
  314. }
  315. }
  316. }
  317. }
  318. // pathToName converts a path to a name. It is the inverse of nameToPath. The
  319. // path is assumed to be in filepath.ToSlash format.
  320. func pathToName(s string) string {
  321. s = strings.TrimPrefix(s, "manifests/")
  322. rr := []rune(s)
  323. for i := len(rr) - 1; i > 0; i-- {
  324. if rr[i] == '/' {
  325. rr[i] = ':'
  326. return string(rr)
  327. }
  328. }
  329. return s
  330. }
  331. // manifestPath finds the first manifest file on disk that matches the given
  332. // name using a case-insensitive comparison. If no manifest file is found, it
  333. // returns the path where the manifest file would be if it existed.
  334. //
  335. // If two manifest files exists on disk that match the given name using a
  336. // case-insensitive comparison, the one that sorts first, lexically, is
  337. // returned.
  338. func (c *DiskCache) manifestPath(name string) (string, error) {
  339. np, err := nameToPath(name)
  340. if err != nil {
  341. return "", err
  342. }
  343. maybe := filepath.Join("manifests", np)
  344. for l, err := range c.links() {
  345. if err != nil {
  346. return "", err
  347. }
  348. if strings.EqualFold(maybe, l) {
  349. return filepath.Join(c.dir, l), nil
  350. }
  351. }
  352. return filepath.Join(c.dir, maybe), nil
  353. }
  354. // links returns a sequence of links in the cache in lexical order.
  355. func (c *DiskCache) links() iter.Seq2[string, error] {
  356. // TODO(bmizerany): reuse empty dirnames if exist
  357. return func(yield func(string, error) bool) {
  358. fsys := os.DirFS(c.dir)
  359. manifests, err := fs.Glob(fsys, "manifests/*/*/*/*")
  360. if err != nil {
  361. yield("", err)
  362. return
  363. }
  364. for _, manifest := range manifests {
  365. if !yield(manifest, nil) {
  366. return
  367. }
  368. }
  369. }
  370. }
  371. type checkWriter struct {
  372. d Digest
  373. size int64
  374. n int64
  375. h hash.Hash
  376. f *os.File
  377. err error
  378. testHookBeforeFinalWrite func(*os.File)
  379. }
  380. func (w *checkWriter) seterr(err error) error {
  381. if w.err == nil {
  382. w.err = err
  383. }
  384. return err
  385. }
  386. // Write writes p to the underlying hash and writer. The last write to the
  387. // underlying writer is guaranteed to be the last byte of p as verified by the
  388. // hash.
  389. func (w *checkWriter) Write(p []byte) (int, error) {
  390. _, err := w.h.Write(p)
  391. if err != nil {
  392. return 0, w.seterr(err)
  393. }
  394. nextSize := w.n + int64(len(p))
  395. if nextSize == w.size {
  396. // last write. check hash.
  397. sum := w.h.Sum(nil)
  398. if !bytes.Equal(sum, w.d.sum[:]) {
  399. return 0, w.seterr(fmt.Errorf("file content changed underfoot"))
  400. }
  401. if w.testHookBeforeFinalWrite != nil {
  402. w.testHookBeforeFinalWrite(w.f)
  403. }
  404. }
  405. if nextSize > w.size {
  406. return 0, w.seterr(fmt.Errorf("content exceeds expected size: %d > %d", nextSize, w.size))
  407. }
  408. n, err := w.f.Write(p)
  409. w.n += int64(n)
  410. return n, w.seterr(err)
  411. }
  412. // copyNamedFile copies file into name, expecting it to have the given Digest
  413. // and size, if that file is not present already.
  414. func (c *DiskCache) copyNamedFile(name string, file io.Reader, out Digest, size int64) error {
  415. info, err := os.Stat(name)
  416. if err == nil && info.Size() == size {
  417. // File already exists with correct size. This is good enough.
  418. // We can skip expensive hash checks.
  419. //
  420. // TODO: Do the hash check, but give caller a way to skip it.
  421. return nil
  422. }
  423. // Copy file to cache directory.
  424. mode := os.O_RDWR | os.O_CREATE
  425. if err == nil && info.Size() > size { // shouldn't happen but fix in case
  426. mode |= os.O_TRUNC
  427. }
  428. f, err := os.OpenFile(name, mode, 0o666)
  429. if err != nil {
  430. return err
  431. }
  432. defer f.Close()
  433. if size == 0 {
  434. // File now exists with correct size.
  435. // Only one possible zero-length file, so contents are OK too.
  436. // Early return here makes sure there's a "last byte" for code below.
  437. return nil
  438. }
  439. // From here on, if any of the I/O writing the file fails,
  440. // we make a best-effort attempt to truncate the file f
  441. // before returning, to avoid leaving bad bytes in the file.
  442. // Copy file to f, but also into h to double-check hash.
  443. cw := &checkWriter{
  444. d: out,
  445. size: size,
  446. h: sha256.New(),
  447. f: f,
  448. testHookBeforeFinalWrite: c.testHookBeforeFinalWrite,
  449. }
  450. n, err := io.Copy(cw, file)
  451. if err != nil {
  452. f.Truncate(0)
  453. return err
  454. }
  455. if n < size {
  456. f.Truncate(0)
  457. return io.ErrUnexpectedEOF
  458. }
  459. if err := f.Close(); err != nil {
  460. // Data might not have been written,
  461. // but file may look like it is the right size.
  462. // To be extra careful, remove cached file.
  463. os.Remove(name)
  464. return err
  465. }
  466. os.Chtimes(name, c.now(), c.now()) // mainly for tests
  467. return nil
  468. }
  469. func splitNameDigest(s string) (name, digest string) {
  470. i := strings.LastIndexByte(s, '@')
  471. if i < 0 {
  472. return s, ""
  473. }
  474. return s[:i], s[i+1:]
  475. }
  476. var errInvalidName = errors.New("invalid name")
  477. func nameToPath(name string) (_ string, err error) {
  478. if strings.Contains(name, "@") {
  479. // TODO(bmizerany): HACK: Fix names.Parse to validate.
  480. // TODO(bmizerany): merge with default parts (maybe names.Merge(a, b))
  481. return "", errInvalidName
  482. }
  483. n := names.Parse(name)
  484. if !n.IsFullyQualified() {
  485. return "", errInvalidName
  486. }
  487. return filepath.Join(n.Host(), n.Namespace(), n.Model(), n.Tag()), nil
  488. }
  489. func absJoin(pp ...string) string {
  490. abs, err := filepath.Abs(filepath.Join(pp...))
  491. if err != nil {
  492. // Likely a bug bug or a bad OS problem. Just panic.
  493. panic(err)
  494. }
  495. return abs
  496. }