Skip to content

Commit c817ad2

Browse files
authored
Use StatefulSet instead of Deployment (GoogleCloudPlatform#354)
1 parent 2d0509e commit c817ad2

23 files changed

+316
-286
lines changed

api/v1alpha1/flinkcluster_types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ type FlinkClusterComponentsStatus struct {
372372
// The state of configMap.
373373
ConfigMap FlinkClusterComponentState `json:"configMap"`
374374

375-
// The state of JobManager deployment.
375+
// The state of JobManager Deployment.
376376
JobManagerDeployment FlinkClusterComponentState `json:"jobManagerDeployment"`
377377

378378
// The state of JobManager service.
@@ -381,7 +381,7 @@ type FlinkClusterComponentsStatus struct {
381381
// The state of JobManager ingress.
382382
JobManagerIngress *JobManagerIngressStatus `json:"jobManagerIngress,omitempty"`
383383

384-
// The state of TaskManager deployment.
384+
// The state of TaskManager Deployment.
385385
TaskManagerDeployment FlinkClusterComponentState `json:"taskManagerDeployment"`
386386

387387
// The status of the job, available only when JobSpec is provided.

api/v1beta1/flinkcluster_types.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ type JobManagerSpec struct {
205205
// Volume mounts in the JobManager container.
206206
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`
207207

208+
VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`
209+
208210
// Init containers of the Job Manager pod.
209211
InitContainers []corev1.Container `json:"initContainers,omitempty"`
210212

@@ -221,13 +223,13 @@ type JobManagerSpec struct {
221223
// pod.
222224
Sidecars []corev1.Container `json:"sidecars,omitempty"`
223225

224-
// JobManager Deployment pod template annotations.
226+
// JobManager StatefulSet pod template annotations.
225227
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
226228

227229
// SecurityContext of the JM pod.
228230
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
229231

230-
// JobManager Deployment pod template labels.
232+
// JobManager StatefulSet pod template labels.
231233
PodLabels map[string]string `json:"podLabels,omitempty"`
232234
}
233235

@@ -278,6 +280,8 @@ type TaskManagerSpec struct {
278280
// More info: https://kubernetes.io/docs/concepts/storage/volumes/
279281
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`
280282

283+
VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`
284+
281285
// Init containers of the Task Manager pod.
282286
InitContainers []corev1.Container `json:"initContainers,omitempty"`
283287

@@ -294,13 +298,13 @@ type TaskManagerSpec struct {
294298
// pod.
295299
Sidecars []corev1.Container `json:"sidecars,omitempty"`
296300

297-
// TaskManager Deployment pod template annotations.
301+
// TaskManager StatefulSet pod template annotations.
298302
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
299303

300304
// SecurityContext of the TM pod.
301305
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
302306

303-
// TaskManager Deployment pod template labels.
307+
// TaskManager StatefulSet pod template labels.
304308
PodLabels map[string]string `json:"podLabels,omitempty"`
305309
}
306310

@@ -502,17 +506,17 @@ type FlinkClusterComponentsStatus struct {
502506
// The state of configMap.
503507
ConfigMap FlinkClusterComponentState `json:"configMap"`
504508

505-
// The state of JobManager deployment.
506-
JobManagerDeployment FlinkClusterComponentState `json:"jobManagerDeployment"`
509+
// The state of JobManager StatefulSet.
510+
JobManagerStatefulSet FlinkClusterComponentState `json:"jobManagerStatefulSet"`
507511

508512
// The state of JobManager service.
509513
JobManagerService JobManagerServiceStatus `json:"jobManagerService"`
510514

511515
// The state of JobManager ingress.
512516
JobManagerIngress *JobManagerIngressStatus `json:"jobManagerIngress,omitempty"`
513517

514-
// The state of TaskManager deployment.
515-
TaskManagerDeployment FlinkClusterComponentState `json:"taskManagerDeployment"`
518+
// The state of TaskManager StatefulSet.
519+
TaskManagerStatefulSet FlinkClusterComponentState `json:"taskManagerStatefulSet"`
516520

517521
// The status of the job, available only when JobSpec is provided.
518522
Job *JobStatus `json:"job,omitempty"`

api/v1beta1/zz_generated.deepcopy.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5165,7 +5165,7 @@ spec:
51655165
required:
51665166
- state
51675167
type: object
5168-
jobManagerDeployment:
5168+
jobManagerStatefulSet:
51695169
properties:
51705170
name:
51715171
type: string
@@ -5202,7 +5202,7 @@ spec:
52025202
- name
52035203
- state
52045204
type: object
5205-
taskManagerDeployment:
5205+
taskManagerStatefulSet:
52065206
properties:
52075207
name:
52085208
type: string
@@ -5214,9 +5214,9 @@ spec:
52145214
type: object
52155215
required:
52165216
- configMap
5217-
- jobManagerDeployment
5217+
- jobManagerStatefulSet
52185218
- jobManagerService
5219-
- taskManagerDeployment
5219+
- taskManagerStatefulSet
52205220
type: object
52215221
control:
52225222
properties:

config/rbac/role.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ rules:
3030
- apps
3131
resources:
3232
- deployments
33+
- statefulsets
3334
verbs:
3435
- get
3536
- list
@@ -42,6 +43,7 @@ rules:
4243
- apps
4344
resources:
4445
- deployments/status
46+
- statefulsets/status
4547
verbs:
4648
- get
4749
- apiGroups:

controllers/batchscheduler/volcano/volcano.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,19 @@ func (v *VolcanoBatchScheduler) Schedule(cluster *v1beta1.FlinkCluster, state *m
7474

7575
func (v *VolcanoBatchScheduler) setSchedulerMeta(cluster *v1beta1.FlinkCluster, state *model.DesiredClusterState) {
7676
podgroupName := v.getPodGroupName(cluster)
77-
if state.TmDeployment != nil {
78-
state.TmDeployment.Spec.Template.Spec.SchedulerName = v.Name()
79-
if state.TmDeployment.Spec.Template.Annotations == nil {
80-
state.TmDeployment.Spec.Template.Annotations = make(map[string]string)
77+
if state.TmStatefulSet != nil {
78+
state.TmStatefulSet.Spec.Template.Spec.SchedulerName = v.Name()
79+
if state.TmStatefulSet.Spec.Template.Annotations == nil {
80+
state.TmStatefulSet.Spec.Template.Annotations = make(map[string]string)
8181
}
82-
state.TmDeployment.Spec.Template.Annotations[scheduling.KubeGroupNameAnnotationKey] = podgroupName
82+
state.TmStatefulSet.Spec.Template.Annotations[scheduling.KubeGroupNameAnnotationKey] = podgroupName
8383
}
84-
if state.JmDeployment != nil {
85-
state.JmDeployment.Spec.Template.Spec.SchedulerName = v.Name()
86-
if state.JmDeployment.Spec.Template.Annotations == nil {
87-
state.JmDeployment.Spec.Template.Annotations = make(map[string]string)
84+
if state.JmStatefulSet != nil {
85+
state.JmStatefulSet.Spec.Template.Spec.SchedulerName = v.Name()
86+
if state.JmStatefulSet.Spec.Template.Annotations == nil {
87+
state.JmStatefulSet.Spec.Template.Annotations = make(map[string]string)
8888
}
89-
state.JmDeployment.Spec.Template.Annotations[scheduling.KubeGroupNameAnnotationKey] = podgroupName
89+
state.JmStatefulSet.Spec.Template.Annotations[scheduling.KubeGroupNameAnnotationKey] = podgroupName
9090
}
9191
if state.Job != nil {
9292
state.Job.Spec.Template.Spec.SchedulerName = v.Name()
@@ -145,18 +145,18 @@ func getClusterResource(state *model.DesiredClusterState) (corev1.ResourceList,
145145
resource := corev1.ResourceList{}
146146
var size int32
147147

148-
if state.JmDeployment != nil {
149-
size += *state.JmDeployment.Spec.Replicas
150-
for i := int32(0); i < *state.JmDeployment.Spec.Replicas; i++ {
151-
jmResource := getPodResource(&state.JmDeployment.Spec.Template.Spec)
148+
if state.JmStatefulSet != nil {
149+
size += *state.JmStatefulSet.Spec.Replicas
150+
for i := int32(0); i < *state.JmStatefulSet.Spec.Replicas; i++ {
151+
jmResource := getPodResource(&state.JmStatefulSet.Spec.Template.Spec)
152152
addResourceList(resource, jmResource, nil)
153153
}
154154
}
155155

156-
if state.TmDeployment != nil {
157-
size += *state.TmDeployment.Spec.Replicas
158-
for i := int32(0); i < *state.TmDeployment.Spec.Replicas; i++ {
159-
tmResource := getPodResource(&state.TmDeployment.Spec.Template.Spec)
156+
if state.TmStatefulSet != nil {
157+
size += *state.TmStatefulSet.Spec.Replicas
158+
for i := int32(0); i < *state.TmStatefulSet.Spec.Replicas; i++ {
159+
tmResource := getPodResource(&state.TmStatefulSet.Spec.Template.Spec)
160160
addResourceList(resource, tmResource, nil)
161161
}
162162
}

controllers/batchscheduler/volcano/volcano_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestGetClusterResource(t *testing.T) {
3232
jmRep := int32(1)
3333
replicas := int32(4)
3434
desiredState := &model.DesiredClusterState{
35-
JmDeployment: &appsv1.Deployment{
35+
JmStatefulSet: &appsv1.StatefulSet{
3636
ObjectMeta: metav1.ObjectMeta{
3737
Name: "flinkjobcluster-sample-jobmanager",
3838
Namespace: "default",
@@ -42,8 +42,9 @@ func TestGetClusterResource(t *testing.T) {
4242
"component": "jobmanager",
4343
},
4444
},
45-
Spec: appsv1.DeploymentSpec{
45+
Spec: appsv1.StatefulSetSpec{
4646
Replicas: &jmRep,
47+
ServiceName: "flinkjobcluster-sample-jobmanager",
4748
Selector: &metav1.LabelSelector{
4849
MatchLabels: map[string]string{
4950
"app": "flink",
@@ -161,7 +162,7 @@ func TestGetClusterResource(t *testing.T) {
161162
},
162163
},
163164
},
164-
TmDeployment: &appsv1.Deployment{
165+
TmStatefulSet: &appsv1.StatefulSet{
165166
ObjectMeta: metav1.ObjectMeta{
166167
Name: "flinkjobcluster-sample-taskmanager",
167168
Namespace: "default",
@@ -171,8 +172,10 @@ func TestGetClusterResource(t *testing.T) {
171172
"component": "taskmanager",
172173
},
173174
},
174-
Spec: appsv1.DeploymentSpec{
175+
Spec: appsv1.StatefulSetSpec{
175176
Replicas: &replicas,
177+
ServiceName: "flinkjobcluster-sample-taskmanager",
178+
PodManagementPolicy: "Parallel",
176179
Selector: &metav1.LabelSelector{
177180
MatchLabels: map[string]string{
178181
"app": "flink",

controllers/flinkcluster_controller.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ type FlinkClusterReconciler struct {
4848
// +kubebuilder:rbac:groups=flinkoperator.k8s.io,resources=flinkclusters/status,verbs=get;update;patch
4949
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
5050
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get
51+
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
52+
// +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get
5153
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
5254
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get
5355
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
@@ -88,6 +90,7 @@ func (reconciler *FlinkClusterReconciler) SetupWithManager(
8890
return ctrl.NewControllerManagedBy(mgr).
8991
For(&v1beta1.FlinkCluster{}).
9092
Owns(&appsv1.Deployment{}).
93+
Owns(&appsv1.StatefulSet{}).
9194
Owns(&corev1.Service{}).
9295
Owns(&batchv1.Job{}).
9396
Complete(reconciler)
@@ -177,10 +180,10 @@ func (handler *FlinkClusterHandler) reconcile(
177180
} else {
178181
log.Info("Desired state", "ConfigMap", "nil")
179182
}
180-
if desired.JmDeployment != nil {
181-
log.Info("Desired state", "JobManager deployment", *desired.JmDeployment)
183+
if desired.JmStatefulSet != nil {
184+
log.Info("Desired state", "JobManager StatefulSet", *desired.JmStatefulSet)
182185
} else {
183-
log.Info("Desired state", "JobManager deployment", "nil")
186+
log.Info("Desired state", "JobManager StatefulSet", "nil")
184187
}
185188
if desired.JmService != nil {
186189
log.Info("Desired state", "JobManager service", *desired.JmService)
@@ -192,10 +195,10 @@ func (handler *FlinkClusterHandler) reconcile(
192195
} else {
193196
log.Info("Desired state", "JobManager ingress", "nil")
194197
}
195-
if desired.TmDeployment != nil {
196-
log.Info("Desired state", "TaskManager deployment", *desired.TmDeployment)
198+
if desired.TmStatefulSet != nil {
199+
log.Info("Desired state", "TaskManager StatefulSet", *desired.TmStatefulSet)
197200
} else {
198-
log.Info("Desired state", "TaskManager deployment", "nil")
201+
log.Info("Desired state", "TaskManager StatefulSet", "nil")
199202
}
200203
if desired.Job != nil {
201204
log.Info("Desired state", "Job", *desired.Job)

0 commit comments

Comments
 (0)