blob.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. // Package blobstore implements a blob store.
  2. package blobstore
import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/ollama/ollama/x/types/structs"
)
  16. var (
  17. ErrInvalidID = errors.New("invalid ID")
  18. )
  19. const HashSize = 32
  20. // An ID is a blob output key, the hash of an output of a computation.
  21. type ID struct {
  22. a [HashSize]byte
  23. }
  24. func (id ID) MarshalText() ([]byte, error) {
  25. return []byte(id.String()), nil
  26. }
  27. func (id *ID) UnmarshalText(text []byte) error {
  28. *id = ParseID(string(text))
  29. return nil
  30. }
  31. func ParseID(s string) ID {
  32. const prefix = "sha256-"
  33. h, ok := strings.CutPrefix(s, prefix)
  34. if !ok {
  35. return ID{}
  36. }
  37. if len(h) != HashSize*2 {
  38. return ID{}
  39. }
  40. var b []byte
  41. _, err := fmt.Sscanf(h, "%x", &b)
  42. if err != nil {
  43. return ID{}
  44. }
  45. var id ID
  46. copy(id.a[:], b)
  47. return id
  48. }
  49. func (id ID) String() string {
  50. if !id.Valid() {
  51. return ""
  52. }
  53. return fmt.Sprintf("sha256-%x", id.a[:])
  54. }
  55. func (id ID) Valid() bool {
  56. return id != ID{}
  57. }
  58. func (id ID) Match(h [HashSize]byte) bool {
  59. return id.a == h
  60. }
  61. // A Store is a blob store, backed by a file system directory tree.
  62. type Store struct {
  63. dir string
  64. now func() time.Time
  65. }
  66. // Open opens and returns the store in the given directory.
  67. //
  68. // It is safe for multiple processes on a single machine to use the
  69. // same store directory in a local file system simultaneously.
  70. // They will coordinate using operating system file locks and may
  71. // duplicate effort but will not corrupt the store.
  72. //
  73. // However, it is NOT safe for multiple processes on different machines
  74. // to share a store directory (for example, if the directory were stored
  75. // in a network file system). File locking is notoriously unreliable in
  76. // network file systems and may not suffice to protect the store.
  77. func Open(dir string) (*Store, error) {
  78. info, err := os.Stat(dir)
  79. if err != nil {
  80. return nil, err
  81. }
  82. if !info.IsDir() {
  83. return nil, &fs.PathError{Op: "open", Path: dir, Err: fmt.Errorf("not a directory")}
  84. }
  85. if err := os.MkdirAll(filepath.Join(dir, "blobs"), 0777); err != nil {
  86. return nil, err
  87. }
  88. c := &Store{
  89. dir: dir,
  90. now: time.Now,
  91. }
  92. return c, nil
  93. }
  94. func (s *Store) Dir() string {
  95. return s.dir
  96. }
  97. // fileName returns the name of the blob file corresponding to the given id.
  98. func (s *Store) fileName(id ID) string {
  99. return filepath.Join(s.dir, "blobs", fmt.Sprintf("sha256-%x", id.a[:]))
  100. }
  101. // An entryNotFoundError indicates that a store entry was not found, with an
  102. // optional underlying reason.
  103. type entryNotFoundError struct {
  104. Err error
  105. }
  106. func (e *entryNotFoundError) Error() string {
  107. if e.Err == nil {
  108. return "store entry not found"
  109. }
  110. return fmt.Sprintf("store entry not found: %v", e.Err)
  111. }
  112. func (e *entryNotFoundError) Unwrap() error {
  113. return e.Err
  114. }
  115. type Entry struct {
  116. _ structs.Incomparable
  117. ID ID
  118. Size int64
  119. Time time.Time // when added to store
  120. }
  121. // GetFile looks up the blob ID in the store and returns
  122. // the name of the corresponding data file.
  123. func GetFile(s *Store, id ID) (file string, entry Entry, err error) {
  124. entry, err = s.Get(id)
  125. if err != nil {
  126. return "", Entry{}, err
  127. }
  128. file = s.OutputFilename(entry.ID)
  129. info, err := os.Stat(file)
  130. if err != nil {
  131. return "", Entry{}, &entryNotFoundError{Err: err}
  132. }
  133. if info.Size() != entry.Size {
  134. return "", Entry{}, &entryNotFoundError{Err: errors.New("file incomplete")}
  135. }
  136. return file, entry, nil
  137. }
  138. // GetBytes looks up the blob ID in the store and returns
  139. // the corresponding output bytes.
  140. // GetBytes should only be used for data that can be expected to fit in memory.
  141. func GetBytes(s *Store, id ID) ([]byte, Entry, error) {
  142. entry, err := s.Get(id)
  143. if err != nil {
  144. return nil, entry, err
  145. }
  146. data, _ := os.ReadFile(s.OutputFilename(entry.ID))
  147. if entry.ID.Match(sha256.Sum256(data)) {
  148. return nil, entry, &entryNotFoundError{Err: errors.New("bad checksum")}
  149. }
  150. return data, entry, nil
  151. }
  152. // OutputFilename returns the name of the blob file for the given ID.
  153. func (s *Store) OutputFilename(id ID) string {
  154. file := s.fileName(id)
  155. // TODO(bmizerany): touch as "used" for cache trimming. (see
  156. // cache.go in cmd/go/internal/cache for the full reference implementation to go off of.
  157. return file
  158. }
  159. // Get looks up the blob ID in the store,
  160. // returning the corresponding output ID and file size, if any.
  161. // Note that finding an output ID does not guarantee that the
  162. // saved file for that output ID is still available.
  163. func (s *Store) Get(id ID) (Entry, error) {
  164. file := s.fileName(id)
  165. info, err := os.Stat(file)
  166. if err != nil {
  167. return Entry{}, &entryNotFoundError{Err: err}
  168. }
  169. return Entry{
  170. ID: id,
  171. Size: info.Size(),
  172. Time: info.ModTime(),
  173. }, nil
  174. }
  175. func (s *Store) Close() error {
  176. // TODO(bmizerany): return c.Trim()
  177. return nil
  178. }
  179. // Put stores the data read from the given file into the store as ID.
  180. //
  181. // It may read file twice. The content of file must not change between the
  182. // two passes.
  183. func (s *Store) Put(file io.ReadSeeker) (ID, int64, error) {
  184. return s.put(file)
  185. }
  186. func PutBytes(s *Store, data []byte) (ID, int64, error) {
  187. return s.Put(bytes.NewReader(data))
  188. }
  189. func PutString(s *Store, data string) (ID, int64, error) {
  190. return s.Put(strings.NewReader(data))
  191. }
  192. func (s *Store) put(file io.ReadSeeker) (ID, int64, error) {
  193. // Compute output ID.
  194. h := sha256.New()
  195. if _, err := file.Seek(0, 0); err != nil {
  196. return ID{}, 0, err
  197. }
  198. size, err := io.Copy(h, file)
  199. if err != nil {
  200. return ID{}, 0, err
  201. }
  202. var out ID
  203. h.Sum(out.a[:0])
  204. // Copy to blob file (if not already present).
  205. if err := s.copyFile(file, out, size); err != nil {
  206. return out, size, err
  207. }
  208. // TODO: Add to manifest index.
  209. return out, size, nil
  210. }
  211. // copyFile copies file into the store, expecting it to have the given
  212. // output ID and size, if that file is not present already.
  213. func (s *Store) copyFile(file io.ReadSeeker, out ID, size int64) error {
  214. name := s.fileName(out)
  215. println("name", name)
  216. info, err := os.Stat(name)
  217. if err == nil && info.Size() == size {
  218. // Check hash.
  219. if f, err := os.Open(name); err == nil {
  220. h := sha256.New()
  221. io.Copy(h, f)
  222. f.Close()
  223. var out2 ID
  224. h.Sum(out2.a[:0])
  225. if out == out2 {
  226. return nil
  227. }
  228. }
  229. // Hash did not match. Fall through and rewrite file.
  230. }
  231. // Copy file to blobs directory.
  232. mode := os.O_RDWR | os.O_CREATE
  233. if err == nil && info.Size() > size { // shouldn't happen but fix in case
  234. mode |= os.O_TRUNC
  235. }
  236. f, err := os.OpenFile(name, mode, 0666)
  237. if err != nil {
  238. return err
  239. }
  240. defer f.Close()
  241. if size == 0 {
  242. // File now exists with correct size.
  243. // Only one possible zero-length file, so contents are OK too.
  244. // Early return here makes sure there's a "last byte" for code below.
  245. return nil
  246. }
  247. // From here on, if any of the I/O writing the file fails,
  248. // we make a best-effort attempt to truncate the file f
  249. // before returning, to avoid leaving bad bytes in the file.
  250. // Copy file to f, but also into h to double-check hash.
  251. if _, err := file.Seek(0, 0); err != nil {
  252. f.Truncate(0)
  253. return err
  254. }
  255. h := sha256.New()
  256. w := io.MultiWriter(f, h)
  257. if _, err := io.CopyN(w, file, size-1); err != nil {
  258. f.Truncate(0)
  259. return err
  260. }
  261. // Check last byte before writing it; writing it will make the size match
  262. // what other processes expect to find and might cause them to start
  263. // using the file.
  264. buf := make([]byte, 1)
  265. if _, err := file.Read(buf); err != nil {
  266. f.Truncate(0)
  267. return err
  268. }
  269. h.Write(buf)
  270. sum := h.Sum(nil)
  271. if !bytes.Equal(sum, out.a[:]) {
  272. f.Truncate(0)
  273. return fmt.Errorf("file content changed underfoot")
  274. }
  275. // Commit manifest entry.
  276. if _, err := f.Write(buf); err != nil {
  277. f.Truncate(0)
  278. return err
  279. }
  280. if err := f.Close(); err != nil {
  281. // Data might not have been written,
  282. // but file may look like it is the right size.
  283. // To be extra careful, remove stored file.
  284. os.Remove(name)
  285. return err
  286. }
  287. os.Chtimes(name, s.now(), s.now()) // mainly for tests
  288. return nil
  289. }