server.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
// Package registry implements an Ollama registry client and server.
  2. package registry
import (
	"bytes"
	"cmp"
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"path"
	"slices"
	"strconv"
	"time"

	"bllamo.com/build/blob"
	"bllamo.com/client/ollama"
	"bllamo.com/oweb"
	"bllamo.com/registry/apitype"
	"bllamo.com/utils/upload"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)
  24. // Defaults
  25. const (
  26. DefaultUploadChunkSize = 50 * 1024 * 1024
  27. )
  28. // TODO(bmizerany): move all env things to package envkobs?
  29. var defaultLibrary = cmp.Or(os.Getenv("OLLAMA_REGISTRY"), "registry.ollama.ai/library")
  30. func DefaultLibrary() string {
  31. return defaultLibrary
  32. }
  33. type Server struct {
  34. UploadChunkSize int64 // default is DefaultUploadChunkSize
  35. minioClient *minio.Client
  36. }
  37. func New(mc *minio.Client) *Server {
  38. return &Server{minioClient: mc}
  39. }
  40. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  41. if err := s.serveHTTP(w, r); err != nil {
  42. log.Printf("error: %v", err) // TODO(bmizerany): take a slog.Logger
  43. var e *ollama.Error
  44. if !errors.As(err, &e) {
  45. e = oweb.ErrInternal
  46. }
  47. w.WriteHeader(cmp.Or(e.Status, 400))
  48. if err := oweb.EncodeJSON(w, e); err != nil {
  49. log.Printf("error encoding error: %v", err)
  50. }
  51. }
  52. }
  53. func (s *Server) serveHTTP(w http.ResponseWriter, r *http.Request) error {
  54. switch r.URL.Path {
  55. case "/v1/push":
  56. return s.handlePush(w, r)
  57. case "/v1/pull":
  58. return s.handlePull(w, r)
  59. default:
  60. return oweb.ErrNotFound
  61. }
  62. }
  63. func (s *Server) uploadChunkSize() int64 {
  64. return cmp.Or(s.UploadChunkSize, DefaultUploadChunkSize)
  65. }
  66. func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error {
  67. const bucketTODO = "test"
  68. pr, err := oweb.DecodeUserJSON[apitype.PushRequest]("", r.Body)
  69. if err != nil {
  70. return err
  71. }
  72. ref := blob.ParseRef(pr.Ref)
  73. if !ref.Complete() {
  74. return oweb.Mistake("invalid", "name", "must be complete")
  75. }
  76. m, err := oweb.DecodeUserJSON[apitype.Manifest]("manifest", bytes.NewReader(pr.Manifest))
  77. if err != nil {
  78. return err
  79. }
  80. mcc := &minio.Core{Client: s.mc()}
  81. // TODO(bmizerany): complete uploads before stats for any with ETag
  82. type completeParts struct {
  83. key string
  84. parts []minio.CompletePart
  85. }
  86. completePartsByUploadID := make(map[string]completeParts)
  87. for _, pu := range pr.Uploaded {
  88. // parse the URL
  89. u, err := url.Parse(pu.URL)
  90. if err != nil {
  91. return err
  92. }
  93. q := u.Query()
  94. uploadID := q.Get("UploadId")
  95. if uploadID == "" {
  96. return oweb.Mistake("invalid", "url", "missing UploadId")
  97. }
  98. partNumber, err := strconv.Atoi(q.Get("PartNumber"))
  99. if err != nil {
  100. return oweb.Mistake("invalid", "url", "invalid or missing PartNumber")
  101. }
  102. etag := pu.ETag
  103. if etag == "" {
  104. return oweb.Mistake("invalid", "etag", "missing")
  105. }
  106. cp, ok := completePartsByUploadID[uploadID]
  107. if !ok {
  108. cp = completeParts{key: u.Path}
  109. completePartsByUploadID[uploadID] = cp
  110. }
  111. cp.parts = append(cp.parts, minio.CompletePart{
  112. PartNumber: partNumber,
  113. ETag: etag,
  114. })
  115. fmt.Println("uploadID", uploadID, "partNumber", partNumber, "etag", etag)
  116. completePartsByUploadID[uploadID] = cp
  117. }
  118. for uploadID, cp := range completePartsByUploadID {
  119. var zeroOpts minio.PutObjectOptions
  120. _, err := mcc.CompleteMultipartUpload(r.Context(), bucketTODO, cp.key, uploadID, cp.parts, zeroOpts)
  121. if err != nil {
  122. // log and continue; put backpressure on the client
  123. log.Printf("error completing upload: %v", err)
  124. }
  125. }
  126. var requirements []apitype.Requirement
  127. for _, l := range m.Layers {
  128. // TODO(bmizerany): do in parallel
  129. if l.Size == 0 {
  130. continue
  131. }
  132. // TODO(bmizerany): "global" throttle of rate of transfer
  133. pushed, err := s.statObject(r.Context(), l.Digest)
  134. if err != nil {
  135. return err
  136. }
  137. if !pushed {
  138. key := path.Join("blobs", l.Digest)
  139. uploadID, err := mcc.NewMultipartUpload(r.Context(), bucketTODO, key, minio.PutObjectOptions{})
  140. if err != nil {
  141. return err
  142. }
  143. for partNumber, c := range upload.Chunks(l.Size, s.uploadChunkSize()) {
  144. const timeToStartUpload = 15 * time.Minute
  145. signedURL, err := s.mc().Presign(r.Context(), "PUT", bucketTODO, key, timeToStartUpload, url.Values{
  146. "UploadId": []string{uploadID},
  147. "PartNumber": []string{strconv.Itoa(partNumber)},
  148. "ContentLength": []string{strconv.FormatInt(c.Size, 10)},
  149. })
  150. if err != nil {
  151. return err
  152. }
  153. requirements = append(requirements, apitype.Requirement{
  154. Digest: l.Digest,
  155. Offset: c.Offset,
  156. Size: c.Size,
  157. URL: signedURL.String(),
  158. })
  159. }
  160. }
  161. }
  162. if len(requirements) == 0 {
  163. // Commit the manifest
  164. body := bytes.NewReader(pr.Manifest)
  165. path := path.Join("manifests", path.Join(ref.Parts()...))
  166. _, err := s.mc().PutObject(r.Context(), bucketTODO, path, body, int64(len(pr.Manifest)), minio.PutObjectOptions{})
  167. if err != nil {
  168. return err
  169. }
  170. }
  171. return oweb.EncodeJSON(w, &apitype.PushResponse{Requirements: requirements})
  172. }
  173. func (s *Server) handlePull(w http.ResponseWriter, r *http.Request) error {
  174. // lookup manifest
  175. panic("TODO")
  176. }
  177. func (s *Server) statObject(ctx context.Context, digest string) (pushed bool, err error) {
  178. // HEAD the object
  179. path := path.Join("blobs", digest)
  180. _, err = s.mc().StatObject(ctx, "test", path, minio.StatObjectOptions{})
  181. if err != nil {
  182. if isNoSuchKey(err) {
  183. err = nil
  184. }
  185. return false, err
  186. }
  187. return true, nil
  188. }
  189. func isNoSuchKey(err error) bool {
  190. var e minio.ErrorResponse
  191. return errors.As(err, &e) && e.Code == "NoSuchKey"
  192. }
  193. func (s *Server) mc() *minio.Client {
  194. if s.minioClient != nil {
  195. return s.minioClient
  196. }
  197. mc, err := minio.New("localhost:9000", &minio.Options{
  198. Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
  199. Secure: false,
  200. })
  201. if err != nil {
  202. panic(err)
  203. }
  204. return mc
  205. }