脚本为什么要用python写呢?当初我也是想用shell直接处理,那样通用性更强,但是钉钉的消息推送需要加签,shell没找到如何加签,用python比较方便,后面就写了下面的Python脚本。
服务器上通过ansible部署Python虚拟化环境、启动脚本
下面的脚本不包含钉钉消息推送模块
- 目录结构
├── common
│ ├── config.py
│ ├── dingding.py
│ └── logger.py
├── conf
│ └── config.conf
├── login.py
└── logs
login.py
# coding:utf-8
import datetime
import socket
import sys
import time
from common.config import ReadConfig
from common.dingding import Ding
from common.logger import Logs
class Auth:
def __init__(self):
self.now_time = None
self.last_time = None
self.nowtime_utc = None
self.last_time_utc = None
# 初始化类做为对象
self.log = Logs.get_logger(file_path=f'./logs/auth_secure_{datetime.datetime.now().strftime("%Y-%m-%d")}.log',
logger_name='auth', lvl=20)
self.dd = Ding()
config = ReadConfig()
# 读取配置文件
self.debian_auth = config.debian()
self.centos_secure = config.centos()
# 月历数字
self.months = {'Jan': '1', 'Feb': '2', 'Mar': '3', 'Apr': '4', 'May': '5', 'Jun': '6',
'Jul': '7', 'Aug': '8', 'Sep': '9', 'Oct': '10', 'Nov': '11', 'Dec': '12'}
# 计算时间区间
def time_interval(self):
self.now_time = datetime.datetime.now() # 现在时间
self.last_time = self.now_time + datetime.timedelta(minutes=-TIME_INTERVAL) # minute分钟之前
nowtime_str = self.now_time.strftime('%Y-%m-%d %H:%M:00') # 格式化时间 2022-09-20 09:27:00
last_time_str = self.last_time.strftime('%Y-%m-%d %H:%M:00') # 格式化时间 2022-09-20 09:27:00
self.now_time = datetime.datetime.strptime(nowtime_str, '%Y-%m-%d %H:%M:%S')
self.last_time = datetime.datetime.strptime(last_time_str, '%Y-%m-%d %H:%M:%S')
def extract_ip(self):
st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
st.connect(('10.255.255.255', 1))
IP = st.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
st.close()
return IP
# 读取登录日志,查找包含'Failed password'的行,提取解析时间字符串,如果该时间在扫描时间内,将改行加入列表,最后返回给钉钉告警
def auth_secure(self):
file_lists = []
# 读取登录日志(debian、centos)
try:
with open(self.debian_auth, 'r') as f:
file = f.read()
except Exception as ex:
with open(self.centos_secure, 'r') as f:
file = f.read()
except:
self.log.info(f'没有找到登录日志')
sys.exit()
'''
Sep 18 01:57:26 ECS-master sshd[9748]: Failed password for invalid user user from 103.188.176.251 port 56642 ssh2
按照行对日志进行切割,然后查找包含'Failed password'的行,对每行进行切割
将时间'Sep 20 12:16:19'转成格式'2022-09-20 12:16:19',
'''
log_file = file.splitlines()
for file_list in log_file:
if file_list.find('Failed password') != -1:
str_list = file_list.split()
log_mon = self.months[str_list[0]]
log_day = str_list[1]
log_time = str_list[2]
log_end = ' '.join(str_list[3:])
log_year = datetime.datetime.now().strftime('%Y')
# 按照指定格式将字符串转成时间
err_time = datetime.datetime.strptime(f'{log_year}-{log_mon}-{log_day} {log_time}', '%Y-%m-%d %H:%M:%S')
if err_time >= self.last_time:
# 拼接日志
file_list = f'{err_time} {log_end}'
file_lists.append(file_list)
return file_lists
# 钉钉告警
def dingding(self):
starttime = time.time()
# 获取限定时间段内的错误登录总数
file_lists = self.auth_secure()
if not file_lists:
self.log.info(f'{self.now_time} 到 {self.last_time}, 未检查到登录错误')
# 推送钉钉告警
if file_lists:
x = f'{self.now_time} 到 {self.last_time},' \
f'\n服务器 {self.extract_ip()} 在 {TIME_INTERVAL} 分钟以内,登录失败日志:'
# 拼接告警信息:时间 数据库主机 语句执行时间 锁占用时间 语句扫描行数 发送到客户端行数
for n in range(len(file_lists)):
list_print = file_lists[n]
x = f'{x}\n{n + 1}: {list_print}'
self.log.info(x.replace('[', '').replace(']', ''))
self.dd.dingding_request(x.replace('[', '').replace(']', ''))
endtime = time.time()
runtime = ("%.2f" % (endtime - starttime))
return runtime
if __name__ == '__main__':
# 查询告警时间段
TIME_INTERVAL = 2
# 初始化类做为对象
auth = Auth()
# 记录起始时间
auth.time_interval()
# 运行时间
auth.log.info(f'运行时间: {auth.dingding()}秒')
common/config.py
# coding:utf-8
import configparser
class ReadConfig:
def __init__(self):
self.config = configparser.ConfigParser()
configfile = './conf/config.conf'
self.config.read(configfile, encoding='utf-8')
def dingding(self):
url = self.config.get('dingding', 'url')
secret = self.config.get('dingding', 'secret')
return url, secret
def debian(self):
#url1 = self.config.get('debian', 'auth_test')
url1 = self.config.get('debian', 'auth')
return url1
def centos(self):
url1 = self.config.get('centos', 'secure')
return url1
if __name__ == '__main__':
read = ReadConfig()
print(read.dingding()[0])
common/logger.py
# -*- coding: utf-8 -*-
import logging
import logging.config
import logging.handlers
class Logs:
@staticmethod
def get_logger(file_path, logger_name=None, lvl=10):
"""
file_path:日志文件路径名字
lvl:日志记录等级:
级别 数值
CRITICAL 50
ERROR 40
WARNING 30
INFO 20
DEBUG 10
NOTSET 0
interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
S 秒 、M 分、H 小时、D 天、W 每星期(interval==0时代表星期一)、midnight 每天凌晨
logger_name:getLogger() 返回对具有指定名称的记录器实例的引用(如果已提供),或者如果没有则返回 root 。名称是以句点分隔的层次结构。
多次调用 getLogger() 具有相同的名称将返回对同一记录器对象的引用。在分层列表中较低的记录器是列表中较高的记录器的子项。
例如,给定一个名为 foo 的记录器,名称为 foo.bar 、 foo.bar.baz 和 foo.bam 的记录器都是 foo 子项。
"""
# 创建logger
logger = logging.getLogger(logger_name)
logger.setLevel(level=lvl)
# 创建日志输出格式
formatter = logging.Formatter('%(asctime)s %(name)-10s %(levelname)-8s %(processName)-10s %(message)s')
# 创建日志处理器,标准输出和文件存储
ch = logging.StreamHandler() # 标准输出
ch.setLevel(level=10) # 日志输出等级
fh = logging.handlers.TimedRotatingFileHandler(filename=file_path, interval=1, backupCount=3, when='D',
encoding='utf8') # 文件滚动输出
fh.setLevel(level=10) # 日志输出等级
# 加载输出格式到处理器
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# 加载处理器到logger
logger.addHandler(ch)
logger.addHandler(fh)
return logging.getLogger(logger_name)
if __name__ == '__main__':
file_path = './logs/log2.log'
log = Logs.get_logger(file_path=file_path, logger_name='test', lvl=10)
print(log)
log.debug('debug message')
log.info('info message')
log.warning('warn message')
log.error('error message')
log.critical('critical message')
conf/config.conf
[dingding]
url = https://oapi.dingtalk.com/robot/send?access_token=...
secret = ...
[debian]
auth = /var/log/auth.log
[centos]
secure = /var/log/secure