Python csv 模块,QUOTE_MINIMAL 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用csv.QUOTE_MINIMAL

项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def load(corpus_csv_file: Path,
             sampled_training_example_count: Optional[int] = None) -> 'Corpus':
        """Build a Corpus from a CSV index file.

        Every CSV row must hold exactly five fields, in this order: example id,
        audio file path, label, phase name and serialized positional label
        (an empty string means "no positional label").

        :param corpus_csv_file: Path of the UTF-8 encoded CSV file to read.
        :param sampled_training_example_count: Passed through unchanged to Corpus.
        :return: A Corpus whose training and test examples are split by the
            phase column.
        :raises ValueError: If a row does not unpack into exactly five fields.
        """
        import csv

        def to_absolute(audio_file_path: Path) -> Path:
            # Relative audio paths are resolved against the CSV file's directory.
            if audio_file_path.is_absolute():
                return audio_file_path
            return Path(corpus_csv_file.parent) / audio_file_path

        # newline='' is what the csv module asks for: it lets the reader handle
        # line endings itself, so newlines embedded in quoted fields survive.
        with corpus_csv_file.open(encoding='utf8', newline='') as opened_csv:
            reader = csv.reader(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            # The id column is bound to example_id so the builtin id() is not shadowed.
            examples = [
                (
                    LabeledExampleFromFile(
                        audio_file=to_absolute(Path(audio_file_path)), id=example_id, label=label,
                        positional_label=None if positional_label == "" else PositionalLabel.deserialize(
                            positional_label)),
                    # Phase is looked up by name - presumably an Enum; verify against its definition.
                    Phase[phase])
                for example_id, audio_file_path, label, phase, positional_label in reader]

        return Corpus(training_examples=[e for e, phase in examples if phase == Phase.training],
                      test_examples=[e for e, phase in examples if phase == Phase.test],
                      sampled_training_example_count=sampled_training_example_count)
项目:sds011    作者:luetzel    | 项目源码 | 文件源码
def sensor_live(self):
            """Take a series of sensor readings, log them to CSV and plot them live.

            Runs 11 cycles (time marks 0, 30, ..., 300). Each cycle wakes the
            sensor, waits 10 seconds, reads it, and - when a reading was
            returned - appends a timestamped row to /home/pi/data.csv and
            redraws both curves. The sensor is then put to sleep for 20 seconds.
            """
            x = []
            y1 = []
            y2 = []
            for i in range(0, 330, 30):  # change time interval here, if required
                self.sensor_wake()
                time.sleep(10)
                pm = self.sensor_read()
                if pm is not None:
                    x.append(i)
                    # pm[0] / pm[1] are the two measured values - presumably
                    # the two particle-size channels; verify against sensor_read().
                    y1.append(pm[0])
                    y2.append(pm[1])
                    timestamp = datetime.datetime.now().replace(microsecond=0).isoformat().replace('T', ' ')
                    # NOTE(review): binary append mode matches the Python 2 csv
                    # module; Python 3 would need mode 'a' with newline=''.
                    # The with-statement closes the file, so no explicit close().
                    with open('/home/pi/data.csv', 'ab') as csvfile:
                        writer = csv.writer(csvfile, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                        writer.writerow([timestamp, pm[0], pm[1]])
                    self.ax.plot(x, y1, 'r-x')
                    self.ax.plot(x, y2, 'b-x')
                    self.canvas.draw()
                self.sensor_sleep()
                time.sleep(20)
项目:LegalNetworks    作者:brschneidE3    | 项目源码 | 文件源码
def list_to_csv(directory_and_filename, list):
    """Write the rows in ``list`` to a CSV file.

    ``directory_and_filename`` may be given with or without a trailing
    ``.csv``; the file written always ends in ``.csv``.

    Rows that fail with UnicodeEncodeError are retried with their unicode
    elements encoded as UTF-8. Previously the re-encoded row was built but
    never written, so such rows silently vanished from the output.

    NOTE(review): this block targets Python 2 (``unicode`` type, binary file
    mode for the csv module).
    """
    if not directory_and_filename.endswith('.csv'):
        directory_and_filename += '.csv'
    # The with-statement closes the file; no explicit close() is required.
    with open(directory_and_filename, 'wb') as csvfile:
        spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
        for row in list:
            try:
                spamwriter.writerow(row)
            except UnicodeEncodeError:
                # Encode only the unicode elements, leave everything else
                # untouched, then actually write the repaired row.
                spamwriter.writerow([
                    element.encode('utf-8') if isinstance(element, unicode) else element
                    for element in row
                ])
项目:rltk    作者:usc-isi-i2    | 项目源码 | 文件源码
def __init__(self, file_path, type='text', **kwargs):
        """Open ``file_path`` and prepare type-specific reading state.

        :param file_path: Path of the file to open for reading.
        :param type: ``'json_line'``, ``'csv'`` or anything else (treated as text).
        :param kwargs: Type-specific options.
            ``json_line`` requires ``id_path``.
            ``csv`` requires ``id_column`` and accepts ``delimiter`` (default
            ``','``), ``quote_char`` (default ``'"'``), ``quoting`` (default
            ``csv.QUOTE_MINIMAL``) and ``column_names`` (default ``None``).
        :raises KeyError: If a required option is missing. The file opened
            here is closed again before the error propagates.
        """
        self._file_path = file_path
        self._type = type
        self._kwargs = kwargs
        self._file_handler = open(file_path, 'r')

        try:
            if type == 'json_line':
                # pre-compile json path, raise exception if not exists
                self._id_path_parser = parse(kwargs['id_path'])
            elif type == 'csv':
                self._id_column = kwargs['id_column']  # raise exception if not exists
                self._csv_reader = csv.DictReader(
                    self._file_handler,
                    delimiter=kwargs.get('delimiter', ','),
                    quotechar=kwargs.get('quote_char', '"'),
                    quoting=kwargs.get('quoting', csv.QUOTE_MINIMAL),
                    fieldnames=kwargs.get('column_names'))
            else:  # text
                # hashlib needs bytes: text paths are encoded first, byte
                # strings are hashed as they are.
                path_bytes = file_path if isinstance(file_path, bytes) else file_path.encode('utf-8')
                self._id_prefix = hashlib.md5(path_bytes).hexdigest()[:6]
        except Exception:
            # Do not leak the handle opened above when setup fails.
            self._file_handler.close()
            raise
项目:geocoder-ie    作者:devgateway    | 项目源码 | 文件源码
def save_to_tsv(out_file, geocoding, out_path=''):
        """Write the locations of the first geocoding entry to a TSV file.

        :param out_file: Output file name without extension; ``.tsv`` is appended.
        :param geocoding: Sequence whose first item is a 2-item pair; the
            second item is an iterable of dicts keyed by the column names below.
            Only this first entry is written.
        :param out_path: Directory to write into (default: current directory).
        :return: The file name with extension (not the full path).
        :raises Exception: Any error is logged and then re-raised unchanged.
        """
        try:
            out_file_with_extension = '{}.tsv'.format(out_file)
            target_path = os.path.realpath(os.path.join(out_path, out_file_with_extension))
            # The with-statement closes the file; no explicit close() is needed.
            with open(target_path, 'w+', newline='') as csvfile:
                fieldnames = ['geonameId', 'name', 'toponymName', 'fcl', 'fcode', 'fcodeName', 'fclName', 'lat', 'lng',
                              'adminCode1', 'adminName1', 'adminCode2', 'adminName2', 'adminCode3', 'adminName3',
                              'adminCode4',
                              'adminName4', 'countryName', 'population', 'continentCode', 'countryCode',
                              ]
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter='\t', quotechar='"',
                                        quoting=csv.QUOTE_MINIMAL)
                writer.writeheader()
                # The first element of the pair is not used here.
                _, locations = geocoding[0]
                writer.writerows(locations)
            return out_file_with_extension
        except Exception as e:
            logger.error('Error while writing tsv file {}'.format(e))
            raise
项目:OnlineAdapterDatabase    作者:NCBI-Hackathons    | 项目源码 | 文件源码
def render(self, data, media_type=None, renderer_context=None):
        """Serialize adapter records to CSV bytes.

        :param data: Iterable of mappings providing the keys listed in
            ``record_keys`` below.
        :param media_type: Unused.
        :param renderer_context: Unused.
        :return: Header line plus one line per record, encoded with
            ``self.charset``.
        """
        column_titles = (
            'Vendor',
            'Kit',
            'Subkit',
            'Version',
            'Barcode',
            'Index Sequence',
            'Full Sequence',
        )
        record_keys = (
            'vendor',
            'kit',
            'subkit',
            'version',
            'barcode',
            'index_sequence',
            'full_sequence',
        )

        buffer = StringIO()
        csv_out = csv.writer(buffer, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csv_out.writerow(column_titles)
        for record in data:
            csv_out.writerow(tuple(record[key] for key in record_keys))

        return buffer.getvalue().encode(self.charset)
项目:triage    作者:dssg    | 项目源码 | 文件源码
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    The objects are written as CSV rows (one column per table column, in
    table order) to a temporary file which is then handed to
    ``postgres_copy.copy_from``. The target model is taken from the type of
    the first object.

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table

    Returns:
        None. An empty ``db_objects`` is a no-op.
    """
    # Nothing to copy - and db_objects[0] below would raise IndexError.
    if not db_objects:
        return
    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        # Rewind so the COPY reads the file from the start.
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
项目:poloTrader    作者:GooTZ    | 项目源码 | 文件源码
def writeDataToFile(self):
        """Append the buffered records to the data file and remember the last key.

        Reads ``self.data`` (a mapping of key -> mapping of values), appends
        one CSV row per key to ``self.datafilePath`` (preceded by
        ``self.header`` when ``self.newfile`` is true), writes the last key
        processed (0 if there was none) to ``self.timefilePath`` and finally
        empties ``self.data``.
        """
        end_time = 0
        with open(self.datafilePath, 'a', newline='') as csvfile:
            spamwriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)

            if self.newfile:
                spamwriter.writerow(self.header)

            for end_time in self.data:
                record = self.data[end_time]
                spamwriter.writerow([record[field] for field in record])

        # The with-statement closes the file; no explicit close() is needed.
        with open(self.timefilePath, "w") as ft:
            # "%d" requires the last key to be numeric.
            ft.write("%d\n" % end_time)

        self.data.clear()

        print(datetime.datetime.utcnow())
        print("data written")
项目:dnflow    作者:DocNow    | 项目源码 | 文件源码
def run(self):
        """Write each user's followers/following ratio to the output target as CSV.

        Every line of the input target is parsed as a JSON tweet. Users whose
        following count is not positive are skipped; when a user appears more
        than once, the last tweet read wins. The output has the columns
        ``user`` and ``count``.
        """
        ratio_by_user = {}
        for raw_tweet in self.input().open('r'):
            author = json.loads(raw_tweet)['user']
            screen_name = author['screen_name']
            follower_total = int(author['followers_count'])
            following_total = int(author['friends_count'])
            if following_total <= 0:
                continue
            # float() keeps this a true division on Python 2 as well.
            ratio_by_user[screen_name] = follower_total / float(following_total)

        with self.output().open('w') as fp_counts:
            ratio_writer = csv.DictWriter(fp_counts, delimiter=',',
                                          quoting=csv.QUOTE_MINIMAL,
                                          fieldnames=['user', 'count'])
            ratio_writer.writeheader()
            ratio_writer.writerows(
                {'user': screen_name, 'count': ratio}
                for screen_name, ratio in ratio_by_user.items())
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _test_default_attrs(self, ctor, *args):
        """Check the dialect defaults of the object built by ``ctor(*args)``.

        Asserts every default dialect attribute value, then asserts that
        'delimiter' and 'quoting' can be neither deleted nor reassigned.
        """
        built = ctor(*args)
        # Check defaults
        expected_defaults = (
            ('delimiter', ','),
            ('doublequote', True),
            ('escapechar', None),
            ('lineterminator', "\r\n"),
            ('quotechar', '"'),
            ('quoting', csv.QUOTE_MINIMAL),
            ('skipinitialspace', False),
            ('strict', False),
        )
        for attribute, default in expected_defaults:
            self.assertEqual(getattr(built.dialect, attribute), default)
        # Try deleting or changing attributes (they are read-only)
        for attribute, replacement in (('delimiter', ':'), ('quoting', None)):
            self.assertRaises(AttributeError, delattr, built.dialect, attribute)
            self.assertRaises(AttributeError, setattr, built.dialect,
                              attribute, replacement)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_write_escape(self):
        """Run the writer escaping cases through ``self._write_test``.

        One case is expected to raise ``csv.Error``; all the others pass a
        row, the expected output text and the dialect options to use.
        """
        backslash = '\\'
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"', escapechar=backslash)
        # No escapechar and doublequote disabled: expected to raise csv.Error.
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        # Remaining cases all use a backslash escapechar plus the listed options.
        passing_cases = (
            (['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""', {'doublequote': False}),
            (['"'], '""""', {'quoting': csv.QUOTE_MINIMAL}),
            (['"'], '\\"', {'quoting': csv.QUOTE_MINIMAL, 'doublequote': False}),
            (['"'], '\\"', {'quoting': csv.QUOTE_NONE}),
            (['a', 1, 'p,q'], 'a,1,p\\,q', {'quoting': csv.QUOTE_NONE}),
        )
        for row, expected_text, options in passing_cases:
            self._write_test(row, expected_text, escapechar=backslash, **options)
项目:pyktrader2    作者:harveywwu    | 项目源码 | 文件源码
def save_order_list(self, tday):
        """Dump all orders in ``self.id2order`` to a dated CSV file.

        The file is named ``<file_prefix>order_<yymmdd>.csv``. It holds a
        header row followed by one row per order, ordered by order key.

        :param tday: Date-like object providing ``strftime``.

        NOTE(review): binary file mode matches the Python 2 csv module;
        Python 3 would need mode 'w' with newline=''.
        """
        # sorted() works on a key list as well as on a key view; calling
        # .sort() on the result of .keys() only works on Python 2.
        order_list = [self.id2order[key] for key in sorted(self.id2order)]
        filename = self.file_prefix + 'order_' + tday.strftime('%y%m%d') + '.csv'
        with open(filename, 'wb') as log_file:
            file_writer = csv.writer(log_file, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(
                ['order_ref', 'local_id', 'sysID', 'inst', 'volume',
                 'filledvolume', 'filledprice', 'filledorders',
                 'action_type', 'direction', 'price_type',
                 'limitprice', 'order_time', 'status', 'order_class', 'trade_ref'])
            for iorder in order_list:
                # Each filled order becomes "<key>:<v1>_<v2>_..."; entries whose
                # key has an empty string form are left out.
                forders = [
                    str(key) + ':' + '_'.join([str(s) for s in iorder.filled_orders[key]])
                    for key in iorder.filled_orders if len(str(key)) > 0
                ]
                # NOTE(review): '|' is also this writer's quotechar, so a field
                # joining several entries is presumably written quoted - confirm
                # the reader of this file expects that.
                filled_str = '|'.join(forders)
                file_writer.writerow(
                    [iorder.order_ref, iorder.local_id, iorder.sys_id, iorder.instrument, iorder.volume,
                     iorder.filled_volume, iorder.filled_price, filled_str,
                     iorder.action_type, iorder.direction, iorder.price_type,
                     iorder.limit_price, iorder.start_tick, iorder.status, iorder.type, iorder.trade_ref])
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test_default_attrs(self, ctor, *args):
        """Check the dialect defaults of the object built by ``ctor(*args)``.

        Asserts every default dialect attribute value, then asserts that
        'delimiter' and 'quoting' can be neither deleted nor reassigned.
        """
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertEqual(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertEqual(obj.dialect.skipinitialspace, False)
        self.assertEqual(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only).
        # Interpreters differ on whether a read-only 'delimiter' raises
        # TypeError or AttributeError, so either is accepted.
        self.assertRaises((TypeError, AttributeError), delattr, obj.dialect,
                          'delimiter')
        self.assertRaises((TypeError, AttributeError), setattr, obj.dialect,
                          'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_write_escape(self):
        """Run the writer escaping cases through the test-case helpers.

        One case goes through ``self._write_error_test`` and is expected to
        raise ``csv.Error``; all the others go through ``self._write_test``
        with a row, the expected output text and the dialect options to use.
        """
        backslash = '\\'
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"', escapechar=backslash)
        # No escapechar and doublequote disabled: expected to raise csv.Error.
        self._write_error_test(csv.Error, ['a', 1, 'p,"q"'],
                               escapechar=None, doublequote=False)
        # Remaining cases all use a backslash escapechar plus the listed options.
        passing_cases = (
            (['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""', {'doublequote': False}),
            (['"'], '""""', {'quoting': csv.QUOTE_MINIMAL}),
            (['"'], '\\"', {'quoting': csv.QUOTE_MINIMAL, 'doublequote': False}),
            (['"'], '\\"', {'quoting': csv.QUOTE_NONE}),
            (['a', 1, 'p,q'], 'a,1,p\\,q', {'quoting': csv.QUOTE_NONE}),
        )
        for row, expected_text, options in passing_cases:
            self._write_test(row, expected_text, escapechar=backslash, **options)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test_default_attrs(self, ctor, *args):
        """Check the dialect defaults of the object built by ``ctor(*args)``.

        Asserts every default dialect attribute value, then asserts that
        'delimiter' and 'quoting' can be neither deleted nor reassigned.
        """
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertEqual(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertEqual(obj.dialect.skipinitialspace, False)
        self.assertEqual(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only).
        # Interpreters differ on whether a read-only 'delimiter' raises
        # TypeError or AttributeError, so either is accepted.
        self.assertRaises((TypeError, AttributeError), delattr, obj.dialect,
                          'delimiter')
        self.assertRaises((TypeError, AttributeError), setattr, obj.dialect,
                          'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_write_escape(self):
        """Run the writer escaping cases through the test-case helpers.

        One case goes through ``self._write_error_test`` and is expected to
        raise ``csv.Error``; all the others go through ``self._write_test``
        with a row, the expected output text and the dialect options to use.
        """
        backslash = '\\'
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"', escapechar=backslash)
        # No escapechar and doublequote disabled: expected to raise csv.Error.
        self._write_error_test(csv.Error, ['a', 1, 'p,"q"'],
                               escapechar=None, doublequote=False)
        # Remaining cases all use a backslash escapechar plus the listed options.
        passing_cases = (
            (['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""', {'doublequote': False}),
            (['"'], '""""', {'quoting': csv.QUOTE_MINIMAL}),
            (['"'], '\\"', {'quoting': csv.QUOTE_MINIMAL, 'doublequote': False}),
            (['"'], '\\"', {'quoting': csv.QUOTE_NONE}),
            (['a', 1, 'p,q'], 'a,1,p\\,q', {'quoting': csv.QUOTE_NONE}),
        )
        for row, expected_text, options in passing_cases:
            self._write_test(row, expected_text, escapechar=backslash, **options)
项目:data_programming    作者:kep1616    | 项目源码 | 文件源码
def create_memcsv(self):
        """Replace the row data held on the instance with in-memory CSV buffers.

        ``self.data`` becomes an ``io.StringIO`` holding ``self.data_headers``
        followed by the former rows of ``self.data``. When ``self.data_subcat``
        is not None it is converted the same way using
        ``self.data_subcat_headers``. The buffers are left positioned at their
        end; they are not rewound.
        """
        def rows_to_buffer(header_row, body_rows):
            # Render one header row plus the body rows into a fresh buffer.
            buffer = io.StringIO()
            csv_out = csv.writer(buffer, delimiter=',',
                                 quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_out.writerow(header_row)
            csv_out.writerows(body_rows)
            return buffer

        self.data = rows_to_buffer(self.data_headers, self.data)

        if self.data_subcat is None:
            return
        self.data_subcat = rows_to_buffer(self.data_subcat_headers, self.data_subcat)
项目:data_programming    作者:kep1616    | 项目源码 | 文件源码
def create_memcsv(self):
        """Replace the row data held on the instance with in-memory CSV buffers.

        ``self.data`` becomes an ``io.StringIO`` holding ``self.data_headers``
        followed by the former rows of ``self.data``. When ``self.data_subcat``
        is not None it is converted the same way using
        ``self.data_subcat_headers``. The buffers are left positioned at their
        end; they are not rewound.
        """
        def rows_to_buffer(header_row, body_rows):
            # Render one header row plus the body rows into a fresh buffer.
            buffer = io.StringIO()
            csv_out = csv.writer(buffer, delimiter=',',
                                 quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_out.writerow(header_row)
            csv_out.writerows(body_rows)
            return buffer

        self.data = rows_to_buffer(self.data_headers, self.data)

        if self.data_subcat is None:
            return
        self.data_subcat = rows_to_buffer(self.data_subcat_headers, self.data_subcat)
项目:expressionist    作者:james-owen-ryan    | 项目源码 | 文件源码
def monte_carlo_export(self, nonterminal, filename, samplesscalar=1, ):
        """Append the sampled expansions of ``nonterminal`` to a tab-separated file.

        The samples from ``self.monte_carlo_expand`` are sorted and counted, so
        each distinct derivation is written once. Each row holds the
        nonterminal, the derivation's expansion, its markup joined with '^',
        and a ``[low, high]`` interval whose width is the derivation's share
        of all samples; the intervals of successive rows are contiguous,
        starting at 0.

        :param nonterminal: Symbol to expand; also written as the first column.
        :param filename: File to append to.
        :param samplesscalar: Passed through to ``self.monte_carlo_expand``.
        :return: None - the result is written to ``filename``.
        """
        expansion = collections.Counter(sorted(self.monte_carlo_expand(nonterminal, samplesscalar)))
        # Loop-invariant: total number of samples, computed once.
        total_samples = sum(expansion.values())
        with open(filename, 'a') as csvfile:
            row_writer = csv.writer(csvfile, delimiter='\t', quotechar='|', quoting=
            csv.QUOTE_MINIMAL)
            prob_range = 0
            for deriv in expansion:
                rng_interval = float(expansion[deriv]) / total_samples
                rng_max = prob_range + rng_interval
                row_writer.writerow(
                    [nonterminal, str(deriv.expansion),
                     '^'.join(str(annotation) for annotation in list(deriv.markup)),
                     [prob_range, rng_max]]
                )
                prob_range += rng_interval
项目:expressionist    作者:james-owen-ryan    | 项目源码 | 文件源码
def exhaustively_and_nonprobabilistically_export(self, nonterminal, filename):
        """Append every possible expansion of ``nonterminal`` to a tab-separated file.

        One row is written per derivation returned by
        ``self.exhaustively_and_nonprobabilistically_expand``: the nonterminal,
        the derivation's expansion, its markup joined with '^', and an
        ``['N/A', 'N/A']`` placeholder.
        """
        derivations = self.exhaustively_and_nonprobabilistically_expand(nonterminal=nonterminal)
        with open(filename, 'a') as tsv_out:
            tsv_writer = csv.writer(tsv_out, delimiter='\t', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            for derivation in derivations:
                expansion_text = str(derivation.expansion)
                markup_text = '^'.join(str(annotation) for annotation in list(derivation.markup))
                # 'N/A' marks that this expansion was not probabilistic.
                tsv_writer.writerow([nonterminal, expansion_text, markup_text, ['N/A', 'N/A']])
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _test_default_attrs(self, ctor, *args):
        """Check the dialect defaults of the object built by ``ctor(*args)``.

        Asserts every default dialect attribute value, then asserts that
        'delimiter' and 'quoting' can be neither deleted nor reassigned.
        """
        built = ctor(*args)
        # Check defaults
        expected_defaults = (
            ('delimiter', ','),
            ('doublequote', True),
            ('escapechar', None),
            ('lineterminator', "\r\n"),
            ('quotechar', '"'),
            ('quoting', csv.QUOTE_MINIMAL),
            ('skipinitialspace', False),
            ('strict', False),
        )
        for attribute, default in expected_defaults:
            self.assertEqual(getattr(built.dialect, attribute), default)
        # Try deleting or changing attributes (they are read-only)
        for attribute, replacement in (('delimiter', ':'), ('quoting', None)):
            self.assertRaises(AttributeError, delattr, built.dialect, attribute)
            self.assertRaises(AttributeError, setattr, built.dialect,
                              attribute, replacement)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_write_escape(self):
        """Run the writer escaping cases through ``self._write_test``.

        One case is expected to raise ``csv.Error``; all the others pass a
        row, the expected output text and the dialect options to use.
        """
        backslash = '\\'
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"', escapechar=backslash)
        # No escapechar and doublequote disabled: expected to raise csv.Error.
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        # Remaining cases all use a backslash escapechar plus the listed options.
        passing_cases = (
            (['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""', {'doublequote': False}),
            (['"'], '""""', {'quoting': csv.QUOTE_MINIMAL}),
            (['"'], '\\"', {'quoting': csv.QUOTE_MINIMAL, 'doublequote': False}),
            (['"'], '\\"', {'quoting': csv.QUOTE_NONE}),
            (['a', 1, 'p,q'], 'a,1,p\\,q', {'quoting': csv.QUOTE_NONE}),
        )
        for row, expected_text, options in passing_cases:
            self._write_test(row, expected_text, escapechar=backslash, **options)
项目:postgres-workshop    作者:Fortiz2305    | 项目源码 | 文件源码
def export_line_by_line(logger, csv_path):
    """Export selected event fields from the database to a CSV file, row by row.

    Runs a fixed query against the ``events`` table and writes a header line
    followed by one ';'-separated line per returned record. A progress line is
    printed for every record whose zero-based index is a multiple of 1000
    (including the very first one).

    :param logger: Logger used for the final 'Done writing' message.
    :param csv_path: Path of the CSV file to (over)write.
    """
    query = (
        "SELECT data->'location' as location, "
        "data->>'uri' as uri, "
        "data->>'num_requests' as num_requests, "
        "data->>'bytes' as bytes, "
        "data->>'ip' as ip "
        "FROM events WHERE "
        "data->>'location'='MAD50' and data->>'num_requests'='1'"
    )
    database = PostgresDb('workshop', 'postgres', 'db', 'postgres')
    records = database.execute(query)

    with open(csv_path, 'w') as out_file:
        csv_out = csv.writer(out_file, delimiter=';', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        csv_out.writerow(['location', 'uri', 'num_requests', 'bytes', 'ip'])
        row_number = 0
        for record in records:
            csv_out.writerow(record)
            if not row_number % 1000:
                print('1000 inserted: {}'.format(row_number))
            row_number += 1
    logger.info('Done writing')
项目:serverMonitor    作者:WangChengLongWilliam    | 项目源码 | 文件源码
def cpuinfo():
    """Append a snapshot of CPU, memory and disk usage to eggs.csv, then print the file.

    Memory figures are written in whole MB, disk figures (for '/') in whole GB.
    Each value is written as its own single-field row.

    NOTE(review): the runs of '?' in the label strings look like non-ASCII
    text lost to an encoding problem; they are kept exactly as found.
    """
    bytes_per_mb = 1024 * 1024
    bytes_per_gb = 1024 * 1024 * 1024

    memory = psutil.virtual_memory()
    memory_available = memory.available / bytes_per_mb
    memory_total = memory.total / bytes_per_mb
    memory_used = memory.used / bytes_per_mb

    root_disk = psutil.disk_usage('/')
    disk_total = root_disk.total / bytes_per_gb
    disk_used = root_disk.used / bytes_per_gb
    disk_free = root_disk.free / bytes_per_gb

    now = datetime.datetime.now()
    with open('eggs.csv', 'a') as out_file:
        report = csv.writer(out_file, delimiter=' ',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
        # Each row is a one-element set, i.e. a single field.
        report.writerow({"??????%s" % now})
        # cpu_percent(interval=1) blocks for one second while it samples.
        report.writerow({"cpu?????%d%s" % (psutil.cpu_percent(interval=1), "%")})
        sized_lines = (
            ("???????%sMB", memory_total),
            ("?????????%sMB", memory_used),
            ("????????%sMB", memory_available),
            ("???????%sG", disk_total),
            ("???????%sG", disk_used),
            ("???????%sG", disk_free),
        )
        for template, amount in sized_lines:
            report.writerow({template % int(amount)})

    with open('eggs.csv', 'r') as in_file:
        for parsed_row in csv.reader(in_file, delimiter=' ', quotechar='|'):
            print(parsed_row)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test_default_attrs(self, ctor, *args):
        """Check the dialect defaults of the object built by ``ctor(*args)``.

        Asserts every default dialect attribute value, then asserts that
        'delimiter' and 'quoting' can be neither deleted nor reassigned.
        """
        built = ctor(*args)
        # Check defaults
        expected_defaults = (
            ('delimiter', ','),
            ('doublequote', True),
            ('escapechar', None),
            ('lineterminator', "\r\n"),
            ('quotechar', '"'),
            ('quoting', csv.QUOTE_MINIMAL),
            ('skipinitialspace', False),
            ('strict', False),
        )
        for attribute, default in expected_defaults:
            self.assertEqual(getattr(built.dialect, attribute), default)
        # Try deleting or changing attributes (they are read-only)
        readonly_checks = (
            ((TypeError, AttributeError), delattr, ('delimiter',)),
            ((TypeError, AttributeError), setattr, ('delimiter', ':')),
            (AttributeError, delattr, ('quoting',)),
            # PyPy gets a TypeError instead of an AttributeError
            ((AttributeError, TypeError), setattr, ('quoting', None)),
        )
        for accepted_errors, mutator, mutator_args in readonly_checks:
            self.assertRaises(accepted_errors, mutator, built.dialect, *mutator_args)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_write_escape(self):
        """Run the writer escaping cases through ``self._write_test``.

        One case is expected to raise ``csv.Error``; all the others pass a
        row, the expected output text and the dialect options to use.
        """
        backslash = '\\'
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"', escapechar=backslash)
        # No escapechar and doublequote disabled: expected to raise csv.Error.
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        # Remaining cases all use a backslash escapechar plus the listed options.
        passing_cases = (
            (['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""', {'doublequote': False}),
            (['"'], '""""', {'quoting': csv.QUOTE_MINIMAL}),
            (['"'], '\\"', {'quoting': csv.QUOTE_MINIMAL, 'doublequote': False}),
            (['"'], '\\"', {'quoting': csv.QUOTE_NONE}),
            (['a', 1, 'p,q'], 'a,1,p\\,q', {'quoting': csv.QUOTE_NONE}),
        )
        for row, expected_text, options in passing_cases:
            self._write_test(row, expected_text, escapechar=backslash, **options)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test_default_attrs(self, ctor, *args):
        """Check the dialect defaults of the object built by ``ctor(*args)``.

        Asserts every default dialect attribute value, then asserts that
        'delimiter' and 'quoting' can be neither deleted nor reassigned.
        """
        built = ctor(*args)
        # Check defaults
        expected_defaults = (
            ('delimiter', ','),
            ('doublequote', True),
            ('escapechar', None),
            ('lineterminator', "\r\n"),
            ('quotechar', '"'),
            ('quoting', csv.QUOTE_MINIMAL),
            ('skipinitialspace', False),
            ('strict', False),
        )
        for attribute, default in expected_defaults:
            self.assertEqual(getattr(built.dialect, attribute), default)
        # Try deleting or changing attributes (they are read-only)
        for attribute, replacement in (('delimiter', ':'), ('quoting', None)):
            self.assertRaises(AttributeError, delattr, built.dialect, attribute)
            self.assertRaises(AttributeError, setattr, built.dialect,
                              attribute, replacement)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_write_escape(self):
        """Run the writer escaping cases through the test-case helpers.

        One case goes through ``self._write_error_test`` and is expected to
        raise ``csv.Error``; all the others go through ``self._write_test``
        with a row, the expected output text and the dialect options to use.
        """
        backslash = '\\'
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"', escapechar=backslash)
        # No escapechar and doublequote disabled: expected to raise csv.Error.
        self._write_error_test(csv.Error, ['a', 1, 'p,"q"'],
                               escapechar=None, doublequote=False)
        # Remaining cases all use a backslash escapechar plus the listed options.
        passing_cases = (
            (['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""', {'doublequote': False}),
            (['"'], '""""', {'quoting': csv.QUOTE_MINIMAL}),
            (['"'], '\\"', {'quoting': csv.QUOTE_MINIMAL, 'doublequote': False}),
            (['"'], '\\"', {'quoting': csv.QUOTE_NONE}),
            (['a', 1, 'p,q'], 'a,1,p\\,q', {'quoting': csv.QUOTE_NONE}),
        )
        for row, expected_text, options in passing_cases:
            self._write_test(row, expected_text, escapechar=backslash, **options)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _test_default_attrs(self, ctor, *args):
        """Check the dialect defaults of the object built by ``ctor(*args)``.

        Asserts every default dialect attribute value, then asserts that
        'delimiter' and 'quoting' can be neither deleted nor reassigned.
        """
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertEqual(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertEqual(obj.dialect.skipinitialspace, False)
        self.assertEqual(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only).
        # Interpreters differ on whether a read-only 'delimiter' raises
        # TypeError or AttributeError, so either is accepted.
        self.assertRaises((TypeError, AttributeError), delattr, obj.dialect,
                          'delimiter')
        self.assertRaises((TypeError, AttributeError), setattr, obj.dialect,
                          'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_write_escape(self):
        """Run the writer escaping cases through ``self._write_test``.

        One case is expected to raise ``csv.Error``; all the others pass a
        row, the expected output text and the dialect options to use.
        """
        backslash = '\\'
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"', escapechar=backslash)
        # No escapechar and doublequote disabled: expected to raise csv.Error.
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        # Remaining cases all use a backslash escapechar plus the listed options.
        passing_cases = (
            (['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""', {'doublequote': False}),
            (['"'], '""""', {'quoting': csv.QUOTE_MINIMAL}),
            (['"'], '\\"', {'quoting': csv.QUOTE_MINIMAL, 'doublequote': False}),
            (['"'], '\\"', {'quoting': csv.QUOTE_NONE}),
            (['a', 1, 'p,q'], 'a,1,p\\,q', {'quoting': csv.QUOTE_NONE}),
        )
        for row, expected_text, options in passing_cases:
            self._write_test(row, expected_text, escapechar=backslash, **options)
项目:GLT    作者:MikeTheGreat    | 项目源码 | 文件源码
def write_data_file(self, output_file_path):
        """Write our course info file information out.

        The file holds an "Assignments" section with one
        ``name,internal_name,id`` line per assignment, followed by a "Roster"
        section with one CSV row (first name, last name, email, glid) per
        student in ``self.roster.students_no_errors``.

        output_file_path: path of the file to (over)write
        """
        with open(output_file_path, "w") as f_out:
            f_out.write("Assignments\n")

            # One assignment per line. Previously the newline was written only
            # once after the loop, so several assignments ran together on one
            # line. Output for zero or one assignment is unchanged.
            f_out.write("\n".join(
                assign.name + "," + assign.internal_name + "," + str(assign.id)
                for assign in self.assignments))
            f_out.write("\n")

            f_out.write("Roster\n")
            csvwriter = csv.writer(f_out, quoting=csv.QUOTE_MINIMAL)
            for student in self.roster.students_no_errors:
                csvwriter.writerow([student.first_name,
                                    student.last_name,
                                    student.email,
                                    student.glid])
项目:Self-DriverFlow    作者:davsuacar    | 项目源码 | 文件源码
def callback(data):
    """Log the received angular z value and save the matching camera frame.

    Appends ``data.angular.z`` as a row to dataset.csv, grabs a frame from the
    module-level capture ``vc`` and writes it to ``images/image_<i>.png``,
    then increments the module-level counter ``i``.

    :param data: Message object providing ``angular.z``.
    """
    print("callback called...")
    global i
    global vc
    rospy.loginfo("I heard %s", data)
    with open('dataset.csv', 'a') as csvfile:
        spamwriter = csv.writer(csvfile, delimiter=' ',
                                quotechar='|', quoting=csv.QUOTE_MINIMAL)
        spamwriter.writerow([data.angular.z])

        # The else branch was mis-indented, which made the whole function a
        # syntax error; it is re-attached to the if it belongs to.
        if vc.isOpened():  # try to get the first frame
            rval, frame = vc.read()
        else:
            rval = False

        # NOTE(review): this second, unconditional read replaces the frame
        # taken above and ignores rval - confirm whether that is intended.
        rval, frame = vc.read()
        cv2.imwrite("images/image_" + str(i) + ".png", frame)
        i += 1
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test_default_attrs(self, ctor, *args):
        """Check the dialect defaults of the object built by ``ctor(*args)``.

        Asserts every default dialect attribute value, then asserts that
        'delimiter' and 'quoting' can be neither deleted nor reassigned.
        """
        built = ctor(*args)
        # Check defaults
        expected_defaults = (
            ('delimiter', ','),
            ('doublequote', True),
            ('escapechar', None),
            ('lineterminator', "\r\n"),
            ('quotechar', '"'),
            ('quoting', csv.QUOTE_MINIMAL),
            ('skipinitialspace', False),
            ('strict', False),
        )
        for attribute, default in expected_defaults:
            self.assertEqual(getattr(built.dialect, attribute), default)
        # Try deleting or changing attributes (they are read-only)
        for attribute, replacement in (('delimiter', ':'), ('quoting', None)):
            self.assertRaises(AttributeError, delattr, built.dialect, attribute)
            self.assertRaises(AttributeError, setattr, built.dialect,
                              attribute, replacement)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_write_escape(self):
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote = False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_MINIMAL,
                         doublequote = False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting = csv.QUOTE_NONE)
项目:catwalk    作者:dssg    | 项目源码 | 文件源码
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    The objects are serialized to a temporary CSV file, one row per object
    with one field per table column, and that file is handed to
    ``postgres_copy.copy_from``. The target model is taken from the type of
    the first object. An empty collection is a no-op.

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table
    """
    if not db_objects:
        # Nothing to copy, and there is no first element to take the model from.
        return

    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        # Rewind so the COPY reads the file from the start.
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
项目:s3bundler    作者:awslabs    | 项目源码 | 文件源码
def dl_and_parse(s3,headers,keylist,bucket):
    """Yield the CSV rows of gzip-compressed S3 objects as dictionaries.

    Each key is downloaded into a temporary file, decompressed and parsed
    with ``csv.DictReader`` using ``headers`` as the field names.

    Args:
        s3: Object providing ``Bucket(name).download_fileobj(key, fileobj)``.
        headers: Field names applied to every row.
        keylist: Iterable of object keys to read, in order.
        bucket: Name of the bucket holding the keys.

    Yields:
        One mapping per CSV row.

    Exits the process with status 5 when a download fails with
    ``ClientError`` and with status 3 when the CSV cannot be parsed.
    """
    for key in keylist:
        with TemporaryFile() as fp:
            try:
                s3.Bucket(bucket).download_fileobj(key,fp)
            except ClientError as e:
                logger.error('Unable to download s3://{}/{}'.format(bucket, key))
                logger.debug('Received error: {}'.format(e))
                sys.exit(5)
            # Rewind so decompression starts at the beginning of the download.
            fp.seek(0)
            with TextIOWrapper(GzipFile(fileobj=fp,mode='r')) as f:
                # Built outside the try block so the handler can always use it.
                reader = csv.DictReader(f,fieldnames=headers,delimiter=',',quoting=csv.QUOTE_MINIMAL)
                try:
                    for row in reader:
                        yield row
                except csv.Error as e:
                    # DictReader has no ``line`` attribute; ``line_num`` is
                    # the number of source lines read so far.
                    logger.error("Unable to read CSV 's3://{}/{}' at line {}".format(
                        bucket, key, reader.line_num))
                    logger.debug('Received error: {}'.format(e))
                    sys.exit(3)

# main parser
项目:pgreaper    作者:vincentlaucsb    | 项目源码 | 文件源码
def to_string(table):
    ''' Return table as a StringIO object for writing via copy()

    Every row of ``table`` is written as one CSV line. Before writing, cells
    in columns whose ``table.col_types`` entry is ``'jsonb'`` are replaced by
    their JSON encoding, and cells in ``'datetime'`` columns are passed
    through ``psycopg2.extensions.adapt``. Rows are modified in place.

    The returned buffer is rewound to position 0.
    '''

    string = StringIO()
    writer = csv.writer(string, delimiter=",", quoting=csv.QUOTE_MINIMAL)
    dict_encoder = json.JSONEncoder()

    jsonb_cols = {i for i, j in enumerate(table.col_types) if j == 'jsonb'}
    datetime_cols = {i for i, j in enumerate(table.col_types) if j == 'datetime'}

    for row in table:
        for i in jsonb_cols:
            row[i] = dict_encoder.encode(row[i])
        for i in datetime_cols:
            # Adapt the cell value, not the column index.
            row[i] = psycopg2.extensions.adapt(row[i])

        writer.writerow(row)

    string.seek(0)
    return string
项目:LOST    作者:kylemh    | 项目源码 | 文件源码
def create_csv(csv_filename, header_list, sql_query_string):
    """Dump the result of a database query into a freshly created .csv file.

    The header row is written first, then the query is executed on the
    module-level cursor ``CUR``, the connection ``CONN`` is committed, and
    every fetched record becomes one CSV row.

    Keyword arguments:
    csv_filename -- String with a '.csv' suffix
    header_list -- List of strings (as CSV header names)
    sql_query_string -- Passed to match header_list format
    """

    with open(csv_filename, 'w', newline='\n') as out_file:
        row_writer = csv.writer(out_file, quoting=csv.QUOTE_MINIMAL, quotechar="'")
        row_writer.writerow(header_list)

        CUR.execute(sql_query_string)
        fetched = CUR.fetchall()
        CONN.commit()
        row_writer.writerows(fetched)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def write_csv_rows(rows, path):
    """
    Dump rows to a comma-separated file with minimal quoting.

    The file at ``path`` is created or truncated.
    :type rows: list[list[string]]
    :type path: string
    """
    with open(path, 'w') as sink:
        emit = csv.writer(sink, quoting=csv.QUOTE_MINIMAL, delimiter=',').writerow
        for record in rows:
            emit(record)
项目:fccforensics    作者:RagtagOpen    | 项目源码 | 文件源码
def run_query(src, fn, rows=200):
    """Export comments for one analysis source to a tab-separated file.

    Sets the ``analysis.source`` term of the module-level ``query`` to
    ``src`` (the shared query dict is modified in place), then pages through
    the ``fcc-comments`` index 100 hits at a time and writes one row per
    distinct contact email.

    Args:
        src: Value matched against ``analysis.source``.
        fn: Path of the output file (created or truncated).
        rows: Maximum number of distinct emails to write.

    Paging stops when ``rows`` emails were collected, after 10 batches, or
    once the offset passes the total reported by the first response.
    """
    print('writing data for %s to %s' % (src, fn))
    query['query']['function_score']['query']['bool']['must'][0]['term']['analysis.source'] = src
    emails = set()
    batches = 0
    print(json.dumps(query))
    total = None
    with open(fn, 'w', newline='') as outfile:
        writer = csv.writer(outfile, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(['email', 'name', 'date', 'comment', 'url'])
        while len(emails) < rows and batches < 10:
            offset = batches * 100
            if total and offset > total:
                break
            resp = es.search(index='fcc-comments', body=query, size=100, from_=offset)
            if batches == 0:
                # Only the first response is used to learn the total hit count.
                total = resp['hits']['total']
                print('\t%s matches' % (total))
            else:
                print('\tbatch %s: have %s' % (batches+1, len(emails)))
            batches += 1
            for doc in resp['hits']['hits']:
                if len(emails) == rows:
                    break
                data = doc['_source']
                # One row per email address: skip repeats.
                if data['contact_email'] in emails:
                    continue
                emails.add(data['contact_email'])
                writer.writerow([data['contact_email'], data['filers'][0]['name'],
                    data['date_received'], data['text_data'],
                    'https://www.fcc.gov/ecfs/filing/%s' % doc['_id']
                ])
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def validate_to_csv(model_name: str, last_epoch: int,
                        configuration: Configuration = Configuration.german(),
                        step_count=10, first_epoch: int = 0,
                        csv_directory: Path = configuration.default_data_directories.test_results_directory) -> List[
        Tuple[int, ExpectationsVsPredictionsInGroupedBatches]]:
        """Test a model at evenly spaced epochs and write the metrics to CSV.

        ``step_count`` epochs are spread evenly from ``first_epoch`` to
        ``last_epoch`` (truncated to integers, then passed through
        ``distinct``). For each of them the weights of that epoch are loaded
        and the model is tested; one CSV row with the epoch and the average
        loss, letter/word error rates and letter/word error counts is written
        to ``<csv_directory>/<model_name>-incl-trans.csv``.

        NOTE: ``step_count`` must be at least 2, since the step size divides
        by ``step_count - 1``.

        Returns:
            A list of ``(epoch, result)`` pairs, in the order tested.
        """

        step_size = (last_epoch - first_epoch) / (step_count - 1)

        epochs = distinct(list(int(first_epoch + index * step_size) for index in range(step_count)))
        log("Testing model {} on epochs {}.".format(model_name, epochs))

        model = configuration.load_model(model_name, last_epoch,
                                         allowed_characters_for_loaded_model=configuration.allowed_characters,
                                         use_kenlm=True,
                                         language_model_name_extension="-incl-trans")

        def get_result(epoch: int) -> ExpectationsVsPredictionsInGroupedBatches:
            """Load the weights saved for ``epoch`` and test the model."""
            log("Testing epoch {}.".format(epoch))

            model.load_weights(
                allowed_characters_for_loaded_model=configuration.allowed_characters,
                load_model_from_directory=configuration.directories.nets_base_directory / model_name, load_epoch=epoch)

            return configuration.test_model_grouped_by_loaded_corpus_name(model)

        results_with_epochs = []

        csv_file = csv_directory / "{}.csv".format(model_name + "-incl-trans")
        import csv
        with csv_file.open('w', encoding='utf8') as opened_csv:
            writer = csv.writer(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            for epoch in epochs:
                result = get_result(epoch)
                # Collect the result so the returned list is actually populated.
                results_with_epochs.append((epoch, result))
                writer.writerow((epoch, result.average_loss, result.average_letter_error_rate,
                                 result.average_word_error_rate, result.average_letter_error_count,
                                 result.average_word_error_count))

        return results_with_epochs
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def summarize_to_csv(self, summary_csv_file: Path) -> None:
        """Write every row produced by ``self.csv_rows()`` to a UTF-8 CSV file.

        ``summary_csv_file`` is created or truncated.
        """
        import csv
        with summary_csv_file.open('w', encoding='utf8') as csv_summary_file:
            csv.writer(
                csv_summary_file,
                delimiter=',',
                quotechar='"',
                quoting=csv.QUOTE_MINIMAL,
            ).writerows(self.csv_rows())
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def save(self, corpus_csv_file: Path, use_relative_audio_file_paths: bool = True):
        """Write all training and test examples of this corpus to a CSV file.

        Each row holds the example id, its audio file path, its label, the
        phase value and the serialized positional label (empty string when
        the example has none). With ``use_relative_audio_file_paths`` the
        audio path is stored relative to the directory of the CSV file.
        """
        import csv

        def audio_path_text(example) -> str:
            # Relative paths are resolved against the CSV file's directory.
            if use_relative_audio_file_paths:
                return str(example.audio_file.relative_to(corpus_csv_file.parent))
            return str(example.audio_file)

        with corpus_csv_file.open('w', encoding='utf8') as opened_csv:
            writer = csv.writer(opened_csv, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            # Training examples first, test examples after them.
            tagged_examples = [(example, Phase.training) for example in self.training_examples]
            tagged_examples = tagged_examples + [(example, Phase.test) for example in self.test_examples]

            for example, phase in tagged_examples:
                writer.writerow((
                    example.id,
                    audio_path_text(example),
                    example.label,
                    phase.value,
                    example.positional_label.serialize() if example.positional_label else "",
                ))
项目:us_zipcodes_congress    作者:OpenSourceActivismTech    | 项目源码 | 文件源码
def csv_writer(filename, data, fields=None, delimiter=',', quoting=csv.QUOTE_MINIMAL):
    """Write a sequence of dicts to ``filename`` as CSV with a header row.

    Args:
        filename: Path of the output file (created or truncated).
        data: Sequence of dicts, one per output row.
        fields: Column names; when falsy, the keys of ``data[0]`` are used.
        delimiter: Field delimiter passed to ``csv.DictWriter``.
        quoting: Quoting mode passed to ``csv.DictWriter``.
    """
    # Logging arguments need a matching placeholder in the message.
    # NOTE(review): assumes ``log`` is a stdlib-style logger -- confirm.
    log.info('writing %s', filename)
    rows = 0
    with open(filename, 'w') as f:
        if not fields:
            fields = list(data[0].keys())
        writer = csv.DictWriter(f, fieldnames=fields, delimiter=delimiter, quoting=quoting)
        writer.writeheader()
        for row in data:
            writer.writerow(row)
            rows += 1
    log.info('wrote %d rows', rows)
项目:us_zipcodes_congress    作者:OpenSourceActivismTech    | 项目源码 | 文件源码
def load_csv_columns(filename, column_names=None, skip=0, delimiter=',', quoting=csv.QUOTE_MINIMAL):
    """Load selected columns of a CSV file as a list of dicts.

    Args:
        filename: Path of the CSV file to read.
        column_names: Optional mapping from (stripped) CSV header to the key
            used in the output dicts; only headers present in it are kept.
            When falsy, every column is kept under its original header.
        skip: Number of leading lines to discard before the header line.
        delimiter: Field delimiter passed to ``csv_reader_converter``.
        quoting: Quoting mode passed to ``csv_reader_converter``.

    Returns:
        One dict per non-empty data line, with stripped cell values. An
        empty list when the file has no header line.
    """
    r = []
    # Logging arguments need a matching placeholder in the message.
    # NOTE(review): assumes ``log`` is a stdlib-style logger -- confirm.
    log.info('opening %s', filename)
    with open(filename, 'r') as f:
        data_file = csv_reader_converter(f, delimiter=delimiter, quoting=quoting)
        for _ in range(skip):
            next(data_file, None)
        headers = next(data_file, None)  # parse the headers
        if headers is None:
            # File ended before a header line: nothing to load.
            log.info('read %d lines', 0)
            return r

        # Position of each selected column -> its stripped header.
        columns = {}
        for (position, header) in enumerate(headers):
            header = header.strip()
            if (not column_names) or header in column_names:
                columns[position] = header
        log.info("headers %s", headers)
        log.info("columns %s", column_names)

        for line in data_file:
            d = {}
            if not line:
                continue
            for (position, header) in columns.items():
                if column_names:
                    rename = column_names[header]
                else:
                    # Without a mapping the raw (unstripped) header is the key.
                    rename = headers[position]
                value = line[position].strip()
                d[rename] = value
            r.append(d)
        log.info('read %d lines', len(r))
        return r
项目:croissance    作者:biosustain    | 项目源码 | 文件源码
def __init__(self, file, include_default_phase: bool = True):
        """Wrap ``file`` in a tab-separated CSV writer and emit the header row.

        The writer, the file object and the ``include_default_phase`` flag
        are stored on the instance.
        """
        header = ['name', 'phase', 'slope', 'intercept', 'N0', 'SNR', 'rank']
        tsv_writer = csv.writer(file, quoting=csv.QUOTE_MINIMAL, delimiter='\t')
        tsv_writer.writerow(header)

        self._include_default_phase = include_default_phase
        self._file = file
        self._writer = tsv_writer
项目:dprr-django    作者:kingsdigitallab    | 项目源码 | 文件源码
def run():
    """Import MRR3 notes from the bundled CSV files as PostAssertionNote rows.

    Every line of each notes file becomes one ``PostAssertionNote`` attached
    to the 'Broughton MRR3' secondary source. For each input file a
    ``<basename>.log`` CSV is written in the working directory, listing the
    id of every created note with the first 20 characters of its text.

    NOTE: written for Python 2 (byte-string handling, binary-mode csv file).
    """

    note_files = ['promrep/scripts/data/MRR3ShortNotesV2.csv',
                  'promrep/scripts/data/MRR3LongNotesV2.csv']

    source = SecondarySource.objects.get(abbrev_name='Broughton MRR3')

    for ifname in note_files:
        print('Will read notes from file %s \n\n' % ifname)

        log_fname = os.path.splitext(os.path.basename(ifname))[0] + '.log'

        # Open the input in a ``with`` block too, so it is closed after each
        # file instead of being left open.
        with codecs.open(ifname, 'r', encoding='latin1') as ifile, \
                open(log_fname, 'wb') as log_file:
            spamwriter = csv.writer(
                log_file,
                delimiter=',',
                quotechar='"',
                quoting=csv.QUOTE_MINIMAL)
            spamwriter.writerow(('id', 'note'))

            for i, line in enumerate(ifile):

                # "**" is replaced by ";" and surrounding double quotes are
                # stripped from the line.
                note_text = line.encode('utf-8').replace("**", ";").strip('"')

                print('%s: %s' % (i, note_text))

                note = PostAssertionNote.objects.create(
                    text=note_text, secondary_source=source)
                spamwriter.writerow((note.id, note_text[0:20]))
项目:pfb-network-connectivity    作者:azavea    | 项目源码 | 文件源码
def handle(self, *args, **options):
        """Export the latest completed analysis jobs to CSV and upload it to S3.

        One row per job is written to ``results.csv`` in a temporary
        directory; the columns are the concatenation of what each callable in
        ``EXPORTS`` reports for the first job. The file is uploaded under
        ``analysis-spreadsheets/results-<UTC timestamp>.csv`` and the
        temporary directory is removed afterwards, even on failure.
        """

        workdir = tempfile.mkdtemp()

        try:
            completed_jobs = AnalysisJob.objects.all().filter(status=AnalysisJob.Status.COMPLETE)
            latest_jobs = AnalysisJobFilterSet().filter_latest(completed_jobs, 'latest', True)

            local_csv_path = os.path.join(workdir, 'results.csv')
            with open(local_csv_path, 'w') as csv_file:
                writer = None
                fieldnames = []

                for job in latest_jobs:
                    # The header is derived from the first job only.
                    is_first_job = writer is None
                    row_data = {}
                    for export in EXPORTS:
                        columns, values = export(job)
                        if is_first_job:
                            fieldnames = fieldnames + columns
                        row_data.update(zip(columns, values))
                    if is_first_job:
                        writer = csv.DictWriter(csv_file,
                                                fieldnames=fieldnames,
                                                dialect=csv.excel,
                                                quoting=csv.QUOTE_MINIMAL)
                        writer.writeheader()
                    writer.writerow(row_data)

            timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H%M')
            s3_key = 'analysis-spreadsheets/results-{}.csv'.format(timestamp)
            boto3.client('s3').upload_file(local_csv_path, settings.AWS_STORAGE_BUCKET_NAME, s3_key)
            logger.info('File uploaded to: s3://{}/{}'
                        .format(settings.AWS_STORAGE_BUCKET_NAME, s3_key))
        finally:
            shutil.rmtree(workdir)
项目:LegalNetworks    作者:brschneidE3    | 项目源码 | 文件源码
def add_row_to_csv(directory, filename, row, columns_to_skip):
    """Append one row to ``<directory>/<filename>.csv``.

    Args:
        directory: Directory containing the CSV file.
        filename: File name without the ``.csv`` suffix.
        row: List of cell values to append.
        columns_to_skip: Number of empty cells to put in front of ``row``.
    """
    row = [''] * columns_to_skip + row
    # The with-block closes the file; no explicit close() is needed.
    with open(directory + '/' + filename + '.csv', 'a') as csvfile:
        spamwriter = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
        spamwriter.writerow(row)
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """Initialise the stream and attach a CSV reader to ``self.query_file``.

        All arguments are forwarded to the parent initialiser. The reader
        uses the Excel dialect with doubled quotes, minimal quoting and
        skipping of whitespace after delimiters.
        """
        super(RankerRelevanceFileQueryStream, self).__init__(*args, **kwargs)

        # The explicit settings are necessary to avoid strange behavior while
        # called by certain unit tests. Please do not delete. They live on a
        # private subclass because assigning them on ``csv.excel`` itself
        # would change the shared stdlib dialect for every other csv user in
        # the process.
        class _RelevanceFileDialect(csv.excel):
            doublequote = True
            quoting = csv.QUOTE_MINIMAL
            skipinitialspace = True

        dialect = _RelevanceFileDialect

        self.__reader__ = csv.reader(self.query_file, dialect=dialect)