server_test.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package registry
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "net/http/httptest"
  11. "os"
  12. "os/exec"
  13. "strings"
  14. "testing"
  15. "time"
  16. "bllamo.com/registry/apitype"
  17. "bllamo.com/utils/backoff"
  18. "github.com/minio/minio-go/v7"
  19. "github.com/minio/minio-go/v7/pkg/credentials"
  20. "kr.dev/diff"
  21. )
// abc is the lowercase ASCII alphabet. The push test uses it as the body of
// every layer upload and compares stored blobs against prefixes of it.
const abc = "abcdefghijklmnopqrstuvwxyz"
  23. func testPush(t *testing.T, chunkSize int64) {
  24. t.Run(fmt.Sprintf("chunkSize=%d", chunkSize), func(t *testing.T) {
  25. mc := startMinio(t, false)
  26. manifest := []byte(`{
  27. "layers": [
  28. {"digest": "sha256-1", "size": 1},
  29. {"digest": "sha256-2", "size": 2},
  30. {"digest": "sha256-3", "size": 3}
  31. ]
  32. }`)
  33. const ref = "registry.ollama.ai/x/y:latest+Z"
  34. hs := httptest.NewServer(&Server{
  35. minioClient: mc,
  36. UploadChunkSize: chunkSize,
  37. })
  38. t.Cleanup(hs.Close)
  39. c := &Client{BaseURL: hs.URL}
  40. requirements, err := c.Push(context.Background(), ref, manifest, nil)
  41. if err != nil {
  42. t.Fatal(err)
  43. }
  44. if len(requirements) < 3 {
  45. t.Fatalf("expected at least 3 requirements; got %d", len(requirements))
  46. t.Logf("requirements: %v", requirements)
  47. }
  48. var uploaded []apitype.CompletePart
  49. for i, r := range requirements {
  50. t.Logf("[%d] pushing layer: offset=%d size=%d", i, r.Offset, r.Size)
  51. body := strings.NewReader(abc)
  52. etag, err := PushLayer(context.Background(), r.URL, r.Offset, r.Size, body)
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. uploaded = append(uploaded, apitype.CompletePart{
  57. URL: r.URL,
  58. ETag: etag,
  59. })
  60. }
  61. requirements, err = c.Push(context.Background(), ref, manifest, &PushParams{
  62. Uploaded: uploaded,
  63. })
  64. if err != nil {
  65. t.Fatal(err)
  66. }
  67. if len(requirements) != 0 {
  68. t.Fatalf("unexpected requirements: %v", requirements)
  69. }
  70. var paths []string
  71. keys := mc.ListObjects(context.Background(), "test", minio.ListObjectsOptions{
  72. Recursive: true,
  73. })
  74. for k := range keys {
  75. paths = append(paths, k.Key)
  76. }
  77. t.Logf("paths: %v", paths)
  78. diff.Test(t, t.Errorf, paths, []string{
  79. "blobs/sha256-1",
  80. "blobs/sha256-2",
  81. "blobs/sha256-3",
  82. "manifests/registry.ollama.ai/x/y/latest/Z",
  83. })
  84. obj, err := mc.GetObject(context.Background(), "test", "manifests/registry.ollama.ai/x/y/latest/Z", minio.GetObjectOptions{})
  85. if err != nil {
  86. t.Fatal(err)
  87. }
  88. defer obj.Close()
  89. var gotM apitype.Manifest
  90. if err := json.NewDecoder(obj).Decode(&gotM); err != nil {
  91. t.Fatal(err)
  92. }
  93. diff.Test(t, t.Errorf, gotM, apitype.Manifest{
  94. Layers: []apitype.Layer{
  95. {Digest: "sha256-1", Size: 1},
  96. {Digest: "sha256-2", Size: 2},
  97. {Digest: "sha256-3", Size: 3},
  98. },
  99. })
  100. // checksum the blobs
  101. for i, l := range gotM.Layers {
  102. obj, err := mc.GetObject(context.Background(), "test", "blobs/"+l.Digest, minio.GetObjectOptions{})
  103. if err != nil {
  104. t.Fatal(err)
  105. }
  106. defer obj.Close()
  107. info, err := obj.Stat()
  108. if err != nil {
  109. t.Fatal(err)
  110. }
  111. t.Logf("[%d] layer info: name=%q l.Size=%d size=%d", i, info.Key, l.Size, info.Size)
  112. data, err := io.ReadAll(obj)
  113. if err != nil {
  114. t.Fatal(err)
  115. }
  116. got := string(data)
  117. want := abc[:l.Size]
  118. if got != want {
  119. t.Errorf("[%d] got layer data = %q; want %q", i, got, want)
  120. }
  121. }
  122. })
  123. }
  124. func TestPush(t *testing.T) {
  125. testPush(t, 0)
  126. testPush(t, 1)
  127. }
  128. func availableAddr() string {
  129. l, err := net.Listen("tcp", "localhost:0")
  130. if err != nil {
  131. panic(err)
  132. }
  133. defer l.Close()
  134. return l.Addr().String()
  135. }
  136. func startMinio(t *testing.T, debug bool) *minio.Client {
  137. t.Helper()
  138. dir := t.TempDir()
  139. t.Logf(">> minio data dir: %s", dir)
  140. addr := availableAddr()
  141. cmd := exec.Command("minio", "server", "--address", addr, dir)
  142. cmd.Env = os.Environ()
  143. if debug {
  144. stdout, err := cmd.StdoutPipe()
  145. if err != nil {
  146. t.Fatal(err)
  147. }
  148. doneLogging := make(chan struct{})
  149. t.Cleanup(func() {
  150. <-doneLogging
  151. })
  152. go func() {
  153. defer close(doneLogging)
  154. sc := bufio.NewScanner(stdout)
  155. for sc.Scan() {
  156. t.Logf("minio: %s", sc.Text())
  157. }
  158. }()
  159. }
  160. // TODO(bmizerany): wait delay etc...
  161. if err := cmd.Start(); err != nil {
  162. t.Fatal(err)
  163. }
  164. t.Cleanup(func() {
  165. cmd.Process.Kill()
  166. if err := cmd.Wait(); err != nil {
  167. var e *exec.ExitError
  168. if errors.As(err, &e) && e.Exited() {
  169. t.Logf("minio stderr: %s", e.Stderr)
  170. t.Logf("minio exit status: %v", e.ExitCode())
  171. t.Logf("minio exited: %v", e.Exited())
  172. t.Error(err)
  173. }
  174. }
  175. })
  176. mc, err := minio.New(addr, &minio.Options{
  177. Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
  178. Secure: false,
  179. })
  180. if err != nil {
  181. t.Fatal(err)
  182. }
  183. ctx, cancel := context.WithCancel(context.Background())
  184. deadline, ok := t.Deadline()
  185. if ok {
  186. ctx, cancel = context.WithDeadline(ctx, deadline.Add(-100*time.Millisecond))
  187. defer cancel()
  188. }
  189. // wait for server to start with exponential backoff
  190. for _, err := range backoff.Upto(ctx, 1*time.Second) {
  191. if err != nil {
  192. t.Fatal(err)
  193. }
  194. if mc.IsOnline() {
  195. break
  196. }
  197. }
  198. if err := mc.MakeBucket(context.Background(), "test", minio.MakeBucketOptions{}); err != nil {
  199. t.Fatal(err)
  200. }
  201. return mc
  202. }
  203. // contextForTest returns a context that is canceled when the test deadline,
  204. // if any, is reached. The returned doneLogging function should be called
  205. // after all Log/Error/Fatalf calls are done before the test returns.
  206. func contextForTest(t *testing.T) (_ context.Context, doneLogging func()) {
  207. done := make(chan struct{})
  208. deadline, ok := t.Deadline()
  209. if !ok {
  210. return context.Background(), func() {}
  211. }
  212. ctx, cancel := context.WithDeadline(context.Background(), deadline.Add(-100*time.Millisecond))
  213. t.Cleanup(func() {
  214. cancel()
  215. <-done
  216. })
  217. return ctx, func() { close(done) }
  218. }