Skip to content
This repository was archived by the owner on May 8, 2025. It is now read-only.
4 changes: 4 additions & 0 deletions api/v1alpha1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ type JobManagerSpec struct {
// Volume mounts in the JobManager container.
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`

VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

// Selector which must match a node's labels for the JobManager pod to be
// scheduled on that node.
// More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
Expand Down Expand Up @@ -197,6 +199,8 @@ type TaskManagerSpec struct {
// More info: https://kubernetes.io/docs/concepts/storage/volumes/
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`

VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

// Selector which must match a node's labels for the TaskManager pod to be
// scheduled on that node.
// More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
Expand Down
4 changes: 4 additions & 0 deletions api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ type JobManagerSpec struct {
// Volume mounts in the JobManager container.
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`

VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

// Init containers of the Job Manager pod.
InitContainers []corev1.Container `json:"initContainers,omitempty"`

Expand Down Expand Up @@ -276,6 +278,8 @@ type TaskManagerSpec struct {
// More info: https://kubernetes.io/docs/concepts/storage/volumes/
VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"`

VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

// Init containers of the Task Manager pod.
InitContainers []corev1.Container `json:"initContainers,omitempty"`

Expand Down
11 changes: 7 additions & 4 deletions controllers/batchscheduler/volcano/volcano_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestGetClusterResource(t *testing.T) {
jmRep := int32(1)
replicas := int32(4)
desiredState := &model.DesiredClusterState{
JmDeployment: &appsv1.Deployment{
JmDeployment: &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "flinkjobcluster-sample-jobmanager",
Namespace: "default",
Expand All @@ -42,8 +42,9 @@ func TestGetClusterResource(t *testing.T) {
"component": "jobmanager",
},
},
Spec: appsv1.DeploymentSpec{
Spec: appsv1.StatefulSetSpec{
Replicas: &jmRep,
ServiceName: "flinkjobcluster-sample-jobmanager",
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "flink",
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestGetClusterResource(t *testing.T) {
},
},
},
TmDeployment: &appsv1.Deployment{
TmDeployment: &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "flinkjobcluster-sample-taskmanager",
Namespace: "default",
Expand All @@ -171,8 +172,10 @@ func TestGetClusterResource(t *testing.T) {
"component": "taskmanager",
},
},
Spec: appsv1.DeploymentSpec{
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
ServiceName: "flinkjobcluster-sample-taskmanager",
PodManagementPolicy: "Parallel",
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "flink",
Expand Down
1 change: 1 addition & 0 deletions controllers/flinkcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (reconciler *FlinkClusterReconciler) SetupWithManager(
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.FlinkCluster{}).
Owns(&appsv1.Deployment{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Owns(&batchv1.Job{}).
Complete(reconciler)
Expand Down
18 changes: 12 additions & 6 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func getDesiredClusterState(

// Gets the desired JobManager deployment spec from the FlinkCluster spec.
func getDesiredJobManagerDeployment(
flinkCluster *v1beta1.FlinkCluster) *appsv1.Deployment {
flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {

if shouldCleanup(flinkCluster, "JobManagerDeployment") {
return nil
Expand Down Expand Up @@ -206,16 +206,19 @@ func getDesiredJobManagerDeployment(
SecurityContext: securityContext,
ServiceAccountName: getServiceAccountName(serviceAccount),
}
var jobManagerDeployment = &appsv1.Deployment{

var jobManagerDeployment = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: jobManagerDeploymentName,
OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
Labels: deploymentLabels,
},
Spec: appsv1.DeploymentSpec{
Spec: appsv1.StatefulSetSpec{
Replicas: jobManagerSpec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
ServiceName: jobManagerDeploymentName,
VolumeClaimTemplates: jobManagerSpec.VolumeClaimTemplates,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Expand Down Expand Up @@ -373,7 +376,7 @@ func getDesiredJobManagerIngress(

// Gets the desired TaskManager deployment spec from a cluster spec.
func getDesiredTaskManagerDeployment(
flinkCluster *v1beta1.FlinkCluster) *appsv1.Deployment {
flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {

if shouldCleanup(flinkCluster, "TaskManagerDeployment") {
return nil
Expand Down Expand Up @@ -502,17 +505,20 @@ func getDesiredTaskManagerDeployment(
SecurityContext: securityContext,
ServiceAccountName: getServiceAccountName(serviceAccount),
}
var taskManagerDeployment = &appsv1.Deployment{
var taskManagerDeployment = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: taskManagerDeploymentName,
OwnerReferences: []metav1.OwnerReference{
ToOwnerReference(flinkCluster)},
Labels: deploymentLabels,
},
Spec: appsv1.DeploymentSpec{
Spec: appsv1.StatefulSetSpec{
Replicas: &taskManagerSpec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
ServiceName: taskManagerDeploymentName,
VolumeClaimTemplates: taskManagerSpec.VolumeClaimTemplates,
PodManagementPolicy: "Parallel",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Expand Down
11 changes: 7 additions & 4 deletions controllers/flinkcluster_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestGetDesiredClusterState(t *testing.T) {
// Verify.

// JmDeployment
var expectedDesiredJmDeployment = appsv1.Deployment{
var expectedDesiredJmDeployment = appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "flinkjobcluster-sample-jobmanager",
Namespace: "default",
Expand All @@ -302,14 +302,15 @@ func TestGetDesiredClusterState(t *testing.T) {
},
},
},
Spec: appsv1.DeploymentSpec{
Spec: appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "flink",
"cluster": "flinkjobcluster-sample",
"component": "jobmanager",
},
},
ServiceName: "flinkjobcluster-sample-jobmanager",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down Expand Up @@ -550,7 +551,7 @@ func TestGetDesiredClusterState(t *testing.T) {
expectedDesiredJmIngress)

// TmDeployment
var expectedDesiredTmDeployment = appsv1.Deployment{
var expectedDesiredTmDeployment = appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "flinkjobcluster-sample-taskmanager",
Namespace: "default",
Expand All @@ -570,8 +571,10 @@ func TestGetDesiredClusterState(t *testing.T) {
},
},
},
Spec: appsv1.DeploymentSpec{
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
ServiceName: "flinkjobcluster-sample-taskmanager",
PodManagementPolicy: "Parallel",
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "flink",
Expand Down
14 changes: 7 additions & 7 deletions controllers/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ type ObservedClusterState struct {
cluster *v1beta1.FlinkCluster
revisions []*appsv1.ControllerRevision
configMap *corev1.ConfigMap
jmDeployment *appsv1.Deployment
jmDeployment *appsv1.StatefulSet
jmService *corev1.Service
jmIngress *extensionsv1beta1.Ingress
tmDeployment *appsv1.Deployment
tmDeployment *appsv1.StatefulSet
job *batchv1.Job
flinkJobList *flinkclient.JobStatusList
flinkRunningJobIDs []string
Expand Down Expand Up @@ -121,7 +121,7 @@ func (observer *ClusterStateObserver) observe(
}

// JobManager deployment.
var observedJmDeployment = new(appsv1.Deployment)
var observedJmDeployment = new(appsv1.StatefulSet)
err = observer.observeJobManagerDeployment(observedJmDeployment)
if err != nil {
if client.IgnoreNotFound(err) != nil {
Expand Down Expand Up @@ -166,7 +166,7 @@ func (observer *ClusterStateObserver) observe(
}

// TaskManager deployment.
var observedTmDeployment = new(appsv1.Deployment)
var observedTmDeployment = new(appsv1.StatefulSet)
err = observer.observeTaskManagerDeployment(observedTmDeployment)
if err != nil {
if client.IgnoreNotFound(err) != nil {
Expand Down Expand Up @@ -361,7 +361,7 @@ func (observer *ClusterStateObserver) observeConfigMap(
}

func (observer *ClusterStateObserver) observeJobManagerDeployment(
observedDeployment *appsv1.Deployment) error {
observedDeployment *appsv1.StatefulSet) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name
var jmDeploymentName = getJobManagerDeploymentName(clusterName)
Expand All @@ -370,7 +370,7 @@ func (observer *ClusterStateObserver) observeJobManagerDeployment(
}

func (observer *ClusterStateObserver) observeTaskManagerDeployment(
observedDeployment *appsv1.Deployment) error {
observedDeployment *appsv1.StatefulSet) error {
var clusterNamespace = observer.request.Namespace
var clusterName = observer.request.Name
var tmDeploymentName = getTaskManagerDeploymentName(clusterName)
Expand All @@ -382,7 +382,7 @@ func (observer *ClusterStateObserver) observeDeployment(
namespace string,
name string,
component string,
observedDeployment *appsv1.Deployment) error {
observedDeployment *appsv1.StatefulSet) error {
var log = observer.log.WithValues("component", component)
var err = observer.k8sClient.Get(
observer.context,
Expand Down
10 changes: 5 additions & 5 deletions controllers/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func (reconciler *ClusterReconciler) reconcileTaskManagerDeployment() error {

func (reconciler *ClusterReconciler) reconcileDeployment(
component string,
desiredDeployment *appsv1.Deployment,
observedDeployment *appsv1.Deployment) error {
desiredDeployment *appsv1.StatefulSet,
observedDeployment *appsv1.StatefulSet) error {
var log = reconciler.log.WithValues("component", component)

if desiredDeployment != nil && observedDeployment == nil {
Expand Down Expand Up @@ -158,7 +158,7 @@ func (reconciler *ClusterReconciler) reconcileDeployment(
}

func (reconciler *ClusterReconciler) createDeployment(
deployment *appsv1.Deployment, component string) error {
deployment *appsv1.StatefulSet, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient
Expand Down Expand Up @@ -193,7 +193,7 @@ func (reconciler *ClusterReconciler) deleteOldComponent(desired runtime.Object,
}

func (reconciler *ClusterReconciler) updateDeployment(
deployment *appsv1.Deployment, component string) error {
deployment *appsv1.StatefulSet, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient
Expand All @@ -209,7 +209,7 @@ func (reconciler *ClusterReconciler) updateDeployment(
}

func (reconciler *ClusterReconciler) deleteDeployment(
deployment *appsv1.Deployment, component string) error {
deployment *appsv1.StatefulSet, component string) error {
var context = reconciler.context
var log = reconciler.log.WithValues("component", component)
var k8sClient = reconciler.k8sClient
Expand Down
4 changes: 2 additions & 2 deletions controllers/flinkcluster_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,8 +892,8 @@ func (updater *ClusterStatusUpdater) clearControlAnnotation(newControlStatus *v1
return nil
}

func getDeploymentState(deployment *appsv1.Deployment) string {
if deployment.Status.AvailableReplicas >= *deployment.Spec.Replicas {
func getDeploymentState(deployment *appsv1.StatefulSet) string {
if deployment.Status.ReadyReplicas >= *deployment.Spec.Replicas {
return v1beta1.ComponentStateReady
}
return v1beta1.ComponentStateNotReady
Expand Down
12 changes: 6 additions & 6 deletions controllers/flinkcluster_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (

func TestGetDeploymentStateNotReady(t *testing.T) {
var replicas int32 = 3
var deployment = appsv1.Deployment{
Spec: appsv1.DeploymentSpec{Replicas: &replicas},
Status: appsv1.DeploymentStatus{AvailableReplicas: 2},
var deployment = appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{Replicas: &replicas},
Status: appsv1.StatefulSetStatus{ReadyReplicas: 2},
}
var state = getDeploymentState(&deployment)
assert.Assert(
Expand All @@ -38,9 +38,9 @@ func TestGetDeploymentStateNotReady(t *testing.T) {

func TestGetDeploymentStateReady(t *testing.T) {
var replicas int32 = 3
var deployment = appsv1.Deployment{
Spec: appsv1.DeploymentSpec{Replicas: &replicas},
Status: appsv1.DeploymentStatus{AvailableReplicas: 3},
var deployment = appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{Replicas: &replicas},
Status: appsv1.StatefulSetStatus{ReadyReplicas: 3},
}
var state = getDeploymentState(&deployment)
assert.Assert(t, state == v1beta1.ComponentStateReady)
Expand Down
4 changes: 4 additions & 0 deletions controllers/flinkcluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@ func isComponentUpdated(component runtime.Object, cluster v1beta1.FlinkCluster)
if o == nil {
return false
}
case *appsv1.StatefulSet:
if o == nil {
return false
}
case *corev1.ConfigMap:
if o == nil {
return false
Expand Down
Loading