# schedule.py 6.1 KB
# coding=utf-8
# author:        resioR
#createtime:    2021/12/1
#email:         qianyingz@chinadci.com

# import schedule
from flask import json
from .models import MonitorHost, MonitorInfo
from datetime import datetime, timedelta
import math
import uuid
import schedule
from app.util.component.RunContinuous import run_continuously
from app.util.component.PGUtil import PGUtil
import configure
from app.util.component.StructurePrint import StructurePrint
import traceback
import requests

# Number of trailing entries pull_metric keeps from each host response; a host
# is skipped unless its response holds more than this many entries.
min_size = 60
# NOTE(review): not read by any function visible in this file
# (get_sample_data binds its own local `size`) - confirm it is used elsewhere
# before removing.
size = 60


def pull_metric():
    """Collect metrics from every monitored host and store percentile samples.

    For each ``MonitorHost.host`` value the function issues an HTTP GET to
    ``http://<host>`` and parses the response body as JSON. When the response
    holds more than ``min_size`` entries, the last ``min_size`` entries are
    reduced to percentile rows by ``get_sample_data`` (one call per metric
    name) and committed.

    Failures are logged through ``StructurePrint`` and never propagate: an
    error for one host does not stop the remaining hosts from being processed.
    """
    # Bind the handles before the try block so the cleanup in ``finally`` can
    # always inspect them, even when opening the session itself fails.
    sys_session = None
    sys_ds = None
    # Seconds to wait on a host before giving up; without a timeout a single
    # unresponsive host would block the scheduler thread indefinitely.
    request_timeout = 10
    try:
        # Open the system database session / data source.
        sys_session = PGUtil.get_db_session(
            configure.SQLALCHEMY_DATABASE_URI)
        sys_ds = PGUtil.open_pg_data_source(
            0, configure.SQLALCHEMY_DATABASE_URI)

        # Fetch the list of hosts to poll.
        hosts = sys_session.query(
            MonitorHost.host)
        for host in hosts:
            try:
                host_name = host.host
                request_uri = "http://{}".format(host_name)
                response = requests.request(
                    "get", request_uri, timeout=request_timeout)
                if response.status_code != 200:
                    continue

                data = json.loads(response.text)
                len_metric = len(data)
                # NOTE(review): a response with exactly ``min_size`` entries is
                # skipped as well (strict comparison kept from the original) -
                # confirm whether ``>=`` was intended.
                if len_metric <= min_size:
                    continue
                metric_data = data[len_metric - min_size:]

                type_list = ["cpu_per", "memory_per", "disk_per",
                             "disk_read", "disk_write", "disk_read_count",
                             "disk_write_count", "network_send", "network_recv"]
                for metric_name in type_list:
                    sample_data = get_sample_data(
                        metric_data, metric_name, host_name)
                    sys_session.add_all(sample_data)
                sys_session.commit()

            except Exception as e:
                StructurePrint().print(str(e) + ":" + traceback.format_exc(), "error")
                # Discard whatever this host left pending so the shared session
                # stays usable for the next host.
                try:
                    sys_session.rollback()
                except Exception as rollback_error:
                    StructurePrint().print(
                        str(rollback_error) + ":" + traceback.format_exc(), "error")
    except Exception as e:
        StructurePrint().print(str(e) + ":" + traceback.format_exc(), "error")
    finally:
        if sys_session is not None:
            try:
                sys_session.close()
            except Exception as e:
                StructurePrint().print(str(e) + ":" + traceback.format_exc(), "error")
        if sys_ds is not None:
            try:
                sys_ds.Destroy()
            except Exception as e:
                StructurePrint().print(str(e) + ":" + traceback.format_exc(), "error")


#每天清理7天前的数据
def monitor_vacuuming():
    """Delete ``MonitorInfo`` rows whose ``time_stamp`` is older than 7 days.

    The purge only runs when the current local hour is 0; at any other hour
    the function does nothing. The number of matching rows is counted before
    the delete and reported through ``StructurePrint``. Errors are logged and
    never propagate to the caller.
    """
    # Bind the handles up front: they are only opened when the hour check
    # passes, and the cleanup in ``finally`` must not fail on unbound names.
    sys_session = None
    sys_ds = None
    try:
        current_date = datetime.now()
        last_7_date = current_date + timedelta(days=-7)
        if current_date.hour == 0:
            sys_session = PGUtil.get_db_session(
                configure.SQLALCHEMY_DATABASE_URI)
            # NOTE(review): the data source is opened and destroyed but never
            # used here - kept in case opening it has a required side effect.
            sys_ds = PGUtil.open_pg_data_source(
                0, configure.SQLALCHEMY_DATABASE_URI)
            info_orm = sys_session.query(MonitorInfo).filter(
                MonitorInfo.time_stamp < last_7_date)
            count = info_orm.count()
            info_orm.delete()
            sys_session.commit()
            StructurePrint().print("MonitorInfo清理%d条监控数据" % count, "info")
    except Exception as e:
        StructurePrint().print(str(e) + ":" + traceback.format_exc(), "error")
    finally:
        if sys_session is not None:
            try:
                sys_session.close()
            except Exception as e:
                StructurePrint().print(str(e) + ":" + traceback.format_exc(), "error")
        if sys_ds is not None:
            try:
                sys_ds.Destroy()
            except Exception as e:
                StructurePrint().print(str(e) + ":" + traceback.format_exc(), "error")


def start_schedule():
    """Register the periodic monitoring jobs and start the background runner.

    ``pull_metric`` is scheduled every minute and ``monitor_vacuuming`` every
    day at 00:00, then ``run_continuously`` is invoked. Any exception raised
    while doing so is logged through ``StructurePrint`` and swallowed.
    """
    try:
        StructurePrint().print("start_schedule")
        # Poll the monitored hosts once a minute.
        schedule.every(1).minutes.do(pull_metric)
        # Purge old monitoring rows every day at 00:00.
        schedule.every().day.at('00:00').do(monitor_vacuuming)
        # The value returned by run_continuously() is kept only locally.
        background_stop_handle = run_continuously()
    except Exception as err:
        message = ":".join(
            ("start_schedule", str(err), traceback.format_exc()))
        StructurePrint().print(message, "error")

    # Do some other things...
    # # Stop the background thread
    # time.sleep(10)
    # background_stop_handle.set()


def get_sample_data(orginal, name, host):
    """Reduce a window of raw samples to P50/P95/P100 rows for one metric.

    Args:
        orginal: sequence of sample mappings; each must contain ``name`` and
            whatever ``get_data`` reads from a sample.
        name: metric key to rank the samples by.
        host: host identifier passed through to ``get_data``.

    Returns:
        A list with one ``get_data`` result per percentile, in the order
        P50, P95, P100, or an empty list when ``orginal`` is empty.
    """
    size = len(orginal)
    if size == 0:
        return []

    # Nearest-rank percentiles: the 1-based rank is ceil(p * size). Rounding
    # down instead would under-rank whenever p * size is fractional (the P50
    # of three samples would be the minimum) and would yield rank 0 for a
    # single sample. For windows where p * size is whole, such as 60 samples,
    # both roundings pick the same element.
    ranks = {
        'P50': math.ceil(0.5 * size),
        'P95': math.ceil(0.95 * size),
        'P100': size,
    }

    ordered = sorted(orginal, key=lambda sample: sample[name])
    return [
        get_data(label, host, name, ordered[rank - 1])
        for label, rank in ranks.items()
    ]


def get_data(stamp, host, metrics_name, cur_data):
    """Build a ``MonitorInfo`` row from a single raw sample.

    Args:
        stamp: percentile label stored in the row's ``stamp`` field.
        host: value stored in the row's ``server`` field.
        metrics_name: key of the metric to read from ``cur_data``; also
            stored in the row's ``metrics`` field.
        cur_data: sample mapping holding ``'timestamp'`` formatted as
            ``%Y-%m-%d %H:%M:%S`` plus the metric value.

    Returns:
        A new ``MonitorInfo`` instance with a freshly generated ``guid``.
    """
    sampled_at = datetime.strptime(cur_data['timestamp'], "%Y-%m-%d %H:%M:%S")
    sampled_day = sampled_at.strftime("%Y-%m-%d")
    record_id = str(uuid.uuid1())
    metric_value = cur_data[metrics_name]
    return MonitorInfo(
        guid=record_id,
        server=host,
        date_stamp=sampled_day,
        time_stamp=sampled_at,
        stamp=stamp,
        value=metric_value,
        metrics=metrics_name,
    )