我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.sql.select()。
def for_update_clause(self, select, **kw):
    """Render the ``FOR UPDATE`` suffix for ``select``.

    Returns an empty string when the compiler is currently inside a
    subquery.  Otherwise the text starts with ``' FOR UPDATE'``, followed
    by an ``OF`` list (each element rendered through ``self.process`` with
    the given ``**kw``) and ``NOWAIT`` when the statement asks for them.
    """
    if self.is_subquery():
        return ""

    lock_arg = select._for_update_arg
    pieces = [' FOR UPDATE']
    if lock_arg.of:
        rendered = [self.process(target, **kw) for target in lock_arg.of]
        pieces.append(' OF ' + ', '.join(rendered))
    if lock_arg.nowait:
        pieces.append(" NOWAIT")
    return ''.join(pieces)
def upgrade():
    """Remove the ``n_answers`` key from the JSON ``info`` of every task.

    Each row's ``info`` is decoded, stripped of ``n_answers`` and written
    back in a single executemany-style update keyed on the task id.
    """
    task = table('task',
                 column('id'),
                 column('info'))
    conn = op.get_bind()
    tasks = conn.execute(select([task.c.id, task.c.info]))

    update_values = []
    for row in tasks:
        info_dict = json.loads(row.info)
        # pop() drops the key whatever its value is; the earlier truthiness
        # test left entries such as ``"n_answers": 0`` behind.
        info_dict.pop('n_answers', None)
        update_values.append({'task_id': row.id,
                              'new_info': json.dumps(info_dict)})

    task_update = task.update().\
        where(task.c.id == bindparam('task_id')).\
        values(info=bindparam('new_info'))
    # Executing with an empty parameter list is skipped on purpose.
    if update_values:
        conn.execute(task_update, update_values)
def downgrade():
    """Copy each task's ``n_answers`` column back into its JSON ``info``.

    Every row is read, its ``info`` decoded, ``n_answers`` stored under the
    key of the same name, and the re-encoded JSON written back in one
    batched update.  Nothing is executed when the table has no rows.
    """
    task = table(
        'task',
        column('id'),
        column('info'),
        column('n_answers'),
    )
    conn = op.get_bind()
    rows = conn.execute(select([task.c.id, task.c.info, task.c.n_answers]))

    payloads = []
    for record in rows:
        info = json.loads(record.info)
        info['n_answers'] = record.n_answers
        payloads.append({'task_id': record.id, 'new_info': json.dumps(info)})

    statement = (
        task.update()
        .where(task.c.id == bindparam('task_id'))
        .values(info=bindparam('new_info'))
    )
    if payloads:
        conn.execute(statement, payloads)
def setup_loader(self, instance):
    """Return a zero-argument callable that loads this deferred column.

    The returned ``lazyload`` closure selects the column for ``instance``
    by primary key.  When ``self.group`` is set, every deferred property of
    the same group is fetched in one statement and stored on the instance.
    """
    def lazyload():
        # Start from an empty AND and add one equality per primary-key column.
        clause = sql.and_()
        try:
            pk = self.parent.pks_by_table[self.columns[0].table]
        except KeyError:
            # The mapper has no entry for this table; use the table's own key.
            pk = self.columns[0].table.primary_key
        for primary_key in pk:
            attr = self.parent._getattrbycolumn(instance, primary_key)
            # Any falsy key value (None, but also 0 or "") aborts the load.
            if not attr:
                return None
            clause.clauses.append(primary_key == attr)
        if self.group is not None:
            # Load all deferred columns sharing this group with one SELECT.
            groupcols = [p for p in self.parent.props.values() if isinstance(p, DeferredColumnProperty) and p.group==self.group]
            row = sql.select([g.columns[0] for g in groupcols], clause, use_labels=True).execute().fetchone()
            for prop in groupcols:
                # This property's own value is handed back via the return below.
                if prop is self:
                    continue
                instance.__dict__[prop.key] = row[prop.columns[0]]
                objectstore.global_attributes.create_history(instance, prop.key, uselist=False)
            return row[self.columns[0]]
        else:
            return sql.select([self.columns[0]], clause, use_labels=True).scalar()
    return lazyload
def material(uuid):
    """Print a tab-separated summary of the material(s) with this ``uuid``.

    Only rows whose ``retest_passed`` is NULL or true are listed.  A header
    line is printed first, then one line per matching row.
    """
    m = materials.c
    selected = [
        m.uuid,
        m.parent_id,
        m.ga_absolute_volumetric_loading,
        m.sa_volumetric_surface_area,
        m.vf_helium_void_fraction,
        m.generation,
        m.run_id,
    ]
    # ``== None`` / ``== True`` build SQL expressions; they are not Python
    # comparisons and must not be rewritten with ``is``.
    retest_ok = or_(m.retest_passed == None, m.retest_passed == True)
    criteria = and_(m.uuid == uuid, retest_ok)
    statement = select(selected, criteria)

    header = (
        '\nuuid\t\t\t\t\tparent\tgas adsorption (cc/cc)\tsurface area (m2/cc)'
        '\tvoid fraction\tgeneration\trun'
    )
    print(header)

    result = engine.execute(statement)
    for record in result:
        left = '%s\t%s\t%s\t\t%s\t\t\t' % (record[0], record[1], record[2], record[3])
        right = '%s\t%s\t\t%s' % (record[4], record[5], record[6])
        print(left + right)
    result.close()
def find_children(uuid):
    """Print the uuids of all children of the material with this ``uuid``.

    The material's numeric id is looked up first; children are the rows
    whose ``parent_id`` equals it and whose ``retest_passed`` is NULL or
    true.  When no material has the given uuid a short notice is printed
    and nothing else happens.
    """
    result = engine.execute(select([materials.c.id], materials.c.uuid == uuid))
    # Initialised up front: without a matching row the loop body never runs
    # and the name would otherwise be unbound below.
    parent_id = None
    for row in result:
        parent_id = row[0]
    result.close()

    if parent_id is None:
        print('\nno material found with uuid %s' % uuid)
        return

    # ``== None`` / ``== True`` build SQL expressions, not Python comparisons.
    criteria = and_(materials.c.parent_id == parent_id,
                    or_(materials.c.retest_passed == None,
                        materials.c.retest_passed == True))
    print('\nchildren of %s :' % uuid)
    result = engine.execute(select([materials.c.uuid], criteria))
    for row in result:
        print('\t%s' % row[0])
    result.close()
def remove_old_members(self, trans, guild):
    """Remove every stored member of ``guild`` that the guild still knows.

    Selects the membership rows flagged ``is_member`` for this guild, looks
    each user up through ``guild.get_member`` and passes the members found
    to ``self.remove_member``.
    """
    # ``is_member == True`` is not a Python comparison: it builds the SQL
    # equality expression used in the query, hence the pylint exemption.
    #
    # pylint: disable=singleton-comparison
    self.logger.info(f"Deleting old members from guild {guild.name}")

    membership = self.tb_guild_membership
    criteria = and_(
        membership.c.guild_id == guild.id,
        membership.c.is_member == True,
    )
    query = select([membership]).where(criteria)

    for record in trans.execute(query).fetchall():
        member = guild.get_member(record[0])
        if member is None:
            continue
        self.remove_member(trans, member)
def update_downloads(resource_id, total_downloads):
    """Store ``total_downloads`` for ``resource_id``, inserting if needed.

    Counts the rows for the resource; a non-zero count triggers an UPDATE,
    anything else an INSERT of a new row.
    """
    downloads = get_table('ckanext_tayside_resource_downloads')
    key_name = 'resource_id'
    key_col = getattr(downloads.c, key_name)

    counter = select([func.count(key_col)], key_col == resource_id)
    found = model.Session.connection().execute(counter).fetchone()

    engine = model.meta.engine
    if not (found and found[0]):
        new_row = {key_name: resource_id, 'total_downloads': total_downloads}
        engine.execute(downloads.insert().values(**new_row))
        return

    statement = (
        downloads.update()
        .where(key_col == resource_id)
        .values(total_downloads=total_downloads)
    )
    engine.execute(statement)
def get_tasks(self):
    """Get all tasks in the database.

    :returns: list of task objects built from every row of the task table,
        each with ``statemgr`` set to this state manager; an empty list
        when anything goes wrong (the error is logged).
    """
    try:
        # The context manager releases the connection on every exit path;
        # an explicit close() at the end was skipped whenever a query or
        # the task assembly raised.
        with self.db_engine.connect() as conn:
            query = sql.select([self.tasks_tbl])
            rs = conn.execute(query)
            task_list = [objects.Task.from_db(dict(r)) for r in rs]
            self._assemble_tasks(task_list=task_list)

            # add reference to this state manager to each task
            for t in task_list:
                t.statemgr = self

            return task_list
    except Exception as ex:
        self.logger.error("Error querying task list: %s" % str(ex))
        return []
def get_boot_action(self, action_id):
    """Query for a single boot action by ID.

    :param action_id: string ULID bootaction id
    :returns: dict of the matching row with ``action_id`` and
        ``identity_key`` converted to ``bytes`` and ``task_id`` to a
        ``uuid.UUID``; ``None`` when no row matches or the query fails
        (the failure is logged).
    """
    try:
        with self.db_engine.connect() as conn:
            table = self.ba_status_tbl
            decoded_id = ulid2.decode_ulid_base32(action_id)
            query = table.select().where(table.c.action_id == decoded_id)
            record = conn.execute(query).fetchone()
            if record is None:
                return None

            ba_dict = dict(record)
            for binary_key in ('action_id', 'identity_key'):
                ba_dict[binary_key] = bytes(ba_dict[binary_key])
            ba_dict['task_id'] = uuid.UUID(bytes=ba_dict['task_id'])
            return ba_dict
    except Exception as ex:
        self.logger.error(
            "Error querying boot action %s" % action_id, exc_info=ex)
def for_update_clause(self, select):
    """Render the ``FOR UPDATE`` suffix for ``select``.

    Yields an empty string inside a subquery; otherwise ``' FOR UPDATE'``
    plus an optional ``OF`` list (each element rendered by
    ``self.process``) and an optional ``NOWAIT``.
    """
    if self.is_subquery():
        return ""

    lock = select._for_update_arg
    of_part = ''
    if lock.of:
        of_part = ' OF ' + ', '.join(map(self.process, lock.of))
    nowait_part = " NOWAIT" if lock.nowait else ''
    return ' FOR UPDATE' + of_part + nowait_part
def score_capabilities(self):
    """Combine HTTPS, IP-stack and response-time scores into one value.

    The result is the weighted sum 0.50 * https + 0.20 * ips +
    0.30 * avg_rtime.

    NOTE(review): the ``h*`` helpers and the ``isinstance(self, Instance)``
    guard suggest this runs both on instances and at class level as a SQL
    expression - confirm against the helper definitions.
    """
    # TODO: Is that limit justified?
    # HTTPS part: grade (0 when missing) divided by the number of grades and
    # limited to 0.5..1 when HTTPS is available, otherwise 0.
    https = hcase(
        self.https_available,
        hlimit((hcoalesce(self.https_grade, 0)) / len(SSLLABS_GRADES), 0.5, 1),
        0
    )
    # Each supported IP family contributes half of the IP score.
    ips = self.ipv4 * 0.5 + self.ipv6 * 0.5
    # TODO: + same on the last few days?
    # Average response time over pings whose state is true; 0 when the
    # aggregate is NULL.
    avg_rtime_q = sql.select([sqlf.coalesce(sqlf.sum(Ping.response_time)/sqlf.count(), 0).label('c')]).where((Ping.instance_id == self.id) & (Ping.state == True)).alias('t')
    # Invert so that a shorter response time gives a higher score.
    avg_rtime = 1 - hlimit(hsubq(self, avg_rtime_q) / MAX_RESPONSE_TIME_MS, 0, 1)
    if isinstance(self, Instance):
        logger.debug("score:caps: tls=%.2f ips=%.2f rtime=%.2f", https, ips, avg_rtime)
    return https * 0.50 + ips * 0.20 + avg_rtime * 0.30
def score_uptime(self):
    """Compute an uptime score limited to the range 0..1.

    The score is 0.4 * all-time uptime ratio + 0.6 * uptime ratio over the
    last seven days + a bonus of at most 0.2 that grows with the time since
    ``last_state_change``.  Uptime ratios are counts of pings whose state
    is true over all pings, computed with scalar subqueries.

    NOTE(review): the ratio divides two COUNT results; on backends with
    integer division this truncates - confirm on the target database.
    """
    # Restricts a ping query to the last seven days.
    tc = Ping.time >= sqlf.now() - timedelta(days=7)

    global_fs = sql.select([sqlf.count()]).where((Ping.instance_id==self.id) & (Ping.state==False)).as_scalar().label('gfs')
    global_ts = sql.select([sqlf.count()]).where((Ping.instance_id==self.id) & (Ping.state==True)).as_scalar().label('gts')
    recent_fs = sql.select([sqlf.count()]).where((Ping.instance_id==self.id) & (Ping.state==False) & tc).as_scalar().label('rfs')
    recent_ts = sql.select([sqlf.count()]).where((Ping.instance_id==self.id) & (Ping.state==True) & tc).as_scalar().label('rts')

    global_uptime = hsubq(self, sql.select([sqlf.coalesce(global_ts / (global_fs + global_ts), 0)]))
    recent_uptime = hsubq(self, sql.select([sqlf.coalesce(recent_ts / (recent_fs + recent_ts), 0)]))

    # Elapsed seconds as a fraction of one week.  The divisor needs the
    # parentheses: ``x / 3600 * 24 * 7`` multiplies by 168 instead, which
    # saturates the 0.2 cap within a few seconds.
    continuous_bonus = hlimit(dt_seconds_now(self.last_state_change) / (3600 * 24 * 7), 0, 0.2)

    if isinstance(self, Instance):
        logger.debug("score:uptime: global=%.2f recent=%.2f bonus=%.2f", global_uptime, recent_uptime, continuous_bonus)

    r = hlimit(global_uptime * 0.4 + recent_uptime * 0.60 + continuous_bonus, 0, 1)
    return r
def get_ping_aggregates(self, template):
    """Return per-bucket ping averages for this instance.

    Only the ``'30m'`` template is handled: pings of the last seven days
    are grouped into 1800-second buckets and the response time, users,
    statuses and connections are averaged per bucket.  Any other template
    raises a bare ``Exception``.

    :returns: list of ``PingAgg`` named tuples
        ``(time, response_time, users, statuses, connections)``.
    """
    now = datetime.utcnow()
    if template != '30m':
        raise Exception()

    window_start = now - timedelta(days=7)
    bucket = sqle.text("datetime((strftime('%s', time) / 1800) * 1800, 'unixepoch')")

    sample_count = sqlf.count(Ping.id)
    averaged = [
        sqlf.sum(metric) / sample_count
        for metric in (Ping.response_time, Ping.users, Ping.statuses, Ping.connections)
    ]

    query = (
        sql.select([bucket] + averaged)
        .where((Ping.time >= window_start) & (Ping.instance_id == self.id))
        .group_by(bucket)
        .order_by(Ping.id)
    )

    PingAgg = namedtuple('PingAgg', ['time', 'response_time', 'users', 'statuses', 'connections'])
    return [PingAgg(*record) for record in db.session.execute(query)]
def get_uptime_aggregates(self, template):
    """Return per-bucket, per-state response-time averages for this instance.

    Only the ``'30m'`` template is handled: pings of the last seven days
    are grouped by 1800-second bucket and by ``Ping.state``.  Any other
    template raises a bare ``Exception``.

    :returns: list of ``PingUptimeAgg`` named tuples
        ``(time, state, response_time)``.
    """
    now = datetime.utcnow()
    if template != '30m':
        raise Exception()

    window_start = now - timedelta(days=7)
    bucket = sqle.text("datetime((strftime('%s', time) / 1800) * 1800, 'unixepoch')")

    sample_count = sqlf.count(Ping.id)
    selected = [bucket, Ping.state, sqlf.sum(Ping.response_time) / sample_count]

    query = (
        sql.select(selected)
        .where((Ping.time >= window_start) & (Ping.instance_id == self.id))
        .group_by(bucket, Ping.state)
        .order_by(Ping.id)
    )

    PingUptimeAgg = namedtuple('PingUptimeAgg', ['time', 'state', 'response_time'])
    return [PingUptimeAgg(*record) for record in db.session.execute(query)]
def create_uuids(migrate_engine, primary_table_name, revision_table_name):
    """Replace every id in the primary table with a freshly made uuid.

    When ``revision_table_name`` is given, that table's ``id`` column is
    afterwards set to its ``continuity_id`` for every row.
    """
    # Column types have changed, so the tables are reflected from scratch.
    metadata = MetaData(migrate_engine)
    primary = Table(primary_table_name, metadata, autoload=True)
    if revision_table_name:
        revisions = Table(revision_table_name, metadata, autoload=True)

    # fetchall() is acceptable here: the id sets are small (<20k).
    fetched = migrate_engine.execute(select([primary.c.id])).fetchall()
    old_ids = [record[0] for record in fetched]

    for old_id in old_ids:
        fresh_id = make_uuid()
        migrate_engine.execute(
            primary.update().where(primary.c.id == old_id).values(id=fresh_id)
        )

    if revision_table_name:
        # ensure each id in the revision table matches its continuity id
        sync = revisions.update().values(id=revisions.c.continuity_id)
        migrate_engine.execute(sync)
def for_update_clause(self, select, **kw):
    """Render the ``FOR UPDATE`` suffix for ``select``.

    Returns an empty string inside a subquery.  Otherwise the pieces are
    emitted in this order: ``FOR UPDATE``, an optional ``OF`` list rendered
    through ``self.process(elem, **kw)``, ``NOWAIT`` and ``SKIP LOCKED``.
    """
    if self.is_subquery():
        return ""

    lock_arg = select._for_update_arg
    fragments = [' FOR UPDATE']
    if lock_arg.of:
        targets = ', '.join(self.process(item, **kw) for item in lock_arg.of)
        fragments.append(' OF ' + targets)
    # Flags are checked in the order their keywords must appear.
    if lock_arg.nowait:
        fragments.append(" NOWAIT")
    if lock_arg.skip_locked:
        fragments.append(" SKIP LOCKED")
    return ''.join(fragments)
def exists(*args, **kwargs):
    """Return an ``EXISTS`` clause as applied to a :class:`.Select` object.

    All positional and keyword arguments are passed unchanged to
    :class:`.Exists`.

    Calling styles are of the following forms::

        # use on an existing select()
        s = select([table.c.col1]).where(table.c.col2==5)
        s = exists(s)

        # construct a select() at once
        exists(['*'], **select_arguments).where(criterion)

        # columns argument is optional, generates "EXISTS (SELECT *)"
        # by default.
        exists().where(table.c.col2==5)

    """
    return Exists(*args, **kwargs)
def union_all(*selects, **kwargs):
    r"""Return a ``UNION ALL`` of multiple selectables.

    The returned object is an instance of
    :class:`.CompoundSelect`.

    A similar :func:`union_all()` method is available on all
    :class:`.FromClause` subclasses.

    \*selects
      a list of :class:`.Select` instances.

    \**kwargs
      available keyword arguments are the same as those of
      :func:`select`.

    """
    # The keyword constant selects the set operation; all other arguments
    # are passed through unchanged.
    return CompoundSelect(CompoundSelect.UNION_ALL, *selects, **kwargs)
def self_group(self, against=None):
    """Apply a 'grouping' to this :class:`.ClauseElement`.

    This method is overridden by subclasses to return a
    "grouping" construct, i.e. parentheses.  In particular
    it's used by "binary" expressions to provide a grouping
    around themselves when placed into a larger expression,
    as well as by :func:`.select` constructs when placed into
    the FROM clause of another :func:`.select`.  (Note that
    subqueries should normally be created using the
    :func:`.Select.alias` method, as many platforms require
    nested SELECT statements to be named).

    As expressions are composed together, the application of
    :meth:`self_group` is automatic - end-user code should never
    need to use this method directly.  Note that SQLAlchemy's
    clause constructs take operator precedence into account -
    so parentheses might not be needed, for example, in
    an expression like ``x OR (y AND z)`` - AND takes precedence
    over OR.

    The base :meth:`self_group` method of :class:`.ClauseElement`
    just returns self; the ``against`` argument is not used here.
    """
    return self
def __setitem__(self, key, value):
    """Store ``value`` under ``key``, replacing any column already there.

    A warning is issued when the replaced column does not share lineage
    with the new one.  The replaced column is removed from ``_all_cols``
    before the new one is added.
    """
    if key in self:
        # this warning is primarily to catch select() statements
        # which have conflicting column names in their exported
        # columns collection
        existing = self[key]
        if not existing.shares_lineage(value):
            util.warn('Column %r on table %r being replaced by '
                      '%r, which has the same key. Consider '
                      'use_labels for select() statements.' %
                      (key, getattr(existing, 'table', None), value))
        # Drop the old column so it does not linger next to its replacement.
        self._all_cols.remove(existing)
    # pop out memoized proxy_set as this
    # operation may very well be occurring
    # in a _make_proxy operation
    ColumnElement.proxy_set._reset(value)
    self._all_cols.add(value)
    self._data[key] = value
def columns(self):
    """A name-based collection of :class:`.ColumnElement` objects
    maintained by this :class:`.FromClause`.

    The :attr:`.columns`, or :attr:`.c` collection, is the gateway
    to the construction of SQL expressions using table-bound or
    other selectable-bound columns::

        select([mytable]).where(mytable.c.somecolumn == 5)

    The collection is built the first time it is requested and is
    returned as an immutable view.
    """
    already_built = '_columns' in self.__dict__
    if not already_built:
        # Build the empty collections first, then fill in the columns.
        self._init_collections()
        self._populate_column_collection()
    collection = self._columns
    return collection.as_immutable()
def __init__(self, keyword, *selects, **kwargs):
    """Build a compound select joining ``selects`` with ``keyword``.

    ``correlate`` is taken out of ``kwargs`` (default ``False``); the
    remaining keyword arguments go to ``SelectBase.__init__``.  Every
    selectable must expose the same number of columns as the first one,
    otherwise :class:`.ArgumentError` is raised.
    """
    self._auto_correlate = kwargs.pop('correlate', False)
    self.keyword = keyword
    self.selects = []

    expected_width = None

    # some DBs do not like ORDER BY in the inner queries of a UNION, etc.
    for position, candidate in enumerate(selects, 1):
        element = _clause_element_as_expr(candidate)

        if not expected_width:
            expected_width = len(element.c)
        elif len(element.c) != expected_width:
            details = (1, len(self.selects[0].c), position, len(element.c))
            raise exc.ArgumentError(
                'All selectables passed to '
                'CompoundSelect must have identical numbers of '
                'columns; select #%d has %d columns, select '
                '#%d has %d' % details)

        self.selects.append(element.self_group(self))

    SelectBase.__init__(self, **kwargs)
def distinct(self, *expr):
    r"""Apply DISTINCT to the columns clause of this select() construct.

    :param \*expr: optional column expressions.  When present,
     the Postgresql dialect will render a ``DISTINCT ON (<expressions>)``
     construct.  Expressions given here are appended to any list of
     expressions already set; without expressions a plain DISTINCT is
     requested.

    """
    if not expr:
        self._distinct = True
        return

    coerced = [_literal_as_text(item) for item in expr]
    if isinstance(self._distinct, list):
        coerced = self._distinct + coerced
    self._distinct = coerced
def append_column(self, column):
    """Append the given column expression to the columns clause of this
    select() construct.

    This is an **in-place** mutation method; the
    :meth:`~.Select.column` method is preferred, as it provides standard
    :term:`method chaining`.

    """
    self._reset_exported()

    coerced = _interpret_as_column_or_from(column)
    needs_grouping = isinstance(coerced, ScalarSelect)
    if needs_grouping:
        # Scalar subqueries are grouped for use inside the column list.
        coerced = coerced.self_group(against=operators.comma_op)

    # A new list is bound rather than extending the existing one in place.
    self._raw_columns = self._raw_columns + [coerced]
async def find_author(ain):
    """Look up an author matching the names in ``ain``.

    Declared ``async def`` because the body uses ``async with`` and
    ``await``, which are syntax errors inside a plain ``def``.

    :param ain: mapping with ``last_name`` and optionally ``first_name``.
        Without a truthy ``first_name`` only authors whose first name is
        NULL are matched.
    :returns: dict with ``id``, ``last_name`` and, when the stored first
        name is truthy, ``first_name``; ``None`` when no author matches.
    """
    async with engine.acquire() as conn:
        author = model.Author.__table__
        if ain.get('first_name'):
            where = and_(author.c.first_name == ain['first_name'],
                         author.c.last_name == ain['last_name'])
        else:
            # ``== None`` builds an ``IS NULL`` SQL expression here.
            where = and_(author.c.last_name == ain['last_name'],
                         author.c.first_name == None)

        res = await conn.execute(
            select([author.c.id, author.c.first_name, author.c.last_name])
            .where(where))
        a = await res.fetchone()
        if a:
            ao = {'id': a[0], 'last_name': a[2]}
            if a[1]:
                ao['first_name'] = a[1]
            return ao
    return None
async def find_genre(gnr):
    """Find a genre by name, falling back to a registered synonym.

    Declared ``async def`` because the body uses ``async with`` and
    ``await``, which are syntax errors inside a plain ``def``.

    :param gnr: mapping with a ``name`` key.
    :returns: dict with ``id`` and ``name`` of the genre, or ``None`` when
        neither the name nor its synonym matches.  The name comparison is
        case-insensitive.
    """
    async with engine.acquire() as conn:
        genre = model.Genre.__table__

        async def fg(name):
            # Case-insensitive lookup of one genre by name.
            res = await conn.execute(
                select([genre.c.id, genre.c.name])
                .where(func.lower(genre.c.name) == name.lower()))
            g = await res.fetchone()
            if g:
                return {'id': g[0], 'name': g[1]}
            return None

        ng = await fg(gnr['name'])
        if not ng:
            name = await find_synonym(gnr['name'], model.Synonym.GENRE)
            if name:
                ng = await fg(name)
        return ng
async def get_ebooks_ids_for_object(object_name, id):
    """Return the ids of all ebooks linked to the given object.

    Declared ``async def`` because the body uses ``async with`` and
    ``await``, which are syntax errors inside a plain ``def``.

    :param object_name: ``'author'``, ``'series'`` or ``'bookshelf'``
        (case-insensitive).
    :param id: id of that author, series or bookshelf.
    :returns: list of ebook ids; bookshelf results are distinct and skip
        items without an ebook.
    :raises ValueError: for any other ``object_name``.
    """
    async with engine.acquire() as conn:
        kind = object_name.lower()
        if kind == 'author':
            q = select([model.ebook_authors.c.ebook_id]).where(
                model.ebook_authors.c.author_id == id)
        elif kind == 'series':
            ebook = model.Ebook.__table__
            q = select([ebook.c.id]).where(ebook.c.series_id == id)
        elif kind == 'bookshelf':
            bookshelf_item = model.BookshelfItem.__table__
            # ``!= None`` builds an ``IS NOT NULL`` SQL expression.
            q = select([bookshelf_item.c.ebook_id]).where(
                and_(bookshelf_item.c.ebook_id != None,
                     bookshelf_item.c.bookshelf_id == id)).distinct()
        else:
            raise ValueError('Invalid object_name')

        res = await conn.execute(q)
        rows = await res.fetchall()
        return [row[0] for row in rows]
async def get_conversion_candidate(ebook_id, to_format):
    """Pick a source file of an ebook to serve or convert to ``to_format``.

    Declared ``async def`` because the body uses ``await``, ``async with``
    and ``async for``, which are syntax errors inside a plain ``def``.

    A source already in the requested format is preferred (best quality
    first, NULL quality last).  Otherwise the best-quality source whose
    extension is listed in ``settings.CONVERTABLE_TYPES`` is chosen.

    :returns: ``(source id, extension)`` of the chosen source, or
        ``(None, None)`` when no candidate exists.
    """
    to_format_id = await get_format_id(to_format)
    async with engine.acquire() as conn:
        source = model.Source.__table__
        fmt = model.Format.__table__

        # 1) a source that already has the requested format
        res = await conn.execute(
            select([source.c.id, fmt.c.extension])
            .where(and_(source.c.ebook_id == ebook_id,
                        source.c.format_id == to_format_id,
                        source.c.format_id == fmt.c.id))
            .order_by(nullslast(desc(source.c.quality))))
        res = await res.first()
        if res:
            return res.as_tuple()

        # TODO: Consider optimal selection of the source.  A previous
        # version first selected the format (from those convertible for
        # the ebook) and then the best quality - the other way around.
        # 2) best-quality source in any convertible format
        q = select([source.c.id, fmt.c.extension])\
            .where(and_(source.c.format_id == fmt.c.id,
                        source.c.ebook_id == ebook_id))\
            .order_by(nullslast(desc(source.c.quality)))
        async for row in conn.execute(q):
            if row.extension in settings.CONVERTABLE_TYPES:
                return row.id, row.extension

    return None, None
async def get_last_updated(self, source_id):
    """Return the newest ``updated`` value stored for ``source_id``.

    Declared ``async def`` because the body uses ``await``, which is a
    syntax error inside a plain ``def``.

    :returns: the ``updated`` field of the most recently updated row, or
        ``None`` when there is no row or the query fails (the failure is
        logged).
    """
    try:
        db = await self.db
        table = self._get_table()
        query = table.select().where(
            table.c.source_id == source_id
        ).order_by(
            table.c.updated.desc()
        ).limit(1)
        result = await db.execute(query)
        item = await result.first()
        return item["updated"] if item else None
    except Exception as exc:
        logger.error("[DB] Error when querying for last updated item on {}".format(source_id))
        logger.exception(exc)
    return None
async def get_control(self, updated=None):
    """Fetch the control record, optionally only if it has changed.

    Declared ``async def`` because the body uses ``await``, which is a
    syntax error inside a plain ``def``.

    :param updated: when truthy, only a record whose ``updated`` value
        differs from it is returned.
    :returns: dict of the row with ``data`` decoded from JSON (``{}`` for
        an empty string); a falsy result when no row matches; ``False``
        when the query fails (the failure is logged).
    """
    try:
        db = await self.db
        table = self._get_control_table()
        query = table.select().where(table.c.type == "control")
        if updated:  # check for updated timestamp
            query = query.where(table.c.updated != updated)
        query = query.limit(1)
        result = await db.execute(query)
        item = await result.first()
        if item:
            item = dict(item)
            item["data"] = json.loads(item["data"]) if item.get("data") != "" else {}
        return item
    except Exception as exc:
        logger.error("[DB] Error when querying for a control data on {}".format(self.control_table_name))
        # exception() records the traceback, matching get_last_updated.
        logger.exception(exc)
    return False
def test_upgrade_ip_policy_cidr_inside(self):
    """A policy CIDR lying inside its subnet is left untouched by the upgrade."""
    dt = datetime.datetime(1970, 1, 1)
    subnet_row = {"id": "000", "_cidr": "192.168.10.0/24", "ip_policy_id": "111"}
    cidr_row = {"id": "222", "created_at": dt,
                "ip_policy_id": "111", "cidr": "192.168.10.0/32"}

    self.connection.execute(self.subnets.insert(), subnet_row)
    self.connection.execute(self.ip_policy_cidrs.insert(), cidr_row)

    alembic_command.upgrade(self.config, '2748e48cee3a')

    query = select([self.ip_policy_cidrs])
    results = self.connection.execute(query).fetchall()
    self.assertEqual(len(results), 1)

    # The stored row must come back exactly as it was inserted.
    result = results[0]
    for column_name in ("id", "created_at", "ip_policy_id", "cidr"):
        self.assertEqual(result[column_name], cidr_row[column_name])
def test_upgrade_ip_policy_cidr_overlaps(self):
    """A policy CIDR wider than its subnet is replaced by the subnet CIDR."""
    subnet_row = {"id": "000", "_cidr": "192.168.10.0/24", "ip_policy_id": "111"}
    cidr_row = {"id": "222", "created_at": datetime.date(1970, 1, 1),
                "ip_policy_id": "111", "cidr": "192.168.10.0/16"}
    self.connection.execute(self.subnets.insert(), subnet_row)
    self.connection.execute(self.ip_policy_cidrs.insert(), cidr_row)

    # Patch id and time generation so the replacement row is predictable.
    with mock.patch("oslo_utils.uuidutils") as uuid, \
            mock.patch("oslo_utils.timeutils") as tu:
        tu.utcnow.return_value = datetime.datetime(2004, 2, 14)
        uuid.generate_uuid.return_value = "foo"
        alembic_command.upgrade(self.config, '2748e48cee3a')

    query = select([self.ip_policy_cidrs])
    results = self.connection.execute(query).fetchall()
    self.assertEqual(len(results), 1)

    result = results[0]
    expected_pairs = (
        ("id", uuid.generate_uuid.return_value),
        ("created_at", tu.utcnow.return_value),
        ("ip_policy_id", "111"),
        ("cidr", "192.168.10.0/24"),
    )
    for column_name, expected in expected_pairs:
        self.assertEqual(result[column_name], expected)
def test_upgrade_ip_policy_cidr_overlaps_v6(self):
    """IPv6 variant: a policy CIDR wider than its subnet becomes the subnet CIDR."""
    subnet_row = {"id": "000", "_cidr": "fd00::/8", "ip_policy_id": "111"}
    cidr_row = {"id": "222", "created_at": datetime.date(1970, 1, 1),
                "ip_policy_id": "111", "cidr": "fd00::/7"}
    self.connection.execute(self.subnets.insert(), subnet_row)
    self.connection.execute(self.ip_policy_cidrs.insert(), cidr_row)

    # Patch id and time generation so the replacement row is predictable.
    with mock.patch("oslo_utils.uuidutils") as uuid, \
            mock.patch("oslo_utils.timeutils") as tu:
        tu.utcnow.return_value = datetime.datetime(2004, 2, 14)
        uuid.generate_uuid.return_value = "foo"
        alembic_command.upgrade(self.config, '2748e48cee3a')

    query = select([self.ip_policy_cidrs])
    results = self.connection.execute(query).fetchall()
    self.assertEqual(len(results), 1)

    result = results[0]
    expected_pairs = (
        ("id", uuid.generate_uuid.return_value),
        ("created_at", tu.utcnow.return_value),
        ("ip_policy_id", "111"),
        ("cidr", "fd00::/8"),
    )
    for column_name, expected in expected_pairs:
        self.assertEqual(result[column_name], expected)
def test_upgrade_with_subnets_default_ip_policy_cidrs(self):
    """Both default policy CIDRs of a subnet survive the upgrade unchanged."""
    dt = datetime.datetime(1970, 1, 1)
    default_cidrs = ["192.168.10.0/32", "192.168.10.255/32"]

    self.connection.execute(
        self.subnets.insert(),
        {"id": "000", "_cidr": "192.168.10.0/24", "ip_policy_id": "111"})
    self.connection.execute(
        self.ip_policy_cidrs.insert(),
        {"id": "222", "created_at": dt, "ip_policy_id": "111",
         "cidr": "192.168.10.0/32"},
        {"id": "223", "created_at": dt, "ip_policy_id": "111",
         "cidr": "192.168.10.255/32"})

    alembic_command.upgrade(self.config, '45a07fac3d38')

    query = select([self.ip_policy_cidrs])
    results = self.connection.execute(query).fetchall()
    self.assertEqual(len(results), 2)

    # Row order is not guaranteed, so each row is only checked for
    # membership in the expected set.
    for row in results:
        self.assertIn(row["cidr"], default_cidrs)
    for row in results:
        self.assertTrue(row["id"] in ("222", "223"))
    for row in results:
        self.assertEqual(row["created_at"], dt)
def test_upgrade_bulk(self):
    """The upgrade fills in ``size`` for several policies at once."""
    policies = [{"id": "1", "size": None}, {"id": "2", "size": None}]
    cidrs = [
        {"id": "2", "ip_policy_id": "1", "cidr": "192.168.10.13/32"},
        {"id": "3", "ip_policy_id": "1", "cidr": "192.168.10.16/31"},
        {"id": "4", "ip_policy_id": "2", "cidr": "fd00::/64"},
    ]
    self.connection.execute(self.ip_policy.insert(), *policies)
    self.connection.execute(self.ip_policy_cidrs.insert(), *cidrs)

    alembic_command.upgrade(self.config, '28e55acaf366')

    query = select([self.ip_policy])
    results = self.connection.execute(query).fetchall()
    self.assertEqual(len(results), 2)

    # Policy "1": one /32 plus one /31 -> 3 addresses; policy "2": one /64.
    expected_sizes = {"1": 3, "2": 2 ** 64}
    for row in results:
        self.assertIn(row["id"], ("1", "2"))
        self.assertEqual(row["size"], expected_sizes[row["id"]])
def test_upgrade_bulk(self):
    """The upgrade fills ``first_ip`` / ``last_ip`` for several CIDRs at once."""
    netv4 = netaddr.IPNetwork("192.168.10.13/31")
    netv6 = netaddr.IPNetwork("fd00::/64")
    cidr_rows = [
        {"id": "1", "ip_policy_id": "1", "cidr": str(netv4)},
        {"id": "2", "ip_policy_id": "2", "cidr": str(netv6)},
    ]
    self.connection.execute(self.ip_policy_cidrs.insert(), *cidr_rows)

    alembic_command.upgrade(self.config, '1664300cb03a')

    query = select([self.ip_policy_cidrs])
    results = self.connection.execute(query).fetchall()
    self.assertEqual(len(results), 2)

    known_cidrs = (str(netv4), str(netv6))
    for row in results:
        self.assertIn(row["cidr"], known_cidrs)
        # IPv4 bounds are compared in their IPv6 form.
        if row["cidr"] == "192.168.10.13/31":
            bounds = netv4.ipv6()
        else:
            bounds = netv6
        self.assertEqual(row["first_ip"], bounds.first)
        self.assertEqual(row["last_ip"], bounds.last)
def _TODO_visit_compound_select(self, select):
    """Placeholder: need to determine how to get ``LIMIT``/``OFFSET``
    into a ``UNION`` for Oracle.

    Currently does nothing and returns ``None``.
    """
    pass
def limit_clause(self, select, **kw):
    """Return an empty string: no LIMIT text is emitted for ``select``.

    NOTE(review): presumably the dialect renders row limiting elsewhere -
    confirm against the surrounding compiler class.
    """
    return ""
def read(uid):
    """Load the indicator with the given uid.

    :returns: the object built by ``IndicatorDAO.make_from_row``, or
        ``None`` (after printing an error) unless exactly one row matches.
    """
    s = select([Lamadb.indicator]).where(Lamadb.indicator.c._uid == uid)
    # Count the fetched rows instead of reading ``rowcount``: for SELECT
    # statements that attribute is driver-dependent and may be -1.
    rows = Lamadb.execute(s).fetchall()
    if len(rows) != 1:
        print("Error read indicator DAO")
        return None
    return IndicatorDAO.make_from_row(rows[0])
def find_by_module_uid(module_uid):
    """Return all indicators whose module uid equals ``module_uid``.

    Each row is converted with ``IndicatorDAO.make_from_row``; the list is
    empty when nothing matches.
    """
    indicator = Lamadb.indicator
    query = select([indicator]).where(indicator.c._module_uid == module_uid)
    return [IndicatorDAO.make_from_row(record) for record in Lamadb.execute(query)]
def read(uid):
    """Load the module status with the given uid.

    :returns: the object built by ``ModuleStatusDAO.make_from_row``, or
        ``None`` (after printing an error) unless exactly one row matches.
    """
    s = select([Lamadb.module_status])\
        .where(Lamadb.module_status.c._uid == uid)
    # Count the fetched rows instead of reading ``rowcount``: for SELECT
    # statements that attribute is driver-dependent and may be -1.
    rows = Lamadb.execute(s).fetchall()
    if len(rows) != 1:
        print("Error read module Status DAO")
        return None
    return ModuleStatusDAO.make_from_row(rows[0])
def find_by_malware_uid(malware_uid):
    """Return all module statuses whose malware uid equals ``malware_uid``.

    Each row is converted with ``ModuleStatusDAO.make_from_row``; the list
    is empty when nothing matches.
    """
    status = Lamadb.module_status
    query = select([status]).where(status.c._malware_uid == malware_uid)
    return [ModuleStatusDAO.make_from_row(record) for record in Lamadb.execute(query)]
def read(uid):
    """Load the analysis with the given uid together with its malwares.

    :returns: the object built by ``AnalysisDAO.make_from_row`` with
        ``_malwares`` set from ``MalwareDAO.find_by_analysis_uid``, or
        ``None`` (after printing an error) unless exactly one row matches.
    """
    s = select([Lamadb.analysis]).where(Lamadb.analysis.c._uid == uid)
    # Count the fetched rows instead of reading ``rowcount``: for SELECT
    # statements that attribute is driver-dependent and may be -1.
    rows = Lamadb.execute(s).fetchall()
    if len(rows) != 1:
        print("Error read analysis DAO")
        return None
    analysis = AnalysisDAO.make_from_row(rows[0])
    analysis._malwares = MalwareDAO.find_by_analysis_uid(analysis.uid)
    return analysis
def read(uid):
    """Load the malware with the given uid.

    :returns: the object built by ``MalwareDAO.make_from_row``, or ``None``
        (after printing an error) unless exactly one row matches.
    """
    s = select([Lamadb.malware]).where(Lamadb.malware.c._uid == uid)
    # Count the fetched rows instead of reading ``rowcount``: for SELECT
    # statements that attribute is driver-dependent and may be -1.
    rows = Lamadb.execute(s).fetchall()
    if len(rows) != 1:
        print("Error read malware DAO")
        return None
    return MalwareDAO.make_from_row(rows[0])