我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pymongo.errors.ConfigurationError()。
def database(function):
    """Decorate a method so ``self.db`` is set up before the method runs.

    When ``self.db`` is falsy the wrapper opens a ``pymongo.MongoClient``
    against ``config.mongodb_url()`` (with ``tz_aware=True``), stores the
    database named by ``config.mongodb_database()`` on ``self.db`` and passes
    it to ``ensureindex``. The wrapped function is then called with the
    caller's arguments and its result is returned.

    ``KeyError``, ``VolumeTypeNotFoundException`` and ``NotImplementedError``
    propagate without being logged. ``ConfigurationError`` is logged with a
    hint about credential characters, any other exception is logged as-is,
    and in both cases the exception is re-raised.

    :Parameters:
      - `function`: The method to wrap. It is called as
        ``function(self, *args, **kwargs)``.
    """
    # Imported locally so this block does not depend on the file's imports.
    import functools

    # Preserve the wrapped function's __name__/__doc__ on the wrapper.
    @functools.wraps(function)
    def _connection(self, *args, **kwargs):
        try:
            if not self.db:
                connection = pymongo.MongoClient(config.mongodb_url(),
                                                 tz_aware=True)
                self.db = connection[config.mongodb_database()]
                ensureindex(self.db)
            return function(self, *args, **kwargs)
        except (KeyError, VolumeTypeNotFoundException, NotImplementedError):
            # These are passed through untouched and deliberately not logged.
            # A bare ``raise`` keeps the original traceback.
            raise
        except ConfigurationError:
            logging.exception("DB Connection, make sure username and "
                              "password doesn't contain the following :+&/ "
                              "character")
            raise
        except Exception as e:
            logging.exception(e)
            raise

    return _connection
def split_hosts(hosts, default_port=DEFAULT_PORT):
    """Split a ``host1[:port],host2[:port],...`` string into parsed hosts.

    Every comma-separated entry is passed to ``parse_host`` together with
    the port to use when the entry does not name one. Entries ending in
    ``.sock`` are Unix sockets and have no port, so ``None`` is passed
    instead of `default_port`.

    Returns a list holding one ``parse_host`` result per entry, in input
    order. Raises :class:`ConfigurationError` if an entry is empty, for
    example because of an extra comma.

    :Parameters:
      - `hosts`: A string of the form host1[:port],host2[:port],...
      - `default_port`: The port number to use when one wasn't specified
        for a host.
    """
    parsed = []
    for chunk in hosts.split(','):
        if not chunk:
            raise ConfigurationError("Empty host "
                                     "(or extra comma in host list).")
        # Unix socket entries don't have ports.
        fallback_port = None if chunk.endswith('.sock') else default_port
        parsed.append(parse_host(chunk, fallback_port))
    return parsed
def _build_credentials_tuple(mech, source, user, passwd, extra):
    """Build and return a mechanism specific credentials tuple.

    :Parameters:
      - `mech`: The authentication mechanism name.
      - `source`: The authentication source; only used for mechanisms other
        than GSSAPI and MONGODB-X509, which always use ``$external``.
      - `user`: The user name; converted with ``_unicode``.
      - `passwd`: The password. Ignored for GSSAPI and MONGODB-X509,
        required for every other mechanism.
      - `extra`: A mapping that may hold ``authmechanismproperties``.

    Raises :class:`ConfigurationError` when a password is needed but
    `passwd` is None.
    """
    username = _unicode(user)

    if mech == 'MONGODB-X509':
        return MongoCredential(mech, '$external', username, None, None)

    if mech == 'GSSAPI':
        mech_props = extra.get('authmechanismproperties', {})
        gssapi_props = GSSAPIProperties(
            service_name=mech_props.get('SERVICE_NAME', 'mongodb'))
        # No password, source is always $external.
        return MongoCredential(mech, '$external', username, None,
                               gssapi_props)

    if passwd is None:
        raise ConfigurationError("A password is required.")
    return MongoCredential(mech, source, username, _unicode(passwd), None)
def get_validated_options(options, warn=True):
    """Validate every entry in `options` and return the entries that pass.

    Option names are lower-cased for the validator lookup and for the keys
    of the returned dict. A name with no entry in ``URI_VALIDATORS`` is
    handled by ``raise_config_error``.

    :Parameters:
      - `options`: A mapping of option name to raw value.
      - `warn`: If True (the default) a ValueError or ConfigurationError
        from a validator is reported through ``warnings.warn`` and the
        entry is left out of the result. If False the exception propagates.
    """
    accepted = {}
    for name, raw in iteritems(options):
        key = name.lower()
        try:
            check = URI_VALIDATORS.get(key, raise_config_error)
            checked = check(name, raw)
        except (ValueError, ConfigurationError) as exc:
            if not warn:
                raise
            warnings.warn(str(exc))
            continue
        accepted[key] = checked
    return accepted
def validate_positive_float(option, value):
    """Validates that 'value' is a float, or can be converted to one, and is
    positive.

    Returns the value converted to ``float``. Raises
    :class:`ConfigurationError` if the value cannot be converted or is not
    strictly between 0 and one billion.

    :Parameters:
      - `option`: The option name, used in the error message.
      - `value`: The value to validate.
    """
    err = ConfigurationError("%s must be a positive int or float" % (option,))
    try:
        value = float(value)
    except (ValueError, TypeError, OverflowError):
        # float() raises OverflowError for integers too large to represent;
        # report that as an invalid option like any other bad value.
        raise err

    # float('inf') doesn't work in 2.4 or 2.5 on Windows, so just cap floats at
    # one billion - this is a reasonable approximation for infinity
    if not 0 < value < 1e9:
        raise err

    return value
def validate_tag_sets(dummy, value):
    """Validate tag sets for a ReplicaSetConnection.

    Returns ``[{}]`` when `value` is None, otherwise returns `value`
    unchanged after checking that it is a non-empty list of dicts. Raises
    :class:`ConfigurationError` when a check fails.

    :Parameters:
      - `dummy`: Unused.
      - `value`: None, or a list of tag set dicts.
    """
    if value is None:
        return [{}]

    if not isinstance(value, list):
        message = "Tag sets %s invalid, must be a list" % repr(value)
        raise ConfigurationError(message)

    if not value:
        message = ("Tag sets %s invalid, must be None or contain at least "
                   "one set of tags") % repr(value)
        raise ConfigurationError(message)

    for tag_set in value:
        if isinstance(tag_set, dict):
            continue
        raise ConfigurationError(
            "Tag set %s invalid, must be a dict" % repr(tag_set))

    return value
def _build_credentials_tuple(mech, source, user, passwd, extra):
    """Build and return a mechanism specific credentials tuple.

    :Parameters:
      - `mech`: The authentication mechanism name.
      - `source`: The authentication source; GSSAPI and MONGODB-X509 always
        use ``$external`` instead.
      - `user`: The user name; converted with ``_unicode``.
      - `passwd`: The password, or None. Converted with ``_unicode`` when
        given. Not used for MONGODB-X509, optional for GSSAPI, required
        for every other mechanism.
      - `extra`: A mapping that may hold ``authmechanismproperties`` with
        the keys SERVICE_NAME, CANONICALIZE_HOST_NAME and SERVICE_REALM.

    Raises :class:`ConfigurationError` when a password is needed but
    `passwd` is None.
    """
    user = _unicode(user)
    if passwd is None:
        password = passwd
    else:
        password = _unicode(passwd)

    if mech == 'GSSAPI':
        mech_props = extra.get('authmechanismproperties', {})
        gssapi_props = GSSAPIProperties(
            service_name=mech_props.get('SERVICE_NAME', 'mongodb'),
            canonicalize_host_name=mech_props.get('CANONICALIZE_HOST_NAME',
                                                  False),
            service_realm=mech_props.get('SERVICE_REALM'))
        # Source is always $external.
        return MongoCredential(mech, '$external', user, password,
                               gssapi_props)

    if mech == 'MONGODB-X509':
        return MongoCredential(mech, '$external', user, None, None)

    if passwd is None:
        raise ConfigurationError("A password is required.")
    return MongoCredential(mech, source, user, password, None)
def _build_credentials_tuple(mech, source, user, passwd, extra):
    """Build and return a mechanism specific credentials tuple.

    :Parameters:
      - `mech`: The authentication mechanism name.
      - `source`: The authentication source; GSSAPI and MONGODB-X509 always
        use ``$external`` instead.
      - `user`: The user name, or None. Converted with ``_unicode`` when
        given.
      - `passwd`: The password, or None. Converted with ``_unicode`` when
        given. Not used for MONGODB-X509, optional for GSSAPI, required
        for every other mechanism.
      - `extra`: A mapping that may hold ``authmechanismproperties`` with
        the keys SERVICE_NAME, CANONICALIZE_HOST_NAME and SERVICE_REALM.

    Raises :class:`ConfigurationError` when a password is needed but
    `passwd` is None.
    """
    if user is not None:
        user = _unicode(user)
    password = None if passwd is None else _unicode(passwd)

    if mech == 'MONGODB-X509':
        # user can be None.
        return MongoCredential(mech, '$external', user, None, None)

    if mech != 'GSSAPI':
        if passwd is None:
            raise ConfigurationError("A password is required.")
        return MongoCredential(mech, source, user, password, None)

    mech_props = extra.get('authmechanismproperties', {})
    name = mech_props.get('SERVICE_NAME', 'mongodb')
    canonicalize_host = mech_props.get('CANONICALIZE_HOST_NAME', False)
    realm = mech_props.get('SERVICE_REALM')
    gssapi_props = GSSAPIProperties(service_name=name,
                                    canonicalize_host_name=canonicalize_host,
                                    service_realm=realm)
    # Source is always $external.
    return MongoCredential(mech, '$external', user, password, gssapi_props)
def __new__(cls, strict_number_long=False,
            datetime_representation=DatetimeRepresentation.LEGACY,
            strict_uuid=False, *args, **kwargs):
    """Create a JSONOptions instance.

    ``tz_aware`` defaults to True here; when it is truthy, ``tzinfo``
    defaults to ``utc``. Remaining positional and keyword arguments are
    forwarded to the parent class's ``__new__``.

    :Parameters:
      - `strict_number_long`: Stored on the instance as-is.
      - `datetime_representation`: One of the LEGACY, NUMBERLONG or ISO8601
        members of DatetimeRepresentation.
      - `strict_uuid`: Stored on the instance as-is.
      - `args`, `kwargs`: Passed through to the parent class.

    Raises :class:`ConfigurationError` if `datetime_representation` is not
    one of the three supported values, or if a ``document_class`` other than
    ``dict`` is requested while ``_HAS_OBJECT_PAIRS_HOOK`` is false.
    """
    kwargs["tz_aware"] = kwargs.get("tz_aware", True)
    if kwargs["tz_aware"]:
        kwargs["tzinfo"] = kwargs.get("tzinfo", utc)

    supported = (DatetimeRepresentation.LEGACY,
                 DatetimeRepresentation.NUMBERLONG,
                 DatetimeRepresentation.ISO8601)
    if datetime_representation not in supported:
        # The original message ran "LEGACY," and "NUMBERLONG" together.
        raise ConfigurationError(
            "JSONOptions.datetime_representation must be one of LEGACY, "
            "NUMBERLONG, or ISO8601 from DatetimeRepresentation.")

    self = super(JSONOptions, cls).__new__(cls, *args, **kwargs)
    if not _HAS_OBJECT_PAIRS_HOOK and self.document_class != dict:
        raise ConfigurationError(
            "Support for JSONOptions.document_class on Python 2.6 "
            "requires simplejson "
            "(https://pypi.python.org/pypi/simplejson) to be installed.")

    self.strict_number_long = strict_number_long
    self.datetime_representation = datetime_representation
    self.strict_uuid = strict_uuid
    return self
def __init__(self, mode, tag_sets=None):
    """Store the read preference mode and its validated tag sets.

    :Parameters:
      - `mode`: The read preference mode; also used as the key into
        ``_MONGOS_MODES``.
      - `tag_sets`: Optional tag sets, validated by ``_validate_tag_sets``.

    Raises :class:`ConfigurationError` if `mode` is the primary mode and
    `tag_sets` is not None.
    """
    primary_with_tags = mode == _PRIMARY and tag_sets is not None
    if primary_with_tags:
        raise ConfigurationError(
            "Read preference primary cannot be combined with tags")

    self.__mongos_mode = _MONGOS_MODES[mode]
    self.__mode = mode
    self.__tag_sets = _validate_tag_sets(tag_sets)
def make_read_preference(mode, tag_sets):
    """Build the read preference object for `mode`.

    For the primary mode a ``Primary()`` instance is returned and
    `tag_sets` must be None or ``[{}]``. For any other mode the class found
    in ``_ALL_READ_PREFERENCES`` is called with `tag_sets`.

    Raises :class:`ConfigurationError` if tags are combined with the
    primary mode.
    """
    if mode == _PRIMARY:
        tags_are_empty = tag_sets in (None, [{}])
        if not tags_are_empty:
            raise ConfigurationError(
                "Read preference primary cannot be combined with tags")
        return Primary()

    preference_class = _ALL_READ_PREFERENCES[mode]
    return preference_class(tag_sets)
def __init__(self, w=None, wtimeout=None, j=None, fsync=None):
    """Validate the write concern options and record the ones that are set.

    Options that are not None are stored in the internal document under
    their own names. The acknowledged flag starts as True and, when `w` is
    an integer, becomes ``w > 0``.

    :Parameters:
      - `w`: An integer or string, or None.
      - `wtimeout`: An integer, or None.
      - `j`: True or False, or None.
      - `fsync`: True or False, or None.

    Raises :class:`TypeError` if an option has the wrong type and
    :class:`ConfigurationError` if `j` and `fsync` are both true or if
    ``w == 0`` is combined with any other option.
    """
    # ``doc`` is the same dict object as the attribute, so writes below are
    # visible on the instance immediately.
    doc = self.__document = {}
    self.__acknowledged = True

    if wtimeout is not None:
        if not isinstance(wtimeout, integer_types):
            raise TypeError("wtimeout must be an integer")
        doc["wtimeout"] = wtimeout

    if j is not None:
        if not isinstance(j, bool):
            raise TypeError("j must be True or False")
        doc["j"] = j

    if fsync is not None:
        if not isinstance(fsync, bool):
            raise TypeError("fsync must be True or False")
        if j and fsync:
            raise ConfigurationError(
                "Can't set both j and fsync at the same time")
        doc["fsync"] = fsync

    if doc and w == 0:
        raise ConfigurationError(
            "Can not use w value of 0 with other options")

    if w is None:
        return

    if isinstance(w, integer_types):
        self.__acknowledged = w > 0
    elif not isinstance(w, string_type):
        raise TypeError("w must be an integer or string")
    doc["w"] = w
def get_ssl_context(*args):
    """Create and return an SSLContext object.

    `args` must contain exactly four values, in this order: certfile,
    keyfile, ca_certs and cert_reqs.

    - If certfile is not None, it is loaded together with keyfile as the
      certificate chain.
    - If ca_certs is not None, it is loaded as the verify location.
    - Otherwise, unless cert_reqs equals ``ssl.CERT_NONE``, the first
      available source of default CA certificates is used (see the branch
      comments below).
    - The context's verify mode is ``ssl.CERT_REQUIRED`` when cert_reqs is
      None, otherwise cert_reqs.

    Raises :class:`ConfigurationError` if certificates must be verified but
    no CA certificates could be loaded.
    """
    certfile, keyfile, ca_certs, cert_reqs = args
    # Note PROTOCOL_SSLv23 is about the most misleading name imaginable.
    # This configures the server and client to negotiate the
    # highest protocol version they both support. A very good thing.
    ctx = SSLContext(ssl.PROTOCOL_SSLv23)
    if hasattr(ctx, "options"):
        # Explicitly disable SSLv2 and SSLv3. Note that up to
        # date versions of MongoDB 2.4 and above already do this,
        # python disables SSLv2 by default in >= 2.7.7 and >= 3.3.4
        # and SSLv3 in >= 3.4.3. There is no way for us to do this
        # explicitly for python 2.6 or 2.7 before 2.7.9.
        # getattr falls back to 0 so a missing flag leaves options unchanged.
        ctx.options |= getattr(ssl, "OP_NO_SSLv2", 0)
        ctx.options |= getattr(ssl, "OP_NO_SSLv3", 0)
    if certfile is not None:
        ctx.load_cert_chain(certfile, keyfile)
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    elif cert_reqs != ssl.CERT_NONE:
        # No explicit CA file was given: try each source of default CA
        # certificates in turn.
        # CPython >= 2.7.9 or >= 3.4.0, pypy >= 2.5.1
        if hasattr(ctx, "load_default_certs"):
            ctx.load_default_certs()
        # Python >= 3.2.0, useless on Windows.
        elif (sys.platform != "win32" and
              hasattr(ctx, "set_default_verify_paths")):
            ctx.set_default_verify_paths()
        elif sys.platform == "win32" and HAVE_WINCERTSTORE:
            # _load_wincerts() is called at most once while _WINCERTS is
            # None; the lock guards that check.
            with _WINCERTSLOCK:
                if _WINCERTS is None:
                    _load_wincerts()
            ctx.load_verify_locations(_WINCERTS.name)
        elif HAVE_CERTIFI:
            ctx.load_verify_locations(certifi.where())
        else:
            raise ConfigurationError(
                "`ssl_cert_reqs` is not ssl.CERT_NONE and no system "
                "CA certificates could be loaded. `ssl_ca_certs` is "
                "required.")
    ctx.verify_mode = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs
    return ctx
def get_ssl_context(*dummy):
    """Fallback used when there is no ssl module.

    Ignores its arguments and always raises :class:`ConfigurationError`.
    """
    message = "The ssl module is not available."
    raise ConfigurationError(message)
def check_compatible(self):
    """Raise ConfigurationError if any server is incompatible.

    A server is incompatible if its wire protocol version range does not
    overlap with PyMongo's. The error text is taken from
    ``self._incompatible_err``; nothing happens when that attribute is
    falsy.
    """
    if not self._incompatible_err:
        return
    raise ConfigurationError(self._incompatible_err)
def count(self, filter=None, **kwargs):
    """Get the number of documents in this collection.

    All optional count parameters should be passed as keyword arguments
    to this method. Valid options include:

      - `hint` (string or list of tuples): The index to use. Specify either
        the index name as a string or the index specification as a list of
        tuples (e.g. [('a', pymongo.ASCENDING), ('b', pymongo.ASCENDING)]).
      - `limit` (int): The maximum number of documents to count.
      - `skip` (int): The number of matching documents to skip before
        returning results.
      - `maxTimeMS` (int): The maximum amount of time to allow the count
        command to run, in milliseconds.

    The :meth:`count` method obeys the :attr:`read_preference` of
    this :class:`Collection`.

    Raises :class:`ConfigurationError` if both `filter` and a ``query``
    keyword argument are given.

    :Parameters:
      - `filter` (optional): A query document that selects which documents
        to count in the collection.
      - `**kwargs` (optional): See list of options above.
    """
    command = SON([("count", self.__name)])

    if filter is not None:
        if "query" in kwargs:
            raise ConfigurationError("can't pass both filter and query")
        kwargs["query"] = filter

    if "hint" in kwargs:
        hint = kwargs["hint"]
        # An index name is sent as-is; a key specification is converted to
        # an index document first.
        if not isinstance(hint, string_type):
            kwargs["hint"] = helpers._index_document(hint)

    command.update(kwargs)
    return self._count(command)
def distinct(self, key, filter=None, **kwargs):
    """Get a list of distinct values for `key` among all documents
    in this collection.

    Raises :class:`TypeError` if `key` is not an instance of
    :class:`basestring` (:class:`str` in python 3). Raises
    :class:`ConfigurationError` if both `filter` and a ``query`` keyword
    argument are given.

    All optional distinct parameters should be passed as keyword arguments
    to this method. Valid options include:

      - `maxTimeMS` (int): The maximum amount of time to allow the distinct
        command to run, in milliseconds.

    The :meth:`distinct` method obeys the :attr:`read_preference` of
    this :class:`Collection`.

    :Parameters:
      - `key`: name of the field for which we want to get the distinct
        values
      - `filter` (optional): A query document that specifies the documents
        from which to retrieve the distinct values.
      - `**kwargs` (optional): See list of options above.
    """
    if not isinstance(key, string_type):
        raise TypeError("key must be an "
                        "instance of %s" % (string_type.__name__,))

    command = SON([("distinct", self.__name), ("key", key)])

    if filter is not None:
        if "query" in kwargs:
            raise ConfigurationError("can't pass both filter and query")
        kwargs["query"] = filter
    command.update(kwargs)

    with self._socket_for_reads() as (sock_info, slave_ok):
        reply = self._command(sock_info, command, slave_ok,
                              read_concern=self.read_concern)
        return reply["values"]
def raise_config_error(key, dummy):
    """Raise ConfigurationError naming `key` as an unknown option.

    :Parameters:
      - `key`: The option name to report.
      - `dummy`: Unused.
    """
    message = "Unknown option %s" % (key,)
    raise ConfigurationError(message)


# Mapping of URI uuid representation options to valid subtypes.
def validate_timeout_or_zero(option, value):
    """Validate a timeout given in milliseconds and return it in floating
    point seconds, for the case where None is an error and 0 is valid.

    Setting the timeout to nothing in the URI string is a config error.
    A value equal to ``0`` or ``"0"`` is returned as the integer 0; any
    other value is checked by ``validate_positive_float`` and divided by
    1000.

    :Parameters:
      - `option`: The option name, used in error messages.
      - `value`: The timeout in milliseconds.
    """
    if value is None:
        raise ConfigurationError("%s cannot be None" % (option, ))

    is_zero = value == 0 or value == "0"
    if is_zero:
        return 0

    milliseconds = validate_positive_float(option, value)
    return milliseconds / 1000.0
def validate_auth_option(option, value):
    """Validate optional authentication parameters.

    The option is first passed through ``validate``, which returns a
    ``(name, value)`` pair. Returns that pair if the name is listed in
    ``_AUTH_OPTIONS``, otherwise raises :class:`ConfigurationError`.
    """
    name, checked = validate(option, value)
    if name in _AUTH_OPTIONS:
        return name, checked
    raise ConfigurationError('Unknown '
                             'authentication option: %s' % (option,))
def authenticate(credentials, sock_info, cmd_func):
    """Authenticate sock_info.

    The first item of `credentials` is the mechanism name; the handler
    registered for it in ``_AUTH_MAP`` is called with the remaining items,
    `sock_info` and `cmd_func`.

    Raises :class:`ConfigurationError` if the mechanism is GSSAPI and
    ``HAVE_KERBEROS`` is false.
    """
    mechanism = credentials[0]
    if mechanism == 'GSSAPI' and not HAVE_KERBEROS:
        raise ConfigurationError('The "kerberos" module must be '
                                 'installed to use GSSAPI authentication.')

    # NOTE(review): presumably the mechanism was validated by the caller; a
    # name missing from _AUTH_MAP would make the call below fail.
    handler = _AUTH_MAP.get(mechanism)
    handler(credentials[1:], sock_info, cmd_func)
def validate_boolean(option, value):
    """Validate that `value` is a bool or the string 'true' or 'false'.

    Returns the value as a bool. Raises :class:`ConfigurationError` for
    any other string and :class:`TypeError` for any other type.

    :Parameters:
      - `option`: The option name, used in error messages.
      - `value`: The value to validate.
    """
    if isinstance(value, bool):
        return value

    if not isinstance(value, basestring):
        raise TypeError("Wrong type for %s, value must be a boolean"
                        % (option,))

    if value not in ('true', 'false'):
        raise ConfigurationError("The value of %s must be "
                                 "'true' or 'false'" % (option,))
    return value == 'true'
def validate_positive_integer(option, value):
    """Validate that `value` is an integer that is not negative.

    The value is first converted by ``validate_integer``. Zero is accepted;
    only results below zero raise :class:`ConfigurationError`.

    :Parameters:
      - `option`: The option name, used in the error message.
      - `value`: The value to validate.
    """
    number = validate_integer(option, value)
    if number < 0:
        message = "The value of %s must be a positive integer" % (option,)
        raise ConfigurationError(message)
    return number
def validate_cert_reqs(option, value):
    """Validate the cert reqs are valid. It must be None or one of the three
    values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``.

    Returns `value` unchanged when it is valid. Raises
    :class:`ConfigurationError` if the value is not one of the accepted
    constants, or if a value is set while ``HAS_SSL`` is false.

    :Parameters:
      - `option`: The option name, used in error messages.
      - `value`: The value to validate.
    """
    if value is None:
        return value

    if not HAS_SSL:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))

    if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        return value

    # The closing backtick after ssl.CERT_REQUIRED was missing originally.
    raise ConfigurationError("The value of %s must be one of: "
                             "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                             "`ssl.CERT_REQUIRED`" % (option,))
def validate_read_preference(dummy, value):
    """Validate read preference for a ReplicaSetConnection.

    A value found in ``read_preferences.modes`` is returned unchanged.
    Anything else is passed to ``read_preferences.mongos_enum`` so the
    string form used by the URI parser is accepted as well.

    Raises :class:`ConfigurationError` if that conversion raises
    ValueError.
    """
    if value not in read_preferences.modes:
        # Also allow string form of enum for uri_parser
        try:
            value = read_preferences.mongos_enum(value)
        except ValueError:
            raise ConfigurationError("Not a valid read preference")
    return value
def validate_uuid_representation(dummy, value):
    """Validate the uuid representation option selected in the URI.

    Returns the ``_UUID_SUBTYPES`` entry for `value`. Raises
    :class:`ConfigurationError` if `value` is not one of its keys.
    """
    known = _UUID_SUBTYPES.keys()
    if value in known:
        return _UUID_SUBTYPES[value]
    raise ConfigurationError("%s is an invalid UUID representation. "
                             "Must be one of "
                             "%s" % (value, _UUID_SUBTYPES.keys()))
def validate_uuid_subtype(dummy, value):
    """Validate the uuid subtype option, a numerical value whose acceptable
    values are defined in bson.binary.

    Returns `value` unchanged if it is one of the values of
    ``_UUID_SUBTYPES``, otherwise raises :class:`ConfigurationError`.
    """
    if value in _UUID_SUBTYPES.values():
        return value
    raise ConfigurationError("Not a valid setting for uuid_subtype.")


# journal is an alias for j,
# wtimeoutms is an alias for wtimeout,
# readpreferencetags is an alias for tag_sets.