关于client-go
- client-go是kubernetes官方提供的go语言的客户端库,go应用使用该库可以访问kubernetes的API Server,这样我们就能通过编程来对kubernetes资源进行增删改查操作
client-go版本和kubernetes版本对应关系
参考官方文档
client-go 使用之Clientset
- clientset是client-go对kubernetes原生资源各类操作的封装,可以直接使用;但如果需要操作自定义资源(例如自己实现的operator所管理的资源),就需要使用下文提到的dynamicClient
package main
import (
"context"
"flag"
"fmt"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/utils/pointer"
"path/filepath"
)
// Names of the Kubernetes objects that this example creates and later removes.
const (
	// NAMESPACE is the namespace that holds the example Service and Deployment.
	NAMESPACE = "test-clientset"

	// DEPLOYMENT_NAME is the name of the example Deployment.
	DEPLOYMENT_NAME = "client-test-deployment"

	// SERVICE_NAME is the name of the example Service.
	SERVICE_NAME = "client-test-service"
)
// main parses the command-line flags, builds a clientset from the kubeconfig
// file and then either creates the example resources or removes them.
//
// Flags:
//
//	-kubeconfig  path to the kubeconfig file (defaults to ~/.kube/config when
//	             the home directory can be determined)
//	-operate     "create" (default) or "clean"
//
// It panics when the configuration cannot be loaded, when the clientset cannot
// be built, or when -operate holds a value other than "create" or "clean".
func main() {
	// Use ~/.kube/config as the default only when the home directory is known;
	// otherwise the path has to come from the -kubeconfig flag.
	defaultPath, usage := "", "absolute path to the kubeconfig file"
	if home := homedir.HomeDir(); home != "" {
		defaultPath = filepath.Join(home, ".kube", "config")
		usage = "(optional) absolute path to the kubeconfig file"
	}
	kubeconfig := flag.String("kubeconfig", defaultPath, usage)

	// Requested operation: "create" by default, "clean" to remove every resource.
	operate := flag.String("operate", "create", "operate type : create or clean")
	flag.Parse()

	// Load the client configuration from the kubeconfig file; the first
	// argument (the master URL) is deliberately left empty.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// Build the typed clientset used by all helper functions.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	fmt.Printf("operation is %v\n", *operate)

	switch *operate {
	case "clean":
		clean(clientset)
	case "create":
		// The namespace must exist before the namespaced objects are created.
		createNamespace(clientset)
		createDeployment(clientset)
		createService(clientset)
	default:
		// A mistyped operation must not silently create resources.
		panic(fmt.Sprintf("unsupported operate type %q: expected create or clean", *operate))
	}
}
// clean removes every resource created by this example: the Service, then the
// Deployment, and finally the namespace itself. It panics with the error text
// as soon as one of the Delete calls returns an error.
func clean(clientset *kubernetes.Clientset) {
	ctx := context.TODO()
	opts := metav1.DeleteOptions{}

	// Delete the Service.
	err := clientset.CoreV1().Services(NAMESPACE).Delete(ctx, SERVICE_NAME, opts)
	if err != nil {
		panic(err.Error())
	}

	// Delete the Deployment.
	err = clientset.AppsV1().Deployments(NAMESPACE).Delete(ctx, DEPLOYMENT_NAME, opts)
	if err != nil {
		panic(err.Error())
	}

	// Delete the namespace.
	err = clientset.CoreV1().Namespaces().Delete(ctx, NAMESPACE, opts)
	if err != nil {
		panic(err.Error())
	}
}
// createNamespace creates the namespace named by NAMESPACE and prints the name
// of the created object. It panics if the Create call returns an error.
func createNamespace(clientset *kubernetes.Clientset) {
	// Only the name is set; every other field keeps its zero value.
	ns := &apiv1.Namespace{}
	ns.Name = NAMESPACE

	created, err := clientset.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
	if err != nil {
		panic(err.Error())
	}

	fmt.Printf("Create namespace %s \n", created.GetName())
}
// createService creates a NodePort Service named SERVICE_NAME inside NAMESPACE
// that selects pods labelled app=tomcat, then prints the name of the created
// object. It panics if the Create call returns an error.
func createService(clientset *kubernetes.Clientset) {
	// Service port 8080 is published on node port 30080.
	httpPort := apiv1.ServicePort{
		Name:     "http",
		Port:     8080,
		NodePort: 30080,
	}

	svc := &apiv1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: SERVICE_NAME},
		Spec: apiv1.ServiceSpec{
			Type:     apiv1.ServiceTypeNodePort,
			Selector: map[string]string{"app": "tomcat"},
			Ports:    []apiv1.ServicePort{httpPort},
		},
	}

	// Client scoped to Services in the example namespace.
	services := clientset.CoreV1().Services(NAMESPACE)
	created, err := services.Create(context.TODO(), svc, metav1.CreateOptions{})
	if err != nil {
		panic(err.Error())
	}

	fmt.Printf("Create service %s \n", created.GetName())
}
// createDeployment creates a two-replica Tomcat Deployment named
// DEPLOYMENT_NAME inside NAMESPACE and prints the name of the created object.
// It panics if the Create call returns an error.
func createDeployment(clientset *kubernetes.Clientset) {
	// Client scoped to Deployments in the example namespace.
	deploymentClient := clientset.AppsV1().Deployments(NAMESPACE)

	tomcat := apiv1.Container{
		Name:            "tomcat",
		Image:           "tomcat:8.0.18-jre8",
		ImagePullPolicy: apiv1.PullIfNotPresent,
		Ports: []apiv1.ContainerPort{
			{
				Name: "http",
				// Tomcat serves HTTP over TCP, so the port is declared as TCP
				// rather than SCTP.
				Protocol:      apiv1.ProtocolTCP,
				ContainerPort: 8080,
			},
		},
	}

	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: DEPLOYMENT_NAME,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: pointer.Int32Ptr(2),
			// The selector matches the labels set on the pod template below.
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app": "tomcat",
				},
			},
			Template: apiv1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"app": "tomcat",
					},
				},
				Spec: apiv1.PodSpec{
					Containers: []apiv1.Container{tomcat},
				},
			},
		},
	}

	result, err := deploymentClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("Create deployment %s \n", result.GetName())
}
client-go 使用之dynamicClient
- dynamicClient主要用于操作非kubernetes内置的资源(例如自定义资源);pod、service、deployment等内置资源同样可以通过它操作,下文示例操作的就是deployment
- Unstructured Data(非结构化数据) 和 Structured Data(结构化数据)
// User is an example of structured data: its fields and their types are fixed
// in the type declaration.
type User struct {
	ID, Name string
}
// Unstructured is an example of unstructured data. Kubernetes has to handle
// many varied and complex data structures, so it represents them generically,
// much like working with interface{} values.
type Unstructured struct {
	// Object holds the content as a JSON-compatible map whose values are
	// string, float, int, bool, []interface{} or nested
	// map[string]interface{} children.
	Object map[string]interface{}
}
- Unstructured与资源对象的相互转换
可以用Unstructured实例生成资源对象,也可以用资源对象生成Unstructured实例,这两种转换分别由unstructuredConverter的FromUnstructured和ToUnstructured方法实现。下面的代码片段展示了如何将Unstructured实例转为PodList实例:
// Allocate an empty PodList that will receive the result converted from unstructObj.
podList := &apiv1.PodList{}
// unstructObj is the Unstructured value being converted; it is not defined in
// this snippet. FromUnstructured reads its content map and fills podList.
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList)
package main
import (
"bufio"
"context"
"flag"
"fmt"
"os"
"path/filepath"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/client-go/util/retry"
//
// Uncomment to load all auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth"
//
// Or uncomment to load specific auth plugins
// _ "k8s.io/client-go/plugin/pkg/client/auth/azure"
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
// _ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
)
// main walks through the lifecycle of a Deployment using the dynamic client:
// it creates "demo-deployment" in the default namespace, updates it with
// conflict retries, lists the Deployments in that namespace and finally deletes
// it again, waiting for the user to press Return between the steps.
//
// The kubeconfig path is taken from the -kubeconfig flag and defaults to
// ~/.kube/config when the home directory can be determined. Any failure ends
// the program with a panic.
func main() {
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	// Every call below targets Deployments in the default namespace.
	deployments := client.Resource(deploymentRes).Namespace(apiv1.NamespaceDefault)

	deployment := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "Deployment",
			"metadata": map[string]interface{}{
				"name": "demo-deployment",
			},
			"spec": map[string]interface{}{
				"replicas": 2,
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": "demo",
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": "demo",
						},
					},
					"spec": map[string]interface{}{
						"containers": []map[string]interface{}{
							{
								"name":  "web",
								"image": "nginx:1.12",
								"ports": []map[string]interface{}{
									{
										"name":          "http",
										"protocol":      "TCP",
										"containerPort": 80,
									},
								},
							},
						},
					},
				},
			},
		},
	}

	// Create Deployment
	fmt.Println("Creating deployment...")
	result, err := deployments.Create(context.TODO(), deployment, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Created deployment %q.\n", result.GetName())

	// Update Deployment
	prompt()
	fmt.Println("Updating deployment...")
	// You have two options to Update() this Deployment:
	//
	//  1. Modify the "deployment" variable and call: Update(deployment).
	//     This works like the "kubectl replace" command and it overwrites/loses changes
	//     made by other clients between you Create() and Update() the object.
	//  2. Modify the "result" returned by Get() and retry Update(result) until
	//     you no longer get a conflict error. This way, you can preserve changes made
	//     by other clients between Create() and Update(). This is implemented below
	//     using the retry utility package included with client-go. (RECOMMENDED)
	//
	// More Info:
	// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Retrieve the latest version of Deployment before attempting update.
		// RetryOnConflict uses exponential backoff to avoid exhausting the apiserver.
		// Failures are returned rather than panicking so that they surface
		// through retryErr below.
		latest, getErr := deployments.Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
		if getErr != nil {
			return fmt.Errorf("failed to get latest version of Deployment: %w", getErr)
		}

		// update replicas to 1
		if err := unstructured.SetNestedField(latest.Object, int64(1), "spec", "replicas"); err != nil {
			return fmt.Errorf("failed to set replica value: %w", err)
		}

		// extract spec containers
		containers, found, err := unstructured.NestedSlice(latest.Object, "spec", "template", "spec", "containers")
		if err != nil || !found || len(containers) == 0 {
			return fmt.Errorf("deployment containers not found or error in spec: %v", err)
		}

		// update container[0] image; the assertion is checked so that an
		// unexpected element type is reported as an error.
		first, ok := containers[0].(map[string]interface{})
		if !ok {
			return fmt.Errorf("deployment container has unexpected type %T", containers[0])
		}
		if err := unstructured.SetNestedField(first, "nginx:1.13", "image"); err != nil {
			return err
		}
		if err := unstructured.SetNestedField(latest.Object, containers, "spec", "template", "spec", "containers"); err != nil {
			return err
		}

		// The Update error is returned unwrapped so that RetryOnConflict can
		// recognise a conflict and retry.
		_, updateErr := deployments.Update(context.TODO(), latest, metav1.UpdateOptions{})
		return updateErr
	})
	if retryErr != nil {
		panic(fmt.Errorf("update failed: %v", retryErr))
	}
	fmt.Println("Updated deployment...")

	// List Deployments
	prompt()
	fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
	list, err := deployments.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, d := range list.Items {
		replicas, found, err := unstructured.NestedInt64(d.Object, "spec", "replicas")
		if err != nil || !found {
			// err may be nil when the field is simply absent, hence %v; the
			// newline keeps each report on its own line.
			fmt.Printf("Replicas not found for deployment %s: error=%v\n", d.GetName(), err)
			continue
		}
		fmt.Printf(" * %s (%d replicas)\n", d.GetName(), replicas)
	}

	// Delete Deployment
	prompt()
	fmt.Println("Deleting deployment...")
	deletePolicy := metav1.DeletePropagationForeground
	deleteOptions := metav1.DeleteOptions{
		PropagationPolicy: &deletePolicy,
	}
	if err := deployments.Delete(context.TODO(), "demo-deployment", deleteOptions); err != nil {
		panic(err)
	}
	fmt.Println("Deleted deployment.")
}
// prompt prints a notice and then waits until one line has been read from
// standard input (or the input ends). It panics if reading fails, and prints a
// line break before returning.
func prompt() {
	fmt.Printf("-> Press Return key to continue.")

	// Only the first line matters, so a single Scan call is sufficient.
	stdin := bufio.NewScanner(os.Stdin)
	stdin.Scan()
	if err := stdin.Err(); err != nil {
		panic(err)
	}

	fmt.Println()
}
参考文档
client-go官方文档
client-go实战