import gzip import os import warnings from django.apps import apps from django.core import serializers from django.core.management.base import BaseCommand, CommandError from django.core.management.utils import parse_apps_and_model_labels from django.db import DEFAULT_DB_ALIAS, router try: import bz2 has_bz2 = True except ImportError: has_bz2 = False try: import lzma has_lzma = True except ImportError: has_lzma = False class ProxyModelWarning(Warning): pass class Command(BaseCommand): help = ( "Output the contents of the database as a fixture of the given format " "(using each model's default manager unless --all is specified)." ) def add_arguments(self, parser): parser.add_argument( "args", metavar="app_label[.ModelName]", nargs="*", help=( "Restricts dumped data to the specified app_label or " "app_label.ModelName." ), ) parser.add_argument( "--format", default="json", help="Specifies the output serialization format for fixtures.", ) parser.add_argument( "--indent", type=int, help="Specifies the indent level to use when pretty-printing output.", ) parser.add_argument( "--database", default=DEFAULT_DB_ALIAS, help="Nominates a specific database to dump fixtures from. " 'Defaults to the "default" database.', ) parser.add_argument( "-e", "--exclude", action="append", default=[], help="An app_label or app_label.ModelName to exclude " "(use multiple --exclude to exclude multiple apps/models).", ) parser.add_argument( "--natural-foreign", action="store_true", dest="use_natural_foreign_keys", help="Use natural foreign keys if they are available.", ) parser.add_argument( "--natural-primary", action="store_true", dest="use_natural_primary_keys", help="Use natural primary keys if they are available.", ) parser.add_argument( "-a", "--all", action="store_true", dest="use_base_manager", help=( "Use Django's base manager to dump all models stored in the database, " "including those that would otherwise be filtered or modified by a " "custom manager." ), ) parser.add_argument( "--pks", dest="primary_keys", help="Only dump objects with given primary keys. Accepts a comma-separated " "list of keys. This option only works when you specify one model.", ) parser.add_argument( "-o", "--output", help="Specifies file to which the output is written." ) def handle(self, *app_labels, **options): format = options["format"] indent = options["indent"] using = options["database"] excludes = options["exclude"] output = options["output"] show_traceback = options["traceback"] use_natural_foreign_keys = options["use_natural_foreign_keys"] use_natural_primary_keys = options["use_natural_primary_keys"] use_base_manager = options["use_base_manager"] pks = options["primary_keys"] if pks: primary_keys = [pk.strip() for pk in pks.split(",")] else: primary_keys = [] excluded_models, excluded_apps = parse_apps_and_model_labels(excludes) if not app_labels: if primary_keys: raise CommandError("You can only use --pks option with one model") app_list = dict.fromkeys( app_config for app_config in apps.get_app_configs() if app_config.models_module is not None and app_config not in excluded_apps ) else: if len(app_labels) > 1 and primary_keys: raise CommandError("You can only use --pks option with one model") app_list = {} for label in app_labels: try: app_label, model_label = label.split(".") try: app_config = apps.get_app_config(app_label) except LookupError as e: raise CommandError(str(e)) if app_config.models_module is None or app_config in excluded_apps: continue try: model = app_config.get_model(model_label) except LookupError: raise CommandError( "Unknown model: %s.%s" % (app_label, model_label) ) app_list_value = app_list.setdefault(app_config, []) # We may have previously seen an "all-models" request for # this app (no model qualifier was given). In this case # there is no need adding specific models to the list. if app_list_value is not None and model not in app_list_value: app_list_value.append(model) except ValueError: if primary_keys: raise CommandError( "You can only use --pks option with one model" ) # This is just an app - no model qualifier app_label = label try: app_config = apps.get_app_config(app_label) except LookupError as e: raise CommandError(str(e)) if app_config.models_module is None or app_config in excluded_apps: continue app_list[app_config] = None # Check that the serialization format exists; this is a shortcut to # avoid collating all the objects and _then_ failing. if format not in serializers.get_public_serializer_formats(): try: serializers.get_serializer(format) except serializers.SerializerDoesNotExist: pass raise CommandError("Unknown serialization format: %s" % format) def get_objects(count_only=False): """ Collate the objects to be serialized. If count_only is True, just count the number of objects to be serialized. """ if use_natural_foreign_keys: models = serializers.sort_dependencies( app_list.items(), allow_cycles=True ) else: # There is no need to sort dependencies when natural foreign # keys are not used. models = [] for app_config, model_list in app_list.items(): if model_list is None: models.extend(app_config.get_models()) else: models.extend(model_list) for model in models: if model in excluded_models: continue if model._meta.proxy and model._meta.proxy_for_model not in models: warnings.warn( "%s is a proxy model and won't be serialized." % model._meta.label, category=ProxyModelWarning, ) if not model._meta.proxy and router.allow_migrate_model(using, model): if use_base_manager: objects = model._base_manager else: objects = model._default_manager queryset = objects.using(using).order_by(model._meta.pk.name) if primary_keys: queryset = queryset.filter(pk__in=primary_keys) if count_only: yield queryset.order_by().count() else: yield from queryset.iterator() try: self.stdout.ending = None progress_output = None object_count = 0 # If dumpdata is outputting to stdout, there is no way to display progress if output and self.stdout.isatty() and options["verbosity"] > 0: progress_output = self.stdout object_count = sum(get_objects(count_only=True)) if output: file_root, file_ext = os.path.splitext(output) compression_formats = { ".bz2": (open, {}, file_root), ".gz": (gzip.open, {}, output), ".lzma": (open, {}, file_root), ".xz": (open, {}, file_root), ".zip": (open, {}, file_root), } if has_bz2: compression_formats[".bz2"] = (bz2.open, {}, output) if has_lzma: compression_formats[".lzma"] = ( lzma.open, {"format": lzma.FORMAT_ALONE}, output, ) compression_formats[".xz"] = (lzma.open, {}, output) try: open_method, kwargs, file_path = compression_formats[file_ext] except KeyError: open_method, kwargs, file_path = (open, {}, output) if file_path != output: file_name = os.path.basename(file_path) warnings.warn( f"Unsupported file extension ({file_ext}). " f"Fixtures saved in '{file_name}'.", RuntimeWarning, ) stream = open_method(file_path, "wt", **kwargs) else: stream = None try: serializers.serialize( format, get_objects(), indent=indent, use_natural_foreign_keys=use_natural_foreign_keys, use_natural_primary_keys=use_natural_primary_keys, stream=stream or self.stdout, progress_output=progress_output, object_count=object_count, ) finally: if stream: stream.close() except Exception as e: if show_traceback: raise raise CommandError("Unable to serialize database: %s" % e)