Skip to content

Commit 90d2bb9

Browse files
committed
Improve savepoint and update (#4)
1 parent 8bd5636 commit 90d2bb9

22 files changed

+1922
-1524
lines changed

api/v1beta1/flinkcluster_default_test.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,17 @@ func TestSetDefault(t *testing.T) {
4545
var defaultJmBlobPort = int32(6124)
4646
var defaultJmQueryPort = int32(6125)
4747
var defaultJmUIPort = int32(8081)
48+
var defaultJmIngressTLSUse = false
4849
var defaultTmDataPort = int32(6121)
4950
var defaultTmRPCPort = int32(6122)
5051
var defaultTmQueryPort = int32(6125)
5152
var defaultJobAllowNonRestoredState = false
5253
var defaultJobParallelism = int32(1)
5354
var defaultJobNoLoggingToStdout = false
5455
var defaultJobRestartPolicy = JobRestartPolicyNever
55-
var defatulJobManagerIngressTLSUse = false
5656
var defaultMemoryOffHeapRatio = int32(25)
5757
var defaultMemoryOffHeapMin = resource.MustParse("600M")
58-
defaultRecreateOnUpdate := new(bool)
59-
*defaultRecreateOnUpdate = true
58+
var defaultRecreateOnUpdate = true
6059
var expectedCluster = FlinkCluster{
6160
TypeMeta: metav1.TypeMeta{},
6261
ObjectMeta: metav1.ObjectMeta{},
@@ -70,7 +69,7 @@ func TestSetDefault(t *testing.T) {
7069
Replicas: &defaultJmReplicas,
7170
AccessScope: "Cluster",
7271
Ingress: &JobManagerIngressSpec{
73-
UseTLS: &defatulJobManagerIngressTLSUse,
72+
UseTLS: &defaultJmIngressTLSUse,
7473
},
7574
Ports: JobManagerPorts{
7675
RPC: &defaultJmRPCPort,
@@ -115,7 +114,7 @@ func TestSetDefault(t *testing.T) {
115114
MountPath: "/etc/hadoop/conf",
116115
},
117116
EnvVars: nil,
118-
RecreateOnUpdate: defaultRecreateOnUpdate,
117+
RecreateOnUpdate: &defaultRecreateOnUpdate,
119118
},
120119
Status: FlinkClusterStatus{},
121120
}
@@ -134,23 +133,22 @@ func TestSetNonDefault(t *testing.T) {
134133
var jmBlobPort = int32(8124)
135134
var jmQueryPort = int32(8125)
136135
var jmUIPort = int32(9081)
136+
var jmIngressTLSUse = true
137137
var tmDataPort = int32(8121)
138138
var tmRPCPort = int32(8122)
139139
var tmQueryPort = int32(8125)
140140
var jobAllowNonRestoredState = true
141141
var jobParallelism = int32(2)
142142
var jobNoLoggingToStdout = true
143143
var jobRestartPolicy = JobRestartPolicyFromSavepointOnFailure
144-
var jobManagerIngressTLSUse = true
145144
var memoryOffHeapRatio = int32(50)
146145
var memoryOffHeapMin = resource.MustParse("600M")
146+
var recreateOnUpdate = false
147147
var securityContextUserGroup = int64(9999)
148148
var securityContext = corev1.PodSecurityContext{
149149
RunAsUser: &securityContextUserGroup,
150150
RunAsGroup: &securityContextUserGroup,
151151
}
152-
defaultRecreateOnUpdate := new(bool)
153-
*defaultRecreateOnUpdate = true
154152
var cluster = FlinkCluster{
155153
TypeMeta: metav1.TypeMeta{},
156154
ObjectMeta: metav1.ObjectMeta{},
@@ -164,7 +162,7 @@ func TestSetNonDefault(t *testing.T) {
164162
Replicas: &jmReplicas,
165163
AccessScope: "Cluster",
166164
Ingress: &JobManagerIngressSpec{
167-
UseTLS: &jobManagerIngressTLSUse,
165+
UseTLS: &jmIngressTLSUse,
168166
},
169167
Ports: JobManagerPorts{
170168
RPC: &jmRPCPort,
@@ -208,7 +206,8 @@ func TestSetNonDefault(t *testing.T) {
208206
HadoopConfig: &HadoopConfig{
209207
MountPath: "/opt/flink/hadoop/conf",
210208
},
211-
EnvVars: nil,
209+
EnvVars: nil,
210+
RecreateOnUpdate: &recreateOnUpdate,
212211
},
213212
Status: FlinkClusterStatus{},
214213
}
@@ -228,7 +227,7 @@ func TestSetNonDefault(t *testing.T) {
228227
Replicas: &jmReplicas,
229228
AccessScope: "Cluster",
230229
Ingress: &JobManagerIngressSpec{
231-
UseTLS: &jobManagerIngressTLSUse,
230+
UseTLS: &jmIngressTLSUse,
232231
},
233232
Ports: JobManagerPorts{
234233
RPC: &jmRPCPort,
@@ -273,7 +272,7 @@ func TestSetNonDefault(t *testing.T) {
273272
MountPath: "/opt/flink/hadoop/conf",
274273
},
275274
EnvVars: nil,
276-
RecreateOnUpdate: defaultRecreateOnUpdate,
275+
RecreateOnUpdate: &recreateOnUpdate,
277276
},
278277
Status: FlinkClusterStatus{},
279278
}

api/v1beta1/flinkcluster_types.go

Lines changed: 71 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,17 @@ const (
4343

4444
// JobState defines states for a Flink job deployment.
4545
const (
46-
JobStatePending = "Pending"
47-
JobStateRunning = "Running"
48-
JobStateUpdating = "Updating"
49-
JobStateSucceeded = "Succeeded"
50-
JobStateFailed = "Failed"
51-
JobStateCancelled = "Cancelled"
52-
JobStateSuspended = "Suspended"
53-
JobStateUnknown = "Unknown"
54-
JobStateLost = "Lost"
46+
JobStatePending = "Pending"
47+
JobStateUpdating = "Updating"
48+
JobStateRestarting = "Restarting"
49+
JobStateDeploying = "Deploying"
50+
JobStateDeployFailed = "DeployFailed"
51+
JobStateRunning = "Running"
52+
JobStateSucceeded = "Succeeded"
53+
JobStateCancelled = "Cancelled"
54+
JobStateFailed = "Failed"
55+
JobStateLost = "Lost"
56+
JobStateUnknown = "Unknown"
5557
)
5658

5759
// AccessScope defines the access scope of JobManager service.
@@ -85,24 +87,23 @@ const (
8587
ControlNameJobCancel = "job-cancel"
8688

8789
// control state
88-
ControlStateProgressing = "Progressing"
89-
ControlStateSucceeded = "Succeeded"
90-
ControlStateFailed = "Failed"
90+
ControlStateRequested = "Requested"
91+
ControlStateInProgress = "InProgress"
92+
ControlStateSucceeded = "Succeeded"
93+
ControlStateFailed = "Failed"
9194
)
9295

9396
// Savepoint status
9497
const (
95-
SavepointStateNotTriggered = "NotTriggered"
9698
SavepointStateInProgress = "InProgress"
9799
SavepointStateTriggerFailed = "TriggerFailed"
98100
SavepointStateFailed = "Failed"
99101
SavepointStateSucceeded = "Succeeded"
100102

101-
SavepointTriggerReasonUserRequested = "user requested"
102-
SavepointTriggerReasonScheduled = "scheduled"
103-
SavepointTriggerReasonScheduledInitial = "scheduled initial" // The first triggered savepoint has slightly different flow
104-
SavepointTriggerReasonJobCancel = "job cancel"
105-
SavepointTriggerReasonUpdate = "update"
103+
SavepointTriggerReasonUserRequested = "user requested"
104+
SavepointTriggerReasonJobCancel = "job cancel"
105+
SavepointTriggerReasonScheduled = "scheduled"
106+
SavepointTriggerReasonUpdate = "update"
106107
)
107108

108109
// ImageSpec defines Flink image of JobManager and TaskManager containers.
@@ -348,12 +349,20 @@ type JobSpec struct {
348349
// Allow non-restored state, default: false.
349350
AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"`
350351

351-
// Should take savepoint before upgrading the job, default: false.
352-
TakeSavepointOnUpgrade *bool `json:"takeSavepointOnUpgrade,omitempty"`
353-
354352
// Savepoints dir where to store savepoints of the job.
355353
SavepointsDir *string `json:"savepointsDir,omitempty"`
356354

355+
// Should take savepoint before updating job, default: true.
356+
// If this is set as false, maxStateAgeToRestoreSeconds must be provided to limit the savepoint age to restore.
357+
TakeSavepointOnUpdate *bool `json:"takeSavepointOnUpdate,omitempty"`
358+
359+
// Maximum age of the savepoint that allowed to restore state..
360+
// This is applied to auto restart on failure, update from stopped state and update without taking savepoint.
361+
// If nil, job can be restarted only when the latest savepoint is the final job state (created by "stop with savepoint")
362+
// - that is, only when job can be resumed from the suspended state.
363+
// +kubebuilder:validation:Minimum=0
364+
MaxStateAgeToRestoreSeconds *int32 `json:"maxStateAgeToRestoreSeconds,omitempty"`
365+
357366
// Automatically take a savepoint to the `savepointsDir` every n seconds.
358367
AutoSavepointSeconds *int32 `json:"autoSavepointSeconds,omitempty"`
359368

@@ -555,7 +564,7 @@ type JobStatus struct {
555564
// The ID of the Flink job.
556565
ID string `json:"id,omitempty"`
557566

558-
// The state of the Kubernetes job.
567+
// The state of the Flink job deployment.
559568
State string `json:"state"`
560569

561570
// The actual savepoint from which this job started.
@@ -571,21 +580,26 @@ type JobStatus struct {
571580
// Savepoint location.
572581
SavepointLocation string `json:"savepointLocation,omitempty"`
573582

574-
// Last savepoint trigger ID.
575-
LastSavepointTriggerID string `json:"lastSavepointTriggerID,omitempty"`
583+
// Last successful savepoint completed timestamp.
584+
SavepointTime string `json:"savepointTime,omitempty"`
576585

577-
// Last savepoint trigger time. This is updated to make sure multiple
578-
// savepoints will not be taken simultaneously.
579-
LastSavepointTriggerTime string `json:"lastSavepointTriggerTime,omitempty"`
586+
// The savepoint recorded in savepointLocation is the final state of the job.
587+
FinalSavepoint bool `json:"finalSavepoint,omitempty"`
580588

581-
// Last successful or failed savepoint operation timestamp.
582-
LastSavepointTime string `json:"lastSavepointTime,omitempty"`
589+
// The timestamp of the Flink job deployment that creating job submitter.
590+
DeployTime string `json:"deployTime,omitempty"`
591+
592+
// The Flink job started timestamp.
593+
StartTime string `json:"startTime,omitempty"`
594+
595+
// The Flink job ended timestamp.
596+
EndTime string `json:"endTime,omitempty"`
583597

584598
// The number of restarts.
585599
RestartCount int32 `json:"restartCount,omitempty"`
586600
}
587601

588-
// SavepointStatus defines the status of savepoint progress
602+
// SavepointStatus is the status of savepoint progress.
589603
type SavepointStatus struct {
590604
// The ID of the Flink job.
591605
JobID string `json:"jobID,omitempty"`
@@ -599,8 +613,8 @@ type SavepointStatus struct {
599613
// Savepoint triggered reason.
600614
TriggerReason string `json:"triggerReason,omitempty"`
601615

602-
// Savepoint requested time.
603-
RequestTime string `json:"requestTime,omitempty"`
616+
// Savepoint status update time.
617+
UpdateTime string `json:"requestTime,omitempty"`
604618

605619
// Savepoint state.
606620
State string `json:"state"`
@@ -609,6 +623,27 @@ type SavepointStatus struct {
609623
Message string `json:"message,omitempty"`
610624
}
611625

626+
type RevisionStatus struct {
627+
// When the controller creates new ControllerRevision, it generates hash string from the FlinkCluster spec
628+
// which is to be stored in ControllerRevision and uses it to compose the ControllerRevision name.
629+
// Then the controller updates nextRevision to the ControllerRevision name.
630+
// When update process is completed, the controller updates currentRevision as nextRevision.
631+
// currentRevision and nextRevision is composed like this:
632+
// <FLINK_CLUSTER_NAME>-<FLINK_CLUSTER_SPEC_HASH>-<REVISION_NUMBER_IN_CONTROLLERREVISION>
633+
// e.g., myflinkcluster-c464ff7-5
634+
635+
// CurrentRevision indicates the version of FlinkCluster.
636+
CurrentRevision string `json:"currentRevision,omitempty"`
637+
638+
// NextRevision indicates the version of FlinkCluster updating.
639+
NextRevision string `json:"nextRevision,omitempty"`
640+
641+
// collisionCount is the count of hash collisions for the FlinkCluster. The controller
642+
// uses this field as a collision avoidance mechanism when it needs to create the name for the
643+
// newest ControllerRevision.
644+
CollisionCount *int32 `json:"collisionCount,omitempty"`
645+
}
646+
612647
// JobManagerIngressStatus defines the status of a JobManager ingress.
613648
type JobManagerIngressStatus struct {
614649
// The name of the Kubernetes ingress resource.
@@ -644,30 +679,14 @@ type FlinkClusterStatus struct {
644679
// The status of the components.
645680
Components FlinkClusterComponentsStatus `json:"components"`
646681

647-
// The status of control requested by user
682+
// The status of control requested by user.
648683
Control *FlinkClusterControlStatus `json:"control,omitempty"`
649684

650-
// The status of savepoint progress
685+
// The status of savepoint progress.
651686
Savepoint *SavepointStatus `json:"savepoint,omitempty"`
652687

653-
// When the controller creates new ControllerRevision, it generates hash string from the FlinkCluster spec
654-
// which is to be stored in ControllerRevision and uses it to compose the ControllerRevision name.
655-
// Then the controller updates nextRevision to the ControllerRevision name.
656-
// When update process is completed, the controller updates currentRevision as nextRevision.
657-
// currentRevision and nextRevision is composed like this:
658-
// <FLINK_CLUSTER_NAME>-<FLINK_CLUSTER_SPEC_HASH>-<REVISION_NUMBER_IN_CONTROLLERREVISION>
659-
// e.g., myflinkcluster-c464ff7-5
660-
661-
// CurrentRevision indicates the version of FlinkCluster.
662-
CurrentRevision string `json:"currentRevision,omitempty"`
663-
664-
// NextRevision indicates the version of FlinkCluster updating.
665-
NextRevision string `json:"nextRevision,omitempty"`
666-
667-
// collisionCount is the count of hash collisions for the FlinkCluster. The controller
668-
// uses this field as a collision avoidance mechanism when it needs to create the name for the
669-
// newest ControllerRevision.
670-
CollisionCount *int32 `json:"collisionCount,omitempty"`
688+
// The status of revision.
689+
Revision RevisionStatus `json:"revision,omitempty"`
671690

672691
// Last update timestamp for this status.
673692
LastUpdateTime string `json:"lastUpdateTime,omitempty"`

0 commit comments

Comments
 (0)