我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用binascii.b2a_qp()。
def set_bytes_content(msg, data, maintype, subtype, cte='base64',
                      disposition=None, filename=None, cid=None,
                      params=None, headers=None):
    """Store the bytes *data* as the payload of *msg* using transfer encoding *cte*.

    The payload is converted to text according to *cte*, set on the message,
    and the ``Content-Transfer-Encoding`` header is set to *cte*.

    Args:
        msg: Message object to populate; its ``policy.max_line_length`` is
            used when wrapping base64 output.
        data: The raw payload as a bytes object.
        maintype: Forwarded to ``_prepare_set``.
        subtype: Forwarded to ``_prepare_set``.
        cte: One of ``'base64'``, ``'quoted-printable'``, ``'7bit'``,
            ``'8bit'`` or ``'binary'``.  Any other value leaves *data*
            untouched before it is handed to ``msg.set_payload``.
        disposition, filename, cid, params: Forwarded to ``_finalize_set``.
        headers: Forwarded to ``_prepare_set``.

    Raises:
        UnicodeDecodeError: If *cte* is ``'7bit'`` and *data* contains
            non-ASCII bytes.
    """
    _prepare_set(msg, maintype, subtype, headers)
    if cte == 'base64':
        data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
    elif cte == 'quoted-printable':
        # XXX: quoprimime.body_encode won't encode newline characters in data,
        # so we can't use it.  This means max_line_length is ignored.  Another
        # bug to fix later.  (Note: encoders.quopri is broken on line ends.)
        data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
        data = data.decode('ascii')
    elif cte == '7bit':
        # Make sure it really is only ASCII.  *data* is bytes, which has no
        # .encode() method, so validate by decoding; this also hands
        # set_payload() a str, like every other branch.
        data = data.decode('ascii')
    elif cte in ('8bit', 'binary'):
        # surrogateescape lets arbitrary bytes survive the trip through str.
        data = data.decode('ascii', 'surrogateescape')
    msg.set_payload(data)
    msg['Content-Transfer-Encoding'] = cte
    _finalize_set(msg, disposition, filename, cid, params)
def _apply_content_transfer_encoding(self, stream):
    """Yield the chunks of *stream* encoded per the Content-Transfer-Encoding header.

    Supported header values (compared case-insensitively) are ``base64``,
    ``quoted-printable`` and ``binary``; anything else raises RuntimeError
    once the generator is first advanced.

    For base64, input is re-chunked so that every piece except the last is a
    multiple of three bytes long.  A falsy chunk (or exhaustion of *stream*)
    ends the base64 output after flushing whatever bytes are still pending.
    """
    encoding = self.headers[CONTENT_TRANSFER_ENCODING].lower()

    if encoding == 'binary':
        yield from stream
        return

    if encoding == 'quoted-printable':
        for piece in stream:
            yield binascii.b2a_qp(piece)
        return

    if encoding != 'base64':
        raise RuntimeError(
            'unknown content transfer encoding: {}'.format(encoding))

    pending = bytearray()
    while True:
        if pending:
            # Only whole 3-byte groups are encoded now; the remainder waits
            # for more input.
            whole = (len(pending) // 3) * 3
            ready, pending = pending[:whole], pending[whole:]
            if ready:
                yield base64.b64encode(ready)
        piece = next(stream, None)
        if not piece:
            if pending:
                yield base64.b64encode(pending[:])
            return
        pending.extend(piece)
def encodestring(s, quotetabs=0, header=0):
    """Return the quoted-printable encoding of the string *s*.

    Uses ``b2a_qp`` when it is available; otherwise falls back to ``encode``
    operating on in-memory ``cStringIO`` buffers.  *quotetabs* and *header*
    are passed through unchanged to whichever implementation is used.
    """
    if b2a_qp is None:
        from cStringIO import StringIO
        source = StringIO(s)
        sink = StringIO()
        encode(source, sink, quotetabs, header)
        return sink.getvalue()
    return b2a_qp(s, quotetabs=quotetabs, header=header)
def test_qp(self):
    """Exercise binascii.a2b_qp / b2a_qp on bytes input."""
    # A test for SF bug 534347 (segfaults without the proper fix):
    # non-string keyword names must be rejected with TypeError.
    rejected = False
    try:
        binascii.a2b_qp(b"", **{1: 1})
    except TypeError:
        rejected = True
    if not rejected:
        self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")

    # Malformed escapes are passed through by the decoder.
    for encoded, decoded in (
        (b"= ", b"= "),
        (b"==", b"="),
        (b"=AX", b"=AX"),
    ):
        self.assertEqual(binascii.a2b_qp(encoded), decoded)

    with self.assertRaises(TypeError):
        binascii.b2a_qp(foo="bar")

    self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")

    # (raw input, keyword options, expected encoding)
    encode_cases = (
        (b"\xff\r\n\xff\n\xff", {}, b"=FF\r\n=FF\r\n=FF"),
        (b"0" * 75 + b"\xff\r\n\xff\r\n\xff", {},
         b"0" * 75 + b"=\r\n=FF\r\n=FF\r\n=FF"),
        (b'\0\n', {}, b'=00\n'),
        (b'\0\n', {'quotetabs': True}, b'=00\n'),
        (b'foo\tbar\t\n', {}, b'foo\tbar=09\n'),
        (b'foo\tbar\t\n', {'quotetabs': True}, b'foo=09bar=09\n'),
        (b'.', {}, b'=2E'),
        (b'.\n', {}, b'=2E\n'),
        (b'a.\n', {}, b'a.\n'),
    )
    for raw, options, expected in encode_cases:
        self.assertEqual(binascii.b2a_qp(raw, **options), expected)
def encodestring(s, quotetabs=False, header=False):
    """Return the quoted-printable encoding of the bytes object *s*.

    Uses ``b2a_qp`` when it is available; otherwise falls back to ``encode``
    operating on in-memory ``BytesIO`` buffers.  *quotetabs* and *header*
    are passed through unchanged to whichever implementation is used.
    """
    if b2a_qp is None:
        from io import BytesIO
        source = BytesIO(s)
        sink = BytesIO()
        encode(source, sink, quotetabs, header)
        return sink.getvalue()
    return b2a_qp(s, quotetabs=quotetabs, header=header)
def test_qp(self):
    """Exercise binascii.a2b_qp / b2a_qp on plain string literals."""
    # A test for SF bug 534347 (segfaults without the proper fix):
    # non-string keyword names must be rejected with TypeError.
    rejected = False
    try:
        binascii.a2b_qp("", **{1: 1})
    except TypeError:
        rejected = True
    if not rejected:
        self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")

    # Malformed escapes are passed through by the decoder.
    for encoded, decoded in (
        ("= ", "= "),
        ("==", "="),
        ("=AX", "=AX"),
    ):
        self.assertEqual(binascii.a2b_qp(encoded), decoded)

    self.assertRaises(TypeError, binascii.b2a_qp, foo="bar")

    self.assertEqual(binascii.a2b_qp("=00\r\n=00"), "\x00\r\n\x00")

    # (raw input, keyword options, expected encoding)
    encode_cases = (
        ("\xff\r\n\xff\n\xff", {}, "=FF\r\n=FF\r\n=FF"),
        ("0" * 75 + "\xff\r\n\xff\r\n\xff", {},
         "0" * 75 + "=\r\n=FF\r\n=FF\r\n=FF"),
        ('\0\n', {}, '=00\n'),
        ('\0\n', {'quotetabs': True}, '=00\n'),
        ('foo\tbar\t\n', {}, 'foo\tbar=09\n'),
        ('foo\tbar\t\n', {'quotetabs': True}, 'foo=09bar=09\n'),
        ('.', {}, '=2E'),
        ('.\n', {}, '=2E\n'),
        ('a.\n', {}, 'a.\n'),
    )
    for raw, options, expected in encode_cases:
        self.assertEqual(binascii.b2a_qp(raw, **options), expected)
def test_qp(self):
    """Exercise binascii.a2b_qp / b2a_qp, including keyword-argument calls."""
    # Keyword arguments allowed
    binascii.a2b_qp(data=b"", header=False)

    # A test for SF bug 534347 (segfaults without the proper fix):
    # non-string keyword names must be rejected with TypeError.
    rejected = False
    try:
        binascii.a2b_qp(b"", **{1: 1})
    except TypeError:
        rejected = True
    if not rejected:
        self.fail("binascii.a2b_qp(**{1:1}) didn't raise TypeError")

    # Malformed escapes are passed through by the decoder.
    for encoded, decoded in (
        (b"= ", b"= "),
        (b"==", b"="),
        (b"=AX", b"=AX"),
    ):
        self.assertEqual(binascii.a2b_qp(encoded), decoded)

    with self.assertRaises(TypeError):
        binascii.b2a_qp(foo="bar")

    self.assertEqual(binascii.a2b_qp(b"=00\r\n=00"), b"\x00\r\n\x00")

    # (raw input, keyword options, expected encoding)
    encode_cases = (
        (b"\xff\r\n\xff\n\xff", {}, b"=FF\r\n=FF\r\n=FF"),
        (b"0" * 75 + b"\xff\r\n\xff\r\n\xff", {},
         b"0" * 75 + b"=\r\n=FF\r\n=FF\r\n=FF"),
        (b'\0\n', {}, b'=00\n'),
        (b'\0\n', {'quotetabs': True}, b'=00\n'),
        (b'foo\tbar\t\n', {}, b'foo\tbar=09\n'),
        (b'foo\tbar\t\n', {'quotetabs': True}, b'foo=09bar=09\n'),
        (b'.', {}, b'=2E'),
        (b'.\n', {}, b'=2E\n'),
        (b'a.\n', {}, b'a.\n'),
    )
    for raw, options, expected in encode_cases:
        self.assertEqual(binascii.b2a_qp(raw, **options), expected)
def printchar(char):
    """Print *char* alongside its quoted-printable, hex and base64 encodings.

    Debugging aid for milter developers.
    """
    encodings = (
        binascii.b2a_qp(char),
        binascii.b2a_hex(char),
        binascii.b2a_base64(char),
    )
    print('char: %s [qp=%s][hex=%s][base64=%s]' % ((char,) + encodings))
#end def printchar(char).
def __send_response(self, response):
    """Push one length-prefixed response through ``self.push``.

    The response is preceded by its length packed as a 4-byte unsigned
    integer in network byte order.

    Args:
      response: The data to send.
    """
    #logging.debug(' >>> %s', binascii.b2a_qp(response[0]))
    length_prefix = struct.pack('!I', len(response))
    for part in (length_prefix, response):
        self.push(part)
def read_milter_data(self):
    """Handle one complete milter packet accumulated in the input buffer.

    Callback from asynchat once we have read the milter packet length worth
    of bytes on the socket and they are accumulated in our input buffer
    (the milter command plus the data to send to the dispatcher).

    The buffered fragments are joined and the buffer is reset.  The packet
    is handed to the dispatcher, and any response (a single item or a list
    of items) is sent back.  The handler is then re-armed to read the next
    packet length.  If the dispatcher raises
    kimpf.PpyMilterCloseConnection, the connection is closed instead.
    """
    import binascii
    inbuff = "".join(self.__input)
    self.__input = []
    # Debug-log the packet (quoted-printable encoded), but only for packets
    # that do not start with "B" and only when chmilt is "Receiver".
    # NOTE(review): "B" is presumably the milter body command, skipped to
    # keep logs small -- confirm against the protocol constants.
    if not inbuff.startswith("B"):
        if self.chmilt == "Receiver":
            logging.debug(' read: %s %s', self.chmilt, binascii.b2a_qp(inbuff))
    try:
        response = self.__milter_dispatcher.Dispatch(inbuff)
        #logging.debug(' >>> resp: [%s]', response)
        # A list response is sent item by item; a falsy response sends nothing.
        if type(response) == list:
            for r in response:
                self.__send_response(r)
        elif response:
            self.__send_response(response)
        # rinse and repeat :)
        # Switch back to reading the next packet-length header.
        self.found_terminator = self.read_packetlen
        self.set_terminator(MILTER_LEN_BYTES)
    except kimpf.PpyMilterCloseConnection, e:
        self.close()
#end class ConnectionHandler(asynchat.async_chat).
#end class AsyncPpyMilterServer(asyncore.dispatcher).
#SN July 13, 2015
#Bring configuration processing inside here, instead of in commons.
def base64code(s, d):
    """Recursively write decodings of the case variants of ``b64str``.

    *s* is the candidate prefix built so far and *d* is the index of the
    next character of the global ``b64str`` to append.  At every position
    the character is tried as-is and, when it is alphabetic, also
    lower-cased.  Once *d* reaches the end of ``b64str`` the candidate is
    base64-decoded, quoted-printable encoded and written as one line to the
    global file object ``f``.
    """
    if d != len(b64str):
        current = b64str[d]
        base64code(s + current, d + 1)
        if current.isalpha():
            base64code(s + current.lower(), d + 1)
        return
    decoded = base64.b64decode(s)
    f.write(binascii.b2a_qp(decoded) + '\n')
def encode(input, output, quotetabs, header=False):
    """Quoted-printable encode everything read from 'input' into 'output'.

    'input' and 'output' are binary file objects.  'quotetabs' selects
    whether embedded tabs and spaces are quoted; tabs and spaces that end a
    line are always encoded, as per RFC 1521.  'header' selects encoding of
    spaces as '_' as per RFC 1522.

    When b2a_qp is available the whole input is read and encoded in one
    call; otherwise the input is processed line by line below.
    """
    if b2a_qp is not None:
        output.write(b2a_qp(input.read(), quotetabs=quotetabs, header=header))
        return

    def emit(text, output=output, lineEnd=b'\n'):
        # RFC 1521 requires that a line ending in a space or tab must have
        # that trailing character encoded.
        last = text[-1:]
        if text and last in b' \t':
            output.write(text[:-1] + quote(last) + lineEnd)
        elif text == b'.':
            output.write(quote(text) + lineEnd)
        else:
            output.write(text + lineEnd)

    pending = None
    while True:
        line = input.readline()
        if not line:
            break

        # Remember, then drop, the newline that readline() left on the line.
        stripped = b''
        if line[-1:] == b'\n':
            stripped = b'\n'
            line = line[:-1]

        # Encode byte by byte, ignoring the line-length limit for now.
        pieces = []
        for code in line:
            octet = bytes((code,))
            if needsquoting(octet, quotetabs, header):
                octet = quote(octet)
            pieces.append(b'_' if header and octet == b' ' else octet)

        # The previous line is only written once we know another follows it.
        if pending is not None:
            emit(pending)

        # Insert soft line breaks wherever the RFC length limit is exceeded;
        # the '=' of the soft break counts towards the limit.
        current = EMPTYSTRING.join(pieces)
        cut = MAXLINESIZE - 1
        while len(current) > MAXLINESIZE:
            emit(current[:cut], lineEnd=b'=\n')
            current = current[cut:]

        pending = current

    # The final line ends with a newline only if the input's last line did.
    if pending is not None:
        emit(pending, lineEnd=stripped)