Mercurial > agora
view apps/bundle/tasks.py @ 209:4033ebe1867f
Add ability to download files
This makes use of a new model (BundleVersion) to keep track of the
locations on disk of the original uploads for each version. This will
require some manual processing to get it working for existing bundles,
since the information needed isn't being stored at the moment.
author | dellsystem <ilostwaldo@gmail.com> |
---|---|
date | Sun, 17 Feb 2013 14:57:39 -0500 |
parents | b711f0087709 |
children | 95edf106bdef |
line wrap: on
line source
from __future__ import with_statement import mimetypes import os import archive import magic from celery import task from apps.bundle.models import Bundle, BundleFile, BundleVersion mimetypes.add_type('application/x-gzip', '.tgz') mimetypes.add_type('application/x-bzip2', '.tz2') archive_extensions = ('.zip', '.tgz', '.tar', '.tz2') text_mimetypes = ( 'application/xml', ) def process_files_in_dir(bundle, dir_name, parent_dir): """ Go through all the subdirectories and files (in that order) in this directory. """ dir_contents = [os.path.join(dir_name, f) for f in os.listdir(dir_name)] dirs = filter(os.path.isdir, dir_contents) files = filter(os.path.isfile, dir_contents) for file_path in sorted(dirs) + sorted(files): filename = os.path.basename(file_path) full_path = file_path[len(bundle.get_temp_path()) + 1:] bundle_file = BundleFile(bundle=bundle, name=filename, parent=parent_dir, full_path=full_path, version=bundle.latest_version) if file_path in files: is_desc = False bundle_file.is_dir = False bundle_file.file_size = os.path.getsize(file_path) # Only highlight the file contents if it's plain text mime_type = magic.from_file(file_path, mime=True) if mime_type.startswith('text/') or mime_type in text_mimetypes: with open(file_path, 'rt') as file: # Store the contents of the file in the code field bundle_file.save_file_contents(file) # DESCRIPTION file should be at most 1 level deep single_parent = (parent_dir is not None or '/' not in parent_dir) is_desc = single_parent and filename == 'DESCRIPTION' # Check if this is the description file (if no description exists) if bundle.octave_format and bundle.description == '' and is_desc: bundle.description_file = bundle_file bundle.save() bundle_file.save() else: # It's a directory - call this function on it recursively bundle_file.is_dir = True bundle_file.save() process_files_in_dir(bundle, file_path, bundle_file) @task() def handle_bundle_upload(bundle_id): bundle = Bundle.objects.get(pk=bundle_id) file = bundle.get_temp_path() # Figure out the most likely mime-type mime_type = magic.from_file(file, mime=True) extension = mimetypes.guess_extension(mime_type) if extension in archive_extensions: new_path = file + extension # Treat it as an archive. Rename it to that, then extract os.rename(file, new_path) # Create the new BundleVersion new_file_name = os.path.basename(new_path) BundleVersion.objects.create(bundle=bundle, file_name=new_file_name, version=bundle.latest_version) try: # Extract it to a directory, same path as the filename archive.extract(new_path, to_path=file) # Now go through the extracted files, make BundleFiles from them process_files_in_dir(bundle, file, None) except archive.ArchiveException: pass elif mime_type.startswith('text/'): # Should be a plain text file - create a CodeFile for it bundle_file = BundleFile(bundle=bundle, name=bundle.file_name, full_path=bundle.file_name, file_size=os.path.getsize(file), version=bundle.latest_version) bundle_file.save_file_contents(open(file, 'rt'), original_filename=bundle.file_name) bundle.done_uploading = True bundle.save()