server.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. // Package registry implements an Ollama registry client and server.
  2. package registry
import (
	"bytes"
	"cmp"
	"context"
	"errors"
	"log"
	"net/http"
	"net/url"
	"os"
	"path"
	"slices"
	"strconv"
	"time"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/ollama/ollama/x/client/ollama"
	"github.com/ollama/ollama/x/model"
	"github.com/ollama/ollama/x/oweb"
	"github.com/ollama/ollama/x/registry/apitype"
	"github.com/ollama/ollama/x/utils/upload"
)
  23. // Defaults
  24. const (
  25. DefaultUploadChunkSize = 50 * 1024 * 1024
  26. )
  27. // TODO(bmizerany): move all env things to package envkobs?
  28. var defaultLibrary = cmp.Or(os.Getenv("OLLAMA_REGISTRY"), "registry.ollama.ai/library")
  29. func DefaultLibrary() string {
  30. return defaultLibrary
  31. }
  32. type Server struct {
  33. UploadChunkSize int64 // default is DefaultUploadChunkSize
  34. minioClient *minio.Client
  35. }
  36. func New(mc *minio.Client) *Server {
  37. return &Server{minioClient: mc}
  38. }
  39. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  40. if err := s.serveHTTP(w, r); err != nil {
  41. log.Printf("error: %v", err) // TODO(bmizerany): take a slog.Logger
  42. var e *ollama.Error
  43. if !errors.As(err, &e) {
  44. e = oweb.ErrInternal
  45. }
  46. w.WriteHeader(cmp.Or(e.Status, 400))
  47. if err := oweb.EncodeJSON(w, e); err != nil {
  48. log.Printf("error encoding error: %v", err)
  49. }
  50. }
  51. }
  52. func (s *Server) serveHTTP(w http.ResponseWriter, r *http.Request) error {
  53. switch r.URL.Path {
  54. case "/v1/push":
  55. return s.handlePush(w, r)
  56. case "/v1/pull":
  57. return s.handlePull(w, r)
  58. default:
  59. return oweb.ErrNotFound
  60. }
  61. }
  62. func (s *Server) uploadChunkSize() int64 {
  63. return cmp.Or(s.UploadChunkSize, DefaultUploadChunkSize)
  64. }
  65. func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error {
  66. const bucketTODO = "test"
  67. const minimumMultipartSize = 5 * 1024 * 1024 // S3 spec
  68. pr, err := oweb.DecodeUserJSON[apitype.PushRequest]("", r.Body)
  69. if err != nil {
  70. return err
  71. }
  72. mp := model.ParseName(pr.Name)
  73. if !mp.Complete() {
  74. return oweb.Invalid("name", pr.Name, "must be complete")
  75. }
  76. m, err := oweb.DecodeUserJSON[apitype.Manifest]("manifest", bytes.NewReader(pr.Manifest))
  77. if err != nil {
  78. return err
  79. }
  80. mcc := &minio.Core{Client: s.mc()}
  81. // TODO(bmizerany): complete uploads before stats for any with ETag
  82. type completeParts struct {
  83. key string
  84. parts []minio.CompletePart
  85. }
  86. completePartsByUploadID := make(map[string]completeParts)
  87. for _, mcp := range pr.CompleteParts {
  88. // parse the URL
  89. u, err := url.Parse(mcp.URL)
  90. if err != nil {
  91. return err
  92. }
  93. q := u.Query()
  94. // Check if this is a part upload, if not, skip
  95. uploadID := q.Get("uploadId")
  96. if uploadID == "" {
  97. // not a part upload
  98. continue
  99. }
  100. // PartNumber is required
  101. queryPartNumber := q.Get("partNumber")
  102. partNumber, err := strconv.Atoi(queryPartNumber)
  103. if err != nil {
  104. return oweb.Invalid("partNumber", queryPartNumber, "invalid or missing PartNumber")
  105. }
  106. // ETag is required
  107. if mcp.ETag == "" {
  108. return oweb.Missing("etag")
  109. }
  110. cp := completePartsByUploadID[uploadID]
  111. cp.key = u.Path
  112. cp.parts = append(cp.parts, minio.CompletePart{
  113. PartNumber: partNumber,
  114. ETag: mcp.ETag,
  115. })
  116. completePartsByUploadID[uploadID] = cp
  117. }
  118. for uploadID, cp := range completePartsByUploadID {
  119. var zeroOpts minio.PutObjectOptions
  120. _, err := mcc.CompleteMultipartUpload(r.Context(), bucketTODO, cp.key, uploadID, cp.parts, zeroOpts)
  121. if err != nil {
  122. var e minio.ErrorResponse
  123. if errors.As(err, &e) && e.Code == "NoSuchUpload" {
  124. return oweb.Invalid("uploadId", uploadID, "unknown uploadId")
  125. }
  126. return err
  127. }
  128. }
  129. var requirements []apitype.Requirement
  130. for _, l := range m.Layers {
  131. // TODO(bmizerany): do in parallel
  132. if l.Size == 0 {
  133. continue
  134. }
  135. // TODO(bmizerany): "global" throttle of rate of transfer
  136. pushed, err := s.statObject(r.Context(), l.Digest)
  137. if err != nil {
  138. return err
  139. }
  140. if !pushed {
  141. key := path.Join("blobs", l.Digest)
  142. if l.Size < minimumMultipartSize {
  143. // single part upload
  144. signedURL, err := s.mc().PresignedPutObject(r.Context(), bucketTODO, key, 15*time.Minute)
  145. if err != nil {
  146. return err
  147. }
  148. requirements = append(requirements, apitype.Requirement{
  149. Digest: l.Digest,
  150. Size: l.Size,
  151. URL: signedURL.String(),
  152. })
  153. } else {
  154. uploadID, err := mcc.NewMultipartUpload(r.Context(), bucketTODO, key, minio.PutObjectOptions{})
  155. if err != nil {
  156. return err
  157. }
  158. for partNumber, c := range upload.Chunks(l.Size, s.uploadChunkSize()) {
  159. const timeToStartUpload = 15 * time.Minute
  160. signedURL, err := s.mc().Presign(r.Context(), "PUT", bucketTODO, key, timeToStartUpload, url.Values{
  161. "uploadId": []string{uploadID},
  162. "partNumber": []string{strconv.Itoa(partNumber)},
  163. })
  164. if err != nil {
  165. return err
  166. }
  167. requirements = append(requirements, apitype.Requirement{
  168. Digest: l.Digest,
  169. Offset: c.Offset,
  170. Size: c.N,
  171. URL: signedURL.String(),
  172. })
  173. }
  174. }
  175. }
  176. }
  177. if len(requirements) == 0 {
  178. // Commit the manifest
  179. body := bytes.NewReader(pr.Manifest)
  180. path := path.Join("manifests", path.Join(mp.Parts()...))
  181. _, err := s.mc().PutObject(r.Context(), bucketTODO, path, body, int64(len(pr.Manifest)), minio.PutObjectOptions{})
  182. if err != nil {
  183. return err
  184. }
  185. }
  186. return oweb.EncodeJSON(w, &apitype.PushResponse{Requirements: requirements})
  187. }
  188. func (s *Server) handlePull(w http.ResponseWriter, r *http.Request) error {
  189. // lookup manifest
  190. panic("TODO")
  191. }
  192. func (s *Server) statObject(ctx context.Context, digest string) (pushed bool, err error) {
  193. // HEAD the object
  194. path := path.Join("blobs", digest)
  195. _, err = s.mc().StatObject(ctx, "test", path, minio.StatObjectOptions{})
  196. if err != nil {
  197. if isNoSuchKey(err) {
  198. err = nil
  199. }
  200. return false, err
  201. }
  202. return true, nil
  203. }
  204. func isNoSuchKey(err error) bool {
  205. var e minio.ErrorResponse
  206. return errors.As(err, &e) && e.Code == "NoSuchKey"
  207. }
  208. func (s *Server) mc() *minio.Client {
  209. if s.minioClient != nil {
  210. return s.minioClient
  211. }
  212. mc, err := minio.New("localhost:9000", &minio.Options{
  213. Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
  214. Secure: false,
  215. })
  216. if err != nil {
  217. panic(err)
  218. }
  219. return mc
  220. }