#!/usr/bin/env python3 import os import io import json import logging import subprocess import shlex from datetime import datetime from pathlib import Path from typing import Dict, Tuple, List from telegram import Update, InputFile from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes # --------------------------- # ЛОГИ # --------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) log = logging.getLogger("logbot") # --------------------------- # ENV ЗАГРУЗКА # --------------------------- def load_env() -> None: env_file = Path(".env") if not env_file.exists(): log.error("Файл .env не найден") return with env_file.open() as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" not in line: continue key, val = line.split("=", 1) os.environ[key.strip()] = val.strip() log.info(f"ENV: {key.strip()} загружено") load_env() BOT_TOKEN = os.getenv("TG_BOT_TOKEN") ALLOWED_CHATS = { int(x.strip()) for x in os.getenv("ALLOWED_CHATS", "").split(",") if x.strip().isdigit() } # Путь к kubectl НА АДМИНБОКСЕ KUBECTL = os.getenv("KUBECTL_BIN", "kubectl") CONTEXTS_FILE = os.getenv("CONTEXTS_FILE", "./contexts.json") # SSH-настройки: на какой сервер ходим за логами SSH_HOST = os.getenv("SSH_HOST") # ОБЯЗАТЕЛЬНО SSH_USER = os.getenv("SSH_USER", "root") # опционально SSH_PORT = os.getenv("SSH_PORT", "22") # строка, чтобы проще пихать в команду # --------------------------- # Утилиты # --------------------------- def run_cmd(cmd: List[str]) -> Tuple[int, str, str]: """ Запуск команды на удалённом сервере через SSH. cmd — это массив вида ["kubectl", "--context", ...] и т.п. """ if not SSH_HOST: raise RuntimeError("SSH_HOST не задан в .env") # Собираем удалённую команду как одну строку с экранированием remote_cmd = " ".join(shlex.quote(x) for x in cmd) ssh_cmd: List[str] = ["ssh", "-o", "BatchMode=yes"] if SSH_PORT: ssh_cmd.extend(["-p", SSH_PORT]) destination = f"{SSH_USER}@{SSH_HOST}" if SSH_USER else SSH_HOST ssh_cmd.append(destination) ssh_cmd.append(remote_cmd) log.info(f"RUN (remote): {' '.join(ssh_cmd)}") proc = subprocess.run( ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) return proc.returncode, proc.stdout, proc.stderr def load_contexts() -> Dict[str, str]: """ Читает локальный contexts.json на бастионе: { "prod": "k8s-erot-prod-context", "demo": "k8s-erot-demo-context", ... } """ if not Path(CONTEXTS_FILE).exists(): raise RuntimeError(f"contexts.json не найден: {CONTEXTS_FILE}") with open(CONTEXTS_FILE, "r", encoding="utf-8") as f: data = json.load(f) if not isinstance(data, dict): raise RuntimeError("contexts.json должен быть объектом {alias: context}") return data def detect_kind(context: str, namespace: str, name: str) -> str: """ Определяем тип ресурса по имени: - сначала пробуем deployment - если не найден, пробуем statefulset """ # deployment code, _, _ = run_cmd([ KUBECTL, "--context", context, "-n", namespace, "get", "deploy", name, ]) if code == 0: return "deployment" # statefulset code, _, _ = run_cmd([ KUBECTL, "--context", context, "-n", namespace, "get", "statefulset", name, ]) if code == 0: return "statefulset" raise RuntimeError(f"Не найден deployment/statefulset '{name}' в ns={namespace}, context={context}") def get_selector(context: str, namespace: str, kind: str, name: str) -> Dict[str, str]: """ Получаем spec.selector.matchLabels у deployment/statefulset. """ code, out, err = run_cmd([ KUBECTL, "--context", context, "-n", namespace, "get", kind, name, "-o", "json", ]) if code != 0: raise RuntimeError(err or out) obj = json.loads(out) selector = obj["spec"]["selector"]["matchLabels"] return selector def get_pod(context: str, namespace: str, selector: Dict[str, str]) -> str: """ По selector-лейблам находим первый pod. """ label = ",".join(f"{k}={v}" for k, v in selector.items()) code, out, err = run_cmd([ KUBECTL, "--context", context, "-n", namespace, "get", "pod", "-l", label, "-o", "jsonpath={.items[0].metadata.name}", ]) pod = out.strip() if code != 0 or not pod: raise RuntimeError(f"Pod не найден по селектору: {label}\n{err or ''}") return pod def get_logs(context: str, namespace: str, pod: str, previous: bool) -> str: """ Забираем логи pod-а через kubectl logs. """ cmd = [ KUBECTL, "--context", context, "-n", namespace, "logs", pod, "--all-containers", ] if previous: cmd.append("--previous") code, out, err = run_cmd(cmd) if code != 0: return err or out if err: out += "\n[stderr]\n" + err return out # --------------------------- # ПАРСИНГ КОМАНДЫ # --------------------------- def parse_args(raw: List[str]): """ /logs ctx ns name /logs ctx ns name -p """ if len(raw) < 3: raise ValueError("нужно: /logs [-p]") ctx_alias = raw[0] namespace = raw[1] name = raw[2] previous = False if len(raw) == 4 and raw[3] == "-p": previous = True return ctx_alias, namespace, name, previous # --------------------------- # HANDLERS # --------------------------- async def logs_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = update.effective_chat.id msg = update.effective_message.text log.info(f"MSG from {chat_id}: {msg}") if ALLOWED_CHATS and chat_id not in ALLOWED_CHATS: log.warning(f"CHAT {chat_id} не в ALLOWED_CHATS") return try: ctx_alias, ns, name, previous = parse_args(context.args) except Exception as e: await update.message.reply_text(f"Ошибка: {e}") return # Контексты try: contexts = load_contexts() except Exception as e: await update.message.reply_text(f"Ошибка contexts.json: {e}") return if ctx_alias not in contexts: await update.message.reply_text(f"Нет такого контекста: {ctx_alias}") return ctx_full = contexts[ctx_alias] try: kind = detect_kind(ctx_full, ns, name) selector = get_selector(ctx_full, ns, kind, name) pod = get_pod(ctx_full, ns, selector) logs = get_logs(ctx_full, ns, pod, previous) except Exception as e: await update.message.reply_text(f"Ошибка: {e}") return ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") fname = f"logs_{ctx_alias}_{ns}_{name}_{'prev' if previous else 'curr'}_{ts}.log" buf = io.BytesIO(logs.encode("utf-8", errors="ignore")) buf.name = fname await update.message.reply_document( InputFile(buf, filename=fname), caption=f"{kind}/{name}\npod={pod}\ncontext={ctx_full}\nprevious={previous}", ) async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text( "Использование:\n" "/logs \n" "/logs -p" ) # --------------------------- # MAIN # --------------------------- def main(): if not BOT_TOKEN: log.error("TG_BOT_TOKEN не задан") raise SystemExit if not SSH_HOST: log.error("SSH_HOST не задан в .env") raise SystemExit log.info(f"Старт бота. SSH → {SSH_USER}@{SSH_HOST}:{SSH_PORT}, KUBECTL={KUBECTL}") app = ApplicationBuilder().token(BOT_TOKEN).build() app.add_handler(CommandHandler("start", start_handler)) app.add_handler(CommandHandler("logs", logs_handler)) log.info("Бот запущен, ждёт команды") app.run_polling() if __name__ == "__main__": main()