first commit

This commit is contained in:
ytc1012
2026-02-04 16:11:55 +08:00
commit 0f3ee050dc
165 changed files with 25795 additions and 0 deletions

View File

@@ -0,0 +1,360 @@
#!/usr/bin/env python3
"""
Postmortem 匹配检查脚本
检查当前变更是否触发历史 postmortem
使用方法:
python tools/postmortem_check.py [--base main] [--mode warn|block]
返回码:
0 - 无匹配或仅警告
1 - 发现高置信度匹配blocking mode
2 - 错误
"""
import argparse
import fnmatch
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import yaml
POSTMORTEM_DIR = Path(__file__).parent.parent / "postmortem"
@dataclass
class MatchResult:
"""单个匹配结果"""
pm_id: str
reason: str
confidence: float
@dataclass
class AggregatedMatch:
"""聚合后的匹配结果"""
pm_id: str
reasons: List[str] = field(default_factory=list)
max_confidence: float = 0.0
match_count: int = 0
final_confidence: float = 0.0
has_file_match: bool = False
has_content_match: bool = False
class PostmortemMatcher:
"""Postmortem 匹配器"""
# 匹配权重
WEIGHT_FILE_EXACT = 0.8
WEIGHT_FILE_PATTERN = 0.6
WEIGHT_FUNCTION = 0.7
WEIGHT_PATTERN = 0.5
WEIGHT_KEYWORD = 0.4
def __init__(self):
self.postmortems = self._load_all_postmortems()
def _load_all_postmortems(self) -> List[Dict]:
"""加载所有 postmortem 文件"""
pms = []
if not POSTMORTEM_DIR.exists():
return pms
for f in POSTMORTEM_DIR.glob("PM-*.yaml"):
try:
with open(f, encoding="utf-8") as fp:
pm = yaml.safe_load(fp)
if pm:
pms.append(pm)
except Exception as e:
print(f"Warning: Failed to load {f}: {e}", file=sys.stderr)
return pms
def match_files(self, changed_files: List[str]) -> List[MatchResult]:
"""匹配修改的文件"""
matches = []
for pm in self.postmortems:
triggers = pm.get("triggers", {})
pm_files = triggers.get("files", [])
pm_id = pm.get("id", "unknown")
for changed in changed_files:
for pattern in pm_files:
if self._file_matches(changed, pattern):
# 精确匹配 vs 模式匹配
is_exact = "*" not in pattern and "?" not in pattern
confidence = (
self.WEIGHT_FILE_EXACT if is_exact else self.WEIGHT_FILE_PATTERN
)
matches.append(
MatchResult(
pm_id=pm_id,
reason=f"File: {changed} ~ {pattern}",
confidence=confidence,
)
)
return matches
def match_diff_content(self, diff: str) -> List[MatchResult]:
"""匹配 diff 内容中的函数名和模式"""
matches = []
for pm in self.postmortems:
triggers = pm.get("triggers", {})
pm_id = pm.get("id", "unknown")
# 函数名匹配
for func in triggers.get("functions", []):
if not func:
continue
try:
if re.search(rf"\b{re.escape(func)}\b", diff):
matches.append(
MatchResult(
pm_id=pm_id,
reason=f"Function: {func}",
confidence=self.WEIGHT_FUNCTION,
)
)
except re.error:
continue
# 正则模式匹配
for pattern in triggers.get("patterns", []):
if not pattern:
continue
try:
if re.search(pattern, diff, re.IGNORECASE):
matches.append(
MatchResult(
pm_id=pm_id,
reason=f"Pattern: {pattern}",
confidence=self.WEIGHT_PATTERN,
)
)
except re.error:
continue
# 关键词匹配
for keyword in triggers.get("keywords", []):
if not keyword:
continue
if keyword.lower() in diff.lower():
matches.append(
MatchResult(
pm_id=pm_id,
reason=f"Keyword: {keyword}",
confidence=self.WEIGHT_KEYWORD,
)
)
return matches
def _file_matches(self, filepath: str, pattern: str) -> bool:
"""检查文件路径是否匹配模式"""
# 支持 glob 模式
if fnmatch.fnmatch(filepath, pattern):
return True
# 也尝试匹配文件名部分
if fnmatch.fnmatch(Path(filepath).name, pattern):
return True
# 检查是否包含(用于简单的路径匹配)
if "*" not in pattern and "?" not in pattern:
return pattern in filepath
return False
def aggregate_matches(
self, file_matches: List[MatchResult], content_matches: List[MatchResult]
) -> Dict[str, AggregatedMatch]:
"""聚合匹配结果,计算综合置信度"""
result: Dict[str, AggregatedMatch] = {}
all_matches = file_matches + content_matches
for match in all_matches:
if match.pm_id not in result:
result[match.pm_id] = AggregatedMatch(pm_id=match.pm_id)
agg = result[match.pm_id]
agg.reasons.append(match.reason)
agg.max_confidence = max(agg.max_confidence, match.confidence)
agg.match_count += 1
if "File" in match.reason:
agg.has_file_match = True
if "Function" in match.reason or "Pattern" in match.reason:
agg.has_content_match = True
# 计算综合置信度
for pm_id, agg in result.items():
base = agg.max_confidence
# 每增加一个匹配项,加 0.1 bonus最多 0.3
count_bonus = min(0.3, 0.1 * (agg.match_count - 1))
# 如果同时有文件匹配和内容匹配,额外加 0.1
cross_bonus = 0.1 if (agg.has_file_match and agg.has_content_match) else 0
agg.final_confidence = min(1.0, base + count_bonus + cross_bonus)
return result
def get_postmortem_details(self, pm_id: str) -> Optional[Dict]:
"""获取 postmortem 详情"""
for pm in self.postmortems:
if pm.get("id") == pm_id:
return pm
return None
def get_changed_files(base_ref: str) -> List[str]:
"""获取相对于 base 的变更文件"""
cwd = POSTMORTEM_DIR.parent
# 尝试不同的 diff 方式
# 1. 三点 diff用于 PR
cmd = ["git", "diff", "--name-only", f"{base_ref}...HEAD"]
result = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
if result.returncode != 0:
# 2. 两点 diff
cmd = ["git", "diff", "--name-only", f"{base_ref}..HEAD"]
result = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
if result.returncode != 0:
# 3. 直接对比
cmd = ["git", "diff", "--name-only", base_ref]
result = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
return [f for f in result.stdout.strip().split("\n") if f]
def get_diff_content(base_ref: str) -> str:
"""获取相对于 base 的 diff 内容"""
cwd = POSTMORTEM_DIR.parent
# 只看代码文件的 diff
extensions = ["*.py", "*.js", "*.ts", "*.jsx", "*.tsx"]
cmd = ["git", "diff", f"{base_ref}...HEAD", "--"] + extensions
result = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
if result.returncode != 0:
cmd = ["git", "diff", f"{base_ref}..HEAD", "--"] + extensions
result = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
if result.returncode != 0:
cmd = ["git", "diff", base_ref, "--"] + extensions
result = subprocess.run(cmd, capture_output=True, text=True, cwd=cwd)
return result.stdout
def main():
parser = argparse.ArgumentParser(description="Postmortem Check")
parser.add_argument("--base", default="main", help="Base branch/commit")
parser.add_argument(
"--mode",
choices=["warn", "block"],
default="warn",
help="warn: only print, block: exit 1 on high confidence",
)
parser.add_argument(
"--threshold",
type=float,
default=0.7,
help="Confidence threshold for blocking (default: 0.7)",
)
parser.add_argument(
"--output",
choices=["text", "json"],
default="text",
help="Output format",
)
args = parser.parse_args()
if not POSTMORTEM_DIR.exists():
print("No postmortem directory found. Run postmortem_init.py first.")
sys.exit(0)
matcher = PostmortemMatcher()
if not matcher.postmortems:
print("No postmortems found.")
sys.exit(0)
# 获取变更
changed_files = get_changed_files(args.base)
diff_content = get_diff_content(args.base)
if not changed_files:
print("No changes detected.")
sys.exit(0)
print(f"Checking {len(changed_files)} changed files against {len(matcher.postmortems)} postmortems...")
# 执行匹配
file_matches = matcher.match_files(changed_files)
content_matches = matcher.match_diff_content(diff_content)
# 聚合结果
results = matcher.aggregate_matches(file_matches, content_matches)
if not results:
print("No postmortem matches found.")
sys.exit(0)
# 输出结果
blocking = []
warnings = []
sorted_results = sorted(
results.items(), key=lambda x: x[1].final_confidence, reverse=True
)
for pm_id, agg in sorted_results:
confidence = agg.final_confidence
is_blocking = confidence >= args.threshold
level = "BLOCK" if is_blocking else "WARN"
# 获取 postmortem 详情
pm = matcher.get_postmortem_details(pm_id)
if not pm:
pm = {"title": "Unknown", "severity": "unknown"}
print(f"\n[{level}] {pm_id} ({confidence:.0%} confidence)")
print(f" Title: {pm.get('title', 'N/A')}")
print(f" Severity: {pm.get('severity', 'N/A')}")
print(f" Reasons:")
for reason in agg.reasons[:5]: # 最多显示5个原因
print(f" - {reason}")
if pm.get("verification"):
print(f" Verification checklist:")
for check in pm["verification"]:
print(f" [ ] {check}")
if is_blocking:
blocking.append(pm_id)
else:
warnings.append(pm_id)
# 总结
print(f"\n{'=' * 50}")
print(f"Summary: {len(blocking)} blocking, {len(warnings)} warnings")
# 决定退出码
if blocking and args.mode == "block":
print(f"\n{len(blocking)} postmortem(s) triggered with high confidence.")
print("Please review the changes and verify they don't reintroduce past issues.")
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()