This repository was archived by the owner on May 8, 2025. It is now read-only.
Merged
Changes from 6 commits
6 changes: 4 additions & 2 deletions api/v1beta1/flinkcluster_types.go
@@ -41,15 +41,17 @@ const (
ComponentStateDeleted = "Deleted"
)

// JobState defines states for a Flink job.
// JobState defines states for a Flink job deployment.
const (
JobStatePending = "Pending"
JobStateRunning = "Running"
JobStateUpdating = "Updating"
JobStateSucceeded = "Succeeded"
JobStateFailed = "Failed"
JobStateCancelled = "Cancelled"
JobStateSuspended = "Suspended"
JobStateUnknown = "Unknown"
JobStateLost = "Lost"
)

// AccessScope defines the access scope of JobManager service.
@@ -540,7 +542,7 @@ type JobStatus struct {
Name string `json:"name"`

// The ID of the Flink job.
ID string `json:"id"`
ID string `json:"id,omitempty"`

// The state of the Kubernetes job.
State string `json:"state"`
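
Note: the ID field now carries the omitempty tag, so an empty Flink job ID is left out of the serialized status, which lines up with dropping id from the CRD's required list further down. A minimal sketch of that behavior with encoding/json, using a trimmed-down stand-in for JobStatus and made-up job names and IDs:

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down stand-in for JobStatus; the real struct has more fields.
type jobStatus struct {
	Name  string `json:"name"`
	ID    string `json:"id,omitempty"` // dropped from output while the Flink job has no ID yet
	State string `json:"state"`
}

func main() {
	pending, _ := json.Marshal(jobStatus{Name: "wordcount", State: "Pending"})
	running, _ := json.Marshal(jobStatus{Name: "wordcount", ID: "a1b2c3", State: "Running"})
	fmt.Println(string(pending)) // {"name":"wordcount","state":"Pending"}
	fmt.Println(string(running)) // {"name":"wordcount","id":"a1b2c3","state":"Running"}
}
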
41 changes: 41 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

5 changes: 2 additions & 3 deletions config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
@@ -30,8 +30,6 @@ spec:
properties:
batchSchedulerName:
type: string
serviceAccountName:
type: string
envFrom:
items:
properties:
@@ -3284,6 +3282,8 @@ spec:
revisionHistoryLimit:
format: int32
type: integer
serviceAccountName:
type: string
taskManager:
properties:
extraPorts:
@@ -5163,7 +5163,6 @@ spec:
state:
type: string
required:
- id
- name
- state
type: object
5 changes: 3 additions & 2 deletions controllers/flinkcluster_controller.go
@@ -171,7 +171,7 @@ func (handler *FlinkClusterHandler) reconcile(

log.Info("---------- 3. Compute the desired state ----------")

*desired = getDesiredClusterState(observed.cluster, time.Now())
*desired = getDesiredClusterState(observed, time.Now())
if desired.ConfigMap != nil {
log.Info("Desired state", "ConfigMap", *desired.ConfigMap)
} else {
@@ -211,7 +211,8 @@ func (handler *FlinkClusterHandler) reconcile(
context: context,
log: log,
observed: handler.observed,
desired: handler.desired, recorder: handler.recorder,
desired: handler.desired,
recorder: handler.recorder,
}
result, err := reconciler.reconcile()
if err != nil {
79 changes: 49 additions & 30 deletions controllers/flinkcluster_converter.go
@@ -59,8 +59,10 @@ var flinkSysProps = map[string]struct{}{

// Gets the desired state of a cluster.
func getDesiredClusterState(
cluster *v1beta1.FlinkCluster,
observed *ObservedClusterState,
now time.Time) model.DesiredClusterState {
var cluster = observed.cluster

// The cluster has been deleted, all resources should be cleaned up.
if cluster == nil {
return model.DesiredClusterState{}
@@ -71,7 +73,7 @@ func getDesiredClusterState(
JmService: getDesiredJobManagerService(cluster),
JmIngress: getDesiredJobManagerIngress(cluster),
TmDeployment: getDesiredTaskManagerDeployment(cluster),
Job: getDesiredJob(cluster),
Job: getDesiredJob(observed),
}
}

@@ -584,18 +586,29 @@ func getDesiredConfigMap(
}

// Gets the desired job spec from a cluster spec.
func getDesiredJob(
flinkCluster *v1beta1.FlinkCluster) *batchv1.Job {
func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
var flinkCluster = observed.cluster
var jobSpec = flinkCluster.Spec.Job

if jobSpec == nil {
return nil
}

// Terminated job remains in that state, if no update is triggered.
if !isUpdateTriggered(flinkCluster.Status) {
// We need to watch whether job is cancelled already if jobSpec.CancelRequested is deprecated
var jobStatus = flinkCluster.Status.Components.Job
if isJobCancelRequested(*flinkCluster) || (jobStatus != nil && jobStatus.State == v1beta1.JobStateCancelled) {
// Job cancelled case
var recordedJobStatus = flinkCluster.Status.Components.Job
var shouldBeTerminated = isJobCancelRequested(*flinkCluster) ||
(recordedJobStatus != nil && recordedJobStatus.State == v1beta1.JobStateCancelled)
if shouldBeTerminated {
return nil
}

// Job failed and no restart case
var restartPolicy = jobSpec.RestartPolicy
var noRestartFromFailure = recordedJobStatus != nil && recordedJobStatus.State == v1beta1.JobStateFailed &&
(restartPolicy == nil || *restartPolicy == v1beta1.JobRestartPolicyNever)
if noRestartFromFailure {
return nil
}
}
@@ -619,8 +632,8 @@ func getDesiredJob(
jobArgs = append(jobArgs, "--class", *jobSpec.ClassName)
}

var fromSavepoint = convertFromSavepoint(jobSpec, &flinkCluster.Status)
if fromSavepoint != nil && *fromSavepoint != "" {
var fromSavepoint = convertFromSavepoint(jobSpec, flinkCluster.Status.Components.Job)
if fromSavepoint != nil {
jobArgs = append(jobArgs, "--fromSavepoint", *fromSavepoint)
}

@@ -640,7 +653,7 @@

var securityContext = jobSpec.SecurityContext

var envVars = []corev1.EnvVar{}
var envVars []corev1.EnvVar

// If the JAR file is remote, put the URI in the env variable
// FLINK_JOB_JAR_URI and rewrite the JAR path to a local path. The entrypoint
@@ -654,6 +667,12 @@ func getDesiredJob(
Value: jobSpec.JarFile,
})
}
envVars = append(envVars,
corev1.EnvVar{
Name: "FLINK_JM_ADDR",
Value: jobManagerAddress,
})

jobArgs = append(jobArgs, jarPath)
jobArgs = append(jobArgs, jobSpec.Args...)

@@ -716,7 +735,7 @@ func getDesiredJob(
ServiceAccountName: getServiceAccountName(serviceAccount),
}

// Disable the retry mechanism of k8s Job, all retires should be initiated
// Disable the retry mechanism of k8s Job, all retries should be initiated
// by the operator based on the job restart policy. This is because Flink
// jobs are stateful, if a job fails after running for 10 hours, we probably
// don't want to start over from the beginning, instead we want to resume
@@ -752,28 +771,28 @@ func getDesiredJob(
// When FlinkCluster is created or updated, if spec.job.fromSavepoint is specified, Flink job will be restored from it.
//
// case 2) Restore Flink job from the latest savepoint.
// When FlinkCluster is updated not specifying spec.job.fromSavepoint, or job is restarted from the failed state,
// Flink job will be restored from the latest savepoint created by the operator or the savepoint from which current job was restored.
func convertFromSavepoint(jobSpec *v1beta1.JobSpec, clusterStatus *v1beta1.FlinkClusterStatus) *string {
var jobStatus = clusterStatus.Components.Job
// When FlinkCluster is updated with no spec.job.fromSavepoint, or job is restarted from the failed state,
// Flink job will be restored from the latest savepoint created by the operator.
//
// case 3) When latest created savepoint is unavailable, use the savepoint from which current job was restored.
func convertFromSavepoint(jobSpec *v1beta1.JobSpec, jobStatus *v1beta1.JobStatus) *string {
switch {
case shouldRestartJob(jobSpec.RestartPolicy, jobStatus):
return &jobStatus.SavepointLocation
case isUpdateTriggered(*clusterStatus) && (jobSpec.FromSavepoint == nil || *jobSpec.FromSavepoint == ""):
if jobStatus == nil {
return nil
}
// Latest savepoint created by Flink operator
if jobStatus.SavepointLocation != "" {
return &jobStatus.SavepointLocation
// Creating for the first time
case jobStatus == nil:
if jobSpec.FromSavepoint != nil && *jobSpec.FromSavepoint != "" {
return jobSpec.FromSavepoint
}
// The savepoint from which current running job was restored
if jobStatus.FromSavepoint != "" {
return &jobStatus.FromSavepoint
}
return nil
// Updating with FromSavepoint provided
case jobStatus.State == v1beta1.JobStateUpdating && jobSpec.FromSavepoint != nil && *jobSpec.FromSavepoint != "":
return jobSpec.FromSavepoint
// Latest savepoint
case jobStatus.SavepointLocation != "":
return &jobStatus.SavepointLocation
// The savepoint from which current job was restored
case jobStatus.FromSavepoint != "":
return &jobStatus.FromSavepoint
}
return jobSpec.FromSavepoint
return nil
}

// Copy any non-duplicate volume mounts to the specified initContainers
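
Note: the reworked convertFromSavepoint resolves the restore location by precedence: a brand-new job uses spec.job.fromSavepoint, an update with an explicit fromSavepoint honors it, otherwise the latest operator-created savepoint wins, falling back to the savepoint the current job was restored from. A standalone sketch of that ordering with simplified stand-in types and made-up savepoint paths; it omits the restart-after-failure branch (shouldRestartJob) for brevity:

package main

import "fmt"

// Simplified stand-ins for the operator's JobSpec and JobStatus; illustration only.
type jobSpec struct{ FromSavepoint string }

type jobStatus struct {
	State             string
	SavepointLocation string // latest savepoint created by the operator
	FromSavepoint     string // savepoint the current job was restored from
}

// pickSavepoint mirrors the precedence used by convertFromSavepoint.
func pickSavepoint(spec jobSpec, status *jobStatus) string {
	switch {
	case status == nil: // creating for the first time
		return spec.FromSavepoint
	case status.State == "Updating" && spec.FromSavepoint != "": // updating with an explicit savepoint
		return spec.FromSavepoint
	case status.SavepointLocation != "": // latest savepoint taken by the operator
		return status.SavepointLocation
	case status.FromSavepoint != "": // fall back to the original restore point
		return status.FromSavepoint
	}
	return ""
}

func main() {
	fmt.Println(pickSavepoint(jobSpec{FromSavepoint: "gs://my-bucket/sp-0001"}, nil))                                   // gs://my-bucket/sp-0001
	fmt.Println(pickSavepoint(jobSpec{}, &jobStatus{State: "Updating", SavepointLocation: "gs://my-bucket/sp-0042"})) // gs://my-bucket/sp-0042
}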