This commit is contained in:
XSwankyS 2025-11-17 21:00:47 +05:00
parent e5e39ac045
commit dc230e0b03
5 changed files with 593 additions and 1 deletions

View File

@ -1,2 +1,32 @@
# k8s_log_bot
###### Подготовка окружения
```bash
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install python-telegram-bot==21.4
```
В файле `run_bot.sh` установить нужную реализацию бота:
`bot_local.py` - выполняет команды kubectl локально
`bot_remote.py` - выполняяет команды на удалённом сервере, ходит по ssh
###### Заполнить алиасы:
`contexts.json` - алиасы контекстов k8s: `kubectl config get-contexts`
###### Файл .env заполнить следующим образом:
```bash
TG_BOT_TOKEN=
ALLOWED_CHATS= #вайтлист peer id чатов тг, заполнять через запятую без пробела
KUBECTL_BIN=/usr/local/bin/kubectl #указать корректный путь до бинарника kubectl на целевой машине
CONTEXTS_FILE=./contexts.json
#Опционально, если используется bot_remote.py
SSH_HOST=erot-adminbox
SSH_USER=root
SSH_PORT=22
```
###### Запуск:
`./run_bot.sh`

240
bot_local.py Executable file
View File

@ -0,0 +1,240 @@
#!/usr/bin/env python3
import os
import io
import json
import logging
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, List
from telegram import Update, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
# ---------------------------
# ЛОГИ
# ---------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
log = logging.getLogger("logbot")
# ---------------------------
# ENV ЗАГРУЗКА
# ---------------------------
def load_env():
env_file = Path(".env")
if not env_file.exists():
log.error("Файл .env не найден")
return
with env_file.open() as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, val = line.split("=", 1)
os.environ[key.strip()] = val.strip()
log.info(f"ENV: {key.strip()} загружено")
load_env()
BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
ALLOWED_CHATS = {
int(x.strip()) for x in os.getenv("ALLOWED_CHATS", "").split(",") if x.strip().isdigit()
}
KUBECTL = os.getenv("KUBECTL_BIN", "kubectl")
CONTEXTS_FILE = os.getenv("CONTEXTS_FILE", "./contexts.json")
# ---------------------------
# Утилиты
# ---------------------------
def run_cmd(cmd: List[str]) -> Tuple[int, str, str]:
log.info(f"RUN: {' '.join(cmd)}")
proc = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return proc.returncode, proc.stdout, proc.stderr
def load_contexts() -> Dict[str, str]:
if not Path(CONTEXTS_FILE).exists():
raise RuntimeError(f"contexts.json не найден: {CONTEXTS_FILE}")
with open(CONTEXTS_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
raise RuntimeError("contexts.json должен быть объектом {alias: context}")
return data
def detect_kind(context: str, namespace: str, name: str) -> str:
# Пробуем deployment
code, _, _ = run_cmd([KUBECTL, "--context", context, "-n", namespace, "get", "deploy", name])
if code == 0:
return "deployment"
# Пробуем sts
code, _, _ = run_cmd([KUBECTL, "--context", context, "-n", namespace, "get", "statefulset", name])
if code == 0:
return "statefulset"
raise RuntimeError(f"Не найден deployment/statefulset '{name}' в ns={namespace}")
def get_selector(context: str, namespace: str, kind: str, name: str) -> Dict[str, str]:
code, out, err = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", kind, name, "-o", "json"
])
if code != 0:
raise RuntimeError(err or out)
obj = json.loads(out)
selector = obj["spec"]["selector"]["matchLabels"]
return selector
def get_pod(context: str, namespace: str, selector: Dict[str, str]) -> str:
label = ",".join(f"{k}={v}" for k, v in selector.items())
code, out, err = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", "pod", "-l", label,
"-o", "jsonpath={.items[0].metadata.name}"
])
pod = out.strip()
if code != 0 or not pod:
raise RuntimeError(f"Pod не найден по селектору: {label}")
return pod
def get_logs(context: str, namespace: str, pod: str, previous: bool) -> str:
cmd = [
KUBECTL, "--context", context, "-n", namespace,
"logs", pod, "--all-containers"
]
if previous:
cmd.append("--previous")
code, out, err = run_cmd(cmd)
if code != 0:
return err or out
if err:
out += "\n[stderr]\n" + err
return out
# ---------------------------
# ПАРСИНГ КОМАНДЫ
# ---------------------------
def parse_args(raw: List[str]):
"""
/logs ctx ns name
/logs ctx ns name -p
"""
if len(raw) < 3:
raise ValueError("нужно: /logs <ctx_alias> <namespace> <name> [-p]")
ctx_alias = raw[0]
namespace = raw[1]
name = raw[2]
previous = False
if len(raw) == 4 and raw[3] == "-p":
previous = True
return ctx_alias, namespace, name, previous
# ---------------------------
# HANDLERS
# ---------------------------
async def logs_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
msg = update.effective_message.text
log.info(f"MSG from {chat_id}: {msg}")
if ALLOWED_CHATS and chat_id not in ALLOWED_CHATS:
log.warning(f"CHAT {chat_id} не в ALLOWED_CHATS")
return
try:
ctx_alias, ns, name, previous = parse_args(context.args)
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return
# Контексты
try:
contexts = load_contexts()
except Exception as e:
await update.message.reply_text(f"Ошибка contexts.json: {e}")
return
if ctx_alias not in contexts:
await update.message.reply_text(f"Нет такого контекста: {ctx_alias}")
return
ctx_full = contexts[ctx_alias]
try:
kind = detect_kind(ctx_full, ns, name)
selector = get_selector(ctx_full, ns, kind, name)
pod = get_pod(ctx_full, ns, selector)
logs = get_logs(ctx_full, ns, pod, previous)
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
fname = f"logs_{ctx_alias}_{ns}_{name}_{'prev' if previous else 'curr'}_{ts}.log"
buf = io.BytesIO(logs.encode("utf-8", errors="ignore"))
buf.name = fname
await update.message.reply_document(
InputFile(buf, filename=fname),
caption=f"{kind}/{name}\npod={pod}\ncontext={ctx_full}\nprevious={previous}"
)
async def start_handler(update, context):
await update.message.reply_text(
"Использование:\n/logs <ctx> <ns> <name>\n/logs <ctx> <ns> <name> -p"
)
# ---------------------------
# MAIN
# ---------------------------
def main():
if not BOT_TOKEN:
log.error("TG_BOT_TOKEN не задан")
raise SystemExit
app = ApplicationBuilder().token(BOT_TOKEN).build()
app.add_handler(CommandHandler("start", start_handler))
app.add_handler(CommandHandler("logs", logs_handler))
log.info("Бот запущен")
app.run_polling()
if __name__ == "__main__":
main()

298
bot_remote.py Executable file
View File

@ -0,0 +1,298 @@
#!/usr/bin/env python3
import os
import io
import json
import logging
import subprocess
import shlex
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, List
from telegram import Update, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
# ---------------------------
# ЛОГИ
# ---------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("logbot")
# ---------------------------
# ENV ЗАГРУЗКА
# ---------------------------
def load_env() -> None:
env_file = Path(".env")
if not env_file.exists():
log.error("Файл .env не найден")
return
with env_file.open() as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, val = line.split("=", 1)
os.environ[key.strip()] = val.strip()
log.info(f"ENV: {key.strip()} загружено")
load_env()
BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
ALLOWED_CHATS = {
int(x.strip()) for x in os.getenv("ALLOWED_CHATS", "").split(",") if x.strip().isdigit()
}
# Путь к kubectl НА АДМИНБОКСЕ
KUBECTL = os.getenv("KUBECTL_BIN", "kubectl")
CONTEXTS_FILE = os.getenv("CONTEXTS_FILE", "./contexts.json")
# SSH-настройки: на какой сервер ходим за логами
SSH_HOST = os.getenv("SSH_HOST") # ОБЯЗАТЕЛЬНО
SSH_USER = os.getenv("SSH_USER", "root") # опционально
SSH_PORT = os.getenv("SSH_PORT", "22") # строка, чтобы проще пихать в команду
# ---------------------------
# Утилиты
# ---------------------------
def run_cmd(cmd: List[str]) -> Tuple[int, str, str]:
"""
Запуск команды на удалённом сервере через SSH.
cmd это массив вида ["kubectl", "--context", ...] и т.п.
"""
if not SSH_HOST:
raise RuntimeError("SSH_HOST не задан в .env")
# Собираем удалённую команду как одну строку с экранированием
remote_cmd = " ".join(shlex.quote(x) for x in cmd)
ssh_cmd: List[str] = ["ssh", "-o", "BatchMode=yes"]
if SSH_PORT:
ssh_cmd.extend(["-p", SSH_PORT])
destination = f"{SSH_USER}@{SSH_HOST}" if SSH_USER else SSH_HOST
ssh_cmd.append(destination)
ssh_cmd.append(remote_cmd)
log.info(f"RUN (remote): {' '.join(ssh_cmd)}")
proc = subprocess.run(
ssh_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
return proc.returncode, proc.stdout, proc.stderr
def load_contexts() -> Dict[str, str]:
"""
Читает локальный contexts.json на бастионе:
{ "prod": "k8s-erot-prod-context", "demo": "k8s-erot-demo-context", ... }
"""
if not Path(CONTEXTS_FILE).exists():
raise RuntimeError(f"contexts.json не найден: {CONTEXTS_FILE}")
with open(CONTEXTS_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
raise RuntimeError("contexts.json должен быть объектом {alias: context}")
return data
def detect_kind(context: str, namespace: str, name: str) -> str:
"""
Определяем тип ресурса по имени:
- сначала пробуем deployment
- если не найден, пробуем statefulset
"""
# deployment
code, _, _ = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", "deploy", name,
])
if code == 0:
return "deployment"
# statefulset
code, _, _ = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", "statefulset", name,
])
if code == 0:
return "statefulset"
raise RuntimeError(f"Не найден deployment/statefulset '{name}' в ns={namespace}, context={context}")
def get_selector(context: str, namespace: str, kind: str, name: str) -> Dict[str, str]:
"""
Получаем spec.selector.matchLabels у deployment/statefulset.
"""
code, out, err = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", kind, name, "-o", "json",
])
if code != 0:
raise RuntimeError(err or out)
obj = json.loads(out)
selector = obj["spec"]["selector"]["matchLabels"]
return selector
def get_pod(context: str, namespace: str, selector: Dict[str, str]) -> str:
"""
По selector-лейблам находим первый pod.
"""
label = ",".join(f"{k}={v}" for k, v in selector.items())
code, out, err = run_cmd([
KUBECTL, "--context", context, "-n", namespace,
"get", "pod", "-l", label,
"-o", "jsonpath={.items[0].metadata.name}",
])
pod = out.strip()
if code != 0 or not pod:
raise RuntimeError(f"Pod не найден по селектору: {label}\n{err or ''}")
return pod
def get_logs(context: str, namespace: str, pod: str, previous: bool) -> str:
"""
Забираем логи pod-а через kubectl logs.
"""
cmd = [
KUBECTL, "--context", context, "-n", namespace,
"logs", pod, "--all-containers",
]
if previous:
cmd.append("--previous")
code, out, err = run_cmd(cmd)
if code != 0:
return err or out
if err:
out += "\n[stderr]\n" + err
return out
# ---------------------------
# ПАРСИНГ КОМАНДЫ
# ---------------------------
def parse_args(raw: List[str]):
"""
/logs ctx ns name
/logs ctx ns name -p
"""
if len(raw) < 3:
raise ValueError("нужно: /logs <ctx_alias> <namespace> <name> [-p]")
ctx_alias = raw[0]
namespace = raw[1]
name = raw[2]
previous = False
if len(raw) == 4 and raw[3] == "-p":
previous = True
return ctx_alias, namespace, name, previous
# ---------------------------
# HANDLERS
# ---------------------------
async def logs_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
msg = update.effective_message.text
log.info(f"MSG from {chat_id}: {msg}")
if ALLOWED_CHATS and chat_id not in ALLOWED_CHATS:
log.warning(f"CHAT {chat_id} не в ALLOWED_CHATS")
return
try:
ctx_alias, ns, name, previous = parse_args(context.args)
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return
# Контексты
try:
contexts = load_contexts()
except Exception as e:
await update.message.reply_text(f"Ошибка contexts.json: {e}")
return
if ctx_alias not in contexts:
await update.message.reply_text(f"Нет такого контекста: {ctx_alias}")
return
ctx_full = contexts[ctx_alias]
try:
kind = detect_kind(ctx_full, ns, name)
selector = get_selector(ctx_full, ns, kind, name)
pod = get_pod(ctx_full, ns, selector)
logs = get_logs(ctx_full, ns, pod, previous)
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return
ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
fname = f"logs_{ctx_alias}_{ns}_{name}_{'prev' if previous else 'curr'}_{ts}.log"
buf = io.BytesIO(logs.encode("utf-8", errors="ignore"))
buf.name = fname
await update.message.reply_document(
InputFile(buf, filename=fname),
caption=f"{kind}/{name}\npod={pod}\ncontext={ctx_full}\nprevious={previous}",
)
async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(
"Использование:\n"
"/logs <ctx> <ns> <name>\n"
"/logs <ctx> <ns> <name> -p"
)
# ---------------------------
# MAIN
# ---------------------------
def main():
if not BOT_TOKEN:
log.error("TG_BOT_TOKEN не задан")
raise SystemExit
if not SSH_HOST:
log.error("SSH_HOST не задан в .env")
raise SystemExit
log.info(f"Старт бота. SSH → {SSH_USER}@{SSH_HOST}:{SSH_PORT}, KUBECTL={KUBECTL}")
app = ApplicationBuilder().token(BOT_TOKEN).build()
app.add_handler(CommandHandler("start", start_handler))
app.add_handler(CommandHandler("logs", logs_handler))
log.info("Бот запущен, ждёт команды")
app.run_polling()
if __name__ == "__main__":
main()

6
contexts.json Normal file
View File

@ -0,0 +1,6 @@
{
"prod": "k8s-prod-context",
"demo": "k8s-demo-context",
"test": "k8s-test-context"
}

18
run_bot.sh Executable file
View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
set -euo pipefail
# Переходим в директорию, где лежит бот
cd "$(dirname "$0")"
# Подхватываем переменные из файла .env
if [ -f .env ]; then
set -a
. ./.env
set +a
fi
# Активируем venv
. venv/bin/activate
# Запускаем бота
exec python bot_remote.py