241 lines
6.5 KiB
Python
Executable File
241 lines
6.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
import os
|
||
import io
|
||
import json
|
||
import logging
|
||
import subprocess
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Dict, Tuple, List
|
||
|
||
from telegram import Update, InputFile
|
||
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
|
||
|
||
# ---------------------------
|
||
# ЛОГИ
|
||
# ---------------------------
|
||
# Process-wide logging: INFO level, timestamped records with level and logger name.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger shared by every function in this module.
log = logging.getLogger("logbot")
|
||
|
||
|
||
# ---------------------------
|
||
# ENV ЗАГРУЗКА
|
||
# ---------------------------
|
||
def load_env():
    """Load ``KEY=VALUE`` pairs from ``./.env`` into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Lines with an empty key (e.g. ``=value``) are skipped as well,
    because assigning an empty name to ``os.environ`` raises ``ValueError``.
    A value wrapped in one pair of matching single or double quotes is
    unquoted, following the usual ``.env`` convention.

    Values from the file overwrite variables already present in the
    environment. If the file does not exist, an error is logged and nothing
    is loaded.
    """
    env_file = Path(".env")
    if not env_file.exists():
        log.error("Файл .env не найден")
        return

    # Explicit encoding: the platform default is not guaranteed to be UTF-8.
    with env_file.open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            key, sep, val = line.partition("=")
            key = key.strip()
            if not sep or not key:
                # No "=" at all, or nothing before it: not a usable assignment.
                continue

            val = val.strip()
            # KEY="value" / KEY='value' -> value (quotes are syntax, not data).
            if len(val) >= 2 and val[0] == val[-1] and val[0] in ("'", '"'):
                val = val[1:-1]

            os.environ[key] = val
            log.info(f"ENV: {key} загружено")
|
||
|
||
|
||
# Populate os.environ from .env before any setting below is read.
load_env()

# Telegram bot token; main() refuses to start without it.
BOT_TOKEN = os.getenv("TG_BOT_TOKEN")


def _parse_chat_ids(raw: str) -> set:
    """Parse a comma-separated list of Telegram chat IDs into a set of ints.

    Telegram group and supergroup chat IDs are negative, so the sign must be
    accepted: a ``str.isdigit()`` filter silently drops them. Entries that
    are not valid integers are skipped with a warning instead of aborting
    the import.
    """
    ids = set()
    for token in raw.split(","):
        token = token.strip()
        if not token:
            continue
        try:
            ids.add(int(token))
        except ValueError:
            log.warning(f"ALLOWED_CHATS: некорректный chat id пропущен: {token!r}")
    return ids


# Chats allowed to use the bot. An empty set means "no restriction".
ALLOWED_CHATS = _parse_chat_ids(os.getenv("ALLOWED_CHATS", ""))
# kubectl executable to invoke (name on PATH or a full path).
KUBECTL = os.getenv("KUBECTL_BIN", "kubectl")
# JSON file mapping short context aliases to kubectl context names.
CONTEXTS_FILE = os.getenv("CONTEXTS_FILE", "./contexts.json")
|
||
|
||
|
||
# ---------------------------
|
||
# Утилиты
|
||
# ---------------------------
|
||
def run_cmd(cmd: List[str], timeout: float = 120.0) -> Tuple[int, str, str]:
    """Run an external command and capture its output.

    Args:
        cmd: Program and arguments as a list (executed without a shell).
        timeout: Maximum number of seconds to wait for the command.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as UTF-8.
        Undecodable bytes are replaced rather than raising, so binary noise
        in the output cannot crash the caller.

    Raises:
        RuntimeError: If the command does not finish within ``timeout``.
        OSError: If the executable cannot be started (raised by ``subprocess``).
    """
    log.info(f"RUN: {' '.join(cmd)}")
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
            # Without a timeout a hung command would block the caller forever.
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(
            f"Команда не завершилась за {timeout} с: {' '.join(cmd)}"
        ) from exc
    return proc.returncode, proc.stdout, proc.stderr
|
||
|
||
|
||
def load_contexts() -> Dict[str, str]:
    """Read the alias-to-context mapping from ``CONTEXTS_FILE``.

    Returns:
        The parsed JSON object.

    Raises:
        RuntimeError: If the file does not exist or its top-level JSON value
            is not an object.
    """
    path = Path(CONTEXTS_FILE)
    if not path.exists():
        raise RuntimeError(f"contexts.json не найден: {CONTEXTS_FILE}")

    with path.open("r", encoding="utf-8") as fh:
        mapping = json.load(fh)

    if isinstance(mapping, dict):
        return mapping
    raise RuntimeError("contexts.json должен быть объектом {alias: context}")
|
||
|
||
|
||
def detect_kind(context: str, namespace: str, name: str) -> str:
    """Determine whether ``name`` is a deployment or a statefulset.

    Runs ``kubectl get`` for each resource type in turn and returns the kind
    of the first one that exits with status 0.

    Raises:
        RuntimeError: If neither lookup succeeds.
    """
    # (resource argument passed to kubectl, kind reported to the caller),
    # probed in this order.
    probes = (("deploy", "deployment"), ("statefulset", "statefulset"))
    for resource, kind in probes:
        rc, _out, _err = run_cmd(
            [KUBECTL, "--context", context, "-n", namespace, "get", resource, name]
        )
        if rc == 0:
            return kind

    raise RuntimeError(f"Не найден deployment/statefulset '{name}' в ns={namespace}")
|
||
|
||
|
||
def get_selector(context: str, namespace: str, kind: str, name: str) -> Dict[str, str]:
    """Return the ``spec.selector.matchLabels`` of a workload.

    Args:
        context: kubectl context name.
        namespace: Namespace of the workload.
        kind: Resource kind passed to ``kubectl get`` (e.g. ``deployment``).
        name: Workload name.

    Returns:
        The non-empty label mapping the workload uses to select its pods.

    Raises:
        RuntimeError: If kubectl fails, its output is not valid JSON, or the
            object has no non-empty ``matchLabels`` (for example when the
            selector only uses ``matchExpressions``).
    """
    code, out, err = run_cmd([
        KUBECTL, "--context", context, "-n", namespace,
        "get", kind, name, "-o", "json",
    ])
    if code != 0:
        raise RuntimeError(err or out)

    try:
        obj = json.loads(out)
    except json.JSONDecodeError as exc:
        raise RuntimeError(
            f"kubectl вернул некорректный JSON для {kind}/{name}"
        ) from exc

    # Walk the structure defensively: a missing key should give a readable
    # error instead of a bare KeyError such as 'matchLabels'.
    spec = obj.get("spec") if isinstance(obj, dict) else None
    selector = spec.get("selector") if isinstance(spec, dict) else None
    labels = selector.get("matchLabels") if isinstance(selector, dict) else None

    # An empty label set cannot identify this workload's pods.
    if not labels:
        raise RuntimeError(
            f"У {kind}/{name} нет spec.selector.matchLabels "
            "(селекторы matchExpressions не поддерживаются)"
        )
    return labels
|
||
|
||
|
||
def get_pod(context: str, namespace: str, selector: Dict[str, str]) -> str:
    """Return the name of the first pod matching the label selector.

    The selector mapping is rendered as ``k1=v1,k2=v2`` and passed to
    ``kubectl get pod -l``; the name of item 0 in the result is returned.

    Raises:
        RuntimeError: If kubectl fails or prints no pod name.
    """
    pairs = [f"{key}={value}" for key, value in selector.items()]
    label = ",".join(pairs)

    query = [
        KUBECTL, "--context", context, "-n", namespace,
        "get", "pod", "-l", label,
        "-o", "jsonpath={.items[0].metadata.name}",
    ]
    rc, stdout, _stderr = run_cmd(query)

    pod = stdout.strip()
    if rc == 0 and pod:
        return pod
    raise RuntimeError(f"Pod не найден по селектору: {label}")
|
||
|
||
|
||
def get_logs(context: str, namespace: str, pod: str, previous: bool) -> str:
    """Fetch the logs of every container in a pod.

    Args:
        context: kubectl context name.
        namespace: Namespace of the pod.
        pod: Pod name.
        previous: When true, add ``--previous`` to the kubectl command.

    Returns:
        The log text. If kubectl exits non-zero, its stderr (or stdout when
        stderr is empty) is returned instead of raising. On success, any
        stderr output is appended under a ``[stderr]`` marker.
    """
    flags = ["--previous"] if previous else []
    command = [
        KUBECTL, "--context", context, "-n", namespace,
        "logs", pod, "--all-containers",
    ] + flags

    rc, stdout, stderr = run_cmd(command)
    if rc != 0:
        return stderr or stdout
    if not stderr:
        return stdout
    return stdout + "\n[stderr]\n" + stderr
|
||
|
||
|
||
# ---------------------------
|
||
# ПАРСИНГ КОМАНДЫ
|
||
# ---------------------------
|
||
def parse_args(raw: List[str]):
    """Parse the arguments of the ``/logs`` command.

    Accepted forms::

        /logs ctx ns name
        /logs ctx ns name -p

    Args:
        raw: Command arguments, without the command itself.

    Returns:
        ``(ctx_alias, namespace, name, previous)`` where ``previous`` is True
        when the ``-p`` flag was given.

    Raises:
        ValueError: If fewer than three arguments are given, if a positional
            value starts with ``-``, or if any trailing argument is not ``-p``.
    """
    usage = "нужно: /logs <ctx_alias> <namespace> <name> [-p]"
    if not raw or len(raw) < 3:
        raise ValueError(usage)

    ctx_alias, namespace, name = raw[:3]
    extras = raw[3:]

    # These values end up on the kubectl command line; a leading "-" would
    # be interpreted by kubectl as an option rather than a name.
    for value in (ctx_alias, namespace, name):
        if value.startswith("-"):
            raise ValueError(f"аргумент не может начинаться с '-': {value}; {usage}")

    # Reject anything unexpected instead of silently ignoring it, so a typo
    # never yields the wrong set of logs.
    unknown = [arg for arg in extras if arg != "-p"]
    if unknown:
        raise ValueError(f"неизвестный аргумент: {unknown[0]}; {usage}")

    previous = "-p" in extras
    return ctx_alias, namespace, name, previous
|
||
|
||
|
||
# ---------------------------
|
||
# HANDLERS
|
||
# ---------------------------
|
||
def _fetch_logs(ctx_full: str, ns: str, name: str, previous: bool) -> Tuple[str, str, str]:
    """Resolve a workload to a pod and return ``(kind, pod, logs)``.

    Blocking: runs several kubectl subprocesses in sequence.
    """
    kind = detect_kind(ctx_full, ns, name)
    selector = get_selector(ctx_full, ns, kind, name)
    pod = get_pod(ctx_full, ns, selector)
    return kind, pod, get_logs(ctx_full, ns, pod, previous)


async def logs_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/logs <ctx> <ns> <name> [-p]`` and reply with the logs as a file.

    Messages from chats outside ``ALLOWED_CHATS`` (when that set is non-empty)
    are ignored without a reply. Argument, configuration and kubectl errors
    are reported back to the chat as text.
    """
    # Local import keeps this block self-contained; asyncio is standard library.
    import asyncio

    chat_id = update.effective_chat.id
    # effective_message also covers edited messages, for which
    # update.message is None.
    message = update.effective_message
    log.info(f"MSG from {chat_id}: {message.text}")

    if ALLOWED_CHATS and chat_id not in ALLOWED_CHATS:
        log.warning(f"CHAT {chat_id} не в ALLOWED_CHATS")
        return

    try:
        ctx_alias, ns, name, previous = parse_args(context.args)
    except ValueError as e:
        await message.reply_text(f"Ошибка: {e}")
        return

    # Resolve the context alias to a kubectl context name.
    try:
        contexts = load_contexts()
    except Exception as e:
        await message.reply_text(f"Ошибка contexts.json: {e}")
        return

    if ctx_alias not in contexts:
        await message.reply_text(f"Нет такого контекста: {ctx_alias}")
        return

    ctx_full = contexts[ctx_alias]

    try:
        # kubectl calls block; run them in a worker thread so the event loop
        # stays responsive while they execute.
        loop = asyncio.get_running_loop()
        kind, pod, logs = await loop.run_in_executor(
            None, _fetch_logs, ctx_full, ns, name, previous
        )
    except Exception as e:
        log.exception("Не удалось получить логи")
        await message.reply_text(f"Ошибка: {e}")
        return

    payload = logs.encode("utf-8", errors="ignore")
    if not payload:
        # Telegram rejects empty files, so report empty logs as text.
        await message.reply_text(f"Логи пусты: {kind}/{name}, pod={pod}")
        return

    ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    fname = f"logs_{ctx_alias}_{ns}_{name}_{'prev' if previous else 'curr'}_{ts}.log"

    await message.reply_document(
        InputFile(io.BytesIO(payload), filename=fname),
        caption=f"{kind}/{name}\npod={pod}\ncontext={ctx_full}\nprevious={previous}",
    )
|
||
|
||
|
||
async def start_handler(update, context):
    """Reply to ``/start`` with a short usage summary of the ``/logs`` command."""
    # effective_message also covers edited messages, for which
    # update.message is None and the reply would raise AttributeError.
    await update.effective_message.reply_text(
        "Использование:\n/logs <ctx> <ns> <name>\n/logs <ctx> <ns> <name> -p"
    )
|
||
|
||
|
||
# ---------------------------
|
||
# MAIN
|
||
# ---------------------------
|
||
def main():
    """Build the Telegram application, register the handlers and start polling.

    Raises:
        SystemExit: With status 1 when ``TG_BOT_TOKEN`` is not configured.
    """
    if not BOT_TOKEN:
        log.error("TG_BOT_TOKEN не задан")
        # A bare SystemExit exits with status 0, which would make this
        # configuration error look like a clean shutdown.
        raise SystemExit(1)

    app = ApplicationBuilder().token(BOT_TOKEN).build()
    app.add_handler(CommandHandler("start", start_handler))
    app.add_handler(CommandHandler("logs", logs_handler))

    log.info("Бот запущен")
    app.run_polling()
|
||
|
||
|
||
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|