
Conversation

@elanv
Contributor

@elanv elanv commented Dec 3, 2020

Changes

  • Job submission and tracking

    • The job submitter pod finishes after submitting the job.
    • The Flink job is identified by extracting its job ID from the termination-log of the job submitter pod (see the sketch below).
    • The Flink job is tracked by the operator itself, not by the job submitter.
    • The job cluster's bootstrap time is shortened by initializing the cluster and submitting the job in parallel.
  • Fixes

    • Job recovery: fixed a bug where job recovery does not work if more than one job history entry remains in the JobManager.
    • fromSavepoint on update: when updating with a provided fromSavepoint, the latest savepoint (status.savepointLocation) is also changed to it.

Resolves #294
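
A minimal sketch of the termination-log idea in Go (the helper name and regex are illustrative, not this PR's actual code): Kubernetes copies a container's termination message, by default the contents of /dev/termination-log, into the pod status, so the operator can read the job ID from there once the submitter finishes.

package main

import (
	"fmt"
	"regexp"

	corev1 "k8s.io/api/core/v1"
)

// jobIDPattern matches a Flink job ID: 32 lowercase hex characters.
var jobIDPattern = regexp.MustCompile(`[0-9a-f]{32}`)

// extractJobID is a hypothetical helper: it scans the terminated
// container statuses of the finished submitter pod for a Flink job ID.
func extractJobID(pod *corev1.Pod) (string, error) {
	for _, cs := range pod.Status.ContainerStatuses {
		// State.Terminated.Message holds the termination-log contents.
		if t := cs.State.Terminated; t != nil {
			if id := jobIDPattern.FindString(t.Message); id != "" {
				return id, nil
			}
		}
	}
	return "", fmt.Errorf("no Flink job ID found in pod %s", pod.Name)
}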

@elanv
Contributor Author

elanv commented Dec 4, 2020

I will add and fix some tests soon.

Contributor

@functicons functicons left a comment


Thanks for the PR! I left a few comments.

@elanv elanv requested a review from functicons December 4, 2020 16:13
Contributor

@functicons functicons left a comment


A few more comments, thanks!

@elanv elanv requested a review from functicons December 5, 2020 01:36
@functicons
Contributor

/gcbrun

@elanv
Contributor Author

elanv commented Dec 5, 2020

Restored a CRD that was accidentally changed. Removed the JobManager check start delay from the submit script: the time difference between submitter and JobManager initialization is small, and the submitter can sometimes take longer, for example when downloading a large file.

@functicons
Contributor

/gcbrun

@functicons
Contributor

I just ran a test: the sample job finished successfully, but the status in the CR was not quite right; it was still Pending:

Status:
  Components:
    Job:
      Id:     af4f6808f9c78597623a626e300c73f5
      Name:   flinkjobcluster-sample-job
      State:  Pending
  ...
  Current Revision:  flinkjobcluster-sample-5d96cb58dd-1
  Last Update Time:  2020-12-06T04:55:33Z
  Next Revision:     flinkjobcluster-sample-5d96cb58dd-1
  State:             Stopped

Contributor

@functicons functicons left a comment


I recommend we change the name suffix of the Job resource from job to job-submitter; otherwise, people might be confused when they see it in the Completed status.

We also need to update the "Flink job" section of the user guide on how to monitor the job. Previously, monitoring the job submitter made sense, but with this change it no longer does. I think we can briefly mention how the job is submitted.
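
For that section, monitoring could point at the CR status the operator now maintains, for example (the resource name is the sample from this thread; the jsonpath is illustrative):

# Watch the Flink job state the operator tracks in the CR status.
kubectl get flinkcluster flinkjobcluster-sample \
  -o jsonpath='{.status.components.job.state}'

# Or inspect the full status, including the Flink job ID.
kubectl describe flinkcluster flinkjobcluster-sample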

@elanv
Contributor Author

elanv commented Dec 6, 2020

Thanks for your review; it caught things I missed.
Fixed the issues related to finished jobs.

I will proceed with the remaining work of renaming and the docs.

@elanv elanv force-pushed the feature/job-submitting-tracking-recovery-cli branch from 0d16303 to 34e37d7 Compare December 7, 2020 01:29
@elanv elanv requested a review from functicons December 7, 2020 01:29
@functicons
Contributor

A unit test is failing; could you fix it?

E1207 05:00:42.876848    3516 factory.go:35] Failed initializing volcano batch scheduler: invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable
--- FAIL: TestGetDesiredClusterState (0.01s)
    flinkcluster_converter_test.go:889: assertion failed: 
        --- desiredState.Job
        +++ expectedDesiredJob
          v1.Job{
                TypeMeta: v1.TypeMeta{},
                ObjectMeta: v1.ObjectMeta{
        -               Name:         "flinkjobcluster-sample-job-submitter",
        +               Name:         "flinkjobcluster-sample-job",
                        GenerateName: "",
                        Namespace:    "default",
                        ... // 13 identical fields
                },

@elanv elanv requested a review from functicons December 7, 2020 05:12
@functicons
Contributor

I noticed another problem: in the current job status, it shows x-job-submitter in the Succeeded status. It's unclear whether the status is for the job submitter or the job. Maybe remove the Name field to make it less confusing?

Status:
    Components:
        Job:
            Id:     b2a57bdfdc5127ddbab05da9ec438168
            Name:   flinkjobcluster-sample-job-submitter
            State:  Succeeded

@elanv
Contributor Author

elanv commented Dec 7, 2020

I noticed another problem: in the current job status, it shows x-job-submitter in the Succeeded status. It's unclear whether the status is for the job submitter or the job. Maybe remove the Name field to make it less confusing?

That's right; I think it would be better to remove it too.
Alternatively, it would be nice to keep the field but make it optional for later use, and simply not set the value for now (see the sketch below).
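
A minimal sketch of that alternative (the type and field set are illustrative, modeled on the status dump above, not the operator's actual API):

package v1beta1 // illustrative placement in the operator's API package

// JobStatus sketches a job status with the Name field kept but optional,
// so it can stay unset for now and be populated later if needed.
type JobStatus struct {
	// ID is the Flink job ID extracted from the submitter's termination-log.
	ID string `json:"id"`
	// Name is optional and left unset to avoid confusion between the
	// Flink job and the job submitter.
	// +optional
	Name string `json:"name,omitempty"`
	// State is the current state of the Flink job.
	State string `json:"state"`
}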

@functicons
Contributor

/gcbrun

@functicons functicons merged commit 2d0509e into GoogleCloudPlatform:master Dec 7, 2020
Comment on lines -49 to -57
function check_existing_jobs() {
  echo "Checking existing jobs..."
  list_jobs
  if list_jobs | grep -e "(SCHEDULED)" -e "(CREATED)" -e "(SUSPENDED)" -e "(FINISHED)" -e "(FAILED)" -e "(CANCELED)" \
      -e "(RUNNING)" -e "(RESTARTING)" -e "(CANCELLING)" -e "(FAILING)" -e "(RECONCILING)"; then
    echo "Found an existing job, skip resubmitting..."
    return 0
  fi
  return 1
}


@elanv, could you please explain why you removed this functionality?
We use HA mode, so when the cluster is recreated (removed and deployed again), the newly created JobManager is able to restore the cluster's state and the job. At the same time, the job submitter submitted the same job again, and we ended up with two jobs.

Contributor Author

@elanv elanv Dec 15, 2020


@abroskin The job is now tracked inside the operator (link) rather than by the submitter script. Since the previous job tracking script had issues like #294 with the Flink operator's automatic job recovery feature, this PR changes it to track the job by its ID, unlike before. Also, is spec.job.restartPolicy set to Never? If so, the Flink operator should not automatically restart the job; an illustrative spec fragment follows below. In addition, it would be nice if you could explain more about your HA configuration and deployment.
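
For reference, an illustrative FlinkCluster fragment with that restart policy (the apiVersion and jarFile value are assumptions, not taken from this thread):

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample
spec:
  job:
    jarFile: ./examples/streaming/WordCount.jar  # placeholder
    restartPolicy: Never  # the operator will not resubmit the job by itself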

elanv added a commit to elanv/gcp-flink-on-k8s-operator that referenced this pull request Dec 17, 2020
elanv added a commit to elanv/gcp-flink-on-k8s-operator that referenced this pull request Dec 17, 2020
functicons pushed a commit that referenced this pull request Dec 18, 2020
shashken added a commit to shashken/flink-on-k8s-operator that referenced this pull request Jan 3, 2021

@bains00 bains00 left a comment


Review, please.



Development

Successfully merging this pull request may close these issues.

Operator does not resubmit failed jobs.
