125 lines
5.1 KiB
Python
125 lines
5.1 KiB
Python
|
|
import argparse
|
|
from mastodon import Mastodon
|
|
import os
|
|
import sqlite3
|
|
import logging
|
|
import time
|
|
import tomllib
|
|
|
|
# Value passed as the `limit` argument of `mastodon.admin_accounts(...)` when
# the list of local accounts is fetched for a welcome pass.
ACCOUNT_FETCH_LIMIT = 100_000
|
|
|
|
def check_db_exists(cursor: sqlite3.Cursor) -> bool:
    """Report whether the ``welcome_log`` table was already present.

    Despite the name this is not a pure check: when the table is missing it
    is created on the spot, and ``False`` is returned so the caller can tell
    that the database is brand new.

    Args:
        cursor: Cursor on the bot's SQLite database.

    Returns:
        ``True`` if ``welcome_log`` existed before the call, ``False`` if it
        had to be created by this call.
    """
    lookup = cursor.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='welcome_log'")
    (table_count,) = lookup.fetchone()
    if table_count != 0:
        return True

    # First run against this database file: set up the schema.
    cursor.execute("CREATE TABLE welcome_log (id INTEGER PRIMARY KEY, username TEXT, userdb_id INTEGER, welcomed INTEGER DEFAULT 0)")
    return False
|
|
|
|
def user_exists(cursor: sqlite3.Cursor, userid: int) -> bool:
    """Return whether ``welcome_log`` holds at least one row for *userid*.

    Args:
        cursor: Cursor on the bot's SQLite database.
        userid: Value matched against the ``userdb_id`` column.

    Returns:
        ``True`` if one or more matching rows exist, otherwise ``False``.
    """
    query = "SELECT COUNT(*) FROM welcome_log WHERE userdb_id = ?"
    (match_count,) = cursor.execute(query, (userid,)).fetchone()
    return match_count > 0
|
|
|
|
def user_welcomed(cursor: sqlite3.Cursor, userid: int) -> bool:
    """Return whether *userid* has a ``welcome_log`` row flagged as welcomed.

    Args:
        cursor: Cursor on the bot's SQLite database.
        userid: Value matched against the ``userdb_id`` column.

    Returns:
        ``True`` if a row for *userid* with ``welcomed = 1`` exists;
        ``False`` if the user is unknown or not yet flagged.
    """
    query = "SELECT COUNT(*) FROM welcome_log WHERE userdb_id = ? AND welcomed = 1"
    (welcomed_count,) = cursor.execute(query, (userid,)).fetchone()
    return welcomed_count > 0
|
|
|
|
def set_user_welcomed(cursor: sqlite3.Cursor, userid: int):
    """Flag every ``welcome_log`` row for *userid* as welcomed.

    The statement is executed on *cursor* only; this function does not
    commit.

    Args:
        cursor: Cursor on the bot's SQLite database.
        userid: Value matched against the ``userdb_id`` column.
    """
    statement = "UPDATE welcome_log SET welcomed = 1 WHERE userdb_id = ?"
    params = (userid,)
    cursor.execute(statement, params)
|
|
|
|
def create_user(cursor: sqlite3.Cursor, userid: int, username: str):
    """Insert a new ``welcome_log`` row for the given account.

    Only ``userdb_id`` and ``username`` are supplied, so the ``welcomed``
    column takes its schema default. The statement is executed on *cursor*
    only; this function does not commit.

    Args:
        cursor: Cursor on the bot's SQLite database.
        userid: Stored in the ``userdb_id`` column.
        username: Stored in the ``username`` column.
    """
    statement = "INSERT INTO welcome_log (userdb_id, username) VALUES(?, ?)"
    params = (userid, username)
    cursor.execute(statement, params)
|
|
|
|
def _post_welcome_thread(username: str):
    """Post every configured welcome message to *username* as one reply thread.

    Messages are read from the module-level ``config['messages']``. The first
    status is posted stand-alone and each following one replies to the status
    posted just before it. A message's optional ``content_warning`` entry is
    passed as the spoiler text.
    """
    previous_status_id = None
    for message in config['messages']:
        posted = mastodon.status_post(
            status=f"@{username}, {message['content']}",
            in_reply_to_id=previous_status_id,
            visibility='public',
            spoiler_text=message.get('content_warning'),
        )
        previous_status_id = posted.id


def check_and_welcome_new_users():
    """Run one welcome pass over the accounts returned by the admin API.

    Relies on the module-level ``cursor``, ``connection``, ``mastodon`` and
    ``config`` objects set up by the script entry point.

    For every returned account:

    * a ``welcome_log`` row is created if none exists yet;
    * if the table was created during this very pass, the account is only
      flagged as welcomed (no messages are sent), so that pre-existing users
      are not spammed;
    * otherwise, if the account is not yet flagged, the welcome thread is
      posted and the account is then flagged.

    Each database change is committed immediately. Exceptions from the API
    or the database are not caught here and propagate to the caller; the
    account being processed is then left un-flagged.
    """
    logging.debug("Checking if database exists")
    fresh_database = not check_db_exists(cursor)
    if fresh_database:
        logging.info("Database was freshly created - we'll set all pre-existing users as of now to 'welcomed' to avoid spamming everyone on the server.")

    logging.info("Starting the welcome process")
    # NOTE(review): the server may cap `limit` per request; if it does, only
    # the first page of accounts is processed here - confirm whether
    # pagination (e.g. Mastodon.fetch_remaining) is needed for this instance.
    all_accounts = mastodon.admin_accounts(remote=False, status='active', limit=ACCOUNT_FETCH_LIMIT)
    for account in all_accounts:
        account_id = account['id']

        logging.debug("Checking if user %s exists", account_id)
        if not user_exists(cursor, account_id):
            logging.debug("Creating user %s with ID %s", account['username'], account_id)
            create_user(cursor, account_id, account['username'])
            connection.commit()

        logging.debug("Checking if user %s was welcomed", account_id)
        if fresh_database:
            # Pre-existing user on a brand-new database: flag, do not message.
            set_user_welcomed(cursor, account_id)
            connection.commit()
        elif not user_welcomed(cursor, account_id):
            _post_welcome_thread(account['username'])
            # Flag only after the whole thread has been posted.
            set_user_welcomed(cursor, account_id)
            connection.commit()
|
|
|
|
if __name__ == '__main__':
    # Script entry point: configure logging, parse arguments, load the TOML
    # config, obtain an authenticated Mastodon client (registering the app and
    # logging in on first run), open the SQLite database, then run a welcome
    # pass every 10 minutes until the process is stopped.

    # Fall back to the `tomli` backport when `tomllib` is unavailable.
    # NOTE(review): this file also imports `tomllib` unconditionally at module
    # level, which would fail before this fallback is reached - confirm intent.
    try:
        import tomllib
    except ModuleNotFoundError:
        import tomli as tomllib

    logging.basicConfig(filename='/home/archos/welcome_bot/welcome_bot.log', level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s')

    arg_parser = argparse.ArgumentParser(
        description="Welcomes users to the Mastodon instance",
        epilog="Note: the user this bot logs in as must have admin:read:accounts access"
    )
    arg_parser.add_argument('--email', required=False, help="Only required for first execution")
    arg_parser.add_argument('--password', required=False, help="Only required for first execution")
    arg_parser.add_argument('--config', default="config.toml")
    args = arg_parser.parse_args()

    with open(args.config, "rb") as toml_file:
        config = tomllib.load(toml_file)

    # The same scopes are requested for app registration and for login.
    required_scopes = ["write:statuses", "admin:read:accounts"]

    mastodon = None

    if not os.path.exists(config['mastodon']['credential_storage']):
        # No stored access token yet: a first-time login is required.
        if not (args.email and args.password):
            logging.error("Initial login has not yet occurred - this is required to generate the credential file. Please supply login credentials (--email, --password)")
            # `exit()` is injected by the `site` module and is not guaranteed
            # to exist; raising SystemExit yields the same exit status.
            raise SystemExit(-1)

        logging.info("Registering app")
        Mastodon.create_app(
            client_name="Welcome Bot",
            scopes=required_scopes,
            to_file=config['mastodon']['secret_storage'],
            api_base_url=config['mastodon']['base_url']
        )

        logging.info("Initializing client")
        mastodon = Mastodon(
            client_id=config['mastodon']['secret_storage'],
            api_base_url=config['mastodon']['base_url']
        )

        logging.info("Performing login")
        mastodon.log_in(
            username=args.email,
            password=args.password,
            to_file=config['mastodon']['credential_storage'],
            scopes=required_scopes
        )
    else:
        # Re-use the access token stored by an earlier run.
        mastodon = Mastodon(
            access_token=config['mastodon']['credential_storage'],
            api_base_url=config['mastodon']['base_url']
        )

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if mastodon is None:
        raise RuntimeError("Mastodon client not initialized")

    connection = sqlite3.connect(config['database']['sqlite_path'])
    cursor = connection.cursor()

    try:
        while True:
            check_and_welcome_new_users()
            logging.info("Sleeping for 10 minutes")
            time.sleep(600)
    finally:
        # Release the database handle however the loop ends
        # (an unhandled error, Ctrl-C, ...).
        connection.close()
|