Python urllib2 模块,request() 实例源码

我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用urllib2.request()

项目:TorrentBro    作者:subins2000    | 项目源码 | 文件源码
def files(self):
        """Return the torrent's file listing, fetching it on first use.

        The listing is scraped from the ``/ajax_details_filelist.php`` page and
        stored on ``self._files``; while that value is truthy it is returned
        without any network access.

        :return: ``self._files``, mapping file name to size text
        """
        if self._files:
            return self._files

        listing_url = self.url.path('/ajax_details_filelist.php').query_param(
            'id', self.id)
        req = urllib.request.Request(
            listing_url, headers={'User-Agent': "Magic Browser"})
        body = urllib.request.urlopen(req).read()

        table_rows = html.document_fromstring(body).findall('.//tr')

        # A single row whose cell spans two columns yields an empty listing.
        lone_spanning_row = (len(table_rows) == 1 and
                             table_rows[0].find('td').get('colspan') == '2')
        if lone_spanning_row:
            self._files = {}
            return self._files

        for table_row in table_rows:
            cells = [unicode(cell.text_content())
                     for cell in table_row.findall('.//td')]
            name, size = cells
            # replace non-breaking spaces in the size text with plain ones
            self._files[name] = size.replace('\xa0', ' ')
        return self._files
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def run(self, suppress = False):
    """
    Waits for our request to finish and hands back everything it fetched,
    starting the request first if that hasn't happened yet.

    :param bool suppress: when **True** exceptions are not raised

    :returns: list of the requested
      :class:`~stem.descriptor.__init__.Descriptor` instances

    :raises:
      Unless **suppress** is **True** this can fail with...

        * **ValueError** if the descriptor contents is malformed
        * **socket.timeout** if our request timed out
        * **urllib2.URLError** for most request failures

      Other exception types raised by the urllib2 module are passed along
      unchanged.
    """

    descriptors = self._run(suppress)
    return list(descriptors)
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def _pick_url(self, use_authority = False):
    """
    Provides a url that can be queried. If we have multiple endpoints then one
    will be picked randomly.

    :param bool use_authority: ignores our endpoints and uses a directory
      authority instead

    :returns: **str** for the url being queried by this request
    """

    if use_authority or not self.endpoints:
      # filter() is a lazy iterator on Python 3 while random.choice() needs a
      # sequence, so materialize the candidates first
      candidates = list(filter(HAS_V3IDENT, get_authorities().values()))
      authority = random.choice(candidates)
      address, dirport = authority.address, authority.dir_port
    else:
      address, dirport = random.choice(self.endpoints)

    # the resource's leading slash is dropped since the template supplies one
    return 'http://%s:%i/%s' % (address, dirport, self.resource.lstrip('/'))
项目:sequana    作者:sequana    | 项目源码 | 文件源码
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    :param url: an ``http://`` or ``https://`` URL; anything else is opened
        as a local text file
    :returns: the response body (decompressed when the server reports a
        ``gzip`` content-encoding), or the text of the local file
    :raises RuntimeError: if the content-encoding is neither plain nor gzip
    """
    import io  # local import so the block does not depend on file-level names

    if url.startswith(('http://', 'https://')):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        try:
            data = resp.read()
        finally:
            # release the connection even if the read fails
            resp.close()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            # the body is raw bytes, so it needs a binary buffer
            data = gzip.GzipFile(fileobj=io.BytesIO(data)).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        with open(url, 'r') as fid:
            data = fid.read()

    return data
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def run(self, suppress = False):
    """
    Waits for our request to finish and hands back everything it fetched,
    starting the request first if that hasn't happened yet.

    :param bool suppress: when **True** exceptions are not raised

    :returns: list of the requested
      :class:`~stem.descriptor.__init__.Descriptor` instances

    :raises:
      Unless **suppress** is **True** this can fail with...

        * **ValueError** if the descriptor contents is malformed
        * **socket.timeout** if our request timed out
        * **urllib2.URLError** for most request failures

      Other exception types raised by the urllib2 module are passed along
      unchanged.
    """

    descriptors = self._run(suppress)
    return list(descriptors)
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def _pick_url(self, use_authority = False):
    """
    Provides a url that can be queried. If we have multiple endpoints then one
    will be picked randomly.

    :param bool use_authority: ignores our endpoints and uses a directory
      authority instead

    :returns: **str** for the url being queried by this request
    """

    if use_authority or not self.endpoints:
      # filter() is a lazy iterator on Python 3 while random.choice() needs a
      # sequence, so materialize the candidates first
      candidates = list(filter(HAS_V3IDENT, get_authorities().values()))
      authority = random.choice(candidates)
      address, dirport = authority.address, authority.dir_port
    else:
      address, dirport = random.choice(self.endpoints)

    # the resource's leading slash is dropped since the template supplies one
    return 'http://%s:%i/%s' % (address, dirport, self.resource.lstrip('/'))
项目:multi-diffusion    作者:chemical-diffusion    | 项目源码 | 文件源码
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    :param url: an ``http://`` or ``https://`` URL; anything else is opened
        as a local text file
    :returns: the response body (decompressed when the server reports a
        ``gzip`` content-encoding), or the text of the local file
    :raises RuntimeError: if the content-encoding is neither plain nor gzip
    """
    import io  # local import so the block does not depend on file-level names

    if url.startswith(('http://', 'https://')):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        try:
            data = resp.read()
        finally:
            # release the connection even if the read fails
            resp.close()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            # the body is raw bytes, so it needs a binary buffer
            data = gzip.GzipFile(fileobj=io.BytesIO(data)).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        with open(url, 'r') as fid:
            data = fid.read()

    return data
项目:dyfunconn    作者:makism    | 项目源码 | 文件源码
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    :param url: an ``http://`` or ``https://`` URL; anything else is opened
        as a local text file
    :returns: the response body (decompressed when the server reports a
        ``gzip`` content-encoding), or the text of the local file
    :raises RuntimeError: if the content-encoding is neither plain nor gzip
    """
    import io  # local import so the block does not depend on file-level names

    if url.startswith(('http://', 'https://')):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        try:
            data = resp.read()
        finally:
            # release the connection even if the read fails
            resp.close()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            # the body is raw bytes, so it needs a binary buffer
            data = gzip.GzipFile(fileobj=io.BytesIO(data)).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        # the with-statement already closes the file; no explicit close needed
        with open(url, 'r') as fid:
            data = fid.read()

    return data
项目:polylearn    作者:scikit-learn-contrib    | 项目源码 | 文件源码
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    :param url: an ``http://`` or ``https://`` URL; anything else is opened
        as a local text file
    :returns: the response body (decompressed when the server reports a
        ``gzip`` content-encoding), or the text of the local file
    :raises RuntimeError: if the content-encoding is neither plain nor gzip
    """
    import io  # local import so the block does not depend on file-level names

    if url.startswith(('http://', 'https://')):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        try:
            data = resp.read()
        finally:
            # release the connection even if the read fails
            resp.close()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            # the body is raw bytes, so it needs a binary buffer
            data = gzip.GzipFile(fileobj=io.BytesIO(data)).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        # the with-statement already closes the file; no explicit close needed
        with open(url, 'r') as fid:
            data = fid.read()

    return data
项目:python-http-client    作者:sendgrid    | 项目源码 | 文件源码
def _make_request(self, opener, request):
        """Perform the API call through ``opener`` and hand back its response.

        Kept as its own method so that it can be replaced easily in tests.

        :param opener: object whose ``open`` method performs the call
        :type opener:
        :param request: url payload to request
        :type request: urllib.Request object
        :return: urllib response
        :raises: the exception built by ``handle_error`` when the call fails
            with an ``HTTPError``
        """
        try:
            response = opener.open(request)
        except HTTPError as http_err:
            translated = handle_error(http_err)
            # the translated exception is raised with its __cause__ cleared
            translated.__cause__ = None
            raise translated
        return response
项目:python-http-client    作者:sendgrid    | 项目源码 | 文件源码
def make_request(self,
                     method,
                     request_body=None,
                     query_params=None,
                     request_headers=None):
        """Issue the API call and record the response on this client.

        :param method: HTTP verb; upper-cased before use
        :param request_body: JSON-serializable payload, omitted when falsy
        :param query_params: query parameters handed to ``_build_url``
        :param request_headers: extra headers handed to ``_set_headers``
        :return: None; the response is stored on ``self._response`` and passed
            to ``_set_response``
        """
        method = method.upper()
        if request_headers:
            self._set_headers(request_headers)

        # The request is sent whether or not extra headers were supplied;
        # previously everything below only ran when request_headers was truthy.
        if request_body:
            # json.dumps returns text while the request body must be bytes
            request_body = json.dumps(request_body).encode('utf-8')
        else:
            request_body = None
        query_params = query_params or None
        opener = urllib.build_opener()
        request = urllib.Request(self._build_url(query_params),
                                 data=request_body)
        # items() exists on both Python 2 and 3, unlike iteritems()
        for key, value in self.request_headers.items():
            request.add_header(key, value)
        # force the verb, which urllib would otherwise derive from the body
        request.get_method = lambda: method
        self._response = opener.open(request)
        self._set_response(self._response)
        self._reset()
项目:spiderfoot    作者:ParrotSec    | 项目源码 | 文件源码
def run(self, suppress = False):
    """
    Waits for our request to finish and hands back everything it fetched,
    starting the request first if that hasn't happened yet.

    :param bool suppress: when **True** exceptions are not raised

    :returns: list of the requested
      :class:`~stem.descriptor.__init__.Descriptor` instances

    :raises:
      Unless **suppress** is **True** this can fail with...

        * **ValueError** if the descriptor contents is malformed
        * **socket.timeout** if our request timed out
        * **urllib2.URLError** for most request failures

      Other exception types raised by the urllib2 module are passed along
      unchanged.
    """

    descriptors = self._run(suppress)
    return list(descriptors)
项目:spiderfoot    作者:ParrotSec    | 项目源码 | 文件源码
def _pick_url(self, use_authority = False):
    """
    Provides a url that can be queried. If we have multiple endpoints then one
    will be picked randomly.

    :param bool use_authority: ignores our endpoints and uses a directory
      authority instead

    :returns: **str** for the url being queried by this request
    """

    if use_authority or not self.endpoints:
      # filter() is a lazy iterator on Python 3 while random.choice() needs a
      # sequence, so materialize the candidates first
      candidates = list(filter(HAS_V3IDENT, get_authorities().values()))
      authority = random.choice(candidates)
      address, dirport = authority.address, authority.dir_port
    else:
      address, dirport = random.choice(self.endpoints)

    # the resource's leading slash is dropped since the template supplies one
    return 'http://%s:%i/%s' % (address, dirport, self.resource.lstrip('/'))
项目:Parallel-SGD    作者:angadgill    | 项目源码 | 文件源码
def _get_data(url):
    """Helper function to get data over http(s) or from a local file.

    :param url: an ``http://`` or ``https://`` URL; anything else is opened
        as a local text file
    :returns: the response body (decompressed when the server reports a
        ``gzip`` content-encoding), or the text of the local file
    :raises RuntimeError: if the content-encoding is neither plain nor gzip
    """
    import io  # local import so the block does not depend on file-level names

    if url.startswith(('http://', 'https://')):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        try:
            data = resp.read()
        finally:
            # release the connection even if the read fails
            resp.close()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            # the body is raw bytes, so it needs a binary buffer
            data = gzip.GzipFile(fileobj=io.BytesIO(data)).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        # the with-statement already closes the file; no explicit close needed
        with open(url, 'r') as fid:
            data = fid.read()

    return data
项目:bob.bio.base    作者:bioidiap    | 项目源码 | 文件源码
def atnt_database_directory():
  """Return the directory holding the AT&T database, downloading it if needed.

  Resolution order: the directory recorded by an earlier download in this
  process, then ``atnt_default_directory`` if it exists, and otherwise a fresh
  temporary directory into which the archive is downloaded and extracted.

  A download sets the module-level ``atnt_downloaded_directory`` and the
  ``ATNT_DATABASE_DIRECTORY`` environment variable.

  :return: path of the directory containing the database
  """
  global atnt_downloaded_directory
  if atnt_downloaded_directory:
    return atnt_downloaded_directory

  if os.path.exists(atnt_default_directory):
    return atnt_default_directory

  import contextlib
  import shutil
  import sys
  import tempfile
  import zipfile
  if sys.version_info[0] <= 2:
    import urllib2 as urllib
  else:
    import urllib.request as urllib

  atnt_downloaded_directory = tempfile.mkdtemp(prefix='atnt_db_')
  db_url = "http://www.cl.cam.ac.uk/Research/DTG/attarchive/pub/data/att_faces.zip"
  # Logger.warn is a deprecated alias; warning() is the supported spelling
  logger.warning("Downloading the AT&T database from '%s' to '%s' ...", db_url, atnt_downloaded_directory)
  logger.warning("To avoid this, please download the database manually, extract the data and set the ATNT_DATABASE_DIRECTORY environment variable to this directory.")

  # to avoid re-downloading in parallel test execution
  os.environ['ATNT_DATABASE_DIRECTORY'] = atnt_downloaded_directory

  # download; both handles are closed even if the transfer fails part-way
  local_zip_file = os.path.join(atnt_downloaded_directory, 'att_faces.zip')
  with contextlib.closing(urllib.urlopen(db_url)) as remote:
    with open(local_zip_file, 'wb') as dfile:
      shutil.copyfileobj(remote, dfile)

  # unzip; the archive is closed before the zip file is deleted
  with contextlib.closing(zipfile.ZipFile(local_zip_file)) as archive:
    archive.extractall(atnt_downloaded_directory)
  os.remove(local_zip_file)

  return atnt_downloaded_directory
项目:TorrentBro    作者:subins2000    | 项目源码 | 文件源码
def items(self):
        """
        Request URL and parse response. Yield a ``Torrent`` for every torrent
        on page.

        :return: generator over the objects built by ``_build_torrent`` for
            each row returned by ``_get_torrent_rows``
        """

        request = urllib.request.Request(
            self.url, headers={'User-Agent': "Magic Browser"})
        response = urllib.request.urlopen(request).read()

        # Hand the raw body to the parser: str() of a bytes object is its
        # repr ("b'...'") on Python 3, which corrupts the markup.
        root = html.document_fromstring(response)
        items = [self._build_torrent(row) for row in
                 self._get_torrent_rows(root)]
        for item in items:
            yield item
项目:TorrentBro    作者:subins2000    | 项目源码 | 文件源码
def info(self):
        """Return the torrent's description text, fetching it on first use.

        The text is read from the first ``#details .nfo pre`` element of the
        page at ``self.url`` and stored on ``self._info``; once that value is
        not None it is returned without any network access.

        :return: ``self._info``
        """
        if self._info is not None:
            return self._info

        req = urllib.request.Request(
            self.url, headers={'User-Agent': "Magic Browser"})
        page = urllib.request.urlopen(req).read()

        nfo_nodes = html.document_fromstring(page).cssselect('#details .nfo pre')
        self._info = nfo_nodes[0].text_content()
        return self._info
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def get_server_descriptors(self, fingerprints = None, **query_args):
    """
    Fetches server descriptors, either for the relays with the given
    fingerprints or, when none are given, for everything in the present
    consensus.

    :param str,list fingerprints: a single fingerprint or a list of them,
      **None** to get all descriptors
    :param query_args: extra arguments passed on to the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the server descriptors

    :raises: **ValueError** if more than 96 descriptors are requested by
      fingerprint (squid proxies limit the url length)
    """

    # a lone fingerprint is treated as a one-element list
    if isinstance(fingerprints, str):
      fingerprints = [fingerprints]

    # nothing requested by fingerprint, so ask for everything
    if not fingerprints:
      return self.query('/tor/server/all.z', **query_args)

    if len(fingerprints) > MAX_FINGERPRINTS:
      raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)

    return self.query('/tor/server/fp/%s.z' % '+'.join(fingerprints), **query_args)
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def get_extrainfo_descriptors(self, fingerprints = None, **query_args):
    """
    Fetches extrainfo descriptors, either for the relays with the given
    fingerprints or, when none are given, for everything in the present
    consensus.

    :param str,list fingerprints: a single fingerprint or a list of them,
      **None** to get all descriptors
    :param query_args: extra arguments passed on to the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the extrainfo descriptors

    :raises: **ValueError** if more than 96 descriptors are requested by
      fingerprint (squid proxies limit the url length)
    """

    # a lone fingerprint is treated as a one-element list
    if isinstance(fingerprints, str):
      fingerprints = [fingerprints]

    # nothing requested by fingerprint, so ask for everything
    if not fingerprints:
      return self.query('/tor/extra/all.z', **query_args)

    if len(fingerprints) > MAX_FINGERPRINTS:
      raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)

    return self.query('/tor/extra/fp/%s.z' % '+'.join(fingerprints), **query_args)
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def get_microdescriptors(self, hashes, **query_args):
    """
    Fetches the microdescriptors that have the given hashes. The hashes come
    from the 'microdescriptor_hashes' attribute of
    :class:`~stem.descriptor.router_status_entry.RouterStatusEntryV3`, which
    is only provided via a microdescriptor consensus (such as
    'cached-microdesc-consensus' in your data directory).

    :param str,list hashes: a single microdescriptor hash or a list of them
    :param query_args: extra arguments passed on to the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the microdescriptors

    :raises: **ValueError** if more than 92 microdescriptors are requested by
      hash (squid proxies limit the url length)
    """

    # a lone hash is treated as a one-element list
    hash_list = [hashes] if isinstance(hashes, str) else hashes

    if len(hash_list) > MAX_MICRODESCRIPTOR_HASHES:
      raise ValueError('Unable to request more than %i microdescriptors at a time by their hashes' % MAX_MICRODESCRIPTOR_HASHES)

    resource = '/tor/micro/d/%s.z' % '-'.join(hash_list)
    return self.query(resource, **query_args)
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def query(self, resource, **query_args):
    """
    Builds a request for the given resource.

    :param str resource: resource being fetched, such as '/tor/server/all.z'
    :param query_args: extra arguments for the
      :class:`~stem.descriptor.remote.Query` constructor; these take
      precedence over our default arguments

    :returns: :class:`~stem.descriptor.remote.Query` for the descriptors

    :raises: **ValueError** if resource is clearly invalid or the descriptor
      type can't be determined when 'descriptor_type' is **None**
    """

    # start from a copy of our defaults, then let the caller's arguments win
    args = dict(self._default_args, **query_args)

    if 'endpoints' not in args:
      args['endpoints'] = self._endpoints

    args.setdefault('fall_back_to_authority', True)

    return Query(resource, **args)
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def get_server_descriptors(self, fingerprints = None, **query_args):
    """
    Fetches server descriptors, either for the relays with the given
    fingerprints or, when none are given, for everything in the present
    consensus.

    :param str,list fingerprints: a single fingerprint or a list of them,
      **None** to get all descriptors
    :param query_args: extra arguments passed on to the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the server descriptors

    :raises: **ValueError** if more than 96 descriptors are requested by
      fingerprint (squid proxies limit the url length)
    """

    # a lone fingerprint is treated as a one-element list
    if isinstance(fingerprints, str):
      fingerprints = [fingerprints]

    # nothing requested by fingerprint, so ask for everything
    if not fingerprints:
      return self.query('/tor/server/all.z', **query_args)

    if len(fingerprints) > MAX_FINGERPRINTS:
      raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)

    return self.query('/tor/server/fp/%s.z' % '+'.join(fingerprints), **query_args)
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def get_extrainfo_descriptors(self, fingerprints = None, **query_args):
    """
    Fetches extrainfo descriptors, either for the relays with the given
    fingerprints or, when none are given, for everything in the present
    consensus.

    :param str,list fingerprints: a single fingerprint or a list of them,
      **None** to get all descriptors
    :param query_args: extra arguments passed on to the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the extrainfo descriptors

    :raises: **ValueError** if more than 96 descriptors are requested by
      fingerprint (squid proxies limit the url length)
    """

    # a lone fingerprint is treated as a one-element list
    if isinstance(fingerprints, str):
      fingerprints = [fingerprints]

    # nothing requested by fingerprint, so ask for everything
    if not fingerprints:
      return self.query('/tor/extra/all.z', **query_args)

    if len(fingerprints) > MAX_FINGERPRINTS:
      raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)

    return self.query('/tor/extra/fp/%s.z' % '+'.join(fingerprints), **query_args)
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def get_microdescriptors(self, hashes, **query_args):
    """
    Fetches the microdescriptors that have the given hashes. The hashes come
    from the 'microdescriptor_hashes' attribute of
    :class:`~stem.descriptor.router_status_entry.RouterStatusEntryV3`, which
    is only provided via a microdescriptor consensus (such as
    'cached-microdesc-consensus' in your data directory).

    :param str,list hashes: a single microdescriptor hash or a list of them
    :param query_args: extra arguments passed on to the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the microdescriptors

    :raises: **ValueError** if more than 92 microdescriptors are requested by
      hash (squid proxies limit the url length)
    """

    # a lone hash is treated as a one-element list
    hash_list = [hashes] if isinstance(hashes, str) else hashes

    if len(hash_list) > MAX_MICRODESCRIPTOR_HASHES:
      raise ValueError('Unable to request more than %i microdescriptors at a time by their hashes' % MAX_MICRODESCRIPTOR_HASHES)

    resource = '/tor/micro/d/%s.z' % '-'.join(hash_list)
    return self.query(resource, **query_args)
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def query(self, resource, **query_args):
    """
    Builds a request for the given resource.

    :param str resource: resource being fetched, such as '/tor/server/all.z'
    :param query_args: extra arguments for the
      :class:`~stem.descriptor.remote.Query` constructor; these take
      precedence over our default arguments

    :returns: :class:`~stem.descriptor.remote.Query` for the descriptors

    :raises: **ValueError** if resource is clearly invalid or the descriptor
      type can't be determined when 'descriptor_type' is **None**
    """

    # start from a copy of our defaults, then let the caller's arguments win
    args = dict(self._default_args, **query_args)

    if 'endpoints' not in args:
      args['endpoints'] = self._endpoints

    args.setdefault('fall_back_to_authority', True)

    return Query(resource, **args)
项目:github-notifications    作者:unknownuser88    | 项目源码 | 文件源码
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    """Send an authenticated JSON request with urllib and decode the reply.

    :param url: address to request
    :param data: optional text body; sent UTF-8 encoded
    :param token: API token; ``token_auth_string()`` is used when None
    :param https_proxy: currently unused (proxy support is commented out)
    :param method: optional HTTP verb overriding the one urllib would pick
    :return: the decoded JSON response, or None for a 204 reply
    :raises SimpleHTTPError: when the server answers with an HTTP error
    """
    request = urllib.Request(url)
    # print('API request url:', request.get_full_url())
    if method:
        request.get_method = lambda: method
    if token is None:
        token = token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')

    if data is not None:
        # Request.add_data() was removed in Python 3.4; assigning the
        # attribute works on old and new versions alike
        request.data = data.encode('utf8')

    # print('API request data:', request.get_data())
    # print('API request header:', request.header_items())
    # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
    # if https_proxy:
    #     opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(),
    #                                  urllib.ProxyHandler({'https': https_proxy}))

    #     urllib.install_opener(opener)

    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            else:
                return json.loads(response.read().decode('utf8', 'ignore'))

    except urllib.HTTPError as err:
        # the error object carries the response body; close it once read
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
项目:github-notifications    作者:unknownuser88    | 项目源码 | 文件源码
def api_request_curl(url, data=None, token=None, https_proxy=None, method=None):
    """Send an authenticated JSON request through the ``curl`` binary.

    Options are fed to curl as a config on stdin (``-K -``); the response
    headers are dumped to a temporary file so the status code can be read
    back from their first line.

    :param url: address to request
    :param data: optional text body; written UTF-8 encoded to a temp file
    :param token: API token; ``token_auth_string()`` is used when None
    :param https_proxy: currently unused (proxy support is commented out)
    :param method: optional HTTP verb passed via ``--request``
    :return: the decoded JSON response, or None for a 204 reply
    :raises subprocess.CalledProcessError: if curl exits with a non-zero code
    :raises SimpleHTTPError: for a status that is neither 2xx nor 100
    """
    command = ["curl", '-K', '-', url]
    if token is None:
        token = token_auth_string()
    # NOTE(review): token and method are interpolated into the curl config
    # unescaped; confirm they can never contain a double quote.
    config = ['--header "Authorization: token ' + token + '"',
              '--header "Accept: application/json"',
              '--header "Content-Type: application/json"',
              "--silent"]

    if method:
        config.append('--request "%s"' % method)

    # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
    # if https_proxy:
    #     config.append(https_proxy)

    with named_tempfile() as header_output_file:
        config.append('--dump-header "%s"' % header_output_file.name)
        # closed here; curl writes to it by name and it is reopened below
        header_output_file.close()
        with named_tempfile() as data_file:
            if data is not None:
                data_file.write(data.encode('utf8'))
                data_file.close()
                config.append('--data-binary "@%s"' % data_file.name)

            process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            response, _ = process.communicate('\n'.join(config).encode('utf8'))
            returncode = process.returncode

            if returncode != 0:
                raise subprocess.CalledProcessError(returncode, 'curl')

            with open(header_output_file.name, "r") as headers:
                # status line: "<protocol> <code> <message>"
                _, responsecode, message = headers.readline().split(None, 2)
                responsecode = int(responsecode)

                if responsecode == 204:  # No Content
                    return None
                elif 200 <= responsecode < 300 or responsecode == 100:  # Continue
                    return json.loads(response.decode('utf8', 'ignore'))
                else:
                    raise SimpleHTTPError(responsecode, response)
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def test_s3upload_get_presigned_url_bad_region(self):
        """An invalid region must be replaced by the default region in the request."""
        import json
        endpoint = "https://30yinsv8k6.execute-api.us-east-1.amazonaws.com/prod/get-signed-url"
        responses.add(responses.POST, endpoint, body="http://test/", status=200)

        upload_url = ("https://aws-support-uploader.s3.amazonaws.com/uploader?"
                      "account-id=9999999999&case-id=99999999&expiration=1486577795&"
                      "key=92e1ab350e7f5302551e0b05a89616381bb6c66"
                      "9c9492d9acfbf63701e455ef6")
        ec2rlcore.s3upload.get_presigned_url(upload_url, "test", "in-valid-1")

        # inspect the body of the one request that was issued
        sent_payload = json.loads(responses.calls[0].request.body)
        self.assertEqual(len(responses.calls), 1)
        self.assertEqual("us-east-1", sent_payload["region"])
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def test_s3upload_get_presigned_url_good_region(self):
        """A valid region must be passed through unchanged in the request."""
        import json
        endpoint = "https://30yinsv8k6.execute-api.us-east-1.amazonaws.com/prod/get-signed-url"
        responses.add(responses.POST, endpoint, body="http://test/", status=200)

        upload_url = ("https://aws-support-uploader.s3.amazonaws.com/uploader?"
                      "account-id=9999999999&case-id=99999999&expiration=1486577795&"
                      "key=92e1ab350e7f5302551e0b05a89616381bb6c66"
                      "9c9492d9acfbf63701e455ef6")
        ec2rlcore.s3upload.get_presigned_url(upload_url, "test", "eu-west-1")

        # inspect the body of the one request that was issued
        sent_payload = json.loads(responses.calls[0].request.body)
        self.assertEqual(len(responses.calls), 1)
        self.assertEqual("eu-west-1", sent_payload["region"])
项目:python-http-client    作者:sendgrid    | 项目源码 | 文件源码
def _update_headers(self, request_headers):
        """Merge the given headers into this client's request headers.

        :param request_headers: headers to set for the API call
        :type request_headers: dictionary
        :return: None; ``self.request_headers`` is updated in place
        """
        current_headers = self.request_headers
        current_headers.update(request_headers)
项目:python-http-client    作者:sendgrid    | 项目源码 | 文件源码
def _make_request(self, opener, request):
        """Stand-in for the real request that answers from ``self.response_code``.

        :param opener: unused
        :param request: unused
        :return: ``MockResponse`` carrying the code when it is a 2xx status
        :raises: the exception ``handle_error`` builds for any other code
        """
        # the 2xx success class runs from 200 through 299 inclusive
        if 200 <= self.response_code < 300:
            return MockResponse(self.response_code)
        else:
            raise handle_error(MockException(self.response_code))
项目:deb-python-jsonschema    作者:openstack    | 项目源码 | 文件源码
def fetch_or_load(spec_path):
    """
    Fetch a new specification or use the cache if it's current.

    :argument spec_path: the path to a cached specification
    :returns: the parsed specification (the result of ``html.parse``)

    """

    from email.utils import formatdate

    headers = {}

    try:
        # formatdate(usegmt=True) produces the HTTP date form: 24-hour clock,
        # a literal "GMT", and day/month names that ignore the locale
        headers["If-Modified-Since"] = formatdate(
            os.path.getmtime(spec_path), usegmt=True)
    except OSError as error:
        # a missing cache file just means there is nothing to revalidate
        if error.errno != errno.ENOENT:
            raise

    request = urllib.Request(VALIDATION_SPEC, headers=headers)
    try:
        response = urllib.urlopen(request)
    except urllib.HTTPError as error:
        # urlopen reports "304 Not Modified" by raising; the cache is current
        if error.code != 304:
            raise
    else:
        try:
            if response.code == 200:
                with open(spec_path, "w+b") as spec:
                    spec.writelines(response)
                    # rewind so the freshly written copy can be parsed
                    spec.seek(0)
                    return html.parse(spec)
        finally:
            response.close()

    with open(spec_path) as spec:
        return html.parse(spec)
项目:CKME136    作者:asterix135    | 项目源码 | 文件源码
def twitterreq(url, method, parameters):
    """
    Construct, sign and open a twitter request using credentials above
    :param url: request url
    :param method: POST or GET; when falsy the module-level ``http_method``
        is used instead
    :param parameters: (irrelevant, for Posting)
    :return: Twitter response
    """
    # Honour the caller's verb; it used to be ignored in favour of the
    # module-level http_method.
    verb = method or http_method
    req = oauth.Request.from_consumer_and_token(oauth_consumer,
                                                token=oauth_token,
                                                http_method=verb,
                                                http_url=url,
                                                parameters=parameters)

    req.sign_request(signature_method_hmac_sha1, oauth_consumer, oauth_token)

    if verb == "POST":
        encoded_post_data = req.to_postdata()
    else:
        # for non-POST requests the signed parameters travel in the url
        encoded_post_data = None
        url = req.to_url()

    opener = urllib.OpenerDirector()
    opener.add_handler(http_handler)
    opener.add_handler(https_handler)

    response = opener.open(url, encoded_post_data)

    return response
项目:spiderfoot    作者:ParrotSec    | 项目源码 | 文件源码
def get_server_descriptors(self, fingerprints = None, **query_args):
    """
    Fetches server descriptors, either for the relays with the given
    fingerprints or, when none are given, for everything in the present
    consensus.

    :param str,list fingerprints: a single fingerprint or a list of them,
      **None** to get all descriptors
    :param query_args: extra arguments passed on to the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the server descriptors

    :raises: **ValueError** if more than 96 descriptors are requested by
      fingerprint (squid proxies limit the url length)
    """

    # a lone fingerprint is treated as a one-element list
    if isinstance(fingerprints, str):
      fingerprints = [fingerprints]

    # nothing requested by fingerprint, so ask for everything
    if not fingerprints:
      return self.query('/tor/server/all.z', **query_args)

    if len(fingerprints) > MAX_FINGERPRINTS:
      raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)

    return self.query('/tor/server/fp/%s.z' % '+'.join(fingerprints), **query_args)
项目:spiderfoot    作者:ParrotSec    | 项目源码 | 文件源码
def get_extrainfo_descriptors(self, fingerprints = None, **query_args):
    """
    Builds a query for extrainfo descriptors: either the ones matching the
    given fingerprints or, when no fingerprints are supplied, every descriptor
    under '/tor/extra/all.z'.

    :param str,list fingerprints: a single fingerprint or a list of them; a
      **None** or empty value selects all descriptors
    :param query_args: extra keyword arguments passed straight through to
      :func:`query`

    :returns: the result of :func:`query` for the selected resource

    :raises: **ValueError** if more than **MAX_FINGERPRINTS** fingerprints are
      requested at once (the original notes attribute this cap to url length
      limits of squid proxies)
    """

    # a lone fingerprint is handled as a one element list
    wanted = [fingerprints] if isinstance(fingerprints, str) else fingerprints

    if not wanted:
      resource = '/tor/extra/all.z'
    elif len(wanted) > MAX_FINGERPRINTS:
      raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
    else:
      resource = '/tor/extra/fp/%s.z' % '+'.join(wanted)

    return self.query(resource, **query_args)
项目:spiderfoot    作者:ParrotSec    | 项目源码 | 文件源码
def get_microdescriptors(self, hashes, **query_args):
    """
    Builds a query for the microdescriptors identified by the given hashes.
    Per the original notes, the hashes come from the 'microdescriptor_hashes'
    attribute of
    :class:`~stem.descriptor.router_status_entry.RouterStatusEntryV3`, which
    is only available through a microdescriptor consensus (such as
    'cached-microdesc-consensus' in your data directory).

    :param str,list hashes: a single microdescriptor hash or a list of them
    :param query_args: extra keyword arguments passed straight through to
      :func:`query`

    :returns: the result of :func:`query` for the '/tor/micro/d/...' resource

    :raises: **ValueError** if more than **MAX_MICRODESCRIPTOR_HASHES** hashes
      are requested at once (the original notes attribute this cap to url
      length limits of squid proxies)
    """

    # a lone hash is handled as a one element list
    wanted = [hashes] if isinstance(hashes, str) else hashes
    too_many = len(wanted) > MAX_MICRODESCRIPTOR_HASHES

    if too_many:
      raise ValueError('Unable to request more than %i microdescriptors at a time by their hashes' % MAX_MICRODESCRIPTOR_HASHES)

    # hashes are dash separated within the resource path
    return self.query('/tor/micro/d/%s.z' % '-'.join(wanted), **query_args)
项目:spiderfoot    作者:ParrotSec    | 项目源码 | 文件源码
def query(self, resource, **query_args):
    """
    Creates a Query for the given resource, layering the caller's keyword
    arguments on top of this instance's default arguments.

    :param str resource: resource being fetched, such as '/tor/server/all.z'
    :param query_args: keyword arguments for the Query constructor, these
      take precedence over our defaults

    :returns: the Query constructed for the resource

    :raises: per the original notes, **ValueError** if resource is clearly
      invalid or the descriptor type can't be determined when
      'descriptor_type' is **None** (this would originate in the Query
      constructor, which is not visible from here)
    """

    # start from a copy of our defaults, then let the caller's values win
    options = dict(self._default_args, **query_args)

    # only fall back to our own endpoints if nobody specified any
    if 'endpoints' not in options:
      options['endpoints'] = self._endpoints

    options.setdefault('fall_back_to_authority', True)

    return Query(resource, **options)
项目:QueryableList    作者:kata198    | 项目源码 | 文件源码
def download_goodTests(GOODTESTS_URL=None):
    '''
        download_goodTests - Attempts to download GoodTests, using the default global url (or one provided).

          Asks for confirmation on stdin first. On "yes" the module is fetched with urllib and
          written to ./GoodTests.py; if no urllib flavour can be imported, "pip install GoodTests"
          is attempted instead. Either way the function finishes by checking that GoodTests imports.

        @param GOODTESTS_URL <str/None> - URL to fetch GoodTests.py from. If None, the module-level GOODTESTS_URL is used.

        @return <int> - 0 on success (program should continue), otherwise non-zero (program should abort with this exit status)
    '''
    if GOODTESTS_URL is None:
        GOODTESTS_URL = globals()['GOODTESTS_URL']

    answer = None
    while answer is None:
        sys.stdout.write('GoodTests not found. Would you like to install it to local folder? (y/n): ')
        sys.stdout.flush()
        line = sys.stdin.readline()
        if not line:
            # readline() returns an empty string only at EOF. Nobody can answer
            #  the prompt anymore, so treat it as "no" instead of looping forever.
            answer = 'n'
            break
        reply = line.strip().lower()
        if reply in ('y', 'n', 'yes', 'no'):
            answer = reply[0]

    if answer == 'n':
        sys.stderr.write('Cannot run tests without installing GoodTests. http://pypi.python.org/pypi/GoodTests or https://github.com/kata198/Goodtests\n')
        return 1

    # Python 2 provides urlopen in urllib2, Python 3 in urllib.request
    try:
        import urllib2 as urllib
    except ImportError:
        try:
            import urllib.request as urllib
        except ImportError:
            urllib = None

    if urllib is None:
        # No way to download directly, so fall back to pip.
        sys.stderr.write('Failed to import urllib. Trying pip.\n')
        import subprocess
        res = subprocess.call('pip install GoodTests', shell=True)
        if res != 0:
            sys.stderr.write('Failed to install GoodTests with pip or direct download. aborting.\n')
            return 1
    else:
        # closing() is used because the Python 2 response object is not a context manager
        from contextlib import closing
        try:
            with closing(urllib.urlopen(GOODTESTS_URL)) as response:
                contents = response.read()
        except Exception as e:
            sys.stderr.write('Failed to download GoodTests.py from "%s"\n%s\n' %(GOODTESTS_URL, str(e)))
            return 1

        try:
            # Write the downloaded bytes verbatim: no ASCII-only restriction on
            #  the source and no newline translation by the text layer.
            with open('GoodTests.py', 'wb') as f:
                f.write(contents)
        except Exception as e:
            sys.stderr.write('Failed to write to GoodTests.py\n%s\n' %(str(e),))
            return 1

        try:
            os.chmod('GoodTests.py', 0o775)
        except OSError:
            sys.stderr.write('WARNING: Failed to chmod +x GoodTests.py, may not be able to be executed.\n')

    # The module was created after the interpreter started, so make the import
    #  system forget its cached directory listings (function is absent on Python 2).
    import importlib
    invalidate_caches = getattr(importlib, 'invalidate_caches', None)
    if invalidate_caches is not None:
        invalidate_caches()

    try:
        import GoodTests  # only imported to verify that the install worked
    except ImportError:
        sys.stderr.write('Seemed to download GoodTests okay, but still cannot import. Aborting.\n')
        return 1

    return 0