k8s_log_bot/bot_local.py
2025-11-17 21:00:47 +05:00

241 lines
6.5 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import io
import json
import logging
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, List
from telegram import Update, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
# ---------------------------
# ЛОГИ
# ---------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
log = logging.getLogger("logbot")
# ---------------------------
# ENV ЗАГРУЗКА
# ---------------------------
def load_env():
env_file = Path(".env")
if not env_file.exists():
log.error("Файл .env не найден")
return
with env_file.open() as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, val = line.split("=", 1)
os.environ[key.strip()] = val.strip()
log.info(f"ENV: {key.strip()} загружено")
load_env()
BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
ALLOWED_CHATS = {
int(x.strip()) for x in os.getenv("ALLOWED_CHATS", "").split(",") if x.strip().isdigit()
}
KUBECTL = os.getenv("KUBECTL_BIN", "kubectl")
CONTEXTS_FILE = os.getenv("CONTEXTS_FILE", "./contexts.json")
# ---------------------------
# Утилиты
# ---------------------------
def run_cmd(cmd: List[str]) -> Tuple[int, str, str]:
log.info(f"RUN: {' '.join(cmd)}")
proc = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return proc.returncode, proc.stdout, proc.stderr
def load_contexts() -> Dict[str, str]:
if not Path(CONTEXTS_FILE).exists():
raise RuntimeError(f"contexts.json не найден: {CONTEXTS_FILE}")
with open(CONTEXTS_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
raise RuntimeError("contexts.json должен быть объектом {alias: context}")
return data
def detect_kind(context: str, namespace: str, name: str) -> str:
# Пробуем deployment
code, _, _ = run_cmd([KUBECTL, "--context", context, "-n", namespace, "get", "deploy", name])
if code == 0:
return "deployment"
# Пробуем sts
code, _, _ = run_cmd([KUBECTL, "--context", context, "-n", namespace, "get", "statefulset", name])
if code == 0:
return "statefulset"
raise RuntimeError(f"Не найден deployment/statefulset '{name}' в ns={namespace}")
def get_selector(context: str, namespace: str, kind: str, name: str) -> Dict[str, str]:
code, out, err = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", kind, name, "-o", "json"
])
if code != 0:
raise RuntimeError(err or out)
obj = json.loads(out)
selector = obj["spec"]["selector"]["matchLabels"]
return selector
def get_pod(context: str, namespace: str, selector: Dict[str, str]) -> str:
label = ",".join(f"{k}={v}" for k, v in selector.items())
code, out, err = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", "pod", "-l", label,
"-o", "jsonpath={.items[0].metadata.name}"
])
pod = out.strip()
if code != 0 or not pod:
raise RuntimeError(f"Pod не найден по селектору: {label}")
return pod
def get_logs(context: str, namespace: str, pod: str, previous: bool) -> str:
cmd = [
KUBECTL, "--context", context, "-n", namespace,
"logs", pod, "--all-containers"
]
if previous:
cmd.append("--previous")
code, out, err = run_cmd(cmd)
if code != 0:
return err or out
if err:
out += "\n[stderr]\n" + err
return out
# ---------------------------
# ПАРСИНГ КОМАНДЫ
# ---------------------------
def parse_args(raw: List[str]):
"""
/logs ctx ns name
/logs ctx ns name -p
"""
if len(raw) < 3:
raise ValueError("нужно: /logs <ctx_alias> <namespace> <name> [-p]")
ctx_alias = raw[0]
namespace = raw[1]
name = raw[2]
previous = False
if len(raw) == 4 and raw[3] == "-p":
previous = True
return ctx_alias, namespace, name, previous
# ---------------------------
# HANDLERS
# ---------------------------
async def logs_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
msg = update.effective_message.text
log.info(f"MSG from {chat_id}: {msg}")
if ALLOWED_CHATS and chat_id not in ALLOWED_CHATS:
log.warning(f"CHAT {chat_id} не в ALLOWED_CHATS")
return
try:
ctx_alias, ns, name, previous = parse_args(context.args)
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return
# Контексты
try:
contexts = load_contexts()
except Exception as e:
await update.message.reply_text(f"Ошибка contexts.json: {e}")
return
if ctx_alias not in contexts:
await update.message.reply_text(f"Нет такого контекста: {ctx_alias}")
return
ctx_full = contexts[ctx_alias]
try:
kind = detect_kind(ctx_full, ns, name)
selector = get_selector(ctx_full, ns, kind, name)
pod = get_pod(ctx_full, ns, selector)
logs = get_logs(ctx_full, ns, pod, previous)
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
fname = f"logs_{ctx_alias}_{ns}_{name}_{'prev' if previous else 'curr'}_{ts}.log"
buf = io.BytesIO(logs.encode("utf-8", errors="ignore"))
buf.name = fname
await update.message.reply_document(
InputFile(buf, filename=fname),
caption=f"{kind}/{name}\npod={pod}\ncontext={ctx_full}\nprevious={previous}"
)
async def start_handler(update, context):
await update.message.reply_text(
"Использование:\n/logs <ctx> <ns> <name>\n/logs <ctx> <ns> <name> -p"
)
# ---------------------------
# MAIN
# ---------------------------
def main():
if not BOT_TOKEN:
log.error("TG_BOT_TOKEN не задан")
raise SystemExit
app = ApplicationBuilder().token(BOT_TOKEN).build()
app.add_handler(CommandHandler("start", start_handler))
app.add_handler(CommandHandler("logs", logs_handler))
log.info("Бот запущен")
app.run_polling()
if __name__ == "__main__":
main()