#!/usr/bin/env python3 import os import io import json import logging import subprocess from datetime import datetime from pathlib import Path from typing import Dict, Tuple, List from telegram import Update, InputFile from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes # --------------------------- # ЛОГИ # --------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) log = logging.getLogger("logbot") # --------------------------- # ENV ЗАГРУЗКА # --------------------------- def load_env(): env_file = Path(".env") if not env_file.exists(): log.error("Файл .env не найден") return with env_file.open() as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" not in line: continue key, val = line.split("=", 1) os.environ[key.strip()] = val.strip() log.info(f"ENV: {key.strip()} загружено") load_env() BOT_TOKEN = os.getenv("TG_BOT_TOKEN") ALLOWED_CHATS = { int(x.strip()) for x in os.getenv("ALLOWED_CHATS", "").split(",") if x.strip().isdigit() } KUBECTL = os.getenv("KUBECTL_BIN", "kubectl") CONTEXTS_FILE = os.getenv("CONTEXTS_FILE", "./contexts.json") # --------------------------- # Утилиты # --------------------------- def run_cmd(cmd: List[str]) -> Tuple[int, str, str]: log.info(f"RUN: {' '.join(cmd)}") proc = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) return proc.returncode, proc.stdout, proc.stderr def load_contexts() -> Dict[str, str]: if not Path(CONTEXTS_FILE).exists(): raise RuntimeError(f"contexts.json не найден: {CONTEXTS_FILE}") with open(CONTEXTS_FILE, "r", encoding="utf-8") as f: data = json.load(f) if not isinstance(data, dict): raise RuntimeError("contexts.json должен быть объектом {alias: context}") return data def detect_kind(context: str, namespace: str, name: str) -> str: # Пробуем deployment code, _, _ = run_cmd([KUBECTL, "--context", context, "-n", namespace, "get", "deploy", name]) if code == 0: return "deployment" # Пробуем sts code, _, _ = run_cmd([KUBECTL, "--context", context, "-n", namespace, "get", "statefulset", name]) if code == 0: return "statefulset" raise RuntimeError(f"Не найден deployment/statefulset '{name}' в ns={namespace}") def get_selector(context: str, namespace: str, kind: str, name: str) -> Dict[str, str]: code, out, err = run_cmd([ KUBECTL, "--context", context, "-n", namespace, "get", kind, name, "-o", "json" ]) if code != 0: raise RuntimeError(err or out) obj = json.loads(out) selector = obj["spec"]["selector"]["matchLabels"] return selector def get_pod(context: str, namespace: str, selector: Dict[str, str]) -> str: label = ",".join(f"{k}={v}" for k, v in selector.items()) code, out, err = run_cmd([ KUBECTL, "--context", context, "-n", namespace, "get", "pod", "-l", label, "-o", "jsonpath={.items[0].metadata.name}" ]) pod = out.strip() if code != 0 or not pod: raise RuntimeError(f"Pod не найден по селектору: {label}") return pod def get_logs(context: str, namespace: str, pod: str, previous: bool) -> str: cmd = [ KUBECTL, "--context", context, "-n", namespace, "logs", pod, "--all-containers" ] if previous: cmd.append("--previous") code, out, err = run_cmd(cmd) if code != 0: return err or out if err: out += "\n[stderr]\n" + err return out # --------------------------- # ПАРСИНГ КОМАНДЫ # --------------------------- def parse_args(raw: List[str]): """ /logs ctx ns name /logs ctx ns name -p """ if len(raw) < 3: raise ValueError("нужно: /logs [-p]") ctx_alias = raw[0] namespace = raw[1] name = raw[2] previous = False if len(raw) == 4 and raw[3] == "-p": previous = True return ctx_alias, namespace, name, previous # --------------------------- # HANDLERS # --------------------------- async def logs_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = update.effective_chat.id msg = update.effective_message.text log.info(f"MSG from {chat_id}: {msg}") if ALLOWED_CHATS and chat_id not in ALLOWED_CHATS: log.warning(f"CHAT {chat_id} не в ALLOWED_CHATS") return try: ctx_alias, ns, name, previous = parse_args(context.args) except Exception as e: await update.message.reply_text(f"Ошибка: {e}") return # Контексты try: contexts = load_contexts() except Exception as e: await update.message.reply_text(f"Ошибка contexts.json: {e}") return if ctx_alias not in contexts: await update.message.reply_text(f"Нет такого контекста: {ctx_alias}") return ctx_full = contexts[ctx_alias] try: kind = detect_kind(ctx_full, ns, name) selector = get_selector(ctx_full, ns, kind, name) pod = get_pod(ctx_full, ns, selector) logs = get_logs(ctx_full, ns, pod, previous) except Exception as e: await update.message.reply_text(f"Ошибка: {e}") return ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") fname = f"logs_{ctx_alias}_{ns}_{name}_{'prev' if previous else 'curr'}_{ts}.log" buf = io.BytesIO(logs.encode("utf-8", errors="ignore")) buf.name = fname await update.message.reply_document( InputFile(buf, filename=fname), caption=f"{kind}/{name}\npod={pod}\ncontext={ctx_full}\nprevious={previous}" ) async def start_handler(update, context): await update.message.reply_text( "Использование:\n/logs \n/logs -p" ) # --------------------------- # MAIN # --------------------------- def main(): if not BOT_TOKEN: log.error("TG_BOT_TOKEN не задан") raise SystemExit app = ApplicationBuilder().token(BOT_TOKEN).build() app.add_handler(CommandHandler("start", start_handler)) app.add_handler(CommandHandler("logs", logs_handler)) log.info("Бот запущен") app.run_polling() if __name__ == "__main__": main()