Skip to content

Commit 5c8fea0

Browse files
committed
feat: operator scalable
1 parent 3d107b1 commit 5c8fea0

15 files changed

+408
-337
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package container
2+
3+
import (
4+
"encoding/base64"
5+
"encoding/json"
6+
"net/url"
7+
8+
"github.com/docker/docker/api/types"
9+
)
10+
11+
func ParseImageRegistry(uri string) (*ImageRegistry, error) {
12+
u, err := url.ParseRequestURI(uri)
13+
if err != nil {
14+
return nil, err
15+
}
16+
17+
endpoint := &ImageRegistry{}
18+
19+
endpoint.Host = u.Host
20+
endpoint.Prefix = u.Path
21+
22+
if u.User != nil {
23+
endpoint.Username = u.User.Username()
24+
endpoint.Password, _ = u.User.Password()
25+
}
26+
27+
return endpoint, nil
28+
}
29+
30+
type ImageRegistry struct {
31+
Host string
32+
Username string
33+
Password string
34+
Prefix string
35+
}
36+
37+
func (s ImageRegistry) Fix(imageRef string) string {
38+
return s.Host + s.Prefix + imageRef
39+
}
40+
41+
func (s ImageRegistry) RegistryAuth() string {
42+
authConfig := types.AuthConfig{Username: s.Username, Password: s.Password, ServerAddress: s.Host}
43+
b, _ := json.Marshal(authConfig)
44+
return base64.StdEncoding.EncodeToString(b)
45+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package container
2+
3+
import (
4+
"context"
5+
6+
"github.com/querycap/pipeline/pipeline"
7+
"github.com/querycap/pipeline/spec"
8+
)
9+
10+
func NewOperatorMgr(podController PodController, container *Container) pipeline.OperatorMgr {
11+
return &operatorMgr{
12+
podController: podController,
13+
container: container,
14+
}
15+
}
16+
17+
type operatorMgr struct {
18+
podController PodController
19+
container *Container
20+
}
21+
22+
func (d *operatorMgr) Up(scope string, stage string, step spec.Stage, replicas int) error {
23+
c := Container{
24+
Container: step.Container,
25+
Image: step.Uses.RefID(),
26+
Replicas: replicas,
27+
}
28+
29+
c.Envs = c.Envs.Merge(d.container.Envs)
30+
31+
c.Envs["PIPELINE_SCOPE"] = scope
32+
c.Envs["PIPELINE_STAGE"] = stage
33+
34+
return d.podController.Apply(context.Background(), PodNameByScopeAndStage(scope, stage), &c)
35+
}
36+
37+
func (d operatorMgr) Destroy(scope string, stage string) error {
38+
return d.podController.Kill(context.Background(), PodNameByScopeAndStage(scope, stage))
39+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package container
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/go-courier/semver"
8+
"github.com/querycap/pipeline/spec"
9+
"github.com/sirupsen/logrus"
10+
)
11+
12+
func init() {
13+
logrus.SetLevel(logrus.DebugLevel)
14+
}
15+
16+
var scope, stage = "xxx", "fetch-image"
17+
var imageRegistry, _ = ParseImageRegistry("registry://docker.io/library/")
18+
19+
func Test(t *testing.T) {
20+
t.Run("docker", func(t *testing.T) {
21+
c, _ := NewDockerPodController(imageRegistry)
22+
RunAll(t, c)
23+
})
24+
}
25+
26+
func RunAll(t *testing.T, c PodController) {
27+
s := spec.Stage{}
28+
s.Uses.Name = "nginx"
29+
s.Uses.Version = *semver.MustParseVersion("1.17.10")
30+
31+
mgr := NewOperatorMgr(c, &Container{})
32+
33+
t.Run("start", func(t *testing.T) {
34+
if err := mgr.Up(scope, stage, s, 3); err != nil {
35+
panic(err)
36+
}
37+
38+
time.Sleep(1 * time.Second)
39+
40+
if err := mgr.Up(scope, stage, s, 5); err != nil {
41+
panic(err)
42+
}
43+
44+
time.Sleep(1 * time.Second)
45+
46+
if err := mgr.Up(scope, stage, s, 3); err != nil {
47+
panic(err)
48+
}
49+
50+
time.Sleep(1 * time.Second)
51+
52+
if err := mgr.Up(scope, stage, s, 0); err != nil {
53+
panic(err)
54+
}
55+
56+
time.Sleep(1 * time.Second)
57+
})
58+
59+
t.Run("stop", func(t *testing.T) {
60+
if err := mgr.Destroy(scope, stage); err != nil {
61+
panic(err)
62+
}
63+
})
64+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package container
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
"github.com/querycap/pipeline/spec"
8+
)
9+
10+
func PodNameByScopeAndStage(scope string, stage string) string {
11+
return strings.Replace(scope+"--"+stage, ":", "__", -1)
12+
}
13+
14+
type PodController interface {
15+
Apply(ctx context.Context, name string, container *Container) error
16+
Kill(ctx context.Context, name string) error
17+
}
18+
19+
type Container struct {
20+
spec.Container
21+
Image string
22+
Replicas int
23+
Annotations map[string]string
24+
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package container
2+
3+
import (
4+
"context"
5+
"io"
6+
"math/rand"
7+
"os"
8+
"strings"
9+
10+
"github.com/docker/docker/api/types"
11+
"github.com/docker/docker/api/types/container"
12+
"github.com/docker/docker/api/types/filters"
13+
"github.com/docker/docker/client"
14+
"github.com/docker/docker/pkg/stdcopy"
15+
"github.com/sirupsen/logrus"
16+
)
17+
18+
func NewDockerPodController(imageRegistry *ImageRegistry) (*DockerPodController, error) {
19+
c, err := client.NewEnvClient()
20+
if err != nil {
21+
return nil, err
22+
}
23+
24+
return &DockerPodController{
25+
client: c,
26+
imageRegistry: imageRegistry,
27+
}, nil
28+
}
29+
30+
type DockerPodController struct {
31+
client *client.Client
32+
imageRegistry *ImageRegistry
33+
}
34+
35+
func (c *DockerPodController) Apply(ctx context.Context, name string, container *Container) error {
36+
imageRef := c.imageRegistry.Fix(container.Image)
37+
38+
if err := c.PullImageIfNotExists(ctx, imageRef); err != nil {
39+
return err
40+
}
41+
42+
container.Annotations = map[string]string{
43+
"pipeline": name,
44+
}
45+
46+
list, err := c.ListMatchedRunningContainer(ctx, name)
47+
if err != nil {
48+
return err
49+
}
50+
51+
current := len(list)
52+
53+
offset := current - container.Replicas
54+
55+
// scale up
56+
if offset < 0 {
57+
for i := 0; i < -offset; i++ {
58+
if err := c.RunContainer(ctx, imageRef, container); err != nil {
59+
return err
60+
}
61+
}
62+
return nil
63+
}
64+
65+
// scale down
66+
if offset > 0 {
67+
rand.Shuffle(len(list), func(i, j int) { list[i], list[j] = list[j], list[i] })
68+
69+
for _, item := range list[0:offset] {
70+
if err := c.KillContainer(ctx, item.ID); err != nil {
71+
return err
72+
}
73+
}
74+
}
75+
76+
return nil
77+
}
78+
79+
func (c *DockerPodController) Kill(ctx context.Context, name string) error {
80+
list, err := c.ListMatchedRunningContainer(ctx, name)
81+
if err != nil {
82+
return err
83+
}
84+
85+
for i := range list {
86+
if err := c.KillContainer(ctx, list[i].ID); err != nil {
87+
return err
88+
}
89+
}
90+
91+
return nil
92+
}
93+
94+
func (c *DockerPodController) ListMatchedRunningContainer(ctx context.Context, name string) ([]types.Container, error) {
95+
containerListFilters := filters.NewArgs()
96+
containerListFilters.Add("label", "pipeline="+name)
97+
98+
return c.ListRunningContainer(ctx, containerListFilters)
99+
}
100+
101+
func (c *DockerPodController) RunContainer(ctx context.Context, image string, cc *Container) error {
102+
logrus.WithContext(ctx).Debugf("running from %s", image)
103+
104+
containerConfig := &container.Config{
105+
Image: image,
106+
Labels: cc.Annotations,
107+
}
108+
109+
for k := range cc.Envs {
110+
containerConfig.Env = append(containerConfig.Env, k+"="+cc.Envs[k])
111+
}
112+
113+
created, err := c.client.ContainerCreate(ctx, containerConfig, nil, nil, "")
114+
if err != nil {
115+
return err
116+
}
117+
118+
if err := c.client.ContainerStart(ctx, created.ID, types.ContainerStartOptions{}); err != nil {
119+
return err
120+
}
121+
122+
go func() {
123+
if !logrus.IsLevelEnabled(logrus.DebugLevel) {
124+
return
125+
}
126+
127+
// clone logs
128+
r, err := c.client.ContainerLogs(ctx, created.ID, types.ContainerLogsOptions{
129+
ShowStdout: true,
130+
ShowStderr: true,
131+
Follow: true,
132+
})
133+
if err != nil {
134+
return
135+
}
136+
defer r.Close()
137+
138+
if _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, r); err != nil && err != io.EOF {
139+
logrus.Fatal(err)
140+
}
141+
}()
142+
143+
return nil
144+
}
145+
146+
func (c *DockerPodController) ListRunningContainer(ctx context.Context, args filters.Args) ([]types.Container, error) {
147+
return c.client.ContainerList(ctx, types.ContainerListOptions{
148+
Filters: args,
149+
All: false,
150+
})
151+
}
152+
153+
func (c *DockerPodController) KillContainer(ctx context.Context, containerID string) error {
154+
logrus.WithContext(ctx).Debugf("killing %s", containerID)
155+
156+
return c.client.ContainerKill(ctx, containerID, "SIGKILL")
157+
}
158+
159+
func (c *DockerPodController) PullImageIfNotExists(ctx context.Context, image string) error {
160+
imageListFilters := filters.NewArgs()
161+
imageListFilters.Add("reference", strings.Replace(image, "docker.io/library/", "", 1))
162+
163+
images, err := c.client.ImageList(ctx, types.ImageListOptions{Filters: imageListFilters})
164+
if err != nil {
165+
return err
166+
}
167+
168+
imageExists := len(images) > 0
169+
170+
if !imageExists {
171+
if _, err = c.client.ImagePull(ctx, image, types.ImagePullOptions{
172+
RegistryAuth: c.imageRegistry.RegistryAuth(),
173+
}); err != nil {
174+
return err
175+
}
176+
}
177+
178+
return nil
179+
}

0 commit comments

Comments
 (0)