chunked.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package blob
  2. import (
  3. "crypto/sha256"
  4. "errors"
  5. "io"
  6. "os"
  7. "github.com/ollama/ollama/server/internal/chunks"
  8. )
  9. type Chunk = chunks.Chunk // TODO: move chunks here?
  10. // Chunker writes to a blob in chunks.
  11. // Its zero value is invalid. Use [DiskCache.Chunked] to create a new Chunker.
  12. type Chunker struct {
  13. digest Digest
  14. size int64
  15. f *os.File // nil means pre-validated
  16. }
  17. // Chunked returns a new Chunker, ready for use storing a blob of the given
  18. // size in chunks.
  19. //
  20. // Use [Chunker.Put] to write data to the blob at specific offsets.
  21. func (c *DiskCache) Chunked(d Digest, size int64) (*Chunker, error) {
  22. name := c.GetFile(d)
  23. info, err := os.Stat(name)
  24. if err == nil && info.Size() == size {
  25. return &Chunker{}, nil
  26. }
  27. f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, 0o666)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return &Chunker{digest: d, size: size, f: f}, nil
  32. }
  33. // Put copies chunk.Size() bytes from r to the blob at the given offset,
  34. // merging the data with the existing blob. It returns an error if any. As a
  35. // special case, if r has less than chunk.Size() bytes, Put returns
  36. // io.ErrUnexpectedEOF.
  37. func (c *Chunker) Put(chunk Chunk, d Digest, r io.Reader) error {
  38. if c.f == nil {
  39. return nil
  40. }
  41. cw := &checkWriter{
  42. d: d,
  43. size: chunk.Size(),
  44. h: sha256.New(),
  45. f: c.f,
  46. w: io.NewOffsetWriter(c.f, chunk.Start),
  47. }
  48. _, err := io.CopyN(cw, r, chunk.Size())
  49. if err != nil && errors.Is(err, io.EOF) {
  50. return io.ErrUnexpectedEOF
  51. }
  52. return err
  53. }
  54. // Close closes the underlying file.
  55. func (c *Chunker) Close() error {
  56. return c.f.Close()
  57. }