我们从 Python 开源项目中提取了以下 31 个代码示例，用于说明名为 path 的对象上 find() 方法的用法（注意：标准库 os.path 模块本身并不提供 find() 函数）。
def test_object_path_ns_list(self):
    """Resolve list-style ObjectPaths whose segments mix plain and ``{...}``-qualified tags.

    Every case builds an ``objectify.ObjectPath`` from a list of segments
    and checks that ``find()`` on the parsed tree yields the same text as
    the equivalent attribute access on the root.
    """
    root = self.XML(xml_str)

    def first_c2_text():
        return root.c1.c2.text

    def indexed_c2_text():
        return root.c1.c2[2].text

    def other_ns_c2_text():
        return getattr(root.c1, '{otherNS}c2').text

    # (path segments, callable producing the expected text), checked in order.
    cases = (
        (['{objectified}root', 'c1', 'c2'], first_c2_text),
        (['{objectified}root', '{objectified}c1', 'c2'], first_c2_text),
        (['root', '{objectified}c1', '{objectified}c2'], first_c2_text),
        (['root', '{objectified}c1', '{objectified}c2[2]'], indexed_c2_text),
        (['root', 'c1', '{objectified}c2'], first_c2_text),
        (['root', 'c1', '{objectified}c2[2]'], indexed_c2_text),
        (['root', 'c1', '{otherNS}c2'], other_ns_c2_text),
    )
    for segments, expected_text in cases:
        path = objectify.ObjectPath(segments)
        self.assertEqual(expected_text(), path.find(root).text)
def test_object_path_set_create_list(self):
    """Assign a list of elements through an ObjectPath whose target is missing.

    ``root.c1.c99`` cannot be found at first (``find`` raises
    ``AttributeError``). After ``setattr`` with a two-element list, both
    elements are reachable as ``root.c1.c99`` with their text and their
    ``myattr`` attribute.
    """
    tree = self.XML(xml_str)
    target = objectify.ObjectPath("root.c1.c99")
    self.assertRaises(AttributeError, target.find, tree)

    source = self.Element("{objectified}test")
    source.a = ["TEST1", "TEST2"]
    for position, attr_value in enumerate(("ATTR1", "ATTR2")):
        source.a[position].set("myattr", attr_value)

    target.setattr(tree, list(source.a))

    self.assertEqual(2, len(tree.c1.c99))
    expected = (("ATTR1", "TEST1"), ("ATTR2", "TEST2"))
    for position, (attr_value, text) in enumerate(expected):
        self.assertEqual(attr_value, tree.c1.c99[position].get("myattr"))
        self.assertEqual(text, tree.c1.c99[position].text)
    self.assertEqual("TEST1", target(tree).text)
def completions():
    """Handle a completion request for the text at the cursor.

    When ``a:findstart`` is set, count the regex word characters directly
    before the cursor and pass the column where that run begins to
    ``env.stop``. Otherwise compute proposals for ``a:base`` at the cursor
    position and pass them to ``env.stop``.

    :return: whatever ``env.stop`` returns.
    """
    row, col = env.cursor

    if not env.var('a:findstart', True):
        base = env.var('a:base')
        source, offset = env.get_offset_params((row, col), base)
        proposals = get_proporsals(source, offset, base)
        return env.stop(proposals)

    # Walk backwards from the cursor while the characters still match.
    before_cursor = env.current.line[:col]
    start = len(before_cursor)
    while start > 0 and re.match(r'[\w\d]', before_cursor[start - 1]):
        start -= 1
    count = len(before_cursor) - start

    env.debug('Complete find start', (col - count))
    return env.stop(col - count)
def execute(self):
    """Open the console pre-filled with a ``rename`` command for the current file.

    Every ``%`` in the file's relative path is doubled before it is put on
    the console line. If the path does not start with a dot, contains a
    dot, and the file is not a directory, the console is opened with
    ``position`` set to the index of the last dot within the command
    string. Otherwise no position is passed.
    """
    current = self.fm.thisfile
    escaped = current.relative_path.replace("%", "%%")
    command = 'rename ' + escaped
    last_dot = escaped.rfind('.')

    has_inner_dot = not escaped.startswith('.') and last_dot != -1
    if has_inner_dot and not current.is_directory:
        # 7 == len('rename '), the prefix that precedes the path.
        self.fm.open_console(command, position=(7 + last_dot))
        return
    self.fm.open_console(command)
def test_object_path(self):
    """Look up ``root.c1.c2`` through ``ObjectPath.find`` and by calling the path."""
    tree = self.XML(xml_str)
    c2_path = objectify.ObjectPath("root.c1.c2")
    # Both ways of resolving the path must give the same element text.
    for resolve in (c2_path.find, c2_path):
        self.assertEqual(tree.c1.c2.text, resolve(tree).text)
def test_object_path_list(self):
    """Look up ``c2`` with an ObjectPath built from a list of segments.

    The path is resolved both via ``find`` and by calling the path object.
    """
    tree = self.XML(xml_str)
    segments = ['root', 'c1', 'c2']
    c2_path = objectify.ObjectPath(segments)
    for resolve in (c2_path.find, c2_path):
        self.assertEqual(tree.c1.c2.text, resolve(tree).text)
def test_object_path_ns(self):
    """Resolve dotted ObjectPaths whose segments mix plain and ``{...}``-qualified tags.

    Every case builds an ``objectify.ObjectPath`` from a dotted string and
    checks that ``find()`` yields the same text as the equivalent attribute
    access on the root.
    """
    root = self.XML(xml_str)

    def plain_c2_text():
        return root.c1.c2.text

    def other_ns_c2_text():
        return getattr(root.c1, '{otherNS}c2').text

    # (dotted path, callable producing the expected text), checked in order.
    cases = (
        ("{objectified}root.c1.c2", plain_c2_text),
        ("{objectified}root.{objectified}c1.c2", plain_c2_text),
        ("root.{objectified}c1.{objectified}c2", plain_c2_text),
        ("root.c1.{objectified}c2", plain_c2_text),
        ("root.c1.{otherNS}c2", other_ns_c2_text),
    )
    for dotted, expected_text in cases:
        path = objectify.ObjectPath(dotted)
        self.assertEqual(expected_text(), path.find(root).text)
def test_object_path_set(self):
    """Overwrite an existing element's value through ``ObjectPath.setattr``.

    After the assignment, ``root.c1.c2`` carries the new text while the
    element at ``root.c1.c2[1]`` still has its previous text ``"1"``.
    """
    tree = self.XML(xml_str)
    c2_path = objectify.ObjectPath("root.c1.c2")

    def second_c2_text():
        return tree.c1.c2[1].text

    self.assertEqual(tree.c1.c2.text, c2_path.find(tree).text)
    self.assertEqual("1", second_c2_text())

    replacement = "my new value"
    c2_path.setattr(tree, replacement)

    self.assertEqual(replacement, tree.c1.c2.text)
    self.assertEqual(replacement, c2_path(tree).text)
    self.assertEqual("1", second_c2_text())
def test_object_path_set_create(self):
    """Create a missing element by assigning a value through ``ObjectPath.setattr``.

    ``root.c1.c99`` is not findable beforehand (``AttributeError``);
    afterwards exactly one ``c99`` exists and holds the assigned text.
    """
    tree = self.XML(xml_str)
    missing = objectify.ObjectPath("root.c1.c99")
    self.assertRaises(AttributeError, missing.find, tree)

    text = "my new value"
    missing.setattr(tree, text)

    self.assertEqual(1, len(tree.c1.c99))
    self.assertEqual(text, tree.c1.c99.text)
    self.assertEqual(text, missing(tree).text)
def test_object_path_set_create_element(self):
    """Create a missing element by assigning an existing element via ``setattr``.

    A ``sub`` element carrying ``myattr="ATTR"`` and a child ``a`` with
    text ``"TEST"`` is assigned to the not-yet-existing ``root.c1.c99``;
    the attribute and the child must then be reachable under ``c99``.
    """
    tree = self.XML(xml_str)
    missing = objectify.ObjectPath("root.c1.c99")
    self.assertRaises(AttributeError, missing.find, tree)

    holder = self.Element("{objectified}test")
    sub = etree.SubElement(holder, "{objectified}sub", myattr="ATTR")
    sub.a = "TEST"

    missing.setattr(tree, holder.sub)

    self.assertEqual(1, len(tree.c1.c99))
    self.assertEqual("ATTR", tree.c1.c99.get("myattr"))
    self.assertEqual("TEST", tree.c1.c99.a.text)
    self.assertEqual("TEST", missing(tree).a.text)
def test_object_path_addattr_create(self):
    """Create a missing element by adding a value through ``ObjectPath.addattr``.

    ``root.c1.c99`` is not findable beforehand (``AttributeError``);
    afterwards exactly one ``c99`` exists and holds the added text.
    """
    tree = self.XML(xml_str)
    missing = objectify.ObjectPath("root.c1.c99")
    self.assertRaises(AttributeError, missing.find, tree)

    text = "my new value"
    missing.addattr(tree, text)

    self.assertEqual(1, len(tree.c1.c99))
    self.assertEqual(text, tree.c1.c99.text)
    self.assertEqual(text, missing(tree).text)
def test_object_path_addattr_create_element(self):
    """Create a missing element by adding an existing element via ``addattr``.

    A ``sub`` element carrying ``myattr="ATTR"`` and a child ``a`` with
    text ``"TEST"`` is added at the not-yet-existing ``root.c1.c99``; the
    child and the attribute must then be reachable under ``c99``.
    """
    tree = self.XML(xml_str)
    missing = objectify.ObjectPath("root.c1.c99")
    self.assertRaises(AttributeError, missing.find, tree)

    holder = self.Element("{objectified}test")
    sub = etree.SubElement(holder, "{objectified}sub", myattr="ATTR")
    sub.a = "TEST"

    missing.addattr(tree, holder.sub)

    self.assertEqual(1, len(tree.c1.c99))
    self.assertEqual("TEST", tree.c1.c99.a.text)
    self.assertEqual("TEST", missing(tree).a.text)
    self.assertEqual("ATTR", tree.c1.c99.get("myattr"))
def os9PathConvert(path):
    """Attempt to convert a unix style path to a Mac OS9 style path.

    No support for relative paths!

    A leading ``/Volumes/`` component (an entry on the volumes list) or,
    failing that, a single leading ``/`` (a directory on the root volume)
    is dropped, and every remaining ``/`` is replaced by ``:``.

    The ``/Volumes`` prefix is only recognised as a whole path component,
    so ``/VolumesBackup/x`` is treated as a directory on the root volume.
    An empty string is returned unchanged instead of raising ``IndexError``.

    :param path: unix style path string.
    :return: the converted, colon separated path string.
    """
    volumes = "/Volumes"
    if path == volumes or path.startswith(volumes + "/"):
        # it's on the volumes list, some sort of external volume
        path = path[len(volumes) + 1:]
    elif path.startswith("/"):
        # a dir on the root volume
        path = path[1:]
    return path.replace("/", ":")
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
    """
    Look up the EC2 instances of an existing cluster, if there are any.

    Queries ``conn`` for the "<cluster_name>-master" and
    "<cluster_name>-slaves" security groups, leaving out instances whose
    state is "shutting-down" or "terminated". When no master is found and
    ``die_on_error`` is true, an error is printed to stderr and the process
    exits with status 1.

    Returns a tuple of lists of EC2 instance objects for the masters and slaves.
    """
    print("Searching for existing cluster {c} in region {r}...".format(
          c=cluster_name, r=opts.region))

    def get_instances(group_names):
        """
        Return the non-terminated instances of any of the given security groups.

        EC2 reservation filters and instance states are documented here:
            http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
        """
        gone_states = ("shutting-down", "terminated")
        reservations = conn.get_all_reservations(
            filters={"instance.group-name": group_names})
        return [instance
                for reservation in reservations
                for instance in reservation.instances
                if instance.state not in gone_states]

    def plural(items):
        """Return the plural suffix for a count of ``len(items)``."""
        return '' if len(items) == 1 else 's'

    masters = get_instances([cluster_name + "-master"])
    slaves = get_instances([cluster_name + "-slaves"])

    if masters or slaves:
        print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
              m=len(masters), plural_m=plural(masters),
              s=len(slaves), plural_s=plural(slaves)))

    if die_on_error and not masters:
        print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
              c=cluster_name, r=opts.region), file=sys.stderr)
        sys.exit(1)

    return (masters, slaves)


# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
    """
    Look up the EC2 instances of an existing cluster, if there are any.

    Queries ``conn`` for the "<cluster_name>-master" and
    "<cluster_name>-slaves" security groups, leaving out instances whose
    state is "shutting-down" or "terminated". When no master is found and
    ``die_on_error`` is true, an error is printed to stderr and the process
    exits with status 1.

    Returns a tuple of lists of EC2 instance objects for the masters and slaves.
    """
    print("Searching for existing cluster {c} in region {r}...".format(
          c=cluster_name, r=opts.region))

    def get_instances(group_names):
        """
        Return the non-terminated instances of any of the given security groups.

        EC2 reservation filters and instance states are documented here:
            http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
        """
        gone_states = ("shutting-down", "terminated")
        reservations = conn.get_all_reservations(
            filters={"instance.group-name": group_names})
        return [instance
                for reservation in reservations
                for instance in reservation.instances
                if instance.state not in gone_states]

    def plural(items):
        """Return the plural suffix for a count of ``len(items)``."""
        return '' if len(items) == 1 else 's'

    masters = get_instances([cluster_name + "-master"])
    slaves = get_instances([cluster_name + "-slaves"])

    if masters or slaves:
        print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
              m=len(masters), plural_m=plural(masters),
              s=len(slaves), plural_s=plural(slaves)))

    if die_on_error and not masters:
        print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
              c=cluster_name, r=opts.region), file=sys.stderr)
        sys.exit(1)

    return (masters, slaves)
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
    """
    Look up the EC2 instances of an existing cluster, if there are any.

    Queries ``conn`` for the "<cluster_name>-master" and
    "<cluster_name>-slaves" security groups, leaving out instances whose
    state is "shutting-down" or "terminated". When no master is found and
    ``die_on_error`` is true, an error is printed to stderr and the process
    exits with status 1.

    Returns a tuple of lists of EC2 instance objects for the masters and slaves.
    """
    print("Searching for existing cluster {c} in region {r}...".format(
          c=cluster_name, r=opts.region))

    def get_instances(group_names):
        """
        Return the non-terminated instances of any of the given security groups.

        EC2 reservation filters and instance states are documented here:
            http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
        """
        gone_states = ("shutting-down", "terminated")
        reservations = conn.get_all_reservations(
            filters={"instance.group-name": group_names})
        return [instance
                for reservation in reservations
                for instance in reservation.instances
                if instance.state not in gone_states]

    def plural(items):
        """Return the plural suffix for a count of ``len(items)``."""
        return '' if len(items) == 1 else 's'

    masters = get_instances([cluster_name + "-master"])
    slaves = get_instances([cluster_name + "-slaves"])

    if masters or slaves:
        print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
              m=len(masters), plural_m=plural(masters),
              s=len(slaves), plural_s=plural(slaves)))

    if die_on_error and not masters:
        print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
              c=cluster_name, r=opts.region), file=sys.stderr)
        sys.exit(1)

    return (masters, slaves)


# Execute a cmd on master and slave nodes
def update_python_path(paths):
    """
    Update sys.path and make sure the new items come first.

    Each entry of ``paths`` that contains ``site-packages`` is registered
    with ``site.addsitedir``; every other entry is inserted at the front of
    ``sys.path``. Afterwards ``sys.path`` is rebuilt so that all entries
    that were not present before the call precede the previous entries.

    The new entries keep the relative order they have in ``sys.path`` after
    the insertions (so later plain paths precede earlier ones) and are
    de-duplicated. Collecting them through a set difference would leave
    their import priority arbitrary.

    :param paths: iterable of directory path strings.
    """
    old_sys_path_items = list(sys.path)

    for path in paths:
        # see if it is a site dir
        if 'site-packages' in path:
            site.addsitedir(path)
        else:
            sys.path.insert(0, path)

    # Reorder sys.path so new directories are at the front, preserving the
    # order in which they currently appear.
    seen = set(old_sys_path_items)
    new_sys_path_items = []
    for item in sys.path:
        if item not in seen:
            seen.add(item)
            new_sys_path_items.append(item)
    sys.path = new_sys_path_items + old_sys_path_items
def execute(self):
    """Apply the current pattern to the current directory as directed by the flags.

    In order: store the built regex as the tab's last search and select the
    "search" order; mark or unmark files when ``MARK``/``UNMARK`` is among
    the flags; set or clear the directory filter when ``PERM_FILTER`` is
    set; call ``self.cancel()``; enter the pattern (or move right) when
    opening was requested; reopen the console when ``KEEP_OPEN`` is set and
    the directory changed; finally block input briefly after a quickly
    executed directory change.
    """
    start_dir = self.fm.thisdir
    flags = self.flags
    pattern = self.pattern
    regex = self._build_regex()
    count = self._count(move=True)

    self.fm.thistab.last_search = regex
    self.fm.set_search_method(order="search")

    marking_requested = self.MARK in flags or self.UNMARK in flags
    if marking_requested and start_dir.files:
        # True when MARK occurs later in the flag string than UNMARK
        # (an absent flag has index -1).
        value = flags.find(self.MARK) > flags.find(self.UNMARK)
        if self.FILTER in flags:
            targets = start_dir.files
        else:
            # Lazy on purpose: each match is marked before the next search.
            targets = (f for f in start_dir.files
                       if regex.search(f.relative_path))
        for f in targets:
            start_dir.mark_item(f, value)

    if self.PERM_FILTER in flags:
        if pattern:
            start_dir.filter = regex
        else:
            start_dir.filter = None

    # clean up:
    self.cancel()

    open_requested = (self.OPEN_ON_ENTER in flags
                      or (self.AUTO_OPEN in flags and count == 1))
    if open_requested:
        if os.path.exists(pattern):
            self.fm.cd(pattern)
        else:
            self.fm.move(right=1)

    if self.KEEP_OPEN in flags and start_dir != self.fm.thisdir:
        # reopen the console, with the pattern cut off the end of the line:
        if pattern:
            console_text = self.line[:-len(pattern)]
        else:
            console_text = self.line
        self.fm.open_console(console_text)

    if self.quickly_executed and start_dir != self.fm.thisdir and pattern != "..":
        self.fm.block_input(0.5)
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
    """
    Look up the EC2 instances of an existing cluster, if there are any.

    Queries ``conn`` for the "<cluster_name>-locator", "-lead", "-stores"
    and "-zeppelin" security groups, leaving out instances whose state is
    "shutting-down" or "terminated". When no locator is found and
    ``die_on_error`` is true, an error is printed to stderr and the process
    exits with status 1.

    Returns a tuple of four lists of EC2 instance objects, in the order
    locators, leads, stores, zeppelin instances.
    """
    print("Searching for existing cluster {c} in region {r}...".format(
          c=cluster_name, r=opts.region))

    def get_instances(group_names):
        """
        Return the non-terminated instances of any of the given security groups.

        EC2 reservation filters and instance states are documented here:
            http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
        """
        gone_states = ("shutting-down", "terminated")
        reservations = conn.get_all_reservations(
            filters={"instance.group-name": group_names})
        return [instance
                for reservation in reservations
                for instance in reservation.instances
                if instance.state not in gone_states]

    def plural(items):
        """Return the plural suffix for a count of ``len(items)``."""
        return '' if len(items) == 1 else 's'

    # One query per role, issued in this order.
    locators, leads, stores, zeppelins = [
        get_instances([cluster_name + suffix])
        for suffix in ("-locator", "-lead", "-stores", "-zeppelin")
    ]

    if locators or leads or stores or zeppelins:
        print("Found {l} locator{plural_l}, {m} lead{plural_m}, {s} store{plural_s}, {z} zeppelin instance.".format(
              l=len(locators), plural_l=plural(locators),
              m=len(leads), plural_m=plural(leads),
              s=len(stores), plural_s=plural(stores),
              z=len(zeppelins)))

    if die_on_error and not locators:
        print("ERROR: Could not find a locator for cluster {c} in region {r}.".format(
              c=cluster_name, r=opts.region), file=sys.stderr)
        sys.exit(1)

    return (locators, leads, stores, zeppelins)


# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.