查看原文
其他

太强了!利用 Python 写了一个监控服务器资源利用率的脚本!

IT服务圈儿 2023-02-06

The following article is from 杰哥的IT之旅 Author JackTian

来源丨经授权转自 杰哥的IT之旅(ID:Jake_Internet)

作者丨JackTian


部署完 自动巡检多个接口地址是否正常并按 crontab 定时任务通过企业微信机器人以文本的形式发送到告警群的脚本 后,又研究了一个脚本,其主要目的是:基于 Python 编程语言来监控服务器的 CPU、内存、/目录、/appslog、/bigdata目录使用率以及网卡接收和发送情况。

该脚本部署场景分为:服务端和客户端。

  • 服务端:一台固定 IP 地址的服务器

  • 客户端:N 台指定固定 IP 地址的服务器

服务端脚本:

# -*- coding:utf-8 -*-
import io
import os
import sys
import logging
from logging import handlers
import MySQLdb
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
import requests, json
import datetime
import time
import shutil,re
import uuid
import socket
import SocketServer

if sys.getdefaultencoding() != 'utf-8':
    reload(sys)
    sys.setdefaultencoding('utf-8')

class Logger(object):
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }  # 日志级别关系映射

    def __init__(self,logname, level='info', when='D', backCount=10, fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        CURRENT_DIR = os.path.dirname(__file__)
        LOG_FILE = os.path.abspath(os.path.join(CURRENT_DIR, logname))
        self.logger = logging.getLogger(LOG_FILE)
        format_str = logging.Formatter(fmt)  # 设置日志格式
        self.logger.setLevel(self.level_relations.get(level))  # 设置日志级别
        sh = logging.StreamHandler()  # 往屏幕上输出
        sh.setFormatter(format_str)  # 设置屏幕上显示的格式
        th = handlers.TimedRotatingFileHandler(
            filename=LOG_FILE, when=when, backupCount=backCount, encoding='utf-8')  # 往文件里写入#指定间隔时间自动生成文件的处理器
        #实例化TimedRotatingFileHandler
        #interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
        # S 秒
        # M 分
        # H 小时、
        # D 天、
        # W 每星期(interval==0时代表星期一)
        # midnight 每天凌晨
        th.setFormatter(format_str)  # 设置文件里写入的格式
        #self.logger.addHandler(sh)  # 把对象加到logger里
        if not self.logger.handlers:
            self.logger.addHandler(th)

class Analysis(object):    
    def buildMsg(self,msg):
        print('构造预警信息'+str(msg))
        icount = 0
        if(float(msg[4]) > 90):
            icount+=1
            CPU ="> CPU预警:使用率高于90%,使用"+str(msg[4])+"% \n"
        else:
            CPU=""
        if(float(msg[5]) > 90):
            icount+=1
            mem ="> 内存预警:使用率高于90%,使用"+str(msg[5])+"% \n"
        else:
            mem=""
        if(float(msg[6]) > 85):
            icount+=1
            disk_root ="> 磁盘根目录预警:使用率高于85%,使用"+str(msg[6])+"% \n"
        else:
            disk_root=""
        if(float(msg[7]) > 85):
            icount+=1
            disk_appslog ="> 业务磁盘预警:使用率高于85%,使用"+str(msg[7])+"% \n"
        else:
            disk_appslog=""
        if(float(msg[8]) > 3000):
            icount+=1
            networkRecv ="> 网卡10秒内接收数据预警:接收数据大于4000M,接收"+str(msg[8])+"M \n"
        else:
            networkRecv=""
        if(float(msg[9]) > 3000):
            icount+=1
            networkSend ="> 网卡10秒内发送数据预警:发送数据大于4000M,发送"+str(msg[9])+"M \n"
        else:
            networkSend=""
        s= alarmName+"\n"+msg[2]+":" +msg[3]+"\n" +CPU+mem+disk_root+disk_appslog+networkRecv+networkSend
        #print(s)
        log.logger.info('预警信息:'+s)

        #发送预警
        if(icount>0):
            #发送预警邮件、企业微信
            if mailconf ==1:
                self.send_mail(s,msg[3])
            if wxconf ==1:
                self.send_WX(s)

    def send_mail(self,content,ip):
        smtpserver = 'smtp.163.com'
        mail_user="xxx@163.com"
        mail_pass="passwordxxx"
        mail_res=["xxx@163.com","xxx@163.com","xxx@163.com","xxx@163.com","xxx@163.com","xxx@163.com","xxx@163.com"]
        sub = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        msg = MIMEText( sub + "\n"+content, _subtype='plain',_charset='utf-8')
        msg['Subject'] = Header(alarmName+':'+ip, 'utf-8' )
        #msg['From'] = Header("系统预警", 'utf-8')
        msg['From'] = formataddr(pair=('设备预警', mail_user))
        msg['To'] = ', '.join(mail_res)
        smtp  = smtplib.SMTP()
        smtp.connect(smtpserver)
        smtp.starttls()
        smtp.login(mail_user, mail_pass)
        smtp.sendmail(mail_user, mail_res, msg.as_string())
        smtp.quit()

    def send_WX(self,msg):
        headers = {"Content-Type""text/plain"}
        #s="服务器预警:{},验证码{}".format({str(printCode)},{str(verifyCode)})
        data = {
            "msgtype""text",
            "text": {
                "content":  msg,
            }
        }
        r = requests.post(
            url='企业微信机器人地址(需要根据实际机器人地址配置)',
            headers=headers, json=data)
        print(r.text)

    def Write_to_Mysql_alarm(self,valuelist):
        #log = Logger('all.log',level='debug')
        #业务监控:id,project,tpye,exceptiontype,details(xx,大数据,无es进程/es集群不健康,)
        try:
            db = MySQLdb.connect("xxx""xxx""xxx""xxx", charset='utf8' )
            log.logger.info("数据库连接成功"
        except:
            log.logger.info("数据库连接失败"
        # 创建游标
        cursor = db.cursor()
        uid = uuid.uuid1()
        result=0
        sql =''
        try:
            sql = 'insert into test_serverresourcealarm values (%s, %s, %s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s,%s)'
            #val = (str(uid),valuelist[1], valuelist[2], valuelist[3], valuelist[4],valuelist[5], valuelist[6],'',valuelist[7], valuelist[8],valuelist[9],valuelist[10],'','','')
            val = (str(uid),valuelist[2], valuelist[3], valuelist[4], valuelist[5],valuelist[6], valuelist[7],'',valuelist[8], valuelist[9],valuelist[10],'','','')            
            cursor.execute(sql,val)
            db.commit()
            log.logger.error('设备预警信息已入库!')
            #发送企业微信预警信息
            self.buildMsg(valuelist)
        except:
            into = sys.exc_info()
            #log.logger.error('插入数据失败!')
            log.logger.error('设备预警信息入库失败!'+str(into))
            result=0
            #str = self.obj_to_string(sys.exc_info(), self)
            print('error',into)

        # 关闭游标
        db.close()
        return result

    def Write_to_Mysql_temp(self,valuelist):
        # 打开数据库连接
        #db = MySQLdb.connect("xxx", "xxx", "xxx", "xxx", charset='utf8' )
        try:
            db = MySQLdb.connect("xxx""xxx""xxx""xxx", charset='utf8' )
            log.logger.info("数据库连接成功"
        except:
            log.logger.info("数据库连接失败"
        # 使用cursor()方法获取操作游标 
        cursor = db.cursor()
        uid = uuid.uuid1()
        result=0
        try:
            sql = 'insert into test_serverresourcetemp values (%s, %s, %s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s,%s)'
            val = (str(uid),valuelist[2], valuelist[3], valuelist[4], valuelist[5],valuelist[6], valuelist[7],'',valuelist[8], valuelist[9],valuelist[10],'','','')
            cursor.execute(sql,val)
            db.commit()
            result=1
            log.logger.info("临时表sql执行状态:"+str(result))       
        except:
            into = sys.exc_info()
            result = 0
            print(into)
            log.logger.info('临时表sql执行失败: '+str(into))
        # 关闭数据库连接

        db.close()
        return result

class MyServer(SocketServer.BaseRequestHandler):
    def handle(self):
        conn = self.request
        log.logger.info('... connected from {}'.format(self.client_address))
        #print('1多线程监控')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            #print(data)
            if len(data)>10:
                log.logger.info('接收到的客户端数据:'+data)
                conn.sendall('1')
                sub = data.strip('\n')
                str = sub.split('|'
                #print(str)
                a = Analysis()
                #报警信息入库,#将监控数据写入临时表中test_serverresourcetemp_lty
                result = a.Write_to_Mysql_temp(str)
                if(float(str[4])>90 or float(str[5])>90 or float(str[6])>85 or float(str[7])>85 or float(str[8])>3000 or float(str[9])>3000):
                    result1 = a.Write_to_Mysql_alarm(str)
                #result = 1
                if result == 0:
                    log.logger.info('预警信息入库失败!')
                else:
                    log.logger.info('预警信息入库完成!')
                #发送预警邮件、企业微信
                #a.buildMsg(str)

            if data =='exit':
                log.logger.info('... connecte end ...')
                Flag = False

if __name__ == "__main__":
    #每分钟执行一次
    log = Logger('socketservice.logs')
    log.logger.info('----start----')
    alarmName ='服务器资源预警'
    #是否开启邮件报警,1为开启,0为关闭
    mailconf =1
    #是否开启企业微信报警,1为开启,0为关闭
    wxconf =0
    server = SocketServer.ThreadingTCPServer(('IP',port),MyServer)
    server.serve_forever()

客户端脚本:

# -*- coding:utf-8 -*-
import io
import os
import sys
import time
import datetime
import socket
import commands
import logging
from logging import handlers
import psutil
import struct
import fcntl

if sys.getdefaultencoding() != 'utf-8':
    reload(sys)
    sys.setdefaultencoding('utf-8')

class Logger(object):
    level_relations = {
        'debug': logging.DEBUG, 
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }  # 日志级别关系映射

    def __init__(self,logname, level='info', when='D', backCount=10, fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        CURRENT_DIR = os.path.dirname(__file__)
        LOG_FILE = os.path.abspath(os.path.join(CURRENT_DIR, logname))
        self.logger = logging.getLogger(LOG_FILE)
        format_str = logging.Formatter(fmt)  # 设置日志格式
        self.logger.setLevel(self.level_relations.get(level))  # 设置日志级别
        sh = logging.StreamHandler()  # 往屏幕上输出
        sh.setFormatter(format_str)  # 设置屏幕上显示的格式
        th = handlers.TimedRotatingFileHandler(
            filename=LOG_FILE, when=when, backupCount=backCount, encoding='utf-8')  # 往文件里写入#指定间隔时间自动生成文件的处理器
        #实例化TimedRotatingFileHandler
        #interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
        # S 秒
        # M 分
        # H 小时、
        # D 天、
        # W 每星期(interval==0时代表星期一)
        # midnight 每天凌晨
        th.setFormatter(format_str)  # 设置文件里写入的格式
        #self.logger.addHandler(sh)  # 把对象加到logger里
        if not self.logger.handlers:
            self.logger.addHandler(th)

class clientMonitor(object):
     #获取指定网卡ip
    def getIpAddress(self,dev):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)      
        a = s.fileno()      
        b = 0x8915      
        c = struct.pack('256s', dev[:15])      
        res = fcntl.ioctl(a, b, c)[20:24]      
        return socket.inet_ntoa(res)

    #获取网络的使用情况 ,取的是eth0的发送和收取的总字节数
    #readNetInfo('eth0')       
    def readNetInfo(self,dev):         
        f = open('/proc/net/dev')         
        lines = f.readlines()         
        f.close()         
        res = {'in':0'out':0}         
        for line in lines:         
            if line.lstrip().startswith(dev):         
                # for centos         
                line = line.replace(':'' ')         
                items = line.split()         
                res['in'] = long(items[1])#/1024        
                res['out'] = long(items[len(items)/2 + 1])#/1024         
        return res

    def readNetInfo_new(self,dev):
        res = {'in':0'out':0}   
        res['in'] = psutil.net_io_counters(pernic=True).get(dev).bytes_recv
        res['out'] = psutil.net_io_counters(pernic=True).get(dev).bytes_sent
        return res

    #磁盘使用率,path:磁盘路径
    def disk_stat(self,path):    
        hd={}
        disk = os.statvfs(path)
        percent = (disk.f_blocks - disk.f_bfree) * 100 / (disk.f_blocks -disk.f_bfree + disk.f_bavail) + 1
        return percent   

    def net_loop(self,dev):
        #end = {'in':0, 'out':0} 
        res =  self.readNetInfo_new(dev)
        #推迟执行的秒数
        time.sleep(2)
        #new_recv,new_send = get_net_data()
        new_res =  self.readNetInfo_new(dev)
        recv_data = (new_res['in']-res['in'])/1024/1024
        send_data = (new_res['out']-res['out'])/1024/1024
        print ("recv_data: %s M, send_data: %s M"%(recv_data, send_data))
        return recv_data,send_data

    def processcheck(self,cmd):
        #cmd='ps -aux | sort -k3nr | head -1'
        (status,output) = commands.getstatusoutput(cmd)
        #Pid= output.split(' ')[6]
        log.logger.info('资源占用top:\n'+output)

#查看占用内存最高的进程的PID
#ps aux|head -1;ps aux|grep -v PID|sort -rn -k +4|head
#ps -aux | sort -k4nr | head -1 ,-k3 cpu占用最高,-k4 内存占用最高
#root      14668  1.9  0.0  90504  3256 ?        Ss   4月23 2811:48 /sbin/rngd -f
#索引:-k3 b.split(' ')[6] 28进程路径(/sbin/rngd)
#索引:-k4 b.split(' ')[4]
if __name__ == "__main__":
    #10分钟执行一次,数据上报到服务端,服务端负责报警
    #需要修改的参数:custom,deviceType,netName
    custom ='test'
    deviceType ='客户端服务器'
    #网卡名称
    netName = 'ens3f0'

    log = Logger('socketclient.logs')
    log.logger.info("----start----")
    info=clientMonitor()

    locatIp = info.getIpAddress(netName)
    recv_data,send_data = info.net_loop(netName)
    cpuinfo = psutil.cpu_percent(1)
    #svmem(total=67268558848, available=32022245376, percent=52.4, used=34601009152, free=29655695360, active=17274105856, inactive=2927910912, buffers=10100736, cached=3001753600, shared=298610688, slab=11243315200)
    svmem = psutil.virtual_memory()
    meminfo =  svmem[2]
    disk_root = info.disk_stat('/')
    disk_appslog = info.disk_stat('/appslog')
    disk_bigdata = info.disk_stat('/bigdata')
    #如果CPU或内存的占用率大于80%,将占用CPU或内存资源最多的进程找出来
    issendmsg =1
    if(cpuinfo>80 or meminfo>80 or disk_root>80 or disk_appslog>80 or disk_bigdata>80 or recv_data>3000 or send_data>3000):
        #发送预警邮件
        sendmsg=locatIp +' 服务器资源占用高!请检查!\n'
        sendmsg += "CPU占用:"+str(cpuinfo)+'\n'
        sendmsg += "内存占用:"+str(meminfo)+'\n'
        sendmsg += "/目录占用:"+str(disk_root)+'\n'
        sendmsg += "/appslog目录占用:"+str(disk_appslog)+'\n'
        sendmsg += "/bigdata目录占用:"+str(disk_bigdata)+'\n'
        sendmsg += "网卡接收流量:"+str(recv_data)+'M,发送流量 '+str(send_data)+'M \n'
        #sendmsg += "网卡10秒发送流量:"+str(send_data)+'\n'
        log.logger.info(sendmsg)
        if cpuinfo>80 :
            info.processcheck('ps -aux | sort -k3nr | head -10')
        if meminfo>80 :
            info.processcheck('ps -aux | sort -k4nr | head -10')
        issendmsg = 1
    else:
        #log.logger.info(locatIp+" 正常")
        log.logger.info("CPU使用率:"+str(cpuinfo))
        log.logger.info("内存使用率:"+str(meminfo))
        log.logger.info("/目录使用率:"+str(disk_root))
        log.logger.info("/appslog使用率:"+str(disk_appslog))
        log.logger.info("/bigdata使用率:"+str(disk_bigdata))
        log.logger.info("网卡接收和发送情况:接收"+str(recv_data) +"M, 发送 "+str(send_data)+"M")

    #Id,custom,deviceType,IP,cpu,mem,disk_root,disk_appslog,disk_bigdata,networkRecv,networkSend,uploadTime,temp2,temp3,temp4
    msg = '1'+'|'+custom+'|'+deviceType+'|'+locatIp+'|'+str(cpuinfo)+'|'+str(meminfo)+'|'+str(disk_root)+'|'+str(disk_appslog)+'|'+str(disk_bigdata)+'|'+str(recv_data)+'|'+str(send_data)+'|'+time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    if issendmsg ==1:
        ip_port = ('IP',port)
        sk = socket.socket()
        sk.connect(ip_port)
        sk.sendall(msg)
        data = sk.recv(1024)
        if data=='1':
            log.logger.info("本地预警信息传输成功!")
        else:
            log.logger.info("本地预警信息传输失败!")
        sk.sendall('exit')
        sk.close()

服务端和客户端部署好后,执行脚本过程中如遇到缺少 psutil 依赖包的话,则需要进行安装。

因为我这有准备好的 psutil_rpm 包,可执行命令:rpm -ivh python2-psutil-5.6.7-1.el7.x86_64.rpm

psutil_rpm 包获取方式:
链接:https://pan.baidu.com/s/19iMY8b9nVITtgBq8F3Um_A
提取码:PsRm

写个定时任务,以每 2 小时执行一次该脚本。

crontab -e

*/2 * * * cd /opt/jiaoben;python test_socket_resourcemonitor.py

客户端打印日志效果:

tail -200f socketclient.logs


1、高并发系统设计的15个锦囊

2、到底什么样的 REST 才是最佳 REST?

3、SQLSERVER 居然也能调 C# 代码 ?

4、线程池夺命10连问

5、写了个自动巡检多个接口地址的脚本!

点分享

点点赞

点在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存