Files
zpravobot-digest/digest-bot.py

116 lines
3.0 KiB
Python

#!/usr/bin/env python3
import csv
import requests
import os
import sys
import json
# Config
CSV_PATH = '/app/data/posts-latest.csv'
CLAUDE_API = 'https://api.anthropic.com/v1/messages'
CLAUDE_KEY = os.getenv('ANTHROPIC_API_KEY')
MASTODON_URL = 'https://zpravobot.news'
DEFAULT_BOT = 'zpravobot'
MAX_POSTS = 500                # at most this many CSV rows are considered
MAX_PROMPT_DATA_CHARS = 10000  # character budget for the JSON put in the prompt
TOOT_LIMIT = 500               # characters per toot
MAX_TOOTS = 2                  # length of the published thread
CLAUDE_TIMEOUT = 60            # seconds
MASTODON_TIMEOUT = 30          # seconds

# Access token for each bot account, read from the environment.
TOKENS = {
    'zpravobot': os.getenv('TOKEN_ZPRAVOBOT'),
    'pozitivni': os.getenv('TOKEN_POZITIVNI'),
    'sarkasticky': os.getenv('TOKEN_SARKASTICKY'),
}


def load_posts(path):
    """Read the CSV at *path* and return its rows as a list of dicts.

    Raises:
        OSError, UnicodeError, csv.Error: if the file cannot be read or parsed.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return list(csv.DictReader(f))


def build_posts_json(posts, max_chars=MAX_PROMPT_DATA_CHARS):
    """Serialise the leading posts that fit into *max_chars* as a JSON array.

    Posts are added whole, in order, until the next one would exceed the
    budget, so the result is always valid JSON (a blind character slice of
    the full dump would cut a record in half). If not even the first post
    fits, the result is ``"[]"``.
    """
    pieces = []
    size = 2  # the enclosing "[" and "]"
    for post in posts:
        encoded = json.dumps(post, ensure_ascii=False)
        extra = len(encoded) + (2 if pieces else 0)  # ", " between items
        if size + extra > max_chars:
            break
        pieces.append(encoded)
        size += extra
    # Same text json.dumps() would emit for the selected posts as a list.
    return '[' + ', '.join(pieces) + ']'


def split_digest(text, limit=TOOT_LIMIT, max_parts=MAX_TOOTS):
    """Split *text* into at most *max_parts* chunks of at most *limit* chars.

    Chunks are cut at the last whitespace that keeps them within *limit*;
    a run without any whitespace is hard-split at *limit*. Surrounding
    whitespace is stripped from every chunk.

    Returns:
        A ``(chunks, truncated)`` tuple. ``chunks`` is empty when *text* is
        blank; ``truncated`` is True when some text did not fit.

    Raises:
        ValueError: if *limit* is smaller than 1.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")
    remaining = text.strip()
    chunks = []
    while remaining and len(chunks) < max_parts:
        if len(remaining) <= limit:
            chunks.append(remaining)
            remaining = ''
            break
        # Cutting at index i yields a chunk of i characters, so i == limit
        # is still allowed when that position holds whitespace.
        cut = next(
            (i for i in range(limit, 0, -1) if remaining[i].isspace()),
            limit,
        )
        chunks.append(remaining[:cut].rstrip())
        remaining = remaining[cut:].lstrip()
    return chunks, bool(remaining)


def call_claude(bot_name, posts_json):
    """POST the digest prompt to the Claude API and return the raw response."""
    return requests.post(
        CLAUDE_API,
        headers={
            'x-api-key': CLAUDE_KEY,
            'anthropic-version': '2023-06-01',
            'content-type': 'application/json',
        },
        json={
            'model': 'claude-sonnet-4-20250514',
            'max_tokens': 4000,
            'messages': [{
                'role': 'user',
                'content': f'Vytvoř denní digest pro bot @{bot_name}. Data: {posts_json}',
            }],
        },
        timeout=CLAUDE_TIMEOUT,
    )


def post_status(token, text, in_reply_to_id=None):
    """POST one status to Mastodon and return the raw response.

    When *in_reply_to_id* is given the status is posted as a reply to it.
    A timeout is set so an unresponsive server cannot hang the job.
    """
    payload = {'status': text}
    if in_reply_to_id is not None:
        payload['in_reply_to_id'] = in_reply_to_id
    return requests.post(
        f'{MASTODON_URL}/api/v1/statuses',
        headers={'Authorization': f'Bearer {token}'},
        json=payload,
        timeout=MASTODON_TIMEOUT,
    )


def main(argv=None):
    """Build the digest for the selected bot and publish it as a short thread.

    Args:
        argv: Command-line arguments (defaults to ``sys.argv``); ``argv[1]``
            optionally names the bot, falling back to ``DEFAULT_BOT``.

    Returns:
        Process exit code: 0 once the first toot is published, 1 on any
        fatal error. A failing follow-up toot is reported but not fatal.
    """
    argv = sys.argv if argv is None else argv

    # Bot selection
    bot_name = argv[1] if len(argv) > 1 else DEFAULT_BOT
    if bot_name not in TOKENS:
        print(f"❌ Unknown bot: {bot_name}")
        return 1
    token = TOKENS[bot_name]
    if not token or not CLAUDE_KEY:
        print("❌ Missing env variables")
        return 1

    # 1. Load posts
    try:
        posts = load_posts(CSV_PATH)
    except (OSError, UnicodeError, csv.Error) as e:
        print(f"❌ Error loading CSV: {e}")
        return 1
    print(f"📊 Loaded {len(posts)} posts for @{bot_name}")
    if not posts:
        # Without data the model could only invent a digest.
        print("❌ No posts to summarise")
        return 1

    # 2. Prepare data for Claude (bounded by row count and prompt size)
    posts_json = build_posts_json(posts[:MAX_POSTS])

    # 3. Claude API
    print("🤖 Calling Claude API...")
    try:
        response = call_claude(bot_name, posts_json)
        if response.status_code != 200:
            print(f"❌ Claude API error: {response.text}")
            return 1
        digest = response.json()['content'][0]['text']
    except Exception as e:  # script boundary: report the failure and stop
        print(f"❌ Claude API exception: {e}")
        return 1
    print(f"✅ Claude response: {len(digest)} chars")

    # 4. Split into toots on word boundaries
    toots, truncated = split_digest(digest)
    if not toots:
        print("❌ Claude returned an empty digest")
        return 1
    if truncated:
        print(f"⚠️ Digest truncated to {MAX_TOOTS} toots")

    # 5. Publish toot 1 (fatal on failure)
    print("📤 Publishing toot 1...")
    try:
        r1 = post_status(token, toots[0])
        if r1.status_code not in (200, 201):
            print(f"❌ Mastodon error: {r1.text}")
            return 1
        last_id = r1.json()['id']
    except Exception as e:  # script boundary: report the failure and stop
        print(f"❌ Mastodon exception: {e}")
        return 1
    print(f"✅ Toot 1 published: {last_id}")

    # 6. Publish follow-up toots as replies (best effort)
    for number, toot in enumerate(toots[1:], start=2):
        print(f"📤 Publishing toot {number}...")
        try:
            reply = post_status(token, toot, in_reply_to_id=last_id)
            if reply.status_code not in (200, 201):
                print(f"⚠️ Toot {number} failed: {reply.text}")
                break
            last_id = reply.json()['id']
        except Exception as e:  # best effort: the first toot is already out
            print(f"⚠️ Toot {number} failed: {e}")
            break
        print(f"✅ Toot {number} published (thread)")

    print(f"✅ Done! Published to @{bot_name}")
    return 0


if __name__ == '__main__':
    sys.exit(main())