我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用ast.Break()。
def __init__(self, stmt, context): self.stmt = stmt self.context = context self.stmt_table = { ast.Expr: self.expr, ast.Pass: self.parse_pass, ast.AnnAssign: self.ann_assign, ast.Assign: self.assign, ast.If: self.parse_if, ast.Call: self.call, ast.Assert: self.parse_assert, ast.For: self.parse_for, ast.AugAssign: self.aug_assign, ast.Break: self.parse_break, ast.Return: self.parse_return, } stmt_type = self.stmt.__class__ if stmt_type in self.stmt_table: self.lll_node = self.stmt_table[stmt_type]() elif isinstance(stmt, ast.Name) and stmt.id == "throw": self.lll_node = LLLnode.from_list(['assert', 0], typ=None, pos=getpos(stmt)) else: raise StructureException("Unsupported statement type", stmt)
def _handle_ast_list(self, ast_list): """ Find unreachable nodes in the given sequence of ast nodes. """ for index, node in enumerate(ast_list): if isinstance(node, (ast.Break, ast.Continue, ast.Raise, ast.Return)): try: first_unreachable_node = ast_list[index + 1] except IndexError: continue class_name = node.__class__.__name__.lower() self._define( self.unreachable_code, class_name, first_unreachable_node, last_node=ast_list[-1], message="unreachable code after '{class_name}'".format( **locals()), confidence=100) return
def CONTINUE(self, node): # Walk the tree up until we see a loop (OK), a function or class # definition (not OK), for 'continue', a finally block (not OK), or # the top module scope (not OK) n = node while hasattr(n, 'parent'): n, n_child = n.parent, n if isinstance(n, LOOP_TYPES): # Doesn't apply unless it's in the loop itself if n_child not in n.orelse: return if isinstance(n, (ast.FunctionDef, ast.ClassDef)): break # Handle Try/TryFinally difference in Python < and >= 3.3 if hasattr(n, 'finalbody') and isinstance(node, ast.Continue): if n_child in n.finalbody: self.report(messages.ContinueInFinally, node) return if isinstance(node, ast.Continue): self.report(messages.ContinueOutsideLoop, node) else: # ast.Break self.report(messages.BreakOutsideLoop, node)
def get_coverable_nodes(cls): return { ast.Assert, ast.Assign, ast.AugAssign, ast.Break, ast.Continue, ast.Delete, ast.Expr, ast.Global, ast.Import, ast.ImportFrom, ast.Nonlocal, ast.Pass, ast.Raise, ast.Return, ast.FunctionDef, ast.ClassDef, ast.TryExcept, ast.TryFinally, ast.ExceptHandler, ast.If, ast.For, ast.While, }
def get_coverable_nodes(cls): return { ast.Assert, ast.Assign, ast.AugAssign, ast.Break, ast.Continue, ast.Delete, ast.Expr, ast.Global, ast.Import, ast.ImportFrom, ast.Nonlocal, ast.Pass, ast.Raise, ast.Return, ast.ClassDef, ast.FunctionDef, ast.Try, ast.ExceptHandler, ast.If, ast.For, ast.While, }
def _build_node_cfg(node): handlers = { ast.If: _build_if_cfg, ast.For: _build_loop_cfg, ast.While: _build_loop_cfg, ast.With: _build_with_cfg, ast.Break: _build_break_cfg, ast.Continue: _build_continue_cfg, ast.Return: _build_return_cfg, ast.Try: _build_try_cfg, } if type(node) in handlers: handler = handlers[type(node)] else: handler = _build_statement_cfg return handler(node)
def handle_Return(state, node, ctx, **_): state_update = dict(returns_ctr=state.returns_ctr + 1) new_nodes = [ ast.Assign( targets=[ast.Name(id=ctx.return_var, ctx=ast.Store())], value=node.value)] if state.loop_nesting_ctr > 0: new_nodes.append( ast.Assign( targets=[ast.Name(id=ctx.return_flag_var, ctx=ast.Store())], value=TRUE_NODE)) state_update.update(return_inside_a_loop=True, returns_in_loops=True) new_nodes.append(ast.Break()) return state.update(state_update), new_nodes
def _translate_body(self, body, types, allow_loose_in_edges=False, allow_loose_out_edges=False): cfg_factory = CFGFactory(self._id_gen) for child in body: if isinstance(child, (ast.AnnAssign, ast.Expr)): cfg_factory.add_stmts(self.visit(child, types)) elif isinstance(child, ast.If): cfg_factory.complete_basic_block() if_cfg = self.visit(child, types) cfg_factory.append_cfg(if_cfg) elif isinstance(child, ast.While): cfg_factory.complete_basic_block() while_cfg = self.visit(child, types) cfg_factory.append_cfg(while_cfg) elif isinstance(child, ast.Break): cfg_factory.complete_basic_block() break_cfg = self.visit(child, types) cfg_factory.append_cfg(break_cfg) elif isinstance(child, ast.Continue): cfg_factory.complete_basic_block() cont_cfg = self.visit(child, types) cfg_factory.append_cfg(cont_cfg) elif isinstance(child, ast.Pass): if cfg_factory.incomplete_block(): pass else: cfg_factory.append_cfg(_dummy_cfg(self._id_gen)) else: raise NotImplementedError(f"The statement {str(type(child))} is not yet translatable to CFG!") cfg_factory.complete_basic_block() if not allow_loose_in_edges and cfg_factory.cfg and cfg_factory.cfg.loose_in_edges: cfg_factory.prepend_cfg(_dummy_cfg(self._id_gen)) if not allow_loose_out_edges and cfg_factory.cfg and cfg_factory.cfg.loose_out_edges: cfg_factory.append_cfg(_dummy_cfg(self._id_gen)) return cfg_factory.cfg
def isStatement(a): """Determine whether the given node is a statement (vs an expression)""" return type(a) in [ ast.Module, ast.Interactive, ast.Expression, ast.Suite, ast.FunctionDef, ast.ClassDef, ast.Return, ast.Delete, ast.Assign, ast.AugAssign, ast.For, ast.While, ast.If, ast.With, ast.Raise, ast.Try, ast.Assert, ast.Import, ast.ImportFrom, ast.Global, ast.Expr, ast.Pass, ast.Break, ast.Continue ]
def _translate_body(self, body, allow_loose_in_edges=False, allow_loose_out_edges=False): cfg_factory = CfgFactory(self._id_gen) for child in body: if isinstance(child, (ast.Assign, ast.AugAssign, ast.Expr)): cfg_factory.add_stmts(self.visit(child)) elif isinstance(child, ast.If): cfg_factory.complete_basic_block() if_cfg = self.visit(child) cfg_factory.append_cfg(if_cfg) elif isinstance(child, ast.While): cfg_factory.complete_basic_block() while_cfg = self.visit(child) cfg_factory.append_cfg(while_cfg) elif isinstance(child, ast.Break): cfg_factory.complete_basic_block() break_cfg = self.visit(child) cfg_factory.append_cfg(break_cfg) elif isinstance(child, ast.Continue): cfg_factory.complete_basic_block() cont_cfg = self.visit(child) cfg_factory.append_cfg(cont_cfg) elif isinstance(child, ast.Pass): if cfg_factory.incomplete_block(): pass else: cfg_factory.append_cfg(_dummy_cfg(self._id_gen)) else: raise NotImplementedError(f"The statement {str(type(child))} is not yet translatable to CFG!") cfg_factory.complete_basic_block() if not allow_loose_in_edges and cfg_factory.cfg and cfg_factory.cfg.loose_in_edges: cfg_factory.prepend_cfg(_dummy_cfg(self._id_gen)) if not allow_loose_out_edges and cfg_factory.cfg and cfg_factory.cfg.loose_out_edges: cfg_factory.append_cfg(_dummy_cfg(self._id_gen)) return cfg_factory.cfg
def mutate_Continue(self, node): return ast.Break()
def one_iteration(self, node): node.body.append(ast.Break()) return node
def zero_iteration(self, node): node.body = [ast.Break()] return node
def get_nontrivial_nodes(self): # returns ids of nodes that can possibly raise an exception nodes = [] for node_id, node_obj in self.nodes.items(): node = node_obj.ast_node if type(node) not in (ast.Break, ast.Continue, ast.Pass, ast.Try): nodes.append(node_id) return nodes
def _build_cfg(statements): enter = id(statements[0]) exits = [enter] graph = Graph() jumps = Jumps() for i, node in enumerate(statements): cfg = _build_node_cfg(node) graph.update(cfg.graph) if i > 0: for exit_ in exits: graph.add_edge(exit_, cfg.enter) exits = cfg.exits jumps = jumps.join(cfg.jumps) if type(node) in (ast.Break, ast.Continue, ast.Return): # Issue a warning about unreachable code? break return ControlFlowSubgraph(graph, enter, exits=exits, jumps=jumps)
def _handle_loop(node, state, ctx, visit_after, visiting_after, walk_field, **_): if not visiting_after: # Need to traverse fields explicitly since for the purposes of _replace_returns(), # the body of `orelse` field is not inside a loop. state = state.update(loop_nesting_ctr=state.loop_nesting_ctr + 1) state, new_body = walk_field(state, node.body, block_context=True) state = state.update(loop_nesting_ctr=state.loop_nesting_ctr - 1) state, new_orelse = walk_field(state, node.orelse, block_context=True) visit_after() return state, replace_fields(node, body=new_body, orelse=new_orelse) else: # If there was a return inside a loop, append a conditional break # to propagate the return otside all nested loops if state.return_inside_a_loop: new_nodes = [ node, ast.If( test=ast.Name(id=ctx.return_flag_var), body=[ast.Break()], orelse=[])] else: new_nodes = node # if we are at root level, reset the return-inside-a-loop flag if state.loop_nesting_ctr == 0: state = state.update(return_inside_a_loop=False) return state, new_nodes
def handle_While(node, **_): last_node = node.body[-1] unconditional_jump = type(last_node) in (ast.Break, ast.Raise, ast.Return) if unconditional_jump and find_jumps(node.body) == 1: if type(last_node) == ast.Break: new_body = node.body[:-1] else: new_body = node.body return ast.If(test=node.test, body=new_body, orelse=node.orelse) else: return node