我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 mimetypes.guess_extension()。
def get_extension(media):
    """Return the file extension (leading dot included) for a Telegram media.

    Photo-like media always map to '.jpg'. Documents are resolved through
    their MIME type. Anything that cannot be resolved yields ''.
    """
    # Telegram always compresses photos to JPEG.
    photo_types = (UserProfilePhoto, ChatPhoto, MessageMediaPhoto)
    if isinstance(media, photo_types):
        return '.jpg'

    # Only documents carry a MIME type to guess from.
    has_document = (isinstance(media, MessageMediaDocument)
                    and isinstance(media.document, Document))
    if not has_document:
        return ''

    mime_type = media.document.mime_type
    if mime_type == 'application/octet-stream':
        # An octet stream is just bytes; it has no default extension.
        return ''
    return guess_extension(mime_type) or ''
def _get_email_thread_attachment(ticket, email_category=None):
    """Render a ticket's email thread as a base64-encoded attachment.

    The ticket's emails are fetched from the `MailerServiceBase` singleton and
    filtered to those whose lower-cased category equals `email_category`.
    The remaining emails are rendered by `utils.get_email_thread_content`.

    Returns:
        dict with keys 'filetype' (MIME type reported by the renderer),
        'content' (base64-encoded body) and 'name' (suggested file name).

    Raises:
        InternalServerError: if the mailer service fails or the thread
            template is missing or invalid.
    """
    try:
        mailer = ImplementationFactory.instance.get_singleton_of('MailerServiceBase')
        _emails = mailer.get_emails(ticket)
    except (KeyError, MailerServiceException) as ex:
        raise InternalServerError(str(ex))

    emails = [email for email in _emails if email.category.lower() == email_category]

    try:
        content, filetype = utils.get_email_thread_content(ticket, emails)
    except (utils.EmailThreadTemplateNotFound, utils.EmailThreadTemplateSyntaxError) as ex:
        raise InternalServerError(str(ex))

    content = base64.b64encode(content)

    # guess_extension() returns None for unknown types; without the fallback
    # the literal text "None" would be formatted into the file name.
    extension = mimetypes.guess_extension(filetype) or ''
    name = 'ticket_{}_emails_{}{}'.format(
        ticket.publicId,
        datetime.now().strftime('%d-%m-%Y_%H-%M-%S'),
        extension,
    )
    return {'filetype': filetype, 'content': content, 'name': name}
def validate_file(form, field):
    """Reject uploads whose extension or sniffed MIME type is forbidden.

    Three checks run in order: the extension of the submitted file name, the
    MIME type sniffed from the first 1024 bytes, and the extension that this
    MIME type maps to. Each failure raises ValidationError.
    """
    def extension_is_forbidden(dotted_ext):
        # Compared without the leading dot.
        hit = ForbiddenExtension.query.filter(
            ForbiddenExtension.extension == dotted_ext[1:]).first()
        return hit is not None

    upload = field.data

    # File cannot end with a forbidden extension
    declared_ext = os.path.splitext(upload.filename)[1]
    if declared_ext and extension_is_forbidden(declared_ext):
        raise ValidationError('Extension not allowed')

    sniffed_mime = magic.from_buffer(upload.read(1024), mime=True)
    # Rewind so later consumers read the upload from the start.
    upload.seek(0, 0)

    # Check for permitted mimetype
    mime_hit = ForbiddenMimeType.query.filter(
        ForbiddenMimeType.mimetype == sniffed_mime).first()
    if mime_hit is not None:
        raise ValidationError('File MimeType not allowed')

    # The real content type may map to a forbidden extension even when the
    # file name does not.
    real_ext = mimetypes.guess_extension(sniffed_mime)
    if real_ext is not None and extension_is_forbidden(real_ext):
        raise ValidationError('Extension not allowed')
def filename_from_url(url, content_type):
    """Derive a local file name from `url`, adding an extension if it has none.

    The name is the last path component of the URL ('index' when the path is
    empty). When that name contains no dot and `content_type` is given, an
    extension guessed from the content type is appended.

    Args:
        url: the URL the resource was fetched from.
        content_type: value of the Content-Type header, or a falsy value.

    Returns:
        The file name as a string.
    """
    fn = urlsplit(url).path.rstrip('/')
    fn = os.path.basename(fn) if fn else 'index'
    if '.' not in fn and content_type:
        # Drop parameters such as "; charset=utf-8". Strip and lower-case so
        # "text/plain ; charset=..." and "Text/Plain" are recognised too.
        media_type = content_type.split(';')[0].strip().lower()
        if media_type == 'text/plain':
            # mimetypes returns '.ksh'
            ext = '.txt'
        else:
            ext = mimetypes.guess_extension(media_type)
        if ext == '.htm':  # Python 3
            ext = '.html'
        if ext:
            fn += ext
    return fn
def __init__(self, config_file=None):
    """Set up logging, the ffmpeg command line and the input/output buckets.

    Options are read from `self.sd`. When `output_ext` is not configured it
    is guessed from `output_mimetype`. If the input queue yields no message,
    `queue_files()` is called.
    """
    super(SonOfMMM, self).__init__(config_file)
    self.log_file = '%s.log' % self.instance_id
    self.log_path = os.path.join(self.working_dir, self.log_file)
    boto.set_file_logger(self.name, self.log_path)

    settings = self.sd
    self.command = (
        '/usr/local/bin/ffmpeg ' + settings.get('ffmpeg_args')
        if settings.has_option('ffmpeg_args')
        else '/usr/local/bin/ffmpeg -y -i %s %s'
    )
    self.output_mimetype = settings.get('output_mimetype')
    self.output_ext = (
        settings.get('output_ext')
        if settings.has_option('output_ext')
        else mimetypes.guess_extension(self.output_mimetype)
    )
    self.output_bucket = settings.get_obj('output_bucket')
    self.input_bucket = settings.get_obj('input_bucket')

    # If the queue has no pending message, create messages for all files
    # in input_bucket.
    if not self.input_queue.read(1):
        self.queue_files()
def save_zip(self, stream, content_type, name=None):
    '''Save the zip stream to disk and extract its contents.

    Args:
        stream: the archive data, passed to `self._write`.
        content_type: MIME type of the upload, used to pick the extension.
        name: base name for the archive; a fresh UUID is used when omitted.

    Returns:
        A tuple of (`fmdb.id_from_path` of the extracted directory, name).
    '''
    # Make sure the storage path exists
    ensure_dir(self._storage_path)

    # guess_extension() returns None for unrecognised types, which would
    # otherwise be formatted as the text "None". The stream is a zip archive
    # by contract, so '.zip' is the fallback.
    ext = mimetypes.guess_extension(content_type) or '.zip'
    if not name:
        name = str(self._uuidgen())
    fname = '{uuid}{ext}'.format(uuid=name, ext=ext)
    archive_path = os.path.join(self._storage_path, fname)
    self._write(archive_path, stream)

    # extract the zip file
    directory = extract_model(archive_path, name, self._storage_path)
    return fmdb.id_from_path(directory), name
def store(self):
    """Validate `self.data`, save it as an Image and return its URL.

    The file is named after the SHA-1 of the data plus an extension derived
    from the sniffed MIME type.

    Raises:
        TooBigMedia: if the data is `MAX_SIZE` bytes or larger.
        InvalidMimeType: if the sniffed MIME type is not allowed.
    """
    if len(self.data) >= self.MAX_SIZE:
        raise TooBigMedia(self.identifying_name, self.MAX_SIZE)

    mime = magic.from_buffer(self.data, mime=True)
    if mime not in self.allowed_mimetypes:
        raise InvalidMimeType(mime)

    # An allowed type may still be unknown to mimetypes. Fall back to no
    # extension instead of formatting "None" into the file name.
    self.extension = mimetypes.guess_extension(mime) or ''
    # weirdness from mimetypes
    if self.extension == '.jpe':
        self.extension = '.jpeg'

    checksum = hashlib.sha1(self.data).hexdigest()
    fn = '{}{}'.format(checksum, self.extension)
    img = Image(organization=self.organization)
    img.file.save(fn, ContentFile(self.data))
    return img.get_absolute_url()
def fileinfo(self, path):
    """Log the metadata of `path` and open its preview images.

    Metadata values longer than 30 items are truncated in the per-key log
    lines. Each preview image with a recognised MIME type is written to a
    temporary file that is kept on disk, then all of them are handed to the
    `open` command.
    """
    info, images = self.robot_obj.file_info(path)
    for key, value in info.items():
        if len(value) > 30:
            logger.info(" :%s => %s", key, value[:30])
        else:
            logger.info(" :%s => %s", key, value)
    logger.info("%s", info)

    previews = []
    for img in images:
        ext = mimetypes.guess_extension(img[0])
        if ext:
            # Close the file before launching the viewer, otherwise buffered
            # data may not be on disk yet. delete=False keeps it around.
            with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
                ntf.write(img[1])
            previews.append(ntf.name)

    if previews:
        os.system("open " + " ".join(previews))
def add_media_to_archive(self, media, mime, name=''):
    """Add the file in `media` to the "Pictures" archive folder and register
    it in the manifest file.

    Args:
        media: file-like object; it is rewound, read fully and closed.
        mime: MIME type recorded in the manifest. It is also used to guess
            the extension when `media.name` does not provide one.
        name: base name inside the archive; defaults to the stem of
            `media.name` when the object has one.

    Returns:
        The path of the media inside the archive ('Pictures/<name><ext>').
    """
    extension = None
    if hasattr(media, 'name') and not name:
        name, extension = path.splitext(media.name)
    if not extension:
        # guess_extension() may return None; avoid 'Pictures/<name>None'.
        extension = guess_extension(mime) or ''

    media_path = 'Pictures/%s%s' % (name, extension)
    media.seek(0)
    self.files[media_path] = media.read(-1)
    if hasattr(media, 'close'):
        media.close()

    files_node = self.manifest.getElementsByTagName('manifest:manifest')[0]
    node = self.create_node(self.manifest, 'manifest:file-entry', files_node)
    node.setAttribute('manifest:full-path', media_path)
    node.setAttribute('manifest:media-type', mime)
    return media_path
def generate_email_files(msg):
    """Yield (filename, data, timestamp) for every non-container part of `msg`.

    Parts without a file name are named 'part-NNN<ext>'. A part whose name
    ends in '.zip' is expanded through `generate_zip_files`; every other part
    is yielded with the timestamp parsed from the message's Date header.
    """
    upload_date = time.mktime(email.utils.parsedate(msg["Date"]))

    # multipart/* are just containers
    leaf_parts = (p for p in msg.walk()
                  if p.get_content_maintype() != 'multipart')

    for counter, part in enumerate(leaf_parts, 1):
        # Applications should really sanitize the given filename so that an
        # email message can't be used to overwrite important files
        filename = part.get_filename()
        if not filename:
            # Use a generic bag-of-bits extension when the type is unknown
            ext = mimetypes.guess_extension(part.get_content_type()) or '.bin'
            filename = 'part-%03d%s' % (counter, ext)

        data = part.get_payload(decode=True)
        if parse_pathname(filename).ext != '.zip':
            yield filename, data, upload_date
            continue
        for zip_name, zip_data, zip_date in generate_zip_files(data):
            yield zip_name, zip_data, zip_date
def __init__(self, config_file=None):
    """Set up logging, the ffmpeg command line and the input/output buckets.

    Options are read from `self.sd`. When `output_ext` is not configured it
    is guessed from `output_mimetype`. If the input queue yields no message,
    `queue_files()` is called.
    """
    Service.__init__(self, config_file)
    self.log_file = '%s.log' % self.instance_id
    self.log_path = os.path.join(self.working_dir, self.log_file)
    boto.set_file_logger(self.name, self.log_path)

    options = self.sd
    if not options.has_option('ffmpeg_args'):
        self.command = '/usr/local/bin/ffmpeg -y -i %s %s'
    else:
        self.command = '/usr/local/bin/ffmpeg ' + options.get('ffmpeg_args')

    self.output_mimetype = options.get('output_mimetype')
    if not options.has_option('output_ext'):
        self.output_ext = mimetypes.guess_extension(self.output_mimetype)
    else:
        self.output_ext = options.get('output_ext')

    self.output_bucket = options.get_obj('output_bucket')
    self.input_bucket = options.get_obj('input_bucket')

    # If the queue has no pending message, create messages for all files
    # in input_bucket.
    pending = self.input_queue.read(1)
    if not pending:
        self.queue_files()
def run(self):
    """Worker loop: take (name, url) pairs off `self.imageQueue` and download them.

    A response with status 200 is streamed to
    './<IMAGE_FOLDER>/<name><extension>', where the extension is guessed
    from the Content-Type header. Every dequeued item is marked done, even
    if the download raises.
    """
    while True:
        # The name will be used for saving the file
        name, url = self.imageQueue.get()
        try:
            res = requests.get(url, headers=HEADERS, timeout=TIMEOUT, stream=True)
            try:
                if res.status_code == 200:
                    # Strip parameters such as "; charset=..." before guessing.
                    content_type = res.headers.get('content-type', '').split(';')[0].strip()
                    # guess_extension() returns None for unknown types, which
                    # would break the string concatenation below.
                    extension = mimetypes.guess_extension(content_type) or ''
                    filepath = os.path.join('./' + IMAGE_FOLDER + '/' + name + extension)
                    with open(filepath, 'wb') as f:
                        # Stream the files.
                        for chunk in res:
                            f.write(chunk)
            finally:
                # Release the streamed connection.
                res.close()
        finally:
            # Notify that we have finished one task. Doing this in `finally`
            # keeps the item from staying unfinished when the download fails.
            self.imageQueue.task_done()

# Function to retrieve a list of URL for every pages of cards.
# The url parameter is the entry point of the website where we might extract the information.
def get_extension(media):
    """Return the file extension for a Telegram media object.

    Photo-like media give '.jpg'. Documents give the extension guessed from
    their MIME type ('' when unknown). Any other media gives None.
    """
    # Photos are always compressed as .jpg by Telegram
    if isinstance(media, (UserProfilePhoto, ChatPhoto, MessageMediaPhoto)):
        return '.jpg'

    if not isinstance(media, MessageMediaDocument):
        return None

    # Documents come with a mime type, from which the extension is guessed
    return guess_extension(media.document.mime_type) or ''
def parse_attachment(part):
    """
        Describe one email part as an attachment

        :param `Message` part: A `Message`
        :rtype: dict
        :return: A dict with the keys `content_type`, `content` and `filename`
    """
    content_type = part.get_content_type()

    # Embedded messages are kept as their string form; everything else is
    # the decoded payload.
    if content_type.lower() in ['message/rfc822', 'message/delivery-status']:
        content = str(part)
    else:
        content = part.get_payload(decode=True)

    filename = part.get_filename()
    if not filename:
        # No name given: use the SHA-1 of the content plus a guessed extension
        filename = hashlib.sha1(content).hexdigest()
        if content_type:
            extension = mimetypes.guess_extension(content_type)
            if extension:
                filename += extension

    return {
        'content_type': content_type,
        'content': content,
        'filename': get_valid_filename(utils.decode_every_charset_in_the_world(filename)),
    }
def remux_detect(f):
    """Remux `f` with ffmpeg (stream copy) into a temp file of the same type.

    The container extension is derived from the detected MIME type. Returns
    `(size, False)` when ffmpeg produced a non-empty file, otherwise None.
    """
    from detection.utils import filetype

    f = os.path.abspath(f)
    mime = filetype(f)

    guessed = mimetypes.guess_extension(mime, strict=False)
    if guessed:
        ext = guessed[1:] if guessed[0] == '.' else guessed
        if ext == 'ogx':
            ext = 'ogg'
    else:
        # naive get extension from mime
        ext = mime.split('/')[1]
        if ext[:2] == 'x-':
            ext = ext[2:]

    with tempfile.NamedTemporaryFile(suffix='.' + ext) as tmp:
        command = ['ffmpeg', '-loglevel', 'warning', '-y', '-i', f,
                   '-c', 'copy', tmp.name]
        subprocess.call(command)
        size = os.path.getsize(tmp.name)
        if size:
            return size, False
def save(self, image_stream, image_content_type):
    """Write `image_stream` to the storage directory under a generated name.

    Args:
        image_stream: file-like object read in `_CHUNK_SIZE_BYTES` chunks.
        image_content_type: MIME type used to pick the file extension.

    Returns:
        The generated file name (not the full path).
    """
    # guess_extension() returns None for unknown types; without the fallback
    # the file would be stored as "<uuid>None".
    ext = mimetypes.guess_extension(image_content_type) or ''
    name = '{uuid}{ext}'.format(uuid=self._uuidgen(), ext=ext)
    image_path = os.path.join(self._storage_path, name)

    with self._fopen(image_path, 'wb') as image_file:
        while True:
            chunk = image_stream.read(self._CHUNK_SIZE_BYTES)
            if not chunk:
                break
            image_file.write(chunk)

    return name
def upload(url):
    """Fetch the media behind `url` and re-upload it to pomf.se.

    URLs matching the YouTube pattern are fetched with `youtube-dl`. URLs
    matching a format reported by `cclive --support` are fetched with
    `cclive`. Anything else is downloaded directly over HTTP.

    Returns:
        The public URL of the uploaded file, or the string
        "Error: <message>" if any step fails (exceptions are not propagated).
    """
    # Constant pipeline with no user input, so shell=True is safe here.
    cclive = subprocess.Popen("cclive --support | xargs | tr ' ' '|'",
                              stdout=subprocess.PIPE, shell=True)
    cclive_formats, _ = cclive.communicate()
    if isinstance(cclive_formats, bytes):
        # Formatting bytes into a str pattern would yield "b'...'" on Python 3.
        cclive_formats = cclive_formats.decode()
    cclive_formats = cclive_formats.strip()

    re_youtube = r"youtube|youtu\.be|yooouuutuuube"
    # An empty alternative would make the pattern match every URL.
    alternatives = [re_youtube]
    if cclive_formats:
        alternatives.append(cclive_formats)
    search = ".*(?:{}).*".format("|".join(alternatives))

    temp = None
    try:
        if re.match(search, url, re.I):
            if re.match(".*(?:{}).*".format(re_youtube), url, re.I):
                # Argument list, no shell: `url` is untrusted input. call()
                # waits for the download before the file is opened below.
                subprocess.call(["youtube-dl", "--quiet", "--recode-video", "webm",
                                 "--format", "webm/mp4",
                                 "--output", "/tmp/%(id)s.webm", url])
                yt = r".*(?:youtube.*?(?:v=|/v/)|youtu\.be/|yooouuutuuube.*?id=)([-_a-zA-Z0-9]+).*"
                file = "/tmp/{}.webm".format(re.match(yt, url, re.I).group(1))
            else:
                p = subprocess.Popen(["cclive", "--quiet", "-f", "fmt43_360p", url,
                                      "--O", "/tmp/pomf.webm",
                                      "--exec", "echo -n %f"],
                                     stdout=subprocess.PIPE)
                file, _ = p.communicate()
        else:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:37.0) Gecko/20100101 Firefox/37.0',
                'Referer': 'http://www.amazon.com/'
            }
            mime_type = guess_type(url)[0]
            extension = guess_extension(mime_type) if mime_type else None
            if not extension:
                raise Exception("Could not determine the file type of {}".format(url))
            # Only the exact '.jpe' is rewritten; a substring replace would
            # turn '.jpeg' into '.jpgg'.
            if extension == '.jpe':
                extension = '.jpg'
            temp = tempfile.NamedTemporaryFile(suffix=extension)
            # NOTE(review): `headers` was built but never sent in the original;
            # sending it looks like the intent - confirm.
            temp.write(requests.get(url, headers=headers).content)
            # Flush so the data is on disk when the file is reopened by name.
            temp.flush()
            file = temp.name

        with open(file, "rb") as fh:
            content = requests.post(url="http://pomf.se/upload.php", files={"files[]": fh})
        if not content.status_code // 100 == 2:
            raise Exception("Unexpected response {}".format(content))
        return "http://a.pomf.se/{}".format(content.json()["files"][0]["url"])
    except Exception as e:
        return "Error: {}".format(e)
    finally:
        if temp is not None:
            # Closing removes the temporary file.
            temp.close()
async def downloadImage(url, folder, name, loop, chunkSize=20):
    """Download the image at `url` to '<folder>/<name><extension>'.

    Args:
        url: address of the image.
        folder: destination directory.
        name: file name without extension.
        loop: event loop for the HTTP session.
        chunkSize: number of bytes read per iteration.

    Returns:
        dict with the boolean flags 'canAccessURL' (status was 200),
        'isImage' (content type contains "image") and 'fileSaved'.
    """
    result = {'canAccessURL': False, 'isImage': False, 'fileSaved': False}
    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
        'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
        'Accept-Encoding': 'none',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'}
    async with aiohttp.ClientSession(loop=loop) as session:
        with aiohttp.Timeout(10, loop=session.loop):
            async with session.get(url, headers=headers) as response:
                # A missing header means "not an image", not a KeyError.
                content_type = response.headers.get('content-type', '')
                if response.status == 200:
                    result['canAccessURL'] = True
                if "image" in content_type:
                    result['isImage'] = True
                if not result['canAccessURL'] or not result['isImage']:
                    return result

                # Strip parameters ("; charset=...") before guessing, and fall
                # back to no extension: None would break the concatenation.
                media_type = content_type.split(';')[0].strip()
                extension = mimetypes.guess_extension(media_type) or ''
                if extension == '.jpe':
                    extension = '.jpg'

                with open(folder + "/" + name + extension, 'wb') as fd:
                    while True:
                        chunk = await response.content.read(chunkSize)
                        if not chunk:
                            break
                        fd.write(chunk)
                result['fileSaved'] = True
    return result
def upload_image(self, image_uri, sync, username, userid, channel_name):
    """Download `image_uri` and forward it to the synced hangout.

    The image is fetched with the API key as bearer token and uploaded
    through the bot client. A message carrying the image id is then sent to
    the internal chat of `sync`. When the extension of the file name does
    not agree with the reported content type, the guessed extension is
    appended to the upload name.

    This is a generator-based coroutine (it uses `yield from`).
    """
    token = self.apikey
    logger.info('downloading %s', image_uri)
    filename = os.path.basename(image_uri)
    request = urllib.request.Request(image_uri)
    request.add_header("Authorization", "Bearer %s" % token)
    image_response = urllib.request.urlopen(request)
    content_type = image_response.info().get_content_type()

    # guess_extension() returns None for unknown types; calling .lower() on
    # it would raise AttributeError.
    guessed = mimetypes.guess_extension(content_type)
    filename_extension = guessed.lower() if guessed else ""  # returns with "."
    physical_extension = "." + filename.rsplit(".", 1).pop().lower()

    # account for mimetypes idiosyncrancy to return jpe for valid jpeg
    jpeg_alias = (filename_extension == ".jpe"
                  and physical_extension in [".jpg", ".jpeg", ".jpe", ".jif", ".jfif"])
    if physical_extension != filename_extension and not jpeg_alias:
        logger.warning("unable to determine extension: {} {}".format(filename_extension, physical_extension))
        filename += filename_extension

    logger.info('uploading as %s', filename)
    image_id = yield from self.bot._client.upload_image(image_response, filename=filename)
    logger.info('sending HO message, image_id: %s', image_id)
    yield from sync._bridgeinstance._send_to_internal_chat(
        sync.hangoutid,
        "shared media from slack",
        {"sync": sync,
         "source_user": username,
         "source_uid": userid,
         "source_title": channel_name},
        image_id=image_id)
def download(url, filename):
    """Download `url` and save it as `filename` plus a guessed extension.

    The extension comes from the response's Content-Type header. When the
    type is missing or unknown, the file is saved without an extension.
    """
    file_url = requests.get(url)
    # Strip parameters such as "; charset=utf-8" before guessing.
    content_type = file_url.headers.get('content-type', '').split(';')[0].strip()
    # guess_extension() may return None, which cannot be concatenated.
    file_extension = mimetypes.guess_extension(content_type) or ''
    with open(filename + file_extension, 'wb') as out:
        out.write(file_url.content)
def __init__(self, s):
    """Parse a format string of the form '<format>[:arg1,arg2,...]'.

    `<format>` is treated as a MIME type when it contains '/', as a file
    extension when it is upper-case, and as an opaque qualifier otherwise.
    Sets `str`, `ts_format`, `arguments`, `is_qualifier`, `mimetype` and
    `extension`.
    """
    self.str = s

    # Arguments are anything that follows the first ':'
    head, colon, tail = s.partition(':')
    self.ts_format = head
    self.arguments = tuple(tail.split(',')) if colon else tuple()

    self.is_qualifier = False
    self.mimetype = None
    self.extension = None

    fmt = self.ts_format
    if '/' in fmt:
        # MIME type: derive the upper-case extension when one is known
        self.mimetype = fmt
        guessed = mimetypes.guess_extension(self.mimetype)
        if guessed:
            self.extension = guessed.strip('.').upper()
    elif fmt.isupper():
        # Extension: derive the MIME type from a dummy file name
        self.extension = fmt
        self.mimetype = mimetypes.guess_type('fn.%s' % self.extension)[0]  # discard encoding
    else:
        # Is qualifier, can't determine mimetype OR extension
        self.is_qualifier = True
def guess_extension(mimetype):
    """Guess a file extension from a MIME type, without the leading `.`.

    Parameters after ';' are ignored. Returns `unknown` if no extension
    could be guessed, and `html` instead of `htm`.
    """
    bare_type = mimetype.split(';')[0]
    dotted = mimetypes.guess_extension(bare_type) or '.unknown'
    extension = dotted[1:]
    if extension == 'htm':
        return 'html'
    return extension
def load_conf(self, fd, *, path=None, mime_type=None, response=None):
    """Load a configuration from an HTTP response, a path or an open file.

    A loader is picked from `registry` by the path suffix when `path` is
    given, otherwise by the MIME type or by the extension guessed from it.

    Args:
        fd: open file object, or None.
        path: path object of the configuration file, if any.
        mime_type: MIME type of the content; overridden by the response's
            Content-Type header when `response` is an HTTPResponse.
        response: HTTP response to read the configuration from, if any.

    Returns:
        Whatever the selected loader returns.

    Raises:
        LookupError: if there is neither a path nor a MIME type.
        NotImplementedError: if no loader is registered for the MIME type.
    """
    if isinstance(response, http.client.HTTPResponse):
        url = URL(response.geturl())
        self.uris.append(url)
        mime_type = response.headers.get('Content-Type')
        if mime_type:
            mime_type = mime_type.split(';')[0].strip()
        logger.info('Config found: {} [{}]'.format(url, mime_type))

    if path:
        loader = registry.get(path.suffix)
        path = path.absolute()
        self.uris.append(path)
        logger.info('Config found: {}'.format(path))
    elif not mime_type:
        # Checked before guess_extension(), which fails on None.
        raise LookupError('Not found mime_type %s' % mime_type)
    elif mime_type in registry:
        loader = registry.get(mime_type)
    else:
        extension = mimetypes.guess_extension(mime_type)
        if extension not in registry:
            # `raise NotImplemented` raises a TypeError, because
            # NotImplemented is a constant and not an exception class.
            raise NotImplementedError(
                'No loader registered for mime_type %s' % mime_type)
        loader = registry.get(extension)

    if response is not None:
        return loader.load_bytes(response.read())
    elif fd is None:
        return loader.load_path(path)
    with fd:
        return loader.load_fd(fd)
def get_filename(self):
    """Build a slugified download name for this document.

    The name joins the doctype name (or 'Unterlage'), the document name, the
    version and the local date. It is prefixed with the parent object's
    filename slice when the parent provides one. The extension comes from
    the MIME type, defaulting to '.bin'.
    """
    # HACK: we want .xls not .xlb for excel files
    is_excel = self.mimetype == 'application/vnd.ms-excel'
    ext = '.xls' if is_excel else (mimetypes.guess_extension(self.mimetype) or '.bin')

    doctype_name = self.doctype.name if self.doctype else 'Unterlage'
    date_part = timezone.localtime(self.date).strftime('%Y.%m.%d')
    parts = [doctype_name, self.name, self.version, date_part]

    parent = self.parent_object
    if parent and hasattr(parent, 'get_filename_slice'):
        parts = [parent.get_filename_slice()] + parts

    return ''.join([slugify('-'.join(parts)), ext])
def ext_from_mimetype(mimetype):
    """Return the extension (with leading dot) for `mimetype`, or None if unknown."""
    extension = mimetypes.guess_extension(mimetype)
    return extension
def __get_download_filename__(self):
    """Choose the local and remote file names for the download.

    Candidate names come from the Content-Disposition header and from the
    URL path, in that order of preference. The extension is taken from
    Content-Disposition, then from the Content-Type header, then from the
    URL. `self.remote_filename` and, when it is not set yet,
    `self._filename` are updated.
    """
    # Get the file name and extension from Content-Disposition
    disposition = self.response_headers.get("content-disposition", "")
    if "filename" in disposition and "attachment" in disposition:
        cd_pattern = "attachment; filename ?= ?[\"|']?([^\"']+)[\"|']?"
    elif "filename" in disposition and "inline" in disposition:
        # `elif`: with two separate `if`s the trailing `else` discarded the
        # name found in the "attachment" case.
        cd_pattern = "inline; filename ?= ?[\"|']?([^\"']+)[\"|']?"
    else:
        cd_pattern = None

    cd_filename, cd_ext = "", ""
    if cd_pattern:
        found = re.compile(cd_pattern).match(disposition)
        # An unparseable header is treated as absent instead of crashing.
        if found:
            cd_filename, cd_ext = os.path.splitext(urllib.unquote_plus(found.group(1)))

    url_filename, url_ext = os.path.splitext(
        urllib.unquote_plus(filetools.basename(urlparse.urlparse(self.url)[2])))

    content_type = self.response_headers.get("content-type", "application/octet-stream")
    if content_type != "application/octet-stream":
        # Strip parameters such as "; charset=..." before guessing.
        mime_ext = mimetypes.guess_extension(content_type.split(";")[0].strip())
    else:
        mime_ext = ""

    # Pick the most suitable name
    if cd_filename:
        self.remote_filename = cd_filename
        if not self._filename:
            self._filename = cd_filename
    elif url_filename:
        self.remote_filename = url_filename
        if not self._filename:
            self._filename = url_filename

    # Pick the most suitable extension
    if cd_ext:
        if not cd_ext in self._filename:
            self._filename += cd_ext
        if self.remote_filename:
            self.remote_filename += cd_ext
    elif mime_ext:
        if not mime_ext in self._filename:
            self._filename += mime_ext
        if self.remote_filename:
            self.remote_filename += mime_ext
    elif url_ext:
        if not url_ext in self._filename:
            self._filename += url_ext
        if self.remote_filename:
            self.remote_filename += url_ext
def handle(url, data):
    """Download the top image of the article at `url`.

    `data` supplies 'user_agent', 'single_file' (a path template with one
    %s slot for the extension) and 'parent_dir'.

    Returns the image path on success, False on a failed request, unknown
    MIME type or any exception, and None when the article has no top image.
    """
    try:
        config = Config()
        config.browser_user_agent = data['user_agent']
        article = Article(url, config)
        article.download()
        article.parse()

        if article.top_image:
            print('\t\tNewspaper located image: %s' % article.top_image)
            r = requests.get(article.top_image,
                             headers={'User-Agent': data['user_agent']},
                             stream=True)
            if r.status_code != 200:
                print('\t\tError Reading Image: %s responded with code %i!' % (url, r.status_code))
                return False

            ext = mimetypes.guess_extension(r.headers['content-type'])
            if not ext:
                print('\t\tNewsPaper Error locating file MIME Type: %s' % url)
                return False
            if '.jp' in ext:
                ext = '.jpg'

            path = data['single_file'] % ext
            if not os.path.isfile(path):
                # Parent dir for the full filepath is supplied already.
                if not os.path.isdir(data['parent_dir']):
                    print("\t\t+Building dir: %s" % data['parent_dir'])
                    os.makedirs(data['parent_dir'])
                with open(path, 'wb') as f:
                    r.raw.decode_content = True
                    shutil.copyfileobj(r.raw, f)
            return path
    except Exception as e:
        print('\t\t"Newspaper" Generic handler failed. ' + (str(e).strip()))
        return False
def play_info(self):
    """Log the metadata of the current play and open its preview images.

    Each image with a recognised MIME type is written to a temporary file
    that is kept on disk, and all of them are handed to the `open` command.
    """
    metadata, images = self.robot_obj.play_info()
    logger.info("Metadata:")
    for k, v in metadata.items():
        logger.info(" %s=%s", k, v)

    previews = []
    for mime, buf in images or ():
        ext = mimetypes.guess_extension(mime)
        if not ext:
            continue
        if ext == '.jpe':
            # Some Python versions map image/jpeg to '.jpe'.
            ext = '.jpg'
        # Use the real extension rather than a hard-coded ".jpg". Close the
        # file before launching the viewer so the data is on disk.
        with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
            ntf.write(buf)
        previews.append(ntf.name)

    # Do not run a bare "open" when there is nothing to show.
    if previews:
        os.system("open " + " ".join(previews))
def scan_oneshot(self, filename=None):
    """Take one shot with the scan task and open the resulting images.

    Each image with a recognised MIME type is written to a temporary file
    that is kept on disk, and all of them are handed to the `open` command.
    `filename` is accepted but not used.
    """
    images = self.task.oneshot()

    previews = []
    for mime, buf in images:
        ext = mimetypes.guess_extension(mime)
        if not ext:
            continue
        if ext == '.jpe':
            # Some Python versions map image/jpeg to '.jpe'.
            ext = '.jpg'
        # Use the real extension rather than a hard-coded ".jpg". Close the
        # file before launching the viewer so the data is on disk.
        with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
            ntf.write(buf)
        previews.append(ntf.name)

    # Do not run a bare "open" when there is nothing to show.
    if previews:
        os.system("open " + " ".join(previews))
def scanimages(self, filename=None):
    """Fetch the images of the scan task and open them.

    Each image with a recognised MIME type is written to a temporary file
    that is kept on disk, and all of them are handed to the `open` command.
    `filename` is accepted but not used.
    """
    images = self.task.scanimages()

    previews = []
    for mime, buf in images:
        ext = mimetypes.guess_extension(mime)
        if not ext:
            continue
        if ext == '.jpe':
            # Some Python versions map image/jpeg to '.jpe'.
            ext = '.jpg'
        # Use the real extension rather than a hard-coded ".jpg". Close the
        # file before launching the viewer so the data is on disk.
        with NamedTemporaryFile(suffix=ext, delete=False) as ntf:
            ntf.write(buf)
        previews.append(ntf.name)

    # Do not run a bare "open" when there is nothing to show.
    if previews:
        os.system("open " + " ".join(previews))
def get_filename(self, content_type):
    """Return `<basename><extension>` for the given Content-Type value.

    Args:
        content_type: header value; parameters after ';' are ignored.

    Returns:
        The file name, or None when no base name is set or the type has no
        known extension.
    """
    if not self._basename:
        return None

    # Taking the first ';'-separated field directly works on Python 2 and 3.
    # The previous map() result has no len() and is not indexable on Python 3.
    media_type = content_type.split(";")[0].strip()
    extension = mimetypes.guess_extension(media_type)
    if not extension:
        return None
    return "%s%s" % (self._basename, extension)
def _find_attachments_in_email(mesg, expand_attachment, atts):
    """Collect the attachments of `mesg` into the list `atts`.

    Every stored attachment is appended to `atts` as a tuple
    (temp file path, file name, content type). A multipart/related message
    that contains an 'ActiveMime' payload is skipped entirely. When
    `expand_attachment` is true, text parts that start with one of
    EMAIL_MAGIC are parsed as nested emails and searched recursively.
    """
    # MHTML detection
    is_related = (mesg.get_content_maintype() == "multipart"
                  and mesg.get_content_subtype() == "related")
    if is_related:
        for candidate in mesg.walk():
            if candidate.is_multipart():
                continue
            body = candidate.get_payload(decode=True)
            if isinstance(body, str) and body.startswith('ActiveMime'):
                return

    for part in mesg.walk():
        content_type = part.get_content_type()
        if part.is_multipart():
            continue
        payload = part.get_payload(decode=True)

        if content_type.startswith('text/') and expand_attachment:
            normalized = payload.lstrip(" \t\r\n")
            if any(normalized.startswith(m) for m in EMAIL_MAGIC):
                # An email embedded as text: recurse into it
                nested = email.message_from_string(normalized)
                _find_attachments_in_email(nested, expand_attachment, atts)
                continue

        if content_type in SAFE_MEDIA_TYPE:
            continue

        filename = part.get_filename()
        if filename is not None:
            # Sanitize the header value
            filename = utils.get_filename_from_path(_decode_header(filename))
        else:
            filename = '<unknown>' + (mimetypes.guess_extension(content_type) or '')

        tempfile_path = utils.store_temp_file(payload, filename)
        atts.append((tempfile_path, filename, content_type))
def get_extension(attachment):
    """Return the file extension of an email attachment.

    The extension of the attachment's file name is used when it has a name,
    otherwise one is guessed from its content type. '.bin' is returned when
    neither yields an extension, and None when `attachment` lacks the
    expected methods.
    """
    try:
        filename = attachment.get_filename()
        extension = (
            os.path.splitext(filename)[1]
            if filename
            else mimetypes.guess_extension(attachment.get_content_type())
        )
    except AttributeError:
        return None
    return extension or '.bin'
def _determineExtension(determined_type):
    """Map a MIME type to a file extension.

    Entries in `type_override` take precedence. Otherwise the extension is
    guessed with `guess_extension`, and `config.default_ext` is returned
    when the guess fails or yields nothing.
    """
    if determined_type in type_override:
        return type_override[determined_type]

    try:
        guessed = guess_extension(determined_type)
    except Exception:
        # Best effort: an unusable type falls back to the default. Unlike a
        # bare `except:` this does not swallow KeyboardInterrupt/SystemExit.
        guessed = None

    # guess_extension() returns None for unknown types; previously that None
    # replaced the configured default.
    return guessed or config.default_ext
def each(self, target):
    """Extract every non-container part of the email file `target`.

    Each part is written into a fresh temporary directory and registered
    through `self.add_extracted_file`. Parts without a usable file name are
    named 'part-NNN<ext>'.
    """
    with open(target) as fp:
        msg = email.message_from_file(fp)

    path_temp = tempdir()
    counter = 1
    for part in msg.walk():
        # multipart/* are just containers
        if part.get_content_maintype() == 'multipart':
            continue

        payload = part.get_payload(decode=True)
        if payload is None:
            # Nothing to write (e.g. an embedded message container).
            continue

        # Keep only the last path component, so a crafted name such as
        # "../../etc/passwd" cannot write outside the temp directory.
        filename = part.get_filename()
        if filename:
            filename = os.path.basename(filename.replace('\\', '/'))
        if not filename:
            # Use a generic bag-of-bits extension when the type is unknown
            ext = mimetypes.guess_extension(part.get_content_type()) or '.bin'
            filename = 'part-%03d%s' % (counter, ext)
        counter += 1

        filepath = os.path.join(path_temp, filename)
        with open(filepath, 'wb') as out:
            out.write(payload)
        self.add_extracted_file(filepath)
def media_post(self, media_file, mime_type=None, description=None):
    """
    Post an image. `media_file` can either be image data or
    a file name. If image data is passed directly, the mime
    type has to be specified manually, otherwise, it is
    determined from the file name.

    Throws a `MastodonIllegalArgumentError` if the mime type of the
    passed data or file can not be determined properly.

    Returns a `media dict`_. This contains the id that can be used in
    status_post to attach the media file to a toot.
    """
    opened_here = False
    if mime_type is None and os.path.isfile(media_file):
        mime_type = mimetypes.guess_type(media_file)[0]
        media_file = open(media_file, 'rb')
        opened_here = True

    try:
        if mime_type is None:
            raise MastodonIllegalArgumentError('Could not determine mime type'
                                               ' or data passed directly '
                                               'without mime type.')

        random_suffix = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
        # guess_extension() returns None for unknown types; concatenating
        # None would raise TypeError.
        extension = mimetypes.guess_extension(mime_type) or ''
        file_name = "mastodonpyupload_" + str(time.time()) + "_" + str(random_suffix) + extension

        media_file_description = (file_name, media_file, mime_type)
        return self.__api_request('POST', '/api/v1/media',
                                  files={'file': media_file_description},
                                  params={'description': description})
    finally:
        # Close only the handle opened above; caller-supplied data is left alone.
        if opened_here:
            media_file.close()

###
# Writing data: Domain blocks
###
def get_buffer_extension(buffer):
    """Return the file extension for the content of `buffer`, or '.png' if unknown."""
    mime_type = get_mime_type_buffer(buffer)
    guessed = mimetypes.guess_extension(mime_type)
    return guessed or '.png'
def _init_document(self, data, mime_type):
    """Upload the data using the documents API.

    A document named 'upload-<uuid><ext>' is created, `data` is sent in
    slices of `self.chunk_size`, and the result is handed to
    `self._init_url` via the document's URI.
    """
    # guess_extension() returns None for unknown types; without the fallback
    # the document would be named "upload-<uuid>None".
    extension = guess_extension(mime_type) or ''
    filename = 'upload-{0}{1}'.format(uuid4(), extension)
    document = self.client.documents.create(filename, len(data))

    cursor = 0
    while cursor < len(data):
        chunk = data[cursor:cursor + self.chunk_size]
        # The end offset is inclusive.
        document.upload(cursor, cursor + len(chunk) - 1, chunk)
        cursor += len(chunk)

    self._init_url(document.uri)
def email_login():
    """
    login check

    On POST with an "html" form field, the signed `save_key` found in that
    HTML is verified and the matching user is looked up. At most one
    uploaded attachment is saved, and the entry text is stored in the
    session before the user is logged in.

    :return: a redirect to `protected_save`
    """
    if flask.request.method == 'POST':
        content = flask.request.form.get("html")
        if content is not None:
            print(content)
            soup = BeautifulSoup(content, "html.parser")
            save_key = soup.find(id="save_key").text.strip()
            # session_id will expire after 24 hours
            session_id = save_signer.unsign(save_key, max_age=86400)
            session_id = bytes.decode(session_id)
            user = User.query.filter_by(session_id=session_id).first_or_404()

            # try to save the attachment file
            limit_counter = 0
            try:
                for attachment in flask.request.files:
                    if limit_counter >= 1:
                        break
                    upload = flask.request.files[attachment]
                    # guess_extension() returns None for unknown types. That
                    # would raise a TypeError, which the handler below does
                    # not catch.
                    extension = guess_extension(upload.mimetype) or ''
                    file_name = str(uuid.uuid1()) + extension
                    upload.save(file_name)
                    flask.session["file_name"] = file_name
                    limit_counter += 1
            except AttributeError:
                flask.session["file_name"] = ""

            flask.session["entry"] = soup.select('div[style]')[0].text
            flask.session["user_real_id"] = user.user_id
            # after login flask_login will push user_id into session and this user_id is our session_id
            # as the get_id method in User model returns user's session id
            flask_login.login_user(user)
            return flask.redirect(flask.url_for('protected_save'))
    return flask.redirect(flask.url_for('protected_save'))
def prepare_file(self):
    """
    This sets `self.file` to a fitting :class:`InputFile`
    or a fitting subclass (:class:`InputFileFromDisk`, :class:`InputFileFromURL`)

    When raw content is given, a file name is built from the path or URL
    (default "file"). The suffix is taken from the MIME type when it maps to
    one, otherwise from the path or URL, otherwise ".blob".

    :return: Nothing
    """
    if self.file_content:
        file_name = "file"
        file_suffix = ".blob"
        if self.file_path:
            file_name = os.path.basename(os.path.normpath(self.file_path))  # http://stackoverflow.com/a/3925147
            file_name, file_suffix = os.path.splitext(file_name)  # http://stackoverflow.com/a/541394/3423324
        elif self.file_url:
            from urllib.parse import urlparse  # http://stackoverflow.com/a/18727481/3423324
            url = urlparse(self.file_url)
            file_name = os.path.basename(url.path)
            file_name, file_suffix = os.path.splitext(file_name)
        # end if
        if self.file_mime:
            import mimetypes
            guessed_suffix = mimetypes.guess_extension(self.file_mime)
            # Only override when the MIME type maps to an extension. An
            # unknown type must not discard the suffix found above.
            if guessed_suffix:
                file_suffix = '.jpg' if guessed_suffix == '.jpe' else guessed_suffix  # .jpe -> .jpg
            # end if
        # end if
        if not file_suffix or not file_suffix.strip().lstrip("."):
            logger.debug("file_suffix was empty. Using '.blob'")
            file_suffix = ".blob"
        # end if
        file_name = "{filename}{suffix}".format(filename=file_name, suffix=file_suffix)
        self.file = InputFile(self.file_content, file_name=file_name, file_mime=self.file_mime)
    elif self.file_path:
        self.file = InputFileFromDisk(self.file_path, file_mime=self.file_mime)
    elif self.file_url:
        self.file = InputFileFromURL(self.file_url, file_mime=self.file_mime)
    # end if
# end def prepare_file
def send(self, sender: PytgbotApiBot, receiver, reply_id)->PytgbotApiMessage:
    """
    Send this message as a photo through `sender`.

    The receiver and reply id stored on the message, when set, override the
    arguments. The file name is given an image suffix if it lacks one.

    :return: the result of `sender.send_photo`
    """
    if self.receiver:
        receiver = self.receiver
    # end if
    if self.reply_id is not DEFAULT_MESSAGE_ID:
        reply_id = self.reply_id
    # end if
    self.prepare_file()
    assert isinstance(self.file, (InputFile, InputFileFromDisk, InputFileFromURL))

    image_suffixes = (".jpg", ".jpeg", ".gif", ".png", ".tif", ".bmp")
    if not self.file.file_name.endswith(image_suffixes):
        if self.file.file_mime in ("image/jpg", "image/jpeg", "image/jpe"):
            # manually, to avoid .jpe ending.
            self.file.file_name += ".jpg"
        else:
            import mimetypes
            ext = mimetypes.guess_extension(self.file.file_mime)  # automatically
            if ext not in image_suffixes:
                ext = ".unknown-file-type.png"  # At least we can try setting it as .png
            # end if
            self.file.file_name += ext
        # end if
    # end if

    try:
        return sender.send_photo(
            receiver, self.file, caption=self.caption,
            reply_to_message_id=reply_id, reply_markup=self.reply_markup,
            disable_notification=self.disable_notification,
        )
    except TgApiServerException as e:
        should_backoff(e)  # checks if it should raise an DoRetryException
        raise  # else it just raises as usual
    # end try
# end def send
# end class PhotoMessage
def _download_http_url(link, session, temp_dir, hashes):
    """Download link url into temp_dir using provided session.

    The file name comes from the Content-Disposition header when present
    (reduced to its base name), otherwise from the link. A missing extension
    is filled in from the Content-Type or from the final URL after a
    redirect.

    Returns:
        A tuple (path of the downloaded file, Content-Type header value).

    Raises:
        requests.HTTPError: re-raised after logging when the server answers
            with an error status.
    """
    target_url = link.url.split('#', 1)[0]
    try:
        resp = session.get(
            target_url,
            # We use Accept-Encoding: identity here because requests
            # defaults to accepting compressed responses. This breaks in
            # a variety of ways depending on how the server is configured.
            # - Some servers will notice that the file isn't a compressible
            #   file and will leave the file alone and with an empty
            #   Content-Encoding
            # - Some servers will notice that the file is already
            #   compressed and will leave the file alone and will add a
            #   Content-Encoding: gzip header
            # - Some servers won't notice anything at all and will take
            #   a file that's already been compressed and compress it again
            #   and set the Content-Encoding: gzip header
            # By setting this to request only the identity encoding We're
            # hoping to eliminate the third case. Hopefully there does not
            # exist a server which when given a file will notice it is
            # already compressed and that you're not asking for a
            # compressed file and will then decompress it before sending
            # because if that's the case I don't think it'll ever be
            # possible to make this work.
            headers={"Accept-Encoding": "identity"},
            stream=True,
        )
        resp.raise_for_status()
    except requests.HTTPError as exc:
        logger.critical(
            "HTTP error %s while getting %s", exc.response.status_code, link,
        )
        raise

    content_type = resp.headers.get('content-type', '')
    filename = link.filename  # fallback
    # Have a look at the Content-Disposition header for a better guess
    content_disposition = resp.headers.get('content-disposition')
    if content_disposition:
        _disposition, params = cgi.parse_header(content_disposition)
        # The header is server-controlled. Keep only the base name so a
        # value such as "../../x" cannot place the file outside temp_dir.
        # We use ``or`` here because we don't want to use an "empty" value
        # from the filename param.
        filename = os.path.basename(params.get('filename') or '') or filename

    ext = splitext(filename)[1]
    if not ext:
        ext = mimetypes.guess_extension(content_type)
        if ext:
            filename += ext
    if not ext and link.url != resp.url:
        ext = os.path.splitext(resp.url)[1]
        if ext:
            filename += ext

    file_path = os.path.join(temp_dir, filename)
    with open(file_path, 'wb') as content_file:
        _download_url(resp, link, content_file, hashes)
    return file_path, content_type