我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用werkzeug.utils.secure_filename()。
def temporary_upload(name, fileobj):
    """Save an uploaded file into a throwaway directory for the caller's use.

    Flask will not load sufficiently large files into memory, so the upload
    is always written to a temporary directory first.

    The function is a single-``yield`` generator (presumably wrapped by
    ``contextlib.contextmanager`` outside this view -- confirm at the
    definition site).

    :param name: label handed back unchanged as the first yielded item.
    :param fileobj: upload object exposing ``filename`` and ``save(path)``.
    :yields: ``(name, filepath, original_filename)``.
    """
    tempdir = mkdtemp()
    try:
        # Saving happens inside the ``try`` so the directory is removed even
        # when writing the upload fails.
        filepath = join(tempdir, secure_filename(fileobj.filename))
        fileobj.save(filepath)
        yield name, filepath, fileobj.filename
    finally:
        rmtree(tempdir)
def get(*args, **kwargs):
    """Return the text of a log file to an ``admin`` or ``root`` caller.

    Reads the keyword arguments ``user_group`` and ``filename``.

    :returns: dict with ``success`` ('true' / 'false') and either ``result``
        (the file content) or ``reason`` (why the request was refused).
    """
    if 'user_group' not in kwargs:
        return {"success": 'false', "reason": "Cannot get user_group"}
    if kwargs['user_group'] not in ('admin', 'root'):
        return {"success": 'false', "reason": 'Unauthorized Action'}
    filepath = logsPath + secure_filename(kwargs['filename'])
    try:
        if not os.path.exists(filepath):
            return {"success": 'false', "reason": 'file not exist'}
        # ``with`` closes the handle even when reading fails.
        with open(filepath, 'r') as logfile:
            logtext = logfile.read()
        return {'success': 'true', 'result': logtext}
    except Exception:
        # Best effort: any read problem is reported, but interpreter-level
        # signals (KeyboardInterrupt, SystemExit) are no longer swallowed.
        return {'success': 'false', 'reason': 'file read error'}
def _make_zip(self, project, ty):
    """Zip the JSON export of ``ty`` for ``project`` and upload the archive.

    Nothing happens when ``_respond_json`` yields ``None``.  Both temporary
    files are closed when their ``with`` block ends.
    """
    name = self._project_name_latin_encoded(project)
    payload = self._respond_json(ty, project.id)
    if payload is None:
        return
    with tempfile.NamedTemporaryFile() as datafile:
        datafile.write(json.dumps(payload))
        datafile.flush()
        with tempfile.NamedTemporaryFile() as zipped_datafile:
            archive = self._zip_factory(zipped_datafile.name)
            member_name = secure_filename('%s_%s.json' % (name, ty))
            archive.write(datafile.name, member_name)
            archive.close()
            container = "user_%d" % project.owner_id
            upload = FileStorage(filename=self.download_name(project, ty),
                                 stream=zipped_datafile)
            uploader.upload_file(upload, container=container)
def _make_zip(self, project, ty):
    """Zip the CSV export of ``ty`` for ``project`` and upload the archive.

    Nothing happens when ``_respond_csv`` yields ``None``.  Both temporary
    files are closed when their ``with`` block ends.
    """
    name = self._project_name_latin_encoded(project)
    rows = self._respond_csv(ty, project.id)
    if rows is None:
        return
    # TODO: use temp file from csv generation directly
    with tempfile.NamedTemporaryFile() as datafile:
        for row in rows:
            datafile.write(str(row))
        datafile.flush()
        rows.close()  # delete temp csv file
        with tempfile.NamedTemporaryFile() as zipped_datafile:
            archive = self._zip_factory(zipped_datafile.name)
            member_name = secure_filename('%s_%s.csv' % (name, ty))
            archive.write(datafile.name, member_name)
            archive.close()
            container = "user_%d" % project.owner_id
            upload = FileStorage(filename=self.download_name(project, ty),
                                 stream=zipped_datafile)
            uploader.upload_file(upload, container=container)
def predict():
    """Classify an uploaded image and return the prediction as JSON.

    Requests without a usable ``file`` part are redirected back to the
    request URL after a flashed message.

    :returns: JSON with ``name``, ``description`` and ``msg``; ``msg`` holds
        the error text when the prediction itself failed.
    """
    if 'file' not in request.files:
        flash('No file part')
        return redirect(request.url)
    file = request.files['file']
    if file.filename == '':
        flash('No selected file')
        return redirect(request.url)
    if not (file and allowed_file(file.filename)):
        # Every path must produce a response; a view that returns None is
        # an error in Flask.
        flash('File type not allowed')
        return redirect(request.url)
    try:
        pokemon_name = predict_mlp(file).capitalize()
        pokemon_desc = pokemon_entries.get(pokemon_name)
        msg = ""
    except Exception as e:
        # Prediction failures are reported to the client instead of raised.
        pokemon_name = None
        pokemon_desc = None
        msg = str(e)
    return jsonify({'name': pokemon_name, 'description': pokemon_desc, "msg": msg})
def upload_file():
    """Show the upload form on GET; store the posted file on POST.

    POST requests always end in a redirect to ``response_page`` carrying a
    ``res`` query value of ``no_file``, ``succeeded`` or ``failed``.
    """
    if request.method != 'POST':
        return ''' <!doctype html> <title>Upload new File</title> <h1>Upload new File</h1> <form action="" method=post enctype=multipart/form-data> <p><input type=file name=file> <input type=submit value=Upload> </form> '''
    upload = request.files.get('file')
    if not upload or upload.filename == '':
        # return 'No file uploaded'
        return redirect(url_for('response_page')+'?res=no_file')
    if not allowed_file(upload.filename):
        return redirect(url_for('response_page')+'?res=failed')
    safe_name = secure_filename(upload.filename)
    upload.save(os.path.join(app.config['UPLOAD_FOLDER'], safe_name))
    return redirect(url_for('response_page')+'?res=succeeded')
def _save_analysis_file(self, id, path):
    """Store the uploaded ``file`` part under ``path/<analysis id>/``.

    :param id: id of one of the current user's analyses (404 otherwise).
    :param path: base directory that receives the per-analysis folder.
    :returns: full path of the written file.
    :raises OSError: when the target directory cannot be created.
    """
    file = request.files['file']
    analysis = Analysis(get_or_404(current_user.analyses, _id=id))
    dirpath = os.path.join(path, str(analysis['_id']))
    filepath = os.path.join(dirpath, secure_filename(file.filename))
    # Create parent dirs if they don't exist.  Only "already there" is
    # tolerated; permission or disk errors surface here rather than as a
    # confusing failure in ``open`` below.
    try:
        os.makedirs(dirpath)
    except OSError:
        if not os.path.isdir(dirpath):
            raise
    with open(filepath, "wb") as fd:
        copyfileobj(file.stream, fd)
    return filepath
def download_support_file(self, id, module, filename):
    """Download a support file.

    .. :quickref: Analysis; Download a support file.

    :param id: id of the analysis.
    :param module: name of the module.
    :param filename: name of the file to download.
    """
    analysis = get_or_404(current_user.analyses, _id=id)
    analysis_id = str(analysis['_id'])
    safe_name = secure_filename(filename)
    candidates = (
        os.path.join(fame_config.storage_path, 'support_files', module, analysis_id, safe_name),
        # Layout without the module folder, kept for compatibility with
        # older analyses.
        os.path.join(fame_config.storage_path, 'support_files', analysis_id, safe_name),
    )
    for candidate in candidates:
        if os.path.isfile(candidate):
            return file_download(candidate)
    abort(404)
def new_analysis():
    """Queue an analysis for an uploaded pcap and render the index page.

    Reads the optional form fields ``title`` and ``ruleset`` and the file
    part ``pcap``.  Without a pcap the index page is rendered unchanged.
    The capture is stored as ``<unix time>_<sha256 of original name>.pcap``.
    """
    analysis_title = request.form["title"] if "title" in request.form else None
    analysis_pcap = request.files["pcap"] if "pcap" in request.files else None
    analysis_ruleset = request.form["ruleset"] if "ruleset" in request.form else None
    if analysis_pcap is None:
        return render_template("index.html", analyses = db_handler.search_analyses())
    # hashlib only accepts bytes: encode text names so hashing works for
    # non-ASCII names and on Python 3.
    original_name = analysis_pcap.filename or ""
    if not isinstance(original_name, bytes):
        original_name = original_name.encode("utf-8")
    pcap_name = secure_filename(str(int(time.time()))+"_"+hashlib.sha256(original_name).hexdigest()+".pcap")
    pcap_path = os.path.join(storage_folder, pcap_name)
    analysis_pcap.save(pcap_path)
    x = analysis_handler(pcap_file=pcap_path, ruleset=analysis_ruleset, pcap_name=pcap_name, title=analysis_title)
    analysis_pool.add_analysis(x)
    return render_template("index.html", analyses=db_handler.search_analyses())
def api_new_analysis():
    """Queue an analysis for an uploaded pcap and answer with its id as JSON.

    Reads the optional form fields ``title`` and ``ruleset`` and the file
    part ``pcap``.  Without a pcap the reply is ``{"id": 0}``.
    The capture is stored as ``<unix time>_<sha256 of original name>.pcap``.
    """
    analysis_title = request.form["title"] if "title" in request.form else None
    analysis_pcap = request.files["pcap"] if "pcap" in request.files else None
    analysis_ruleset = request.form["ruleset"] if "ruleset" in request.form else None
    if analysis_pcap is None:
        # Serialized like the success reply so clients can always parse it
        # (the hand-written single-quoted form was not valid JSON).
        return json.dumps({"id": 0})
    # hashlib only accepts bytes: encode text names so hashing works for
    # non-ASCII names and on Python 3.
    original_name = analysis_pcap.filename or ""
    if not isinstance(original_name, bytes):
        original_name = original_name.encode("utf-8")
    pcap_name = secure_filename(str(int(time.time()))+"_"+hashlib.sha256(original_name).hexdigest()+".pcap")
    pcap_path = os.path.join(storage_folder, pcap_name)
    analysis_pcap.save(pcap_path)
    x = analysis_handler(pcap_file=pcap_path, ruleset=analysis_ruleset, pcap_name=pcap_name, title=analysis_title)
    analysis_pool.add_analysis(x)
    return json.dumps({"id":x.analysis_id})
def update(year, month):
    """Refresh the local data for the given month, then redirect to it.

    When the bank adapter fetches from a file, the request must carry a
    ``file`` part (400 otherwise).  The upload is kept under
    ``config['imports_dir']`` when that is set, else written to a temporary
    file that is deleted afterwards.

    :param year: year of the month to update.
    :param month: month number.
    """
    date = datetime.date(year, month, 1)
    filename = None
    delete_file = False
    upload = None
    if bank_adapter.fetch_type == 'file':
        if 'file' not in request.files:
            abort(400)
        upload = request.files['file']
        if config.get('imports_dir'):
            filename = os.path.join(config['imports_dir'],
                                    '%s-%s' % (datetime.date.today().isoformat(),
                                               secure_filename(upload.filename)))
            if not os.path.exists(os.path.dirname(filename)):
                os.makedirs(os.path.dirname(filename))
        else:
            temp_file = NamedTemporaryFile(delete=False)
            filename = temp_file.name
            temp_file.close()
            delete_file = True
    try:
        if upload is not None:
            upload.save(filename)
        update_local_data(config, date=date, filename=filename, storage=storage)
    finally:
        # The temporary copy is removed even when the import fails.
        if delete_file:
            os.unlink(filename)
    return redirect(url_for('index', year=year, month=month))
def _parse_request():
    """Split the JSON request body into data and module parameters.

    Every value under ``data_params`` is passed through ``secure_filename``
    (in place) so it is safe to use as a file name.

    :returns: ``(data_params, module_params)``; either is ``None`` when the
        request has no JSON body or lacks that key.
    """
    data_params = None
    module_params = None
    if not request.json:
        return data_params, module_params
    req = request.json
    assert isinstance(req, dict)
    print(req)
    if 'data_params' in req:
        data_params = req['data_params']
        # Make paths secure
        for key in list(data_params):
            data_params[key] = secure_filename(data_params[key])
    if 'module_params' in req:
        module_params = req['module_params']
    return data_params, module_params
def _parse_linking_request():
    """Split the JSON body of a linking request into data and module parameters.

    The values under ``data_params['ref']`` and ``data_params['source']``
    are passed through ``secure_filename`` in place.

    :returns: ``(data_params, module_params)``; either is ``None`` when the
        request has no JSON body or lacks that key.
    """
    data_params = None
    module_params = None
    if not request.json:
        return data_params, module_params
    params = request.json
    assert isinstance(params, dict)
    if 'data_params' in params:
        data_params = params['data_params']
        for file_role in ['ref', 'source']:
            # Make paths secure
            role_params = data_params[file_role]
            for key in list(role_params):
                data_params[file_role][key] = secure_filename(role_params[key])
    if 'module_params' in params:
        module_params = params['module_params']
    return data_params, module_params
def save_file(file):
    """Store one uploaded file and record the outcome on ``g``.

    Accepted files are saved in the upload folder (a timestamp is added to
    the name when it is already taken) and registered via ``insert_record``.
    Rejected files set ``g.is_success`` to False and either set ``g.error``
    (no file selected) or extend ``g.invalid_files``.
    """
    if not (file and allowed_file(file.filename)):
        nothing_selected = file.filename == ''
        g.is_success = False
        if nothing_selected:
            g.error = "Please select a file!"
        else:
            g.invalid_files.append(file.filename)
        return

    folder = app.config['UPLOAD_FOLDER']
    filename = secure_filename(file.filename)
    upload_to = join(folder, filename)
    if exists(upload_to):
        # Name clash: drop the 6-character extension and add a timestamp.
        stamp = arrow.now().strftime('%Y%m%d_%H%M%S')
        filename = '{}_{}.xmind'.format(filename[:-6], stamp)
        upload_to = join(folder, filename)
    file.save(upload_to)
    insert_record(filename)
    g.is_success = True
def FUN_upload_image():
    """Store an uploaded image, run the prediction and render the result.

    Any request that is not a POST with an allowed file is redirected to
    ``FUN_root``.  The stored name is a SHA-256 of the current timestamp
    followed by the lower-cased, sanitized original name.
    """
    if request.method == 'POST':
        # check if the post request has the file part
        if 'file' not in request.files:
            return(redirect(url_for("FUN_root")))
        file = request.files['file']
        # if user does not select file, browser also submit a empty part without filename
        if file.filename == '':
            return(redirect(url_for("FUN_root")))
        if file and allowed_file(file.filename):
            # hashlib needs bytes, so the timestamp text is encoded first.
            stamp = str(datetime.datetime.now()).encode('utf-8')
            filename = os.path.join("static/img_pool", hashlib.sha256(stamp).hexdigest() + secure_filename(file.filename).lower())
            file.save(filename)
            prediction_result = mx_predict(filename, local=True)
            FUN_resize_img(filename)
            return render_template("index.html", img_src = filename, prediction_result = prediction_result)
    return(redirect(url_for("FUN_root")))

################################################
# Start the service
################################################
def upload_file():
    """Store an uploaded file and load it into the requested database.

    On a POST with an allowed file the upload is saved, handed to
    ``handle_file`` and the client is redirected to the new collection.
    Every other request renders the upload page.

    :raises InvalidUsage: when the ``database`` form field is missing or empty.
    """
    if request.method == 'POST':
        file = request.files['file']
        if file and allowed_file(file.filename, ALLOWED_EXTENSIONS):
            filename = secure_filename(file.filename)
            file_location = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            print("file location %s" % file_location)
            file.save(file_location)
            endpoint_name = os.path.splitext(filename)[0]
            database = request.form.get("database")
            if not database:
                # No debug print of request.form["database"] here: indexing
                # a missing key raised before InvalidUsage could be reported.
                raise InvalidUsage("Database name was not provided", 400, '{ "error" : "database was not provided" }')
            handle_file(database, file_location, endpoint_name, host)
            return redirect('/loxo/' + database + '/collections/' + endpoint_name)
    return render_template('upload.html')
def upload():
    """Save the uploaded file, run inference on it and return the result as JSON.

    Requests without a ``file`` part, or with an empty file name, are
    redirected to ``/``.
    """
    # The part is missing entirely, or the browser submitted an empty part
    # because the user selected nothing.
    if 'file' not in request.files:
        return redirect("/")
    uploaded = request.files['file']
    if uploaded.filename == '':
        return redirect("/")

    path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(uploaded.filename))
    uploaded.save(path)
    FLAGS.input_files = path
    # inference
    return jsonify(main(None))
def load_researcher_affiliations():
    """Preload organisation data from an uploaded CSV file.

    A successful load redirects to the affiliation record view of the new
    task; validation failures and load errors re-render the upload form.
    """
    form = FileUploadForm()
    if not form.validate_on_submit():
        return render_template("fileUpload.html", form=form, form_title="Researcher")

    upload_name = secure_filename(form.file_.data.filename)
    try:
        task = Task.load_from_csv(read_uploaded_file(form), filename=upload_name)
        flash(f"Successfully loaded {task.record_count} rows.")
        return redirect(url_for("affiliationrecord.index_view", task_id=task.id))
    except (ValueError, ModelException) as ex:
        flash(f"Failed to load affiliation record file: {ex}", "danger")
        app.logger.exception("Failed to load affiliation records.")
    return render_template("fileUpload.html", form=form, form_title="Researcher")
def logo():
    """Manage organisation 'logo'.

    Serves the stored logo when the client prefers an image and one exists;
    otherwise handles the logo form and renders ``logo.html``.
    """
    org = current_user.organisation
    preferred = request.accept_mimetypes.best_match(["text/html", "image/*"])
    if preferred == "image/*" and org.logo:
        return send_file(BytesIO(org.logo.data),
                         mimetype=org.logo.mimetype,
                         attachment_filename=org.logo.filename)

    form = LogoForm()
    if form.validate_on_submit():
        upload = form.logo.data
        # The sanitized name is only used in the flash message; the record
        # keeps the name as submitted.
        display_name = secure_filename(upload.filename)
        org.logo = File.create(data=upload.read(), mimetype=upload.mimetype, filename=upload.filename)
        org.save()
        flash(f"Saved organisation logo '{display_name}'", "info")
    return render_template("logo.html", form=form)
def upload():
    """Train a model from an uploaded CSV and render the upload page.

    On a valid submission the file is saved under ``wine_quality/data/``,
    read with pandas and handed to ``my_model.train``.  The template gets
    the stored file name, or ``None`` when nothing was submitted.
    """
    form = TrainingDataForm()
    filename = None
    if form.validate_on_submit():
        model_name = form.model_name.data
        learning_rate = float(form.learning_rate.data)
        batch_size = int(form.batch_size.data)
        filename = secure_filename(form.training_data.data.filename)
        print(form.__dict__)
        # Save to Redis here
        form.training_data.data.save('wine_quality/data/' + filename)
        dataframe = pd.read_csv('wine_quality/data/' + filename, sep=',')
        my_model.train(dataframe, learning_rate, batch_size, model_name)
    return render_template('test_data_upload.html', form=form, filename=filename)
def submit_crash(instance_id):
    """Record and store every crash file posted by a fuzzer instance.

    Each uploaded part becomes a ``Crash`` row and is written to
    ``<DATA_DIRECTORY>/<campaign>/crashes/<crash id>_<sanitized name>``.

    :param instance_id: id of the reporting ``FuzzerInstance``.
    :returns: empty response body.
    """
    instance = models.FuzzerInstance.get(id=instance_id)
    campaign = instance.campaign
    for _field, upload in request.files.items():
        crash = models.Crash.create(
            instance_id=instance.id,
            campaign_id=instance.campaign_id,
            created=request.args.get('time'),
            name=upload.filename,
            analyzed=False
        )
        crash_dir = os.path.join(current_app.config['DATA_DIRECTORY'],
                                 secure_filename(campaign.name), 'crashes')
        os.makedirs(crash_dir, exist_ok=True)
        # Commas are replaced before sanitizing the name.
        stored_name = '%d_%s' % (crash.id, secure_filename(upload.filename.replace(',', '_')))
        upload_path = os.path.join(crash_dir, stored_name)
        upload.save(upload_path)
        crash.path = os.path.abspath(upload_path)
        crash.commit()
    return ''
def validate(self):
    """Validate the campaign form beyond the per-field checks.

    Rejects names already used by a campaign or by a directory under
    ``DATA_DIRECTORY``.  A new campaign (``copy_of`` == -1) needs both an
    executable and testcases; a copy needs an existing source campaign.

    :returns: True when the form is valid, False otherwise (with the
        message appended to the offending field's errors).
    """
    if not super().validate():
        return False

    name_taken = Campaign.get(name=self.name.data) or os.path.exists(
        os.path.join(current_app.config['DATA_DIRECTORY'], secure_filename(self.name.data)))
    if name_taken:
        self.name.errors.append('Campaign with that name already exists')
        return False

    if self.copy_of.data != -1:
        if not Campaign.get(id=self.copy_of.data):
            self.copy_of.errors.append('Campaign to copy does not exist')
            return False
        return True

    if not self.executable.has_file():
        self.executable.errors.append('Must provide an executable or campaign to copy')
        return False
    if not self.testcases.has_file():
        self.testcases.errors.append('Must provide testcases or campaign to copy')
        return False
    return True
def plugin_upload():
    """Create a plugin from an uploaded zip and render the upload page.

    The outcome is passed to the template as ``message`` and logged.  A zip
    that fails validation is reported through ``render_error``.
    """
    message = None
    if request.method == 'POST':
        # check if the post request has the file part
        if 'file' not in request.files:
            message = 'No file part'
        else:
            file = request.files['file']
            # if user does not select file, browser also
            # submit a empty part without filename
            if file.filename == '':
                message = 'No selected file'
            elif file and '.' in file.filename and \
                    file.filename.rsplit('.', 1)[1] == 'zip':
                file.filename = secure_filename(file.filename)
                # Create plugin
                try:
                    plugin = Plugin.create_from_zip(file, file.filename)
                    message = 'Plugin {} version {} created'.format(plugin.name, plugin.version)
                except ValidationError as e:
                    # Name the uploaded file (the former ``key`` was never
                    # defined) and hand the error page back to the client.
                    # NOTE(review): assumes render_error returns a response.
                    message = '<b>{}</b> is not valid {}'.format(file.filename, e)
                    log("Plugin upload message: %s" % message)
                    return render_error(message)
            else:
                message = 'File does not have a zip extension'
        log("Plugin upload message: %s" % message)
    return render_template('upload.html', message=message)
def process_request():
    """Resolve the submitted source into a ``(keypath, path)`` pair.

    ``source`` selects how the input is obtained: ``local`` and ``url`` read
    the ``path`` form field, ``embedded`` reads the uploaded ``file`` part.

    :returns: ``(keypath, path)``, or ``(None, None)`` when the local path
        cannot be resolved or the parameters are invalid (the latter also
        appends a message to ``req_errors``).
    """
    source = request.form['source']
    has_path = 'path' in request.form

    if source == 'local' and has_path:
        keypath = request.form['path']
        path = get_local(request.form['path'])
        if not path:
            return None, None
        return keypath, path
    if source == 'url' and has_path:
        keypath = request.form['path']
        return keypath, get_url(request.form['path'])
    if source == 'embedded' and 'file' in request.files:
        keypath = secure_filename(request.files['file'].filename)
        return keypath, get_file(request.files['file'])

    req_errors.append('Invalid source, path or file parameters')
    return None, None
def upload_file():
    """Accept an uploaded file, pass it to ``pman.upload_pepe`` and redirect home.

    :returns: redirect to ``/`` on success, or a ``(message, 400)`` pair
        when the part is missing, unnamed or has a disallowed extension.
    """
    if 'file' not in request.files:
        return 'No file part', 400
    form_file = request.files['file']
    if form_file.filename == '':
        return 'No selected file', 400
    if not form_file.filename.lower().endswith(ALLOWED_EXTENSIONS):
        return 'Bad extension', 400

    filename = secure_filename(form_file.filename)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    form_file.save(file_path)
    try:
        pman.upload_pepe(file_path)
    finally:
        # We delete the file because ipfs has copied it into its repository;
        # doing it in ``finally`` also cleans up after a failed upload.
        os.remove(file_path)
    return redirect('/')
def upload_file(sha):
    """Store an uploaded file in redis for a commit that holds an upload lock.

    Aborts with 403 when no ``upload-lock:<sha>`` key exists, and with 400
    when the part is missing, unnamed, or its name is not already in
    sanitized form.

    :param sha: identifier whose upload lock is checked.
    """
    if not redis.get("upload-lock:" + sha):
        abort(403)
    # The part must be present and carry a file name (browsers submit an
    # empty part when nothing was selected).
    if 'file' not in request.files:
        abort(400)
    f = request.files['file']
    if f.filename == '':
        abort(400)
    # Only names that are unchanged by sanitizing are accepted.
    if not (f and f.filename == secure_filename(f.filename)):
        abort(400)

    filename = secure_filename(f.filename)
    # Store files in redis with an expiration so we hopefully don't leak resources.
    redis.setex("file:" + filename, 120 * 60, f.read())
    print(filename, "uploaded")
    return jsonify({'msg': 'Ok'})
def nlp():
    """Return the request's text, or the transcription of its audio, as JSON.

    The ``text`` query parameter is echoed back when present.  Otherwise the
    ``data`` file part is saved and passed to ``story_to_text.recognize``;
    a missing or empty part aborts with 400.
    """
    text_in = request.args.get('text')
    # if text param is present and is true
    if text_in is not None:
        text_out = text_in
    else:
        if 'data' not in request.files:
            abort(400)
        audio_in = request.files['data']
        if not audio_in:
            # An empty part would otherwise leave ``text_out`` unbound.
            abort(400)
        audio_filename = secure_filename(audio_in.filename)
        audio_in_path = os.path.join(app.config['UPLOAD_FOLDER'], audio_filename)
        audio_in.save(audio_in_path)
        text_out = story_to_text.recognize(audio_in_path)
    # return 200 and audio text
    return jsonify(text=text_out)
def upload():
    """Save an uploaded image and render the index page showing it.

    Requests without a ``file`` part render the plain index page; unnamed
    or invalid files flash a message and redirect back to the request URL.
    """
    if 'file' not in request.files:
        print("No file found")
        return render_template('index.html')
    file = request.files['file']
    print(file)
    if file.filename == '':
        flash('No file selected!')
        return redirect(request.url)
    if not (file and valid_image(file.filename)):
        # Rejected images need an explicit response as well.
        flash('Invalid image file!')
        return redirect(request.url)
    filename = secure_filename(file.filename)
    out_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(out_path)
    return render_template("index.html", image_name=filename)
def validate(self):
    """Validate the upload form and remember the uploaded file.

    Sets ``self.file_`` and ``self.filename`` (sanitized).  Fails when the
    file part is missing, a file of that name already exists in the upload
    folder, or the sanitized name is empty.

    :returns: True when valid, False otherwise (the reason is appended to
        ``self.uploadfile.errors``).
    """
    if not Form.validate(self):
        return False

    field = self.uploadfile
    if field.name not in request.files:
        field.errors.append("Please select a valid file.")
        return False

    self.file_ = request.files[field.name]
    self.filename = secure_filename(self.file_.filename)
    upload_dir = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'])
    existing = os.listdir(upload_dir)
    if self.filename in existing:
        field.errors.append("A file with this name already exists.")
        return False
    if not self.filename:
        field.errors.append("Invalid file name.")
        return False
    return True
def uploadPlugins():
    """Store an uploaded ``.py`` / ``.json`` plugin and announce it over redis.

    On success the file content is published on the ``updateplugins``
    channel, ``vulScan.updatePlugins`` is called and ``{"message": "ok"}``
    is returned as JSON.

    NOTE(review): the listing lost its indentation; the nesting below is the
    most plausible reconstruction -- confirm against the project source.
    NOTE(review): every exception is swallowed and all non-success paths
    fall off the end, so the function returns None in those cases.
    """
    f = request.files['file']
    file_name = ''
    try:
        if f:
            fname = secure_filename(f.filename)
            # Text after the last dot is treated as the extension.
            suffix = fname.split('.')[-1]
            if suffix in ['py','json']:
                path = FILE_PATH + fname
                if os.path.exists(FILE_PATH + fname):
                    # Name clash: append the current second (0-59) to the
                    # part of the name before the first dot.
                    fname = fname.split('.')[0] + '_' + str(datetime.now().second) + "." + suffix
                    path = FILE_PATH + fname
                f.save(path)
                if os.path.exists(path):
                    file_name = fname.split('.')[0]
                    # redis publish
                    with open(path) as pf:
                        r = getStrictRedis()
                        r.publish('updateplugins', json.dumps({"filename":file_name+"."+suffix, "content":pf.read()}))
                    # update esplugins
                    vulScan.updatePlugins(file_name, suffix)
                    return jsonify({"message":"ok"})
    except Exception as e:
        pass
def upload_video():
    """Save an uploaded video and return its classification.

    Problems are reported as JSON ``error`` messages with status 200.  The
    form fields ``use_rgb`` and ``use_flow`` are forwarded to
    ``run_classification``.
    """
    def _error(message):
        # Shared shape of every error reply.
        return jsonify(error=message), 200, {'ContentType': 'application/json'}

    if 'video_file' not in request.files:
        return _error('upload not found')
    upload_file = request.files['video_file']
    if upload_file.filename == '':
        return _error('the file has no name')

    use_rgb = request.form['use_rgb']
    use_flow = request.form['use_flow']
    if not (upload_file and allowed_file(upload_file.filename)):
        return _error('empty or not allowed file')

    # first save the file
    savename = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(upload_file.filename))
    upload_file.save(savename)
    # classify the video
    return run_classification(savename, use_rgb, use_flow)
def to_os_path(self, _path, _case, _run_id):
    """Map a ``/``-separated path onto the run directory of a case.

    :param _path: path whose components are separated by ``/``.
    :param _case: case passed to ``self.run_path``.
    :param _run_id: run id; converted to ``str`` for ``self.run_path``.
    :returns: ``(path_out, file_out)`` where ``file_out`` is the last
        component of ``_path``.

    NOTE(review): the listing lost its indentation.  This reconstruction
    joins only the sanitized leading components onto ``path_out`` and
    returns the last component separately, unsanitized -- confirm against
    the project source.
    """
    path_out = self.run_path(_case, str(_run_id))
    _path = str(_path).split('/')
    file_out = None
    counter = 1
    for res in _path:
        # Every component except the last one is a directory name.
        if counter < len(_path):
            res = secure_filename(res)
            path_out = os.path.join(path_out, res)
        if counter == len(_path):
            file_out = res
        counter += 1
    return path_out, file_out

## Function to read file content
# @param _file_name file name <string>
def update_user_image(user_id):
    """Replace a user's image with the uploaded ``image_file`` part.

    :param user_id: primary key of the user to update.
    :returns: ``(json, status)``: 404 for an unknown user or a missing file
        part, 400 for an unnamed / disallowed file or a failed save, and
        200 with the serialized user on success.
    """
    user = models.Users.query.get(user_id)
    # Check existence before serializing so an unknown id yields the 404
    # instead of feeding ``None`` to the serializer.
    if not user:
        return jsonify(result='invalid user id'), 404
    user_serialized = serializer.UsersSerializer().serialize([user])
    if 'image_file' not in request.files:
        return jsonify(result='No file part'), 404
    file = request.files['image_file']
    if file.filename == '':
        return jsonify(result='No selected file'), 400
    if not file or not allowed_file(file.filename):
        return jsonify(result='File with invalid format'), 400
    if not user.save_image(file):
        return jsonify(user=user_serialized), 400
    db.session.commit()
    user_serialized = serializer.UsersSerializer().serialize([user])
    return jsonify(user=user_serialized), 200
def getAppInfo():
    """Save the uploaded APK and read its package and launch activity via ``aapt``.

    Updates the module globals ``apk``, ``main_activity`` and
    ``packageName``, then redirects to ``index``.
    """
    global packageName,main_activity,apk

    def _quoted_name(proc):
        # Text between the first "name='" and the following quote.
        return proc.stdout.read().decode().split("name='")[1].split("'")[0]

    upload = request.files['file']
    apk = os.path.join(UPLOAD_FOLDER, secure_filename(upload.filename))
    upload.save(apk)

    cmd_activity = "aapt d badging %s|findstr launchable-activity" %apk
    cmd_package = "aapt d badging %s|findstr package" %apk
    # Both commands are started before either output is read.
    activity = Popen(cmd_activity,stdout=PIPE,shell=True)
    package = Popen(cmd_package,stdout=PIPE,shell=True)
    main_activity = _quoted_name(activity)
    packageName = _quoted_name(package)
    activity.kill()
    package.kill()
    return redirect(url_for("index"))
def editor_pic(self):
    """Store an image uploaded from the editor.md widget and return its link.

    :returns: JSON string with ``success``, ``message`` and ``url`` on
        success; a fixed text when the file is missing or not an allowed
        photo; ``None`` when processing raised (the error is logged).
    """
    image_file = request.files['editormd-image-file']
    if not (image_file and allowed_photo(image_file.filename)):
        return u"??????????????"
    try:
        safe_name = secure_filename(image_file.filename)
        # Date and random prefix keep stored names distinct.
        filename = str(date.today()) + '-' + random_str() + '-' + safe_name
        file_path = os.path.join(bpdir, 'static/editor.md/photoupdate/', filename)
        qiniu_path = os.path.join(bpdir, 'static/blog/qiniu_pic/', filename)
        image_file.save(file_path)
        ting_pic(file_path, qiniu_path)
        return json.dumps({
            'success': 1,
            'message': 'image of editor.md',
            'url': get_link(qiniu_path, filename)
        })
    except Exception as e:
        current_app.logger.error(e)
def upload_recipe(request):
    """Store the uploaded recipe files and redirect to the next step.

    Each allowed file from the ``recipes`` upload list is written under
    ``recipes/`` and remembered in the session as ``recipe_file``.

    :returns: redirect to ``validate`` when at least one file was accepted,
        otherwise to ``index``.
    """
    def isAllowed(filename):
        return len(filter_by_extensions(filename)) > 0

    redirect_url = "index"
    print("request: {}".format(request.FILES.getlist("recipes")))
    for file in request.FILES.getlist("recipes"):
        print("file: {}".format(file.name))
        if isAllowed(file.name):
            filename = path.join("recipes", secure_filename(file.name))
            print("filename: {}".format(filename))
            # file.save(filename)
            handle_uploaded_file(file, filename)
            redirect_url = "validate"
            request.session["recipe_file"] = filename
        else:
            # The upload object exposes ``name`` (used everywhere above),
            # not ``filename``.
            print("Invalid BeerXML file <%s>." % file.name)
    return redirect(redirect_url)

# @frontend.route("/validate")
def avatar():
    """Let a user replace their avatar with a 50x50 thumbnail of an upload.

    On a POST with an allowed file the thumbnail is saved as
    ``<username>.jpg`` in the upload image folder, the user's ``token`` is
    set to the base64 of that file name and the client is redirected to the
    index.  Otherwise the change-avatar page is rendered.
    """
    basedir = os.path.abspath(os.path.dirname(__file__))
    file_dir = os.path.join(basedir,current_app.config['UPLOAD_IMG_FOLDER'])
    username = request.values.get('username')
    form = AvatarForm()
    if not os.path.exists(file_dir):
        os.makedirs(file_dir)
    if request.method == 'POST':
        file = request.files['file']
        if file and allowed_file(file.filename):
            size = (50, 50)
            im = Image.open(file)
            im.thumbnail(size)
            # The stored name is built from request data, so sanitize it:
            # an unsanitized user name could point outside ``file_dir``.
            # NOTE(review): the token now derives from the sanitized name;
            # confirm token consumers for user names with special characters.
            newname = secure_filename(username+'.jpg')
            im.save(os.path.join(file_dir, newname))
            token = base64.b64encode(newname)
            user = User.query.filter_by(username=username).first()
            user.token = token
            db.session.add(user)
            db.session.commit()
            return redirect(url_for('main.index'))
    return render_template('auth/change_avatar.html', form=form)
def download_name(self, project, ty, _format):
    """Build the sanitized zip file name (no directory) for a project export.

    The existence of the file is not checked.

    :param project: project whose id and latin-encoded name go into the name.
    :param ty: export type.
    :param _format: export format.
    :returns: e.g. ``123_feynman_tasks_json.zip``.
    """
    # TODO: Check if ty is valid
    parts = (str(project.id), self._project_name_latin_encoded(project), ty, _format)
    return secure_filename('%s_%s_%s_%s.zip' % parts)
def test_export_task_json_support_non_latin1_project_names(self):
    """The task JSON export names its download after the transliterated project name."""
    odd_name = u'?????? ????!'
    project = ProjectFactory.create(name=odd_name, short_name=odd_name)
    self.clear_temp_container(project.owner_id)
    url = 'project/%s/tasks/export?type=task&format=json' % project.short_name
    res = self.app.get(url, follow_redirects=True)
    expected = secure_filename(unidecode(u'?????? ????!'))
    assert expected in res.headers.get('Content-Disposition'), res.headers
def test_export_taskrun_json_support_non_latin1_project_names(self):
    """The task-run JSON export names its download after the transliterated project name."""
    odd_name = u'?????? ????!'
    project = ProjectFactory.create(name=odd_name, short_name=odd_name)
    url = 'project/%s/tasks/export?type=task_run&format=json' % project.short_name
    res = self.app.get(url, follow_redirects=True)
    expected = secure_filename(unidecode(u'?????? ????!'))
    assert expected in res.headers.get('Content-Disposition'), res.headers
def test_export_taskrun_csv_support_non_latin1_project_names(self):
    """The task-run CSV export names its download after the transliterated project name."""
    odd_name = u'?????? ????!'
    project = ProjectFactory.create(name=odd_name, short_name=odd_name)
    # One task with one run so the export has content.
    TaskRunFactory.create(task=TaskFactory.create(project=project))
    url = '/project/%s/tasks/export?type=task_run&format=csv' % project.short_name
    res = self.app.get(url, follow_redirects=True)
    expected = secure_filename(unidecode(u'?????? ????!'))
    assert expected in res.headers.get('Content-Disposition'), res.headers
def encrypt():
    """Encode or decode an uploaded picture and show the input and the result.

    Reads the file part ``up_pic`` and the form fields ``degree``, ``pwd``
    and ``submit`` (``"enc"`` selects encoding, anything else decoding).

    NOTE(review): non-POST requests and disallowed files fall off the end
    and return None.
    """
    if request.method == 'POST':
        # check if the post request has the file part
        if 'up_pic' not in request.files:
            return "No file selected"
        file = request.files['up_pic']
        degree = int(request.form.get('degree'))
        pwd = request.form.get('pwd')
        # if user does not select file, browser also
        # submit a empty part without filename
        if file.filename == '':
            return "No file selected"
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            saved_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(saved_path)
            links = [str(saved_path)]
            if request.form['submit']=="enc":
                (im,arr,path)=enc.encode(links[0],degree,pwd)
            else:
                (im,arr,path)=dec.decode(links[0],degree,pwd)
            links.append(path)
            return render_template("display.html",link=links)
def admin_files(chalid):
    """List, delete or upload the files attached to a challenge.

    GET returns the challenge's files as JSON.  POST with ``method=delete``
    removes one file record (and its file, when present) and returns
    ``"1"``; POST with ``method=upload`` stores every file from ``files[]``
    in its own random folder and redirects to the challenge admin page.

    :param chalid: id of the challenge.
    """
    if request.method == 'GET':
        files = Files.query.filter_by(chal=chalid).all()
        json_data = {'files': [{'id': x.id, 'file': x.location} for x in files]}
        return jsonify(json_data)
    if request.method == 'POST':
        action = request.form['method']
        if action == "delete":
            f = Files.query.filter_by(id=request.form['file']).first_or_404()
            stored_path = os.path.join(app.static_folder, 'uploads', f.location)
            if os.path.exists(stored_path): ## Some kind of os.path.isfile issue on Windows...
                os.unlink(stored_path)
            db.session.delete(f)
            db.session.commit()
            db.session.close()
            return "1"
        elif action == "upload":
            for f in request.files.getlist('files[]'):
                filename = secure_filename(f.filename)
                if not filename:
                    continue
                # A random folder per file keeps equal names apart.
                md5hash = hashlib.md5(os.urandom(64)).hexdigest()
                target_dir = os.path.join(os.path.normpath(app.static_folder), 'uploads', md5hash)
                if not os.path.exists(target_dir):
                    os.makedirs(target_dir)
                f.save(os.path.join(target_dir, filename))
                db_f = Files(chalid, os.path.join('static', 'uploads', md5hash, filename))
                db.session.add(db_f)
            db.session.commit()
            db.session.close()
            return redirect('/admin/chals')
def admin_create_chal():
    """Create a challenge from the posted form and attach the uploaded files.

    Every file from ``files[]`` with a non-empty sanitized name is stored in
    its own random folder under ``uploads`` and linked to the new challenge.

    :returns: redirect to the challenge admin page.
    """
    uploads = request.files.getlist('files[]')
    ## TODO: Expand to support multiple flags
    flags = [{'flag':request.form['key'], 'type':int(request.form['key_type[0]'])}]
    # Create challenge
    chal = Challenges(request.form['name'], request.form['desc'], request.form['value'], request.form['category'], flags)
    db.session.add(chal)
    db.session.commit()

    for f in uploads:
        filename = secure_filename(f.filename)
        if not filename:
            continue
        # A random folder per file keeps equal names apart.
        md5hash = hashlib.md5(os.urandom(64)).hexdigest()
        target_dir = os.path.join(os.path.normpath(app.static_folder), 'uploads', md5hash)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        f.save(os.path.join(target_dir, filename))
        db_f = Files(chal.id, os.path.join('static', 'uploads', md5hash, filename))
        db.session.add(db_f)
    db.session.commit()
    db.session.close()
    return redirect('/admin/chals')
def post(self):
    """Convert an uploaded SVG and store both the SVG and the JSON result.

    ``xres`` and ``yres`` must be given together.  ``name`` defaults to the
    upload's file name without extension.  Files are written to the storage
    directory as ``<timestamp>_<name>.svg`` and ``.json``.

    :returns: dict with ``status``, ``name`` and ``id`` (the stored base name).
    """
    args = self.parser.parse_args()
    if not args.file:
        abort(400, message={'file': 'parameter is required'})
    if (args.xres is None) != (args.yres is None):
        a, b = ('xres', 'yres') if args.yres is None else ('yres', 'xres')
        abort(400, message={a: "can not stand alone without {} being set".format(b)})
    if not args.name:
        args.name = os.path.basename(os.path.splitext(args.file.filename)[0])
    inputdim = (args.xres, args.yres) if args.xres else None
    svgdata = args.file.read()
    try:
        data = convert(io.BytesIO(svgdata), inputdim=inputdim, name=args.name)
        data = json.dumps(data, indent=2, sort_keys=True)
    except Exception as exc:
        abort(400, message={'file': str(exc)})
    # Save the shape.
    dirname = storage_dir()
    if not os.path.isdir(dirname):
        os.makedirs(dirname)
    # Only the timestamp goes through strftime; the user-supplied name is
    # appended afterwards so '%' sequences in it are not expanded.
    basename = secure_filename(datetime.now().strftime('%Y%m%d%H%M%S_') + args.name)
    filename = os.path.join(dirname, basename)
    with open(filename + '.svg', 'wb') as fp:
        fp.write(svgdata)
    with open(filename + '.json', 'w') as fp:
        fp.write(data)
    return {'status': 'ok', 'name': args.name, 'id': basename}
def upload_file(self, folder, param):
    """Save the uploaded ``.py`` file found under ``param`` into ``folder``.

    :param folder: directory that receives the file.
    :param param: name of the file part in the request.
    :returns: the sanitized file name, or ``None`` when the request is not
        a POST, the part is missing or empty, or the extension is not ``py``.
    """
    if request.method != 'POST':
        return None
    # check if the post request has the file part
    if param not in request.files:
        return None
    upload = request.files[param]
    # if user does not select file, browser also submit a empty part without filename
    if not upload or len(upload.filename) == 0:
        return None
    has_py_extension = '.' in upload.filename and upload.filename.rsplit('.', 1)[1] == 'py'
    if not has_py_extension:
        return None
    filename = secure_filename(upload.filename)
    upload.save(os.path.join(folder, filename))
    return filename
def store(self, filename, file_data):
    """Upload ``file_data`` to the bucket under a sanitized name.

    :param filename: requested name; sanitized, and snake-cased when
        ``self.snake_case`` is set.
    :param file_data: object body passed to ``put_object``.
    :returns: the name the object was stored under.
    :raises StorageExists: when an object of that name already exists.
    :raises StorageNotAllowed: when the extension is not permitted.
    """
    stored_name = secure_filename(filename)
    if self.snake_case:
        stored_name = convert_to_snake_case(stored_name)
    if self._exists(stored_name):
        raise StorageExists()
    permitted = self.all_allowed or any(stored_name.endswith('.' + x) for x in self.allowed)
    if not permitted:
        raise StorageNotAllowed()
    self.s3.put_object(Bucket=self.bucket_name, Key=stored_name, Body=file_data, ACL=self.acl)
    return stored_name
def store(self, filename, file_data):
    """Save ``file_data`` in the image folder under a sanitized name.

    :param filename: requested name; sanitized, and snake-cased when
        ``self.snake_case`` is set.
    :param file_data: upload object exposing ``save(path)``.
    :returns: the name the file was stored under.
    :raises StorageExists: when a file of that name already exists.
    :raises StorageNotAllowed: when the extension is not permitted.
    """
    stored_name = secure_filename(filename)
    if self.snake_case:
        stored_name = convert_to_snake_case(stored_name)
    if stored_name in self.get_existing_files():
        raise StorageExists()
    permitted = self.all_allowed or any(stored_name.endswith('.' + x) for x in self.allowed)
    if not permitted:
        raise StorageNotAllowed()
    file_data.save(os.path.join(self.abs_img_folder, stored_name))
    return stored_name
def create_new_attachment(paste_id, file_name, file_size, mime_type, file_data):
    """
    Create a new database entry for an attachment with the given file_name,
    associated with a particular paste ID.

    :param paste_id: Paste ID to associate with this attachment
    :param file_name: Raw name of the file (sanitized before it is stored)
    :param file_size: Size of the file in bytes
    :param mime_type: MIME type of the file
    :param file_data: Binary, base64-encoded file data
    :return: An instance of models.Attachment describing this attachment entry
    :raises PasteDoesNotExistException: If the associated paste does not exist
    """
    # Add an entry into the database describing this file
    attachment_fields = dict(
        paste_id=paste_id,
        file_name=secure_filename(file_name),
        file_size=file_size,
        mime_type=mime_type,
    )
    new_attachment = models.Attachment(**attachment_fields)
    # The file content is stored before the row is committed.
    _store_attachment_file(paste_id, file_data, new_attachment.hash_name)
    session.add(new_attachment)
    session.commit()
    return new_attachment
def upload():
    """Handle the sample upload form.

    A valid upload is saved to ``<SAMPLE_REPOSITORY>/TempFiles``.  If a
    sample with the same hash is already known the file is removed and an
    error is attached to the form; otherwise the sample is queued and the
    client is redirected to the index.

    :returns: redirect on success, else the template context dict.
    """
    from run import config
    form = UploadForm()
    # Process uploaded file
    uploaded_file = request.files[form.file.name] if form.validate_on_submit() else None
    if uploaded_file:
        filename = secure_filename(uploaded_file.filename)
        # Save to temporary location
        temp_path = os.path.join(
            config.get('SAMPLE_REPOSITORY', ''), 'TempFiles', filename)
        uploaded_file.save(temp_path)
        # Get hash and check if it's already been submitted
        file_hash = create_hash_for_sample(temp_path)
        if not sample_already_uploaded(file_hash):
            add_sample_to_queue(file_hash, temp_path, g.user.id, g.db)
            # Redirect
            return redirect(url_for('.index'))
        # Remove existing file and notice user
        os.remove(temp_path)
        form.errors['file'] = [
            'Sample with same hash already uploaded or queued']
    upload_size = (config.get('MAX_CONTENT_LENGTH', 0) / (1024 * 1024))
    return {
        'form': form,
        'accept': form.accept,
        'upload_size': upload_size,
    }