import shutil
import sys

from django.apps import apps
from django.core.management.base import BaseCommand, CommandError
from django.core.management.utils import run_formatters
from django.db import migrations
from django.db.migrations.exceptions import AmbiguityError
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.optimizer import MigrationOptimizer
from django.db.migrations.writer import MigrationWriter
from django.utils.version import get_docs_version


class Command(BaseCommand):
    """Management command that optimizes the operations of one migration.

    The migration is looked up by app label and name prefix, its operations
    are passed through ``MigrationOptimizer``, and the result is written back
    to disk via ``MigrationWriter``.
    """

    help = "Optimizes the operations for the named migration."

    def add_arguments(self, parser):
        """Register the ``app_label``, ``migration_name`` and ``--check``
        arguments on ``parser``.
        """
        parser.add_argument(
            "app_label",
            help="App label of the application to optimize the migration for.",
        )
        parser.add_argument(
            "migration_name", help="Migration name to optimize the operations for."
        )
        parser.add_argument(
            "--check",
            action="store_true",
            help="Exit with a non-zero status if the migration can be optimized.",
        )

    def handle(self, *args, **options):
        """Optimize the requested migration and write the result to disk.

        Reads ``verbosity``, ``app_label``, ``migration_name`` and ``check``
        from ``options``.

        Returns early (writing nothing) when the optimizer yields the same
        number of operations as the migration already has. With ``check``
        set, exits the process with status 1 as soon as an optimization is
        found, before any file is written.

        Raises:
            CommandError: if the app label is unknown, the app has no
                migrations, the migration name matches zero or several
                migrations, or the migration needs manual porting but is
                already a squashed migration.
        """
        verbosity = options["verbosity"]
        app_label = options["app_label"]
        migration_name = options["migration_name"]
        check = options["check"]

        # Validate app_label.
        try:
            apps.get_app_config(app_label)
        except LookupError as err:
            # Chain explicitly so the traceback shows this as a translation of
            # the lookup failure rather than an error during error handling.
            raise CommandError(str(err)) from err

        # Load the current graph state.
        loader = MigrationLoader(None)
        if app_label not in loader.migrated_apps:
            raise CommandError(f"App '{app_label}' does not have migrations.")

        # Find a migration.
        try:
            migration = loader.get_migration_by_prefix(app_label, migration_name)
        except AmbiguityError as err:
            raise CommandError(
                f"More than one migration matches '{migration_name}' in app "
                f"'{app_label}'. Please be more specific."
            ) from err
        except KeyError as err:
            raise CommandError(
                f"Cannot find a migration matching '{migration_name}' from app "
                f"'{app_label}'."
            ) from err

        # Optimize the migration.
        optimizer = MigrationOptimizer()
        new_operations = optimizer.optimize(migration.operations, migration.app_label)

        # An optimization is only recognized when the operation count changes.
        if len(migration.operations) == len(new_operations):
            if verbosity > 0:
                self.stdout.write("No optimizations possible.")
            return

        if verbosity > 0:
            self.stdout.write(
                "Optimizing from %d operations to %d operations."
                % (len(migration.operations), len(new_operations))
            )
        if check:
            # Check mode only reports; signal "can be optimized" via the exit
            # status and leave the migration file untouched.
            sys.exit(1)

        # Set the new migration optimizations.
        migration.operations = new_operations

        # Write out the optimized migration file.
        writer = MigrationWriter(migration)
        migration_file_string = writer.as_string()
        if writer.needs_manual_porting:
            if migration.replaces:
                raise CommandError(
                    "Migration will require manual porting but is already a squashed "
                    "migration.\nTransition to a normal migration first: "
                    "https://docs.djangoproject.com/en/%s/topics/migrations/"
                    "#squashing-migrations" % get_docs_version()
                )
            # Make a new migration with those operations. It is written to its
            # own "<name>_optimized" file and declares that it replaces the
            # original, instead of overwriting the original in place.
            subclass = type(
                "Migration",
                (migrations.Migration,),
                {
                    "dependencies": migration.dependencies,
                    "operations": new_operations,
                    "replaces": [(migration.app_label, migration.name)],
                },
            )
            optimized_migration_name = "%s_optimized" % migration.name
            optimized_migration = subclass(optimized_migration_name, app_label)
            # From here on, ``writer`` (and therefore ``writer.path``) refers
            # to the new optimized migration.
            writer = MigrationWriter(optimized_migration)
            migration_file_string = writer.as_string()
            if verbosity > 0:
                self.stdout.write(
                    self.style.MIGRATE_HEADING("Manual porting required") + "\n"
                    " Your migrations contained functions that must be manually "
                    "copied over,\n"
                    " as we could not safely copy their implementation.\n"
                    " See the comment at the top of the optimized migration for "
                    "details."
                )
                # The warning is shown only when a "black" executable is found
                # on PATH.
                if shutil.which("black"):
                    self.stdout.write(
                        self.style.WARNING(
                            "Optimized migration couldn't be formatted using the "
                            '"black" command. You can call it manually.'
                        )
                    )

        with open(writer.path, "w", encoding="utf-8") as fh:
            fh.write(migration_file_string)
        run_formatters([writer.path])

        if verbosity > 0:
            self.stdout.write(
                self.style.MIGRATE_HEADING(f"Optimized migration {writer.path}")
            )