The following 50 code examples, extracted from open-source Python projects, illustrate how to use IPython.embed().
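Before the examples, here is a minimal sketch of the basic pattern (the function and variable names are illustrative, not taken from any project below): calling IPython.embed() suspends execution at that point and opens an interactive IPython shell with access to the enclosing scope; exiting the shell resumes the program.

import IPython

def compute_total(values):
    total = sum(values)
    # Execution pauses here: `values` and `total` are available for
    # inspection in the shell; exiting the shell resumes the function.
    IPython.embed()
    return total

if __name__ == '__main__':
    print(compute_total([1, 2, 3]))
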
def clean_dupes(get_annos, repr_issues=False):
    annos = get_annos()
    seen = set()
    dupes = [a.id for a in annos if a.id in seen or seen.add(a.id)]
    preunduped = [a for a in annos if a.id in dupes]
    for id_ in dupes:
        print('=====================')
        anns = sorted((a for a in annos if a.id == id_), key=lambda a: a.updated)
        if not repr_issues:
            [print(a.updated, HypothesisHelper(a, annos)) for a in anns]
        for a in anns[:-1]:  # all but latest
            annos.remove(a)
    deduped = [a for a in annos if a.id in dupes]
    assert len(preunduped) // len(dupes) == 2, 'Somehow you have managed to get more than 1 duplicate!'
    # get_annos.memoize_annos(annos)
    embed()

def test(epoch):
    Net.eval()
    preditions = []
    for iteration, (inputss, labelss) in enumerate(testing_data_loader, 1):
        #embed()
        inputss = Variable(inputss.view(-1, 3, training_size, training_size))
        if cuda:
            inputss = inputss.cuda()
        #embed()
        if model == 'resnet101':
            prediction = Net.module.resnet101(inputss).cpu().data.numpy()
        elif model == 'inception_v3':
            prediction = Net.module.inception_v3(inputss).cpu().data.numpy()
        elif model == 'inception_v4':
            prediction = Net.module.inception_v4(inputss).cpu().data.numpy()
        prediction = prediction.mean(0).argmax()
        preditions.append(str(prediction))
        #print 'video num: ', iteration, ' predition: ', str(prediction)
    with open('/S2/MI/zqj/video_classification/data/ucf101/tmp_result/{}result_{}_new_epoch'.format(save_prefix, model) + str(epoch) + '.txt', 'w') as fw:
        fw.write('\n'.join(preditions))
    str_out = 'python compute_test_result.py {}result_{}_new_epoch'.format(save_prefix, model) + str(epoch) + '.txt'
    os.system(str_out)

def handle(self, **options):
    print('... starting jawaf shell ...')
    waf = Jawaf(settings.PROJECT_NAME)
    # Use IPython if it exists
    try:
        import IPython
        IPython.embed()
        return
    except ImportError:
        pass
    # Use bpython if it exists
    try:
        import bpython
        bpython.embed()
        return
    except ImportError:
        pass
    # Ok, just do the pumpkin spice python shell.
    import code
    code.interact(local=locals())

def test_ipython_embed():
    """test that `IPython.embed()` works"""
    with NamedFileInTemporaryDirectory('file_with_embed.py') as f:
        f.write(_sample_embed)
        f.flush()
        f.close()  # otherwise msft won't be able to read the file

        # run `python file_with_embed.py`
        cmd = [sys.executable, f.name]
        env = os.environ.copy()
        env['IPY_TEST_SIMPLE_PROMPT'] = '1'

        p = subprocess.Popen(cmd, env=env, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate(_exit)
        std = out.decode('UTF-8')

        nt.assert_equal(p.returncode, 0)
        nt.assert_in('3 . 14', std)
        if os.name != 'nt':
            # TODO: Fix up our different stdout references, see issue gh-14
            nt.assert_in('IPython', std)
        nt.assert_in('bye!', std)

def launch_ipython():
    print ""
    print "==================================================================="
    print ""
    print " Welcome to the motion editor! "
    print ""
    print "To quit, click on the 'Quit' button in the GUI."
    print ""
    print "From this shell, available function calls include:"
    print ""
    print "    plot_trajectories()"
    print ""
    print "==================================================================="
    IPython.embed()
    root.quit()
    time.sleep(0.2)
    os._exit(0)  # dirty but avoids hangs from IPython's atexit callback

def __call__(self, args):
    if args.ds is None:
        print("Could not load file.")
        sys.exit()
    import yt.mods
    import yt
    import IPython
    local_ns = yt.mods.__dict__.copy()
    local_ns['ds'] = args.ds
    local_ns['pf'] = args.ds
    local_ns['yt'] = yt
    try:
        from traitlets.config.loader import Config
    except ImportError:
        from IPython.config.loader import Config
    import sys
    cfg = Config()
    # prepend sys.path with current working directory
    sys.path.insert(0, '')
    IPython.embed(config=cfg, user_ns=local_ns)

def go_interpreter():
    actors = _world.get_system_entities(game.TurnOrderSystem)

    # noinspection PyUnusedLocal
    def print_actors():
        import functools

        def concat(x, y):
            return x + "\n" + y

        print(str(functools.reduce(concat, map(str, actors))))

    import IPython
    IPython.embed()


# Activation

def get_layers(self, name, next_layer=False, last_layer=False, type=None):
    if type is None:
        name2layer = {layer.name: layer for layer in self.model.layers}
    else:
        name2layer = {}
        for layer in self.model.layers:
            for t in type:
                if t.lower() in layer.name.lower():
                    name2layer[layer.name] = layer
                    break
        # name2layer = {layer.name: layer for layer in self.model.layers if type.lower() in layer.name.lower()}

    def _get_layer(name):
        return name2layer[name]

    nodes = self.graph.get_nodes(name, next_layer, last_layer, type=type)
    if not isinstance(nodes, list):
        nodes = [nodes]
    '''
    for node in nodes:
        if node.name not in name2layer:
            embed()
    '''
    return map(_get_layer, [node.name for node in nodes])

def _start_repl(api):
    # type: (Iota) -> None
    """
    Starts the REPL.
    """
    _banner = (
        'IOTA API client for {uri} ({testnet}) initialized as variable `api`.\n'
        'Type `help(api)` for list of API commands.'.format(
            testnet='testnet' if api.testnet else 'mainnet',
            uri=api.adapter.get_uri(),
        )
    )

    try:
        # noinspection PyUnresolvedReferences
        import IPython
    except ImportError:
        # IPython not available; use regular Python REPL.
        from code import InteractiveConsole
        InteractiveConsole(locals={'api': api}).interact(_banner)
    else:
        # Launch IPython REPL.
        IPython.embed(header=_banner)

def invBN(self, arr, Y_name):
    if isinstance(arr, int) or len(self.bns) == 0 or len(self.affines) == 0:
        return arr
    interstellar = Y_name.split('_')[0]
    for i in self.bottom_names[interstellar]:
        if i in self.bns and 'branch2c' in i:
            bn = i
            break
    for i in self.affines:
        if self.layer_bottom(i) == bn:
            affine = i
            break
    if 1:
        print('inverted bn', bn, affine, Y_name)
    mean, std, k, b = self.getBNaff(bn, affine)
    # (y - mean) / std * k + b
    #return (arr - b) * std / k + mean
    return arr * std / k
    #embed()

def YYT(Y, n_components=None, DEBUG=False):
    """
    Param:
        Y: n x d
        n_components: use 'mle' to guess
    Returns:
        P: d x d'
        QT: d' x d
    """
    newdata = Y.copy()
    model = PCA(n_components=n_components)
    if len(newdata.shape) != 2:
        newdata = newdata.reshape((newdata.shape[0], -1))
    # TODO center data
    model.fit(newdata)
    if DEBUG:
        from IPython import embed; embed()
    return model.components_.T, model.components_

#def GSVD(Z, Y):
#    NotImplementedError
#    return [U, V, X, C, S]

def shell():
    import IPython
    from flask.globals import _app_ctx_stack
    app = _app_ctx_stack.top.app
    banner = 'Python %s on %s\nIPython: %s\nApp: %s%s\nInstance: %s\n' % (
        sys.version,
        sys.platform,
        IPython.__version__,
        app.import_name,
        app.debug and ' [debug]' or '',
        app.instance_path,
    )

    ctx = {}

    startup = os.environ.get('PYTHONSTARTUP')
    if startup and os.path.isfile(startup):
        with open(startup, 'rb') as f:
            eval(compile(f.read(), startup, 'exec'), ctx)

    ctx.update(app.make_shell_context())

    IPython.embed(banner1=banner, user_ns=ctx)

def main(_):
    print('tf version', tf.__version__)
    topics, answers, num_topics = read_assistments_data(DATA_LOC)
    full_data = load_data(topics, answers, num_topics)
    model = DKTModel(num_topics, HIDDEN_SIZE, MAX_LENGTH)
    with tf.Session() as session:
        session.run(tf.global_variables_initializer())
        # We need to explicitly initialize local variables to use
        # TensorFlow's AUC function for some reason...
        session.run(tf.local_variables_initializer())
        train_model(model, session, full_data)
        #model1, model2 = train_paired_models(session, full_data, num_topics)
        #test_paired_models(session, full_data, model1, model2)
        #embed()

def main(_): print "Testing actor" topics, answers, masks, seq_lens, rewards = fake_sequences(20000, 3) actor = Actor(3, HIDDEN_SIZE, MAX_LENGTH) #embed() with tf.Session() as session: session.run(tf.global_variables_initializer()) #print topics for i in range(200): s,e = 100*i, 100*(i+1) obj = actor.train_on_batch(session, rewards[s:e], seq_lens[s:e], masks[s:e], answers[s:e], topics[s:e]) print obj actions = actor.test_on_batch(session, rewards[s:e], seq_lens[s:e], masks[s:e], answers[s:e], topics[s:e]) actionsArray = np.array(actions[0]) zerosByTime = np.sum(actionsArray == 0, axis=0) onesByTime = np.sum(actionsArray == 1, axis=0) twosByTime = np.sum(actionsArray == 2, axis=0) avgZeroPos = np.sum(np.arange(50) * zerosByTime[:50]) / np.sum(zerosByTime[:50]) avgOnePos = np.sum(np.arange(50) * onesByTime[:50]) / np.sum(onesByTime[:50]) avgTwoPos = np.sum(np.arange(50) * twosByTime[:50]) / np.sum(twosByTime[:50]) print avgZeroPos, avgOnePos, avgTwoPos embed() #print actions
def simulator_main(sim_class, sim_control_class=TangoTestDeviceServerBase):
    """Main function for a simulator with class sim_class

    sim_class is a tango.server.Device subclass
    """
    run_ipython = '--ipython' in sys.argv
    if run_ipython:
        import IPython
        sys.argv.remove('--ipython')

        def start_ipython(sim_class):
            IPython.embed()

        t = threading.Thread(target=start_ipython, args=(sim_class,))
        t.setDaemon(True)
        t.start()

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(module)s - '
               '%(pathname)s : %(lineno)d - %(message)s',
        level=logging.INFO)

    classes = [sim_class]
    if sim_control_class:
        classes.append(sim_control_class)
    server_run(classes)

def parse_game_row(game_row):
    'NYM - N. Syndergaard (4-5, 3.05)'
    home_cell = game_row.find('td', {'class': 'shsNamD shsProbHome'})
    away_cell = game_row.find('td', {'class': 'shsNamD shsProbAway'})
    home_match = pitcher_re.search(home_cell.text)
    away_match = pitcher_re.search(away_cell.text)
    if not home_match or not away_match:
        print 'Problem with RE matching!'
        import IPython
        IPython.embed()
    home_groups = home_match.groupdict()
    away_groups = away_match.groupdict()
    home_team = fix_team(home_groups['team'])
    away_team = fix_team(away_groups['team'])
    # Return teams, projected starters, @ if away, and opponents
    team_rows = [[home_team, home_groups['starter'], None, away_team],
                 [away_team, away_groups['starter'], '@', home_team]]
    return team_rows

def load_stats_tables_from_history_page(url):
    """Load all the prediction tables from a Numberfire history page"""
    soup = getSoupFromURL(url)
    #salary = load_player_salary_table(soup)
    projection_months = ['%s-schedule' % month for month in
                         ['March', 'April', 'May', 'June', 'July',
                          'August', 'September', 'October']]
    month_tables = []
    for month in projection_months:
        month_schedule = soup.find('div', attrs={'id': month})
        month_table = load_player_history_table(month_schedule)
        if month_table is not None:
            month_tables.append(month_table)
    if month_tables:
        all_predictions = pandas.concat(month_tables)
        all_predictions.sort_index(inplace=True)
        if all_predictions.index.duplicated().any():
            print 'Duplicate games scraped!'
            import IPython
            IPython.embed()
    else:
        all_predictions = None
    return all_predictions

def parse_nba_player_list(page_html):
    import IPython
    # This... doesn't really work. Leaving it to maybe-update in the future.
    soup = BeautifulSoup(page_html)
    player_rows = soup.findAll('tr', {'class': 'vs-repeat-repeated-element'})
    print len(player_rows)
    for i, row in enumerate(player_rows):
        print i
        player_pos = row.find('td', {'class': 'player-position'}).text.strip()
        player_name = (row.find('span', {'class': 'player-first-name'}).text.strip()
                       + ' ' + row.find('span', {'class': 'player-last-name'}).text.strip())
        flag_span = row.find('player')
        injured = row.findAll('abbr', {'class': 'player-badge'})
        if injured:
            injury_status = injured[0].attr('data-injury-status')
        else:
            injury_status = 'OK'
        print player_pos, player_name, injury_status
    IPython.embed()

def __init__(self, anno, annos):
    super().__init__(anno, annos)
    if self._done_loading:
        if self._done_all:
            print('WARNING you either have a duplicate annotation or '
                  'your annotations are not sorted by updated.')
        self._fetch_xmls(os.path.expanduser('~/ni/scibot/scibot_rrid_xml.pickle'))
        #print(HypothesisHelper(anno, annos))
        #embed()
        #raise BaseException('WHY ARE YOU GETTING CALLED MULTIPLE TIMES?')
        #self._do_papers()

def main():
    from desc.prof import profile_me
    # clean updated annos
    #clean_dupes(get_annos, repr_issues=True)
    #return

    # fetching
    annos = get_annos()
    #_annos = annos
    #annos = [a for a in annos if a.updated > '2017-10-15']

    # loading
    #@profile_me
    #def load():
    #    for a in annos:
    #        rrcu(a, annos)
    #load()
    #rc = list(rrcu.objects.values())
    rc = [rrcu(a, annos) for a in annos]

    # id all the things
    #from joblib import Parallel, delayed
    #id_annos = []
    #for purl in rrcu._papers:
    #    resp = idPaper(purl)
    #    id_annos.append(resp)
    #id_annos = Parallel(n_jobs=5)(delayed(idPaper)(url)
    #                              for url in sorted(rrcu._papers))
    #embed()
    #return

    # sanity checks
    #print('repr everything')
    #_ = [repr(r) for r in rc]

    # exorcise the spirits (this is the slow bit, joblib breaks...)
    try:
        stats = sanity_and_stats(rc, annos)
    except AssertionError as e:
        print(e)
        embed()

def get_confusion(gts, pres):
    confusion_mat = np.zeros([101, 101])
    len_items = len(gts)
    for i in range(len_items):
        confusion_mat[gts[i]][pres[i]] += 1
    for i in range(101):
        confusion_mat[i, :] /= sum(confusion_mat[i, :])
    #embed()
    return confusion_mat

def forward(self, x):
    #minibatch_size, seq_len, feature_dic = x.size()
    #embed()
    out, hidden_n = self.lstm(x, self.hidden_data)
    #embed()
    return self.fc(hidden_n[0][1, :, :])

def transform_rgb_test(self, img, train_size):
    mother_img = img  # do not rescale in the testing process
    mother_w, mother_h = mother_img.size
    crop_ix = np.zeros([5, 4], dtype=np.int16)
    w_indices = (0, mother_w - train_size)
    h_indices = (0, mother_h - train_size)
    w_center = (mother_w - train_size) / 2
    h_center = (mother_h - train_size) / 2
    crop_ix[4, :] = [w_center, h_center, train_size + w_center, train_size + h_center]
    cnt = 0
    for i in w_indices:
        for j in h_indices:
            crop_ix[cnt, :] = [i, j, i + train_size, j + train_size]
            cnt += 1
    crop_ix = np.tile(crop_ix, (2, 1))
    img_return = np.zeros([10, 3, train_size, train_size])
    for i in range(10):
        cp = crop_ix[i]
        #embed()
        # transform w*h*channel to channel*w*h
        img_return[i] = np.array(mother_img.crop(cp), dtype=np.float32).transpose([2, 0, 1])
    img_return[5:, :, :, :] = img_return[5:, :, :, ::-1]  # flipping
    img_return[:, :, :, :] = img_return[:, ::-1, :, :]  # transform the RGB to BGR type
    img_return[:, 0, :, :] -= 104
    img_return[:, 1, :, :] -= 116
    img_return[:, 2, :, :] -= 122
    #embed()
    return img_return

def printSolution(r2proj):
    # 0=offset, 1=size, 2=name
    symb_variables = getSymbolicMemoryRegions(r2proj)
    content = '''
# print solution if we found a path
if len(pg.found) > 0:
    state_found = pg.found[0].state
    print "found the target!"
'''
    for variable in symb_variables:
        tmp = '''
    concrete_memory = state_found.memory.load({0}, {1})  # {2}
    print state_found.se.any_str(concrete_memory)'''.format(hex(variable[0]), variable[1], variable[2])
        content += tmp
    if len(symb_variables) == 0:  # -> check for static mode
        content += "IPython.embed()"
    content += '''
else:
    print "start IPython shell"
    print "Variables: state_found, start_state, pg, proj"
    IPython.embed()
'''
    return content

def evaluate(dataset):
    tf.reset_default_graph()
    config = tf.ConfigProto(allow_soft_placement=True)
    with tf.Session(config=config) as s:
        model, _ = model_fn(s, restore_only=True)
        df = ev(s, model, dataset)
        print((df['predictions'] == df['labels']).mean())
        import IPython
        IPython.embed()

def exit_raise(self, parameter_s=''):
    """%exit_raise Make the current embedded kernel exit and raise an exception.

    This function sets an internal flag so that an embedded IPython will
    raise a `IPython.terminal.embed.KillEmbeded` exception on exit, and then
    exit the current instance. This is useful to permanently exit a loop that
    creates IPython embed instances.
    """
    self.shell.should_raise = True
    self.shell.ask_exit()

def main():
    if '-v' in sys.argv:
        loglevel = logging.DEBUG
    else:
        loglevel = logging.INFO

    data_dir = bqueryd.DEFAULT_DATA_DIR
    for arg in sys.argv:
        if arg.startswith('--data_dir='):
            data_dir = arg[11:]

    if 'controller' in sys.argv:
        bqueryd.ControllerNode(redis_url=redis_url, loglevel=loglevel).go()
    elif 'worker' in sys.argv:
        bqueryd.WorkerNode(redis_url=redis_url, loglevel=loglevel, data_dir=data_dir).go()
    elif 'downloader' in sys.argv:
        bqueryd.DownloaderNode(redis_url=redis_url, loglevel=loglevel).go()
    elif 'movebcolz' in sys.argv:
        bqueryd.MoveBcolzNode(redis_url=redis_url, loglevel=loglevel).go()
    else:
        if len(sys.argv) > 1 and sys.argv[1].startswith('tcp:'):
            rpc = bqueryd.RPC(address=sys.argv[1], redis_url=redis_url, loglevel=loglevel)
        else:
            rpc = bqueryd.RPC(redis_url=redis_url, loglevel=loglevel)
        import IPython
        IPython.embed()

def _aggregate_batch(data_holder, use_list=False):
    size = len(data_holder[0])
    result = []
    for k in range(size):
        if use_list:
            result.append([x[k] for x in data_holder])
        else:
            dt = data_holder[0][k]
            if type(dt) in [int, bool]:
                tp = 'int32'
            elif type(dt) == float:
                tp = 'float32'
            else:
                try:
                    tp = dt.dtype
                except Exception:
                    raise TypeError("Unsupported type to batch: {}".format(type(dt)))
            try:
                result.append(np.asarray([x[k] for x in data_holder], dtype=tp))
            except KeyboardInterrupt:
                raise
            except Exception:
                logger.exception("Cannot batch data. Perhaps they are of "
                                 "inconsistent shape?")
                import IPython as IP
                IP.embed(config=IP.terminal  # @UndefinedVariable
                         .ipapp.load_default_config())
    return result

def start_shell(opts):
    seqrepo_dir = os.path.join(opts.root_directory, opts.instance_name)
    sr = SeqRepo(seqrepo_dir)
    import IPython
    IPython.embed(header="\n".join([
        "seqrepo (https://github.com/biocommons/biocommons.seqrepo/)",
        "version: " + __version__,
        "instance path: " + seqrepo_dir
    ]))

def shell(): """Run a Python shell in the app context.""" try: import IPython except ImportError: IPython = None if IPython is not None: IPython.embed(banner1='', user_ns=current_app.make_shell_context()) else: import code code.interact(banner='', local=current_app.make_shell_context())
def insert_ipython(num_up=1):
    """
    Placed inside a function, this will insert an IPython interpreter at
    that current location. This will enable detailed inspection of the
    current execution environment, as well as (optional) modification of
    that environment. *num_up* refers to how many frames of the stack get
    stripped off, and defaults to 1 so that this function itself is
    stripped off.
    """
    import IPython
    from IPython.terminal.embed import InteractiveShellEmbed
    try:
        from traitlets.config.loader import Config
    except ImportError:
        from IPython.config.loader import Config

    frame = inspect.stack()[num_up]
    loc = frame[0].f_locals.copy()
    glo = frame[0].f_globals
    dd = dict(fname=frame[3], filename=frame[1], lineno=frame[2])

    cfg = Config()
    cfg.InteractiveShellEmbed.local_ns = loc
    cfg.InteractiveShellEmbed.global_ns = glo
    IPython.embed(config=cfg, banner2=__header % dd)
    ipshell = InteractiveShellEmbed(config=cfg)

    del ipshell


#
# Our progress bar types and how to get one
#

def embed():
    vars = globals()
    vars.update(locals())
    shell = code.InteractiveConsole(vars)
    shell.interact()

def run(self, line):
    ishellCompleter = readline.get_completer()
    embed()
    readline.set_completer(ishellCompleter)

def REPL():
    # collect all variables outside this scope
    local = {}
    # set stack context to 0 to avoid the slow loading of source file
    for sinfo in inspect.stack(0):
        local.update(sinfo[0].f_globals)
        local.update(sinfo[0].f_locals)
    code.interact(local=local)

def shell(metadir, accept_metadir,
          controller, ctrlopt,
          modelsetup, modelopt,
          backend, local,
          verbosity):
    handle_common_options(verbosity)
    ys = handle_connection_options(metadir, accept_metadir,
                                   controller, ctrlopt,
                                   modelsetup, modelopt,
                                   backend, local)
    assert ys
    import IPython
    IPython.embed()

def shell():
    import IPython
    IPython.embed()

def run(self):
    matches = self.db.match(self.args.selectors)
    if len(matches) == 0:
        logger.warning("No results")
        return 10

    # Just for convenience
    meta = self.meta
    files = self.files
    db = self.db

    ipy.embed()
    return 0

def parse_and_save():
    en = spacy.load('en')
    reader = WikiReader(wikidump)
    records = reader.records()

    def section_texts_flat(records):
        while 1:
            try:
                record = next(records)
            except OSError as e:
                print('error: %s' % e)
            else:
                for section in record['sections']:
                    yield section['text']

    pipe = en.pipe(section_texts_flat(records), n_threads=cpu_count(), batch_size=1000)
    # pipe = (en(txt) for txt in section_texts_flat(records))
    preproc = Preprocessor(en.vocab)
    with FilePoolWriter(wikidoc_dir, wikidoc_fn_template) as f:
        for i, doc in enumerate(tqdm.tqdm(pipe)):
            if len(doc._py_tokens) <= 7:  # short sentences -- nah
                continue
            for sent in doc.sents:
                packed = preproc.pack(sent)
                f.write(packed)
            if i % 10000 == 0:
                print('i=%s, saving vocab' % i)
                save_vocab(en.vocab)
    save_vocab(en.vocab)
    import IPython
    IPython.embed()

def load_frozen(self, DEBUG=False, feats_dict=None, points_dict=None):
    if feats_dict is not None:
        print("loading imgs from memory")
        self._feats_dict = feats_dict
        self._points_dict = points_dict
        return
    if cfgs.layer:
        def subfile(filename):
            return osp.join(self._frozen_layer, filename)

        with open(subfile(self._points_dict_name), 'rb') as f:
            self._points_dict = pickle.load(f)
        convs = self.type2names()
        self._feats_dict = dict()
        for conv in convs:
            filename = subfile(conv)
            if osp.exists(filename):
                with open(filename, 'rb') as f:
                    self._feats_dict[conv] = pickle.load(f)
    else:
        frozen = self._frozen
        print("loading imgs from", frozen)
        with open(frozen, 'rb') as f:
            self._feats_dict, self._points_dict = pickle.load(f)
    if DEBUG:
        convs = self.type2names()
        feats_dict = self.extract_features(convs, points_dict=self._points_dict, save=1)
        print("feats_dict", feats_dict)
        print("self._feats_dict", self._feats_dict)
        embed()
        for i in feats_dict:
            for x, y in zip(np.nditer(self._feats_dict[i]), np.nditer(feats_dict[i])):
                assert x == y
    OK("frozen ")
    print("loaded")

def python_shell(options):
    logger = setup_logger("Robot", debug=options.verbose)

    def conn_callback(*args):
        sys.stdout.write(".")
        sys.stdout.flush()
        return True

    if options.shell == "ipython":
        import IPython
    else:
        import importlib
        sys.path.append(os.path.abspath(""))
        module_name, entrance_name = options.shell.rsplit(".", 1)
        module_instance = importlib.import_module(module_name)
        entrance = module_instance.__getattribute__(entrance_name)

    robot, device = connect_robot_helper(options.target, options.clientkey)

    if options.shell == "ipython":
        logger.info("----> READY")
        logger.info("""
 * Hint: Try 'robot?' and 'dir(robot)' to get more information\n""")
        IPython.embed()
        return 0
    else:
        return entrance(robot, device)

def ipython_shell(user_ns, banner):
    IPython.embed(banner1=banner, user_ns=user_ns)

def debug_mode(self):
    test_pol_grad = PolicyGradient(net_dims=[2, 2, 2, 2])
    IPython.embed()

def solve_kkt(U_Q, d, G, A, U_S, rx, rs, rz, ry, dbg=False):
    """Solve KKT equations for the affine step"""
    nineq, nz, neq, _ = get_sizes(G, A)

    invQ_rx = torch.potrs(rx.view(-1, 1), U_Q).view(-1)
    if neq > 0:
        h = torch.cat([torch.mv(A, invQ_rx) - ry,
                       torch.mv(G, invQ_rx) + rs / d - rz], 0)
    else:
        h = torch.mv(G, invQ_rx) + rs / d - rz

    w = -torch.potrs(h.view(-1, 1), U_S).view(-1)

    g1 = -rx - torch.mv(G.t(), w[neq:])
    if neq > 0:
        g1 -= torch.mv(A.t(), w[:neq])
    g2 = -rs - w[neq:]

    dx = torch.potrs(g1.view(-1, 1), U_Q).view(-1)
    ds = g2 / d
    dz = w[neq:]
    dy = w[:neq] if neq > 0 else None

    # if np.all(np.array([x.norm() for x in [rx, rs, rz, ry]]) != 0):
    if dbg:
        import IPython
        import sys
        IPython.embed()
        sys.exit(-1)
    # if rs.norm() > 0: import IPython, sys; IPython.embed(); sys.exit(-1)
    return dx, ds, dz, dy

def _browse_db(self, name, enabled):
    tasks = []

    class Do():
        def __init__(self):
            pass

        def __call__(self):
            rwe = self
            IPython.embed()

    a = ActionListTask([PythonInteractiveAction(Do())], [], [], name)
    tasks.append(a)
    return tasks

def console():
    banner = """
[Sea Console]:
the following vars are included:
`app` (the current app)
"""
    ctx = {'app': current_app}
    try:
        from IPython import embed
        h, kwargs = embed, dict(banner1=banner, user_ns=ctx)
    except ImportError:
        import code
        h, kwargs = code.interact, dict(banner=banner, local=ctx)
    h(**kwargs)
    return 0

def _default_embed_callback(tensor, var):
    logger.info('embed for {}, access by tensor and var'.format(tensor.name))
    from IPython import embed
    embed()