popyone
发布于 2023-06-30 / 160 阅读
0
0

分享一个以前用python写的,监控linux登录错误的脚本

脚本为什么要用python写呢?当初我也是想用shell直接处理,那样通用性更强,但是钉钉的消息推送需要加签,shell没找到如何加签,用python比较方便,后面就写了下面的Python脚本。
服务器上通过ansible部署Python虚拟化环境、启动脚本

下面的脚本不包含钉钉消息推送模块

  • 目录结构
├── common
│   ├── config.py
│   ├── dingding.py
│   └── logger.py
├── conf
│   └── config.conf
├── login.py
└── logs

login.py

# coding:utf-8
import datetime
import socket
import sys
import time

from common.config import ReadConfig
from common.dingding import Ding
from common.logger import Logs


class Auth:
    def __init__(self):
        self.now_time = None
        self.last_time = None
        self.nowtime_utc = None
        self.last_time_utc = None
        # 初始化类做为对象
        self.log = Logs.get_logger(file_path=f'./logs/auth_secure_{datetime.datetime.now().strftime("%Y-%m-%d")}.log',
                                     logger_name='auth', lvl=20)
        self.dd = Ding()
        config = ReadConfig()
        # 读取配置文件
        self.debian_auth = config.debian()
        self.centos_secure = config.centos()
        # 月历数字
        self.months = {'Jan': '1', 'Feb': '2', 'Mar': '3', 'Apr': '4', 'May': '5', 'Jun': '6',
                       'Jul': '7', 'Aug': '8', 'Sep': '9', 'Oct': '10', 'Nov': '11', 'Dec': '12'}

    # 计算时间区间
    def time_interval(self):
        self.now_time = datetime.datetime.now()  # 现在时间
        self.last_time = self.now_time + datetime.timedelta(minutes=-TIME_INTERVAL)  # minute分钟之前
        nowtime_str = self.now_time.strftime('%Y-%m-%d %H:%M:00')  # 格式化时间 2022-09-20 09:27:00
        last_time_str = self.last_time.strftime('%Y-%m-%d %H:%M:00')  # 格式化时间 2022-09-20 09:27:00
        self.now_time = datetime.datetime.strptime(nowtime_str, '%Y-%m-%d %H:%M:%S')
        self.last_time = datetime.datetime.strptime(last_time_str, '%Y-%m-%d %H:%M:%S')

    def extract_ip(self):
        st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            st.connect(('10.255.255.255', 1))
            IP = st.getsockname()[0]
        except Exception:
            IP = '127.0.0.1'
        finally:
            st.close()
        return IP

    # 读取登录日志,查找包含'Failed password'的行,提取解析时间字符串,如果该时间在扫描时间内,将改行加入列表,最后返回给钉钉告警
    def auth_secure(self):
        file_lists = []
        # 读取登录日志(debian、centos)
        try:
            with open(self.debian_auth, 'r') as f:
                file = f.read()
        except Exception as ex:
            with open(self.centos_secure, 'r') as f:
                file = f.read()
        except:
            self.log.info(f'没有找到登录日志')
            sys.exit()
        '''
        Sep 18 01:57:26 ECS-master sshd[9748]: Failed password for invalid user user from 103.188.176.251 port 56642 ssh2
        按照行对日志进行切割,然后查找包含'Failed password'的行,对每行进行切割
        将时间'Sep 20 12:16:19'转成格式'2022-09-20 12:16:19',
        '''
        log_file = file.splitlines()
        for file_list in log_file:
            if file_list.find('Failed password') != -1:
                str_list = file_list.split()
                log_mon = self.months[str_list[0]]
                log_day = str_list[1]
                log_time = str_list[2]
                log_end = ' '.join(str_list[3:])
                log_year = datetime.datetime.now().strftime('%Y')
                # 按照指定格式将字符串转成时间
                err_time = datetime.datetime.strptime(f'{log_year}-{log_mon}-{log_day} {log_time}', '%Y-%m-%d %H:%M:%S')
                if err_time >= self.last_time:
                    # 拼接日志
                    file_list = f'{err_time} {log_end}'
                    file_lists.append(file_list)
        return file_lists

    # 钉钉告警
    def dingding(self):
        starttime = time.time()
        # 获取限定时间段内的错误登录总数
        file_lists = self.auth_secure()
        if not file_lists:
            self.log.info(f'{self.now_time} 到 {self.last_time}, 未检查到登录错误')
        # 推送钉钉告警
        if file_lists:
            x = f'{self.now_time} 到 {self.last_time},' \
                f'\n服务器 {self.extract_ip()} 在 {TIME_INTERVAL} 分钟以内,登录失败日志:'
            # 拼接告警信息:时间 数据库主机 语句执行时间 锁占用时间 语句扫描行数 发送到客户端行数
            for n in range(len(file_lists)):
                list_print = file_lists[n]
                x = f'{x}\n{n + 1}: {list_print}'
            self.log.info(x.replace('[', '').replace(']', ''))
            self.dd.dingding_request(x.replace('[', '').replace(']', ''))
        endtime = time.time()
        runtime = ("%.2f" % (endtime - starttime))
        return runtime


if __name__ == '__main__':
    # 查询告警时间段
    TIME_INTERVAL = 2
    # 初始化类做为对象
    auth = Auth()
    # 记录起始时间
    auth.time_interval()
    # 运行时间
    auth.log.info(f'运行时间: {auth.dingding()}秒')

common/config.py

# coding:utf-8
import configparser


class ReadConfig:
    def __init__(self):
        self.config = configparser.ConfigParser()
        configfile = './conf/config.conf'
        self.config.read(configfile, encoding='utf-8')

    def dingding(self):
        url = self.config.get('dingding', 'url')
        secret = self.config.get('dingding', 'secret')
        return url, secret

    def debian(self):
        #url1 = self.config.get('debian', 'auth_test')
        url1 = self.config.get('debian', 'auth')
        return url1

    def centos(self):
        url1 = self.config.get('centos', 'secure')
        return url1


if __name__ == '__main__':
    read = ReadConfig()
    print(read.dingding()[0])

common/logger.py

# -*- coding: utf-8 -*-
import logging
import logging.config
import logging.handlers


class Logs:
    @staticmethod
    def get_logger(file_path, logger_name=None, lvl=10):

        """
        file_path:日志文件路径名字
        lvl:日志记录等级:
            级别        数值
            CRITICAL    50
            ERROR       40
            WARNING     30
            INFO        20
            DEBUG       10
            NOTSET      0
        interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
                S 秒 、M 分、H 小时、D 天、W 每星期(interval==0时代表星期一)、midnight 每天凌晨
        logger_name:getLogger() 返回对具有指定名称的记录器实例的引用(如果已提供),或者如果没有则返回 root 。名称是以句点分隔的层次结构。
            多次调用 getLogger() 具有相同的名称将返回对同一记录器对象的引用。在分层列表中较低的记录器是列表中较高的记录器的子项。
            例如,给定一个名为 foo 的记录器,名称为 foo.bar 、 foo.bar.baz 和 foo.bam 的记录器都是 foo 子项。
        """
        # 创建logger
        logger = logging.getLogger(logger_name)
        logger.setLevel(level=lvl)

        # 创建日志输出格式
        formatter = logging.Formatter('%(asctime)s %(name)-10s %(levelname)-8s %(processName)-10s %(message)s')

        # 创建日志处理器,标准输出和文件存储
        ch = logging.StreamHandler()  # 标准输出
        ch.setLevel(level=10)  # 日志输出等级

        fh = logging.handlers.TimedRotatingFileHandler(filename=file_path, interval=1, backupCount=3, when='D',
                                                       encoding='utf8')  # 文件滚动输出
        fh.setLevel(level=10)  # 日志输出等级

        # 加载输出格式到处理器
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)

        # 加载处理器到logger
        logger.addHandler(ch)
        logger.addHandler(fh)
        return logging.getLogger(logger_name)


if __name__ == '__main__':
    file_path = './logs/log2.log'
    log = Logs.get_logger(file_path=file_path, logger_name='test', lvl=10)
    print(log)
    log.debug('debug message')
    log.info('info message')
    log.warning('warn message')
    log.error('error message')
    log.critical('critical message')

conf/config.conf

[dingding]
url = https://oapi.dingtalk.com/robot/send?access_token=...
secret = ...

[debian]
auth = /var/log/auth.log

[centos]
secure = /var/log/secure

评论