view apps/bundle/tasks.py @ 151:c7be7def8b57

Bundles! (basic functionality) Changes made in this commit: * Added new dependencies (see pip-requirements) * Added new dependency and setup information to README * Deleted the included mptt app (in apps/mptt) in favour of just adding the dependency to pip-requirements (makes it easier to update, etc) * Changed the import convention to use `from apps.bundle.models import Bundle` rather than `from agora.apps.bundle.models import Bundle` because Celery was having problems with the latter style. Everything should still work. * Moved the syntax-highlighting and related code for snippets into separate HTML files so that they can be used by the bundle app And, of course, the ability to upload bundles. But wait! There's more! Changes still to come, for only $19.95 a month: * Bundle versioning * Automatic license integration (i.e. adding headers to files) * The ability to download bundles (zip, tar, etc) * Rating bundles * And much, much more! Batteries not included.
author dellsystem <ilostwaldo@gmail.com>
date Mon, 15 Oct 2012 00:52:00 -0400
parents
children 27de4f31f9d5
line wrap: on
line source

import mimetypes
import os

import archive
import magic
from celery import task

from apps.bundle.models import Bundle, BundleFile


mimetypes.add_type('application/x-gzip', '.tgz')
mimetypes.add_type('application/x-bzip2', '.tz2')
archive_extensions = ('.zip', '.tgz', '.tar', '.tz2')


def process_files_in_dir(bundle, dir_name, parent_dir):
    """
    Go through all the subdirectories and files (in that order) in this
    directory.
    """
    dir_contents = [os.path.join(dir_name, f) for f in os.listdir(dir_name)]
    dirs = filter(os.path.isdir, dir_contents)
    files = filter(os.path.isfile, dir_contents)
    print "Directories: %s" % ", ".join(dirs)
    print "Files: %s" % ", ".join(files)

    for file_path in sorted(dirs) + sorted(files):
        filename = os.path.basename(file_path)
        full_path = file_path[len(bundle.get_temp_path()) + 1:]
        bundle_file = BundleFile(bundle=bundle, name=filename,
            parent=parent_dir, full_path=full_path)

        if file_path in files:
            bundle_file.is_dir = False
            bundle_file.file_size = os.path.getsize(file_path)

            # Only highlight the file contents if it's plain text
            mime_type = magic.from_file(file_path, mime=True)
            if mime_type.startswith('text/'):
                with open(file_path, 'rt') as file:
                    # Store the contents of the file in the code field
                    bundle_file.save_file_contents(file)

            bundle_file.save()
        else:
            # It's a directory - call this function on it recursively
            bundle_file.is_dir = True
            bundle_file.save()
            process_files_in_dir(bundle, file_path, bundle_file)

@task()
def handle_bundle_upload(bundle_id):
    bundle = Bundle.objects.get(pk=bundle_id)
    file = bundle.get_temp_path()

    # Figure out the most likely mime-type
    mime_type = magic.from_file(file, mime=True)
    extension = mimetypes.guess_extension(mime_type)

    print "mime type: %s" % mime_type
    print "extension: %s" % extension

    if extension in archive_extensions:
        new_path = file + extension
        # Treat it as an archive. Rename it to that, then extract
        os.rename(file, new_path)

        try:
            # Extract it to a directory, same path as the filename
            archive.extract(new_path, to_path=file)

            # Now go through the extracted files, make BundleFiles from them
            process_files_in_dir(bundle, file, None)
        except archive.ArchiveException:
            print "Archive exception"
            pass
    elif mime_type.startswith('text/'):
        # Should be a plain text file - create a CodeFile for it
        bundle_file = BundleFile(bundle=bundle, name=bundle.file_name,
            full_path=bundle.file_name, file_size=os.path.getsize(file))
        bundle_file.save_file_contents(open(file, 'rt'),
            original_filename=bundle.file_name)

    print "Done uploading!"
    bundle.done_uploading = True
    bundle.save()