# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/5/13 11:42
# @Author : xxx
# @File : s.py
# @Description : 这个类封装kube-sdk
import os
import re
import time
import kubernetes
import requests
from kubernetes.stream import stream
from kubernetes import client, config
from kubernetes.client.api.networking_v1beta1_api import NetworkingV1beta1Api
class Kube(object):
def __init__(self, source=None,namespace=None):
# print("开始请求".center(40, "*"), time.ctime())
# print("初始化kube对象",source)
self.source = source
if source == "intranet-dev":
# print(local_kubeconfig)
self.config_file = local_kubeconfig
# self.config_file = "/application/Repair/Repair/libs/local-config"
config.kube_config.load_kube_config(config_file=self.config_file)
elif source == "tencent-prod":
# print(tencent_kubeconfig)
# self.config_file = "D:\pyrepair\Repair\Repair\libs\\ten-config"
self.config_file = tencent_kubeconfig
config.kube_config.load_kube_config(config_file=self.config_file)
elif source == "aliyun-prod":
# print(aliyun_kubeconfig)
self.config_file = aliyun_kubeconfig
config.kube_config.load_kube_config(config_file=self.config_file)
elif os.name == "nt":
# print(local_kubeconfig)
self.config_file = local_kubeconfig
# self.config_file = "/application/Repair/Repair/libs/config/local-config"
config.kube_config.load_kube_config(config_file=self.config_file)
else:
pass
# 获取api对象
self.namespace = namespace
self.Api_Instance = client.CoreV1Api()
self.Api_Beatch = client.BatchApi()
self.Api_Apps = client.AppsV1Api()
self.k8s_client = client.ApiClient()
self.Api_Network = NetworkingV1beta1Api()
# 获取ingress
def get_ingress(self, flag=False):
ingress_list = []
# flag = True 进行单空间查询 反之 为多空间
if flag:
time_list_namespace = self.list_namespaces(flag)
# print(time_list_namespace)
ingress_obj = self.Api_Network.list_ingress_for_all_namespaces()
for ingress in ingress_obj.items:
# if ingress.metadata.namespace == "wuhan":
for rule in ingress.spec.rules:
for path in rule.http.paths:
if ingress.metadata.namespace in time_list_namespace:
# if ingress.metadata.namespace == "muleizhwl":
data = {
"namespace": ingress.metadata.namespace,
"ingress": ingress.metadata.name,
"dns": ingress.spec.rules[0].host,
"service": path.backend.service_name,
"port": path.backend.service_port,
"path": path.path,
}
ingress_list.append(data)
else:
ingress_obj = self.Api_Network.list_ingress_for_all_namespaces()
for ingress in ingress_obj.items:
# if ingress.metadata.namespace == "wuhan":
for rule in ingress.spec.rules:
for path in rule.http.paths:
data = {
"namespace": ingress.metadata.namespace,
"ingress": ingress.metadata.name,
"dns": ingress.spec.rules[0].host,
"service": path.backend.service_name,
"port": path.backend.service_port,
"path": path.path,
}
ingress_list.append(data)
return ingress_list
# 写入文件
def write_execl(self, flag=False):
ingress_list = self.get_ingress(flag)
data_tp = ("空间", "ingress", "域名", "服务", "端口", "路由")
_execl = Execl()
data = _execl.format_data(data_info=ingress_list)
result = _execl.write_excel(file="ingress情况表.xlsx", data=data, title=data_tp)
if result:
print("写入完成")
# 获取空间
def list_namespaces(self, flag=False):
# print("获取空间")
namespaces = self.Api_Instance.list_namespace()
namespace_list_line = []
time_list_namespace = []
for namespace in namespaces.items:
# 启用时间筛选
if flag:
if namespace.metadata.name == "sjj-wangmangling-dev":
print(namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-"))
break
else:
if namespace.metadata.labels.get("kubesphere.io/devopsproject") == None:
desc = namespace.metadata.annotations
data = self.get_patch_namespace(namespace,desc)
namespace_list_line.append(data)
if len(namespace_list_line) == 0:
return time_list_namespace
return namespace_list_line
def get_patch_namespace(self,namespace,desc):
cloud_type = models.DictItems.objects.filter(dict_code=self.source).first()
# print("获取到的",cloud_type)
head = self.re_head(namespace)
try:
#环境
environment = namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-")[-3]
except Exception:
environment = ''
ex_time = self.re_time(namespace)
if self.source == 'aliyun-prod':
url = "https://xxx/{0}/clusters/default/projects/{1}/deployments".format(
namespace.metadata.labels.get("kubesphere.io/workspace"), namespace.metadata.name)
elif self.source == "tencent-prod" or self.source == "intranet-dev":
url = f"https://xxxx/{namespace.metadata.labels.get('kubesphere.io/workspace')}/clusters/{self.source}/projects/{namespace.metadata.name}/deployments"
data = {
"namespace": namespace.metadata.name,
"desc": desc,
"work": namespace.metadata.labels.get("kubesphere.io/workspace"),
"createtime": namespace.metadata.creation_timestamp.strftime("%Y-%m-%d"),
"url": url,
"cloud_type": cloud_type,
"create_time": namespace.metadata.creation_timestamp,
"head": head,
"ex_time": ex_time,
"environment": environment,
}
return data
# 处理配额
def get_namespace_resource_quota(self, flag=False):
namespace_quota = self.Api_Instance.list_resource_quota_for_all_namespaces()
quota_new_list = []
for quota in namespace_quota.items:
if flag:
break
if quota.status.used == None:
quotas = "无配置"
else:
quotas = quota.status.used.get("limits.memory")
data = {
"namespace": quota.metadata.namespace,
"limit_memory": quotas,
}
quota_new_list.append(data)
return quota_new_list
"""
处理有状态服务
"""
def get_statefulset_pod(self,namespace=None):
new_statefulset_list = []
statefulset_list = self.Api_Apps.list_namespaced_stateful_set(namespace=namespace)
for statefulset_pod in statefulset_list.items:
new_statefulset_list.append(statefulset_pod.metadata.name)
return new_statefulset_list
# 处理deployment控制器
def get_list_deployments_probe(self, namespace=None, flag=False, namespaces=False):
print("本次要获取的空间",namespace)
new_namespaces_dict = {}
# 获取所有空间
if namespaces:
namespace = self.list_namespaces()
for name in namespace:
deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list = self.get_list_deployment_dict_data(
name.get("namespace"), flag)
new_namespaces_dict["deployment_list_line"] = deployment_list_line
new_namespaces_dict["deployment_list_limit"] = deployment_list_limit
new_namespaces_dict["deployment_list_desc"] = deployment_list_desc
return new_namespaces_dict
else:
print("手动获取空间")
deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list,clse_list = self.get_list_deployment_dict_data(namespace,flag)
return deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list,clse_list
def re_head(self,namespace):
try:
if "-" in re.search("负责人(.*)",
namespace.metadata.annotations.get(
"kubesphere.io/alias-name")).group(1):
head = re.search("负责人(.*)",
namespace.metadata.annotations.get(
"kubesphere.io/alias-name")).group(1).split("-")[0]
else:
head = re.search("负责人(.*)",
namespace.metadata.annotations.get(
"kubesphere.io/alias-name")).group(1)
except Exception:
return ""
return head
def re_time(self,namespace):
try:
if re.search("-([0-9]{4}-[0-9]{2}-[0-9]{2})", namespace.metadata.annotations.get(
"kubesphere.io/alias-name")):
ex_time = re.search("-([0-9]{4}-[0-9]{2}-[0-9]{2})", namespace.metadata.annotations.get(
"kubesphere.io/alias-name")).group(1)
else:
ex_time = namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-")[-1]
except Exception:
return ""
# print(ex_time)
return ex_time
# 处理deployment的资源组合成dict数据
def get_list_deployment_dict_data(self, namespace, flag=False):
deployment_list_line = []
deployment_list_limit = []
deployment_list_desc = []
middleware_list = []
clse_list = []
statefulset_list = self.get_statefulset_pod(namespace)
list_deployment_response = self.Api_Apps.list_namespaced_deployment(namespace)
data_middleware_list = ["redis","mysql","elasticsearch","es","mongo-bi-v1","redis-v1","mongo-bi","rabbitmq-server","nacos","rabbitmq","mongo"
,"elasticsearch-v1",
"mongdo","rabbitmq-v1","rmq","mysql-v1","sentinel","minio","redis5","mongodb","mysql-server","mongodb-bi"]
for deployment in list_deployment_response.items:
if deployment.metadata.name in data_middleware_list:
middleware_list.append(deployment.metadata.name)
continue
else:
if flag:
print("控制器", deployment)
else:
if deployment.spec.replicas == 0:
print("以关闭空间",deployment.metadata.self_link.split("/")[5])
clse_list.append(deployment.metadata.self_link.split("/")[5])
time.sleep(2)
else:
if deployment.spec.template.spec.containers[0].liveness_probe == None and \
deployment.spec.template.spec.containers[0].readiness_probe == None:
try:
data = {
"namespace": deployment.metadata.self_link.split("/")[5],
"deployment": deployment.metadata.name,
"line": "×",
"cloud_type":models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_line.append(data)
except Exception:
data = {
"namespace": "无命名空间",
"deployment": deployment.metadata.name,
"line": "×",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_line.append(data)
else:
try:
data = {
"namespace": deployment.metadata.self_link.split("/")[5],
"deployment": deployment.metadata.name,
"line": "√",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_line.append(data)
except Exception:
data = {
"namespace": "无命名空间",
"deployment": deployment.metadata.name,
"line": "√",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_line.append(data)
if deployment.metadata.annotations.get(
"kubesphere.io/description") == None and deployment.spec.replicas == 1:
try:
data = {
"namespace": deployment.metadata.self_link.split("/")[5],
"deployment": deployment.metadata.name,
"desc": "×",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_desc.append(data)
except Exception:
data = {
"namespace": "无命名空间",
"deployment": deployment.metadata.name,
"desc": "×",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_desc.append(data)
else:
try:
data = {
"namespace": deployment.metadata.self_link.split("/")[5],
"deployment": deployment.metadata.name,
"desc": "√",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_desc.append(data)
except Exception:
data = {
"namespace": "无命名空间",
"deployment": deployment.metadata.name,
"desc": "√",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_desc.append(data)
if deployment.spec.template.spec.containers[0] \
.resources.limits == None and deployment.spec.replicas == 1:
try:
data = {
"namespace": deployment.metadata.self_link.split("/")[5],
"deployment": deployment.metadata.name,
"limit": "×",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_limit.append(data)
except Exception:
data = {
"namespace": "无命名空间",
"deployment": deployment.metadata.name,
"limit": "×",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_limit.append(data)
else:
try:
data = {
"namespace": deployment.metadata.self_link.split("/")[5],
"deployment": deployment.metadata.name,
"limit": "√",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_limit.append(data)
except Exception:
data = {
"namespace": "无命名空间",
"deployment": deployment.metadata.name,
"limit": "√",
"cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
}
deployment_list_limit.append(data)
new_middleware_list = middleware_list + statefulset_list
return deployment_list_line, deployment_list_limit, deployment_list_desc,new_middleware_list,clse_list
# 获取非running状态
def get_not_running(self, running=True):
list_pod_data = []
project_data = []
# 获取所有空间的pod
pod_list = self.Api_Instance.list_pod_for_all_namespaces(watch=False)
for pod in pod_list.items:
# print(pod)
if pod.status.phase != "Running":
if pod.status.phase != "Succeeded":
# print("结果",pod)
try:
if pod.status.container_statuses[0].ready == True:
ready = "准备好"
else:
ready = "未准备好"
if pod.status.container_statuses[0].started == True:
status = "已启动"
else:
status = "未启动"
restart = pod.status.container_statuses[0].restart_count
except Exception:
status, ready, restart = None, None, None
msg_dict = {"project": pod.metadata.namespace, "service": pod.metadata.name,
"status": pod.status.phase, "reay": ready,
"runing": status,
"restart": restart}
list_pod_data.append("项目:{0}\t 服务名称:{1}\t 当前状态:{2}\t 是否准备好:{3}\t 是否启动:{4}\t 自动重启次数:{5}".format(
pod.metadata.namespace, pod.metadata.name,
pod.status.phase, ready,
status, restart))
project_data.append(msg_dict)
# self.send_msg(msg=msg_dict)
if running != True:
return project_data
return list_pod_data
# 创建deployment使用yaml格式
def create_namespace_deployment_yaml(self, namespace, images, name, port,cluster_name):
#print("端口号",port)
if int(port) != 80:
print("是java")
init_time = 60
time_out = 3
path = "/health"
else:
print("非java")
init_time = 5
time_out = 1
path = "/"
deployment_pvc_json = {'apiVersion': 'v1', 'kind': 'PersistentVolumeClaim', 'metadata': {'name': name},
'spec': {'accessModes': ['ReadWriteOnce'],
'the__resources': {'requests': {'storage': '10Gi'}},
'storageClassName': 'nfs-client'}}
deployment_json = {'kind': 'Deployment', 'apiVersion': 'apps/v1',
'metadata': {'name': name, 'namespace': namespace, "labels": {'app': name},
'annotations': {'deployment.kubernetes.io/revision': '1.txt'}},
'spec': {'replicas': 1, 'selector': {'matchLabels': {'app': name}},
'template': {'metadata': {'labels': {'app': name}}, 'spec': {'volumes': [
{'name': 'host-time', 'hostPath': {'path': '/etc/localtime', 'type': ''}},
{'name': name}], 'containers': [{'name': name, 'image': images, 'ports': [
{'name': 'tcp-{0}'.format(port), 'containerPort': int(port),
'protocol': 'TCP'}],
'the__resources': {
'limits': {'cpu': '1.txt', 'memory': '1500Mi'},
'requests': {'cpu': '500m',
'memory': '500Mi'}},
'volumeMounts': [
{'name': 'host-time', 'readOnly': True,
'mountPath': '/etc/localtime'},
{'name': name, 'mountPath': '/data/db'}],
'livenessProbe': {
'httpGet': {
'path': path,
'port': int(port),
'scheme': 'HTTP'
},
'initialDelaySeconds': init_time,
'timeoutSeconds': time_out,
'periodSeconds': 10,
'successThreshold': 1,
'failureThreshold': 3
},
'readinessProbe': {
'httpGet': {
'path': path,
'port': int(port),
'scheme': 'HTTP'
},
'initialDelaySeconds': init_time,
'timeoutSeconds': time_out,
'periodSeconds': 10,
'successThreshold': 1,
'failureThreshold': 3
},
'imagePullPolicy': 'Always'}],
'imagePullSecrets': [{"name": "harbor"}],
'restartPolicy': 'Always'},
}}}
deployment_svc_json = {'kind': 'Service', 'apiVersion': 'v1',
'metadata': {'name': name, 'namespace': namespace, 'labels': {'app': name}}, 'spec': {
'ports': [
{'name': 'tcp-{0}'.format(port), 'protocol': 'TCP', 'port': int(port), 'targetPort': int(port)}],
'selector': {'app': name}, 'type': 'ClusterIP'}}
"""{'kind': 'Secret', 'apiVersion': 'v1', 'metadata': {'name': 'xx', 'namespace': 'devops', 'annotations': {'kubesphere.io/alias-name': 'xx', 'kubesphere.io/creator': 'liuchengguo', 'kubesphere.io/description': 'xx'}}, 'data': {'tls.crt': '1.txt', 'tls.key': '2'}, 'type': 'kubernetes.io/tls'}"""
# print(json.dumps(deployment_svc_json))
deploy_list = self.Api_Apps.list_namespaced_deployment(namespace=namespace)
new_deploy_list = []
if len(deploy_list.items) == 0:
print("检验部署")
try:
self.Api_Apps.create_namespaced_deployment(body=deployment_json, namespace=namespace)
self.Api_Instance.create_namespaced_service(body=deployment_svc_json, namespace=namespace)
print("部署成功")
return True
except Exception as e:
return "创建deployment失败,部署失败原因:{0}".format(json.loads(e.body).get("message"))
else:
print("检验部署1")
for deploy in deploy_list.items:
new_deploy_list.append(deploy.metadata.name)
print(new_deploy_list)
if name in new_deploy_list:
print("存在")
try:
depl = self.Api_Apps.replace_namespaced_deployment(body=deployment_json, namespace=namespace,
name=name)
return True
except Exception as e:
return "创建deployment失败,部署失败原因:{0} 部署环境:{1}".format(json.loads(e.body).get("message"),cluster_name)
else:
try:
self.Api_Apps.create_namespaced_deployment(body=deployment_json, namespace=namespace)
self.Api_Instance.create_namespaced_service(body=deployment_svc_json, namespace=namespace)
print("部署成功")
return True
except Exception as e:
return "创建deployment失败,部署失败原因:{0} 部署环境:{1}".format(json.loads(e.body).get("message"),cluster_name)
# try:
# pv = self.Api_Instance.create_namespaced_persistent_volume_claim(body=deployment_pvc_json,
# namespace=namespace)
# except Exception:
# print("创建pv失败")
"""监听ingress对象"""
def watch_all_namespaces_ingress(self):
try:
api_response = self.Api_Network.list_ingress_for_all_namespaces(allow_watch_bookmarks=True, watch=True)
except Exception as e:
print("watch失败:{0}", e)
"""监听deployment对象"""
def watch_all_namespaces_deployment(self):
# watch = kubernetes.watch.Watch()
# for e in watch.stream(self.Api_Instance.list_event_for_all_namespaces):
# print("操作:{0}\t\t空间:{1.txt}\t\t服务:{2}".format(e.get("type"),e.get("object").metadata.namespace,e.get("object").metadata.name))
deployment_list = self.Api_Apps.list_deployment_for_all_namespaces()
len_deployment = len(deployment_list.items)
new_len_add_deploy = []
new_len_deployment = 1
watch = kubernetes.watch.Watch()
for e in watch.stream(self.Api_Apps.list_deployment_for_all_namespaces):
if new_len_deployment <= len_deployment and e.get("type") == "ADDED":
new_len_deployment += 1
else:
alias_name = e.get("object").metadata.annotations.get("kubesphere.io/alias-name") if e.get(
"object").metadata.annotations.get("kubesphere.io/alias-name") else "无别名"
description = e.get("object").metadata.annotations.get("kubesphere.io/description") if e.get(
"object").metadata.annotations.get("kubesphere.io/description") else "无描述"
new_len_add_deploy.append(e.get("type"))
namespace = e.get("object").metadata.namespace
name = e.get("object").metadata.name
replicas = 0 if e.get("object").status.replicas == None else e.get("object").status.replicas
ready_replicas = 0 if e.get("object").status.ready_replicas == None else e.get(
"object").status.ready_replicas
try:
image = e.get("raw_object").get("spec").get("template").get("spec").get("containers")[0].get(
"image")
except Exception:
print("非更新操作")
image = "无"
"删除"
if e.get("type") == "DELETED":
kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1).first()
print("删除匹配", kube_namespace)
if kube_namespace:
msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 删除\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n > \n\n > **当前镜像:** {4}".format(
namespace, name, alias_name, description, image)
self.send_msg(msg_json=msg_json, token=kube_namespace.token)
new_len_add_deploy = []
"修改"
if e.get("type") == "MODIFIED" and len(new_len_add_deploy) == 6:
print(new_len_add_deploy)
kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1).first()
print("更新匹配", kube_namespace)
if kube_namespace:
msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 修改\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n > \n\n > **当前镜像:** {4}".format(
namespace, name, alias_name, description, image)
self.send_msg(msg_json=msg_json, token=kube_namespace.token)
new_len_add_deploy = []
"添加"
if e.get("type") == "ADDED" and "ADDED" in new_len_add_deploy:
print("ADDED", new_len_add_deploy)
kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1).first()
print("添加匹配", kube_namespace)
if kube_namespace:
msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 添加\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n > \n\n > **当前镜像:** {4}".format(
namespace, name, alias_name, description, image)
self.send_msg(msg_json=msg_json, token=kube_namespace.token)
new_len_add_deploy = []
#监听pod状态
def watch_all_namespaces_pod(self):
pod_list = self.Api_Instance.list_pod_for_all_namespaces()
len_pod = len(pod_list.items)
new_len_add_pod = []
new_len_pod = 1
watch = kubernetes.watch.Watch()
for e in watch.stream(self.Api_Instance.list_pod_for_all_namespaces):
if new_len_pod <= len_pod and e.get("type") == "ADDED":
new_len_pod += 1
else:
print("开始监听pod",e.get("type"))
description = e.get("object").metadata.annotations
# description = e.get("object").metadata.annotations.get("kubesphere.io/description") if e.get(
# "object").metadata.annotations.get("kubesphere.io/description") else "无描述"
new_len_add_pod.append(e.get("type"))
namespace = e.get("object").metadata.namespace
name = e.get("object").metadata.name
status = e.get("object").status.phase
"删除"
if e.get("type") == "DELETED":
# kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1.txt).first()
# print("删除匹配", kube_namespace)
# if kube_namespace:
# msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 删除\n\n > **空间:** {0}\n\n > **服务:** {1.txt}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n > \n\n > **当前镜像:** {4}".format(
# namespace, name, alias_name, description, image)
# self.send_msg(msg_json=msg_json, token=kube_namespace.token)
msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 删除\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **当前镜像:** {2}".format(
namespace, name, description)
print(msg_json)
new_len_add_deploy = []
"修改"
if e.get("type") == "MODIFIED" and len(new_len_add_pod) == 6:
# print(new_len_add_deploy)
#
# kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1.txt).first()
# print("更新匹配", kube_namespace)
# if kube_namespace:
# msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 修改\n\n > **空间:** {0}\n\n > **服务:** {1.txt}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n > \n\n > **当前镜像:** {4}".format(
# namespace, name, alias_name, description, image)
# self.send_msg(msg_json=msg_json, token=kube_namespace.token)
msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 更新\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **当前镜像:** {2}".format(
namespace, name, description)
print(msg_json)
new_len_add_deploy = []
"添加"
if e.get("type") == "ADDED" and "ADDED" in new_len_add_pod:
# print("ADDED", new_len_add_deploy)
#
# kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1.txt).first()
# print("添加匹配", kube_namespace)
# if kube_namespace:
# msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 添加\n\n > **空间:** {0}\n\n > **服务:** {1.txt}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n > \n\n > **当前镜像:** {4}".format(
# namespace, name, alias_name, description, image)
# self.send_msg(msg_json=msg_json, token=kube_namespace.token)
msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 添加\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **当前镜像:** {2}".format(
namespace, name, description)
# print(msg_json)
new_len_add_deploy = []
def pod_exec(self,pod,container=""):
exec_command = [
"/bin/sh",
"-c",
'TERM=xterm-256color; export TERM; [ -x /bin/bash ] '
'&& ([ -x /usr/bin/script ] '
'&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) '
'|| exec /bin/sh']
cont_stream = stream(self.Api_Instance.connect_get_namespaced_pod_exec,
name="pyrepair-f5fd48f6-nfh2k",
namespace="cmdb",
container=container,
command=exec_command,
stderr=True, stdin=True,
stdout=True, tty=True,
_preload_content=False
)
return cont_stream
def get_deployment_pod(self, RAND):
try:
r = self.Api_Instance.list_namespaced_pod(
namespace=self.namespace,
label_selector="app=%s" % RAND
)
return True, r
except Exception as e:
return False, 'Get Deployment: ' + str(e)
# 发送报警
def send_msg(self, msg=None, token="331c2210aca410f0b3f333113579db3caacadca491a2e6060e2e3bbf2aafcf82",
msg_json=None):
# print(msg_json)
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token={0}'.format(token)
headers = {'Content-Type': 'application/json;charset=utf-8'}
if msg:
msg_str = "## K8S-POD状态异常告警提示:\n\n 项目:{0}\t 服务名称:{1}\t 当前状态:{2}\t 是否准备好:{3}\t 是否启动:{4}\t 重启次数:{5}\t".format(
msg.get("project"), msg.get("service"), msg.get("status"), msg.get("reay"), msg.get("runing"),
msg.get("restart"))
else:
msg_str = msg_json
data = {
"msgtype": "markdown",
"markdown": {
"title": "### K8S-POD状态异常告警提示:",
"text": msg_str,
},
"at": {
"atMobiles": 'reminders',
"isAtAll": False,
},
}
r = requests.post(url=webhook_url, json=data, headers=headers)
return r.text
#销毁对象
def __del__(self):
pass
# print("销毁对象:{0}".format(self.source))
msg = {"msg": "服务器炸了"}
if __name__ == '__main__':
client = Kube(source="intranet-dev")
print(client.test(namespace="zoudao"))
作者这里面基本覆盖了大多调用api的示例
附带:导出execl
"""
公共写入execl方法
"""
from xlsxwriter import workbook
class Execl(object):
def format_data(self, data_info, kube=False):
'''
从全部数据中整理出需要的数据
'''
result = []
# 自定义组装数据
for line in data_info:
if kube:
# 健康检查
data = (
line.get('namespaces'),
line.get('name'),
line.get("server"),
line.get('porbe'),
line.get("limit"),
line.get("desc"),
line.get('url'),
line.get('kubetotalscore'),
line.get("cloud_type"),
)
# 空间
# data = (
# line.get("cloud_type"),
# line.get('desc'),
# line.get('namespaces'),
# line.get('head'),
# line.get('environment'),
# line.get('time'),
# line.get('middleware'),
# line.get('url'),
# )
else:
# data = (
# line.get("namespace"),
# line.get("ingress"),
# line.get("dns"),
# line.get("service"),
# line.get("port"),
# line.get("path")
# )
# print(line)
# result.append(data)
# break
# 域名导出
# data = (
# line.get("status"),
# line.get("domain"),
# line.get("type"),
# line.get("value"),
# line.get("remark"),
# line.get("etime"),
# )
# 数据库
data = (
line.get("CreateTime"),
line.get("desc"),
line.get("DBName"),
line.get("DB"),
line.get("MySQLTotalExecutionCounts"),
line.get("MySQLTotalExecutionTimes"),
line.get("MaxExecutionTime"),
line.get("MaxLockTime"),
line.get("ParseTotalRowCounts"),
line.get("ParseMaxRowCount"),
line.get("ReturnTotalRowCounts"),
line.get("ReturnMaxRowCount"),
line.get("SQLText")
)
# 空间内存
# data = (
# line.get('namespaces'),
# line.get('name'),
# line.get("memory"),
# line.get("url"),
# )
#代码统计
# data = (
# line.get('project_name'),
# line.get('author_name'),
# line.get("author_email"),
# line.get("code"),
# )
# 短信模板
# data = (
# line.get('template_code'),
# line.get('template_content'),
# line.get("template_name"),
# line.get('create_time'),
# )
result.append(data)
return result
def write_excel(self, file=None, data=None, title=None):
'''
1.txt、设置 Excel 样式
2、将数据写入到 Excel 中
'''
# 生成 Excel 文件
work = workbook.Workbook(file)
# 建立工作表,表名默认
worksheet = work.add_worksheet()
# 设置字体加粗、字体大小
format_title = work.add_format({'bold': True, 'font_size': 10})
# 设置水平对齐、垂直对齐
format_title.set_align('center')
format_title.set_align('vcenter')
format_body = work.add_format({'font_size': 10})
# 设置样式,行高、列宽
worksheet.set_row(0, 25)
worksheet.set_column(0, 0, 30)
worksheet.set_column(1, 1, 20)
worksheet.set_column(2, 3, 28)
worksheet.set_column(4, 5, 25)
worksheet.set_column(6, 6, 12)
worksheet.set_column(7, 9, 16)
worksheet.set_column(10, 11, 25)
"""
自定义表头
"""
# 定义表头
title = title
row = 0
col = 0
# 表头写入文件,引用样式
for item in title:
worksheet.write(row, col, item, format_title)
col += 1
for line in data:
row += 1
col = 0
for key in line:
worksheet.write(row, col, key, format_body)
col += 1
work.close()
return True