tarc/tarc/main.py
2025-03-10 21:33:25 -04:00

241 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
Torrent Archiver
Provides functionality for managing datasets distributed through BitTorrent.
It tracks files and reconciles hardlinks between download directories and
archival locations.
"""
import os
import sys
import re
import uuid
import argparse
from datetime import datetime, timezone
import qbittorrentapi
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import Session
from .models import Base, SchemaVersion, Client
# Schema version this code expects; main() refuses to run against a database
# whose stored version differs.
# SCHEMA format is YYYYMMDDX: a date followed by one extra digit
# (presumably a same-day revision counter -- TODO confirm).
SCHEMA = 202503100
def init_db(engine):
    """
    Create all tables declared on ``Base`` and seed the schema version.

    A ``SchemaVersion`` row holding ``SCHEMA`` and the current UTC time is
    inserted only when the table does not contain any row yet, so calling
    this on an already initialized database leaves its version untouched.
    """
    Base.metadata.create_all(engine)
    with Session(engine) as db:
        # An existing row means the database was versioned earlier.
        if db.query(SchemaVersion).first():
            return
        db.add(SchemaVersion(version=SCHEMA, applied_at=datetime.now(timezone.utc)))
        db.commit()
def get_schema_version(engine):
    """
    Return the version recorded in the newest ``SchemaVersion`` row.

    "Newest" is the row with the highest ``id``. ``None`` is returned when
    no row is found.
    """
    newest_first = SchemaVersion.id.desc()
    with Session(engine) as db:
        latest = db.query(SchemaVersion).order_by(newest_first).first()
        if not latest:
            return None
        return latest.version
def list_tables(engine):
    """
    Return the table names that SQLAlchemy's inspector reports for ``engine``.
    """
    return inspect(engine).get_table_names()
def add_client(engine, name, endpoint, last_seen):
    """
    Store a new ``Client`` row and commit it.

    The row receives a freshly generated random UUID (as a string) together
    with the given ``name``, ``endpoint`` and ``last_seen`` values.
    """
    record = Client(
        uuid=str(uuid.uuid4()),
        name=name,
        endpoint=endpoint,
        last_seen=last_seen,
    )
    with Session(engine) as db:
        db.add(record)
        db.commit()
def find_client(engine, endpoint):
    """
    Look up the clients stored for ``endpoint``.

    Returns a list of ``(id, name, uuid)`` rows, one per matching client;
    the list is empty when nothing matches.
    """
    wanted_columns = (Client.id, Client.name, Client.uuid)
    with Session(engine) as db:
        matches = db.query(*wanted_columns).filter_by(endpoint=endpoint)
        return matches.all()
def list_clients(engine):
    """
    Return every ``Client`` row in the database as a list.
    """
    with Session(engine) as db:
        everyone = db.query(Client).all()
    return everyone
def authenticate_qbittorrent(endpoint, username, password):
    """
    Log in to a qBittorrent endpoint and return the authenticated client.

    After a successful login the application version is read once and must
    consist of dot-separated numbers, optionally prefixed with "v" (for
    example "v4.6.2"). Anything else is rejected.

    Args:
        endpoint: Host/URL handed to ``qbittorrentapi.Client``.
        username: Login name.
        password: Login password.

    Returns:
        The logged-in ``qbittorrentapi.Client``.

    Raises:
        ValueError: If the login is rejected, or if the reported version is
            not a string in the expected format.

    Errors other than ``qbittorrentapi.LoginFailed`` raised during login are
    not translated and propagate to the caller unchanged.
    """
    qb = qbittorrentapi.Client(host=endpoint, username=username, password=password)
    try:
        qb.auth_log_in()
    except qbittorrentapi.LoginFailed as e:
        raise ValueError(f'Login failed for endpoint "{endpoint}": {e}') from e

    # Read the version a single time so the value that is validated is the
    # same one that is reported, and no second lookup is needed.
    version = qb.app.version
    # fullmatch (unlike match with "$") also rejects a trailing newline.
    if not isinstance(version, str) or not re.fullmatch(r"v?\d+(\.\d+)*", version):
        raise ValueError(f'Invalid version "{version}" found at "{endpoint}"')
    return qb
def scan_torrents(qb_client, debug=False, limit=2):
    """
    Print a summary of the torrents known to a qBittorrent client.

    The total number of torrents is always reported. Details are printed
    for at most ``limit`` torrents; each detailed torrent costs two extra
    client calls (its files and its trackers).

    Args:
        qb_client: Object providing ``torrents_info()``,
            ``torrents_files(hash)`` and ``torrents_trackers(hash)``.
        debug: When true, additionally print the ``repr`` of each detailed
            torrent and of each of its trackers.
        limit: Maximum number of torrents to detail. Defaults to 2, the
            value that was previously hard-coded. ``None`` details all of
            them.

    Raises:
        ValueError: If ``limit`` is negative.
    """
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative or None, got {limit}")

    torrents = qb_client.torrents_info()
    print(f"[INFO]: There are {len(torrents)} torrents\n")
    # A slice bound of None selects every torrent.
    for torrent in torrents[:limit]:
        files = qb_client.torrents_files(torrent.hash)
        trackers = qb_client.torrents_trackers(torrent.hash)
        print(f"[name]: {torrent.name}")
        print(f"[infohash_v1]: {torrent.hash}")
        print(f"[content_path]: {torrent.content_path}")
        # Only the first 80 characters of the magnet link are shown.
        print(f"[magnet_uri]: {torrent.magnet_uri[:80]}")
        print(f"[completed_on]: {torrent.completed}\n")
        print(f"[trackers]: {len(trackers)}")
        print(f"[file_count]: {len(files)}\n")
        if debug:
            print(f"[DEBUG]: {repr(torrent)}")
            for elem in trackers:
                print(f"[DEBUG]: Tracker {repr(elem)}")
            print("\n", end="")
def handle_scan(args, engine):
    """
    Run the ``scan`` command.

    With ``--endpoint`` the qBittorrent client is authenticated first and
    the endpoint is then looked up in the database:

    * with ``--confirm-add`` a not-yet-known endpoint is registered (this
      needs ``--name``); an endpoint that is already known is an error;
    * without it, the endpoint must match exactly one stored client, whose
      torrents are then scanned.

    With only ``--directory`` a "not implemented" notice is printed.

    Raises:
        ValueError: For missing arguments, unknown endpoints, endpoints that
            already exist when adding, or endpoints stored more than once.
    """
    if not args.endpoint:
        if args.directory:
            print("[INFO]: --directory is not implemented")
            return
        raise ValueError("Must specify directory OR client endpoint")

    # Authentication happens before the database lookup, so bad credentials
    # are reported ahead of any client bookkeeping error.
    qb_client = authenticate_qbittorrent(args.endpoint, args.username, args.password)
    known = find_client(engine, args.endpoint)
    match_count = len(known)

    if args.confirm_add:
        if match_count > 1:
            raise ValueError(f"Multiple clients with the same endpoint: {args.endpoint}")
        if match_count == 1:
            _, existing_name, existing_uuid = known[0]
            raise ValueError(f"{existing_name} ({existing_uuid}) already exists")
        if not args.name:
            raise ValueError("Must specify --name for a new client")
        add_client(engine, args.name, args.endpoint, datetime.now(timezone.utc))
        print(f"[INFO]: Added client {args.name} ({args.endpoint})")
        return

    if match_count == 0:
        raise ValueError(
            f'Client using endpoint "{args.endpoint}" not found. '
            'Use --confirm-add to add a new endpoint'
        )
    if match_count > 1:
        raise ValueError(f'Multiple clients ({match_count}) using "{args.endpoint}"')
    scan_torrents(qb_client, debug=args.debug)
def handle_client_add(args, engine):
    """
    Run the ``client-add`` command: register a new client in the database.

    The endpoint must not be stored already. ``handle_scan`` refuses to work
    with an endpoint that belongs to more than one client, so inserting a
    duplicate here would make that endpoint unusable for scanning.

    Args:
        args: Parsed arguments providing ``name`` and ``endpoint``.
        engine: SQLAlchemy engine of the tarc database.

    Raises:
        ValueError: If ``name`` or ``endpoint`` is missing, or if a client
            with the same endpoint already exists.
    """
    if not (args.name and args.endpoint):
        raise ValueError("Must specify --name and --endpoint to add a client")

    existing = find_client(engine, args.endpoint)
    if existing:
        # Rows are (id, name, uuid); same wording as the scan command uses.
        raise ValueError(f"{existing[0][1]} ({existing[0][2]}) already exists")

    now = datetime.now(timezone.utc)
    add_client(engine, args.name, args.endpoint, now)
    print(f"[INFO]: Added client {args.name} ({args.endpoint})")
def handle_client_list(args, engine):
    """
    Run the ``client-list`` command: print one line per stored client.

    Each line shows the client's name, endpoint and last-seen value.
    ``args`` is accepted for symmetry with the other handlers and is unused.
    """
    for entry in list_clients(engine):
        summary = f"{entry.name} ({entry.endpoint}) - Last seen: {entry.last_seen}"
        print(summary)
def main():
    """
    Parse command-line arguments and execute the selected command.

    The SQLite database (``--storage``, default ``~/.tarc.db``) is
    initialized when it contains no tables. Its stored schema version must
    equal ``SCHEMA`` before any command is run.

    Every ``ValueError`` raised by the schema check or by a command handler
    is printed as ``[ERROR]: ...`` and ends the process with exit status 1.
    """
    parser = argparse.ArgumentParser(description="Manage BT archives", prog="tarc")
    subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")

    # scan command
    scan_parser = subparsers.add_parser("scan", help="Scan torrents")
    scan_parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    scan_parser.add_argument(
        "--confirm-add", action="store_true", help="Confirm adding a new client"
    )
    scan_parser.add_argument("-n", "--name", help="Name of client")
    scan_parser.add_argument("-d", "--directory", help="Directory to scan")
    scan_parser.add_argument("-e", "--endpoint", help="Endpoint URL")
    scan_parser.add_argument("-u", "--username", help="Username")
    scan_parser.add_argument("-p", "--password", help="Password")
    scan_parser.add_argument("-s", "--storage", help="Path of sqlite3 database")

    # client add command
    client_add_parser = subparsers.add_parser("client-add", help="Add a new client")
    client_add_parser.add_argument("-n", "--name", required=True, help="Name of client")
    client_add_parser.add_argument("-e", "--endpoint", required=True, help="Endpoint URL")
    client_add_parser.add_argument("-s", "--storage", help="Path of sqlite3 database")

    # client list command
    client_list_parser = subparsers.add_parser("client-list", help="List all clients")
    client_list_parser.add_argument("-s", "--storage", help="Path of sqlite3 database")

    args = parser.parse_args()
    storage_path = args.storage or os.path.expanduser("~/.tarc.db")
    engine = create_engine(f"sqlite:///{storage_path}")

    if not list_tables(engine):
        print(f"[INFO]: Initializing database at {storage_path}")
        init_db(engine)

    try:
        # The schema check lives inside the handled region so a mismatch is
        # reported like every other error instead of as a raw traceback.
        schema_found = get_schema_version(engine)
        if schema_found != SCHEMA:
            raise ValueError(f"SCHEMA {schema_found}, expected {SCHEMA}")

        if args.command == "scan":
            handle_scan(args, engine)
        elif args.command == "client-add":
            handle_client_add(args, engine)
        elif args.command == "client-list":
            handle_client_list(args, engine)
    except ValueError as e:
        print(f"[ERROR]: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()