feat: mazání data/ souborů starších než 30 dní
This commit is contained in:
@@ -0,0 +1,89 @@
|
||||
#!/usr/bin/env python3
|
||||
import html
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime, timezone
|
||||
|
||||
def load_env(path=".env"):
    """Parse a simple ``KEY=VALUE`` file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are stripped of surrounding whitespace; the
    value is everything after the first ``=``.

    Args:
        path: Location of the env file. Defaults to ``.env`` in the current
            working directory.

    Returns:
        A dict mapping variable names to their string values. The dict is
        empty when the file does not exist.
    """
    env = {}
    try:
        # Explicit encoding so non-ASCII values do not depend on the locale.
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, val = line.partition("=")
                val = val.strip()
                # Remove exactly one *matching* pair of surrounding quotes.
                # Stripping quote characters unconditionally would corrupt
                # values that legitimately start or end with a quote.
                if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'":
                    val = val[1:-1]
                env[key.strip()] = val
    except FileNotFoundError:
        # A missing env file is not an error: the caller falls back to
        # whatever else it merges with the returned dict.
        pass
    return env
|
||||
|
||||
def api_get(url, token, timeout=30):
    """Perform an authenticated GET request and return the decoded JSON body.

    Args:
        url: Full URL to request.
        token: Bearer token sent in the ``Authorization`` header.
        timeout: Socket timeout in seconds passed to ``urlopen`` so that a
            stalled server cannot block the script indefinitely.

    Returns:
        The response body parsed with ``json.loads``.

    Raises:
        urllib.error.HTTPError: The server answered with an error status.
        urllib.error.URLError: The request could not be completed.
        OSError: Another socket-level failure (e.g. a timeout while reading).

    Each of these is reported on stderr and then re-raised unchanged.
    """
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # Decode leniently: an error page in an unexpected encoding must not
        # replace the HTTPError with a UnicodeDecodeError.
        body = e.read().decode(errors="replace")
        print(f"HTTP {e.code} při volání {url}: {body}", file=sys.stderr)
        raise
    except urllib.error.URLError as e:
        print(f"Chyba sítě při volání {url}: {e.reason}", file=sys.stderr)
        raise
    except OSError as e:
        # Socket errors raised outside urllib's own wrapping, such as a
        # timeout while reading the response body.
        print(f"Chyba sítě při volání {url}: {e}", file=sys.stderr)
        raise
|
||||
|
||||
def clean_content(content):
    """Reduce an HTML fragment to plain text.

    Processing steps, in order:
      1. Remove ``<a>`` elements whose ``class`` attribute contains
         ``hashtag``, including their inner text.
      2. Replace every remaining tag with a space.
      3. Decode HTML entities.
      4. Collapse runs of whitespace and trim both ends.

    Args:
        content: HTML string; ``None`` or an empty string yields ``""``.

    Returns:
        The plain-text rendering of ``content``.
    """
    if not content:
        return ""
    # DOTALL lets ".*?" cross line breaks, so a hashtag anchor that contains
    # a newline is still removed as a whole instead of leaking "#tag" text.
    text = re.sub(
        r'<a\b[^>]*class="[^"]*hashtag[^"]*"[^>]*>.*?</a>',
        "",
        content,
        flags=re.IGNORECASE | re.DOTALL,
    )
    # Tags become spaces (not empty strings) so adjacent paragraphs and
    # line breaks do not glue words together.
    text = re.sub(r"<[^>]+>", " ", text)
    text = html.unescape(text)
    return re.sub(r"\s+", " ", text).strip()
|
||||
|
||||
def main():
    """Fetch trending statuses and store the day's top three as JSON.

    Configuration is read from ``.env`` merged with the process environment
    (``NOVINKY_TOKEN`` and ``INSTANCE_URL`` are required). The statuses are
    filtered, scored by ``reblogs + favourites``, and the three best are
    written to ``data/<UTC date>.json``.

    Exits with status 1 when a required variable is missing or the trends
    request fails.
    """
    # Real environment variables take precedence over values from .env.
    env = {**load_env(), **os.environ}

    for var in ("NOVINKY_TOKEN", "INSTANCE_URL"):
        if not env.get(var):
            print(f"Chybí proměnná prostředí: {var}", file=sys.stderr)
            sys.exit(1)

    token = env["NOVINKY_TOKEN"]
    base_url = env["INSTANCE_URL"].rstrip("/")

    try:
        statuses = api_get(f"{base_url}/api/v1/trends/statuses?limit=10", token)
    except urllib.error.URLError:
        # HTTP and network failures are already reported by api_get.
        sys.exit(1)
    except Exception as e:
        # Anything else (e.g. a body that is not valid JSON) would otherwise
        # terminate the script without any message.
        print(f"Neočekávaná chyba při načítání trendů: {e}", file=sys.stderr)
        sys.exit(1)

    candidates = []
    for s in statuses:
        # Read the account name once and defensively: a missing or null
        # "account" must not raise further down.
        acct = (s.get("account") or {}).get("acct", "")
        # Skip statuses without an account name and those whose acct
        # contains "@" (in the Mastodon API that form denotes a remote
        # account).
        if not acct or "@" in acct:
            continue
        text = clean_content(s.get("content", ""))
        if len(text) < 10:
            continue
        # "or 0" also covers counters that are present but null.
        reblogs = s.get("reblogs_count") or 0
        favourites = s.get("favourites_count") or 0
        candidates.append({
            "acct": acct,
            "text": text,
            "reblogs": reblogs,
            "favourites": favourites,
            "score": reblogs + favourites,
        })

    candidates.sort(key=lambda x: x["score"], reverse=True)
    top = candidates[:3]

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    os.makedirs("data", exist_ok=True)
    out_path = os.path.join("data", f"{today}.json")

    with open(out_path, "w", encoding="utf-8") as f:
        json.dump({"date": today, "top": top}, f, ensure_ascii=False, indent=2)

    print(f"Uloženo: {out_path} ({len(top)} tootů)")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+39
-21
@@ -108,7 +108,7 @@ def format_date_cs(dt):
|
||||
]
|
||||
return f"{dt.day}. {months[dt.month - 1]}"
|
||||
|
||||
def build_toot(measures_data, tags, trend_status, date_from, date_to, week_number):
|
||||
def build_toot(measures_data, tags, top_tooty, date_from, date_to, week_number):
|
||||
stats = {m["key"]: int(m["total"]) for m in measures_data}
|
||||
new_users = stats.get("new_users", 0)
|
||||
active_users = stats.get("active_users", 0)
|
||||
@@ -118,12 +118,12 @@ def build_toot(measures_data, tags, trend_status, date_from, date_to, week_numbe
|
||||
|
||||
tip = TIPS[week_number % len(TIPS)]
|
||||
|
||||
if trend_status:
|
||||
acct = trend_status.get("account", {}).get("acct", "?")
|
||||
content = truncate(trend_status.get("content", ""), 100)
|
||||
boosts = trend_status.get("reblogs_count", 0)
|
||||
favs = trend_status.get("favourites_count", 0)
|
||||
toot_tyden = f"🌟 Toot týdne od @{acct}:\n\"{content}\"\n🔁 {boosts} ⭐ {favs}\n\n"
|
||||
if top_tooty:
|
||||
lines = "\n".join(
|
||||
f"\"{truncate(s['text'], 80)}\" od @{s['acct']} 🔁 {s['reblogs']} ⭐ {s['favourites']}"
|
||||
for s in top_tooty
|
||||
)
|
||||
toot_tyden = f"🌟 Tooty týdne:\n{lines}\n\n"
|
||||
else:
|
||||
toot_tyden = ""
|
||||
|
||||
@@ -179,21 +179,25 @@ def main():
|
||||
except Exception:
|
||||
tags = []
|
||||
|
||||
try:
|
||||
statuses = api_get(f"{base_url}/api/v1/trends/statuses?limit=5", admin_token)
|
||||
trend_status = None
|
||||
for s in statuses:
|
||||
clean = re.sub(r'<a\b[^>]*class="[^"]*hashtag[^"]*"[^>]*>.*?</a>', "",s.get("content", ""), flags=re.IGNORECASE)
|
||||
clean = re.sub(r"<[^>]+>", " ", clean)
|
||||
clean = html.unescape(clean)
|
||||
clean = re.sub(r"\s+", " ", clean).strip()
|
||||
if len(clean) >= 20:
|
||||
trend_status = s
|
||||
break
|
||||
except Exception:
|
||||
trend_status = None
|
||||
seen = set()
|
||||
all_tooty = []
|
||||
for i in range(7):
|
||||
day = (date_to - timedelta(days=i)).strftime("%Y-%m-%d")
|
||||
path = os.path.join("data", f"{day}.json")
|
||||
try:
|
||||
with open(path, encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
for s in data.get("top", []):
|
||||
key = (s.get("acct", ""), s.get("text", ""))
|
||||
if key not in seen:
|
||||
seen.add(key)
|
||||
all_tooty.append(s)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
all_tooty.sort(key=lambda s: s.get("score", 0), reverse=True)
|
||||
top_tooty = all_tooty[:3]
|
||||
|
||||
toot = build_toot(measures_data, tags, trend_status, date_from, date_to, week_number)
|
||||
toot = build_toot(measures_data, tags, top_tooty, date_from, date_to, week_number)
|
||||
|
||||
if args.dry_run:
|
||||
print(toot)
|
||||
@@ -209,5 +213,19 @@ def main():
|
||||
except Exception:
|
||||
sys.exit(1)
|
||||
|
||||
cutoff = date_to - timedelta(days=30)
|
||||
data_dir = "data"
|
||||
if os.path.isdir(data_dir):
|
||||
for fname in os.listdir(data_dir):
|
||||
if not fname.endswith(".json"):
|
||||
continue
|
||||
try:
|
||||
file_date = datetime.strptime(fname[:-5], "%Y-%m-%d").replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
continue
|
||||
if file_date < cutoff:
|
||||
os.remove(os.path.join(data_dir, fname))
|
||||
print(f"Smazán starý soubor: {fname}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user