ModelVisitor.py 3.4 KB
# coding=utf-8
#author:        4N
#createtime:    2021/5/17
#email:         nheweijun@sina.com
from .PGUtil import PGUtil
from app.models import DES

import datetime
from flask_sqlalchemy import SQLAlchemy,Model



class ModelVisitor:
    @classmethod
    def to_json(cls,model):
        pass

    @classmethod
    def object_to_json(cls,obj):
        info={}
        if obj:
            info = cls.formatter(obj.__dict__)
        return info

    @classmethod
    def formatter(cls,kvdict:dict):
        def generator():
            for k,value in kvdict.items():
                if isinstance(value, Model) or k.__eq__('_sa_instance_state'):
                    continue
                if isinstance(value, datetime.datetime):
                    value = value.strftime('%Y-%m-%d %H:%M:%S')
                elif isinstance(value, datetime.time) or isinstance(value, datetime.date):
                    value = value.strftime('%H:%M:%S')
                yield k,value
        return dict(generator())


    @classmethod
    def objects_to_jsonarray(cls,objects):
        result = []
        for o in objects:
            result.append(cls.object_to_json(o))
        return result



    @classmethod
    def table_to_json(cls,table):
        info = {}
        info["catalog_name"] = table.relate_catalog.name if table.relate_catalog else None
        info["database_alias"] = table.relate_database.alias if table.relate_database else None
        if table:
            info.update(cls.formatter(table.__dict__))
        return info


    @classmethod
    def task_to_json(cls,task):
        info = {}
        info["catalog_name"] = task.relate_catalog.name if task.relate_catalog else None
        info["database_alias"] = task.relate_database.alias if task.relate_database else None
        if task.update_time:
            dd: datetime.timedelta = task.update_time - task.create_time
            info["spend"] = float("{}.{}".format(dd.seconds, str(dd.microseconds)[:3]))
        if object:
            info.update(cls.formatter(task.__dict__))


        return info

    # @classmethod
    # def task_to_json(cls, task):
    #     info = {}
    #     info["catalog_name"] = task.relate_catalog.name if task.relate_catalog else None
    #     info["database_alias"] = task.relate_database.alias if task.relate_database else None
    #     info["consume_time"] = None
    #     if task.update_time:
    #
    #         consume_time = int(task.update_time.timestamp() - task.create_time.timestamp())
    #         minute = int(consume_time / 60)
    #         sec = 1 if (int(consume_time % 60) == 0 and minute == 0) else int(consume_time % 60)
    #         if minute > 0:
    #             info["consume_time"] = "{}分{}秒".format(minute, sec)
    #         else:
    #             info["consume_time"] = "{}秒".format(sec)
    #     if object:
    #         info.update(cls.formatter(task.__dict__))
    #
    #     return info

    @classmethod
    def database_to_json(cls,database):
        info = {}
        if object:
            info.update(cls.formatter(database.__dict__))
            user, password, host, port, db= PGUtil.get_info_from_sqlachemy_uri(DES.decode(info["sqlalchemy_uri"]))
            info["user"] = user
            info["host"] = host
            info["port"] = port
            info["database"] = db
            del info["sqlalchemy_uri"]
        return info