Files

635 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import html
import json
import os
import re
import sys
import argparse
import urllib.request
import urllib.error
from collections import Counter
from datetime import datetime, timezone, timedelta
# Rotating "tip of the week" texts (Czech, user-facing). build_toot() picks one
# per report via TIPS[week_number % len(TIPS)].
# NOTE(review): several tips read as if a dash between two clauses was lost
# (e.g. "... klíčová slova používej je ...") — confirm against the original file.
TIPS = [
    "Hashtagy fungují jako klíčová slova používej je a ostatní tě snáz najdou.",
    "Pomocí seznamů si můžeš organizovat sledované účty do tematických skupin.",
    "Příspěvky s viditelností \"Pouze sledující\" vidí jen tvoji sledující, ne celý fediverse.",
    "Filtrovat nežádoucí obsah lze přes Nastavení → Filtry.",
    "Zmínit někoho funguje i napříč instancemi stačí napsat @uzivatel@instance.tld.",
    "Záložky ti umožní uložit toot na později nikdo o tom neví.",
    "Na Mamutovo.cz máš limit 2000 znaků víc než dost na dlouhý příspěvek.",
    "Zvýrazněné hashtagy v profilu pomáhají ostatním tě najít podle zájmů.",
    "Boost = sdílení. Pomáhá dobrému obsahu se šířit po fediversu.",
    "Obsah za varováním (CW) vidí jen ti, kdo kliknou hodí se na citlivá témata.",
    "Sleduj hashtag místo účtu přes fedi.mamutovo.cz najdeš české účty.",
    "Fediverse není jen Mastodon komunikuješ i s uživateli Pixelfedu, PeerTube a dalších.",
    "V profilu můžeš zvýraznit oblíbené účty ostatní je uvidí přímo u tebe.",
]
def load_env(path=".env"):
    """Parse a simple ``KEY=VALUE`` file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are stripped of surrounding whitespace; values
    additionally lose surrounding double and single quotes.

    Args:
        path: Location of the env file.

    Returns:
        Dict mapping variable names to string values. Empty when the file
        does not exist.
    """
    env = {}
    try:
        # Decode explicitly (like every other file read in this script) so the
        # result does not depend on the platform locale; "utf-8-sig" also
        # drops a leading BOM that would otherwise become part of the first key.
        with open(path, encoding="utf-8-sig") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                # Split on the first "=" only, so values may contain "=".
                key, _, val = line.partition("=")
                env[key.strip()] = val.strip().strip('"').strip("'")
    except FileNotFoundError:
        # A missing env file is not an error; an empty mapping is returned.
        pass
    return env
def api_get(url, token=None, timeout=30):
    """GET *url* and return the JSON-decoded response body.

    Args:
        url: Full URL to request.
        token: Optional bearer token; when given it is sent in the
            ``Authorization`` header.
        timeout: Socket timeout in seconds, so an unresponsive server cannot
            hang the run indefinitely.

    Returns:
        The parsed JSON document.

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
        urllib.error.URLError: The request could not be made.
        OSError: Other socket-level failures such as a read timeout.
        All of them are reported on stderr before being re-raised.
    """
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    req = urllib.request.Request(url, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # Decode leniently: a non-UTF-8 error page must not mask the HTTP error.
        detail = e.read().decode(errors="replace")
        print(f"HTTP {e.code} při volání {url}: {detail}", file=sys.stderr)
        raise
    except urllib.error.URLError as e:
        print(f"Chyba sítě při volání {url}: {e.reason}", file=sys.stderr)
        raise
    except OSError as e:
        # Timeouts while reading the response are raised as plain OSError
        # subclasses rather than URLError; report them the same way.
        print(f"Chyba sítě při volání {url}: {e}", file=sys.stderr)
        raise
def api_post(url, token, data, timeout=30):
    """POST *data* as JSON to *url* and return the JSON-decoded response.

    Args:
        url: Full URL to post to.
        token: Bearer token sent in the ``Authorization`` header.
        data: JSON-serialisable request payload.
        timeout: Socket timeout in seconds, so an unresponsive server cannot
            hang the run indefinitely.

    Returns:
        The parsed JSON document returned by the server.

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
        urllib.error.URLError: The request could not be made.
        OSError: Other socket-level failures such as a read timeout.
        All of them are reported on stderr before being re-raised.
    """
    body = json.dumps(data).encode()
    req = urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # Decode leniently: a non-UTF-8 error page must not mask the HTTP error.
        detail = e.read().decode(errors="replace")
        print(f"HTTP {e.code} při odesílání tootu: {detail}", file=sys.stderr)
        raise
    except urllib.error.URLError as e:
        print(f"Chyba sítě při odesílání tootu: {e.reason}", file=sys.stderr)
        raise
    except OSError as e:
        # Timeouts while reading the response are raised as plain OSError
        # subclasses rather than URLError; report them the same way.
        print(f"Chyba sítě při odesílání tootu: {e}", file=sys.stderr)
        raise
def get_measures(base_url, admin_token, date_from, date_to, keys=None, timeout=30):
    """Request measures from ``{base_url}/api/v1/admin/measures``.

    Args:
        base_url: Instance base URL without a trailing slash.
        admin_token: Bearer token sent in the ``Authorization`` header.
        date_from: Start of the period; its ``isoformat()`` is sent as ``start_at``.
        date_to: End of the period; its ``isoformat()`` is sent as ``end_at``.
        keys: Measure keys to request. Defaults to ``new_users``,
            ``active_users`` and ``interactions``.
        timeout: Socket timeout in seconds, so an unresponsive server cannot
            hang the run indefinitely.

    Returns:
        The parsed JSON document returned by the server.

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
        urllib.error.URLError: The request could not be made.
        OSError: Other socket-level failures such as a read timeout.
        All of them are reported on stderr before being re-raised.
    """
    if keys is None:
        keys = ["new_users", "active_users", "interactions"]
    url = f"{base_url}/api/v1/admin/measures"
    payload = {
        "keys": keys,
        "start_at": date_from.isoformat(),
        "end_at": date_to.isoformat(),
    }
    body = json.dumps(payload).encode()
    req = urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": f"Bearer {admin_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # Decode leniently: a non-UTF-8 error page must not mask the HTTP error.
        detail = e.read().decode(errors="replace")
        print(f"HTTP {e.code} při volání measures API: {detail}", file=sys.stderr)
        raise
    except urllib.error.URLError as e:
        print(f"Chyba sítě při volání measures API: {e.reason}", file=sys.stderr)
        raise
    except OSError as e:
        # Timeouts while reading the response are raised as plain OSError
        # subclasses rather than URLError; report them the same way.
        print(f"Chyba sítě při volání measures API: {e}", file=sys.stderr)
        raise
def truncate(text, max_chars=100):
    """Return *text* as single-line plain text, shortened to about *max_chars*.

    HTML tags are replaced by spaces, HTML entities are unescaped and runs of
    whitespace are collapsed. Text longer than *max_chars* is cut at the last
    space inside the first *max_chars* characters (or hard-cut when there is
    none), trailing punctuation is dropped and "…" is appended.
    """
    without_tags = re.sub(r"<[^>]+>", " ", text)
    plain = re.sub(r"\s+", " ", html.unescape(without_tags)).strip()
    if len(plain) > max_chars:
        window = plain[:max_chars]
        head, space, _ = window.rpartition(" ")
        # No space in the window: keep the whole window rather than nothing.
        kept = head if space else window
        return kept.rstrip(".,!?;:") + "…"
    return plain
def format_date_cs(dt):
    """Format *dt* as a Czech day-and-month string, e.g. ``5. ledna``."""
    # Month names in the genitive case, as used after a day number.
    genitive_months = (
        "ledna", "února", "března", "dubna", "května", "června",
        "července", "srpna", "září", "října", "listopadu", "prosince",
    )
    month_name = genitive_months[dt.month - 1]
    return f"{dt.day}. {month_name}"
def format_month_cs(dt):
    """Format *dt* as a Czech month name followed by the year, e.g. ``březen 2024``."""
    # Month names in the nominative case, for use as a standalone label.
    nominative_months = (
        "leden", "únor", "březen", "duben", "květen", "červen",
        "červenec", "srpen", "září", "říjen", "listopad", "prosinec",
    )
    month_name = nominative_months[dt.month - 1]
    return f"{month_name} {dt.year}"
def build_monthly_toot(measures_data, tags, top_tooty, date_to, prev_stats, instance_info,
                       total_count=0, top_author=None, newest_account=None, top_links=None,
                       media_count=None, hourly_count=None, most_discussed=None):
    """Compose the text of the monthly overview toot.

    Args:
        measures_data: Iterable of dicts with ``key`` and ``total`` entries.
        tags: List of dicts with a ``name`` entry; at most five are shown.
        top_tooty: Iterable of dicts with ``acct``, ``text``, ``reblogs``,
            ``favourites`` and optionally ``url``.
        date_to: Date passed to ``format_month_cs`` for the header line.
        prev_stats: Dict of the previous period's totals keyed like the
            measures, or a falsy value to omit the differences.
        instance_info: Dict with an optional ``stats`` sub-dict
            (``user_count``, ``domain_count``); may be falsy.
        total_count: Number shown on the "Tooty měsíce" line.
        top_author: Dict with ``acct`` and ``count``, or None.
        newest_account: Dict with ``acct``, or None.
        top_links: List of dicts with optional ``title``, ``url`` and
            ``provider_name``; at most three are shown.
        media_count: Dict with ``total`` and per-type counts, or None.
        hourly_count: Dict mapping an hour (convertible with ``int``) to a
            count, or None.
        most_discussed: Dict with ``acct``, ``replies``, ``text`` and
            optionally ``url``, or None.

    Returns:
        The toot text as one string.
    """
    stats = {m["key"]: int(m["total"]) for m in measures_data}
    new_users = stats.get("new_users", 0)
    active_users = stats.get("active_users", 0)
    interactions = stats.get("interactions", 0)

    def fmt_diff(current, key, long=False):
        # Signed difference against the previous period; empty when unknown.
        if not prev_stats or key not in prev_stats:
            return ""
        d = current - prev_stats[key]
        sign = "+" if d >= 0 else ""
        suffix = " oproti minulému měsíci" if long else ""
        return f" ({sign}{d}{suffix})"

    hashtags = " ".join(f"#{t['name']}" for t in tags[:5]) if tags else "(žádné)"

    inst_stats = instance_info.get("stats", {}) if instance_info else {}
    user_count_val = inst_stats.get("user_count", 0)
    user_count = user_count_val if user_count_val else "?"
    domain_count = inst_stats.get("domain_count", "?")

    author_line = f"✍️ Top přispěvatel: @{top_author['acct']} ({top_author['count']} tootů)\n" if top_author else ""
    newest_line = f"👋 Nový účet: @{newest_account['acct']}\n" if newest_account else ""
    # The milestone line appears whenever the user count is a multiple of ten.
    milestone_line = (
        f"🎉 Milník: Mamutovo dosáhlo {user_count_val} uživatelů!\n"
        if user_count_val and user_count_val % 10 == 0 else ""
    )
    extra = author_line + newest_line + milestone_line

    if top_tooty:
        # " ⭐ " separates boosts from favourites; without a separator the two
        # counters were printed as one run of digits.
        blocks = "\n\n".join(
            f"🐘 @{s['acct']}\n"
            f"\"{truncate(s['text'], 80).replace(chr(10), ' ')}\"\n"
            f"🔁 {s['reblogs']} ⭐ {s['favourites']}\n"
            f"🔗 {s.get('url', '')}"
            for s in top_tooty
        )
        tooty_sekce = f"\n🌟 Tooty měsíce:\n\n{blocks}"
    else:
        tooty_sekce = ""

    if top_links:
        link_lines = "\n".join(
            f"🔗 {link.get('title', link.get('url', ''))}\n{link.get('provider_name', '')} · {link.get('url', '')}"
            for link in top_links[:3]
        )
        links_sekce = f"\n🌐 Top odkazy měsíce:\n\n{link_lines}"
    else:
        links_sekce = ""

    if media_count and media_count.get("total"):
        # Videos and animated GIFs (gifv) are reported together.
        videos = media_count.get("video", 0) + media_count.get("gifv", 0)
        media_line = f"📸 Sdílená média: {media_count['total']} ({media_count.get('image', 0)} fotek, {videos} videí)\n"
    else:
        media_line = ""

    if hourly_count and any(hourly_count.values()):
        peak_h = max(hourly_count, key=lambda h: hourly_count[h])
        start_hour = int(peak_h)
        end_hour = (start_hour + 1) % 24  # wraps 23 -> 00
        # The dash makes the range readable ("08:00–09:00" instead of "08:0009:00").
        peak_line = f"⏰ Nejaktivnější čas: {start_hour:02d}:00–{end_hour:02d}:00 ({hourly_count[peak_h]} tootů)\n"
    else:
        peak_line = ""

    if most_discussed:
        discussed_sekce = (
            f"\n💬 Nejdiskutovanější toot:\n"
            f"@{most_discussed['acct']} ({most_discussed['replies']} odpovědí)\n"
            f"\"{truncate(most_discussed['text'], 80).replace(chr(10), ' ')}\"\n"
            f"🔗 {most_discussed.get('url', '')}"
        )
    else:
        discussed_sekce = ""

    return (
        f"🐘 Měsíční přehled Mamutovo.cz\n"
        f"📅 {format_month_cs(date_to)}\n"
        f"\n"
        f"👥 Noví uživatelé: {new_users}{fmt_diff(new_users, 'new_users', long=True)}\n"
        f"✅ Aktivní uživatelé: {active_users}{fmt_diff(active_users, 'active_users')}\n"
        f"📝 Interakce: {interactions}{fmt_diff(interactions, 'interactions')}\n"
        f"📝 Tooty měsíce: {total_count}\n"
        f"{media_line}"
        f"{peak_line}"
        f"\n"
        f"📊 Celkem uživatelů: {user_count}\n"
        f"🌐 Federovaných instancí: {domain_count}\n"
        f"\n"
        f"🔥 Top hashtagy měsíce:\n"
        f"{hashtags}\n"
        f"\n"
        f"{extra}"
        f"{tooty_sekce}"
        f"{links_sekce}"
        f"{discussed_sekce}"
    )
def build_toot(measures_data, tags, top_tooty, date_from, date_to, week_number,
               total_count=0, top_author=None, newest_account=None, user_count=0, top_links=None,
               prev_stats=None, media_count=None, hourly_count=None, most_discussed=None):
    """Compose the text of the weekly overview toot.

    Args:
        measures_data: Iterable of dicts with ``key`` and ``total`` entries.
        tags: List of dicts with a ``name`` entry; at most three are shown.
        top_tooty: Iterable of dicts with ``acct``, ``text``, ``reblogs``,
            ``favourites`` and optionally ``url``.
        date_from: Start date, formatted with ``format_date_cs``.
        date_to: End date, formatted with ``format_date_cs``; its year is
            appended to the header.
        week_number: Integer used to pick the tip (``TIPS[week_number % len(TIPS)]``).
        total_count: Number shown on the "Tooty týdne" line.
        top_author: Dict with ``acct`` and ``count``, or None.
        newest_account: Dict with ``acct``, or None.
        user_count: Total user count; a milestone line is added when it is a
            non-zero multiple of ten.
        top_links: List of dicts with optional ``title``, ``url`` and
            ``provider_name``; at most three are shown.
        prev_stats: Dict of the previous period's totals keyed like the
            measures, or a falsy value to omit the differences.
        media_count: Dict with ``total`` and per-type counts, or None.
        hourly_count: Dict mapping an hour (convertible with ``int``) to a
            count, or None.
        most_discussed: Dict with ``acct``, ``replies``, ``text`` and
            optionally ``url``, or None.

    Returns:
        The toot text as one string.
    """
    stats = {m["key"]: int(m["total"]) for m in measures_data}
    new_users = stats.get("new_users", 0)
    active_users = stats.get("active_users", 0)
    interactions = stats.get("interactions", 0)

    def fmt_diff(current, key):
        # Signed difference against the previous period; empty when unknown.
        if not prev_stats or key not in prev_stats:
            return ""
        d = current - prev_stats[key]
        sign = "+" if d >= 0 else ""
        return f" ({sign}{d})"

    hashtags = " ".join(f"#{t['name']}" for t in tags[:3]) if tags else "(žádné)"
    tip = TIPS[week_number % len(TIPS)]

    if top_tooty:
        # " ⭐ " separates boosts from favourites; without a separator the two
        # counters were printed as one run of digits.
        blocks = "\n\n".join(
            f"🐘 @{s['acct']}\n"
            f"\"{truncate(s['text'], 80).replace(chr(10), ' ')}\"\n"
            f"🔁 {s['reblogs']} ⭐ {s['favourites']}\n"
            f"🔗 {s.get('url', '')}"
            for s in top_tooty
        )
        toot_tyden = f"🌟 Tooty týdne:\n\n{blocks}\n\n"
    else:
        toot_tyden = ""

    author_line = f"✍️ Top přispěvatel: @{top_author['acct']} ({top_author['count']} tootů)\n" if top_author else ""
    newest_line = f"👋 Nový účet: @{newest_account['acct']}\n" if newest_account else ""
    milestone_line = f"🎉 Milník: Mamutovo dosáhlo {user_count} uživatelů!\n" if user_count and user_count % 10 == 0 else ""
    extra = author_line + newest_line + milestone_line

    if top_links:
        link_lines = "\n".join(
            f"🔗 {link.get('title', link.get('url', ''))}\n{link.get('provider_name', '')} · {link.get('url', '')}"
            for link in top_links[:3]
        )
        links_sekce = f"🌐 Top odkazy týdne:\n\n{link_lines}\n\n"
    else:
        links_sekce = ""

    if media_count and media_count.get("total"):
        # Videos and animated GIFs (gifv) are reported together.
        videos = media_count.get("video", 0) + media_count.get("gifv", 0)
        media_line = f"📸 Sdílená média: {media_count['total']} ({media_count.get('image', 0)} fotek, {videos} videí)\n"
    else:
        media_line = ""

    if hourly_count and any(hourly_count.values()):
        peak_h = max(hourly_count, key=lambda h: hourly_count[h])
        start_hour = int(peak_h)
        end_hour = (start_hour + 1) % 24  # wraps 23 -> 00
        # The dash makes the range readable ("08:00–09:00" instead of "08:0009:00").
        peak_line = f"⏰ Nejaktivnější čas: {start_hour:02d}:00–{end_hour:02d}:00 ({hourly_count[peak_h]} tootů)\n"
    else:
        peak_line = ""

    if most_discussed:
        discussed_sekce = (
            f"💬 Nejdiskutovanější toot:\n"
            f"@{most_discussed['acct']} ({most_discussed['replies']} odpovědí)\n"
            f"\"{truncate(most_discussed['text'], 80).replace(chr(10), ' ')}\"\n"
            f"🔗 {most_discussed.get('url', '')}\n\n"
        )
    else:
        discussed_sekce = ""

    date_from_str = format_date_cs(date_from)
    date_to_str = format_date_cs(date_to)
    year = date_to.year

    return (
        f"🐘 Týdenní přehled Mamutovo.cz\n"
        # Dash between the two dates so the header reads as a range.
        f"📅 {date_from_str} – {date_to_str} {year}\n"
        f"\n"
        f"👥 Noví uživatelé: {new_users}{fmt_diff(new_users, 'new_users')}\n"
        f"✅ Aktivní uživatelé: {active_users}{fmt_diff(active_users, 'active_users')}\n"
        f"📝 Interakce: {interactions}{fmt_diff(interactions, 'interactions')}\n"
        f"📝 Tooty týdne: {total_count}\n"
        f"{media_line}"
        f"{peak_line}"
        f"\n"
        f"🔥 Populární hashtagy:\n"
        f"{hashtags}\n"
        f"\n"
        f"{toot_tyden}"
        f"{links_sekce}"
        f"{discussed_sekce}"
        f"{extra}"
        f"💡 Tip týdne: {tip}"
    )
def load_tags_from_data(date_to, days, top_n):
    """Return the most frequent tags from the daily ``data/YYYY-MM-DD.json`` files.

    Looks at *days* files, going back one day at a time from *date_to*
    (inclusive). Each occurrence of a tag in a file's ``tags`` entry counts
    once; missing files are skipped.

    Returns:
        A list of ``{"name": tag}`` dicts for the *top_n* most common tags,
        or None when no file contained any tags.
    """
    tally = Counter()
    saw_tags = False
    for offset in range(days):
        stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
        try:
            with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                day_tags = json.load(fh).get("tags")
        except FileNotFoundError:
            continue
        if day_tags:
            saw_tags = True
            # Feed a generator so every element counts as one occurrence.
            tally.update(tag for tag in day_tags)
    if saw_tags:
        return [{"name": name} for name, _count in tally.most_common(top_n)]
    return None
def load_tooty_from_data(date_to, days):
    """Return the three highest-scored toots from the daily data files.

    Reads the ``top`` list of each ``data/YYYY-MM-DD.json`` file for *days*
    days going back from *date_to* (inclusive); missing files are skipped.
    Entries with the same ``(acct, text)`` pair are kept only the first time
    they are seen. The result is ordered by ``score`` (default 0), highest
    first.
    """
    known = set()
    collected = []
    for offset in range(days):
        stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
        try:
            with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                entries = json.load(fh).get("top", [])
        except FileNotFoundError:
            continue
        for toot in entries:
            ident = (toot.get("acct", ""), toot.get("text", ""))
            if ident in known:
                continue
            known.add(ident)
            collected.append(toot)
    ranked = sorted(collected, key=lambda toot: toot.get("score", 0), reverse=True)
    return ranked[:3]
def load_total_count_from_data(date_to, days):
    """Sum the ``total_count`` values of the daily data files.

    Covers *days* files named ``data/YYYY-MM-DD.json``, going back from
    *date_to* (inclusive). Missing files and files without the key
    contribute nothing.
    """
    def daily_counts():
        for offset in range(days):
            stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
            try:
                with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                    count = json.load(fh).get("total_count", 0)
            except FileNotFoundError:
                continue
            yield count

    return sum(daily_counts())
def load_top_author_from_data(date_to, days):
    """Return the account with the most toots according to the daily data files.

    Adds up the ``authors_count`` mappings (account -> count) of *days* files
    named ``data/YYYY-MM-DD.json``, going back from *date_to* (inclusive);
    missing files are skipped.

    Returns:
        ``{"acct": ..., "count": ...}`` for the leading account, or None when
        nothing was counted.
    """
    per_author = Counter()
    for offset in range(days):
        stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
        try:
            with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                day_counts = json.load(fh).get("authors_count", {})
        except FileNotFoundError:
            continue
        for account, toots in day_counts.items():
            per_author[account] += toots
    if per_author:
        (leader, toots), = per_author.most_common(1)
        return {"acct": leader, "count": toots}
    return None
def load_top_links_from_data(date_to, days):
    """Return up to three distinct links from the daily data files.

    Walks the ``top_links`` lists of *days* files named
    ``data/YYYY-MM-DD.json``, going back from *date_to* (inclusive), and keeps
    the first three entries that have a non-empty ``url`` not seen before.
    Missing files are skipped.
    """
    picked = []
    known_urls = set()
    for offset in range(days):
        stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
        try:
            with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                day_links = json.load(fh).get("top_links", [])
        except FileNotFoundError:
            continue
        for entry in day_links:
            href = entry.get("url", "")
            if not href or href in known_urls:
                continue
            known_urls.add(href)
            picked.append(entry)
            if len(picked) >= 3:
                # Enough links collected; older files are not read.
                return picked
    return picked
def load_media_count_from_data(date_to, days):
    """Sum the per-type media counters of the daily data files.

    Adds up the ``media_count`` mappings of *days* files named
    ``data/YYYY-MM-DD.json``, going back from *date_to* (inclusive); missing
    files are skipped.

    Returns:
        Dict with the keys ``image``, ``video``, ``gifv``, ``audio`` and
        ``total``; all zero when nothing was found.
    """
    kinds = ("image", "video", "gifv", "audio", "total")
    sums = dict.fromkeys(kinds, 0)
    for offset in range(days):
        stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
        try:
            with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                per_day = json.load(fh).get("media_count", {})
        except FileNotFoundError:
            continue
        for kind in kinds:
            sums[kind] += per_day.get(kind, 0)
    return sums
def load_hourly_count_from_data(date_to, days):
    """Sum the per-hour counters of the daily data files.

    Adds up the ``hourly_count`` mappings of *days* files named
    ``data/YYYY-MM-DD.json``, going back from *date_to* (inclusive); missing
    files are skipped.

    Returns:
        Dict keyed by the strings ``"0"`` .. ``"23"`` with the summed counts.
    """
    hour_keys = [str(hour) for hour in range(24)]
    sums = dict.fromkeys(hour_keys, 0)
    for offset in range(days):
        stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
        try:
            with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                per_day = json.load(fh).get("hourly_count", {})
        except FileNotFoundError:
            continue
        for hour_key in hour_keys:
            sums[hour_key] += per_day.get(hour_key, 0)
    return sums
def load_most_discussed_from_data(date_to, days):
    """Return the ``most_discussed`` entry with the highest reply count.

    Compares the ``most_discussed`` entries of *days* files named
    ``data/YYYY-MM-DD.json``, going back from *date_to* (inclusive), by their
    ``replies`` value (default 0). On a tie the entry read first wins.
    Missing files are skipped.

    Returns:
        The winning entry, or None when no file provided one.
    """
    winner = None
    for offset in range(days):
        stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
        try:
            with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                candidate = json.load(fh).get("most_discussed")
        except FileNotFoundError:
            continue
        if not candidate:
            continue
        if winner is None or candidate.get("replies", 0) > winner.get("replies", 0):
            winner = candidate
    return winner
def load_newest_account_from_data(date_to, days):
    """Return the first non-empty ``newest_account`` entry found.

    Checks *days* files named ``data/YYYY-MM-DD.json``, starting at *date_to*
    and going back one day at a time; missing files are skipped.

    Returns:
        The entry from the first file that has one, or None.
    """
    for offset in range(days):
        stamp = (date_to - timedelta(days=offset)).strftime("%Y-%m-%d")
        try:
            with open(os.path.join("data", f"{stamp}.json"), encoding="utf-8") as fh:
                account = json.load(fh).get("newest_account")
        except FileNotFoundError:
            continue
        if account:
            return account
    return None
def _load_prev_stats(path):
    """Return the stats saved by the previous run, or None when unavailable."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # A damaged comparison file must not block the report; it is rewritten
        # after the next successful post.
        print(f"Nelze načíst {path}: {e}", file=sys.stderr)
        return None


def _resolve_tags(base_url, admin_token, date_to, days, limit):
    """Return top hashtags from the local data files, else from the trends API."""
    tags = load_tags_from_data(date_to, days, limit)
    if tags is None:
        try:
            tags = api_get(f"{base_url}/api/v1/trends/tags?limit={limit}", admin_token)
        except Exception:
            # Hashtags are optional; the report is still posted without them.
            tags = []
    return tags


def _fetch_instance_info(base_url):
    """Return the instance description, or an empty dict when the call fails."""
    try:
        return api_get(f"{base_url}/api/v1/instance")
    except Exception:
        return {}


def _collect_local_data(date_to, days):
    """Gather everything the report reads from the daily data files."""
    return {
        "top_tooty": load_tooty_from_data(date_to, days),
        "total_count": load_total_count_from_data(date_to, days),
        "top_author": load_top_author_from_data(date_to, days),
        "newest_account": load_newest_account_from_data(date_to, days),
        "top_links": load_top_links_from_data(date_to, days),
        "media_count": load_media_count_from_data(date_to, days),
        "hourly_count": load_hourly_count_from_data(date_to, days),
        "most_discussed": load_most_discussed_from_data(date_to, days),
    }


def _post_and_save(base_url, token, toot, measures_data, date_to, stats_path, done_message):
    """Publish *toot*, then store the current totals for the next comparison."""
    try:
        result = api_post(
            f"{base_url}/api/v1/statuses",
            token,
            {"status": toot, "visibility": "public"},
        )
        print(f"Toot odeslán: {result.get('url', '(bez URL)')}")
    except Exception:
        # api_post already reported the failure; stats are saved only after a
        # successful post.
        sys.exit(1)
    cur_stats = {m["key"]: int(m["total"]) for m in measures_data}
    os.makedirs("data", exist_ok=True)
    with open(stats_path, "w", encoding="utf-8") as f:
        json.dump({
            "date": date_to.strftime("%Y-%m-%d"),
            "new_users": cur_stats.get("new_users", 0),
            "active_users": cur_stats.get("active_users", 0),
            "interactions": cur_stats.get("interactions", 0),
        }, f, ensure_ascii=False, indent=2)
    print(done_message)


def _prune_old_data(date_to, data_dir="data", keep_days=60):
    """Delete daily ``YYYY-MM-DD.json`` files older than *keep_days* days."""
    cutoff = date_to - timedelta(days=keep_days)
    if not os.path.isdir(data_dir):
        return
    for fname in os.listdir(data_dir):
        if not fname.endswith(".json"):
            continue
        try:
            file_date = datetime.strptime(fname[:-5], "%Y-%m-%d").replace(tzinfo=timezone.utc)
        except ValueError:
            # Not a daily file (e.g. the saved stats); leave it alone.
            continue
        if file_date < cutoff:
            os.remove(os.path.join(data_dir, fname))
            print(f"Smazán starý soubor: {fname}")


def main():
    """Build the weekly (default) or monthly overview toot and post it.

    Configuration is read from ``.env`` overlaid with the process
    environment; ``NOVINKY_TOKEN``, ``INSTANCE_URL`` and ``STATS_TOKEN`` are
    required. With ``--dry-run`` the toot is only printed. After a successful
    post the period totals are saved under ``data/`` for the next comparison;
    the weekly run additionally deletes daily data files older than 60 days.

    Exits with status 1 when a required variable is missing, the measures
    request fails or the toot cannot be posted.
    """
    parser = argparse.ArgumentParser(description="Statistiky Mamutovo.cz")
    parser.add_argument("--dry-run", action="store_true", help="Pouze vypíše toot, neodešle")
    parser.add_argument("--monthly", action="store_true", help="Měsíční přehled místo týdenního")
    args = parser.parse_args()

    # Real environment variables take precedence over the .env file.
    env = {**load_env(), **os.environ}
    for var in ("NOVINKY_TOKEN", "INSTANCE_URL", "STATS_TOKEN"):
        if not env.get(var):
            print(f"Chybí proměnná prostředí: {var}", file=sys.stderr)
            sys.exit(1)
    novinky_token = env["NOVINKY_TOKEN"]
    admin_token = env["STATS_TOKEN"]
    base_url = env["INSTANCE_URL"].rstrip("/")

    now = datetime.now(timezone.utc)
    date_to = now.replace(hour=0, minute=0, second=0, microsecond=0)

    # The two report flavours differ only in these settings and the builder.
    if args.monthly:
        days, tag_limit = 30, 5
        stats_path = os.path.join("data", "monthly_stats.json")
        done_message = "Měsíční statistiky uloženy."
    else:
        days, tag_limit = 7, 3
        stats_path = os.path.join("data", "weekly_stats.json")
        done_message = "Týdenní statistiky uloženy."
    date_from = date_to - timedelta(days=days)

    try:
        measures_data = get_measures(
            base_url, admin_token, date_from, date_to,
            keys=["new_users", "active_users", "interactions"],
        )
    except Exception:
        # get_measures already reported the failure on stderr.
        sys.exit(1)

    tags = _resolve_tags(base_url, admin_token, date_to, days, tag_limit)
    instance_info = _fetch_instance_info(base_url)
    prev_stats = _load_prev_stats(stats_path)
    local = _collect_local_data(date_to, days)

    if args.monthly:
        toot = build_monthly_toot(
            measures_data, tags,
            date_to=date_to, prev_stats=prev_stats, instance_info=instance_info,
            **local,
        )
    else:
        toot = build_toot(
            measures_data, tags,
            date_from=date_from, date_to=date_to,
            week_number=now.isocalendar()[1],
            user_count=instance_info.get("stats", {}).get("user_count", 0),
            prev_stats=prev_stats,
            **local,
        )

    if args.dry_run:
        print(toot)
        return

    _post_and_save(base_url, novinky_token, toot, measures_data, date_to, stats_path, done_message)

    if not args.monthly:
        # Housekeeping runs only as part of the weekly report.
        _prune_old_data(date_to)
# Run the report only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()