test: read syslog

This commit is contained in:
Zxilly 2024-11-26 16:51:46 +08:00
parent 7bd875827d
commit c8191f5d7e
No known key found for this signature in database
GPG Key ID: 47AB1DEC841BC6A2
4 changed files with 82 additions and 2 deletions

View File

@ -102,7 +102,12 @@ jobs:
cmake --build build
./build/ua2f --version
sudo python scripts/test.py ./build/ua2f
- name: Upload log
uses: actions/upload-artifact@v4
with:
name: integration-test-log
path: logs.txt
build:
name: Build ${{ matrix.arch }}

67
scripts/journal.py Normal file
View File

@ -0,0 +1,67 @@
import systemd.journal
from typing import List, Dict, Optional
class SystemdLogReader:
def __init__(self):
self.journal = systemd.journal.Reader()
def _format_timestamp(self, timestamp):
"""
格式化时间戳
:param timestamp: systemd日志时间戳
:return: 格式化的日期时间字符串
"""
return timestamp.strftime('%Y-%m-%d %H:%M:%S') if timestamp else None
def read_logs(
self,
count: int = 50,
priority: Optional[int] = None,
match_unit: Optional[str] = None,
match_identifier: Optional[str] = None
) -> List[Dict]:
logs = []
# 配置日志过滤条件
if priority is not None:
self.journal.add_match(PRIORITY=priority)
if match_unit:
self.journal.add_match(_SYSTEMD_UNIT=match_unit)
if match_identifier:
self.journal.add_match(SYSLOG_IDENTIFIER=match_identifier)
# 按时间倒序排序
self.journal.seek_tail()
self.journal.get_previous()
# 读取日志
for entry in self.journal:
log_entry = {
'timestamp': self._format_timestamp(entry.get('__REALTIME_TIMESTAMP')),
'message': entry.get('MESSAGE', ''),
'unit': entry.get('_SYSTEMD_UNIT', ''),
'process_name': entry.get('SYSLOG_IDENTIFIER', ''),
'pid': entry.get('_PID', ''),
'hostname': entry.get('_HOSTNAME', ''),
'priority_level': entry.get('PRIORITY', '')
}
logs.append(log_entry)
if len(logs) >= count:
break
return logs
def filter_logs_by_keyword(
self,
logs: List[Dict],
keyword: str
) -> List[Dict]:
return [
log for log in logs
if keyword.lower() in log['message'].lower()
]

View File

@ -1,3 +1,4 @@
requests
fake-useragent
tqdm
tqdm
systemd-python

View File

@ -1,5 +1,6 @@
import atexit
import http.server
import json
import logging
import os
import socketserver
@ -12,6 +13,8 @@ import requests
from fake_useragent import UserAgent
from tqdm import tqdm
from scripts.journal import SystemdLogReader
ua = UserAgent()
PORT = 37491
@ -88,6 +91,10 @@ if __name__ == "__main__":
assert response.ok
assert response.text == str(len(nxt))
logger = SystemdLogReader()
logs = logger.read_logs(100)
with open("logs.txt", "w") as f:
f.write(json.dumps(logs, indent=4))
# clean
cleanup_iptables()