diff --git a/tarc/main.py b/tarc/main.py index 5e8dd0a..4cdeadd 100644 --- a/tarc/main.py +++ b/tarc/main.py @@ -18,7 +18,6 @@ from datetime import datetime, timezone import qbittorrentapi from sqlalchemy import create_engine, inspect from sqlalchemy.orm import Session -from sqlalchemy.exc import DatabaseError from .models import Base, SchemaVersion, Client @@ -90,121 +89,149 @@ def list_clients(engine): return session.query(Client).all() +def auth_qbittorrent(endpoint, username, password): + """ + Authenticate with the qBittorrent client + """ + qb = qbittorrentapi.Client(host=endpoint, username=username, password=password) + try: + qb.auth_log_in() + except qbittorrentapi.LoginFailed as e: + raise ValueError(f'Login failed for endpoint "{endpoint}"') from e + except Exception as e: + raise ValueError(f"An unexpected error occurred: {str(e)}") from e + if not re.match(r"^v?\d+(\.\d+)*$", qb.app.version): + raise ValueError(f'Invalid version "{qb.app.version}" found at "{endpoint}"') + return qb + + +def scan_torrents(qb_client, debug=False): + """ + Scan torrents using the provided qBittorrent client. + """ + torrents = qb_client.torrents_info() + print(f"[INFO]: There are {len(torrents)} torrents\n") + for torrent in torrents[:2]: + files = qb_client.torrents_files(torrent.hash) + trackers = qb_client.torrents_trackers(torrent.hash) + print(f"[name]: {torrent.name}") + print(f"[infohash_v1]: {torrent.hash}") + print(f"[content_path]: {torrent.content_path}") + print(f"[magnet_uri]: {torrent.magnet_uri[:80]}") + print(f"[completed_on]: {torrent.completed}\n") + print(f"[trackers]: {len(trackers)}") + print(f"[file_count]: {len(files)}\n") + if debug: + print(f"[DEBUG]: {repr(torrent)}") + for elem in trackers: + print(f"[DEBUG]: Tracker {repr(elem)}") + print("\n", end="") + + +def scan(args, engine): + """ + Scan command to authenticate with the qBittorrent client and scan torrents. + """ + if args.name: + clients = find_client(engine, args.name) + if len(clients) == 1: + client_info = clients[0] + qb_client = auth_qbittorrent(client_info.endpoint, client_info.username, args.password) + scan_torrents(qb_client, debug=args.debug) + elif len(clients) == 0: + raise ValueError( + f'Client with name "{args.name}" not found. ' + "Please use the 'client add' command to add a new client." + ) + else: + raise ValueError( + f"Multiple clients with the same name: {args.name}" + ) + elif args.directory: + print("[INFO]: --directory is not implemented") + else: + raise ValueError("Must specify directory OR client name") + + +def client_add(args, engine): + """ + Add a new client to the database. + """ + if args.name and args.endpoint and args.username: + now = datetime.now(timezone.utc) + add_client(engine, args.name, args.endpoint, now) + print(f"[INFO]: Added client {args.name} ({args.endpoint})") + else: + raise ValueError("Must specify --name, --endpoint, and --username to add a client") + + +def client_list(engine): + """ + List all stored clients. + """ + clients = list_clients(engine) + for client in clients: + print(f"{client.name} ({client.endpoint}) - Last seen: {client.last_seen}") + + def main(): """ - Entrypoint of the program. + Parses command-line arguments and executes the corresponding command. """ - parser = argparse.ArgumentParser(description="Manage BT archives", prog="tarc") subparsers = parser.add_subparsers( dest="command", required=True, help="Available commands" ) - scan_parser = subparsers.add_parser("scan", help="Scan command") + # scan command + scan_parser = subparsers.add_parser("scan", help="Scan torrents") scan_parser.add_argument("--debug", action="store_true", help="Enable debug mode") - scan_parser.add_argument( - "--confirm-add", action="store_true", help="Confirm adding a new client" - ) scan_parser.add_argument("-n", "--name", help="Name of client") scan_parser.add_argument("-d", "--directory", help="Directory to scan") - scan_parser.add_argument("-t", "--type", help="Scan type") - scan_parser.add_argument("-e", "--endpoint", help="Endpoint URL") scan_parser.add_argument("-u", "--username", help="Username") scan_parser.add_argument("-p", "--password", help="Password") - scan_parser.add_argument("-s", "--storage", help="Path of sqlite3 database") + + # client command + client_parser = subparsers.add_parser("client", help="Manage clients") + client_subparsers = client_parser.add_subparsers(dest="client_command", required=True) + + # client add command + client_add_parser = client_subparsers.add_parser("add", help="Add a new client") + client_add_parser.add_argument("-n", "--name", required=True, help="Name of client") + client_add_parser.add_argument( + "-e", "--endpoint", required=True, help="Endpoint URL" + ) + + # client list command + client_list_parser = client_subparsers.add_parser("list", help="List all clients") args = parser.parse_args() - if args.command == "scan": - if args.storage is None: - storage_path = os.path.expanduser("~/.tarc.db") - else: - storage_path = args.storage + # Check for valid subcommand for client + if args.command == "client" and args.client_command is None: + parser.error("The 'client' command requires a subcommand (add or list).") - try: - engine = create_engine(f"sqlite:///{storage_path}") - tables = list_tables(engine) - except DatabaseError as e: - print(f'[ERROR]: Database Error "{storage_path}" ({str(e)})') - sys.exit(1) + storage_path = os.path.expanduser("~/.tarc.db") + engine = create_engine(f"sqlite:///{storage_path}") - if not tables: - print(f"[INFO]: Initializing database at {storage_path}") - init_db(engine) + if not list_tables(engine): + print(f"[INFO]: Initializing database at {storage_path}") + init_db(engine) - schema_found = get_schema_version(engine) - if schema_found is None: - print("[ERROR]: Could not determine schema version") - sys.exit(1) - if not SCHEMA == schema_found: - print(f"[ERROR]: SCHEMA {schema_found}, expected {SCHEMA}") - sys.exit(1) + schema_found = get_schema_version(engine) + if schema_found != SCHEMA: + raise ValueError(f"SCHEMA {schema_found}, expected {SCHEMA}") - if args.directory is not None: - print("[INFO]: --directory is not implemented") - sys.exit(0) - elif args.endpoint is not None: - qb = qbittorrentapi.Client(host=args.endpoint, - username=args.username, password=args.password) - try: - qb.auth_log_in() - except qbittorrentapi.LoginFailed as e: - print(f'[ERROR]: Login failed for endpoint "{args.endpoint}": {e}') - sys.exit(1) - if not re.match(r"^v?\d+(\.\d+)*$", qb.app.version): - print(f'[ERROR]: Invalid version "{qb.app.version}" found at "{args.endpoint}"') - sys.exit(1) - else: - print(f'[INFO]: Found qBittorrent {qb.app.version} at "{args.endpoint}"') - - clients = find_client(engine, args.endpoint) - if args.confirm_add: - if len(clients) == 0: - if args.name is not None: - now = datetime.now(timezone.utc) - add_client(engine, args.name, args.endpoint, now) - print(f"[INFO]: Added client {args.name} ({args.endpoint})") - else: - print("[ERROR]: Must specify --name for a new client") - sys.exit(1) - elif len(clients) == 1: - print(f"[ERROR]: {clients[0][1]} ({clients[0][2]}) already exists") - sys.exit(1) - else: - print( - f"[ERROR]: Multiple clients with the same endpoint: {args.endpoint}" - ) - sys.exit(1) - elif len(clients) == 0: - print(f'[ERROR]: Client using endpoint "{args.endpoint}" not found') - print("[ERROR]: Use --confirm-add to add a new endpoint") - sys.exit(1) - elif len(clients) == 1: - torrents = qb.torrents_info() - print(f"[INFO]: There are {len(torrents)} torrents\n") - for torrent in torrents[:2]: - files = qb.torrents_files(torrent.hash) - trackers = qb.torrents_trackers(torrent.hash) - print(f"[name]: {torrent.name}") - print(f"[infohash_v1]: {torrent.hash}") - print(f"[content_path]: {torrent.content_path}") - print(f"[magnet_uri]: {torrent.magnet_uri[:80]}") - print(f"[completed_on]: {torrent.completed}\n") - print(f"[trackers]: {len(trackers)}") - print(f"[file_count]: {len(files)}\n") - if args.debug: - print(f"[DEBUG]: {repr(torrent)}") - for elem in trackers: - print(f"[DEBUG]: Tracker {repr(elem)}") - print("\n", end="") - else: - print( - f'[ERROR]: Multiple clients ({len(clients)}) using "{args.endpoint}"' - ) - sys.exit(1) - else: - print("[ERROR]: Must specify directory OR client endpoint") - sys.exit(1) + try: + if args.command == "scan": + scan(args, engine) + elif args.command == "client" and args.client_command == "add": + client_add(args, engine) + elif args.command == "client" and args.client_command == "list": + client_list(engine) + except ValueError as e: + print(f"[ERROR]: {e}") + sys.exit(1) if __name__ == "__main__":