Python script to validate files referenced in hledger comments

So over the weekend I vibe coded a python script to validate data in my ledger journals, and I thought it would be worth sharing.

I haven't reviewed the code itself; I have only checked that its output was correct. When I get time I plan to review the code properly, improve its robustness and quality, and put it up on GitHub. It's also heavily tied to how I capture information in hledger.

I have my journals split by financial year, which is 1 July - 30 June in my part of the world. I reference receipts using a comment and a relative file path. All receipt filenames start with YYYYMMDD -

Currently receipts are stored in ./receipts/ so the comment looks like

; Receipt ./receipts/YYYYMMDD - Invoice.pdf

I keep everything else, such as payslips, in an information folder. Those files follow the same naming format and are referenced in comments like

; ./information/YYYYMMDD - Payslip.pdf

This script is supposed to check 3 things:

  1. all files referenced in a journal exist
  2. all files in folders in the same location as the journal are referenced in a journal, excluding things like .git
  3. filenames match the YYYYMMDD format at the start and are in the right financial year (with a whitelist for things that may violate this check)

The script can also run as a git hook

import os
import re
import sys
import subprocess
from pathlib import Path
from collections import defaultdict
from typing import Set, List, Dict, Tuple, Optional
import argparse
from datetime import datetime, date


class HledgerValidator:
    """
    Main validator class for hledger journal files and their file references.
    """

    def __init__(self, main_journal_path: str, verbose: bool = False, skip_orphan_check: bool = False,
                 git_mode: bool = False, whitelist_file: Optional[str] = None):
        """
        Initialize the validator.

        Args:
            main_journal_path: Path to the main hledger journal file
            verbose: Enable verbose output
            skip_orphan_check: Skip checking for orphaned files (only validate references)
            git_mode: Enable git pre-commit hook mode
            whitelist_file: Path to file containing filenames exempt from date validation
                          (defaults to .hledger-validation-whitelist in the main journal's directory)
        """
        self.main_journal_path = Path(main_journal_path).resolve()
        self.verbose = verbose
        self.skip_orphan_check = skip_orphan_check
        self.git_mode = git_mode

        # Default whitelist filename
        self.default_whitelist_filename = '.hledger-validation-whitelist'

        # Track processed journals to avoid infinite loops
        self.processed_journals: Set[Path] = set()

        # Store all file references with their source journal
        self.file_references: Dict[Path, List[Tuple[Path, int]]] = defaultdict(list)

        # Track all directories that contain referenced files
        self.referenced_directories: Set[Path] = set()

        # Track all journal files found in the filesystem
        self.filesystem_journals: Set[Path] = set()

        # Validation results
        self.missing_files: List[Tuple[Path, Path, int]] = []
        self.orphaned_files: List[Path] = []
        self.unincluded_journals: List[Path] = []
        self.invalid_date_files: List[Tuple[Path, Path, int, str]] = []  # Added for date validation issues

        # Whitelist for date validation exemptions
        self.date_validation_whitelist: Set[str] = set()
        self.whitelist_path: Optional[Path] = None

        # Load whitelist (either specified or default)
        if whitelist_file:
            self._load_whitelist(whitelist_file)
        else:
            # Try to load default whitelist from main journal directory
            default_whitelist = self.main_journal_path.parent / self.default_whitelist_filename
            if default_whitelist.exists():
                self._load_whitelist(str(default_whitelist))

        # Compile regex patterns for performance
        self.include_pattern = re.compile(r'^\s*include\s+(.+)$', re.IGNORECASE)
        self.file_ref_patterns = [
            re.compile(r'^\s*;\s*(\.\/[^\s].*)$'),  # "; ./folder/file"
            re.compile(r'^\s*;\s*Receipt\s+(\.\/[^\s].*)$', re.IGNORECASE)  # "; Receipt ./folder/file"
        ]

        # Date validation patterns
        self.filename_date_pattern = re.compile(r'^(\d{8})\s*-\s*.*$')
        self.journal_fy_pattern = re.compile(r'^(\d{4})-(\d{2})\.journal$')

    def _load_whitelist(self, whitelist_file: str) -> None:
        """
        Load filenames from whitelist file that should be exempt from date validation.

        Args:
            whitelist_file: Path to the whitelist file
        """
        whitelist_path = Path(whitelist_file)

        try:
            if not whitelist_path.exists():
                self.log(f"Whitelist file not found: {whitelist_path} - proceeding without whitelist", "WARNING")
                return

            if not whitelist_path.is_file():
                self.log(f"Whitelist path is not a file: {whitelist_path} - proceeding without whitelist", "WARNING")
                return

            with open(whitelist_path, 'r', encoding='utf-8') as file:
                for line_num, line in enumerate(file, 1):
                    # Strip whitespace and skip empty lines and comments
                    cleaned_line = line.strip()
                    if not cleaned_line or cleaned_line.startswith('#'):
                        continue

                    # Add filename to whitelist
                    self.date_validation_whitelist.add(cleaned_line)
                    self.log(f"Added to date validation whitelist: {cleaned_line}")

            self.log(f"Loaded {len(self.date_validation_whitelist)} entries from whitelist file: {whitelist_path}")

        except Exception as e:
            self.log(f"Error reading whitelist file {whitelist_path}: {e} - proceeding without whitelist", "WARNING")

    def _is_whitelisted(self, filename: str) -> bool:
        """
        Check if a filename is whitelisted for date validation exemption.

        Args:
            filename: The filename to check

        Returns:
            True if the filename is whitelisted, False otherwise
        """
        return filename in self.date_validation_whitelist

    def log(self, message: str, level: str = "INFO") -> None:
        """Log messages with optional verbosity control."""
        if self.git_mode:
            # In git mode, only show errors and warnings
            if level in ["ERROR", "WARNING"]:
                print(f"[{level}] {message}")
        elif self.verbose or level in ["ERROR", "WARNING"]:
            print(f"[{level}] {message}")

    def get_git_tracked_files(self) -> Set[Path]:
        """
        Get all .journal files tracked by git in the repository.

        Returns:
            Set of paths to .journal files tracked by git
        """
        git_journals = set()

        try:
            # Get git repository root
            result = subprocess.run(
                ['git', 'rev-parse', '--show-toplevel'],
                capture_output=True,
                text=True,
                check=True
            )
            git_root = Path(result.stdout.strip())

            # Get all tracked files
            result = subprocess.run(
                ['git', 'ls-files', '*.journal'],
                capture_output=True,
                text=True,
                check=True,
                cwd=git_root
            )

            for file_line in result.stdout.strip().split('\n'):
                if file_line.strip():
                    journal_path = (git_root / file_line.strip()).resolve()
                    git_journals.add(journal_path)
                    self.log(f"Found git-tracked journal: {journal_path}")

        except subprocess.CalledProcessError as e:
            self.log(f"Error getting git-tracked files: {e}", "WARNING")
        except FileNotFoundError:
            self.log("Git not found - scanning filesystem instead", "WARNING")

        return git_journals

    def find_journal_files(self, search_root: Optional[Path] = None) -> Set[Path]:
        """
        Find all .journal files in the filesystem or git repository.

        Args:
            search_root: Root directory to search (defaults to main journal's directory)

        Returns:
            Set of paths to .journal files found
        """
        if self.git_mode:
            # In git mode, only consider git-tracked files
            return self.get_git_tracked_files()

        # Filesystem mode - search from the main journal's directory
        if search_root is None:
            search_root = self.main_journal_path.parent

        journal_files = set()

        try:
            # Search for all .journal files recursively
            for journal_path in search_root.rglob('*.journal'):
                if journal_path.is_file():
                    # Skip hidden files and files in hidden directories
                    if not self._is_hidden_path(journal_path):
                        journal_files.add(journal_path.resolve())
                        self.log(f"Found journal file: {journal_path}")

        except Exception as e:
            self.log(f"Error scanning for journal files: {e}", "ERROR")

        return journal_files

    def _is_hidden_path(self, path: Path) -> bool:
        """
        Check if a path or any of its parents are hidden (start with .).

        Args:
            path: Path to check

        Returns:
            True if the path is hidden, False otherwise
        """
        for part in path.parts:
            if part.startswith('.') and part not in ['.', '..']:
                return True
        return False

    def parse_journal_file(self, journal_path: Path, parent_dir: Optional[Path] = None) -> None:
        """
        Parse a journal file and recursively process includes and file references.

        Args:
            journal_path: Path to the journal file to parse
            parent_dir: Parent directory for resolving relative paths
        """
        # Resolve the absolute path
        if parent_dir and not journal_path.is_absolute():
            resolved_path = (parent_dir / journal_path).resolve()
        else:
            resolved_path = journal_path.resolve()

        # Avoid processing the same journal twice
        if resolved_path in self.processed_journals:
            self.log(f"Skipping already processed journal: {resolved_path}")
            return

        # Check if journal file exists
        if not resolved_path.exists():
            self.log(f"Journal file not found: {resolved_path}", "ERROR")
            return

        if not resolved_path.is_file():
            self.log(f"Path is not a file: {resolved_path}", "ERROR")
            return

        self.processed_journals.add(resolved_path)
        self.log(f"Processing journal: {resolved_path}")

        # Get the directory containing this journal for resolving relative paths
        journal_dir = resolved_path.parent

        try:
            with open(resolved_path, 'r', encoding='utf-8') as file:
                for line_num, line in enumerate(file, 1):
                    line = line.strip()

                    # Skip empty lines
                    if not line:
                        continue

                    # Check for include directives
                    include_match = self.include_pattern.match(line)
                    if include_match:
                        include_path = include_match.group(1).strip()
                        # Remove quotes if present
                        include_path = include_path.strip('\'"')
                        self.log(f"Found include directive: {include_path} (line {line_num})")

                        # Recursively process included journal
                        self.parse_journal_file(Path(include_path), journal_dir)
                        continue

                    # Check for file references in comments
                    for pattern in self.file_ref_patterns:
                        match = pattern.match(line)
                        if match:
                            file_path = match.group(1).strip()
                            # Remove quotes if present
                            file_path = file_path.strip('\'"')

                            self.log(f"Found file reference: {file_path} (line {line_num})")

                            # Resolve relative path from journal's directory
                            if file_path.startswith('./'):
                                file_path = file_path[2:]  # Remove './'

                            referenced_file = (journal_dir / file_path).resolve()
                            self.file_references[referenced_file].append((resolved_path, line_num))

                            # Track the directory containing this file
                            self.referenced_directories.add(referenced_file.parent)

                            break  # Stop checking patterns once we find a match

        except Exception as e:
            self.log(f"Error reading journal file {resolved_path}: {e}", "ERROR")

    def validate_file_references(self) -> None:
        """
        Validate that all referenced files exist on the filesystem.
        """
        self.log("Validating file references...")

        for file_path, references in self.file_references.items():
            if not file_path.exists():
                for journal_path, line_num in references:
                    self.missing_files.append((file_path, journal_path, line_num))
                    self.log(f"Missing file: {file_path} (referenced in {journal_path}:{line_num})", "WARNING")
            else:
                # For existing files, validate the date format in filename
                for journal_path, line_num in references:
                    self.validate_filename_date(file_path, journal_path, line_num)

    def validate_filename_date(self, file_path: Path, journal_path: Path, line_num: int) -> None:
        """
        Validate that the filename starts with a valid date in YYYYMMDD format
        and that the date is within the financial year of the journal.

        Args:
            file_path: Path to the referenced file
            journal_path: Path to the journal file containing the reference
            line_num: Line number of the reference in the journal
        """
        # Skip validation for directories
        if file_path.is_dir():
            self.log(f"Skipping date validation for directory: {file_path}")
            return

        # Extract the filename without path
        filename = file_path.name

        # Check if filename is whitelisted
        if self._is_whitelisted(filename):
            self.log(f"Skipping date validation for whitelisted file: {filename}")
            return

        # Skip validation for .journal and .prices files
        if filename.lower().endswith(('.journal', '.prices')):
            self.log(f"Skipping date validation for {filename} (excluded file type)")
            return

        # Check if filename starts with a date
        date_match = self.filename_date_pattern.match(filename)
        if not date_match:
            self.invalid_date_files.append((file_path, journal_path, line_num, "Filename does not start with YYYYMMDD format"))
            self.log(f"Invalid date format in filename: {filename} (referenced in {journal_path}:{line_num})", "WARNING")
            return

        # Extract the date string and parse it
        date_str = date_match.group(1)
        try:
            file_date = datetime.strptime(date_str, "%Y%m%d").date()
        except ValueError:
            self.invalid_date_files.append((file_path, journal_path, line_num, f"Invalid date {date_str}"))
            self.log(f"Invalid date in filename: {date_str} in {filename} (referenced in {journal_path}:{line_num})", "WARNING")
            return

        # Extract financial year from journal filename
        journal_name = journal_path.name
        fy_match = self.journal_fy_pattern.match(journal_name)

        if not fy_match:
            # If journal name doesn't match expected pattern, skip financial year validation
            self.log(f"Journal filename does not match YYYY-YY.journal pattern: {journal_name}", "INFO")
            return

        # Extract start year and calculate end year
        start_year = int(fy_match.group(1))
        end_year_suffix = fy_match.group(2)

        # Calculate the full end year based on the two-digit suffix
        if int(end_year_suffix) == (start_year + 1) % 100:
            end_year = start_year + 1
        else:
            # Handle edge cases or non-standard naming
            end_year = int(f"{start_year // 100 * 100 + int(end_year_suffix)}")
            if end_year < start_year:
                end_year += 100  # Handle century rollover

        # Define financial year boundaries (July 1 to June 30)
        fy_start = date(start_year, 7, 1)
        fy_end = date(end_year, 6, 30)

        # Check if file date is within financial year
        if not (fy_start <= file_date <= fy_end):
            self.invalid_date_files.append((
                file_path,
                journal_path,
                line_num,
                f"Date {file_date.strftime('%Y-%m-%d')} not within financial year {fy_start.strftime('%Y-%m-%d')} to {fy_end.strftime('%Y-%m-%d')}"
            ))
            self.log(
                f"Date in filename ({file_date.strftime('%Y-%m-%d')}) not within financial year "
                f"{fy_start.strftime('%Y-%m-%d')} to {fy_end.strftime('%Y-%m-%d')}: "
                f"{filename} (referenced in {journal_path}:{line_num})",
                "WARNING"
            )

    def validate_journal_includes(self) -> None:
        """
        Validate that all .journal files are properly included.
        """
        self.log("Validating journal includes...")

        # Find all journal files in the filesystem/git
        self.filesystem_journals = self.find_journal_files()

        # The main journal file should not be considered as needing inclusion
        main_journal_resolved = self.main_journal_path.resolve()

        # Debug output
        if self.verbose:
            self.log(f"Found {len(self.filesystem_journals)} journal files in filesystem")
            self.log(f"Processed {len(self.processed_journals)} journal files")
            self.log("Filesystem journals:")
            for j in sorted(self.filesystem_journals):
                self.log(f"  - {j}")
            self.log("Processed journals:")
            for j in sorted(self.processed_journals):
                self.log(f"  - {j}")

        # Find journals that exist but are not included
        for journal_file in self.filesystem_journals:
            # Skip the main journal file itself
            if journal_file == main_journal_resolved:
                self.log(f"Skipping main journal: {journal_file}")
                continue

            # Skip if this journal is actually processed (which means it was included and parsed)
            if journal_file in self.processed_journals:
                self.log(f"Journal properly included: {journal_file}")
                continue

            # This journal exists but was not processed, so it's not included
            self.unincluded_journals.append(journal_file)
            self.log(f"Unincluded journal found: {journal_file}", "WARNING")

    def find_orphaned_files(self) -> None:
        """
        Find files in referenced directories that aren't referenced by any journal.
        """
        if self.skip_orphan_check:
            self.log("Skipping orphaned file check as requested")
            return

        self.log("Finding orphaned files...")

        # Get all referenced files
        referenced_files = set(self.file_references.keys())

        # Scan all directories that contain referenced files
        for directory in self.referenced_directories:
            if not directory.exists():
                continue

            try:
                for item in directory.iterdir():
                    if item.is_file() and not self._is_hidden_path(item):
                        # Skip if this file is already referenced
                        if item.resolve() in referenced_files:
                            continue

                        # Check if it might be a document file (basic heuristic)
                        if self._is_potential_document(item):
                            self.orphaned_files.append(item)
                            self.log(f"Orphaned file found: {item}", "WARNING")

            except Exception as e:
                self.log(f"Error scanning directory {directory}: {e}", "ERROR")

    def _is_potential_document(self, file_path: Path) -> bool:
        """
        Check if a file is potentially a document that should be referenced.

        Args:
            file_path: Path to the file to check

        Returns:
            True if the file appears to be a document, False otherwise
        """
        # Common document extensions
        document_extensions = {
            '.pdf', '.jpg', '.jpeg', '.png', '.gif', '.tiff', '.tif',
            '.doc', '.docx', '.txt', '.rtf', '.odt',
            '.xls', '.xlsx', '.csv', '.ods',
            '.ppt', '.pptx', '.odp'
        }

        return file_path.suffix.lower() in document_extensions

    def run_validation(self) -> bool:
        """
        Run all validation checks.

        Returns:
            True if all validations pass, False if there are any failures
        """
        self.log("Starting hledger journal validation...")

        # Parse the main journal file and all its includes
        self.parse_journal_file(self.main_journal_path)

        # Run all validation checks
        self.validate_file_references()
        self.validate_journal_includes()

        if not self.skip_orphan_check:
            self.find_orphaned_files()

        # Report results
        return self._report_results()

    def _report_results(self) -> bool:
        """
        Report validation results and return success status.

        Returns:
            True if all validations passed, False otherwise
        """
        all_passed = True

        # Report missing files
        if self.missing_files:
            all_passed = False
            self.log(f"\nFound {len(self.missing_files)} missing file reference(s):", "ERROR")
            for file_path, journal_path, line_num in self.missing_files:
                self.log(f"  - {file_path} (referenced in {journal_path}:{line_num})", "ERROR")

        # Report invalid date files
        if self.invalid_date_files:
            all_passed = False
            self.log(f"\nFound {len(self.invalid_date_files)} file(s) with date validation issues:", "ERROR")
            for file_path, journal_path, line_num, reason in self.invalid_date_files:
                self.log(f"  - {file_path.name}: {reason} (referenced in {journal_path}:{line_num})", "ERROR")

        # Report unincluded journals
        if self.unincluded_journals:
            all_passed = False
            self.log(f"\nFound {len(self.unincluded_journals)} unincluded journal file(s):", "ERROR")
            for journal_path in self.unincluded_journals:
                self.log(f"  - {journal_path}", "ERROR")

        # Report orphaned files
        if self.orphaned_files:
            all_passed = False
            self.log(f"\nFound {len(self.orphaned_files)} orphaned file(s):", "ERROR")
            for file_path in self.orphaned_files:
                self.log(f"  - {file_path}", "ERROR")

        # Summary
        if all_passed:
            self.log("\nāœ“ All validations passed!", "INFO")
        else:
            total_issues = (len(self.missing_files) + len(self.invalid_date_files) +
                          len(self.unincluded_journals) + len(self.orphaned_files))
            self.log(f"\nāœ— Validation failed with {total_issues} issue(s)", "ERROR")

        return all_passed


def main():
    """
    Main entry point for the validator script.

    Parses the command line, checks that the journal argument names an
    existing regular file, runs the validator and exits with status 0 when
    every check passes and 1 otherwise (including when the journal argument
    is missing or is not a file).
    """
    parser = argparse.ArgumentParser(
        description="Validate hledger journal files and their file references",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s main.journal                    # Basic validation
  %(prog)s main.journal -v                 # Verbose output
  %(prog)s main.journal --skip-orphans     # Skip orphaned file check
  %(prog)s main.journal --git              # Git pre-commit mode
  %(prog)s main.journal --whitelist whitelist.txt  # Use date validation whitelist
        """
    )

    parser.add_argument(
        'journal',
        help='Path to the main hledger journal file'
    )

    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Enable verbose output'
    )

    parser.add_argument(
        '--skip-orphans',
        action='store_true',
        help='Skip checking for orphaned files'
    )

    parser.add_argument(
        '--git',
        action='store_true',
        help='Enable git pre-commit hook mode (quieter output)'
    )

    parser.add_argument(
        '--whitelist',
        type=str,
        help='Path to file containing filenames exempt from date validation'
    )

    args = parser.parse_args()

    # Validate that the journal file exists
    if not os.path.exists(args.journal):
        print(f"Error: Journal file '{args.journal}' not found", file=sys.stderr)
        sys.exit(1)

    # A directory (or other non-file) would otherwise slip through and be
    # "validated" as an empty journal.
    if not os.path.isfile(args.journal):
        print(f"Error: '{args.journal}' is not a file", file=sys.stderr)
        sys.exit(1)

    # Create and run validator
    validator = HledgerValidator(
        main_journal_path=args.journal,
        verbose=args.verbose,
        skip_orphan_check=args.skip_orphans,
        git_mode=args.git,
        whitelist_file=args.whitelist
    )

    success = validator.run_validation()
    sys.exit(0 if success else 1)


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
1 Like