Skip to content

Commit 5565fa8

Browse files
committed
Add creation handler to create worker and flower deployments
1 parent 3163d6c commit 5565fa8

18 files changed

+560
-82
lines changed

handlers.py

Lines changed: 38 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,61 @@
1-
import os
2-
import kopf
3-
import kubernetes
4-
import requests
5-
import constants
61
from math import ceil
72
from collections import namedtuple
3+
import requests
4+
import kubernetes
5+
import kopf
6+
import constants
7+
from models.celery_custom_resource import celery_custom_resource_from_dict
8+
from kubernetes_utils.worker_deployment_generator import WorkerDeploymentGenerator
9+
from kubernetes_utils.flower_deployment_generator import FlowerDeploymentGenerator
810

9-
from deployment_utils import (
10-
deploy_celery_workers,
11-
deploy_flower,
12-
expose_flower_service
13-
)
14-
from update_utils import (
11+
from utilities.patching import (
1512
update_all_deployments,
1613
update_worker_deployment,
1714
update_flower_deployment
1815
)
1916

2017

2118
@kopf.on.create('celeryproject.org', 'v1alpha1', 'celery')
22-
def create_fn(spec, name, namespace, logger, **kwargs):
19+
def create_fn(spec, namespace, logger, **kwargs):
2320
"""
2421
Celery custom resource creation handler
2522
"""
26-
27-
# 1. Validation of spec
28-
val, err_msg = validate_spec(spec)
29-
if err_msg:
30-
status = 'Failed validation'
31-
raise kopf.PermanentError(f"{err_msg}. Got {val}")
32-
3323
api = kubernetes.client.CoreV1Api()
3424
apps_api_instance = kubernetes.client.AppsV1Api()
35-
36-
# 2. Deployment for celery workers
37-
worker_deployment = deploy_celery_workers(
38-
apps_api_instance, namespace, spec, logger
25+
try:
26+
celery_cr = celery_custom_resource_from_dict(dict(spec))
27+
except Exception as e:
28+
raise kopf.PermanentError(e)
29+
30+
# deploy worker
31+
worker_deployment = WorkerDeploymentGenerator(
32+
namespace=namespace, celery_cr=celery_cr
33+
).get_worker_deployment()
34+
kopf.adopt(worker_deployment)
35+
apps_api_instance.create_namespaced_deployment(
36+
namespace=namespace,
37+
body=worker_deployment
3938
)
4039

41-
# 3. Deployment for flower
42-
flower_deployment = deploy_flower(
43-
apps_api_instance, namespace, spec, logger
40+
# deploy flower
41+
flower_dep_gen_instance = FlowerDeploymentGenerator(
42+
namespace=namespace, celery_cr=celery_cr
4443
)
45-
46-
# 4. Expose flower service
47-
flower_svc = expose_flower_service(
48-
api, namespace, spec, logger
44+
flower_deployment = flower_dep_gen_instance.get_flower_deployment()
45+
kopf.adopt(flower_deployment)
46+
apps_api_instance.create_namespaced_deployment(
47+
namespace=namespace,
48+
body=flower_deployment
4949
)
5050

51-
children = [
52-
{
53-
'name': worker_deployment.metadata.name,
54-
'replicas': worker_deployment.spec.replicas,
55-
'kind': constants.DEPLOYMENT_KIND,
56-
'type': constants.WORKER_TYPE
57-
},
58-
{
59-
'name': flower_deployment.metadata.name,
60-
'replicas': flower_deployment.spec.replicas,
61-
'kind': constants.DEPLOYMENT_KIND,
62-
'type': constants.FLOWER_TYPE
63-
},
64-
{
65-
'name': flower_svc.metadata.name,
66-
'spec': flower_svc.spec.to_dict(),
67-
'kind': constants.SERVICE_KIND,
68-
'type': constants.FLOWER_TYPE
69-
}
70-
]
51+
# expose service
52+
flower_svc = flower_dep_gen_instance.get_flower_svc()
53+
kopf.adopt(flower_svc)
54+
api.create_namespaced_service(namespace=namespace, body=flower_svc)
7155

56+
# TODO: Decide the return structure
7257
return {
73-
'children': children,
74-
'children_count': len(children),
58+
'children': 3,
7559
'status': constants.STATUS_CREATED
7660
}
7761

@@ -172,7 +156,7 @@ def get_flower_svc_host(status):
172156

173157

174158
@kopf.timer('celeryproject.org', 'v1alpha1', 'celery',
175-
initial_delay=5, interval=10, idle=10)
159+
initial_delay=50000, interval=10000, idle=10)
176160
def message_queue_length(spec, status, **kwargs):
177161
flower_svc_host = get_flower_svc_host(status)
178162
if not flower_svc_host:
@@ -242,23 +226,4 @@ def horizontal_autoscale(spec, status, namespace, **kwargs):
242226
return {
243227
'deploymentName': updated_deployment.metadata.name,
244228
'updated_num_of_replicas': updated_num_of_replicas
245-
}
246-
247-
248-
def validate_stuff(spec):
249-
"""
250-
1. If the deployment/svc already exists, k8s throws error
251-
2. Response and spec classes and enums
252-
"""
253-
pass
254-
255-
256-
def validate_spec(spec):
257-
"""
258-
Validates the incoming spec
259-
@returns - True/False, Error Message
260-
"""
261-
# size = spec.get('size')
262-
# if not size:
263-
# return size, "Size must be set"
264-
return None, None
229+
}

kubernetes_utils/__init__.py

Whitespace-only changes.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from typing import List
2+
from kubernetes import client as k8s
3+
from kubernetes_utils.pod_generator import PodGenerator
4+
from models.celery_custom_resource import CeleryCustomResource
5+
6+
7+
class FlowerDeploymentGenerator(object):
8+
9+
def __init__(self, namespace: str, celery_cr: CeleryCustomResource):
10+
self.namespace = namespace
11+
self.celery_cr = celery_cr
12+
13+
def get_flower_deployment(self) -> k8s.V1Deployment:
14+
template = k8s.V1PodTemplateSpec(
15+
metadata=k8s.V1ObjectMeta(
16+
labels={"app": f'{self.celery_cr.app_name}-celery-flower'}
17+
),
18+
spec=self.get_flower_pod().spec
19+
)
20+
deployment_spec = k8s.V1DeploymentSpec(
21+
replicas=self.celery_cr.worker_replicas,
22+
template=template,
23+
selector={'matchLabels': {'app': f'{self.celery_cr.app_name}-celery-flower'}}
24+
)
25+
26+
return k8s.V1Deployment(
27+
api_version="apps/v1",
28+
kind="Deployment",
29+
metadata=k8s.V1ObjectMeta(name=f'{self.celery_cr.app_name}-celery-flower'),
30+
spec=deployment_spec
31+
)
32+
33+
def get_flower_pod(self) -> k8s.V1Pod:
34+
return PodGenerator(
35+
namespace=self.namespace,
36+
image=self.celery_cr.image,
37+
image_pull_policy=self.celery_cr.image_pull_policy,
38+
image_pull_secrets=self.celery_cr.image_pull_secrets,
39+
name=f"{self.celery_cr.app_name}-celery-flower",
40+
container_name='flower',
41+
volume_mounts=self.celery_cr.volume_mounts,
42+
volumes=self.celery_cr.volumes,
43+
init_containers=self.celery_cr.init_containers,
44+
cmds=["flower"],
45+
envs=self.celery_cr.flower_spec.env,
46+
args=self.__get_flower_container_args(
47+
self.celery_cr.celery_app, self.celery_cr.flower_spec.args
48+
),
49+
node_selectors=self.celery_cr.flower_spec.node_selector,
50+
resources=self.celery_cr.flower_spec.resources.to_dict(),
51+
readiness_probe=self.celery_cr.readiness_probe,
52+
liveness_probe=self.celery_cr.liveness_probe
53+
).gen_pod()
54+
55+
def get_flower_svc(self) -> k8s.V1Service:
56+
svc_template = self.celery_cr.flower_spec.service
57+
metadata = {
58+
'name': f'{self.celery_cr.app_name}-celery-flower'
59+
}
60+
return k8s.V1Service(
61+
metadata=metadata,
62+
spec=svc_template.get('spec')
63+
)
64+
65+
def __get_flower_container_args(
66+
self, celery_app: str, args: List[str]
67+
) -> List[str]:
68+
base_args = [f"--app={celery_app}", "flower"]
69+
if args:
70+
base_args.extend(args)
71+
return base_args

0 commit comments

Comments
 (0)