How to Monitor AI Pipelines with Telegram Bot Alerts
Why Telegram for Pipeline Monitoring?
When you run AI pipelines on a server, you need to know when things finish and when they break. Email notifications are too slow. Slack is overkill for personal projects. Telegram bots hit the sweet spot: they are free, instant, support rich formatting, and the API is dead simple.
I send myself Telegram alerts for every pipeline run, including start time, completion status, error details, and key metrics like processing time and cost. Setting this up took about 30 minutes and has saved me countless hours of checking logs manually.
Setting Up the Bot
Step 1: Create the Bot
Open Telegram, search for @BotFather, and send /newbot. Follow the prompts to name your bot. You will receive an API token that looks like 123456789:ABCdefGHI-jklMNOpqrSTUvwxYZ.
Step 2: Get Your Chat ID
Send any message to your new bot, then visit this URL in your browser:
https://api.telegram.org/bot<YOUR_TOKEN>/getUpdatesFind the chat.id field in the response. This is the ID you will send messages to.
Step 3: Send a Test Message
Test the connection with a simple curl command:
curl -s -X POST https://api.telegram.org/bot<TOKEN>/sendMessage \
-d chat_id=<CHAT_ID> \
-d text="Pipeline monitor is online"The Python Notification Module
I wrapped the Telegram API in a simple Python class that I import across all my pipeline scripts:
import os
import httpx
from datetime import datetime
class TelegramNotifier:
def __init__(self):
self.token = os.environ["TELEGRAM_BOT_TOKEN"]
self.chat_id = os.environ["TELEGRAM_CHAT_ID"]
self.base_url = f"https://api.telegram.org/bot{self.token}"
async def send(self, message: str, parse_mode: str = "HTML"):
async with httpx.AsyncClient() as client:
await client.post(
f"{self.base_url}/sendMessage",
json={
"chat_id": self.chat_id,
"text": message,
"parse_mode": parse_mode
}
)
async def pipeline_start(self, name: str):
await self.send(
f"<b>Pipeline Started</b>\n"
f"Name: {name}\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}"
)
async def pipeline_complete(self, name: str, duration: float, details: str = ""):
await self.send(
f"<b>Pipeline Complete</b>\n"
f"Name: {name}\n"
f"Duration: {duration:.1f}s\n"
f"{details}"
)
async def pipeline_error(self, name: str, error: str):
await self.send(
f"<b>Pipeline FAILED</b>\n"
f"Name: {name}\n"
f"Error: {error}\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}"
)The class reads credentials from environment variables, which keeps secrets out of source code. I use HTML parse mode for bold text and formatting.
Using the Notifier in Pipelines
Here is how I integrate it into a typical pipeline run:
import time
import traceback
async def run_pipeline():
notifier = TelegramNotifier()
start_time = time.time()
try:
await notifier.pipeline_start("video-production")
# Pipeline steps here
result = await produce_video()
duration = time.time() - start_time
await notifier.pipeline_complete(
"video-production",
duration,
f"Videos: {result['count']}\nCost: ${result['cost']:.2f}"
)
except Exception as e:
await notifier.pipeline_error(
"video-production",
f"{type(e).__name__}: {str(e)}"
)Every pipeline run now sends me three types of messages: a start notification so I know it kicked off, a completion notification with metrics, or an error notification with the exception details.
Advanced Features
Sending Images
For my video pipeline, I send the generated thumbnail as a preview:
async def send_thumbnail(self, image_path: str, caption: str):
async with httpx.AsyncClient() as client:
with open(image_path, "rb") as f:
await client.post(
f"{self.base_url}/sendPhoto",
data={
"chat_id": self.chat_id,
"caption": caption,
"parse_mode": "HTML"
},
files={"photo": f}
)Rate Limiting
Telegram's bot API has rate limits: about 30 messages per second to the same chat. For pipelines that might generate many notifications quickly, I add a simple debounce:
import asyncio
class ThrottledNotifier(TelegramNotifier):
def __init__(self):
super().__init__()
self._last_send = 0
async def send(self, message: str, parse_mode: str = "HTML"):
elapsed = time.time() - self._last_send
if elapsed < 1.0:
await asyncio.sleep(1.0 - elapsed)
await super().send(message, parse_mode)
self._last_send = time.time()What I Monitor
My current alert setup covers:
- Daily blog publishing: Notifies when new posts are generated and deployed
- Video production runs: Start, completion, errors, with thumbnail preview
- API cost tracking: Daily spend across OpenRouter, Runware, and cloud services
- Server health: Disk space warnings and service restart notifications
The total cost is zero. Telegram bots are free, and the API calls are negligible. For the level of visibility it provides, this is the best monitoring ROI I have found for personal AI projects.