diff --git a/doc/source/readme.rst b/doc/source/readme.rst index 760236df9..31eae25c0 100644 --- a/doc/source/readme.rst +++ b/doc/source/readme.rst @@ -58,6 +58,40 @@ To list all pods: More complicated examples, like asynchronous multiple watch or tail logs from pods, you can find in `examples/` folder. +There is also support for DynamicClient which will query the cluster for all supported +resources. This allows for dynamic resource selection at runtime. +The above example using DynamicClient would look like this: +:: + + import asyncio + from kubernetes_asyncio import config + from kubernetes_asyncio.client.api_client import ApiClient + from kubernetes_asyncio.dynamic import DynamicClient + + + async def main(): + # Configs can be set in Configuration class directly or using helper + # utility. If no argument provided, the config will be loaded from + # default location. + await config.load_kube_config() + + # use the context manager to close http sessions automatically + async with ApiClient() as api: + client = await DynamicClient(api) + v1 = await client.resources.get(api_version="v1", kind="Pod") + print("Listing pods with their IPs:") + ret = await v1.get() + + for i in ret.items: + print(i.status.pod_ip, i.metadata.namespace, i.metadata.name) + + + if __name__ == '__main__': + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() + +Additional examples are in the `examples/dynamic-client` folder. Versions -------- diff --git a/examples/dynamic-client/accept_header.py b/examples/dynamic-client/accept_header.py new file mode 100644 index 000000000..cec077785 --- /dev/null +++ b/examples/dynamic-client/accept_header.py @@ -0,0 +1,50 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates how to pass the custom header in the cluster. + +""" + +import asyncio + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + async with DynamicClient(apic) as client: + # fetching the node api + api = await client.resources.get(api_version="v1", kind="Node") + + # Creating a custom header + params = {'header_params': {'Accept': 'application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io'}} + + resp = await api.get(**params) + + # Printing the kind and apiVersion after passing new header params. + print("VERSION\t\t\t\tKIND") + print(f"{resp.apiVersion}\t\t{resp.kind}") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/dynamic-client/cluster_scoped_custom_resource.py b/examples/dynamic-client/cluster_scoped_custom_resource.py new file mode 100644 index 000000000..cf135df11 --- /dev/null +++ b/examples/dynamic-client/cluster_scoped_custom_resource.py @@ -0,0 +1,198 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates the following: + - Creation of a custom resource definition (CRD) using dynamic-client + - Creation of cluster scoped custom resources (CR) using the above created CRD + - List, patch (update), delete the custom resources + - Delete the custom resource definition (CRD) +""" + +import asyncio + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient +from kubernetes_asyncio.dynamic.exceptions import ResourceNotFoundError + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + client = await DynamicClient(apic) + + # fetching the custom resource definition (CRD) api + crd_api = await client.resources.get( + api_version="apiextensions.k8s.io/v1", kind="CustomResourceDefinition" + ) + + # Creating a Namespaced CRD named "ingressroutes.apps.example.com" + name = "ingressroutes.apps.example.com" + + crd_manifest = { + "apiVersion": "apiextensions.k8s.io/v1", + "kind": "CustomResourceDefinition", + "metadata": { + "name": name, + }, + "spec": { + "group": "apps.example.com", + "versions": [ + { + "name": "v1", + "schema": { + "openAPIV3Schema": { + "properties": { + "spec": { + "properties": { + "strategy": {"type": "string"}, + "virtualhost": { + "properties": { + "fqdn": {"type": "string"}, + "tls": { + "properties": { + "secretName": {"type": "string"} + }, + "type": "object", + }, + }, + "type": "object", + }, + }, + "type": "object", + } + }, + "type": "object", + } + }, + "served": True, + "storage": True, + } + ], + "scope": "Cluster", + "names": { + "plural": "ingressroutes", + "listKind": "IngressRouteList", + "singular": "ingressroute", + "kind": "IngressRoute", + "shortNames": ["ir"], + }, + }, + } + + crd_creation_response = await crd_api.create(crd_manifest) + print( + "\n[INFO] custom resource definition `ingressroutes.apps.example.com` created\n" + ) + print("SCOPE\t\tNAME") + print(f"{crd_creation_response.spec.scope}\t\t{crd_creation_response.metadata.name}") + + # Fetching the "ingressroutes" CRD api + + try: + await client.resources.get( + api_version="apps.example.com/v1", kind="IngressRoute" + ) + except ResourceNotFoundError: + # Need to wait a sec for the discovery layer to get updated + await asyncio.sleep(2) + + ingressroute_api = await client.resources.get( + api_version="apps.example.com/v1", kind="IngressRoute" + ) + + # Creating a custom resource (CR) `ingress-route-*`, using the above CRD `ingressroutes.apps.example.com` + + ingressroute_manifest_first = { + "apiVersion": "apps.example.com/v1", + "kind": "IngressRoute", + "metadata": { + "name": "ingress-route-first", + }, + "spec": { + "virtualhost": { + "fqdn": "www.google.com", + "tls": {"secretName": "google-tls"}, + }, + "strategy": "RoundRobin", + }, + } + + ingressroute_manifest_second = { + "apiVersion": "apps.example.com/v1", + "kind": "IngressRoute", + "metadata": { + "name": "ingress-route-second", + }, + "spec": { + "virtualhost": { + "fqdn": "www.yahoo.com", + "tls": {"secretName": "yahoo-tls"}, + }, + "strategy": "RoundRobin", + }, + } + + await ingressroute_api.create(body=ingressroute_manifest_first) + await ingressroute_api.create(body=ingressroute_manifest_second) + print("\n[INFO] custom resources `ingress-route-*` created\n") + + # Listing the `ingress-route-*` custom resources + + ingress_routes_list = await ingressroute_api.get() + print("NAME\t\t\t\tFQDN\t\tTLS\t\t\t\tSTRATEGY") + for item in ingress_routes_list.items: + print(f"{item.metadata.name}\t{item.spec.virtualhost.fqdn}\t{item.spec.virtualhost.tls}\t" + f"{item.spec.strategy}") + + # Patching the ingressroutes custom resources + + ingressroute_manifest_first["spec"]["strategy"] = "Random" + ingressroute_manifest_second["spec"]["strategy"] = "WeightedLeastRequest" + + await ingressroute_api.patch(body=ingressroute_manifest_first, content_type="application/merge-patch+json") + await ingressroute_api.patch(body=ingressroute_manifest_second, content_type="application/merge-patch+json") + + print( + "\n[INFO] custom resources `ingress-route-*` patched to update the strategy\n" + ) + patched_ingress_routes_list = await ingressroute_api.get() + print("NAME\t\t\t\tFQDN\t\t\tTLS\t\t\tSTRATEGY") + for item in patched_ingress_routes_list.items: + print(f"{item.metadata.name}\t{item.spec.virtualhost.fqdn}\t{item.spec.virtualhost.tls}\t" + f"{item.spec.strategy}") + + # Deleting the ingressroutes custom resources + + await ingressroute_api.delete(name="ingress-route-first") + await ingressroute_api.delete(name="ingress-route-second") + + print("\n[INFO] custom resources `ingress-route-*` deleted") + + # Deleting the ingressroutes.apps.example.com custom resource definition + + await crd_api.delete(name=name) + print( + "\n[INFO] custom resource definition `ingressroutes.apps.example.com` deleted" + ) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/dynamic-client/configmap.py b/examples/dynamic-client/configmap.py new file mode 100644 index 000000000..32914aa96 --- /dev/null +++ b/examples/dynamic-client/configmap.py @@ -0,0 +1,92 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates the following: + - Creation of a k8s configmap using dynamic-client + - List, patch(update), delete the configmap +""" + +import asyncio + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + client = await DynamicClient(apic) + + # fetching the configmap api + api = await client.resources.get(api_version="v1", kind="ConfigMap") + + configmap_name = "test-configmap" + + configmap_manifest = { + "kind": "ConfigMap", + "apiVersion": "v1", + "metadata": { + "name": configmap_name, + "labels": { + "foo": "bar", + }, + }, + "data": { + "config.json": '{"command":"/usr/bin/mysqld_safe"}', + "frontend.cnf": "[mysqld]\nbind-address = 10.0.0.3\n", + }, + } + + # Creating configmap `test-configmap` in the `default` namespace + + await api.create(body=configmap_manifest, namespace="default") + + print("\n[INFO] configmap `test-configmap` created\n") + + # Listing the configmaps in the `default` namespace + + configmap_list = await api.get( + name=configmap_name, namespace="default", label_selector="foo=bar" + ) + + print(f"NAME:\n{configmap_list.metadata.name}\n") + print(f"DATA:\n{configmap_list.data}\n") + + # Updating the configmap's data, `config.json` + + configmap_manifest["data"]["config.json"] = "{}" + + configmap_patched = await api.patch( + name=configmap_name, namespace="default", body=configmap_manifest + ) + + print("\n[INFO] configmap `test-configmap` patched\n") + print(f"NAME:\n{configmap_patched.metadata.name}\n") + print(f"DATA:\n{configmap_patched.data}\n") + + # Deleting configmap `test-configmap` from the `default` namespace + + await api.delete(name=configmap_name, body={}, namespace="default") + print("\n[INFO] configmap `test-configmap` deleted\n") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/dynamic-client/deployment_rolling_restart.py b/examples/dynamic-client/deployment_rolling_restart.py new file mode 100644 index 000000000..c54a63e30 --- /dev/null +++ b/examples/dynamic-client/deployment_rolling_restart.py @@ -0,0 +1,110 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates the following: + - Creation of a k8s deployment using dynamic-client + - Rolling restart of the deployment (demonstrate patch/update action) + - Listing & deletion of the deployment +""" +import asyncio +import datetime + +import pytz + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + client = await DynamicClient(apic) + + # fetching the deployment api + api = await client.resources.get(api_version="apps/v1", kind="Deployment") + + name = "nginx-deployment" + + deployment_manifest = { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": {"labels": {"app": "nginx"}, "name": name}, + "spec": { + "replicas": 3, + "selector": {"matchLabels": {"app": "nginx"}}, + "template": { + "metadata": {"labels": {"app": "nginx"}}, + "spec": { + "containers": [ + { + "name": "nginx", + "image": "nginx:1.14.2", + "ports": [{"containerPort": 80}], + } + ] + }, + }, + }, + } + + # Creating deployment `nginx-deployment` in the `default` namespace + + await api.create(body=deployment_manifest, namespace="default") + + print("\n[INFO] deployment `nginx-deployment` created\n") + + # Listing deployment `nginx-deployment` in the `default` namespace + + deployment_created = await api.get(name=name, namespace="default") + + print("NAMESPACE\tNAME\t\t\t\tREVISION\t\t\t\t\t\t\t\t\t\tRESTARTED-AT") + print(f"{deployment_created.metadata.namespace}\t\t{deployment_created.metadata.name}\t" + f"{deployment_created.metadata.annotations}\t\t{deployment_created.spec.template.metadata.annotations}") + + # Patching the `spec.template.metadata` section to add `kubectl.kubernetes.io/restartedAt` annotation + # In order to perform a rolling restart on the deployment `nginx-deployment` + + deployment_manifest["spec"]["template"]["metadata"] = { + "annotations": { + "kubectl.kubernetes.io/restartedAt": datetime.datetime.utcnow() + .replace(tzinfo=pytz.UTC) + .isoformat() + } + } + + deployment_patched = await api.patch( + body=deployment_manifest, name=name, namespace="default" + ) + + print("\n[INFO] deployment `nginx-deployment` restarted\n") + print("NAMESPACE\tNAME\t\t\t\tREVISION\t\t\t\t\t\t\t\t\tRESTARTED-AT") + print(f"{deployment_patched.metadata.namespace}\t\t{deployment_patched.metadata.name}\t" + f"{deployment_patched.metadata.annotations}\t{deployment_patched.spec.template.metadata.annotations}") + + # Deleting deployment `nginx-deployment` from the `default` namespace + + await api.delete(name=name, body={}, namespace="default") + + print("\n[INFO] deployment `nginx-deployment` deleted\n") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/dynamic-client/namespaced_custom_resource.py b/examples/dynamic-client/namespaced_custom_resource.py new file mode 100644 index 000000000..c80650a92 --- /dev/null +++ b/examples/dynamic-client/namespaced_custom_resource.py @@ -0,0 +1,265 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates the following: + - Creation of a custom resource definition (CRD) using dynamic-client + - Creation of namespaced custom resources (CR) using the above CRD + - List, patch (update), delete the custom resources + - Delete the custom resource definition (CRD) +""" + +import asyncio + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient +from kubernetes_asyncio.dynamic.exceptions import ResourceNotFoundError + + +async def create_namespace(namespace_api, name): + namespace_manifest = { + "apiVersion": "v1", + "kind": "Namespace", + "metadata": {"name": name, "resourceversion": "v1"}, + } + await namespace_api.create(body=namespace_manifest) + + +async def delete_namespace(namespace_api, name): + await namespace_api.delete(name=name) + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + client = await DynamicClient(apic) + + # fetching the custom resource definition (CRD) api + crd_api = await client.resources.get( + api_version="apiextensions.k8s.io/v1", kind="CustomResourceDefinition" + ) + + namespace_api = await client.resources.get(api_version="v1", kind="Namespace") + + # Creating a Namespaced CRD named "ingressroutes.apps.example.com" + name = "ingressroutes.apps.example.com" + + crd_manifest = { + "apiVersion": "apiextensions.k8s.io/v1", + "kind": "CustomResourceDefinition", + "metadata": {"name": name, "namespace": "default"}, + "spec": { + "group": "apps.example.com", + "versions": [ + { + "name": "v1", + "schema": { + "openAPIV3Schema": { + "properties": { + "spec": { + "properties": { + "strategy": {"type": "string"}, + "virtualhost": { + "properties": { + "fqdn": {"type": "string"}, + "tls": { + "properties": { + "secretName": {"type": "string"} + }, + "type": "object", + }, + }, + "type": "object", + }, + }, + "type": "object", + } + }, + "type": "object", + } + }, + "served": True, + "storage": True, + } + ], + "scope": "Namespaced", + "names": { + "plural": "ingressroutes", + "listKind": "IngressRouteList", + "singular": "ingressroute", + "kind": "IngressRoute", + "shortNames": ["ir"], + }, + }, + } + + crd_creation_response = await crd_api.create(crd_manifest) + print( + "\n[INFO] custom resource definition `ingressroutes.apps.example.com` created\n" + ) + print("%s\t\t%s" % ("SCOPE", "NAME")) + print( + "%s\t%s\n" + % (crd_creation_response.spec.scope, crd_creation_response.metadata.name) + ) + + # Fetching the "ingressroutes" CRD api + + try: + await client.resources.get( + api_version="apps.example.com/v1", kind="IngressRoute" + ) + except ResourceNotFoundError: + # Need to wait a sec for the discovery layer to get updated + await asyncio.sleep(2) + + ingressroute_api = await client.resources.get( + api_version="apps.example.com/v1", kind="IngressRoute" + ) + + # Creating a custom resource (CR) `ingress-route-*`, using the above CRD `ingressroutes.apps.example.com` + + namespace_first = "test-namespace-first" + namespace_second = "test-namespace-second" + + await create_namespace(namespace_api, namespace_first) + await create_namespace(namespace_api, namespace_second) + + ingressroute_manifest_first = { + "apiVersion": "apps.example.com/v1", + "kind": "IngressRoute", + "metadata": { + "name": "ingress-route-first", + "namespace": namespace_first, + }, + "spec": { + "virtualhost": { + "fqdn": "www.google.com", + "tls": {"secretName": "google-tls"}, + }, + "strategy": "RoundRobin", + }, + } + + ingressroute_manifest_second = { + "apiVersion": "apps.example.com/v1", + "kind": "IngressRoute", + "metadata": { + "name": "ingress-route-second", + "namespace": namespace_second, + }, + "spec": { + "virtualhost": { + "fqdn": "www.yahoo.com", + "tls": {"secretName": "yahoo-tls"}, + }, + "strategy": "RoundRobin", + }, + } + + await ingressroute_api.create(body=ingressroute_manifest_first, namespace=namespace_first) + await ingressroute_api.create(body=ingressroute_manifest_second, namespace=namespace_second) + print("\n[INFO] custom resources `ingress-route-*` created\n") + + # Listing the `ingress-route-*` custom resources + routes = await ingressroute_api.get() + + print( + "%s\t\t\t%s\t\t\t%s\t\t%s\t\t\t\t%s" + % ("NAME", "NAMESPACE", "FQDN", "TLS", "STRATEGY") + ) + + for item in routes["items"]: + print( + "%s\t%s\t\t%s\t%s\t%s" + % ( + item["metadata"]["name"], + item["metadata"]["namespace"], + item["spec"]["virtualhost"]["fqdn"], + item["spec"]["virtualhost"]["tls"], + item["spec"]["strategy"] + ) + ) + + # Patching the ingressroutes custom resources + + ingressroute_manifest_first["spec"]["strategy"] = "Random" + ingressroute_manifest_second["spec"]["strategy"] = "WeightedLeastRequest" + + await ingressroute_api.patch( + body=ingressroute_manifest_first, content_type="application/merge-patch+json" + ) + await ingressroute_api.patch( + body=ingressroute_manifest_second, content_type="application/merge-patch+json" + ) + + print( + "\n[INFO] custom resources `ingress-route-*` patched to update the strategy\n" + ) + + routes = await ingressroute_api.get() + + print( + "%s\t\t\t%s\t\t\t%s\t\t%s\t\t\t\t%s" + % ("NAME", "NAMESPACE", "FQDN", "TLS", "STRATEGY") + ) + + for item in routes["items"]: + print( + "%s\t%s\t\t%s\t%s\t%s" + % ( + item["metadata"]["name"], + item["metadata"]["namespace"], + item["spec"]["virtualhost"]["fqdn"], + item["spec"]["virtualhost"]["tls"], + item["spec"]["strategy"] + ) + ) + + # Deleting the ingressroutes custom resources + + await ingressroute_api.delete( + name="ingress-route-first", namespace=namespace_first + ) + await ingressroute_api.delete( + name="ingress-route-second", namespace=namespace_second + ) + + print("\n[INFO] custom resources `ingress-route-*` deleted") + + # Deleting the namespaces + + await delete_namespace(namespace_api, namespace_first) + await asyncio.sleep(4) + await delete_namespace(namespace_api, namespace_second) + await asyncio.sleep(4) + + print("\n[INFO] test namespaces deleted") + + # Deleting the ingressroutes.apps.example.com custom resource definition + + await crd_api.delete(name=name) + print( + "\n[INFO] custom resource definition `ingressroutes.apps.example.com` deleted" + ) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/dynamic-client/node.py b/examples/dynamic-client/node.py new file mode 100644 index 000000000..73c1ce596 --- /dev/null +++ b/examples/dynamic-client/node.py @@ -0,0 +1,57 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates how to list cluster nodes using dynamic client. + +""" + +import asyncio + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + client = await DynamicClient(apic) + + # fetching the node api + api = await client.resources.get(api_version="v1", kind="Node") + + # Listing cluster nodes + + print("%s\t\t%s\t\t%s" % ("NAME", "STATUS", "VERSION")) + resp = await api.get() + for item in resp.items: + node = await api.get(name=item.metadata.name) + print( + "%s\t%s\t\t%s\n" + % ( + node.metadata.name, + node.status.conditions[3]["type"], + node.status.nodeInfo.kubeProxyVersion, + ) + ) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/dynamic-client/replication_controller.py b/examples/dynamic-client/replication_controller.py new file mode 100644 index 000000000..98934742d --- /dev/null +++ b/examples/dynamic-client/replication_controller.py @@ -0,0 +1,90 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates the creation, listing & deletion of a namespaced replication controller using dynamic-client. +""" + +import asyncio + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + client = await DynamicClient(apic) + + # fetching the replication controller api + api = await client.resources.get(api_version="v1", kind="ReplicationController") + + name = "frontend-replication-controller" + + replication_controller_manifest = { + "apiVersion": "v1", + "kind": "ReplicationController", + "metadata": {"labels": {"name": name}, "name": name}, + "spec": { + "replicas": 2, + "selector": {"name": name}, + "template": { + "metadata": {"labels": {"name": name}}, + "spec": { + "containers": [ + { + "image": "nginx", + "name": "nginx", + "ports": [{"containerPort": 80, "protocol": "TCP"}], + } + ] + }, + }, + }, + } + + # Creating replication-controller `frontend-replication-controller` in the `default` namespace + await api.create( + body=replication_controller_manifest, namespace="default" + ) + + print("\n[INFO] replication-controller `frontend-replication-controller` created\n") + + # Listing replication-controllers in the `default` namespace + replication_controller_created = await api.get(name=name, namespace="default") + + print("%s\t%s\t\t\t\t\t%s" % ("NAMESPACE", "NAME", "REPLICAS")) + print( + "%s\t\t%s\t\t%s\n" + % ( + replication_controller_created.metadata.namespace, + replication_controller_created.metadata.name, + replication_controller_created.spec.replicas, + ) + ) + + # Deleting replication-controller `frontend-service` from the `default` namespace + await api.delete(name=name, body={}, namespace="default") + + print("[INFO] replication-controller `frontend-replication-controller` deleted\n") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/dynamic-client/request_timeout.py b/examples/dynamic-client/request_timeout.py new file mode 100644 index 000000000..920d89d43 --- /dev/null +++ b/examples/dynamic-client/request_timeout.py @@ -0,0 +1,79 @@ +# Copyright 2023 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates the following: + - Creation of a k8s configmap using dynamic-client + - Setting the request timeout which is time duration in seconds +""" + +import asyncio + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + client = await DynamicClient(apic) + + # fetching the configmap api + api = await client.resources.get(api_version="v1", kind="ConfigMap") + + configmap_name = "request-timeout-test-configmap" + + configmap_manifest = { + "kind": "ConfigMap", + "apiVersion": "v1", + "metadata": { + "name": configmap_name, + "labels": { + "foo": "bar", + }, + }, + "data": { + "config.json": '{"command":"/usr/bin/mysqld_safe"}', + "frontend.cnf": "[mysqld]\nbind-address = 10.0.0.3\n", + }, + } + + # Creating configmap `request-timeout-test-configmap` in the `default` namespace + # Client-side timeout to 60 seconds + + await api.create(body=configmap_manifest, namespace="default", _request_time=60) + + print("\n[INFO] configmap `request-timeout-test-configmap` created\n") + + # Listing the configmaps in the `default` namespace + # Client-side timeout to 60 seconds + + configmap_list = await api.get( + name=configmap_name, namespace="default", label_selector="foo=bar", _request_time=60 + ) + + print(f"NAME:\n{configmap_list.metadata.name}\n") + print(f"DATA:\n{configmap_list.data}\n") + + await api.delete(name=configmap_name, namespace="default", _request_time=60) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/examples/dynamic-client/service.py b/examples/dynamic-client/service.py new file mode 100644 index 000000000..ae53fc5c3 --- /dev/null +++ b/examples/dynamic-client/service.py @@ -0,0 +1,96 @@ +# Copyright 2021 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates the following: + - Creation of a k8s service using dynamic-client + - List, patch(update), delete the service +""" + +import asyncio + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.client.configuration import Configuration +from kubernetes_asyncio.config import kube_config +from kubernetes_asyncio.dynamic import DynamicClient + + +async def main(): + # Creating a dynamic client + config = Configuration() + await kube_config.load_kube_config(client_configuration=config) + async with api_client.ApiClient(configuration=config) as apic: + client = await DynamicClient(apic) + + # fetching the service api + api = await client.resources.get(api_version="v1", kind="Service") + + name = "frontend-service" + + service_manifest = { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"labels": {"name": name}, "name": name, "resourceversion": "v1"}, + "spec": { + "ports": [ + {"name": "port", "port": 80, "protocol": "TCP", "targetPort": 80} + ], + "selector": {"name": name}, + }, + } + + # Creating service `frontend-service` in the `default` namespace + + await api.create(body=service_manifest, namespace="default") + + print("\n[INFO] service `frontend-service` created\n") + + # Listing service `frontend-service` in the `default` namespace + service_created = await api.get(name=name, namespace="default") + + print("%s\t%s" % ("NAMESPACE", "NAME")) + print( + "%s\t\t%s\n" + % (service_created.metadata.namespace, service_created.metadata.name) + ) + + # Patching the `spec` section of the `frontend-service` + + service_manifest["spec"]["ports"] = [ + {"name": "new", "port": 8080, "protocol": "TCP", "targetPort": 8080} + ] + + service_patched = await api.patch(body=service_manifest, name=name, namespace="default") + + print("\n[INFO] service `frontend-service` patched\n") + print("%s\t%s\t\t\t%s" % ("NAMESPACE", "NAME", "PORTS")) + print( + "%s\t\t%s\t%s\n" + % ( + service_patched.metadata.namespace, + service_patched.metadata.name, + service_patched.spec.ports, + ) + ) + + # Deleting service `frontend-service` from the `default` namespace + await api.delete(name=name, body={}, namespace="default") + + print("\n[INFO] service `frontend-service` deleted\n") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/kubernetes_asyncio/dynamic/__init__.py b/kubernetes_asyncio/dynamic/__init__.py new file mode 100644 index 000000000..a1d3d8f8e --- /dev/null +++ b/kubernetes_asyncio/dynamic/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .client import * # NOQA diff --git a/kubernetes_asyncio/dynamic/client.py b/kubernetes_asyncio/dynamic/client.py new file mode 100644 index 000000000..fd0702d88 --- /dev/null +++ b/kubernetes_asyncio/dynamic/client.py @@ -0,0 +1,340 @@ +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kubernetes_asyncio import watch +from kubernetes_asyncio.client.rest import ApiException + +from .discovery import EagerDiscoverer, LazyDiscoverer +from .exceptions import KubernetesValidateMissing, api_exception +from .resource import ( + Resource, ResourceField, ResourceInstance, ResourceList, Subresource, +) + +try: + import kubernetes_validate + HAS_KUBERNETES_VALIDATE = True +except ImportError: + HAS_KUBERNETES_VALIDATE = False + +try: + from kubernetes_validate.utils import VersionNotSupportedError +except ImportError: + class VersionNotSupportedError(NotImplementedError): + pass + +__all__ = [ + 'DynamicClient', + 'ResourceInstance', + 'Resource', + 'ResourceList', + 'Subresource', + 'EagerDiscoverer', + 'LazyDiscoverer', + 'ResourceField', +] + + +def meta_request(func): + """ Handles parsing response structure and translating API Exceptions """ + async def inner(self, *args, **kwargs): + serialize_response = kwargs.pop('serialize', True) + serializer = kwargs.pop('serializer', ResourceInstance) + try: + resp = await func(self, *args, **kwargs) + except ApiException as e: + raise api_exception(e) + if serialize_response: + try: + data = await resp.json() + return serializer(self, data) + except ValueError: + data = await resp.json() + return data + return resp + + return inner + + +class DynamicClient(object): + """ A kubernetes client that dynamically discovers and interacts with + the kubernetes API + """ + + def __init__(self, client, cache_file=None, discoverer=None): + self.cache_file = cache_file + self.client = client + self.configuration = client.configuration + self.discoverer = discoverer or LazyDiscoverer + + def __await__(self): + async def closure(): + self.__discoverer = await self.discoverer(self, self.cache_file) + return self + + return closure().__await__() + + async def __aenter__(self): + self.__discoverer = await self.discoverer(self, self.cache_file) + return self + + async def __aexit__(self, *args, **kwargs): + return + + @property + def resources(self): + return self.__discoverer + + @property + def version(self): + return self.__discoverer.version + + @staticmethod + def ensure_namespace(resource, namespace, body): + namespace = namespace or body.get('metadata', {}).get('namespace') + if not namespace: + raise ValueError("Namespace is required for {}.{}".format(resource.group_version, resource.kind)) + return namespace + + @staticmethod + def serialize_body(body): + """Serialize body to raw dict so apiserver can handle it + + :param body: kubernetes resource body, current support: Union[Dict, ResourceInstance] + """ + # This should match any `ResourceInstance` instances + if callable(getattr(body, 'to_dict', None)): + return body.to_dict() + return body or {} + + async def get(self, resource, name=None, namespace=None, **kwargs): + path = resource.path(name=name, namespace=namespace) + return await self.request('get', path, **kwargs) + + async def create(self, resource, body=None, namespace=None, **kwargs): + body = self.serialize_body(body) + if resource.namespaced: + namespace = self.ensure_namespace(resource, namespace, body) + path = resource.path(namespace=namespace) + return await self.request('post', path, body=body, **kwargs) + + async def delete(self, resource, name=None, namespace=None, body=None, label_selector=None, + field_selector=None, **kwargs): + if not (name or label_selector or field_selector): + raise ValueError("At least one of name|label_selector|field_selector is required") + if resource.namespaced and not (label_selector or field_selector or namespace): + raise ValueError("At least one of namespace|label_selector|field_selector is required") + path = resource.path(name=name, namespace=namespace) + return await self.request('delete', path, body=body, label_selector=label_selector, + field_selector=field_selector, **kwargs) + + async def replace(self, resource, body=None, name=None, namespace=None, **kwargs): + body = self.serialize_body(body) + name = name or body.get('metadata', {}).get('name') + if not name: + raise ValueError("name is required to replace {}.{}".format(resource.group_version, resource.kind)) + if resource.namespaced: + namespace = self.ensure_namespace(resource, namespace, body) + path = resource.path(name=name, namespace=namespace) + return await self.request('put', path, body=body, **kwargs) + + async def patch(self, resource, body=None, name=None, namespace=None, **kwargs): + body = self.serialize_body(body) + name = name or body.get('metadata', {}).get('name') + if not name: + raise ValueError("name is required to patch {}.{}".format(resource.group_version, resource.kind)) + if resource.namespaced: + namespace = self.ensure_namespace(resource, namespace, body) + + content_type = kwargs.pop('content_type', 'application/strategic-merge-patch+json') + path = resource.path(name=name, namespace=namespace) + + return await self.request('patch', path, body=body, content_type=content_type, **kwargs) + + async def server_side_apply(self, resource, body=None, name=None, namespace=None, force_conflicts=None, **kwargs): + body = self.serialize_body(body) + name = name or body.get('metadata', {}).get('name') + if not name: + raise ValueError("name is required to patch {}.{}".format(resource.group_version, resource.kind)) + if resource.namespaced: + namespace = self.ensure_namespace(resource, namespace, body) + + # force content type to 'application/apply-patch+yaml' + kwargs.update({'content_type': 'application/apply-patch+yaml'}) + path = resource.path(name=name, namespace=namespace) + + return await self.request('patch', path, body=body, force_conflicts=force_conflicts, **kwargs) + + @staticmethod + async def watch(resource, namespace=None, name=None, label_selector=None, field_selector=None, + resource_version=None, timeout=None, watcher=None): + """ + Stream events for a resource from the Kubernetes API + + :param resource: The API resource object that will be used to query the API + :param namespace: The namespace to query + :param name: The name of the resource instance to query + :param label_selector: The label selector with which to filter results + :param field_selector: The field selector with which to filter results + :param resource_version: The version with which to filter results. Only events with + a resource_version greater than this value will be returned + :param timeout: The amount of time in seconds to wait before terminating the stream + :param watcher: The Watcher object that will be used to stream the resource + + :return: Event object with these keys: + 'type': The type of event such as "ADDED", "DELETED", etc. + 'raw_object': a dict representing the watched object. + 'object': A ResourceInstance wrapping raw_object. + + Example: + client = DynamicClient(k8s_client) + watcher = watch.Watch() + v1_pods = client.resources.get(api_version='v1', kind='Pod') + + for e in v1_pods.watch(resource_version=0, namespace=default, timeout=5, watcher=watcher): + print(e['type']) + print(e['object'].metadata) + # If you want to gracefully stop the stream watcher + watcher.stop() + """ + if not watcher: + watcher = watch.Watch() + + # Use field selector to query for named instance so the watch parameter is handled properly. + if name: + field_selector = f"metadata.name={name}" + + async for event in watcher.stream( + resource.get, + namespace=namespace, + field_selector=field_selector, + label_selector=label_selector, + resource_version=resource_version, + serialize=False, + timeout_seconds=timeout + ): + event['object'] = ResourceInstance(resource, event['object']) + yield event + + @meta_request + async def request(self, method, path, body=None, **params): + if not path.startswith('/'): + path = '/' + path + + path_params = params.get('path_params', {}) + query_params = params.get('query_params', []) + if params.get('pretty') is not None: + query_params.append(('pretty', params['pretty'])) + if params.get('_continue') is not None: + query_params.append(('continue', params['_continue'])) + if params.get('include_uninitialized') is not None: + query_params.append(('includeUninitialized', params['include_uninitialized'])) + if params.get('field_selector') is not None: + query_params.append(('fieldSelector', params['field_selector'])) + if params.get('label_selector') is not None: + query_params.append(('labelSelector', params['label_selector'])) + if params.get('limit') is not None: + query_params.append(('limit', params['limit'])) + if params.get('resource_version') is not None: + query_params.append(('resourceVersion', params['resource_version'])) + if params.get('timeout_seconds') is not None: + query_params.append(('timeoutSeconds', params['timeout_seconds'])) + if params.get('watch') is not None: + query_params.append(('watch', params['watch'])) + if params.get('grace_period_seconds') is not None: + query_params.append(('gracePeriodSeconds', params['grace_period_seconds'])) + if params.get('propagation_policy') is not None: + query_params.append(('propagationPolicy', params['propagation_policy'])) + if params.get('orphan_dependents') is not None: + query_params.append(('orphanDependents', params['orphan_dependents'])) + if params.get('dry_run') is not None: + query_params.append(('dryRun', params['dry_run'])) + if params.get('field_manager') is not None: + query_params.append(('fieldManager', params['field_manager'])) + if params.get('force_conflicts') is not None: + query_params.append(('force', params['force_conflicts'])) + + header_params = params.get('header_params', {}) + form_params = [] + local_var_files = {} + + # Checking Accept header. + new_header_params = dict((key.lower(), value) for key, value in header_params.items()) + if 'accept' not in new_header_params: + header_params['Accept'] = self.client.select_header_accept([ + 'application/json', + 'application/yaml', + ]) + + # HTTP header `Content-Type` + if params.get('content_type'): + header_params['Content-Type'] = params['content_type'] + else: + header_params['Content-Type'] = self.client.select_header_content_type(['*/*']) + + # Authentication setting + auth_settings = ['BearerToken'] + + api_response = await self.client.call_api( + path, + method.upper(), + path_params, + query_params, + header_params, + body=body, + post_params=form_params, + async_req=params.get('async_req'), + files=local_var_files, + auth_settings=auth_settings, + _preload_content=False, + _return_http_data_only=params.get('_return_http_data_only', True), + _request_timeout=params.get('_request_timeout') + ) + if params.get('async_req'): + return api_response.get() + else: + return api_response + + def validate(self, definition, version=None, strict=False): + """validate checks a kubernetes resource definition + + Args: + definition (dict): resource definition + version (str): version of kubernetes to validate against + strict (bool): whether unexpected additional properties should be considered errors + + Returns: + warnings (list), errors (list): warnings are missing validations, errors are validation failures + """ + if not HAS_KUBERNETES_VALIDATE: + raise KubernetesValidateMissing() + + errors = list() + warnings = list() + try: + if version is None: + try: + version = self.version['kubernetes']['gitVersion'] + except KeyError: + version = kubernetes_validate.latest_version() + kubernetes_validate.validate(definition, version, strict) + except kubernetes_validate.utils.ValidationError as e: + errors.append("resource definition validation error at %s: %s" % ('.'.join([str(item) for item in e.path]), + e.message)) # noqa: B306 + except VersionNotSupportedError: + errors.append("Kubernetes version %s is not supported by kubernetes-validate" % version) + except kubernetes_validate.utils.SchemaNotFoundError as e: + warnings.append("Could not find schema for object kind %s with API version %s in Kubernetes version %s" + " (possibly Custom Resource?)" % + (e.kind, e.api_version, e.version)) + return warnings, errors diff --git a/kubernetes_asyncio/dynamic/client_test.py b/kubernetes_asyncio/dynamic/client_test.py new file mode 100644 index 000000000..fa6f1a1f4 --- /dev/null +++ b/kubernetes_asyncio/dynamic/client_test.py @@ -0,0 +1,557 @@ +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import unittest +import uuid + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.dynamic import DynamicClient +from kubernetes_asyncio.dynamic.exceptions import ResourceNotFoundError +from kubernetes_asyncio.dynamic.resource import ResourceField, ResourceInstance +from kubernetes_asyncio.e2e_test import base + + +def short_uuid(): + id_ = str(uuid.uuid4()) + return id_[-12:] + + +class TestDynamicClient(unittest.IsolatedAsyncioTestCase): + + @classmethod + def setUpClass(cls): + cls.config = base.get_e2e_configuration() + + async def test_cluster_custom_resources(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + + with self.assertRaises(ResourceNotFoundError): + await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe') + + crd_api = await client.resources.get( + api_version='apiextensions.k8s.io/v1beta1', + kind='CustomResourceDefinition') + name = 'clusterchangemes.apps.example.com' + crd_manifest = { + 'apiVersion': 'apiextensions.k8s.io/v1beta1', + 'kind': 'CustomResourceDefinition', + 'metadata': { + 'name': name, + }, + 'spec': { + 'group': 'apps.example.com', + 'names': { + 'kind': 'ClusterChangeMe', + 'listKind': 'ClusterChangeMeList', + 'plural': 'clusterchangemes', + 'singular': 'clusterchangeme', + }, + 'scope': 'Cluster', + 'version': 'v1', + 'subresources': { + 'status': {} + } + } + } + resp = await crd_api.create(crd_manifest) + + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + resp = await crd_api.get( + name=name, + ) + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + try: + await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe') + except ResourceNotFoundError: + # Need to wait a sec for the discovery layer to get updated + await asyncio.sleep(2) + changeme_api = await client.resources.get( + api_version='apps.example.com/v1', kind='ClusterChangeMe') + resp = await changeme_api.get() + self.assertEqual(resp.items, []) + changeme_name = 'custom-resource' + short_uuid() + changeme_manifest = { + 'apiVersion': 'apps.example.com/v1', + 'kind': 'ClusterChangeMe', + 'metadata': { + 'name': changeme_name, + }, + 'spec': {} + } + + resp = await changeme_api.create(body=changeme_manifest) + self.assertEqual(resp.metadata.name, changeme_name) + + resp = await changeme_api.get(name=changeme_name) + self.assertEqual(resp.metadata.name, changeme_name) + + changeme_manifest['spec']['size'] = 3 + resp = await changeme_api.patch( + body=changeme_manifest, + content_type='application/merge-patch+json' + ) + self.assertEqual(resp.spec.size, 3) + + resp = await changeme_api.get(name=changeme_name) + self.assertEqual(resp.spec.size, 3) + + resp = await changeme_api.get() + self.assertEqual(len(resp.items), 1) + + await changeme_api.delete(name=changeme_name) + + resp = await changeme_api.get() + self.assertEqual(len(resp.items), 0) + + await crd_api.delete(name=name) + + await asyncio.sleep(2) + await client.resources.invalidate_cache() + with self.assertRaises(ResourceNotFoundError): + await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe') + + # async def test_async_namespaced_custom_resources(self): + # async with api_client.ApiClient(configuration=self.config) as apic: + # client = await DynamicClient.newclient(apic) + # + # with self.assertRaises(ResourceNotFoundError): + # await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe') + # + # crd_api = await client.resources.get( + # api_version='apiextensions.k8s.io/v1beta1', + # kind='CustomResourceDefinition') + # + # name = 'changemes.apps.example.com' + # + # crd_manifest = { + # 'apiVersion': 'apiextensions.k8s.io/v1beta1', + # 'kind': 'CustomResourceDefinition', + # 'metadata': { + # 'name': name, + # }, + # 'spec': { + # 'group': 'apps.example.com', + # 'names': { + # 'kind': 'ChangeMe', + # 'listKind': 'ChangeMeList', + # 'plural': 'changemes', + # 'singular': 'changeme', + # }, + # 'scope': 'Namespaced', + # 'version': 'v1', + # 'subresources': { + # 'status': {} + # } + # } + # } + # async_resp = await crd_api.create(crd_manifest, async_req=True) + # + # self.assertEqual(name, async_resp.metadata.name) + # self.assertTrue(async_resp.status) + # + # async_resp = await crd_api.get(name=name, async_req=True) + # self.assertEqual(name, async_resp.metadata.name) + # self.assertTrue(async_resp.status) + # + # try: + # changeme_api = await client.resources.get( + # api_version='apps.example.com/v1', kind='ChangeMe') + # except ResourceNotFoundError: + # # Need to wait a sec for the discovery layer to get updated + # await asyncio.sleep(2) + # changeme_api = await client.resources.get( + # api_version='apps.example.com/v1', kind='ChangeMe') + # + # async_resp = await changeme_api.get(async_req=True) + # self.assertEqual(async_resp.items, []) + # + # changeme_name = 'custom-resource' + short_uuid() + # changeme_manifest = { + # 'apiVersion': 'apps.example.com/v1', + # 'kind': 'ChangeMe', + # 'metadata': { + # 'name': changeme_name, + # }, + # 'spec': {} + # } + # + # async_resp = await changeme_api.create(body=changeme_manifest, namespace='default', async_req=True) + # self.assertEqual(async_resp.metadata.name, changeme_name) + # + # async_resp = await changeme_api.get(name=changeme_name, namespace='default', async_req=True) + # self.assertEqual(async_resp.metadata.name, changeme_name) + # + # changeme_manifest['spec']['size'] = 3 + # async_resp = await changeme_api.patch( + # body=changeme_manifest, + # namespace='default', + # content_type='application/merge-patch+json', + # async_req=True + # ) + # self.assertEqual(async_resp.spec.size, 3) + # + # async_resp = await changeme_api.get(name=changeme_name, namespace='default', async_req=True) + # self.assertEqual(async_resp.spec.size, 3) + # + # async_resp = await changeme_api.get(namespace='default', async_req=True) + # self.assertEqual(len(async_resp.items), 1) + # + # async_resp = await changeme_api.get(async_req=True) + # self.assertEqual(len(async_resp.items), 1) + # + # await changeme_api.delete(name=changeme_name, namespace='default', async_req=True) + # + # async_resp = await changeme_api.get(namespace='default', async_req=True) + # self.assertEqual(len(async_resp.items), 0) + # + # async_resp = await changeme_api.get(async_req=True) + # self.assertEqual(len(async_resp.items), 0) + # + # await crd_api.delete(name=name, async_req=True) + # + # await asyncio.sleep(2) + # await client.resources.invalidate_cache() + # with self.assertRaises(ResourceNotFoundError): + # await client.resources.get( + # api_version='apps.example.com/v1', kind='ChangeMe') + + async def test_namespaced_custom_resources(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + + with self.assertRaises(ResourceNotFoundError): + await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe') + + crd_api = await client.resources.get( + api_version='apiextensions.k8s.io/v1beta1', + kind='CustomResourceDefinition') + name = 'changemes.apps.example.com' + crd_manifest = { + 'apiVersion': 'apiextensions.k8s.io/v1beta1', + 'kind': 'CustomResourceDefinition', + 'metadata': { + 'name': name, + }, + 'spec': { + 'group': 'apps.example.com', + 'names': { + 'kind': 'ChangeMe', + 'listKind': 'ChangeMeList', + 'plural': 'changemes', + 'singular': 'changeme', + }, + 'scope': 'Namespaced', + 'version': 'v1', + 'subresources': { + 'status': {} + } + } + } + resp = await crd_api.create(crd_manifest) + + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + resp = await crd_api.get( + name=name, + ) + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + try: + await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe') + except ResourceNotFoundError: + # Need to wait a sec for the discovery layer to get updated + await asyncio.sleep(2) + changeme_api = await client.resources.get( + api_version='apps.example.com/v1', kind='ChangeMe') + resp = await changeme_api.get() + self.assertEqual(resp.items, []) + changeme_name = 'custom-resource' + short_uuid() + changeme_manifest = { + 'apiVersion': 'apps.example.com/v1', + 'kind': 'ChangeMe', + 'metadata': { + 'name': changeme_name, + }, + 'spec': {} + } + + resp = await changeme_api.create(body=changeme_manifest, namespace='default') + self.assertEqual(resp.metadata.name, changeme_name) + + resp = await changeme_api.get(name=changeme_name, namespace='default') + self.assertEqual(resp.metadata.name, changeme_name) + + changeme_manifest['spec']['size'] = 3 + resp = await changeme_api.patch( + body=changeme_manifest, + namespace='default', + content_type='application/merge-patch+json' + ) + self.assertEqual(resp.spec.size, 3) + + resp = await changeme_api.get(name=changeme_name, namespace='default') + self.assertEqual(resp.spec.size, 3) + + resp = await changeme_api.get(namespace='default') + self.assertEqual(len(resp.items), 1) + + resp = await changeme_api.get() + self.assertEqual(len(resp.items), 1) + + await changeme_api.delete(name=changeme_name, namespace='default') + + resp = await changeme_api.get(namespace='default') + self.assertEqual(len(resp.items), 0) + + resp = await changeme_api.get() + self.assertEqual(len(resp.items), 0) + + await crd_api.delete(name=name) + + await asyncio.sleep(2) + await client.resources.invalidate_cache() + with self.assertRaises(ResourceNotFoundError): + await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe') + + async def test_service_apis(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + api = await client.resources.get(api_version='v1', kind='Service') + + name = 'frontend-' + short_uuid() + service_manifest = {'apiVersion': 'v1', + 'kind': 'Service', + 'metadata': {'labels': {'name': name}, + 'name': name, + 'resourceversion': 'v1'}, + 'spec': {'ports': [{'name': 'port', + 'port': 80, + 'protocol': 'TCP', + 'targetPort': 80}], + 'selector': {'name': name}}} + + resp = await api.create( + body=service_manifest, + namespace='default' + ) + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + resp = await api.get( + name=name, + namespace='default' + ) + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status) + + service_manifest['spec']['ports'] = [{'name': 'new', + 'port': 8080, + 'protocol': 'TCP', + 'targetPort': 8080}] + resp = await api.patch( + body=service_manifest, + name=name, + namespace='default' + ) + self.assertEqual(2, len(resp.spec.ports)) + self.assertTrue(resp.status) + + await api.delete(name=name, body={}, namespace='default') + + async def test_replication_controller_apis(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + + api = await client.resources.get( + api_version='v1', kind='ReplicationController') + + name = 'frontend-' + short_uuid() + rc_manifest = { + 'apiVersion': 'v1', + 'kind': 'ReplicationController', + 'metadata': {'labels': {'name': name}, + 'name': name}, + 'spec': {'replicas': 2, + 'selector': {'name': name}, + 'template': {'metadata': { + 'labels': {'name': name}}, + 'spec': {'containers': [{ + 'image': 'nginx', + 'name': 'nginx', + 'ports': [{'containerPort': 80, + 'protocol': 'TCP'}]}]}}}} + + resp = await api.create( + body=rc_manifest, namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertEqual(2, resp.spec.replicas) + + resp = await api.get( + name=name, namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertEqual(2, resp.spec.replicas) + + await api.delete( + name=name, + namespace='default', + propagation_policy='Background') + + async def test_configmap_apis(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + api = await client.resources.get(api_version='v1', kind='ConfigMap') + + name = 'test-configmap-' + short_uuid() + test_configmap = { + "kind": "ConfigMap", + "apiVersion": "v1", + "metadata": { + "name": name, + "labels": { + "e2e-test": "true", + }, + }, + "data": { + "config.json": "{\"command\":\"/usr/bin/mysqld_safe\"}", + "frontend.cnf": "[mysqld]\nbind-address = 10.0.0.3\n" + } + } + + resp = await api.create( + body=test_configmap, namespace='default' + ) + self.assertEqual(name, resp.metadata.name) + + resp = await api.get( + name=name, namespace='default', label_selector="e2e-test=true") + self.assertEqual(name, resp.metadata.name) + + count = 0 + async for _ in client.watch(api, timeout=10, namespace="default", name=name): + count += 1 + self.assertTrue(count > 0, msg="no events received for watch") + + test_configmap['data']['config.json'] = "{}" + await api.patch(name=name, namespace='default', body=test_configmap) + + await api.delete(name=name, body={}, namespace='default') + + resp = await api.get( + namespace='default', + pretty=True, + label_selector="e2e-test=true") + self.assertEqual([], resp.items) + + async def test_node_apis(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + + api = await client.resources.get(api_version='v1', kind='Node') + nodes = await api.get() + for item in nodes.items: + node = await api.get(name=item.metadata.name) + self.assertTrue(len(dict(node.metadata.labels)) > 0) + + # test_node_apis_partial_object_metadata lists all nodes in the cluster, + # but only retrieves object metadata + async def test_node_apis_partial_object_metadata(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + api = await client.resources.get(api_version='v1', kind='Node') + + params = { + 'header_params': { + 'Accept': 'application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io'}} + resp = await api.get(**params) + self.assertEqual('PartialObjectMetadataList', resp.kind) + self.assertEqual('meta.k8s.io/v1', resp.apiVersion) + + params = { + 'header_params': { + 'aCcePt': 'application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io'}} + resp = await api.get(**params) + self.assertEqual('PartialObjectMetadataList', resp.kind) + self.assertEqual('meta.k8s.io/v1', resp.apiVersion) + + async def test_server_side_apply_api(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + api = await client.resources.get(api_version='v1', kind='Pod') + + name = 'pod-' + short_uuid() + pod_manifest = { + 'apiVersion': 'apps/v1', + 'kind': 'Deployment', + 'metadata': {'labels': {'name': name}, + 'name': name}, + 'spec': {'template': {'spec': {'containers': [{ + 'image': 'nginx', + 'name': 'nginx', + 'ports': [{'containerPort': 80, + 'protocol': 'TCP'}]}]}}}} + + resp = await api.server_side_apply( + namespace='default', body=pod_manifest, + field_manager='kubernetes-unittests', dry_run="All") + self.assertEqual('kubernetes-unittests', resp.metadata.managedFields[0].manager) + + +class TestDynamicClientSerialization(unittest.IsolatedAsyncioTestCase): + + @classmethod + def setUpClass(cls): + cls.config = base.get_e2e_configuration() + cls.pod_manifest = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': {'name': 'foo-pod'}, + 'spec': {'containers': [{'name': "main", 'image': "busybox"}]}, + } + + async def test_dict_type(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + self.assertEqual(client.serialize_body(self.pod_manifest), self.pod_manifest) + + async def test_resource_instance_type(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + inst = ResourceInstance(client, self.pod_manifest) + self.assertEqual(client.serialize_body(inst), self.pod_manifest) + + async def test_resource_field(self): + """`ResourceField` is a special type which overwrites `__getattr__` method to return `None` + when a non-existent attribute was accessed. which means it can pass any `hasattr(...)` tests. + """ + params = { + "foo": "bar", + "self": True + } + res = ResourceField(params) + self.assertEqual(res["foo"], params["foo"]) + self.assertEqual(res["self"], params["self"]) + + # method will return original object when it doesn't know how to proceed + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + serialized = client.serialize_body(res) + self.assertDictEqual(serialized, res.to_dict()) diff --git a/kubernetes_asyncio/dynamic/discovery.py b/kubernetes_asyncio/dynamic/discovery.py new file mode 100644 index 000000000..6914dcfe2 --- /dev/null +++ b/kubernetes_asyncio/dynamic/discovery.py @@ -0,0 +1,454 @@ +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +import json +import logging +import os +import tempfile +from abc import abstractmethod +from collections import defaultdict +from functools import partial +from typing import Dict + +from urllib3.exceptions import MaxRetryError, ProtocolError + +from kubernetes_asyncio import __version__ + +from .exceptions import ( + NotFoundError, ResourceNotFoundError, ResourceNotUniqueError, + ServiceUnavailableError, +) +from .resource import Resource, ResourceList + +DISCOVERY_PREFIX = 'apis' + + +class Discoverer(object): + """ + A convenient container for storing discovered API resources. Allows + easy searching and retrieval of specific resources. + + Subclasses implement the abstract methods with different loading strategies. + """ + + def __init__(self, client, cache_file=None): + self.client = client + default_cache_id = self.client.configuration.host + default_cache_id = default_cache_id.encode('utf-8') + try: + default_cachefile_name = 'osrcp-{0}.json'.format( + hashlib.md5(default_cache_id, usedforsecurity=False).hexdigest()) + except TypeError: + # usedforsecurity is only supported in 3.9+ + default_cachefile_name = 'osrcp-{0}.json'.format(hashlib.md5(default_cache_id).hexdigest()) + self.__cache_file = cache_file or os.path.join(tempfile.gettempdir(), default_cachefile_name) + + def __await__(self): + async def closure(): + await self.__init_cache() + return self + + return closure().__await__() + + async def __aenter__(self): + await self.__init_cache() + return self + + async def __init_cache(self, refresh=False): + if refresh or not os.path.exists(self.__cache_file): + self._cache = {'library_version': __version__} + refresh = True + else: + try: + with open(self.__cache_file, 'r') as f: + self._cache = json.load(f, cls=partial(CacheDecoder, self.client)) + if self._cache.get('library_version') != __version__: + # Version mismatch, need to refresh cache + await self.invalidate_cache() + except Exception as e: + logging.error("load cache error: %s", e) + await self.invalidate_cache() + await self._load_server_info() + await self.discover() + if refresh: + self._write_cache() + + def _write_cache(self): + try: + with open(self.__cache_file, 'w') as f: + json.dump(self._cache, f, cls=CacheEncoder) + except Exception: + # Failing to write the cache isn't a big enough error to crash on + pass + + async def invalidate_cache(self): + await self.__init_cache(refresh=True) + + @property + @abstractmethod + def api_groups(self): + pass + + @abstractmethod + async def search(self, prefix=None, group=None, api_version=None, kind=None, **kwargs): + pass + + @abstractmethod + def discover(self): + pass + + @property + def version(self): + return self.__version + + async def default_groups(self, request_resources=False): + groups = { + 'api': {'': { + 'v1': (ResourceGroup(True, resources=await self.get_resources_for_api_version('api', '', 'v1', True)) + if request_resources else ResourceGroup(True)) + }}, + + DISCOVERY_PREFIX: {'': { + 'v1': ResourceGroup(True, resources={"List": [ResourceList(self.client)]}) + }}} + return groups + + async def parse_api_groups(self, request_resources=False, update=False) -> Dict: + """ Discovers all API groups present in the cluster """ + if not self._cache.get('resources') or update: + self._cache['resources'] = self._cache.get('resources', {}) + response = await self.client.request('GET', '/{}'.format(DISCOVERY_PREFIX)) + groups_response = response.groups + + groups = await self.default_groups(request_resources=request_resources) + + for group in groups_response: + new_group = {} + for version_raw in group['versions']: + version = version_raw['version'] + resource_group = self._cache.get('resources', {}).get(DISCOVERY_PREFIX, + {}).get(group['name'], {}).get(version) + preferred = version_raw == group['preferredVersion'] + resources = resource_group.resources if resource_group else {} + if request_resources: + resources = await self.get_resources_for_api_version(DISCOVERY_PREFIX, group['name'], version, + preferred) + new_group[version] = ResourceGroup(preferred, resources=resources) + groups[DISCOVERY_PREFIX][group['name']] = new_group + self._cache['resources'].update(groups) + self._write_cache() + + return self._cache['resources'] + + async def _load_server_info(self): + def just_json(_, serialized): + return serialized + + if not self._cache.get('version'): + try: + self._cache['version'] = { + 'kubernetes': await self.client.request('get', '/version', serializer=just_json) + } + except (ValueError, MaxRetryError) as e: + if isinstance(e, MaxRetryError) and not isinstance(e.reason, ProtocolError): + raise + if not self.client.configuration.host.startswith("https://"): + raise ValueError("Host value %s should start with https:// when talking to HTTPS endpoint" % + self.client.configuration.host) + else: + raise + + self.__version = self._cache['version'] + + async def get_resources_for_api_version(self, prefix, group, version, preferred): + """ returns a dictionary of resources associated with provided (prefix, group, version)""" + + resources = defaultdict(list) + subresources = {} + + path = '/'.join(filter(None, [prefix, group, version])) + try: + response = await self.client.request('GET', path) + resources_response = response.resources or [] + except ServiceUnavailableError: + resources_response = [] + + resources_raw = list(filter(lambda r: '/' not in r['name'], resources_response)) + subresources_raw = list(filter(lambda r: '/' in r['name'], resources_response)) + for subresource in subresources_raw: + resource, name = subresource['name'].split('/') + if not subresources.get(resource): + subresources[resource] = {} + subresources[resource][name] = subresource + + for resource in resources_raw: + # Prevent duplicate keys + for key in ('prefix', 'group', 'api_version', 'client', 'preferred'): + resource.pop(key, None) + + resourceobj = Resource( + prefix=prefix, + group=group, + api_version=version, + client=self.client, + preferred=preferred, + subresources=subresources.get(resource['name']), + **resource + ) + resources[resource['kind']].append(resourceobj) + + resource_list = ResourceList(self.client, group=group, api_version=version, base_kind=resource['kind']) + resources[resource_list.kind].append(resource_list) + return resources + + async def get(self, **kwargs): + """ Same as search, but will throw an error if there are multiple or no + results. If there are multiple results and only one is an exact match + on api_version, that resource will be returned. + """ + results = await self.search(**kwargs) + # If there are multiple matches, prefer exact matches on api_version + if len(results) > 1 and kwargs.get('api_version'): + results = [ + result for result in results if result.group_version == kwargs['api_version'] + ] + # If there are multiple matches, prefer non-List kinds + if len(results) > 1 and not all([isinstance(x, ResourceList) for x in results]): + results = [result for result in results if not isinstance(result, ResourceList)] + if len(results) == 1: + return results[0] + elif not results: + raise ResourceNotFoundError('No matches found for {}'.format(kwargs)) + else: + raise ResourceNotUniqueError('Multiple matches found for {}: {}'.format(kwargs, results)) + + +class LazyDiscoverer(Discoverer): + """ A convenient container for storing discovered API resources. Allows + easy searching and retrieval of specific resources. + + Resources for the cluster are loaded lazily. + """ + + def __init__(self, client, cache_file): + self.__resources = None + Discoverer.__init__(self, client, cache_file) + self.__update_cache = False + + async def discover(self): + self.__resources = await self.parse_api_groups(request_resources=False) + + def __maybe_write_cache(self): + if self.__update_cache: + self._write_cache() + self.__update_cache = False + + @property + async def api_groups(self): + groups = await self.parse_api_groups(request_resources=False, update=True) + return groups['apis'].keys() + + async def search(self, **kwargs): + # In first call, ignore ResourceNotFoundError and set default value for results + try: + results = await self.__search(self.__build_search(**kwargs), self.__resources, []) + except ResourceNotFoundError: + results = [] + if not results: + await self.invalidate_cache() + results = await self.__search(self.__build_search(**kwargs), self.__resources, []) + self.__maybe_write_cache() + return results + + async def __search(self, parts, resources, req_params): + part = parts[0] + if part != '*': + resource_part = resources.get(part) + if not resource_part: + return [] + elif isinstance(resource_part, ResourceGroup): + if len(req_params) != 2: + raise ValueError("prefix and group params should be present, have %s" % req_params) + # Check if we've requested resources for this group + if not resource_part.resources: + prefix, group, version = req_params[0], req_params[1], part + try: + resource_part.resources = await self.get_resources_for_api_version( + prefix, group, part, resource_part.preferred) + except NotFoundError: + raise ResourceNotFoundError + + self._cache['resources'][prefix][group][version] = resource_part + self.__update_cache = True + return await self.__search(parts[1:], resource_part.resources, req_params) + elif isinstance(resource_part, dict): + # In this case parts [0] will be a specified prefix, group, version + # as we recurse + return await self.__search(parts[1:], resource_part, req_params + [part]) + else: + if parts[1] != '*' and isinstance(parts[1], dict): + for _resource in resource_part: + for term, value in parts[1].items(): + if getattr(_resource, term) == value: + return [_resource] + return [] + else: + return resource_part + else: + matches = [] + for key in resources.keys(): + matches.extend(await self.__search([key] + parts[1:], resources, req_params)) + return matches + + @staticmethod + def __build_search(prefix=None, group=None, api_version=None, kind=None, **kwargs): + if not group and api_version and '/' in api_version: + group, api_version = api_version.split('/') + + items = [prefix, group, api_version, kind, kwargs] + return list(map(lambda x: x or '*', items)) + + async def __aiter__(self): + for prefix, groups in self.__resources.items(): + for group, versions in groups.items(): + for version, rg in versions.items(): + # Request resources for this groupVersion if we haven't yet + if not rg.resources: + rg.resources = await self.get_resources_for_api_version( + prefix, group, version, rg.preferred) + self._cache['resources'][prefix][group][version] = rg + self.__update_cache = True + for resource in rg.resources: + yield resource + self.__maybe_write_cache() + + +class EagerDiscoverer(Discoverer): + """ A convenient container for storing discovered API resources. Allows + easy searching and retrieval of specific resources. + + All resources are discovered for the cluster upon object instantiation. + """ + + def update(self, resources): + self.__resources = resources + + def __init__(self, client, cache_file): + self.__resources = None + Discoverer.__init__(self, client, cache_file) + + async def discover(self): + self.__resources = await self.parse_api_groups(request_resources=True) + + @property + async def api_groups(self): + """ list available api groups """ + groups = await self.parse_api_groups(request_resources=True, update=True) + return groups['apis'].keys() + + async def search(self, **kwargs): + """ Takes keyword arguments and returns matching resources. The search + will happen in the following order: + prefix: The api prefix for a resource, ie, /api, /oapi, /apis. Can usually be ignored + group: The api group of a resource. Will also be extracted from api_version if it is present there + api_version: The api version of a resource + kind: The kind of the resource + arbitrary arguments (see below), in random order + + The arbitrary arguments can be any valid attribute for an Resource object + """ + results = self.__search(self.__build_search(**kwargs), self.__resources) + if not results: + await self.invalidate_cache() + results = self.__search(self.__build_search(**kwargs), self.__resources) + return results + + @staticmethod + def __build_search(prefix=None, group=None, api_version=None, kind=None, **kwargs): + if not group and api_version and '/' in api_version: + group, api_version = api_version.split('/') + + items = [prefix, group, api_version, kind, kwargs] + return list(map(lambda x: x or '*', items)) + + def __search(self, parts, resources): + part = parts[0] + resource_part = resources.get(part) + + if part != '*' and resource_part: + if isinstance(resource_part, ResourceGroup): + return self.__search(parts[1:], resource_part.resources) + elif isinstance(resource_part, dict): + return self.__search(parts[1:], resource_part) + else: + if parts[1] != '*' and isinstance(parts[1], dict): + for _resource in resource_part: + for term, value in parts[1].items(): + if getattr(_resource, term) == value: + return [_resource] + return [] + else: + return resource_part + elif part == '*': + matches = [] + for key in resources.keys(): + matches.extend(self.__search([key] + parts[1:], resources)) + return matches + return [] + + def __iter__(self): + for _, groups in self.__resources.items(): + for _, versions in groups.items(): + for _, resources in versions.items(): + for _, resource in resources.items(): + yield resource + + +class ResourceGroup(object): + """Helper class for Discoverer container""" + def __init__(self, preferred, resources=None): + self.preferred = preferred + self.resources = resources or {} + + def to_dict(self): + return { + '_type': 'ResourceGroup', + 'preferred': self.preferred, + 'resources': self.resources, + } + + +class CacheEncoder(json.JSONEncoder): + + def default(self, o): + return o.to_dict() + + +class CacheDecoder(json.JSONDecoder): + def __init__(self, client, *args, **kwargs): + self.client = client + json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs) + + def object_hook(self, obj): + if '_type' not in obj: + return obj + _type = obj.pop('_type') + if _type == 'Resource': + return Resource(client=self.client, **obj) + elif _type == 'ResourceList': + return ResourceList(self.client, **obj) + elif _type == 'ResourceGroup': + return ResourceGroup(obj['preferred'], resources=self.object_hook(obj['resources'])) + return obj diff --git a/kubernetes_asyncio/dynamic/discovery_test.py b/kubernetes_asyncio/dynamic/discovery_test.py new file mode 100644 index 000000000..d360cb363 --- /dev/null +++ b/kubernetes_asyncio/dynamic/discovery_test.py @@ -0,0 +1,69 @@ +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.dynamic import DynamicClient +from kubernetes_asyncio.e2e_test import base + + +class TestDiscoverer(unittest.IsolatedAsyncioTestCase): + + @classmethod + def setUpClass(cls): + cls.config = base.get_e2e_configuration() + + async def test_init_cache_from_file(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + + await client.resources.get(api_version='v1', kind='Node') + mtime1 = os.path.getmtime(client.resources._Discoverer__cache_file) + + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + + await client.resources.get(api_version='v1', kind='Node') + mtime2 = os.path.getmtime(client.resources._Discoverer__cache_file) + + # test no Discoverer._write_cache called + self.assertTrue(mtime1 == mtime2) + + async def test_cache_decoder_resource_and_subresource(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + + # first invalidate cache + await client.resources.invalidate_cache() + + # do Discoverer.__init__ + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + # the resources of client will use _cache['resources'] in memory + deploy1 = await client.resources.get(kind='Deployment', api_version="apps/v1") + + # do Discoverer.__init__ + # async with api_client.ApiClient(configuration=self.config) as apic: + client2 = await DynamicClient(apic) + # the resources of client will use _cache['resources'] decode from cache file + deploy2 = await client2.resources.get(kind='Deployment', api_version="apps/v1") + + deploy2.client = deploy1.client + # test Resource is the same + self.assertDictEqual(deploy1.to_dict(), deploy2.to_dict()) + + # test Subresource is the same + self.assertDictEqual(deploy1.status.to_dict(), deploy2.status.to_dict()) diff --git a/kubernetes_asyncio/dynamic/exceptions.py b/kubernetes_asyncio/dynamic/exceptions.py new file mode 100644 index 000000000..cfb0070b2 --- /dev/null +++ b/kubernetes_asyncio/dynamic/exceptions.py @@ -0,0 +1,138 @@ +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import sys +import traceback + +from kubernetes_asyncio.client.rest import ApiException + + +def api_exception(e): + """ + Returns the proper Exception class for the given kubernetes.client.rest.ApiException object + https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#success-codes + """ + _, _, exc_traceback = sys.exc_info() + tb = '\n'.join(traceback.format_tb(exc_traceback)) + return { + 400: BadRequestError, + 401: UnauthorizedError, + 403: ForbiddenError, + 404: NotFoundError, + 405: MethodNotAllowedError, + 409: ConflictError, + 410: GoneError, + 422: UnprocessibleEntityError, + 429: TooManyRequestsError, + 500: InternalServerError, + 503: ServiceUnavailableError, + 504: ServerTimeoutError, + }.get(e.status, DynamicApiError)(e, tb) + + +class DynamicApiError(ApiException): + """ Generic API Error for the dynamic client """ + def __init__(self, e, tb=None): + self.status = e.status + self.reason = e.reason + self.body = e.body + self.headers = e.headers + self.original_traceback = tb + + def __str__(self): + error_message = [str(self.status), "Reason: {}".format(self.reason)] + if self.headers: + error_message.append("HTTP response headers: {}".format(self.headers)) + + if self.body: + error_message.append("HTTP response body: {}".format(self.body)) + + if self.original_traceback: + error_message.append("Original traceback: \n{}".format(self.original_traceback)) + + return '\n'.join(error_message) + + def summary(self): + if self.body: + if self.headers and self.headers.get('Content-Type') == 'application/json': + message = json.loads(self.body).get('message') + if message: + return message + + return self.body + else: + return "{} Reason: {}".format(self.status, self.reason) + + +class ResourceNotFoundError(Exception): + """ Resource was not found in available APIs """ + + +class ResourceNotUniqueError(Exception): + """ Parameters given matched multiple API resources """ + + +class KubernetesValidateMissing(Exception): + """ kubernetes-validate is not installed """ + +# HTTP Errors + + +class BadRequestError(DynamicApiError): + """ 400: StatusBadRequest """ + + +class UnauthorizedError(DynamicApiError): + """ 401: StatusUnauthorized """ + + +class ForbiddenError(DynamicApiError): + """ 403: StatusForbidden """ + + +class NotFoundError(DynamicApiError): + """ 404: StatusNotFound """ + + +class MethodNotAllowedError(DynamicApiError): + """ 405: StatusMethodNotAllowed """ + + +class ConflictError(DynamicApiError): + """ 409: StatusConflict """ + + +class GoneError(DynamicApiError): + """ 410: StatusGone """ + + +class UnprocessibleEntityError(DynamicApiError): + """ 422: StatusUnprocessibleEntity """ + + +class TooManyRequestsError(DynamicApiError): + """ 429: StatusTooManyRequests """ + + +class InternalServerError(DynamicApiError): + """ 500: StatusInternalServer """ + + +class ServiceUnavailableError(DynamicApiError): + """ 503: StatusServiceUnavailable """ + + +class ServerTimeoutError(DynamicApiError): + """ 504: StatusServerTimeout """ diff --git a/kubernetes_asyncio/dynamic/resource.py b/kubernetes_asyncio/dynamic/resource.py new file mode 100644 index 000000000..e03863b0e --- /dev/null +++ b/kubernetes_asyncio/dynamic/resource.py @@ -0,0 +1,422 @@ +# Copyright 2019 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +from functools import partial +from pprint import pformat + +import yaml + + +class Resource(object): + """ Represents an API resource type, containing the information required to build urls for requests """ + + def __init__(self, prefix=None, group=None, api_version=None, kind=None, + namespaced=False, verbs=None, name=None, preferred=False, client=None, + singular_name=None, short_names=None, categories=None, subresources=None, **kwargs): + + if None in (api_version, kind, prefix): + raise ValueError("At least prefix, kind, and api_version must be provided") + + self.prefix = prefix + self.group = group + self.api_version = api_version + self.kind = kind + self.namespaced = namespaced + self.verbs = verbs + self.name = name + self.preferred = preferred + self.client = client + self.singular_name = singular_name or (name[:-1] if name else "") + self.short_names = short_names + self.categories = categories + self.subresources = { + k: Subresource(self, **v) for k, v in (subresources or {}).items() + } + + self.extra_args = kwargs + + def to_dict(self): + d = { + '_type': 'Resource', + 'prefix': self.prefix, + 'group': self.group, + 'api_version': self.api_version, + 'kind': self.kind, + 'namespaced': self.namespaced, + 'verbs': self.verbs, + 'name': self.name, + 'preferred': self.preferred, + 'singularName': self.singular_name, + 'shortNames': self.short_names, + 'categories': self.categories, + 'subresources': {k: sr.to_dict() for k, sr in self.subresources.items()}, + } + d.update(self.extra_args) + return d + + @property + def group_version(self): + if self.group: + return f'{self.group}/{self.api_version}' + return self.api_version + + def __repr__(self): + return f'<{self.__class__.__name__}({self.group_version}/{self.name})>' + + @property + def urls(self): + full_prefix = f'{self.prefix}/{self.group_version}' + resource_name = self.name.lower() + return { + 'base': '/{}/{}'.format(full_prefix, resource_name), + 'namespaced_base': '/{}/namespaces/{{namespace}}/{}'.format(full_prefix, resource_name), + 'full': '/{}/{}/{{name}}'.format(full_prefix, resource_name), + 'namespaced_full': '/{}/namespaces/{{namespace}}/{}/{{name}}'.format(full_prefix, resource_name) + } + + def path(self, name=None, namespace=None): + url_type = [] + path_params = {} + if self.namespaced and namespace: + url_type.append('namespaced') + path_params['namespace'] = namespace + if name: + url_type.append('full') + path_params['name'] = name + else: + url_type.append('base') + return self.urls['_'.join(url_type)].format(**path_params) + + def __getattr__(self, name): + if name in self.subresources: + return self.subresources[name] + return partial(getattr(self.client, name), self) + + +class ResourceList(Resource): + """ Represents a list of API objects """ + + def __init__(self, client, group='', api_version='v1', base_kind='', kind=None, base_resource_lookup=None): + self.client = client + self.group = group + self.api_version = api_version + self.kind = kind or '{}List'.format(base_kind) + self.base_kind = base_kind + self.base_resource_lookup = base_resource_lookup + self.__base_resource = None + + async def _ainit(self): + if self.base_kind and self.api_version and self.group: + self.__base_resource = await self.client.resources.get(group=self.group, api_version=self.api_version, + kind=self.base_kind) + + @classmethod + async def create_list(cls, client, group='', api_version='v1', base_kind='', kind=None): + self = cls(client=client, group=group, api_version=api_version, base_kind=base_kind, kind=kind) + await self._ainit() + return self + + # TODO: This code appears to be unused, or at least untested + async def base_resource(self): + if self.__base_resource: + return self.__base_resource + elif self.base_resource_lookup: + self.__base_resource = await self.client.resources.get(**self.base_resource_lookup) + return self.__base_resource + elif self.base_kind: + self.__base_resource = await self.client.resources.get(group=self.group, api_version=self.api_version, + kind=self.base_kind) + return self.__base_resource + return None + + # TODO: This code appears to be unused, or at least untested + async def _items_to_resources(self, body): + """ Takes a List body and return a dictionary with the following structure: + { + 'api_version': str, + 'kind': str, + 'items': [{ + 'resource': Resource, + 'name': str, + 'namespace': str, + }] + } + """ + if body is None: + raise ValueError("You must provide a body when calling methods on a ResourceList") + + api_version = body['apiVersion'] + kind = body['kind'] + items = body.get('items') + if not items: + raise ValueError('The `items` field in the body must be populated when calling methods on a ResourceList') + + if self.kind != kind: + raise ValueError(f'Methods on a {self.kind} must be called with a body containing the same kind.' + f' Received {kind} instead') + + return { + 'api_version': api_version, + 'kind': kind, + 'items': [await self._item_to_resource(item) for item in items] + } + + # TODO: This code appears to be unused, or at least untested + async def _item_to_resource(self, item): + metadata = item.get('metadata', {}) + resource = await self.base_resource() + if not resource: + api_version = item.get('apiVersion', self.api_version) + kind = item.get('kind', self.base_kind) + resource = await self.client.resources.get(api_version=api_version, kind=kind) + return { + 'resource': resource, + 'definition': item, + 'name': metadata.get('name'), + 'namespace': metadata.get('namespace') + } + + async def get(self, body, name=None, namespace=None, **kwargs): + if name: + raise ValueError('Operations on ResourceList objects do not support the `name` argument') + resource_list = await self._items_to_resources(body) + response = copy.deepcopy(body) + + response['items'] = [ + item['resource'].get(name=item['name'], namespace=item['namespace'] or namespace, **kwargs).to_dict() + for item in resource_list['items'] + ] + return ResourceInstance(self, response) + + async def delete(self, body, name=None, namespace=None, **kwargs): + if name: + raise ValueError('Operations on ResourceList objects do not support the `name` argument') + resource_list = await self._items_to_resources(body) + response = copy.deepcopy(body) + + response['items'] = [ + item['resource'].delete(name=item['name'], namespace=item['namespace'] or namespace, **kwargs).to_dict() + for item in resource_list['items'] + ] + return ResourceInstance(self, response) + + async def verb_mapper(self, verb, body, **kwargs): + resource_list = await self._items_to_resources(body) + response = copy.deepcopy(body) + response['items'] = [ + getattr(item['resource'], verb)(body=item['definition'], **kwargs).to_dict() + for item in resource_list['items'] + ] + return ResourceInstance(self, response) + + async def create(self, *args, **kwargs): + return await self.verb_mapper('create', *args, **kwargs) + + async def replace(self, *args, **kwargs): + return await self.verb_mapper('replace', *args, **kwargs) + + async def patch(self, *args, **kwargs): + return await self.verb_mapper('patch', *args, **kwargs) + + def to_dict(self): + return { + '_type': 'ResourceList', + 'group': self.group, + 'api_version': self.api_version, + 'kind': self.kind, + 'base_kind': self.base_kind + } + + # This code is not executed in any test scenario - is it needed? + def __getattr__(self, name): + if self.base_resource(): + return getattr(self.base_resource(), name) + return None + + +class Subresource(Resource): + """ Represents a subresource of an API resource. This generally includes operations + like scale, as well as status objects for an instantiated resource + """ + + def __init__(self, parent, **kwargs): # noqa + # super().__init__() + self.parent = parent + self.prefix = parent.prefix + self.group = parent.group + self.api_version = parent.api_version + self.kind = kwargs.pop('kind') + self.name = kwargs.pop('name') + self.subresource = kwargs.pop('subresource', None) or self.name.split('/')[1] + self.namespaced = kwargs.pop('namespaced', False) + self.verbs = kwargs.pop('verbs', None) + self.extra_args = kwargs + + # TODO(fabianvf): Determine proper way to handle differences between resources + subresources + async def create(self, body=None, name=None, namespace=None, **kwargs): + name = name or body.get('metadata', {}).get('name') + body = self.parent.client.serialize_body(body) + if self.parent.namespaced: + namespace = self.parent.client.ensure_namespace(self.parent, namespace, body) + path = self.path(name=name, namespace=namespace) + return await self.parent.client.request('post', path, body=body, **kwargs) + + @property + def urls(self): + full_prefix = f'{self.prefix}/{self.group_version}' + return { + 'full': '/{}/{}/{{name}}/{}'.format(full_prefix, self.parent.name, self.subresource), + 'namespaced_full': '/{}/namespaces/{{namespace}}/{}/{{name}}/{}'.format(full_prefix, self.parent.name, + self.subresource) + } + + def __getattr__(self, name): + return partial(getattr(self.parent.client, name), self) + + def to_dict(self): + d = { + 'kind': self.kind, + 'name': self.name, + 'subresource': self.subresource, + 'namespaced': self.namespaced, + 'verbs': self.verbs + } + d.update(self.extra_args) + return d + + +class ResourceInstance(object): + """ A parsed instance of an API resource. It exists solely to + ease interaction with API objects by allowing attributes to + be accessed with '.' notation. + """ + + def __init__(self, client, instance): + self.client = client + # If we have a list of resources, then set the apiVersion and kind of + # each resource in 'items' + kind = instance['kind'] + if kind.endswith('List') and 'items' in instance: + kind = instance['kind'][:-4] + for item in instance['items']: + if 'apiVersion' not in item: + item['apiVersion'] = instance['apiVersion'] + if 'kind' not in item: + item['kind'] = kind + + self.attributes = self.__deserialize(instance) + self.__initialised = True + + def __deserialize(self, field): + if isinstance(field, dict): + return ResourceField(params={ + k: self.__deserialize(v) for k, v in field.items() + }) + elif isinstance(field, (list, tuple)): + return [self.__deserialize(item) for item in field] + else: + return field + + def __serialize(self, field): + if isinstance(field, ResourceField): + return { + k: self.__serialize(v) for k, v in field.__dict__.items() + } + elif isinstance(field, (list, tuple)): + return [self.__serialize(item) for item in field] + elif isinstance(field, ResourceInstance): + return field.to_dict() + else: + return field + + def to_dict(self): + return self.__serialize(self.attributes) + + def to_str(self): + return repr(self) + + def __repr__(self): + return "ResourceInstance[{}]:\n {}".format( + self.attributes.kind, + ' '.join(yaml.safe_dump(self.to_dict()).splitlines(True)) + ) + + def __getattr__(self, name): + if '_ResourceInstance__initialised' not in self.__dict__: + return super().__getattr__(name) + return getattr(self.attributes, name) + + def __setattr__(self, name, value): + if '_ResourceInstance__initialised' not in self.__dict__: + return super().__setattr__(name, value) + elif name in self.__dict__: + return super().__setattr__(name, value) + else: + self.attributes[name] = value + + def __getitem__(self, name): + return self.attributes[name] + + def __setitem__(self, name, value): + self.attributes[name] = value + + def __dir__(self): + return dir(type(self)) + list(self.attributes.__dict__.keys()) + + +class ResourceField(object): + """ A parsed instance of an API resource attribute. It exists + solely to ease interaction with API objects by allowing + attributes to be accessed with '.' notation + """ + + def __init__(self, params): + self.__dict__.update(**params) + + def __repr__(self): + return pformat(self.__dict__) + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __getitem__(self, name): + return self.__dict__.get(name) + + # Here resource.items will return items if available or resource.__dict__.items function if not + # resource.get will call resource.__dict__.get after attempting resource.__dict__.get('get') + def __getattr__(self, name): + return self.__dict__.get(name, getattr(self.__dict__, name, None)) + + def __setattr__(self, name, value): + self.__dict__[name] = value + + def __dir__(self): + return dir(type(self)) + list(self.__dict__.keys()) + + def __iter__(self): + for k, v in self.__dict__.items(): + yield (k, v) + + def to_dict(self): + return self.__serialize(self) + + def __serialize(self, field): + if isinstance(field, ResourceField): + return { + k: self.__serialize(v) for k, v in field.__dict__.items() + } + if isinstance(field, (list, tuple)): + return [self.__serialize(item) for item in field] + return field diff --git a/kubernetes_asyncio/e2e_test/test_dynamic_client.py b/kubernetes_asyncio/e2e_test/test_dynamic_client.py new file mode 100644 index 000000000..496905a08 --- /dev/null +++ b/kubernetes_asyncio/e2e_test/test_dynamic_client.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time +import uuid +from unittest import IsolatedAsyncioTestCase + +from kubernetes_asyncio.client import api_client +from kubernetes_asyncio.dynamic import DynamicClient +from kubernetes_asyncio.e2e_test import base + +# from kubernetes_asyncio.stream import WsApiClient + + +def short_uuid(): + id_ = str(uuid.uuid4()) + return id_[-12:] + + +class TestDynamicClient(IsolatedAsyncioTestCase): + + @classmethod + def setUpClass(cls): + cls.config = base.get_e2e_configuration() + + async def test_pod_apis(self): + async with api_client.ApiClient(configuration=self.config) as apic: + client = await DynamicClient(apic) + + pod_resource = await client.resources.get(kind='Pod', api_version='v1') + + name = 'busybox-test-' + short_uuid() + pod_manifest = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': name + }, + 'spec': { + 'containers': [{ + 'image': 'busybox', + 'name': 'sleep', + "args": [ + "/bin/sh", + "-c", + "while true;do date;sleep 5; done" + ] + }] + } + } + + resp = await client.create(pod_resource, body=pod_manifest, namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status.phase) + + while True: + resp = await client.get(pod_resource, name=name, namespace='default') + self.assertEqual(name, resp.metadata.name) + self.assertTrue(resp.status.phase) + if resp.status.phase != 'Pending': + break + time.sleep(1) + + resp = await client.get(pod_resource) + number_of_pods = len(resp.items) + self.assertTrue(number_of_pods > 0) + + await client.delete(pod_resource, name=name, body={}, namespace='default') diff --git a/scripts/rest_client_patch.diff b/scripts/rest_client_patch.diff index 8502f4131..69ca5f863 100644 --- a/scripts/rest_client_patch.diff +++ b/scripts/rest_client_patch.diff @@ -5,7 +5,9 @@ --- 132,140 ---- # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: - if re.search('json', headers['Content-Type'], re.IGNORECASE): +- if re.search('json', headers['Content-Type'], re.IGNORECASE): ++ if (re.search('json', headers['Content-Type'], re.IGNORECASE) or ++ headers['Content-Type'] == 'application/apply-patch+yaml'): + if headers['Content-Type'] == 'application/json-patch+json': + if not isinstance(body, list): + headers['Content-Type'] = 'application/strategic-merge-patch+json'