server.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
// Package registry implements an Ollama registry client and server.
  2. package registry
import (
	"bytes"
	"cmp"
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"path"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/ollama/ollama/x/client/ollama"
	"github.com/ollama/ollama/x/model"
	"github.com/ollama/ollama/x/oweb"
	"github.com/ollama/ollama/x/registry/apitype"
	"github.com/ollama/ollama/x/utils/upload"
)
  24. // Defaults
  25. const (
  26. DefaultUploadChunkSize = 50 * 1024 * 1024
  27. )
  28. type Server struct {
  29. UploadChunkSize int64 // default is DefaultUploadChunkSize
  30. S3Client *minio.Client
  31. }
  32. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  33. if err := s.serveHTTP(w, r); err != nil {
  34. log.Printf("error: %v", err) // TODO(bmizerany): take a slog.Logger
  35. var e *ollama.Error
  36. if !errors.As(err, &e) {
  37. e = oweb.ErrInternal
  38. }
  39. w.WriteHeader(cmp.Or(e.Status, 400))
  40. if err := oweb.EncodeJSON(w, e); err != nil {
  41. log.Printf("error encoding error: %v", err)
  42. }
  43. }
  44. }
  45. func (s *Server) serveHTTP(w http.ResponseWriter, r *http.Request) error {
  46. switch r.URL.Path {
  47. case "/v1/push":
  48. return s.handlePush(w, r)
  49. case "/v1/pull":
  50. return s.handlePull(w, r)
  51. default:
  52. return oweb.ErrNotFound
  53. }
  54. }
  55. func (s *Server) uploadChunkSize() int64 {
  56. return cmp.Or(s.UploadChunkSize, DefaultUploadChunkSize)
  57. }
  58. func (s *Server) handlePush(w http.ResponseWriter, r *http.Request) error {
  59. const bucketTODO = "test"
  60. const minimumMultipartSize = 5 * 1024 * 1024 // S3 spec
  61. pr, err := oweb.DecodeUserJSON[apitype.PushRequest]("", r.Body)
  62. if err != nil {
  63. return err
  64. }
  65. mp := model.ParseName(pr.Name)
  66. if !mp.IsComplete() {
  67. return oweb.Invalid("name", pr.Name, "must be complete")
  68. }
  69. m, err := oweb.DecodeUserJSON[apitype.Manifest]("manifest", bytes.NewReader(pr.Manifest))
  70. if err != nil {
  71. return err
  72. }
  73. mcc := &minio.Core{Client: s.s3()}
  74. // TODO(bmizerany): complete uploads before stats for any with ETag
  75. type completeParts struct {
  76. key string
  77. parts []minio.CompletePart
  78. }
  79. completePartsByUploadID := make(map[string]completeParts)
  80. for _, mcp := range pr.CompleteParts {
  81. // parse the URL
  82. u, err := url.Parse(mcp.URL)
  83. if err != nil {
  84. return err
  85. }
  86. q := u.Query()
  87. // Check if this is a part upload, if not, skip
  88. uploadID := q.Get("uploadId")
  89. if uploadID == "" {
  90. // not a part upload
  91. continue
  92. }
  93. // PartNumber is required
  94. queryPartNumber := q.Get("partNumber")
  95. partNumber, err := strconv.Atoi(queryPartNumber)
  96. if err != nil {
  97. return oweb.Invalid("partNumber", queryPartNumber, "")
  98. }
  99. if partNumber < 1 {
  100. return oweb.Invalid("partNumber", queryPartNumber, "must be >= 1")
  101. }
  102. // ETag is required
  103. if mcp.ETag == "" {
  104. return oweb.Missing("etag")
  105. }
  106. cp := completePartsByUploadID[uploadID]
  107. cp.key = u.Path
  108. cp.parts = append(cp.parts, minio.CompletePart{
  109. PartNumber: partNumber,
  110. ETag: mcp.ETag,
  111. })
  112. completePartsByUploadID[uploadID] = cp
  113. }
  114. for uploadID, cp := range completePartsByUploadID {
  115. var zeroOpts minio.PutObjectOptions
  116. // TODO: gross fix!!!!!!!!!!!!!!!
  117. key := strings.TrimPrefix(cp.key, "/"+bucketTODO+"/")
  118. fmt.Printf("Completing multipart upload %s %s %v\n", bucketTODO, key, cp.parts)
  119. _, err := mcc.CompleteMultipartUpload(r.Context(), bucketTODO, key, uploadID, cp.parts, zeroOpts)
  120. if err != nil {
  121. var e minio.ErrorResponse
  122. if errors.As(err, &e) && e.Code == "NoSuchUpload" {
  123. return oweb.Invalid("uploadId", uploadID, "")
  124. }
  125. return err
  126. }
  127. }
  128. var requirements []apitype.Requirement
  129. for _, l := range m.Layers {
  130. // TODO(bmizerany): do in parallel
  131. if l.Size == 0 {
  132. continue
  133. }
  134. // TODO(bmizerany): "global" throttle of rate of transfer
  135. pushed, err := s.statObject(r.Context(), l.Digest)
  136. if err != nil {
  137. println("ERROR:", "statObject", err)
  138. return err
  139. }
  140. if !pushed {
  141. key := path.Join("blobs", l.Digest)
  142. if l.Size < minimumMultipartSize {
  143. // single part upload
  144. fmt.Printf("Presigning single %s %s\n", bucketTODO, key)
  145. signedURL, err := s.s3().PresignedPutObject(r.Context(), bucketTODO, key, 15*time.Minute)
  146. if err != nil {
  147. println("ERROR:", "presign single", err)
  148. return err
  149. }
  150. requirements = append(requirements, apitype.Requirement{
  151. Digest: l.Digest,
  152. Size: l.Size,
  153. URL: signedURL.String(),
  154. })
  155. } else {
  156. uploadID, err := mcc.NewMultipartUpload(r.Context(), bucketTODO, key, minio.PutObjectOptions{})
  157. if err != nil {
  158. return err
  159. }
  160. fmt.Printf("Presigning multi %s %s %s\n", bucketTODO, key, uploadID)
  161. for partNumber, c := range upload.Chunks(l.Size, s.uploadChunkSize()) {
  162. const timeToStartUpload = 15 * time.Minute
  163. signedURL, err := s.s3().Presign(r.Context(), "PUT", bucketTODO, key, timeToStartUpload, url.Values{
  164. "partNumber": []string{strconv.Itoa(partNumber)},
  165. "uploadId": []string{uploadID},
  166. })
  167. if err != nil {
  168. println("ERROR:", "presign multi", err)
  169. return err
  170. }
  171. requirements = append(requirements, apitype.Requirement{
  172. Digest: l.Digest,
  173. Offset: c.Offset,
  174. Size: c.N,
  175. URL: signedURL.String(),
  176. })
  177. }
  178. }
  179. }
  180. }
  181. if len(requirements) == 0 {
  182. // Commit the manifest
  183. body := bytes.NewReader(pr.Manifest)
  184. path := path.Join("manifests", path.Join(mp.Parts()...))
  185. _, err := s.s3().PutObject(r.Context(), bucketTODO, path, body, int64(len(pr.Manifest)), minio.PutObjectOptions{})
  186. if err != nil {
  187. return err
  188. }
  189. }
  190. return oweb.EncodeJSON(w, &apitype.PushResponse{Requirements: requirements})
  191. }
  192. func (s *Server) handlePull(w http.ResponseWriter, r *http.Request) error {
  193. // lookup manifest
  194. panic("TODO")
  195. }
  196. func (s *Server) statObject(ctx context.Context, digest string) (pushed bool, err error) {
  197. // HEAD the object
  198. path := path.Join("blobs", digest)
  199. _, err = s.s3().StatObject(ctx, "test", path, minio.StatObjectOptions{})
  200. if err != nil {
  201. if isNoSuchKey(err) {
  202. err = nil
  203. }
  204. return false, err
  205. }
  206. return true, nil
  207. }
  208. func isNoSuchKey(err error) bool {
  209. var e minio.ErrorResponse
  210. return errors.As(err, &e) && e.Code == "NoSuchKey"
  211. }
  212. func (s *Server) s3() *minio.Client {
  213. if s.S3Client != nil {
  214. return s.S3Client
  215. }
  216. s3, err := minio.New("localhost:9000", &minio.Options{
  217. Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
  218. Secure: false,
  219. })
  220. if err != nil {
  221. panic(err)
  222. }
  223. return s3
  224. }