k8s_log_bot/bot_remote.py

349 lines
10 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import io
import json
import logging
import subprocess
import shlex
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, List
from telegram import Update, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
# ---------------------------
# ЛОГИ
# ---------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("logbot")
# ---------------------------
# ENV ЗАГРУЗКА
# ---------------------------
def load_env() -> None:
env_file = Path(".env")
if not env_file.exists():
log.error("Файл .env не найден")
return
with env_file.open() as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, val = line.split("=", 1)
os.environ[key.strip()] = val.strip()
log.info(f"ENV: {key.strip()} загружено")
load_env()
BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
raw_allowed = os.getenv("ALLOWED_CHATS", "")
ALLOWED_CHATS: set[int] = set()
for part in raw_allowed.split(","):
part = part.strip()
if not part:
continue
try:
ALLOWED_CHATS.add(int(part))
except ValueError:
log.warning(f"Не могу распарсить chat_id '{part}' из ALLOWED_CHATS, пропускаю")
log.info(f"ALLOWED_CHATS инициализирован: {ALLOWED_CHATS}")
# Путь к kubectl НА АДМИНБОКСЕ
KUBECTL = os.getenv("KUBECTL_BIN", "kubectl")
CONTEXTS_FILE = os.getenv("CONTEXTS_FILE", "./contexts.json")
# SSH-настройки: на какой сервер ходим за логами
SSH_HOST = os.getenv("SSH_HOST") # ОБЯЗАТЕЛЬНО
SSH_USER = os.getenv("SSH_USER", "root") # опционально
SSH_PORT = os.getenv("SSH_PORT", "22") # строка, чтобы проще пихать в команду
MAX_DOC_BYTES = int(os.getenv("MAX_DOC_BYTES", str(45 * 1024 * 1024)))
# ---------------------------
# Утилиты
# ---------------------------
def run_cmd(cmd: List[str]) -> Tuple[int, str, str]:
"""
Запуск команды на удалённом сервере через SSH.
cmd — это массив вида ["kubectl", "--context", ...] и т.п.
"""
if not SSH_HOST:
raise RuntimeError("SSH_HOST не задан в .env")
# Собираем удалённую команду как одну строку с экранированием
remote_cmd = " ".join(shlex.quote(x) for x in cmd)
ssh_cmd: List[str] = ["ssh", "-o", "BatchMode=yes"]
if SSH_PORT:
ssh_cmd.extend(["-p", SSH_PORT])
destination = f"{SSH_USER}@{SSH_HOST}" if SSH_USER else SSH_HOST
ssh_cmd.append(destination)
ssh_cmd.append(remote_cmd)
log.info(f"RUN (remote): {' '.join(ssh_cmd)}")
proc = subprocess.run(
ssh_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
return proc.returncode, proc.stdout, proc.stderr
def load_contexts() -> Dict[str, str]:
"""
Читает локальный contexts.json на бастионе:
{ "prod": "k8s-erot-prod-context", "demo": "k8s-erot-demo-context", ... }
"""
if not Path(CONTEXTS_FILE).exists():
raise RuntimeError(f"contexts.json не найден: {CONTEXTS_FILE}")
with open(CONTEXTS_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
raise RuntimeError("contexts.json должен быть объектом {alias: context}")
return data
def detect_kind(context: str, namespace: str, name: str) -> str:
"""
Определяем тип ресурса по имени:
- сначала пробуем deployment
- если не найден, пробуем statefulset
"""
# deployment
code, _, _ = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", "deploy", name,
])
if code == 0:
return "deployment"
# statefulset
code, _, _ = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", "statefulset", name,
])
if code == 0:
return "statefulset"
raise RuntimeError(f"Не найден deployment/statefulset '{name}' в ns={namespace}, context={context}")
def get_selector(context: str, namespace: str, kind: str, name: str) -> Dict[str, str]:
"""
Получаем spec.selector.matchLabels у deployment/statefulset.
"""
code, out, err = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", kind, name, "-o", "json",
])
if code != 0:
raise RuntimeError(err or out)
obj = json.loads(out)
selector = obj["spec"]["selector"]["matchLabels"]
return selector
def get_pod(context: str, namespace: str, selector: Dict[str, str]) -> str:
"""
По selector-лейблам находим первый pod.
"""
label = ",".join(f"{k}={v}" for k, v in selector.items())
code, out, err = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", "pod", "-l", label,
"-o", "jsonpath={.items[0].metadata.name}",
])
pod = out.strip()
if code != 0 or not pod:
raise RuntimeError(f"Pod не найден по селектору: {label}\n{err or ''}")
return pod
def get_logs(context: str, namespace: str, pod: str, previous: bool) -> str:
"""
Забираем логи pod-а через kubectl logs.
"""
cmd = [
KUBECTL, "--context", context, "-n", namespace,
"logs", pod, "--all-containers",
]
if previous:
cmd.append("--previous")
code, out, err = run_cmd(cmd)
if code != 0:
return err or out
if err:
out += "\n[stderr]\n" + err
return out
# ---------------------------
# ПАРСИНГ КОМАНДЫ
# ---------------------------
def parse_args(raw: List[str]):
"""
/logs ctx ns name
/logs ctx ns name -p
"""
if len(raw) < 3:
raise ValueError("нужно: /logs <ctx_alias> <namespace> <name> [-p]")
ctx_alias = raw[0]
namespace = raw[1]
name = raw[2]
previous = False
if len(raw) == 4 and raw[3] == "-p":
previous = True
return ctx_alias, namespace, name, previous
# ---------------------------
# HANDLERS
# ---------------------------
async def logs_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
msg = update.effective_message.text
log.info(f"MSG from {chat_id}: {msg}")
if ALLOWED_CHATS and chat_id not in ALLOWED_CHATS:
log.warning(f"CHAT {chat_id} не в ALLOWED_CHATS")
return
try:
ctx_alias, ns, name, previous = parse_args(context.args)
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return
# Контексты
try:
contexts = load_contexts()
except Exception as e:
await update.message.reply_text(f"Ошибка contexts.json: {e}")
return
if ctx_alias not in contexts:
await update.message.reply_text(f"Нет такого контекста: {ctx_alias}")
return
ctx_full = contexts[ctx_alias]
try:
kind = detect_kind(ctx_full, ns, name)
selector = get_selector(ctx_full, ns, kind, name)
pod = get_pod(ctx_full, ns, selector)
logs = get_logs(ctx_full, ns, pod, previous)
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
base_fname = f"logs_{ctx_alias}_{ns}_{name}_{'prev' if previous else 'curr'}_{ts}"
# кодируем один раз
data = logs.encode("utf-8", errors="ignore")
total_size = len(data)
# Если влезает в один файл — шлём как раньше
if total_size <= MAX_DOC_BYTES:
fname = f"{base_fname}.log"
buf = io.BytesIO(data)
buf.name = fname
await update.message.reply_document(
InputFile(buf, filename=fname),
caption=f"{kind}/{name}\npod={pod}\ncontext={ctx_full}\nprevious={previous}",
)
return
# Слишком жирный лог — режем на части
num_parts = (total_size + MAX_DOC_BYTES - 1) // MAX_DOC_BYTES
await update.message.reply_text(
f"Лог {total_size} байт, делю на {num_parts} файлов по ~{MAX_DOC_BYTES} байт."
)
for idx in range(num_parts):
start = idx * MAX_DOC_BYTES
end = min(start + MAX_DOC_BYTES, total_size)
chunk = data[start:end]
part_no = idx + 1
fname = f"{base_fname}.part{part_no:02d}_of_{num_parts:02d}.log"
buf = io.BytesIO(chunk)
buf.name = fname
caption = (
f"{kind}/{name}\n"
f"pod={pod}\n"
f"context={ctx_full}\n"
f"previous={previous}\n"
f"part {part_no}/{num_parts}"
)
await update.message.reply_document(
InputFile(buf, filename=fname),
caption=caption,
)
async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(
"Использование:\n"
"/logs <ctx> <ns> <name>\n"
"/logs <ctx> <ns> <name> -p"
)
# ---------------------------
# MAIN
# ---------------------------
def main():
if not BOT_TOKEN:
log.error("TG_BOT_TOKEN не задан")
raise SystemExit
if not SSH_HOST:
log.error("SSH_HOST не задан в .env")
raise SystemExit
log.info(f"Старт бота. SSH → {SSH_USER}@{SSH_HOST}:{SSH_PORT}, KUBECTL={KUBECTL}")
app = ApplicationBuilder().token(BOT_TOKEN).build()
app.add_handler(CommandHandler("start", start_handler))
app.add_handler(CommandHandler("logs", logs_handler))
log.info("Бот запущен, ждёт команды")
app.run_polling()
if __name__ == "__main__":
main()