Skip to content
187 changes: 160 additions & 27 deletions cli/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,33 @@ limitations under the License.
package cmd

import (
"encoding/base64"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/eks"
"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/cortexlabs/cortex/cli/cluster"
"github.com/cortexlabs/cortex/cli/types/cliconfig"
"github.com/cortexlabs/cortex/cli/types/flags"
"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/health"
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/console"
"github.com/cortexlabs/cortex/pkg/lib/docker"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/exit"
"github.com/cortexlabs/cortex/pkg/lib/files"
libjson "github.com/cortexlabs/cortex/pkg/lib/json"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
libmath "github.com/cortexlabs/cortex/pkg/lib/math"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/prompt"
Expand All @@ -51,6 +56,10 @@ import (
"github.com/cortexlabs/cortex/pkg/types/clusterstate"
"github.com/cortexlabs/yaml"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/aws-iam-authenticator/pkg/token"
)

var (
Expand Down Expand Up @@ -101,11 +110,21 @@ func clusterInit() {
addClusterNameFlag(_clusterExportCmd)
addClusterRegionFlag(_clusterExportCmd)
_clusterCmd.AddCommand(_clusterExportCmd)

_clusterHealthCmd.Flags().SortFlags = false
addClusterConfigFlag(_clusterHealthCmd)
addClusterNameFlag(_clusterHealthCmd)
addClusterRegionFlag(_clusterHealthCmd)
_clusterHealthCmd.Flags().VarP(&_flagOutput, "output", "o", fmt.Sprintf("output format: one of %s", strings.Join(flags.OutputTypeStringsExcluding(flags.YAMLOutputType), "|")))
_clusterCmd.AddCommand(_clusterHealthCmd)
}

func addClusterConfigFlag(cmd *cobra.Command) {
cmd.Flags().StringVarP(&_flagClusterConfig, "config", "c", "", "path to a cluster configuration file")
cmd.Flags().SetAnnotation("config", cobra.BashCompFilenameExt, _configFileExts)
err := cmd.Flags().SetAnnotation("config", cobra.BashCompFilenameExt, _configFileExts)
if err != nil {
exit.Error(err) // should never happen
}
}

func addClusterNameFlag(cmd *cobra.Command) {
Expand Down Expand Up @@ -631,8 +650,8 @@ var _clusterDownCmd = &cobra.Command{
}

// best-effort deletion of cached config
cachedClusterConfigPath := cachedClusterConfigPath(accessConfig.ClusterName, accessConfig.Region)
os.Remove(cachedClusterConfigPath)
cachedClusterConfigPath := getCachedClusterConfigPath(accessConfig.ClusterName, accessConfig.Region)
_ = os.Remove(cachedClusterConfigPath)

if len(errorsList) > 0 {
exit.Error(errors.ListOfErrors(ErrClusterDown, false, errorsList...))
Expand Down Expand Up @@ -743,7 +762,83 @@ var _clusterExportCmd = &cobra.Command{
},
}

func cmdInfo(awsClient *aws.Client, accessConfig *clusterconfig.AccessConfig, stacks clusterstate.ClusterStacks, printConfig bool, outputType flags.OutputType, disallowPrompt bool) {
var _clusterHealthCmd = &cobra.Command{
Use: "health",
Short: "inspect the health of components in the cluster",
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
accessConfig, err := getClusterAccessConfigWithCache(true)
if err != nil {
exit.Error(err)
}

awsClient, err := awslib.NewForRegion(accessConfig.Region)
if err != nil {
exit.Error(err)
}

restConfig, err := getClusterRESTConfig(awsClient, accessConfig.ClusterName)
if err != nil {
exit.Error(err)
}

scheme := runtime.NewScheme()
if err := clientgoscheme.AddToScheme(scheme); err != nil {
exit.Error(err)
}

k8sClient, err := k8s.New("default", false, restConfig, scheme)
if err != nil {
exit.Error(err)
}

clusterHealth, err := health.Check(awsClient, k8sClient, accessConfig.ClusterName)
if err != nil {
exit.Error(err)
}

clusterWarnings, err := health.GetWarnings(k8sClient)
if err != nil {
exit.Error(err)
}

if _flagOutput == flags.JSONOutputType {
fmt.Println(clusterHealth)
return
}

healthTable := table.Table{
Headers: []table.Header{
{Title: ""},
{Title: "live"},
{Title: "warning", Hidden: !clusterWarnings.HasWarnings()},
},
Rows: [][]interface{}{
{"operator", console.BoolColor(clusterHealth.Operator), ""},
{"prometheus", console.BoolColor(clusterHealth.Prometheus), clusterWarnings.Prometheus},
{"autoscaler", console.BoolColor(clusterHealth.Autoscaler), ""},
{"activator", console.BoolColor(clusterHealth.Activator), ""},
{"grafana", console.BoolColor(clusterHealth.Grafana), ""},
{"controller manager", console.BoolColor(clusterHealth.ControllerManager), ""},
{"apis gateway", console.BoolColor(clusterHealth.APIsGateway), ""},
{"operator gateway", console.BoolColor(clusterHealth.APIsGateway), ""},
{"cluster autoscaler", console.BoolColor(clusterHealth.ClusterAutoscaler), ""},
{"operator load balancer", console.BoolColor(clusterHealth.OperatorLoadBalancer), ""},
{"apis load balancer", console.BoolColor(clusterHealth.APIsLoadBalancer), ""},
{"fluent bit", console.BoolColor(clusterHealth.FluentBit), ""},
{"node exporter", console.BoolColor(clusterHealth.NodeExporter), ""},
{"dcgm exporter", console.BoolColor(clusterHealth.DCGMExporter), ""},
{"statsd exporter", console.BoolColor(clusterHealth.StatsDExporter), ""},
{"event exporter", console.BoolColor(clusterHealth.EventExporter), ""},
{"kube state metrics", console.BoolColor(clusterHealth.KubeStateMetrics), ""},
},
}

fmt.Println(healthTable.MustFormat())
},
}

func cmdInfo(awsClient *awslib.Client, accessConfig *clusterconfig.AccessConfig, stacks clusterstate.ClusterStacks, printConfig bool, outputType flags.OutputType, disallowPrompt bool) {
clusterConfig := refreshCachedClusterConfig(awsClient, accessConfig, outputType == flags.PrettyOutputType)

operatorLoadBalancer, err := getLoadBalancer(accessConfig.ClusterName, OperatorLoadBalancer, awsClient)
Expand Down Expand Up @@ -846,14 +941,14 @@ func getInfoOperatorResponse(operatorEndpoint string) (*schema.InfoResponse, err
}

func printInfoPricing(infoResponse *schema.InfoResponse, clusterConfig clusterconfig.Config) {
eksPrice := aws.EKSPrices[clusterConfig.Region]
operatorInstancePrice := aws.InstanceMetadatas[clusterConfig.Region]["t3.medium"].Price
operatorEBSPrice := aws.EBSMetadatas[clusterConfig.Region]["gp3"].PriceGB * 20 / 30 / 24
prometheusInstancePrice := aws.InstanceMetadatas[clusterConfig.Region][clusterConfig.PrometheusInstanceType].Price
prometheusEBSPrice := aws.EBSMetadatas[clusterConfig.Region]["gp3"].PriceGB * 20 / 30 / 24
metricsEBSPrice := aws.EBSMetadatas[clusterConfig.Region]["gp2"].PriceGB * (40 + 2) / 30 / 24
nlbPrice := aws.NLBMetadatas[clusterConfig.Region].Price
natUnitPrice := aws.NATMetadatas[clusterConfig.Region].Price
eksPrice := awslib.EKSPrices[clusterConfig.Region]
operatorInstancePrice := awslib.InstanceMetadatas[clusterConfig.Region]["t3.medium"].Price
operatorEBSPrice := awslib.EBSMetadatas[clusterConfig.Region]["gp3"].PriceGB * 20 / 30 / 24
prometheusInstancePrice := awslib.InstanceMetadatas[clusterConfig.Region][clusterConfig.PrometheusInstanceType].Price
prometheusEBSPrice := awslib.EBSMetadatas[clusterConfig.Region]["gp3"].PriceGB * 20 / 30 / 24
metricsEBSPrice := awslib.EBSMetadatas[clusterConfig.Region]["gp2"].PriceGB * (40 + 2) / 30 / 24
nlbPrice := awslib.NLBMetadatas[clusterConfig.Region].Price
natUnitPrice := awslib.NATMetadatas[clusterConfig.Region].Price

headers := []table.Header{
{Title: "aws resource"},
Expand All @@ -874,13 +969,13 @@ func printInfoPricing(infoResponse *schema.InfoResponse, clusterConfig clusterco
nodesInfo := infoResponse.GetNodesWithNodeGroupName(ngNamePrefix + ng.Name)
numInstances := len(nodesInfo)

ebsPrice := aws.EBSMetadatas[clusterConfig.Region][ng.InstanceVolumeType.String()].PriceGB * float64(ng.InstanceVolumeSize) / 30 / 24
ebsPrice := awslib.EBSMetadatas[clusterConfig.Region][ng.InstanceVolumeType.String()].PriceGB * float64(ng.InstanceVolumeSize) / 30 / 24
if ng.InstanceVolumeType == clusterconfig.IO1VolumeType && ng.InstanceVolumeIOPS != nil {
ebsPrice += aws.EBSMetadatas[clusterConfig.Region][ng.InstanceVolumeType.String()].PriceIOPS * float64(*ng.InstanceVolumeIOPS) / 30 / 24
ebsPrice += awslib.EBSMetadatas[clusterConfig.Region][ng.InstanceVolumeType.String()].PriceIOPS * float64(*ng.InstanceVolumeIOPS) / 30 / 24
}
if ng.InstanceVolumeType == clusterconfig.GP3VolumeType && ng.InstanceVolumeIOPS != nil && ng.InstanceVolumeThroughput != nil {
ebsPrice += libmath.MaxFloat64(0, (aws.EBSMetadatas[clusterConfig.Region][ng.InstanceVolumeType.String()].PriceIOPS-3000)*float64(*ng.InstanceVolumeIOPS)/30/24)
ebsPrice += libmath.MaxFloat64(0, (aws.EBSMetadatas[clusterConfig.Region][ng.InstanceVolumeType.String()].PriceThroughput-125)*float64(*ng.InstanceVolumeThroughput)/30/24)
ebsPrice += libmath.MaxFloat64(0, (awslib.EBSMetadatas[clusterConfig.Region][ng.InstanceVolumeType.String()].PriceIOPS-3000)*float64(*ng.InstanceVolumeIOPS)/30/24)
ebsPrice += libmath.MaxFloat64(0, (awslib.EBSMetadatas[clusterConfig.Region][ng.InstanceVolumeType.String()].PriceThroughput-125)*float64(*ng.InstanceVolumeThroughput)/30/24)
}
totalEBSPrice := ebsPrice * float64(numInstances)

Expand Down Expand Up @@ -1040,7 +1135,7 @@ func updateCLIEnv(envName string, operatorEndpoint string, disallowPrompt bool,
return nil
}

func cmdDebug(awsClient *aws.Client, accessConfig *clusterconfig.AccessConfig) {
func cmdDebug(awsClient *awslib.Client, accessConfig *clusterconfig.AccessConfig) {
// note: if modifying this string, also change it in files.IgnoreCortexDebug()
debugFileName := fmt.Sprintf("cortex-debug-%s.tgz", time.Now().UTC().Format("2006-01-02-15-04-05"))

Expand All @@ -1064,9 +1159,9 @@ func cmdDebug(awsClient *aws.Client, accessConfig *clusterconfig.AccessConfig) {
return
}

func refreshCachedClusterConfig(awsClient *aws.Client, accessConfig *clusterconfig.AccessConfig, printToStdout bool) clusterconfig.Config {
func refreshCachedClusterConfig(awsClient *awslib.Client, accessConfig *clusterconfig.AccessConfig, printToStdout bool) clusterconfig.Config {
// add empty file if cached cluster doesn't exist so that the file output by manager container maintains current user permissions
cachedClusterConfigPath := cachedClusterConfigPath(accessConfig.ClusterName, accessConfig.Region)
cachedClusterConfigPath := getCachedClusterConfigPath(accessConfig.ClusterName, accessConfig.Region)
containerConfigPath := fmt.Sprintf("/out/%s", filepath.Base(cachedClusterConfigPath))

copyFromPaths := []dockerCopyFromPath{
Expand Down Expand Up @@ -1095,7 +1190,7 @@ func refreshCachedClusterConfig(awsClient *aws.Client, accessConfig *clusterconf
return *refreshedClusterConfig
}

func createS3BucketIfNotFound(awsClient *aws.Client, bucket string, tags map[string]string) error {
func createS3BucketIfNotFound(awsClient *awslib.Client, bucket string, tags map[string]string) error {
bucketFound, err := awsClient.DoesBucketExist(bucket)
if err != nil {
return err
Expand Down Expand Up @@ -1123,7 +1218,7 @@ func createS3BucketIfNotFound(awsClient *aws.Client, bucket string, tags map[str
fmt.Println(" ✓")
return nil
}
if !aws.IsNoSuchBucketErr(err) {
if !awslib.IsNoSuchBucketErr(err) {
break
}
time.Sleep(1 * time.Second)
Expand All @@ -1133,7 +1228,7 @@ func createS3BucketIfNotFound(awsClient *aws.Client, bucket string, tags map[str
return err
}

func setLifecycleRulesOnClusterUp(awsClient *aws.Client, bucket, newClusterUID string) error {
func setLifecycleRulesOnClusterUp(awsClient *awslib.Client, bucket, newClusterUID string) error {
err := awsClient.DeleteLifecycleRules(bucket)
if err != nil {
return err
Expand Down Expand Up @@ -1177,7 +1272,7 @@ func setLifecycleRulesOnClusterUp(awsClient *aws.Client, bucket, newClusterUID s
return awsClient.SetLifecycleRules(bucket, rules)
}

func setLifecycleRulesOnClusterDown(awsClient *aws.Client, bucket string) error {
func setLifecycleRulesOnClusterDown(awsClient *awslib.Client, bucket string) error {
err := awsClient.DeleteLifecycleRules(bucket)
if err != nil {
return err
Expand All @@ -1198,7 +1293,7 @@ func setLifecycleRulesOnClusterDown(awsClient *aws.Client, bucket string) error
})
}

func createLogGroupIfNotFound(awsClient *aws.Client, logGroup string, tags map[string]string) error {
func createLogGroupIfNotFound(awsClient *awslib.Client, logGroup string, tags map[string]string) error {
logGroupFound, err := awsClient.DoesLogGroupExist(logGroup)
if err != nil {
return err
Expand Down Expand Up @@ -1240,7 +1335,7 @@ func (lb LoadBalancer) String() string {
}

// Will return error if the load balancer can't be found
func getLoadBalancer(clusterName string, whichLB LoadBalancer, awsClient *aws.Client) (*elbv2.LoadBalancer, error) {
func getLoadBalancer(clusterName string, whichLB LoadBalancer, awsClient *awslib.Client) (*elbv2.LoadBalancer, error) {
loadBalancer, err := awsClient.FindLoadBalancer(map[string]string{
clusterconfig.ClusterNameTag: clusterName,
"cortex.dev/load-balancer": whichLB.String(),
Expand All @@ -1256,7 +1351,7 @@ func getLoadBalancer(clusterName string, whichLB LoadBalancer, awsClient *aws.Cl
return loadBalancer, nil
}

func listPVCVolumesForCluster(awsClient *aws.Client, clusterName string) ([]ec2.Volume, error) {
func listPVCVolumesForCluster(awsClient *awslib.Client, clusterName string) ([]ec2.Volume, error) {
return awsClient.ListVolumes(ec2.Tag{
Key: pointer.String(fmt.Sprintf("kubernetes.io/cluster/%s", clusterName)),
Value: nil, // any value should be ok as long as the key is present
Expand All @@ -1266,3 +1361,41 @@ func listPVCVolumesForCluster(awsClient *aws.Client, clusterName string) ([]ec2.
func filterEKSCTLOutput(out string) string {
return strings.Join(s.RemoveDuplicates(strings.Split(out, "\n"), _eksctlPrefixRegex), "\n")
}

func getClusterRESTConfig(awsClient *awslib.Client, clusterName string) (*rest.Config, error) {
clusterOutput, err := awsClient.EKS().DescribeCluster(
&eks.DescribeClusterInput{
Name: aws.String(clusterName),
},
)
if err != nil {
return nil, err
}

gen, err := token.NewGenerator(true, false)
if err != nil {
return nil, err
}

opts := &token.GetTokenOptions{
ClusterID: aws.StringValue(clusterOutput.Cluster.Name),
}

tok, err := gen.GetWithOptions(opts)
if err != nil {
return nil, err
}

ca, err := base64.StdEncoding.DecodeString(aws.StringValue(clusterOutput.Cluster.CertificateAuthority.Data))
if err != nil {
return nil, err
}

return &rest.Config{
Host: aws.StringValue(clusterOutput.Cluster.Endpoint),
BearerToken: tok.Token,
TLSClientConfig: rest.TLSClientConfig{
CAData: ca,
},
}, nil
}
2 changes: 1 addition & 1 deletion cli/cmd/lib_cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

var _cachedClusterConfigRegex = regexp.MustCompile(`^cluster_\S+\.yaml$`)

func cachedClusterConfigPath(clusterName string, region string) string {
func getCachedClusterConfigPath(clusterName string, region string) string {
return filepath.Join(_localDir, fmt.Sprintf("cluster_%s_%s.yaml", clusterName, region))
}

Expand Down
4 changes: 2 additions & 2 deletions cli/cmd/lib_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func runManager(containerConfig *container.Config, addNewLineAfterPull bool, cop
}

removeContainer := func() {
dockerClient.ContainerRemove(context.Background(), containerInfo.ID, dockertypes.ContainerRemoveOptions{
_ = dockerClient.ContainerRemove(context.Background(), containerInfo.ID, dockertypes.ContainerRemoveOptions{
RemoveVolumes: true,
Force: true,
})
Expand Down Expand Up @@ -166,7 +166,7 @@ func runManagerWithClusterConfig(entrypoint string, clusterConfig *clusterconfig
return "", nil, errors.WithStack(err)
}

cachedClusterConfigPath := cachedClusterConfigPath(clusterConfig.ClusterName, clusterConfig.Region)
cachedClusterConfigPath := getCachedClusterConfigPath(clusterConfig.ClusterName, clusterConfig.Region)
if err := files.WriteFile(clusterConfigBytes, cachedClusterConfigPath); err != nil {
return "", nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ require (
k8s.io/client-go v0.20.8
k8s.io/klog/v2 v2.9.0 // indirect
k8s.io/kube-openapi v0.0.0-20210527164424-3c818078ee3d // indirect
k8s.io/metrics v0.20.8
k8s.io/utils v0.0.0-20210629042839-4a2b36d8d73f // indirect
sigs.k8s.io/aws-iam-authenticator v0.5.3
sigs.k8s.io/controller-runtime v0.8.3
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect
)
Expand Down
Loading