diff --git a/pyproject.toml b/pyproject.toml index edc0b63..e11798e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] -bandcamp_importer = "bandcamp_importer.cli:app" +import-albums = "bandcamp_importer.cli:app" ### SLAM diff --git a/src/bandcamp_importer/cli.py b/src/bandcamp_importer/cli.py index 68d2cd4..0b2ad45 100644 --- a/src/bandcamp_importer/cli.py +++ b/src/bandcamp_importer/cli.py @@ -1,5 +1,6 @@ import io import logging +import os from pathlib import Path from typing import Optional @@ -7,8 +8,16 @@ import typer from dotenv import load_dotenv from rich.logging import RichHandler +from bandcamp_importer import importer + CONFIG_DEFAULTS = """ -# bandcamp-importer Defaults +# Album Importer Defaults + +# Where to store extracted media +MEDIA_ROOT=/music + +# Where to look for downloaded zip files +DOWNLOADS = ~/Downloads LOG_LEVEL=INFO """ @@ -30,14 +39,26 @@ def main( app_state["config_file"], help="Path to the bandcamp_importer configuration file", ), + media_root: Optional[Path] = typer.Option( + None, + help="The root of your media folder. Defaults to /music, set by $MEDIA_ROOT." + ), + downloads: Optional[Path] = typer.Option( + None, + help="The path to your downloads folder." + ) ): """ - Configure the execution environment with global parameters. + Extract album zip files downloaded from Bandcamp into your media root. """ app_state["config_file"] = config_file load_dotenv(stream=io.StringIO(CONFIG_DEFAULTS)) + print(f"Loading config from {app_state['config_file']}") load_dotenv(app_state["config_file"]) + app_state['MEDIA_ROOT'] = (media_root or Path(os.environ['MEDIA_ROOT'])).expanduser().resolve() + app_state['DOWNLOADS'] = (downloads or Path(os.environ['DOWNLOADS'])).expanduser().resolve() + logging.basicConfig( format="%(message)s", level=getattr(logging, log_level.upper()), @@ -45,13 +66,26 @@ def main( ) app_state["verbose"] = verbose + logging.debug(f"{app_state = }") + if context.invoked_subcommand is None: logger.debug("No command specified; invoking default handler.") - run(context) + import_album(context) -def run(context: typer.Context): +@app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def import_album(context: typer.Context): """ - The default CLI entrypoint is bandcamp_importer.cli.run(). + The default command: Extract downloaded zip files into your media root. """ - raise NotImplementedError("Please define bandcamp_importer.cli.run().") + file_list = [Path(arg) for arg in context.args] + if file_list: + paths = importer.import_zip_files(file_list, os.environ['DOWNLOADS']) + logger.info(f"Imported {len(paths)}: " + "\n".join([str(p) for p in paths])) + print(f"Imported {len(paths)} downloads.") + return + + paths = importer.import_from_directory(app_state['DOWNLOADS'], app_state['MEDIA_ROOT']) + logger.info(f"Imported {len(paths)} downloads from {app_state['DOWNLOADS']}: " + "\n".join([str(p) for p in paths])) + print(f"Imported {len(paths)} downloads from {app_state['DOWNLOADS']}") + return diff --git a/src/bandcamp_importer/importer.py b/src/bandcamp_importer/importer.py new file mode 100644 index 0000000..6ae2f53 --- /dev/null +++ b/src/bandcamp_importer/importer.py @@ -0,0 +1,61 @@ +import logging +import zipfile + +from pathlib import Path + + +logger = logging.getLogger('bandcamp_importer.importer') + + +class FilenameProcessingError(Exception): + """ + Raise when we cannot process a zip file's filename to extract artist, album, and track info. + """ + + +def process_zip_filename(filename: str) -> tuple: + try: + artist, album = filename.split(" - ", 1) + except ValueError: # pragma: no cover + raise FilenameProcessingError(f"Could not parse artist and album from {filename}.") + album = album[:-4] + logger.debug(f"{artist = }, {album = }") + return artist, album + + +def import_zip_file(path: Path, media_root: Path) -> bool: + imported = [] + + try: + artist, album = process_zip_filename(path.name) + except FilenameProcessingError: # pragma: no cover + logger.error(f"Could not process zip filename {path}.") + return imported + + target_path = media_root / artist / album + if target_path.exists(): + logger.info(f"Skipping existing album in the media root: {target_path}") + return False + logger.debug(f"Extracting {path} to {str(target_path)}") + with zipfile.ZipFile(path) as archive: + archive.extractall(target_path) + return True + + +def import_zip_files(zip_files: list, media_root: Path) -> list: + files = [] + logger.debug(f"{zip_files = }") + for path in zip_files: + logger.debug(f"Importing {path}") + if not zipfile.is_zipfile(path): + logger.warning(f"{path} does not appear to be a .zip file; skipping.") + continue + if import_zip_file(path, media_root): + files.append(path) + return files + + +def import_from_directory(path: Path, media_root: Path) -> list: + globbed = list(path.glob("*.zip")) + logger.debug(f"Found {len(globbed)} zip files to import: {globbed = }") + return import_zip_files(globbed, media_root) diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..92e1869 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,32 @@ +import os + +from pathlib import Path + +import pytest + + +@pytest.fixture(autouse=True) +def fixtures(): + return Path(__file__).parent / "fixtures" + + +@pytest.fixture +def media_root(tmp_path_factory): + return tmp_path_factory.mktemp('media') + + +@pytest.fixture +def main_args(media_root, fixtures): + return { + 'verbose': True, + 'log_level': 'DEBUG', + 'media_root': media_root, + 'downloads': fixtures / "downloads", + 'config_file': fixtures / 'bandcamp-importer.conf', + } + + +@pytest.fixture(autouse=True) +def mock_env(monkeypatch, fixtures, media_root): + if 'LOG_LEVEL' in os.environ: + del os.environ['LOG_LEVEL'] diff --git a/test/fixtures/invalid - zipfile_format.zip b/test/fixtures/invalid - zipfile_format.zip new file mode 100644 index 0000000..e69de29 diff --git a/test/fixtures/invalid_filename.zip b/test/fixtures/invalid_filename.zip new file mode 120000 index 0000000..f1c2d82 --- /dev/null +++ b/test/fixtures/invalid_filename.zip @@ -0,0 +1 @@ +downloads/artist - title.zip \ No newline at end of file diff --git a/test/test_bandcamp_importer.py b/test/test_bandcamp_importer.py deleted file mode 100644 index e3c0b4e..0000000 --- a/test/test_bandcamp_importer.py +++ /dev/null @@ -1,7 +0,0 @@ -import pytest - - -@pytest.mark.xfail -def test_tests_are_implemented(): - print("Yyou have not implemented any tests yet.") - assert False diff --git a/test/test_bandcamp_importer_cli.py b/test/test_bandcamp_importer_cli.py index a5d214e..4f56a11 100644 --- a/test/test_bandcamp_importer_cli.py +++ b/test/test_bandcamp_importer_cli.py @@ -1,8 +1,50 @@ +import os + +from pathlib import Path +from unittest.mock import MagicMock + import pytest +import typer from bandcamp_importer import cli -@pytest.mark.xfail -def test_tests_are_implemented(): - assert cli.main() +@pytest.fixture +def context(): + return MagicMock(spec=typer.Context, args=[], invoked_subcommand=None) + + +@pytest.mark.parametrize("files, downloads, expected", [ + ([], None, [ + "artist", + "artist/album", + "artist/album/one.mp3", + "artist/album/two.m4a", + "artist/album/cover.jpg", + ]), + (["invalid_filename.zip"], None, []), + (["does_not_exist.zip"], None, []), + (["invalid - zipfile_format.zip"], None, []), +]) +def test_import_album(context, main_args, media_root, fixtures, files, downloads, expected): + context.configure_mock(args=[fixtures / filename for filename in files]) + cli.main(context, **main_args) + results = sorted([str(Path(p).relative_to(media_root)) for p in media_root.rglob("*")]) + assert results == sorted(expected) + + +def test_imports_are_idempotent(context, main_args, media_root, fixtures): + cli.main(context, **main_args) + cover = media_root / 'artist' / 'album' / 'cover.jpg' + mtime = cover.stat().st_mtime + cli.main(context, **main_args) + assert cover.stat().st_mtime == mtime + + +def test_main(context, main_args): + context.configure_mock(invoked_subcommand=None) + if 'LOG_LEVEL' in os.environ: + del os.environ['LOG_LEVEL'] + cli.main(context, **main_args) + assert cli.app_state['verbose'] is True + assert os.environ['LOG_LEVEL'] == 'INFO'