v1.0
This commit is contained in:
parent
85c8afa405
commit
b682c9f945
|
@ -24,7 +24,7 @@ build-backend = "poetry.core.masonry.api"
|
||||||
|
|
||||||
|
|
||||||
[tool.poetry.scripts]
|
[tool.poetry.scripts]
|
||||||
bandcamp_importer = "bandcamp_importer.cli:app"
|
import-albums = "bandcamp_importer.cli:app"
|
||||||
|
|
||||||
|
|
||||||
### SLAM
|
### SLAM
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import io
|
import io
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
@ -7,8 +8,16 @@ import typer
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from rich.logging import RichHandler
|
from rich.logging import RichHandler
|
||||||
|
|
||||||
|
from bandcamp_importer import importer
|
||||||
|
|
||||||
CONFIG_DEFAULTS = """
|
CONFIG_DEFAULTS = """
|
||||||
# bandcamp-importer Defaults
|
# Album Importer Defaults
|
||||||
|
|
||||||
|
# Where to store extracted media
|
||||||
|
MEDIA_ROOT=/music
|
||||||
|
|
||||||
|
# Where to look for downloaded zip files
|
||||||
|
DOWNLOADS = ~/Downloads
|
||||||
|
|
||||||
LOG_LEVEL=INFO
|
LOG_LEVEL=INFO
|
||||||
"""
|
"""
|
||||||
|
@ -30,14 +39,26 @@ def main(
|
||||||
app_state["config_file"],
|
app_state["config_file"],
|
||||||
help="Path to the bandcamp_importer configuration file",
|
help="Path to the bandcamp_importer configuration file",
|
||||||
),
|
),
|
||||||
|
media_root: Optional[Path] = typer.Option(
|
||||||
|
None,
|
||||||
|
help="The root of your media folder. Defaults to /music, set by $MEDIA_ROOT."
|
||||||
|
),
|
||||||
|
downloads: Optional[Path] = typer.Option(
|
||||||
|
None,
|
||||||
|
help="The path to your downloads folder."
|
||||||
|
)
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Configure the execution environment with global parameters.
|
Extract album zip files downloaded from Bandcamp into your media root.
|
||||||
"""
|
"""
|
||||||
app_state["config_file"] = config_file
|
app_state["config_file"] = config_file
|
||||||
load_dotenv(stream=io.StringIO(CONFIG_DEFAULTS))
|
load_dotenv(stream=io.StringIO(CONFIG_DEFAULTS))
|
||||||
|
print(f"Loading config from {app_state['config_file']}")
|
||||||
load_dotenv(app_state["config_file"])
|
load_dotenv(app_state["config_file"])
|
||||||
|
|
||||||
|
app_state['MEDIA_ROOT'] = (media_root or Path(os.environ['MEDIA_ROOT'])).expanduser().resolve()
|
||||||
|
app_state['DOWNLOADS'] = (downloads or Path(os.environ['DOWNLOADS'])).expanduser().resolve()
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
format="%(message)s",
|
format="%(message)s",
|
||||||
level=getattr(logging, log_level.upper()),
|
level=getattr(logging, log_level.upper()),
|
||||||
|
@ -45,13 +66,26 @@ def main(
|
||||||
)
|
)
|
||||||
app_state["verbose"] = verbose
|
app_state["verbose"] = verbose
|
||||||
|
|
||||||
|
logging.debug(f"{app_state = }")
|
||||||
|
|
||||||
if context.invoked_subcommand is None:
|
if context.invoked_subcommand is None:
|
||||||
logger.debug("No command specified; invoking default handler.")
|
logger.debug("No command specified; invoking default handler.")
|
||||||
run(context)
|
import_album(context)
|
||||||
|
|
||||||
|
|
||||||
def run(context: typer.Context):
|
@app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True})
|
||||||
|
def import_album(context: typer.Context):
|
||||||
"""
|
"""
|
||||||
The default CLI entrypoint is bandcamp_importer.cli.run().
|
The default command: Extract downloaded zip files into your media root.
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError("Please define bandcamp_importer.cli.run().")
|
file_list = [Path(arg) for arg in context.args]
|
||||||
|
if file_list:
|
||||||
|
paths = importer.import_zip_files(file_list, os.environ['DOWNLOADS'])
|
||||||
|
logger.info(f"Imported {len(paths)}: " + "\n".join([str(p) for p in paths]))
|
||||||
|
print(f"Imported {len(paths)} downloads.")
|
||||||
|
return
|
||||||
|
|
||||||
|
paths = importer.import_from_directory(app_state['DOWNLOADS'], app_state['MEDIA_ROOT'])
|
||||||
|
logger.info(f"Imported {len(paths)} downloads from {app_state['DOWNLOADS']}: " + "\n".join([str(p) for p in paths]))
|
||||||
|
print(f"Imported {len(paths)} downloads from {app_state['DOWNLOADS']}")
|
||||||
|
return
|
||||||
|
|
61
src/bandcamp_importer/importer.py
Normal file
61
src/bandcamp_importer/importer.py
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
import logging
|
||||||
|
import zipfile
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger('bandcamp_importer.importer')
|
||||||
|
|
||||||
|
|
||||||
|
class FilenameProcessingError(Exception):
|
||||||
|
"""
|
||||||
|
Raise when we cannot process a zip file's filename to extract artist, album, and track info.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def process_zip_filename(filename: str) -> tuple:
|
||||||
|
try:
|
||||||
|
artist, album = filename.split(" - ", 1)
|
||||||
|
except ValueError: # pragma: no cover
|
||||||
|
raise FilenameProcessingError(f"Could not parse artist and album from {filename}.")
|
||||||
|
album = album[:-4]
|
||||||
|
logger.debug(f"{artist = }, {album = }")
|
||||||
|
return artist, album
|
||||||
|
|
||||||
|
|
||||||
|
def import_zip_file(path: Path, media_root: Path) -> bool:
|
||||||
|
imported = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
artist, album = process_zip_filename(path.name)
|
||||||
|
except FilenameProcessingError: # pragma: no cover
|
||||||
|
logger.error(f"Could not process zip filename {path}.")
|
||||||
|
return imported
|
||||||
|
|
||||||
|
target_path = media_root / artist / album
|
||||||
|
if target_path.exists():
|
||||||
|
logger.info(f"Skipping existing album in the media root: {target_path}")
|
||||||
|
return False
|
||||||
|
logger.debug(f"Extracting {path} to {str(target_path)}")
|
||||||
|
with zipfile.ZipFile(path) as archive:
|
||||||
|
archive.extractall(target_path)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def import_zip_files(zip_files: list, media_root: Path) -> list:
|
||||||
|
files = []
|
||||||
|
logger.debug(f"{zip_files = }")
|
||||||
|
for path in zip_files:
|
||||||
|
logger.debug(f"Importing {path}")
|
||||||
|
if not zipfile.is_zipfile(path):
|
||||||
|
logger.warning(f"{path} does not appear to be a .zip file; skipping.")
|
||||||
|
continue
|
||||||
|
if import_zip_file(path, media_root):
|
||||||
|
files.append(path)
|
||||||
|
return files
|
||||||
|
|
||||||
|
|
||||||
|
def import_from_directory(path: Path, media_root: Path) -> list:
|
||||||
|
globbed = list(path.glob("*.zip"))
|
||||||
|
logger.debug(f"Found {len(globbed)} zip files to import: {globbed = }")
|
||||||
|
return import_zip_files(globbed, media_root)
|
32
test/conftest.py
Normal file
32
test/conftest.py
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def fixtures():
|
||||||
|
return Path(__file__).parent / "fixtures"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def media_root(tmp_path_factory):
|
||||||
|
return tmp_path_factory.mktemp('media')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def main_args(media_root, fixtures):
|
||||||
|
return {
|
||||||
|
'verbose': True,
|
||||||
|
'log_level': 'DEBUG',
|
||||||
|
'media_root': media_root,
|
||||||
|
'downloads': fixtures / "downloads",
|
||||||
|
'config_file': fixtures / 'bandcamp-importer.conf',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def mock_env(monkeypatch, fixtures, media_root):
|
||||||
|
if 'LOG_LEVEL' in os.environ:
|
||||||
|
del os.environ['LOG_LEVEL']
|
0
test/fixtures/invalid - zipfile_format.zip
vendored
Normal file
0
test/fixtures/invalid - zipfile_format.zip
vendored
Normal file
1
test/fixtures/invalid_filename.zip
vendored
Symbolic link
1
test/fixtures/invalid_filename.zip
vendored
Symbolic link
|
@ -0,0 +1 @@
|
||||||
|
downloads/artist - title.zip
|
|
@ -1,7 +0,0 @@
|
||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.xfail
|
|
||||||
def test_tests_are_implemented():
|
|
||||||
print("Yyou have not implemented any tests yet.")
|
|
||||||
assert False
|
|
|
@ -1,8 +1,50 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import typer
|
||||||
|
|
||||||
from bandcamp_importer import cli
|
from bandcamp_importer import cli
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.xfail
|
@pytest.fixture
|
||||||
def test_tests_are_implemented():
|
def context():
|
||||||
assert cli.main()
|
return MagicMock(spec=typer.Context, args=[], invoked_subcommand=None)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("files, downloads, expected", [
|
||||||
|
([], None, [
|
||||||
|
"artist",
|
||||||
|
"artist/album",
|
||||||
|
"artist/album/one.mp3",
|
||||||
|
"artist/album/two.m4a",
|
||||||
|
"artist/album/cover.jpg",
|
||||||
|
]),
|
||||||
|
(["invalid_filename.zip"], None, []),
|
||||||
|
(["does_not_exist.zip"], None, []),
|
||||||
|
(["invalid - zipfile_format.zip"], None, []),
|
||||||
|
])
|
||||||
|
def test_import_album(context, main_args, media_root, fixtures, files, downloads, expected):
|
||||||
|
context.configure_mock(args=[fixtures / filename for filename in files])
|
||||||
|
cli.main(context, **main_args)
|
||||||
|
results = sorted([str(Path(p).relative_to(media_root)) for p in media_root.rglob("*")])
|
||||||
|
assert results == sorted(expected)
|
||||||
|
|
||||||
|
|
||||||
|
def test_imports_are_idempotent(context, main_args, media_root, fixtures):
|
||||||
|
cli.main(context, **main_args)
|
||||||
|
cover = media_root / 'artist' / 'album' / 'cover.jpg'
|
||||||
|
mtime = cover.stat().st_mtime
|
||||||
|
cli.main(context, **main_args)
|
||||||
|
assert cover.stat().st_mtime == mtime
|
||||||
|
|
||||||
|
|
||||||
|
def test_main(context, main_args):
|
||||||
|
context.configure_mock(invoked_subcommand=None)
|
||||||
|
if 'LOG_LEVEL' in os.environ:
|
||||||
|
del os.environ['LOG_LEVEL']
|
||||||
|
cli.main(context, **main_args)
|
||||||
|
assert cli.app_state['verbose'] is True
|
||||||
|
assert os.environ['LOG_LEVEL'] == 'INFO'
|
||||||
|
|
Loading…
Reference in New Issue
Block a user