Skip to content

Commit 8fa49c6

Browse files
committed
feat(pipeline/operator): k8s pod controller
1 parent 5c8fea0 commit 8fa49c6

15 files changed

+282
-30
lines changed

go.mod

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,28 @@ module github.com/querycap/pipeline
33
go 1.14
44

55
require (
6-
github.com/davecgh/go-spew v1.1.1
6+
github.com/Microsoft/go-winio v0.4.14 // indirect
77
github.com/docker/distribution v2.7.1+incompatible // indirect
88
github.com/docker/docker v1.13.1
99
github.com/docker/go-connections v0.4.0 // indirect
1010
github.com/docker/go-units v0.4.0 // indirect
11-
github.com/go-courier/courier v1.3.1
12-
github.com/go-courier/httptransport v1.17.3
1311
github.com/go-courier/semver v1.0.0
14-
github.com/go-courier/snowflakeid v1.2.1
1512
github.com/gomodule/redigo v2.0.0+incompatible
13+
github.com/imdario/mergo v0.3.9 // indirect
1614
github.com/minio/minio-go/v6 v6.0.55
1715
github.com/onsi/gomega v1.9.0
1816
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
1917
github.com/pkg/errors v0.9.1 // indirect
2018
github.com/sirupsen/logrus v1.6.0
2119
github.com/spf13/afero v1.2.2
20+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
21+
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b // indirect
22+
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
23+
golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f // indirect
24+
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
25+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
2226
gopkg.in/yaml.v2 v2.2.8
27+
k8s.io/api v0.17.0
28+
k8s.io/apimachinery v0.17.0
29+
k8s.io/client-go v0.17.0
2330
)

pipeline/operator/container/image_registry.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func ParseImageRegistry(uri string) (*ImageRegistry, error) {
1616

1717
endpoint := &ImageRegistry{}
1818

19+
endpoint.Name = u.Scheme
1920
endpoint.Host = u.Host
2021
endpoint.Prefix = u.Path
2122

@@ -28,6 +29,7 @@ func ParseImageRegistry(uri string) (*ImageRegistry, error) {
2829
}
2930

3031
type ImageRegistry struct {
32+
Name string
3133
Host string
3234
Username string
3335
Password string
@@ -43,3 +45,15 @@ func (s ImageRegistry) RegistryAuth() string {
4345
b, _ := json.Marshal(authConfig)
4446
return base64.StdEncoding.EncodeToString(b)
4547
}
48+
49+
func (s ImageRegistry) DockerConfigJSON() []byte {
50+
v := struct {
51+
Auths map[string]types.AuthConfig `json:"auths"`
52+
}{
53+
Auths: map[string]types.AuthConfig{
54+
s.Host: {Username: s.Username, Password: s.Password},
55+
},
56+
}
57+
b, _ := json.Marshal(v)
58+
return b
59+
}

pipeline/operator/container/operatormgr.go renamed to pipeline/operator/container/operator_mgr.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,26 @@ import (
77
"github.com/querycap/pipeline/spec"
88
)
99

10-
func NewOperatorMgr(podController PodController, container *Container) pipeline.OperatorMgr {
10+
func NewOperatorMgr(podController PodController, envs map[string]string) pipeline.OperatorMgr {
1111
return &operatorMgr{
1212
podController: podController,
13-
container: container,
13+
envs: envs,
1414
}
1515
}
1616

1717
type operatorMgr struct {
1818
podController PodController
19-
container *Container
19+
envs map[string]string
2020
}
2121

22-
func (d *operatorMgr) Up(scope string, stage string, step spec.Stage, replicas int) error {
22+
func (d *operatorMgr) Up(scope string, stage string, step spec.Stage, replicas int32) error {
2323
c := Container{
2424
Container: step.Container,
2525
Image: step.Uses.RefID(),
2626
Replicas: replicas,
2727
}
2828

29-
c.Envs = c.Envs.Merge(d.container.Envs)
29+
c.Envs = c.Envs.Merge(d.envs)
3030

3131
c.Envs["PIPELINE_SCOPE"] = scope
3232
c.Envs["PIPELINE_STAGE"] = stage

pipeline/operator/container/operatormgr_test.go renamed to pipeline/operator/container/operator_mgr_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func RunAll(t *testing.T, c PodController) {
2828
s.Uses.Name = "nginx"
2929
s.Uses.Version = *semver.MustParseVersion("1.17.10")
3030

31-
mgr := NewOperatorMgr(c, &Container{})
31+
mgr := NewOperatorMgr(c, map[string]string{})
3232

3333
t.Run("start", func(t *testing.T) {
3434
if err := mgr.Up(scope, stage, s, 3); err != nil {

pipeline/operator/container/pod_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import (
88
)
99

1010
func PodNameByScopeAndStage(scope string, stage string) string {
11-
return strings.Replace(scope+"--"+stage, ":", "__", -1)
11+
parts := strings.Split(scope, "/")
12+
return "p" + parts[len(parts)-1] + "-" + stage
1213
}
1314

1415
type PodController interface {
@@ -19,6 +20,6 @@ type PodController interface {
1920
type Container struct {
2021
spec.Container
2122
Image string
22-
Replicas int
23+
Replicas int32
2324
Annotations map[string]string
2425
}

pipeline/operator/container/pod_controller__docker.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,27 +35,27 @@ type DockerPodController struct {
3535
func (c *DockerPodController) Apply(ctx context.Context, name string, container *Container) error {
3636
imageRef := c.imageRegistry.Fix(container.Image)
3737

38-
if err := c.PullImageIfNotExists(ctx, imageRef); err != nil {
38+
if err := c.pullImageIfNotExists(ctx, imageRef); err != nil {
3939
return err
4040
}
4141

4242
container.Annotations = map[string]string{
4343
"pipeline": name,
4444
}
4545

46-
list, err := c.ListMatchedRunningContainer(ctx, name)
46+
list, err := c.listMatchedRunningContainer(ctx, name)
4747
if err != nil {
4848
return err
4949
}
5050

5151
current := len(list)
5252

53-
offset := current - container.Replicas
53+
offset := current - int(container.Replicas)
5454

5555
// scale up
5656
if offset < 0 {
5757
for i := 0; i < -offset; i++ {
58-
if err := c.RunContainer(ctx, imageRef, container); err != nil {
58+
if err := c.runContainer(ctx, imageRef, container); err != nil {
5959
return err
6060
}
6161
}
@@ -67,7 +67,7 @@ func (c *DockerPodController) Apply(ctx context.Context, name string, container
6767
rand.Shuffle(len(list), func(i, j int) { list[i], list[j] = list[j], list[i] })
6868

6969
for _, item := range list[0:offset] {
70-
if err := c.KillContainer(ctx, item.ID); err != nil {
70+
if err := c.killContainer(ctx, item.ID); err != nil {
7171
return err
7272
}
7373
}
@@ -77,33 +77,35 @@ func (c *DockerPodController) Apply(ctx context.Context, name string, container
7777
}
7878

7979
func (c *DockerPodController) Kill(ctx context.Context, name string) error {
80-
list, err := c.ListMatchedRunningContainer(ctx, name)
80+
list, err := c.listMatchedRunningContainer(ctx, name)
8181
if err != nil {
8282
return err
8383
}
8484

8585
for i := range list {
86-
if err := c.KillContainer(ctx, list[i].ID); err != nil {
86+
if err := c.killContainer(ctx, list[i].ID); err != nil {
8787
return err
8888
}
8989
}
9090

9191
return nil
9292
}
9393

94-
func (c *DockerPodController) ListMatchedRunningContainer(ctx context.Context, name string) ([]types.Container, error) {
94+
func (c *DockerPodController) listMatchedRunningContainer(ctx context.Context, name string) ([]types.Container, error) {
9595
containerListFilters := filters.NewArgs()
9696
containerListFilters.Add("label", "pipeline="+name)
9797

98-
return c.ListRunningContainer(ctx, containerListFilters)
98+
return c.listRunningContainer(ctx, containerListFilters)
9999
}
100100

101-
func (c *DockerPodController) RunContainer(ctx context.Context, image string, cc *Container) error {
101+
func (c *DockerPodController) runContainer(ctx context.Context, image string, cc *Container) error {
102102
logrus.WithContext(ctx).Debugf("running from %s", image)
103103

104104
containerConfig := &container.Config{
105-
Image: image,
106-
Labels: cc.Annotations,
105+
Image: image,
106+
Labels: cc.Annotations,
107+
Entrypoint: cc.Command,
108+
Cmd: cc.Args,
107109
}
108110

109111
for k := range cc.Envs {
@@ -143,20 +145,20 @@ func (c *DockerPodController) RunContainer(ctx context.Context, image string, cc
143145
return nil
144146
}
145147

146-
func (c *DockerPodController) ListRunningContainer(ctx context.Context, args filters.Args) ([]types.Container, error) {
148+
func (c *DockerPodController) listRunningContainer(ctx context.Context, args filters.Args) ([]types.Container, error) {
147149
return c.client.ContainerList(ctx, types.ContainerListOptions{
148150
Filters: args,
149151
All: false,
150152
})
151153
}
152154

153-
func (c *DockerPodController) KillContainer(ctx context.Context, containerID string) error {
155+
func (c *DockerPodController) killContainer(ctx context.Context, containerID string) error {
154156
logrus.WithContext(ctx).Debugf("killing %s", containerID)
155157

156158
return c.client.ContainerKill(ctx, containerID, "SIGKILL")
157159
}
158160

159-
func (c *DockerPodController) PullImageIfNotExists(ctx context.Context, image string) error {
161+
func (c *DockerPodController) pullImageIfNotExists(ctx context.Context, image string) error {
160162
imageListFilters := filters.NewArgs()
161163
imageListFilters.Add("reference", strings.Replace(image, "docker.io/library/", "", 1))
162164

0 commit comments

Comments
 (0)