Skip to content
This repository was archived by the owner on May 8, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,38 @@ FLINK_OPERATOR_NAMESPACE ?= flink-operator-system
RESOURCE_PREFIX ?= flink-operator-
# The Kubernetes namespace to limit watching.
WATCH_NAMESPACE ?=
# Kind cluster name
KIND_CLUSTER ?= kind-flink-operator
KIND_IMAGE ?= kindest/node:v1.17.17

GREEN=\033[1;32m
RED=\033[1;31m
RESET=\033[0m

#################### Local build and test ####################

# Setup kind cluster
kind-ensure:
kubectx kind-${KIND_CLUSTER}

kind-setup:
kind create cluster --name ${KIND_CLUSTER} --image ${KIND_IMAGE}

kind-up-image: operator-image
kind load docker-image ${IMG} --name ${KIND_CLUSTER}

kind-teardown:
kind delete cluster --name ${KIND_CLUSTER}

kind-deploy: kind-ensure
deploy

kind-example: kind-ensure
kubectl apply -f config/samples/flinkoperator_v1beta1_flinkjobcluster.yaml

kind-undeploy: kind-ensure
undeploy

# Build the flink-operator binary
build: generate fmt vet
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o bin/flink-operator main.go
Expand Down Expand Up @@ -153,3 +178,5 @@ undeploy: undeploy-controller undeploy-crd
# Deploy the sample Flink clusters in the Kubernetes cluster
samples:
kubectl apply -f config/samples/

#################### Local build and test ####################
53 changes: 52 additions & 1 deletion api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,18 @@ type JobSpec struct {
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
}

// HPASpec defines properties of a HPA for the cluster.
type HPASpec struct {
// Taskmanager lower limit for the number of pods.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to define the default value for optional fields.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D you mean a default value for the HPASpec or for the struct fields?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the fields, e.g., if the user doesn't specify the value, what is the default value / behavior?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌 got it

MinReplicas *int32 `json:"minReplicas,omitempty"`

// Taskmanager upper limit for the number of pods.
MaxReplicas int32 `json:"maxReplicas"`

// Taskmanager target average CPU utilization
TargetCPUUtilizationPercentage *int32 `json:"targetCPUUtilizationPercentage,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "Percentage" seems to be unnecessary, can we remove it here and below?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mostly agree, but it does disambiguate whether is a percentage, proportion or even absolute value. It also matches the autoscaling k8s api.

}

// FlinkClusterSpec defines the desired state of FlinkCluster
type FlinkClusterSpec struct {
// Flink image spec for the cluster's components.
Expand Down Expand Up @@ -452,6 +464,9 @@ type FlinkClusterSpec struct {
// Config for GCP.
GCPConfig *GCPConfig `json:"gcpConfig,omitempty"`

// Config for HPA
HPA *HPASpec `json:"hpa,omitempty"`

// The logging configuration, which should have keys 'log4j-console.properties' and 'logback-console.xml'.
// These will end up in the 'flink-config-volume' ConfigMap, which gets mounted at /opt/flink/conf.
// If not provided, defaults that log to console only will be used.
Expand Down Expand Up @@ -519,7 +534,10 @@ type FlinkClusterComponentsStatus struct {
JobManagerIngress *JobManagerIngressStatus `json:"jobManagerIngress,omitempty"`

// The state of TaskManager StatefulSet.
TaskManagerStatefulSet FlinkClusterComponentState `json:"taskManagerStatefulSet"`
TaskManagerStatefulSet TaskManagerStatefulSetStatus `json:"taskManagerStatefulSet"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this!


// The state of the HPA
HPA *HPAStatus `json:"hpa,omitempty"`

// The status of the job, available only when JobSpec is provided.
Job *JobStatus `json:"job,omitempty"`
Expand Down Expand Up @@ -577,6 +595,23 @@ type JobStatus struct {
RestartCount int32 `json:"restartCount,omitempty"`
}

type HPAStatus struct {
// The name of the Kubernetes HPS resource.
Name string `json:"name,omitempty"`

// The state of the Kubernetes hpa.
State string `json:"state"`

// The current number of replicas in the cluster seen by the hpa.
CurrentReplicas int32 `json:"currentReplicas"`

// The desired number of replicas in the cluster seen by the hpa.
DesiredReplicas int32 `json:"desiredReplicas"`

// The current average CPU utilization over all pods in the taskmanager statefulset
CurrentCPUUtilizationPercentage *int32 `json:"currentCPUUtilizationPercentage,omitempty"`
}

// SavepointStatus defines the status of savepoint progress
type SavepointStatus struct {
// The ID of the Flink job.
Expand Down Expand Up @@ -625,6 +660,21 @@ type JobManagerServiceStatus struct {
NodePort int32 `json:"nodePort,omitempty"`
}

// TaskManagerStatefulSetStatus
type TaskManagerStatefulSetStatus struct {
// The name of the Kubernetes jobManager service.
Name string `json:"name"`

// The state of the component.
State string `json:"state"`

// The number of replicas in the tasks manager.
Replicas int32 `json:"replicas"`

// The label for the tasks manager pods.
Selector string `json:"selector"`
}

// FlinkClusterStatus defines the observed state of FlinkCluster
type FlinkClusterStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
Expand Down Expand Up @@ -667,6 +717,7 @@ type FlinkClusterStatus struct {

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.taskManager.replicas,statuspath=.status.components.taskManagerStatefulSet.replicas,selectorpath=.status.components.taskManagerStatefulSet.selector

// FlinkCluster is the Schema for the flinkclusters API
type FlinkCluster struct {
Expand Down
60 changes: 60 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading