server_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package registry
  2. import (
  3. "bufio"
  4. "bytes"
  5. "cmp"
  6. "context"
  7. "crypto/sha256"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "net"
  13. "net/http/httptest"
  14. "net/url"
  15. "os"
  16. "os/exec"
  17. "strconv"
  18. "syscall"
  19. "testing"
  20. "time"
  21. "bllamo.com/registry/apitype"
  22. "bllamo.com/utils/backoff"
  23. "bllamo.com/utils/upload"
  24. "github.com/minio/minio-go/v7"
  25. "github.com/minio/minio-go/v7/pkg/credentials"
  26. "kr.dev/diff"
  27. )
  28. func TestPushBasic(t *testing.T) {
  29. const MB = 1024 * 1024
  30. mc := startMinio(t, true)
  31. defer func() {
  32. mcc := &minio.Core{Client: mc}
  33. // fail if there are any incomplete uploads
  34. for x := range mcc.ListIncompleteUploads(context.Background(), "test", "theKey", true) {
  35. t.Errorf("incomplete: %v", x)
  36. }
  37. }()
  38. // Upload two small layers and one large layer that will
  39. // trigger a multipart upload.
  40. manifest := []byte(`{
  41. "layers": [
  42. {"digest": "sha256-1", "size": 1},
  43. {"digest": "sha256-2", "size": 2},
  44. {"digest": "sha256-3", "size": 11000000}
  45. ]
  46. }`)
  47. const ref = "registry.ollama.ai/x/y:latest+Z"
  48. hs := httptest.NewServer(&Server{
  49. minioClient: mc,
  50. UploadChunkSize: 5 * MB,
  51. })
  52. t.Cleanup(hs.Close)
  53. c := &Client{BaseURL: hs.URL}
  54. requirements, err := c.Push(context.Background(), ref, manifest, nil)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. if len(requirements) < 3 {
  59. t.Errorf("expected at least 3 requirements; got %d", len(requirements))
  60. t.Logf("requirements: %v", requirements)
  61. }
  62. var uploaded []apitype.CompletePart
  63. for i, r := range requirements {
  64. t.Logf("[%d] pushing layer: offset=%d size=%d", i, r.Offset, r.Size)
  65. cp, err := PushLayer(context.Background(), &abcReader{}, r.URL, r.Offset, r.Size)
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. uploaded = append(uploaded, cp)
  70. }
  71. requirements, err = c.Push(context.Background(), ref, manifest, &PushParams{
  72. CompleteParts: uploaded,
  73. })
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. if len(requirements) != 0 {
  78. t.Errorf("unexpected requirements: %v", requirements)
  79. }
  80. var paths []string
  81. keys := mc.ListObjects(context.Background(), "test", minio.ListObjectsOptions{
  82. Recursive: true,
  83. })
  84. for k := range keys {
  85. paths = append(paths, k.Key)
  86. }
  87. t.Logf("paths: %v", paths)
  88. diff.Test(t, t.Errorf, paths, []string{
  89. "blobs/sha256-1",
  90. "blobs/sha256-2",
  91. "blobs/sha256-3",
  92. "manifests/registry.ollama.ai/x/y/latest/Z",
  93. })
  94. obj, err := mc.GetObject(context.Background(), "test", "manifests/registry.ollama.ai/x/y/latest/Z", minio.GetObjectOptions{})
  95. if err != nil {
  96. t.Fatal(err)
  97. }
  98. defer obj.Close()
  99. var gotM apitype.Manifest
  100. if err := json.NewDecoder(obj).Decode(&gotM); err != nil {
  101. t.Fatal(err)
  102. }
  103. diff.Test(t, t.Errorf, gotM, apitype.Manifest{
  104. Layers: []apitype.Layer{
  105. {Digest: "sha256-1", Size: 1},
  106. {Digest: "sha256-2", Size: 2},
  107. {Digest: "sha256-3", Size: 3},
  108. },
  109. })
  110. // checksum the blobs
  111. for i, l := range gotM.Layers {
  112. obj, err := mc.GetObject(context.Background(), "test", "blobs/"+l.Digest, minio.GetObjectOptions{})
  113. if err != nil {
  114. t.Fatal(err)
  115. }
  116. defer obj.Close()
  117. info, err := obj.Stat()
  118. if err != nil {
  119. t.Fatal(err)
  120. }
  121. t.Logf("[%d] layer info: name=%q l.Size=%d size=%d", i, info.Key, l.Size, info.Size)
  122. if msg := checkABCs(obj, int(l.Size)); msg != "" {
  123. t.Errorf("[%d] %s", i, msg)
  124. }
  125. }
  126. }
  127. // TestBasicPresignS3MultipartReferenceDoNotDelete tests the basic flow of
  128. // presigning a multipart upload, uploading the parts, and completing the
  129. // upload. It is for future reference and should not be deleted. This flow
  130. // is tricky and if we get it wrong in our server, we can refer back to this
  131. // as a "back to basics" test/reference.
  132. func TestBasicPresignS3MultipartReferenceDoNotDelete(t *testing.T) {
  133. t.Skip("skipping reference test; unskip when needed")
  134. mc := startMinio(t, true)
  135. mcc := &minio.Core{Client: mc}
  136. uploadID, err := mcc.NewMultipartUpload(context.Background(), "test", "theKey", minio.PutObjectOptions{})
  137. if err != nil {
  138. t.Fatal(err)
  139. }
  140. var completed []minio.CompletePart
  141. const size int64 = 10 * 1024 * 1024
  142. const chunkSize = 5 * 1024 * 1024
  143. for partNumber, c := range upload.Chunks(size, chunkSize) {
  144. u, err := mcc.Presign(context.Background(), "PUT", "test", "theKey", 15*time.Minute, url.Values{
  145. "partNumber": {strconv.Itoa(partNumber)},
  146. "uploadId": {uploadID},
  147. })
  148. if err != nil {
  149. t.Fatalf("[partNumber=%d]: %v", partNumber, err)
  150. }
  151. t.Logf("[partNumber=%d]: %v", partNumber, u)
  152. var body abcReader
  153. cp, err := PushLayer(context.Background(), &body, u.String(), c.Offset, c.N)
  154. if err != nil {
  155. t.Fatalf("[partNumber=%d]: %v", partNumber, err)
  156. }
  157. t.Logf("completed part: %v", cp)
  158. // behave like server here (don't cheat and use partNumber)
  159. // instead get partNumber from the URL
  160. retPartNumber, err := strconv.Atoi(u.Query().Get("partNumber"))
  161. if err != nil {
  162. t.Fatalf("[partNumber=%d]: %v", partNumber, err)
  163. }
  164. completed = append(completed, minio.CompletePart{
  165. PartNumber: retPartNumber,
  166. ETag: cp.ETag,
  167. })
  168. }
  169. defer func() {
  170. // fail if there are any incomplete uploads
  171. for x := range mcc.ListIncompleteUploads(context.Background(), "test", "theKey", true) {
  172. t.Errorf("incomplete: %v", x)
  173. }
  174. }()
  175. info, err := mcc.CompleteMultipartUpload(context.Background(), "test", "theKey", uploadID, completed, minio.PutObjectOptions{})
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. t.Logf("completed: %v", info)
  180. // Check key in bucket
  181. obj, err := mc.GetObject(context.Background(), "test", "theKey", minio.GetObjectOptions{})
  182. if err != nil {
  183. t.Fatal(err)
  184. }
  185. defer obj.Close()
  186. h := sha256.New()
  187. if _, err := io.Copy(h, obj); err != nil {
  188. t.Fatal(err)
  189. }
  190. gotSum := h.Sum(nil)
  191. h.Reset()
  192. var body abcReader
  193. if _, err := io.CopyN(h, &body, size); err != nil {
  194. t.Fatal(err)
  195. }
  196. wantSum := h.Sum(nil)
  197. if !bytes.Equal(gotSum, wantSum) {
  198. t.Errorf("got sum = %x; want %x", gotSum, wantSum)
  199. }
  200. }
  201. func availableAddr() string {
  202. l, err := net.Listen("tcp", "localhost:0")
  203. if err != nil {
  204. panic(err)
  205. }
  206. defer l.Close()
  207. return l.Addr().String()
  208. }
  209. // tracing is "experimental" and may be removed in the future, I can't get it to
  210. // work consistently, but I'm leaving it in for now.
  211. func startMinio(t *testing.T, trace bool) *minio.Client {
  212. t.Helper()
  213. // Trace is enabled by setting the OLLAMA_MINIO_TRACE environment or
  214. // explicitly setting trace to true.
  215. trace = cmp.Or(trace, os.Getenv("OLLAMA_MINIO_TRACE") != "")
  216. dir := t.TempDir()
  217. t.Cleanup(func() {
  218. // TODO(bmizerany): trim temp dir based on dates so that
  219. // future runs may be able to inspect results for some time.
  220. })
  221. waitAndMaybeLogError := func(cmd *exec.Cmd) {
  222. if err := cmd.Wait(); err != nil {
  223. var e *exec.ExitError
  224. if errors.As(err, &e) {
  225. if e.Exited() {
  226. return
  227. }
  228. t.Logf("startMinio: %s stderr: %s", cmd.Path, e.Stderr)
  229. t.Logf("startMinio: %s exit status: %v", cmd.Path, e.ExitCode())
  230. t.Logf("startMinio: %s exited: %v", cmd.Path, e.Exited())
  231. t.Logf("startMinio: %s stderr: %s", cmd.Path, e.Stderr)
  232. } else {
  233. if errors.Is(err, context.Canceled) {
  234. return
  235. }
  236. t.Logf("startMinio: %s exit error: %v", cmd.Path, err)
  237. }
  238. }
  239. }
  240. // Cancel must be called first so do wait to add to Cleanup
  241. // stack as last cleanup.
  242. ctx, cancel := context.WithCancel(context.Background())
  243. deadline, ok := t.Deadline()
  244. if ok {
  245. ctx, cancel = context.WithDeadline(ctx, deadline.Add(-100*time.Millisecond))
  246. }
  247. t.Logf(">> minio: minio server %s", dir)
  248. addr := availableAddr()
  249. cmd := exec.CommandContext(ctx, "minio", "server", "--address", addr, dir)
  250. cmd.Env = os.Environ()
  251. cmd.WaitDelay = 3 * time.Second
  252. cmd.Cancel = func() error {
  253. return cmd.Process.Signal(syscall.SIGQUIT)
  254. }
  255. if err := cmd.Start(); err != nil {
  256. t.Fatalf("startMinio: %v", err)
  257. }
  258. t.Cleanup(func() {
  259. cancel()
  260. waitAndMaybeLogError(cmd)
  261. })
  262. mc, err := minio.New(addr, &minio.Options{
  263. Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
  264. Secure: false,
  265. })
  266. if err != nil {
  267. t.Fatalf("startMinio: %v", err)
  268. }
  269. // wait for server to start with exponential backoff
  270. for _, err := range backoff.Upto(ctx, 1*time.Second) {
  271. if err != nil {
  272. t.Fatalf("startMinio: %v", err)
  273. }
  274. // try list buckets to see if server is up
  275. if _, err := mc.ListBuckets(ctx); err == nil {
  276. break
  277. }
  278. t.Logf("startMinio: server is offline; retrying")
  279. }
  280. if trace {
  281. cmd := exec.CommandContext(ctx, "mc", "admin", "trace", "--verbose", "test")
  282. cmd.Env = append(os.Environ(),
  283. "MC_HOST_test=http://minioadmin:minioadmin@"+addr,
  284. )
  285. cmd.WaitDelay = 3 * time.Second
  286. cmd.Cancel = func() error {
  287. return cmd.Process.Signal(syscall.SIGQUIT)
  288. }
  289. stdout, err := cmd.StdoutPipe()
  290. if err != nil {
  291. t.Fatalf("startMinio: %v", err)
  292. }
  293. if err := cmd.Start(); err != nil {
  294. t.Fatalf("startMinio: %v", err)
  295. }
  296. doneLogging := make(chan struct{})
  297. sc := bufio.NewScanner(stdout)
  298. go func() {
  299. defer close(doneLogging)
  300. // Scan lines until the process exits.
  301. for sc.Scan() {
  302. t.Logf("startMinio: mc trace: %s", sc.Text())
  303. }
  304. _ = sc.Err() // ignore (not important)
  305. }()
  306. t.Cleanup(func() {
  307. cancel()
  308. waitAndMaybeLogError(cmd)
  309. // Make sure we do not log after test exists to
  310. // avoid panic.
  311. <-doneLogging
  312. })
  313. }
  314. if err := mc.MakeBucket(context.Background(), "test", minio.MakeBucketOptions{}); err != nil {
  315. t.Fatalf("startMinio: %v", err)
  316. }
  317. return mc
  318. }
  319. // contextForTest returns a context that is canceled when the test deadline,
  320. // if any, is reached. The returned doneLogging function should be called
  321. // after all Log/Error/Fatalf calls are done before the test returns.
  322. func contextForTest(t *testing.T) (_ context.Context, doneLogging func()) {
  323. done := make(chan struct{})
  324. deadline, ok := t.Deadline()
  325. if !ok {
  326. return context.Background(), func() {}
  327. }
  328. ctx, cancel := context.WithDeadline(context.Background(), deadline.Add(-100*time.Millisecond))
  329. t.Cleanup(func() {
  330. cancel()
  331. <-done
  332. })
  333. return ctx, func() { close(done) }
  334. }
  335. // abcReader repeats the string s infinitely.
  336. type abcReader struct {
  337. pos int
  338. }
  339. const theABCs = "abcdefghijklmnopqrstuvwxyz"
  340. func (r *abcReader) Read(p []byte) (n int, err error) {
  341. for i := range p {
  342. p[i] = theABCs[r.pos]
  343. r.pos++
  344. if r.pos == len(theABCs) {
  345. r.pos = 0
  346. }
  347. }
  348. return len(p), nil
  349. }
  350. func (r *abcReader) ReadAt(p []byte, off int64) (n int, err error) {
  351. for i := range p {
  352. p[i] = theABCs[(off+int64(i))%int64(len(theABCs))]
  353. }
  354. return len(p), nil
  355. }
  356. func checkABCs(r io.Reader, size int) (reason string) {
  357. h := sha256.New()
  358. n, err := io.CopyN(h, &abcReader{}, int64(size))
  359. if err != nil {
  360. return err.Error()
  361. }
  362. if n != int64(size) {
  363. panic("short read; should not happen")
  364. }
  365. want := h.Sum(nil)
  366. h = sha256.New()
  367. n, err = io.Copy(h, r)
  368. if err != nil {
  369. return err.Error()
  370. }
  371. if n != int64(size) {
  372. return fmt.Sprintf("got len(r) = %d; want %d", n, size)
  373. }
  374. got := h.Sum(nil)
  375. if !bytes.Equal(got, want) {
  376. return fmt.Sprintf("got sum = %x; want %x", got, want)
  377. }
  378. return ""
  379. }