The following 30 code examples, extracted from open-source Python projects, illustrate how to use codecs.iterdecode().
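Before the examples, here is a minimal sketch of what codecs.iterdecode() does (the sample data below is made up for illustration): it wraps an iterator of byte chunks and lazily yields decoded text, which is the usual way to feed a binary stream such as a urlopen() response or a file opened in 'rb' mode into csv.reader, which expects text in Python 3.

import codecs
import csv
import io

# Minimal sketch: codecs.iterdecode() turns an iterable of byte chunks
# into an iterator of decoded str chunks, one per input chunk.
chunks = [b"name,age\n", b"Ada,36\n", b"Linus,49\n"]
for text in codecs.iterdecode(chunks, "utf-8"):
    print(repr(text))          # each chunk comes back as a str

# The recurring pattern in the examples below: bridge a binary stream
# (here an in-memory BytesIO standing in for urlopen() or an 'rb' file)
# to csv.reader.
raw = io.BytesIO(b"name,age\nAda,36\nLinus,49\n")
reader = csv.reader(codecs.iterdecode(raw, "utf-8"))
for row in reader:
    print(row)                 # ['name', 'age'], ['Ada', '36'], ...
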
def parse_csv_file(thefile):
    """Parse csv file, yielding rows as dictionary.

    The csv file should have a header.

    Args:
        thefile (file): File like object

    Yields:
        dict: Dictionary with column header name as key and cell as value
    """
    reader = csv.reader(codecs.iterdecode(thefile, 'ISO-8859-1'))
    # read header
    colnames = next(reader)
    # data rows
    for row in reader:
        pdb = {}
        for k, v in zip(colnames, row):
            if v == '':
                v = None
            pdb[k] = v
        yield pdb

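A hypothetical usage sketch for the generator above (the in-memory CSV bytes are made up); any binary file-like object works, since codecs.iterdecode() takes care of the decoding:

import io

# Hypothetical caller for parse_csv_file(); the sample rows are invented.
sample = io.BytesIO(b"id,name\n1,alpha\n2,\n")
for record in parse_csv_file(sample):
    print(record)   # {'id': '1', 'name': 'alpha'}, then {'id': '2', 'name': None}
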
def test_all(self):
    api = (
        "encode", "decode", "register", "CodecInfo", "Codec", "IncrementalEncoder",
        "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
        "getencoder", "getdecoder", "getincrementalencoder",
        "getincrementaldecoder", "getreader", "getwriter", "register_error",
        "lookup_error", "strict_errors", "replace_errors", "ignore_errors",
        "xmlcharrefreplace_errors", "backslashreplace_errors", "open",
        "EncodedFile", "iterencode", "iterdecode", "BOM", "BOM_BE", "BOM_LE",
        "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32",
        "BOM_UTF32_BE", "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE",
        "BOM64_LE",  # Undocumented
        "StreamReaderWriter", "StreamRecoder",
    )
    self.assertEqual(sorted(api), sorted(codecs.__all__))
    for api in codecs.__all__:
        getattr(codecs, api)

def test_all(self):
    api = (
        "encode", "decode", "register", "CodecInfo", "Codec", "IncrementalEncoder",
        "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
        "getencoder", "getdecoder", "getincrementalencoder",
        "getincrementaldecoder", "getreader", "getwriter", "register_error",
        "lookup_error", "strict_errors", "replace_errors", "ignore_errors",
        "xmlcharrefreplace_errors", "backslashreplace_errors", "open",
        "EncodedFile", "iterencode", "iterdecode", "BOM", "BOM_BE", "BOM_LE",
        "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32",
        "BOM_UTF32_BE", "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE",
        "BOM64_LE",  # Undocumented
        "StreamReaderWriter", "StreamRecoder",
    )
    self.assertCountEqual(api, codecs.__all__)
    for api in codecs.__all__:
        getattr(codecs, api)

def test_incremental_decode(self): self.assertEqual( "".join(codecs.iterdecode((bytes([c]) for c in b"python.org"), "idna")), "python.org" ) self.assertEqual( "".join(codecs.iterdecode((bytes([c]) for c in b"python.org."), "idna")), "python.org." ) self.assertEqual( "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")), "pyth\xf6n.org." ) self.assertEqual( "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")), "pyth\xf6n.org." ) decoder = codecs.getincrementaldecoder("idna")() self.assertEqual(decoder.decode(b"xn--xam", ), "") self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.") self.assertEqual(decoder.decode(b"rg"), "") self.assertEqual(decoder.decode(b"", True), "org") decoder.reset() self.assertEqual(decoder.decode(b"xn--xam", ), "") self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.") self.assertEqual(decoder.decode(b"rg."), "org.") self.assertEqual(decoder.decode(b"", True), "")
def test_incremental_decode(self): self.assertEqual( "".join(codecs.iterdecode("python.org", "idna")), u"python.org" ) self.assertEqual( "".join(codecs.iterdecode("python.org.", "idna")), u"python.org." ) self.assertEqual( "".join(codecs.iterdecode("xn--pythn-mua.org.", "idna")), u"pyth\xf6n.org." ) self.assertEqual( "".join(codecs.iterdecode("xn--pythn-mua.org.", "idna")), u"pyth\xf6n.org." ) decoder = codecs.getincrementaldecoder("idna")() self.assertEqual(decoder.decode("xn--xam", ), u"") self.assertEqual(decoder.decode("ple-9ta.o", ), u"\xe4xample.") self.assertEqual(decoder.decode(u"rg"), u"") self.assertEqual(decoder.decode(u"", True), u"org") decoder.reset() self.assertEqual(decoder.decode("xn--xam", ), u"") self.assertEqual(decoder.decode("ple-9ta.o", ), u"\xe4xample.") self.assertEqual(decoder.decode("rg."), u"org.") self.assertEqual(decoder.decode("", True), u"")
def test_basics_capi(self):
    from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
    s = u"abc123"  # all codecs should be able to encode these
    for encoding in all_unicode_encodings:
        if encoding not in broken_incremental_coders:
            # check incremental decoder/encoder and iterencode()/iterdecode()
            try:
                cencoder = codec_incrementalencoder(encoding)
            except LookupError:  # no IncrementalEncoder
                pass
            else:
                # check C API
                encodedresult = ""
                for c in s:
                    encodedresult += cencoder.encode(c)
                encodedresult += cencoder.encode(u"", True)
                cdecoder = codec_incrementaldecoder(encoding)
                decodedresult = u""
                for c in encodedresult:
                    decodedresult += cdecoder.decode(c)
                decodedresult += cdecoder.decode("", True)
                self.assertEqual(decodedresult, s, "encoding=%r" % encoding)

            if encoding not in only_strict_mode:
                # check incremental decoder/encoder with errors argument
                try:
                    cencoder = codec_incrementalencoder(encoding, "ignore")
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    encodedresult = "".join(cencoder.encode(c) for c in s)
                    cdecoder = codec_incrementaldecoder(encoding, "ignore")
                    decodedresult = u"".join(cdecoder.decode(c) for c in encodedresult)
                    self.assertEqual(decodedresult, s, "encoding=%r" % encoding)

def _fetch(self):
    feed = {}
    try:
        for base in self.bases:
            feed[base] = {}
            for quote in self.quotes:
                if quote == base:
                    continue
                ticker = "%s%s" % (quote, base)
                url = (
                    'http://www.google.com/finance/getprices'
                    '?i={period}&p={days}d&f=d,c&df=cpct&q={ticker}'
                ).format(ticker=ticker, period=self.period, days=self.days)
                response = requests.get(url=url, headers=_request_headers,
                                        timeout=self.timeout)
                reader = csv.reader(codecs.iterdecode(response.content.splitlines(), "utf-8"))
                prices = []
                for row in reader:
                    if re.match(r'^[a\d]', row[0]):
                        prices.append(float(row[1]))
                if hasattr(self, "quoteNames") and quote in self.quoteNames:
                    quote = self.quoteNames[quote]
                feed[base][quote] = {"price": sum(prices) / len(prices),
                                     "volume": 1.0}
    except Exception as e:
        raise Exception("\nError fetching results from {1}! ({0})".format(
            str(e), type(self).__name__))
    return feed

def wmo_importer(url='http://tgftp.nws.noaa.gov/data/nsd_bbsss.txt'):
    if PY2:
        delimiter = b';'
        data = urlopen(url)
    else:
        delimiter = ';'
        import codecs
        data = codecs.iterdecode(urlopen(url), 'utf-8')
    reader = csv.reader(data, delimiter=delimiter, quoting=csv.QUOTE_NONE)

    def geo_normalize(value):
        # recognize NSEW or undefined (which is interpreted as North)
        orientation = value[-1]
        sign = -1 if orientation in 'SW' else 1
        coords = value if orientation not in 'NEWS' else value[:-1]
        coords += '-0-0'  # ensure missing seconds or minutes are 0
        degrees, minutes, seconds = map(float, coords.split('-', 3)[:3])
        return sign * (degrees + (minutes / 60) + (seconds / 3600))

    not_airport = '----'
    for row in reader:
        name = row[0] + row[1] if row[2] == not_airport else row[2]
        yield name, geo_normalize(row[8]), geo_normalize(row[7])


# dependence between a hashtag's precision and accurate distance calculation;
# in fact these are the sizes of the grids in km

def _add_csv_file_to_db(self, decoder):
    f = codecs.iterdecode(
        self.upload_file_form.cleaned_data['marketing_file'],
        decoder
    )
    reader = csv.reader(f)
    if not self.uploaded_file:
        new_file = UploadedFile(
            filename=self.upload_file_form.cleaned_data['marketing_file'].name,
            uploaded_by=self.request.user,
            num_columns=0,
        )
        new_file.save()
        self.uploaded_file = new_file
    is_first_row = True
    self.num_cols = None
    row_number = 0
    for row in reader:
        if not self.num_cols:
            self.num_cols = len(row)
        if self._csv_row_is_not_blank(row):
            self._add_csv_row_to_db(row, is_first_row, row_number)
            is_first_row = False
            row_number += 1
    if self.num_cols:
        self.uploaded_file.num_columns = self.num_cols
        self.uploaded_file.save()

def load_from_ktipp(self):
    """ load the blacklist from ktipp. """
    url = 'http://trick77.com/tools/latest_cc_blacklist.txt'
    response = urlopen(url)
    self._import_csv(codecs.iterdecode(response, 'utf-8'),
                     source_name='ktipp')

def get_csv_entries():
    if TEST:
        action = open(LOCAL_CSV)
    else:
        action = closing(urlopen(REMOTE_CSV, context=CONTEXT))
    with action as f:
        if not TEST and sys.version_info.major > 2:
            f = codecs.iterdecode(f, 'utf-8')  # needed for urlopen and py3
        for entry in csv.DictReader(f, fieldnames=FIELDS):
            yield entry

def check_partial(self, input, partialresults):
    # get a StreamReader for the encoding and feed the bytestring version
    # of input to the reader byte by byte. Read everything available from
    # the StreamReader and check that the results equal the appropriate
    # entries from partialresults.
    q = Queue(b"")
    r = codecs.getreader(self.encoding)(q)
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        q.write(bytes([c]))
        result += r.read()
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(r.read(), "")
    self.assertEqual(r.bytebuffer, b"")

    # do the check again, this time using an incremental decoder
    d = codecs.getincrementaldecoder(self.encoding)()
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(bytes([c]))
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode(b"", True), "")
    self.assertEqual(d.buffer, b"")

    # Check whether the reset method works properly
    d.reset()
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(bytes([c]))
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode(b"", True), "")
    self.assertEqual(d.buffer, b"")

    # check iterdecode()
    encoded = input.encode(self.encoding)
    self.assertEqual(
        input,
        "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding))
    )

def check_partial(self, input, partialresults):
    # get a StreamReader for the encoding and feed the bytestring version
    # of input to the reader byte by byte. Read everything available from
    # the StreamReader and check that the results equal the appropriate
    # entries from partialresults.
    q = Queue()
    r = codecs.getreader(self.encoding)(q)
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        q.write(c)
        result += r.read()
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(r.read(), u"")
    self.assertEqual(r.bytebuffer, "")
    self.assertEqual(r.charbuffer, u"")

    # do the check again, this time using an incremental decoder
    d = codecs.getincrementaldecoder(self.encoding)()
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(c)
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode("", True), u"")
    self.assertEqual(d.buffer, "")

    # Check whether the reset method works properly
    d.reset()
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(c)
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode("", True), u"")
    self.assertEqual(d.buffer, "")

    # check iterdecode()
    encoded = input.encode(self.encoding)
    self.assertEqual(
        input,
        u"".join(codecs.iterdecode(encoded, self.encoding))
    )

def import_products(self, request, pk=None):
    """
    Create products on a project using CSV and ZIP files.
    """
    # Two files to import: CSV and ZIP of products
    # Parse CSV
    # Unzip designs
    # Go through CSV file, creating products
    # For each product parse the design
    # Return list of created products + failures
    products_file = request.data.get('products_file')
    designs_file = request.data.get('designs_file')
    rejected = []
    completed = []
    if products_file:
        # Read the CSV file of products into a list
        decoded_file = codecs.iterdecode(products_file, 'utf-8-sig')
        try:
            products = [line for line in csv.DictReader(decoded_file, skipinitialspace=True)]
        except UnicodeDecodeError:
            return Response({'message': 'Please supply file in UTF-8 CSV format.'},
                            status=400)
        # Open the zip file for reading, assign the files within to a dict with filenames
        designs = {}
        if designs_file:
            with zipfile.ZipFile(designs_file, 'r') as dzip:
                for file_path in dzip.namelist():
                    filename = file_path.split('/')[-1]
                    with dzip.open(file_path, 'rU') as d:
                        designs[filename] = d.read()
        # Iterate through products, creating them and linking the design
        for p in products:
            # Replace the name of the design file with the actual contents
            if p.get('design', None):
                p['design'] = designs[p['design']].decode('utf-8-sig')
            p['project'] = self.get_object().id
            serializer = ProductSerializer(data=p)
            if serializer.is_valid():
                instance = serializer.save(created_by=request.user)
                items = []
                parser = DesignFileParser(instance.design)
                if instance.design_format == 'csv':
                    items, sbol = parser.parse_csv()
                elif instance.design_format == 'gb':
                    items, sbol = parser.parse_gb()
                for i in items:
                    instance.linked_inventory.add(i)
                completed.append(p)
            else:
                p['reason'] = serializer.errors
                rejected.append(p)
        return Response({'message': 'Import completed',
                         'completed': completed,
                         'rejected': rejected})
    else:
        return Response({'message': 'Please supply a product definition and file of designs'},
                        status=400)

def check_partial(self, input, partialresults):
    # get a StreamReader for the encoding and feed the bytestring version
    # of input to the reader byte by byte. Read everything available from
    # the StreamReader and check that the results equal the appropriate
    # entries from partialresults.
    q = Queue()
    r = codecs.getreader(self.encoding)(q)
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        q.write(c)
        result += r.read()
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(r.read(), u"")
    self.assertEqual(r.bytebuffer, "")
    self.assertEqual(r.charbuffer, u"")

    # do the check again, this time using an incremental decoder
    d = codecs.getincrementaldecoder(self.encoding)()
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(c)
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode("", True), u"")
    self.assertEqual(d.buffer, "")

    # Check whether the reset method works properly
    d.reset()
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(c)
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode("", True), u"")
    self.assertEqual(d.buffer, "")

    # check iterdecode()
    encoded = input.encode(self.encoding)
    self.assertEqual(
        input,
        u"".join(codecs.iterdecode(encoded, self.encoding))
    )

def check_partial(self, input, partialresults):
    # get a StreamReader for the encoding and feed the bytestring version
    # of input to the reader byte by byte. Read everything available from
    # the StreamReader and check that the results equal the appropriate
    # entries from partialresults.
    q = Queue(b"")
    r = codecs.getreader(self.encoding)(q)
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        q.write(bytes([c]))
        result += r.read()
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(r.read(), "")
    self.assertEqual(r.bytebuffer, b"")

    # do the check again, this time using an incremental decoder
    d = codecs.getincrementaldecoder(self.encoding)()
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(bytes([c]))
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode(b"", True), "")
    self.assertEqual(d.buffer, b"")

    # Check whether the reset method works properly
    d.reset()
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(bytes([c]))
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode(b"", True), "")
    self.assertEqual(d.buffer, b"")

    # check iterdecode()
    encoded = input.encode(self.encoding)
    self.assertEqual(
        input,
        "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding))
    )

def write(table_name, table_schema_path, connection_string, input_file, db_schema,
          geometry_support, from_srid, skip_headers, indexes_fields):
    table_schema = get_table_schema(table_schema_path)

    ## TODO: csv settings? use Frictionless Data csv standard?
    ## TODO: support line delimited json?
    with fopen(input_file) as file:
        if re.match(s3_regex, input_file) != None:
            rows = csv.reader(codecs.iterdecode(file, 'utf-8'))
        else:
            rows = csv.reader(file)

        if skip_headers:
            next(rows)

        if re.match(carto.carto_connection_string_regex, connection_string) != None:
            load_postgis = geometry_support == 'postgis'

            if indexes_fields != None:
                indexes_fields = indexes_fields.split(',')

            carto.load(db_schema, table_name, load_postgis,
                       table_schema, connection_string, rows, indexes_fields)
        else:
            connection_string = get_connection_string(connection_string)

            engine, storage = create_storage_adaptor(connection_string, db_schema,
                                                     geometry_support, from_srid=from_srid)

            ## TODO: truncate? carto does. Makes this idempotent
            if table_schema_path != None:
                table_schema = get_table_schema(table_schema_path)
                storage.describe(table_name, descriptor=table_schema)

                if geometry_support == None and engine.dialect.driver == 'psycopg2':
                    copy_from(engine, table_name, table_schema, rows)
                else:
                    storage.write(table_name, rows)
