我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用factory.Factory()。
def _adjust_kwargs(cls, **kwargs): # Factory Boy special method used to alter custom settings dictionaries. # Make a copy of settings and override transport to use base transport. kwargs['data'] = dict( kwargs['data'], transport={ 'path': 'pysoa.common.transport.base:ServerTransport', }, ) return kwargs
def get_data_archive_servers(self): # type: () -> osisoftpy.structures.TypedList[DataArchive] """ :return: """ log.debug('Retrieving all PI Data Archive servers from %s', self.url) r = self.session.get(self.url + '/dataservers') if r.status_code == requests.codes.ok: data = r.json() if len(data['Items']) > 0: log.debug('HTTP %s - Instantiating OSIsoftPy.DataArchives()', r.status_code) factory = Factory(DataArchive) servers = TypedList(validtypes=DataArchive) log.debug('Staging %s PI server(s) for instantiation...', get_count(data['Items'])) for i in data['Items']: try: log.debug('Instantiating "%s" as ' 'OSIsoftPy.DataArchive...', i['Name']) server = factory.create(name=i['Name'], serverversion=i[ 'ServerVersion'], webid=i['WebId'], isconnected=i['IsConnected'], id=i['Id']) servers.append(server) except OSIsoftPyException as e: log.error('Unable to retrieve server info for ' '"%s"', i['Name'], exc_info=True) log.debug('PI Data Archive server retrieval success! %s PI ' 'server(s) were ' 'found and instantiated.', get_count(servers)) return servers r.raise_for_status()
def get_points(self, query, count=10, scope='*'): # type: (str, int, str) -> osisoftpy.structures.TypedList[Point] """ :param query: :param count: :param scope: :return: """ payload = {'q': query, 'count': count, 'scope': scope} log.debug( 'Executing Query against PI Web WebAPI Indexed Search with ' 'the following parameters: Query: "%s", Count: "%s". Payload: %s', query, count, payload) r = self.session.get(self.url + '/search/query', params=payload) if r.status_code != requests.codes.ok: r.raise_for_status() else: data = r.json() log.debug('HTTP %s - Instantiating %s PI points', r.status_code, get_count(data.get('Items', None))) factory = Factory(Point) items = list(map(lambda x: create(factory, x), data.get('Items', None))) points = TypedList(Point) for point in items: points.append(point) log.debug('PI Point retrieval success! %s PI ' 'point(s) were ' 'found and instantiated.', get_count(points)) if len(data['Errors']) != 0: for error in data['Errors']: try: log.warning('The PI Web WebAPI returned the ' 'following error while instantiating ' 'PI points. ' 'ErrorCode: {0}, Source: {1}, ' 'Message {2}'.format( error['ErrorCode'], error['Source'], error['Message'])) except Exception as e: log.error('Exception encounted while ' 'instantiating ' 'PI points!', exc_info=True) return points