|
- package registry
- import (
- "bufio"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net"
- "net/http/httptest"
- "os"
- "os/exec"
- "strings"
- "testing"
- "time"
- "bllamo.com/registry/apitype"
- "bllamo.com/utils/backoff"
- "github.com/minio/minio-go/v7"
- "github.com/minio/minio-go/v7/pkg/credentials"
- "kr.dev/diff"
- )
- const abc = "abcdefghijklmnopqrstuvwxyz"
- func testPush(t *testing.T, chunkSize int64) {
- t.Run(fmt.Sprintf("chunkSize=%d", chunkSize), func(t *testing.T) {
- mc := startMinio(t, false)
- manifest := []byte(`{
- "layers": [
- {"digest": "sha256-1", "size": 1},
- {"digest": "sha256-2", "size": 2},
- {"digest": "sha256-3", "size": 3}
- ]
- }`)
- const ref = "registry.ollama.ai/x/y:latest+Z"
- hs := httptest.NewServer(&Server{
- minioClient: mc,
- UploadChunkSize: chunkSize,
- })
- t.Cleanup(hs.Close)
- c := &Client{BaseURL: hs.URL}
- requirements, err := c.Push(context.Background(), ref, manifest, nil)
- if err != nil {
- t.Fatal(err)
- }
- if len(requirements) < 3 {
- t.Fatalf("expected at least 3 requirements; got %d", len(requirements))
- t.Logf("requirements: %v", requirements)
- }
- var uploaded []apitype.CompletePart
- for i, r := range requirements {
- t.Logf("[%d] pushing layer: offset=%d size=%d", i, r.Offset, r.Size)
- body := strings.NewReader(abc)
- etag, err := PushLayer(context.Background(), r.URL, r.Offset, r.Size, body)
- if err != nil {
- t.Fatal(err)
- }
- uploaded = append(uploaded, apitype.CompletePart{
- URL: r.URL,
- ETag: etag,
- })
- }
- requirements, err = c.Push(context.Background(), ref, manifest, &PushParams{
- Uploaded: uploaded,
- })
- if err != nil {
- t.Fatal(err)
- }
- if len(requirements) != 0 {
- t.Fatalf("unexpected requirements: %v", requirements)
- }
- var paths []string
- keys := mc.ListObjects(context.Background(), "test", minio.ListObjectsOptions{
- Recursive: true,
- })
- for k := range keys {
- paths = append(paths, k.Key)
- }
- t.Logf("paths: %v", paths)
- diff.Test(t, t.Errorf, paths, []string{
- "blobs/sha256-1",
- "blobs/sha256-2",
- "blobs/sha256-3",
- "manifests/registry.ollama.ai/x/y/latest/Z",
- })
- obj, err := mc.GetObject(context.Background(), "test", "manifests/registry.ollama.ai/x/y/latest/Z", minio.GetObjectOptions{})
- if err != nil {
- t.Fatal(err)
- }
- defer obj.Close()
- var gotM apitype.Manifest
- if err := json.NewDecoder(obj).Decode(&gotM); err != nil {
- t.Fatal(err)
- }
- diff.Test(t, t.Errorf, gotM, apitype.Manifest{
- Layers: []apitype.Layer{
- {Digest: "sha256-1", Size: 1},
- {Digest: "sha256-2", Size: 2},
- {Digest: "sha256-3", Size: 3},
- },
- })
- // checksum the blobs
- for i, l := range gotM.Layers {
- obj, err := mc.GetObject(context.Background(), "test", "blobs/"+l.Digest, minio.GetObjectOptions{})
- if err != nil {
- t.Fatal(err)
- }
- defer obj.Close()
- info, err := obj.Stat()
- if err != nil {
- t.Fatal(err)
- }
- t.Logf("[%d] layer info: name=%q l.Size=%d size=%d", i, info.Key, l.Size, info.Size)
- data, err := io.ReadAll(obj)
- if err != nil {
- t.Fatal(err)
- }
- got := string(data)
- want := abc[:l.Size]
- if got != want {
- t.Errorf("[%d] got layer data = %q; want %q", i, got, want)
- }
- }
- })
- }
- func TestPush(t *testing.T) {
- testPush(t, 0)
- testPush(t, 1)
- }
- func availableAddr() string {
- l, err := net.Listen("tcp", "localhost:0")
- if err != nil {
- panic(err)
- }
- defer l.Close()
- return l.Addr().String()
- }
- func startMinio(t *testing.T, debug bool) *minio.Client {
- t.Helper()
- dir := t.TempDir()
- t.Logf(">> minio data dir: %s", dir)
- addr := availableAddr()
- cmd := exec.Command("minio", "server", "--address", addr, dir)
- cmd.Env = os.Environ()
- if debug {
- stdout, err := cmd.StdoutPipe()
- if err != nil {
- t.Fatal(err)
- }
- doneLogging := make(chan struct{})
- t.Cleanup(func() {
- <-doneLogging
- })
- go func() {
- defer close(doneLogging)
- sc := bufio.NewScanner(stdout)
- for sc.Scan() {
- t.Logf("minio: %s", sc.Text())
- }
- }()
- }
- // TODO(bmizerany): wait delay etc...
- if err := cmd.Start(); err != nil {
- t.Fatal(err)
- }
- t.Cleanup(func() {
- cmd.Process.Kill()
- if err := cmd.Wait(); err != nil {
- var e *exec.ExitError
- if errors.As(err, &e) && e.Exited() {
- t.Logf("minio stderr: %s", e.Stderr)
- t.Logf("minio exit status: %v", e.ExitCode())
- t.Logf("minio exited: %v", e.Exited())
- t.Error(err)
- }
- }
- })
- mc, err := minio.New(addr, &minio.Options{
- Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
- Secure: false,
- })
- if err != nil {
- t.Fatal(err)
- }
- ctx, cancel := context.WithCancel(context.Background())
- deadline, ok := t.Deadline()
- if ok {
- ctx, cancel = context.WithDeadline(ctx, deadline.Add(-100*time.Millisecond))
- defer cancel()
- }
- // wait for server to start with exponential backoff
- for _, err := range backoff.Upto(ctx, 1*time.Second) {
- if err != nil {
- t.Fatal(err)
- }
- if mc.IsOnline() {
- break
- }
- }
- if err := mc.MakeBucket(context.Background(), "test", minio.MakeBucketOptions{}); err != nil {
- t.Fatal(err)
- }
- return mc
- }
- // contextForTest returns a context that is canceled when the test deadline,
- // if any, is reached. The returned doneLogging function should be called
- // after all Log/Error/Fatalf calls are done before the test returns.
- func contextForTest(t *testing.T) (_ context.Context, doneLogging func()) {
- done := make(chan struct{})
- deadline, ok := t.Deadline()
- if !ok {
- return context.Background(), func() {}
- }
- ctx, cancel := context.WithDeadline(context.Background(), deadline.Add(-100*time.Millisecond))
- t.Cleanup(func() {
- cancel()
- <-done
- })
- return ctx, func() { close(done) }
- }
|