Python factory 模块,Factory() 实例源码

我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用factory.Factory()

项目:pysoa    作者:eventbrite    | 项目源码 | 文件源码
def _adjust_kwargs(cls, **kwargs):
        # Factory Boy special method used to alter custom settings dictionaries.
        # Make a copy of settings and override transport to use base transport.
        kwargs['data'] = dict(
            kwargs['data'],
            transport={
                'path': 'pysoa.common.transport.base:ServerTransport',
            },
        )
        return kwargs
项目:osisoftpy    作者:dstcontrols    | 项目源码 | 文件源码
def get_data_archive_servers(self):
        # type: () -> osisoftpy.structures.TypedList[DataArchive]
        """

        :return: 
        """
        log.debug('Retrieving all PI Data Archive servers from %s', self.url)
        r = self.session.get(self.url + '/dataservers')
        if r.status_code == requests.codes.ok:
            data = r.json()
            if len(data['Items']) > 0:
                log.debug('HTTP %s - Instantiating OSIsoftPy.DataArchives()',
                          r.status_code)
                factory = Factory(DataArchive)
                servers = TypedList(validtypes=DataArchive)
                log.debug('Staging %s PI server(s) for instantiation...',
                          get_count(data['Items']))
                for i in data['Items']:
                    try:
                        log.debug('Instantiating "%s" as '
                                  'OSIsoftPy.DataArchive...', i['Name'])
                        server = factory.create(name=i['Name'],
                                                serverversion=i[
                                                    'ServerVersion'],
                                                webid=i['WebId'],
                                                isconnected=i['IsConnected'],
                                                id=i['Id'])
                        servers.append(server)
                    except OSIsoftPyException as e:
                        log.error('Unable to retrieve server info for '
                                  '"%s"', i['Name'], exc_info=True)
                log.debug('PI Data Archive server retrieval success! %s PI '
                          'server(s) were '
                          'found and instantiated.', get_count(servers))
                return servers
        r.raise_for_status()
项目:osisoftpy    作者:dstcontrols    | 项目源码 | 文件源码
def get_points(self, query, count=10, scope='*'):
        # type: (str, int, str) -> osisoftpy.structures.TypedList[Point]
        """

        :param query: 
        :param count: 
        :param scope: 
        :return: 
        """
        payload = {'q': query, 'count': count, 'scope': scope}
        log.debug(
            'Executing Query against PI Web WebAPI Indexed Search with '
            'the following parameters: Query: "%s", Count: "%s". Payload: %s',
            query, count, payload)
        r = self.session.get(self.url + '/search/query', params=payload)
        if r.status_code != requests.codes.ok:
            r.raise_for_status()
        else:
            data = r.json()
            log.debug('HTTP %s - Instantiating %s PI points', r.status_code,
                      get_count(data.get('Items', None)))
            factory = Factory(Point)
            items = list(map(lambda x: create(factory, x),
                             data.get('Items', None)))
            points = TypedList(Point)
            for point in items:
                points.append(point)
            log.debug('PI Point retrieval success! %s PI '
                      'point(s) were '
                      'found and instantiated.', get_count(points))

            if len(data['Errors']) != 0:
                for error in data['Errors']:
                    try:
                        log.warning('The PI Web WebAPI returned the '
                                    'following error while instantiating '
                                    'PI points. '
                                    'ErrorCode: {0}, Source: {1}, '
                                    'Message {2}'.format(
                            error['ErrorCode'], error['Source'],
                            error['Message']))
                    except Exception as e:
                        log.error('Exception encounted while '
                                  'instantiating '
                                  'PI points!', exc_info=True)

            return points