diff --git a/api/v1beta1/flinkcluster_types.go b/api/v1beta1/flinkcluster_types.go
index 8d19f09e..46281a44 100644
--- a/api/v1beta1/flinkcluster_types.go
+++ b/api/v1beta1/flinkcluster_types.go
@@ -317,6 +317,61 @@ type JobSpec struct {
 	CancelRequested *bool `json:"cancelRequested,omitempty"`
 }
 
+// NativeSessionClusterJobSpec defines properties of a Native Flink session cluster.
+// The properties in NativeSessionClusterJobSpec come from
+// https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes
+type NativeSessionClusterJobSpec struct {
+	// kubernetes.cluster-id. The cluster id used for identifying the unique flink cluster.
+	// The operator uses flinkCluster.ObjectMeta.Name as the cluster id.
+	FlinkClusterID string `json:"flinkClusterID,omitempty"`
+
+	// kubernetes.config.file. The kubernetes config file will be used to create the client.
+	// The default is located at ~/.kube/config. The service account in the pod also works.
+	KubeConfig *string `json:"kubeConfig,omitempty"`
+
+	// kubernetes.container-start-command-template. Template for the kubernetes jobmanager
+	// and taskmanager container start invocation.
+	// Default: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"
+	ContainerStartCommandTemplate *string `json:"containerStartCommandTemplate,omitempty"`
+
+	// kubernetes.entry.path. The entrypoint script of kubernetes in the image. It will be used as the command for the jobmanager and taskmanager containers.
+	// Default: "/opt/flink/bin/kubernetes-entry.sh"
+	EntryPath *string `json:"entryPath,omitempty"`
+
+	// kubernetes.flink.conf.dir. The flink conf directory that will be mounted in pod.
+	// The flink-conf.yaml, log4j.properties, logback.xml in this path will be overwritten from config map.
+	// Default: "/opt/flink/conf". NOTE(review): "Congfig" looks like a typo of "Config" in both the field name and the JSON key.
+	CongfigDir *string `json:"congfigDir,omitempty"`
+
+	// kubernetes.flink.log.dir. The directory in the pod where the logs of jobmanager and taskmanager are saved.
+	// Default: "/opt/flink/log".
+	LogDir *string `json:"logDir,omitempty"`
+
+	// kubernetes.jobmanager.cpu. The number of cpu used by job manager.
+	// Default: 1.0. NOTE(review): an int32 cannot hold fractional CPU values; confirm the intended type.
+	CPUJobManager *int32 `json:"CPUJobManager,omitempty"`
+
+	// kubernetes.jobmanager.service-account. Service account that is used by jobmanager within kubernetes cluster.
+	// The job manager uses this service account when requesting taskmanager pods from the API server.
+	// Default: "default"
+	FlinkClusterSA *string `json:"flinkClusterSA,omitempty"`
+
+	// kubernetes.rest-service.exposed.type. It could be ClusterIP/NodePort/LoadBalancer(default).
+	// When set to ClusterIP, the rest service will not be created.
+	// Default: "LoadBalancer"
+	FlinkRestServiceType *string `json:"flinkRestServiceType,omitempty"`
+
+	// kubernetes.service.create-timeout. Timeout used for creating the service.
+	// The timeout value requires a time-unit specifier (ms/s/min/h/d).
+	// Default: "1 min"
+	FlinkServiceCreateTimeout *string `json:"flinkServiceCreateTimeout,omitempty"`
+
+	// kubernetes.taskmanager.cpu. The number of cpu used by task manager.
+	// By default, the cpu is set to the number of slots per TaskManager.
+	// Default: -1.0. NOTE(review): an int32 cannot hold fractional CPU values; confirm the intended type.
+	TaskManagerCPU *int32 `json:"taskManagerCPU,omitempty"`
+}
+
 // FlinkClusterSpec defines the desired state of FlinkCluster
 type FlinkClusterSpec struct {
 	// Flink image spec for the cluster's components.
@@ -333,6 +388,10 @@ type FlinkClusterSpec struct {
 	// otherwise, it is a long-running Session Cluster.
 	Job *JobSpec `json:"job,omitempty"`
 
+	// (Optional) Native Flink session spec. If specified,
+	// this cluster is a Native Flink session (only the jobmanager is created in advance).
+	NativeSessionClusterJob *NativeSessionClusterJobSpec `json:"nativeSessionClusterJob,omitempty"`
+
 	// Environment variables shared by all JobManager, TaskManager and job
 	// containers.
 	EnvVars []corev1.EnvVar `json:"envVars,omitempty"`
diff --git a/api/v1beta1/flinkcluster_validate.go b/api/v1beta1/flinkcluster_validate.go
index fb81f036..94ef87a8 100644
--- a/api/v1beta1/flinkcluster_validate.go
+++ b/api/v1beta1/flinkcluster_validate.go
@@ -49,6 +49,10 @@ func (v *Validator) ValidateCreate(cluster *FlinkCluster) error {
 	if err != nil {
 		return err
 	}
+	if cluster.Spec.NativeSessionClusterJob != nil {
+		// It's a native session cluster: skip validation of the jobManager, taskManager, etc.
+		return v.validateNativeSessionClusterJob()
+	}
 	err = v.validateJobManager(&cluster.Spec.JobManager)
 	if err != nil {
 		return err
@@ -235,6 +239,11 @@ func (v *Validator) validateImage(imageSpec *ImageSpec) error {
 	return nil
 }
 
+func (v *Validator) validateNativeSessionClusterJob() error {
+	// TODO: Check whether the native session cluster properties need to be validated.
+	return nil
+}
+
 func (v *Validator) validateJobManager(jmSpec *JobManagerSpec) error {
 	var err error
 
diff --git a/api/v1beta1/flinkcluster_webhook.go b/api/v1beta1/flinkcluster_webhook.go
index 2299c16c..11b3fba0 100644
--- a/api/v1beta1/flinkcluster_webhook.go
+++ b/api/v1beta1/flinkcluster_webhook.go
@@ -55,7 +55,11 @@ var _ webhook.Defaulter = &FlinkCluster{}
 // type.
func (cluster *FlinkCluster) Default() { log.Info("default", "name", cluster.Name, "original", *cluster) - _SetDefault(cluster) + if cluster.Spec.NativeSessionClusterJob != nil { + log.Info("It's a NativeSessionCluster, will not set defaults.") + } else { + _SetDefault(cluster) + } log.Info("default", "name", cluster.Name, "augmented", *cluster) } diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index a02b5104..91d15519 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -176,6 +176,11 @@ func (in *FlinkClusterSpec) DeepCopyInto(out *FlinkClusterSpec) { *out = new(JobSpec) (*in).DeepCopyInto(*out) } + if in.NativeSessionClusterJob != nil { + in, out := &in.NativeSessionClusterJob, &out.NativeSessionClusterJob + *out = new(NativeSessionClusterJobSpec) + (*in).DeepCopyInto(*out) + } if in.EnvVars != nil { in, out := &in.EnvVars, &out.EnvVars *out = make([]v1.EnvVar, len(*in)) @@ -577,6 +582,71 @@ func (in *JobStatus) DeepCopy() *JobStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *NativeSessionClusterJobSpec) DeepCopyInto(out *NativeSessionClusterJobSpec) { + *out = *in + if in.KubeConfig != nil { + in, out := &in.KubeConfig, &out.KubeConfig + *out = new(string) + **out = **in + } + if in.ContainerStartCommandTemplate != nil { + in, out := &in.ContainerStartCommandTemplate, &out.ContainerStartCommandTemplate + *out = new(string) + **out = **in + } + if in.EntryPath != nil { + in, out := &in.EntryPath, &out.EntryPath + *out = new(string) + **out = **in + } + if in.CongfigDir != nil { + in, out := &in.CongfigDir, &out.CongfigDir + *out = new(string) + **out = **in + } + if in.LogDir != nil { + in, out := &in.LogDir, &out.LogDir + *out = new(string) + **out = **in + } + if in.CPUJobManager != nil { + in, out := &in.CPUJobManager, &out.CPUJobManager + *out = new(int32) + **out = **in + } + if in.FlinkClusterSA != nil { + in, out := &in.FlinkClusterSA, &out.FlinkClusterSA + *out = new(string) + **out = **in + } + if in.FlinkRestServiceType != nil { + in, out := &in.FlinkRestServiceType, &out.FlinkRestServiceType + *out = new(string) + **out = **in + } + if in.FlinkServiceCreateTimeout != nil { + in, out := &in.FlinkServiceCreateTimeout, &out.FlinkServiceCreateTimeout + *out = new(string) + **out = **in + } + if in.TaskManagerCPU != nil { + in, out := &in.TaskManagerCPU, &out.TaskManagerCPU + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NativeSessionClusterJobSpec. +func (in *NativeSessionClusterJobSpec) DeepCopy() *NativeSessionClusterJobSpec { + if in == nil { + return nil + } + out := new(NativeSessionClusterJobSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *TaskManagerPorts) DeepCopyInto(out *TaskManagerPorts) { *out = *in diff --git a/config/samples/flinkoperator_v1beta1_flinknativesessioncluster.yaml b/config/samples/flinkoperator_v1beta1_flinknativesessioncluster.yaml new file mode 100644 index 00000000..e8bdc1a6 --- /dev/null +++ b/config/samples/flinkoperator_v1beta1_flinknativesessioncluster.yaml @@ -0,0 +1,24 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: flinkoperator.k8s.io/v1beta1 +kind: FlinkCluster +metadata: + name: native-flinksessioncluster-sample +spec: + image: + name: ccr.ccs.tencentyun.com/kinderyj/flink-test:1.10 + pullPolicy: Always + nativeSessionClusterJob: + flinkClusterID: native-flinksessioncluster-sample \ No newline at end of file diff --git a/controllers/flinkcluster_controller.go b/controllers/flinkcluster_controller.go index 28ed4f0a..df23088e 100644 --- a/controllers/flinkcluster_controller.go +++ b/controllers/flinkcluster_controller.go @@ -33,9 +33,9 @@ import ( // FlinkClusterReconciler reconciles a FlinkCluster object type FlinkClusterReconciler struct { - Client client.Client - Log logr.Logger - Mgr ctrl.Manager + Client client.Client + Log logr.Logger + Mgr ctrl.Manager } // +kubebuilder:rbac:groups=flinkoperator.k8s.io,resources=flinkclusters,verbs=get;list;watch;create;update;patch;delete @@ -60,7 +60,7 @@ func (reconciler *FlinkClusterReconciler) Reconcile( var log = reconciler.Log.WithValues( "cluster", 
request.NamespacedName) var handler = FlinkClusterHandler{ - k8sClient: reconciler.Client, + k8sClient: reconciler.Client, flinkClient: flinkclient.FlinkClient{ Log: log, HTTPClient: flinkclient.HTTPClient{Log: log}, @@ -90,14 +90,14 @@ func (reconciler *FlinkClusterReconciler) SetupWithManager( // FlinkClusterHandler holds the context and state for a // reconcile request. type FlinkClusterHandler struct { - k8sClient client.Client - flinkClient flinkclient.FlinkClient - request ctrl.Request - context context.Context - log logr.Logger - recorder record.EventRecorder - observed ObservedClusterState - desired DesiredClusterState + k8sClient client.Client + flinkClient flinkclient.FlinkClient + request ctrl.Request + context context.Context + log logr.Logger + recorder record.EventRecorder + observed ObservedClusterState + desired DesiredClusterState } func (handler *FlinkClusterHandler) reconcile( @@ -184,7 +184,11 @@ func (handler *FlinkClusterHandler) reconcile( } else { log.Info("Desired state", "Job", "nil") } - + if desired.NativeClusterSessionJob != nil { + log.Info("Desired state", "NativeClusterSessionJob", *desired.NativeClusterSessionJob) + } else { + log.Info("Desired state", "NativeClusterSessionJob", "nil") + } log.Info("---------- 4. Take actions ----------") var reconciler = ClusterReconciler{ diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 4052ea4f..d2bed7ae 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -57,12 +57,13 @@ var flinkSysProps = map[string]struct{}{ // DesiredClusterState holds desired state of a cluster. 
type DesiredClusterState struct {
-	JmDeployment *appsv1.Deployment
-	JmService    *corev1.Service
-	JmIngress    *extensionsv1beta1.Ingress
-	TmDeployment *appsv1.Deployment
-	ConfigMap    *corev1.ConfigMap
-	Job          *batchv1.Job
+	JmDeployment            *appsv1.Deployment
+	JmService               *corev1.Service
+	JmIngress               *extensionsv1beta1.Ingress
+	TmDeployment            *appsv1.Deployment
+	ConfigMap               *corev1.ConfigMap
+	Job                     *batchv1.Job
+	NativeClusterSessionJob *batchv1.Job
 }
 
 // Gets the desired state of a cluster.
@@ -74,12 +75,13 @@ func getDesiredClusterState(
 		return DesiredClusterState{}
 	}
 	return DesiredClusterState{
-		ConfigMap:    getDesiredConfigMap(cluster),
-		JmDeployment: getDesiredJobManagerDeployment(cluster),
-		JmService:    getDesiredJobManagerService(cluster),
-		JmIngress:    getDesiredJobManagerIngress(cluster),
-		TmDeployment: getDesiredTaskManagerDeployment(cluster),
-		Job:          getDesiredJob(cluster),
+		ConfigMap:               getDesiredConfigMap(cluster),
+		JmDeployment:            getDesiredJobManagerDeployment(cluster),
+		JmService:               getDesiredJobManagerService(cluster),
+		JmIngress:               getDesiredJobManagerIngress(cluster),
+		TmDeployment:            getDesiredTaskManagerDeployment(cluster),
+		Job:                     getDesiredJob(cluster),
+		NativeClusterSessionJob: getDesiredNativeClusterSessionJob(cluster),
 	}
 }
 
@@ -91,6 +93,11 @@ func getDesiredJobManagerDeployment(
 		return nil
 	}
 
+	if flinkCluster.Spec.NativeSessionClusterJob != nil {
+		//It's a native flink session cluster
+		return nil
+	}
+
 	var clusterNamespace = flinkCluster.ObjectMeta.Namespace
 	var clusterName = flinkCluster.ObjectMeta.Name
 	var clusterSpec = flinkCluster.Spec
@@ -236,6 +243,11 @@ func getDesiredJobManagerService(
 		return nil
 	}
 
+	if flinkCluster.Spec.NativeSessionClusterJob != nil {
+		//It's a native flink session cluster
+		return nil
+	}
+
 	var clusterNamespace = flinkCluster.ObjectMeta.Namespace
 	var clusterName = flinkCluster.ObjectMeta.Name
 	var jobManagerSpec = flinkCluster.Spec.JobManager
@@ -307,6 +319,11 @@ func getDesiredJobManagerIngress(
 		return nil
 	}
 
+	if flinkCluster.Spec.NativeSessionClusterJob != nil {
+		//It's a native flink session cluster
+		return nil
+	}
+
 	var clusterNamespace = flinkCluster.ObjectMeta.Namespace
 	var clusterName = flinkCluster.ObjectMeta.Name
 	var jobManagerServiceName = getJobManagerServiceName(clusterName)
@@ -378,6 +395,11 @@ func getDesiredTaskManagerDeployment(
 		return nil
 	}
 
+	if flinkCluster.Spec.NativeSessionClusterJob != nil {
+		//It's a native flink session cluster
+		return nil
+	}
+
 	var clusterNamespace = flinkCluster.ObjectMeta.Namespace
 	var clusterName = flinkCluster.ObjectMeta.Name
 	var clusterSpec = flinkCluster.Spec
@@ -521,6 +543,11 @@ func getDesiredConfigMap(
 		return nil
 	}
 
+	if flinkCluster.Spec.NativeSessionClusterJob != nil {
+		//It's a native flink session cluster
+		return nil
+	}
+
 	var clusterNamespace = flinkCluster.ObjectMeta.Namespace
 	var clusterName = flinkCluster.ObjectMeta.Name
 	var flinkProperties = flinkCluster.Spec.FlinkProperties
@@ -586,6 +613,11 @@ func getDesiredJob(
 		return nil
 	}
 
+	if flinkCluster.Spec.NativeSessionClusterJob != nil {
+		//It's a native flink session cluster
+		return nil
+	}
+
 	var controlStatus = flinkCluster.Status.Control
 	// We need to watch whether job is cancelled already if jobSpec.CancelRequested is deprecated
 	if (flinkCluster.Status.Components.Job != nil && flinkCluster.Status.Components.Job.State == v1beta1.JobStateCancelled) ||
@@ -732,6 +764,121 @@ func getDesiredJob(
 	return job
 }
 
+// getDesiredNativeClusterSessionJob gets the desired Kubernetes Job which runs
+// /opt/flink/bin/kubernetes-session.sh for a native Flink session cluster, or
+// nil if `Spec.NativeSessionClusterJob` is not specified.
+//
+// Settings are handed to the script as `-D<key>=<value>` arguments, where
+// <key> is the Flink option named in the NativeSessionClusterJobSpec field
+// comments.
+func getDesiredNativeClusterSessionJob(
+	flinkCluster *v1beta1.FlinkCluster) *batchv1.Job {
+	var jobSpec = flinkCluster.Spec.NativeSessionClusterJob
+
+	if jobSpec == nil {
+		return nil
+	}
+	var clusterSpec = flinkCluster.Spec
+	var imageSpec = clusterSpec.Image
+	var clusterNamespace = flinkCluster.ObjectMeta.Namespace
+	var clusterName = flinkCluster.ObjectMeta.Name
+	var jobName = getNativeSessionClusterJobName(clusterName)
+	var labels = map[string]string{
+		"cluster": clusterName,
+		"app":     "flinkNativeSessionCluster",
+	}
+
+	var jobArgs = []string{"/opt/flink/bin/kubernetes-session.sh"}
+
+	// The cluster id is always the FlinkCluster name; jobSpec.FlinkClusterID
+	// is not consulted.
+	jobArgs = append(jobArgs, "-Dkubernetes.cluster-id="+clusterName)
+
+	if imageSpec.Name != "" {
+		jobArgs = append(jobArgs, "-Dkubernetes.container.image="+imageSpec.Name)
+	}
+
+	if jobSpec.EntryPath != nil {
+		jobArgs = append(jobArgs, "-Dkubernetes.entry.path="+*jobSpec.EntryPath)
+	}
+
+	if jobSpec.CongfigDir != nil {
+		jobArgs = append(jobArgs, "-Dkubernetes.flink.conf.dir="+*jobSpec.CongfigDir)
+	}
+
+	if jobSpec.FlinkClusterSA != nil {
+		jobArgs = append(jobArgs, "-Dkubernetes.jobmanager.service-account="+*jobSpec.FlinkClusterSA)
+	}
+	// TODO: forward the remaining NativeSessionClusterJobSpec properties to jobArgs.
+
+	var envVars = []corev1.EnvVar{}
+	var volumes []corev1.Volume
+	var volumeMounts []corev1.VolumeMount
+
+	// Hadoop config.
+	var hcVolume, hcMount, hcEnv = convertHadoopConfig(clusterSpec.HadoopConfig)
+	if hcVolume != nil {
+		volumes = append(volumes, *hcVolume)
+	}
+	if hcMount != nil {
+		volumeMounts = append(volumeMounts, *hcMount)
+	}
+	if hcEnv != nil {
+		envVars = append(envVars, *hcEnv)
+	}
+
+	// GCP service account config.
+	var saVolume, saMount, saEnv = convertGCPConfig(clusterSpec.GCPConfig)
+	if saVolume != nil {
+		volumes = append(volumes, *saVolume)
+	}
+	if saMount != nil {
+		volumeMounts = append(volumeMounts, *saMount)
+	}
+	if saEnv != nil {
+		envVars = append(envVars, *saEnv)
+	}
+
+	envVars = append(envVars, flinkCluster.Spec.EnvVars...)
+
+	var podSpec = corev1.PodSpec{
+		Containers: []corev1.Container{
+			corev1.Container{
+				Name:            "main",
+				Image:           imageSpec.Name,
+				ImagePullPolicy: imageSpec.PullPolicy,
+				Args:            jobArgs,
+				Env:             envVars,
+				VolumeMounts:    volumeMounts,
+			},
+		},
+		RestartPolicy:    corev1.RestartPolicyNever,
+		Volumes:          volumes,
+		ImagePullSecrets: imageSpec.PullSecrets,
+	}
+
+	// Disable the retry mechanism of k8s Job (BackoffLimit 0), so that
+	// Kubernetes does not re-run a failed launcher pod on its own.
+	var backoffLimit int32 = 0
+	var job = &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: clusterNamespace,
+			Name:      jobName,
+			OwnerReferences: []metav1.OwnerReference{
+				toOwnerReference(flinkCluster)},
+			Labels: labels,
+		},
+		Spec: batchv1.JobSpec{
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{Labels: labels},
+				Spec:       podSpec,
+			},
+			BackoffLimit: &backoffLimit,
+		},
+	}
+	return job
+}
+
 func convertFromSavepoint(
 	jobSpec *v1beta1.JobSpec, jobStatus *v1beta1.JobStatus) *string {
 	if shouldRestartJob(jobSpec.RestartPolicy, jobStatus) {
diff --git a/controllers/flinkcluster_observer.go b/controllers/flinkcluster_observer.go
index aa1a0789..71976af0 100644
--- a/controllers/flinkcluster_observer.go
+++ b/controllers/flinkcluster_observer.go
@@ -43,16 +43,17 @@ type ClusterStateObserver struct {
 
 // ObservedClusterState holds observed state of a cluster.
type ObservedClusterState struct {
-	cluster            *v1beta1.FlinkCluster
-	configMap          *corev1.ConfigMap
-	jmDeployment       *appsv1.Deployment
-	jmService          *corev1.Service
-	jmIngress          *extensionsv1beta1.Ingress
-	tmDeployment       *appsv1.Deployment
-	job                *batchv1.Job
-	flinkJobList       *flinkclient.JobStatusList
-	flinkRunningJobIDs []string
-	flinkJobID         *string
+	cluster                 *v1beta1.FlinkCluster
+	configMap               *corev1.ConfigMap
+	jmDeployment            *appsv1.Deployment
+	jmService               *corev1.Service
+	jmIngress               *extensionsv1beta1.Ingress
+	tmDeployment            *appsv1.Deployment
+	job                     *batchv1.Job
+	flinkJobList            *flinkclient.JobStatusList
+	flinkRunningJobIDs      []string
+	flinkJobID              *string
+	nativeClusterSessionJob *batchv1.Job
 }
 
 // Observes the state of the cluster and its components.
@@ -77,6 +78,12 @@ func (observer *ClusterStateObserver) observe(
 		observed.cluster = observedCluster
 	}
 
+	// Native session cluster job.
+	err = observer.observeNativeSessionClusterJob(observed)
+	if err != nil {
+		return err
+	}
+
 	// ConfigMap.
 	var observedConfigMap = new(corev1.ConfigMap)
 	err = observer.observeConfigMap(observedConfigMap)
@@ -190,6 +197,32 @@ func (observer *ClusterStateObserver) observeJob(
 	return nil
 }
 
+// Observes the Kubernetes Job of the native session cluster and records it in
+// `observed.nativeClusterSessionJob`. A Job which is not found is not an
+// error and leaves the field nil; any other lookup error is returned.
+func (observer *ClusterStateObserver) observeNativeSessionClusterJob(
+	observed *ObservedClusterState) error {
+	var log = observer.log
+
+	// The Job is looked up unconditionally, even when the cluster has been
+	// deleted or is not a native session cluster: in exactly those cases the
+	// reconciler acts on an observed Job (service cleanup and Job deletion).
+	var observedJob = new(batchv1.Job)
+	var err = observer.observeNativeSessionClusterJobResource(observedJob)
+	if err != nil {
+		if client.IgnoreNotFound(err) != nil {
+			log.Error(err, "Failed to get NativeSessionCluster job")
+			return err
+		}
+		log.Info("Observed NativeSessionCluster job", "state", "nil")
+		return nil
+	}
+
+	log.Info("Observed NativeSessionCluster job", "state", *observedJob)
+	observed.nativeClusterSessionJob = observedJob
+	return nil
+}
+
 // Observes Flink jobs through Flink API (instead of Kubernetes jobs through
 // Kubernetes API).
 //
@@ -365,3 +398,19 @@ func (observer *ClusterStateObserver) observeJobResource(
 		},
 		observedJob)
 }
+
+// Gets the native session cluster Job, named after the requested cluster and
+// located in the requested namespace, into `observedJob`.
+func (observer *ClusterStateObserver) observeNativeSessionClusterJobResource(
+	observedJob *batchv1.Job) error {
+	var clusterNamespace = observer.request.Namespace
+	var clusterName = observer.request.Name
+
+	return observer.k8sClient.Get(
+		observer.context,
+		types.NamespacedName{
+			Namespace: clusterNamespace,
+			Name:      getNativeSessionClusterJobName(clusterName),
+		},
+		observedJob)
+}
diff --git a/controllers/flinkcluster_reconciler.go b/controllers/flinkcluster_reconciler.go
index d064495c..d85258a9 100644
--- a/controllers/flinkcluster_reconciler.go
+++ b/controllers/flinkcluster_reconciler.go
@@ -28,6 +28,7 @@ import (
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -53,6 +54,12 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
 	// Child resources of the cluster CR will be automatically reclaimed by K8S.
 	if reconciler.observed.cluster == nil {
 		reconciler.log.Info("The cluster has been deleted, no action to take")
+		//Flink uses Kubernetes ownerReference’s to cleanup all cluster components.
+		//All the Flink created resources, including ConfigMap, Service, Deployment,
+		//Pod, have their ownerReference set to service/<ClusterId>. When the service
+		//is deleted, all other resources will be deleted automatically.
+		//More info: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html
+		reconciler.reconcileNativeClusterService()
 		return ctrl.Result{}, nil
 	}
 
@@ -81,6 +88,11 @@ func (reconciler *ClusterReconciler) reconcile() (ctrl.Result, error) {
 		return ctrl.Result{}, err
 	}
 
+	err = reconciler.reconcileNativeSessionClusterJob()
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
 	result, err := reconciler.reconcileJob()
 
 	return result, nil
@@ -100,6 +112,69 @@ func (reconciler *ClusterReconciler) reconcileTaskManagerDeployment() error {
 		reconciler.observed.tmDeployment)
 }
 
+// reconcileNativeSessionClusterJob creates the native session cluster Job when
+// it is desired but not observed, and deletes it (after removing the
+// Flink-created service) when it is observed but no longer desired. A Job
+// which is both desired and observed is left untouched.
+func (reconciler *ClusterReconciler) reconcileNativeSessionClusterJob() error {
+	var desiredNativeSessionClusterJob = reconciler.desired.NativeClusterSessionJob
+	var observedNativeSessionClusterJob = reconciler.observed.nativeClusterSessionJob
+
+	if desiredNativeSessionClusterJob != nil && observedNativeSessionClusterJob == nil {
+		return reconciler.createJob(desiredNativeSessionClusterJob)
+	}
+
+	if desiredNativeSessionClusterJob != nil && observedNativeSessionClusterJob != nil {
+		// TODO: compare and update if needed.
+		reconciler.log.Info("NativeSessionClusterJob already exists, no action")
+		return nil
+	}
+
+	if desiredNativeSessionClusterJob == nil && observedNativeSessionClusterJob != nil {
+		reconciler.reconcileNativeClusterService()
+		return reconciler.deleteJob(observedNativeSessionClusterJob)
+	}
+
+	return nil
+}
+
+// reconcileNativeClusterService deletes, on a best-effort basis, the service
+// which was created by Flink for the native session cluster. Failures are
+// logged and not returned. It does nothing if no native session Job was observed.
+func (reconciler *ClusterReconciler) reconcileNativeClusterService() {
+	var nativeFlinkSessionService = new(corev1.Service)
+	var k8sClient = reconciler.k8sClient
+	var ctx = reconciler.context
+	var log = reconciler.log.WithValues("component", "nativeSessionJob")
+
+	if reconciler.observed.nativeClusterSessionJob == nil {
+		log.Info("The job for the native session cluster has been deleted.")
+		return
+	}
+	var jobObjectMeta = reconciler.observed.nativeClusterSessionJob.ObjectMeta
+	log.Info("Deleting nativeFlinkSessionService")
+	// The service is looked up under the cluster name, which is recovered by
+	// stripping the Job-name suffix.
+	err := k8sClient.Get(
+		ctx,
+		types.NamespacedName{
+			Namespace: jobObjectMeta.Namespace,
+			Name:      getNativeFlinkClusterName(jobObjectMeta.Name),
+		},
+		nativeFlinkSessionService)
+	if err != nil {
+		log.Error(err, "Failed to get nativeFlinkSessionService.")
+		return
+	}
+	log.Info("Get nativeFlinkSessionService", "state", *nativeFlinkSessionService)
+	err = reconciler.deleteService(nativeFlinkSessionService, "nativeSessionJob")
+	if err != nil {
+		log.Error(err, "Failed to delete nativeFlinkSessionService")
+		return
+	}
+	log.Info("Delete nativeFlinkSessionService successfully.")
+}
+
 func (reconciler *ClusterReconciler) reconcileDeployment(
 	component string,
 	desiredDeployment *appsv1.Deployment,
diff --git a/controllers/flinkcluster_util.go b/controllers/flinkcluster_util.go
index 678c3e05..ab3d38ba 100644
--- a/controllers/flinkcluster_util.go
+++ b/controllers/flinkcluster_util.go
@@ -19,6 +19,7 @@ package controllers
 import (
 	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	v1beta1 "github.com/googlecloudplatform/flink-operator/api/v1beta1"
@@ -63,6 +64,16 @@ func getJobName(clusterName string) string {
 	return clusterName + "-job"
 }
 
+// Gets the name of the native session cluster Job for the given cluster name.
+func getNativeSessionClusterJobName(clusterName string) string {
+	return clusterName + "-native-session-cluster-job"
+}
+
+// Gets the cluster name back from a native session cluster Job name.
+func getNativeFlinkClusterName(jobName string) string {
+	return strings.TrimSuffix(jobName, "-native-session-cluster-job")
+}
+
 // TimeConverter converts between time.Time and string.
type TimeConverter struct{} diff --git a/main.go b/main.go index 7310c5b4..cf81118b 100644 --- a/main.go +++ b/main.go @@ -75,8 +75,8 @@ func main() { } err = (&controllers.FlinkClusterReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("FlinkCluster"), + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("FlinkCluster"), }).SetupWithManager(mgr) if err != nil { setupLog.Error(err, "Unable to create controller", "controller", "FlinkCluster")