diff --git a/welcome_bot.py b/welcome_bot.py index d60e306..0df117d 100644 --- a/welcome_bot.py +++ b/welcome_bot.py @@ -1,7 +1,10 @@ + import argparse from mastodon import Mastodon import os import sqlite3 +import logging +import time import tomllib ACCOUNT_FETCH_LIMIT = 100000 @@ -27,18 +30,52 @@ def set_user_welcomed(cursor: sqlite3.Cursor, userid: int): def create_user(cursor: sqlite3.Cursor, userid: int, username: str): cursor.execute("INSERT INTO welcome_log (userdb_id, username) VALUES(?, ?)", (userid, username)) +def check_and_welcome_new_users(): + logging.debug("Checking if database exists") + fresh_database = not check_db_exists(cursor) + if fresh_database: + logging.info("Database was freshly created - we'll set all pre-existing users as of now to 'welcomed' to avoid spamming everyone on the server.") + + logging.info("Starting the welcome process") + all_accounts = mastodon.admin_accounts(remote=False, status='active', limit=ACCOUNT_FETCH_LIMIT) + for account in all_accounts: + logging.debug(f"Checking if user {account['id']} exists") + if not user_exists(cursor, account['id']): + logging.debug(f"Creating user {account['username']} with ID {account['id']}") + create_user(cursor, account['id'], account['username']) + connection.commit() + + logging.debug(f"Checking if user {account['id']} was welcomed") + if fresh_database: + set_user_welcomed(cursor, account['id']) + connection.commit() + elif not user_welcomed(cursor, account['id']): + result_id = None + for message in config['messages']: + content_warning = message['content_warning'] if 'content_warning' in message else None + result = mastodon.status_post(status=f"@{account['username']}, {message['content']}", in_reply_to_id=result_id, visibility='public', spoiler_text=content_warning) + result_id = result.id + + set_user_welcomed(cursor, account['id']) + connection.commit() if __name__ == '__main__': + try: + import tomllib + except ModuleNotFoundError: + import tomli as tomllib + + logging.basicConfig(filename='/home/archos/welcome_bot/welcome_bot.log', level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s') + arg_parser = argparse.ArgumentParser( - description = "Welcomes users to the Mastodon instance", - epilog = "Note: the user this bot logs in as must have admin:read:accounts access" + description="Welcomes users to the Mastodon instance", + epilog="Note: the user this bot logs in as must have admin:read:accounts access" ) arg_parser.add_argument('--email', required=False, help="Only required for first execution") arg_parser.add_argument('--password', required=False, help="Only required for first execution") arg_parser.add_argument('--config', default="config.toml") args = arg_parser.parse_args() - config = None # not strictly required, but I like explicit scope with open(args.config, "rb") as toml_file: config = tomllib.load(toml_file) @@ -46,68 +83,42 @@ if __name__ == '__main__': if not os.path.exists(config['mastodon']['credential_storage']): if not (args.email and args.password): - print("Initial login has not yet occured - this is required to generate the credential file. Please supply login credentials (--email, --password)") + logging.error("Initial login has not yet occurred - this is required to generate the credential file. Please supply login credentials (--email, --password)") exit(-1) else: - print("Registering app") + logging.info("Registering app") Mastodon.create_app( - client_name = "Welcome Bot", - scopes = ["write:statuses", "admin:read:accounts"], - to_file = config['mastodon']['secret_storage'], - api_base_url = config['mastodon']['base_url'] + client_name="Welcome Bot", + scopes=["write:statuses", "admin:read:accounts"], + to_file=config['mastodon']['secret_storage'], + api_base_url=config['mastodon']['base_url'] ) - print("Initializing client") + logging.info("Initializing client") mastodon = Mastodon( - client_id = config['mastodon']['secret_storage'], - api_base_url = config['mastodon']['base_url'] + client_id=config['mastodon']['secret_storage'], + api_base_url=config['mastodon']['base_url'] ) - print("Performing login") + logging.info("Performing login") mastodon.log_in( - username = args.email, - password = args.password, - to_file = config['mastodon']['credential_storage'], - scopes = ["write:statuses", "admin:read:accounts"] + username=args.email, + password=args.password, + to_file=config['mastodon']['credential_storage'], + scopes=["write:statuses", "admin:read:accounts"] ) else: mastodon = Mastodon( - access_token = config['mastodon']['credential_storage'], - api_base_url = config['mastodon']['base_url'] + access_token=config['mastodon']['credential_storage'], + api_base_url=config['mastodon']['base_url'] ) assert mastodon is not None, "Mastodon client not initialized" connection = sqlite3.connect(config['database']['sqlite_path']) cursor = connection.cursor() - - # are our tables defined? - fresh_database = not check_db_exists(cursor) - if fresh_database: - print("Database was freshly created - we'll set all pre-existing users as of now to 'welcomed' to avoid spamming everyone on the server.") - all_accounts = mastodon.admin_accounts(remote=False, status='active', limit=ACCOUNT_FETCH_LIMIT) - for account in all_accounts: - # despite status='active', we still get zombie users from the API - if not (account.confirmed and account.approved) or account.disabled or account.suspended or account.silenced: - continue - - # does our welcome bot know about this user? - if not user_exists(cursor, account.id): - create_user(cursor, account.id, account.username) - connection.commit() - - # have we welcomed them yet? - if fresh_database: - set_user_welcomed(cursor, account.id) - connection.commit() - - elif not user_welcomed(cursor, account.id): - result_id = None - for message in config['messages']: - content_warning = message['content_warning'] if 'content_warning' in message else None - result = mastodon.status_post(status=f"@{account.username}, {message['content']}", visibility='public', spoiler_text=content_warning) - result_id = result.id - - set_user_welcomed(cursor, account.id) - connection.commit() + while True: + check_and_welcome_new_users() + logging.info("Sleeping for 10 minutes") + time.sleep(600)