Python urllib.parse module: urljoin() usage examples

The following 50 code examples, extracted from open-source Python projects, illustrate how to use urllib.parse.urljoin().
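
Before the project examples, here is a minimal, self-contained sketch (the example.com URLs are illustrative only, not taken from any of the projects below) showing how urljoin() resolves a relative reference against a base URL:

from urllib.parse import urljoin

# a relative path replaces the last segment of the base URL
print(urljoin("https://example.com/docs/index.html", "api.html"))        # https://example.com/docs/api.html

# a leading slash resets the path to the site root
print(urljoin("https://example.com/docs/index.html", "/pypi/pkg/json"))  # https://example.com/pypi/pkg/json

# an absolute URL replaces the base entirely
print(urljoin("https://example.com/docs/", "https://other.org/page"))    # https://other.org/page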

Project: ArticleSpider    Author: mtianyan
def parse(self, response):
        """
                1. ???????????url???scrapy????????
                2. ??????url???scrapy????? ???????parse
                """
        # ???????????url???scrapy????????
        if response.status == 404:
            self.fail_urls.append(response.url)
            self.crawler.stats.inc_value("failed_url")
        #?extra?list????????
        post_nodes = response.css("#archive .floated-thumb .post-thumb a")
        for post_node in post_nodes:
            #??????url
            image_url = post_node.css("img::attr(src)").extract_first("")
            post_url = post_node.css("::attr(href)").extract_first("")
            #request?????????parse_detail??????????
            # Request(url=post_url,callback=self.parse_detail)
            yield Request(url=parse.urljoin(response.url, post_url), meta={"front_image_url": image_url}, callback=self.parse_detail)
            #??href?????????
            #response.url + post_url
            print(post_url)
        # ????????scrapy????
        next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
        if next_url:
            yield Request(url=parse.urljoin(response.url, next_url), callback=self.parse)
Project: ccu_and_eccu_publish    Author: gaofubin
def getCsvReport(product_list, startdate, enddate, source_obj):
        print()
        print("Requesting a csv report for the given time period")
        headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'}
        path = "/billing-usage/v1/contractUsageData/csv"

        parameters = {"reportSources": [source_obj],
                      "products": product_list,
                      "startDate": startdate,
                      "endDate": enddate}
        print()
        data_string = parse.urlencode({p: json.dumps(parameters[p]) for p in parameters})
        products_result = session.post(parse.urljoin(baseurl,path),data=data_string, headers=headers)
        products_csv = products_result.text
        return products_csv
Project: meg-server    Author: Argonne-National-Laboratory
def get_sendgrid_request_message(cfg, keyid, hex, user_email):
    url_prefix = urljoin(
        cfg.config.megserver_hostname_url,
        os.path.join(cfg.config.meg_url_prefix, "revoke")
    )
    params = urlencode([("keyid", keyid), ("token", hex)])
    parsed = list(urlparse(url_prefix))
    parsed[4] = params
    revocation_link = urlunparse(parsed)

    message = Mail()
    message.add_to(user_email)
    message.set_from(cfg.config.sendgrid.from_email)
    message.set_subject(cfg.config.sendgrid.subject)
    message.set_html(EMAIL_HTML.format(keyid=keyid, link=revocation_link))
    return message
Project: post-review    Author: ericforbes
def _setup_token(self):
        self.logger.warn("\n\n(One Time Setup) Please create a Personal Access Token")
        self.logger.warn("https://%s/profile/personal_access_tokens" % self.origin_domain)
        self.logger.warn("Scope: API, Expires: Never\n")
        token = input("Please enter your Personal Access Token:  ")

        # Make request to resource that requires us to be authenticated
        path = 'projects/%s/labels' % self._url_encoded_path()
        url = urljoin(str(self._API()), path)

        res = requests.get(
            url,
            headers={"PRIVATE-TOKEN": token}
            )

        if res.status_code == 200:
            return(token, None)
        return(-1, "Invalid Personal Access Token")
Project: Emoji-Web    Author: emoji-gen
def url_for(
        text,
        font,
        color,
        back_color,
        size_fixed = False,
        align = 'center',
        stretch = True
        ):
    base_url = app.config['SITE_BASE_URL']
    payload = {
            'text': text,
            'font': font,
            'color': color,
            'back_color': back_color,
            'size_fixed': str(size_fixed).lower(),
            'align': align,
            'stretch': str(stretch).lower()
            }
    return urljoin(base_url, 'emoji') + '?' + urlencode(payload)
Project: fabric8-analytics-common    Author: fabric8-analytics
def spaces_page(context):
    """Go to the Spaces page with list of available Spaces."""
    print("Spaces page")
    url = urljoin(context.server, context.username + "/_spaces")
    context.browser.visit(url)
    space_names = get_all_existing_space_names(context.browser)
    new_space_name = generate_unique_space_name(space_names)
    context.space_name = new_space_name
    print("Unique name for new Space\n    " + new_space_name)
    create_new_space_step_1(context)
    create_new_space_step_2(context)
    create_new_space_step_3(context)
    create_new_space_step_4(context)
    create_new_space_step_5(context)
    create_new_space_step_6(context)
    create_new_space_step_7(context)
Project: fabric8-analytics-common    Author: fabric8-analytics
def stack_reccomendation_on_pipepines_page(context):
    """Check the presence of stack recommendation on the Pipelines page."""
    url = urljoin(context.server, context.username + "/" + context.space_name +
                  "/create/pipelines")
    print("Going to the pipeline page for the Space {s}".format(
        s=context.space_name))
    context.browser.visit(url)
    time.sleep(SLEEP_BEFORE_CLICK)
    check_text_presence(context, "Stack Reports")
    link = context.browser.find_by_text("Stack Reports")
    link.click()
    time.sleep(SLEEP_BETWEEN_PAGES)

    # TODO - ask why the text is different: Recommendation/Recommended
    recommendation1 = 'Recommended - Change io.vertx:vertx-web : 3.4.1'
    check_text_presence(context, recommendation1)
    recommendation2 = 'Recommended - Change io.vertx:vertx-core : 3.4.1'
    check_text_presence(context, recommendation2)

    time.sleep(SLEEP_BETWEEN_PAGES)
Project: django-aliyun-oss2-storage    Author: xiewenya
def _normalize_name(self, name):
        """
        Normalizes the name so that paths like /path/to/ignored/../foo.txt
        work. We check to make sure that the path pointed to is not outside
        the directory specified by the LOCATION setting.
        """

        base_path = force_text(self.location)
        base_path = base_path.rstrip('/')

        final_path = urljoin(base_path.rstrip('/') + "/", name)

        base_path_len = len(base_path)
        if (not final_path.startswith(base_path) or
                final_path[base_path_len:base_path_len + 1] not in ('', '/')):
            raise SuspiciousOperation("Attempted access to '%s' denied." %
                                      name)
        return final_path.lstrip('/')
Project: pip-upgrader    Author: simion
def _prepare_api_url(self, index_url):  # pragma: nocover
        if not index_url.endswith('/'):
            index_url += '/'

        if index_url.endswith('/simple/'):
            self.PYPI_API_TYPE = 'simple_html'
            return urljoin(index_url, '{package}')

        if index_url.endswith('/+simple/'):
            self.PYPI_API_TYPE = 'simple_html'
            return urljoin(index_url, '{package}')

        if '/pypi/' in index_url:
            base_url = index_url.split('/pypi/')[0]
            return urljoin(base_url, '/pypi/{package}/json')

        return urljoin(index_url, '/pypi/{package}/json')
Project: scraping-python    Author: python-madrid
def parse_lista_diputados(response):
    tree = fromstring(response.content)

    # list of deputies
    diputados = tree.xpath('//div[@class="listado_1"]/ul/li/a/@href')
    for diputado in diputados:
        diputado_url = urljoin(response.url, diputado)
        response = requests.get(diputado_url)
        parse_diputado(response)

    # next page
    pagina_siguiente = tree.xpath('//a[contains(., "Página Siguiente")]/@href')
    if pagina_siguiente:
        pagina_siguiente_url = pagina_siguiente[0]
        response = requests.get(pagina_siguiente_url)
        parse_lista_diputados(response)
Project: ubi-virtual-assistant    Author: Alzemand
def get_file_urls(mainUrl,extension):
    uniFileUrls = []
    if not mainUrl.lower().startswith('http://') and not mainUrl.lower().startswith('https://'):
        mainUrl = 'http://%s'%mainUrl
    print('Downloading from %s...'%mainUrl)
    if extension.startswith('*'):
        extension = extension[1:]
    if not extension.startswith('.'):
        extension = '.' + extension
    req = urllib.request.Request(
        mainUrl, 
        data=None, 
        headers={
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
        }
    )
    urlContent = urllib.request.urlopen(req).read().decode('utf-8')
    html = lxml.html.fromstring(urlContent) 
    urls = html.xpath('//a/@href')
    for url in urls:
        if url.endswith(extension):
            url = urljoin(mainUrl,url)
            if url not in uniFileUrls:
                uniFileUrls.append(url)
    return uniFileUrls
Project: spider    Author: pythonsite
def parse(self, response):
        '''
        1. Extract the article URLs from the list page and hand them to scrapy for download and parsing.
        2. Extract the next-page URL and hand it to scrapy for download; once downloaded it is passed back to parse.
        :param response:
        :return:
        '''
        # extract the article URLs from the list page and hand them to scrapy for download and parsing
        post_nodes = response.css("#archive .floated-thumb .post-thumb a")
        for post_node in post_nodes:
            # image_url is the article's cover image
            image_url = post_node.css("img::attr(src)").extract_first("")
            post_url = post_node.css("::attr(href)").extract_first("")
            # pass the cover image URL along in meta; parse.urljoin joins a relative post_url with response.url
            # (if post_url is already an absolute URL, urljoin returns it unchanged)
            yield Request(url=parse.urljoin(response.url,post_url),meta={"front_image_url":parse.urljoin(response.url,image_url)},callback=self.parse_detail)

        # extract the next-page URL and hand it to scrapy for download
        next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
        if next_url:
            yield Request(url=next_url,callback=self.parse)
Project: buffer-api    Author: barisariburnu
def create_request(self, method, path, options):
        """Creating a request with the given arguments

        If api_version is set, appends it immediately after host
        """
        version = '/' + options['api_version'] if 'api_version' in options else ''

        # Adds a suffix (ex: ".html", ".json") to url
        suffix = options['response_type'] if 'response_type' in options else 'json'
        path = path + '.' + suffix

        path = urlparse.urljoin(self.base, version + path)

        if 'api_version' in options:
            del options['api_version']

        if 'response_type' in options:
            del options['response_type']

        return requests.request(method, path, **options)
Project: dust_extinction    Author: karllark
def _resolve_version(version):
    """
    Resolve LATEST version
    """
    if version is not LATEST:
        return version

    meta_url = urljoin(DEFAULT_URL, '/pypi/setuptools/json')
    resp = urlopen(meta_url)
    with contextlib.closing(resp):
        try:
            charset = resp.info().get_content_charset()
        except Exception:
            # Python 2 compat; assume UTF-8
            charset = 'UTF-8'
        reader = codecs.getreader(charset)
        doc = json.load(reader(resp))

    return str(doc['info']['version'])
Project: aquests    Author: hansroh
def _relocate (self, response, newloc):
        if len (self._history) > 5:
            raise RuntimeError ("Maximum Redirects Reached")

        if response.status_code in (301, 302):
            if self.get_method () in ("POST", "PUT"):           
                self.payload = b""
                self.set_content_length (0)
                self.remove_header ('content-type')
                self.remove_header ('content-length')
                self.remove_header ('transfer-encoding')
                self.remove_header ('content-encoding')
            self.method = "GET"

        if not newloc:
            newloc = response.get_header ('location')
        self.uri = urljoin (self.uri, newloc)
        self.address, self.path = self.split (self.uri)
        self.add_history (response)
Project: micromasters    Author: mitodl
def get_certificate_url(mmtrack, course):
    """
    Find certificate associated with highest passing grade for the course

    Args:
        mmtrack (dashboard.utils.MMTrack): an instance holding all user information about a program
        course (courses.models.Course): A course
    Returns:
        str: url to view the certificate
    """
    url = ""
    final_grades = mmtrack.get_passing_final_grades_for_course(course)
    if final_grades.exists():
        best_grade = final_grades.first()
        course_key = best_grade.course_run.edx_course_key
        if mmtrack.financial_aid_available:
            if best_grade.has_certificate and course.signatories.exists():
                url = reverse('certificate', args=[best_grade.certificate.hash])
        elif mmtrack.has_passing_certificate(course_key):
            download_url = mmtrack.certificates.get_verified_cert(course_key).download_url
            if download_url:
                url = urljoin(settings.EDXORG_BASE_URL, download_url)
    return url
Project: micromasters    Author: mitodl
def test_url_with_course_key(self):
        """Test course url with a course key and no enrollment_url"""
        course_run = CourseRunFactory.create(
            course=self.course,
            start_date=self.from_weeks(-1),
            end_date=None,
            enrollment_start=self.from_weeks(-1),
            enrollment_end=None,
            enrollment_url=None,
            edx_course_key="course_key"
        )
        expected = urljoin(
            BASE_URL,
            'courses/{key}/about'.format(key=course_run.edx_course_key)
        )
        assert course_run.course.url == expected
Project: thejoker    Author: adrn
def _resolve_version(version):
    """
    Resolve LATEST version
    """
    if version is not LATEST:
        return version

    meta_url = urljoin(DEFAULT_URL, '/pypi/setuptools/json')
    resp = urlopen(meta_url)
    with contextlib.closing(resp):
        try:
            charset = resp.info().get_content_charset()
        except Exception:
            # Python 2 compat; assume UTF-8
            charset = 'UTF-8'
        reader = codecs.getreader(charset)
        doc = json.load(reader(resp))

    return str(doc['info']['version'])
Project: python-bsonjs    Author: mongodb-labs
def _resolve_version(version):
    """
    Resolve LATEST version
    """
    if version is not LATEST:
        return version

    meta_url = urljoin(DEFAULT_URL, '/pypi/setuptools/json')
    resp = urlopen(meta_url)
    with contextlib.closing(resp):
        try:
            charset = resp.info().get_content_charset()
        except Exception:
            # Python 2 compat; assume UTF-8
            charset = 'UTF-8'
        reader = codecs.getreader(charset)
        doc = json.load(reader(resp))

    return str(doc['info']['version'])
Project: Python_Study    Author: thsheep
def get_links_from_url(url):
    """Download the page at `url` and parse it for links.
    Returned links have had the fragment after `#` removed, and have been made
    absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
    'http://www.tornadoweb.org/en/stable/gen.html'.
    """
    try:
        response = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        html = response.body if isinstance(response.body, str) \
            else response.body.decode()
        urls = [urljoin(url, remove_fragment(new_url))
                for new_url in get_links(html)]
    except Exception as e:
        print('Exception: %s %s' % (e, url))
        raise gen.Return([])
    raise gen.Return(urls)
Project: pandachaika    Author: pandabuilder
def compare_by_title(self, title: str) -> bool:

        r = requests.get(urljoin(constants.main_url, 'search/') + quote(title),
                         headers=self.settings.requests_headers, timeout=self.settings.timeout_timer)

        r.encoding = 'utf-8'
        soup_1 = BeautifulSoup(r.text, 'html.parser')

        matches_links = set()

        # content-row manga row
        for gallery in soup_1.find_all("div", class_=re.compile("content-row")):
            link_container = gallery.find("a", class_="content-title")
            if link_container:
                matches_links.add(urljoin(constants.main_url, link_container['href']))

        self.gallery_links = list(matches_links)
        if len(self.gallery_links) > 0:
            self.found_by = self.name
            return True
        else:
            return False
Project: modman    Author: haihala
def api_get(self, url, *args, **kwargs):
        """Gets an object from factorio mod api."""
        url = urljoin(FACTORIO_BASEURL, url)
        assert url.startswith(urljoin(FACTORIO_BASEURL, "/api")), "Only factorio mods api can be used with this"
        qparams = ""
        if "params" in kwargs:
            qparams = "&".join(key + "=" + str(kwargs["params"][key]) for key in sorted(kwargs["params"].keys()))

        cache_key = urlparse(url).path + "?" + qparams

        res = self.fetch(cache_key)
        if res:
            return json.loads(res)

        data = self.get(url, *args, **kwargs).text
        # check and minify data
        try:
            data = json.dumps(json.loads(data), separators=(',',':'))
        except json.decoder.JSONDecodeError:
            exit("Invalid JSON data in cache")
        self.store(cache_key, data)
        return json.loads(data)
Project: python-search-engine    Author: ncouture
def get_links(etree, page_url):
    """    page_url: the url of the page parsed in the etree
    """
    _is_etree(etree)
    links = [urljoin(page_url, i)
             for i in etree.xpath('//a/@href')]

    localhost = urlparse(page_url).hostname
    internal = set()
    external = set()
    for link in links:
        # links on the same host as the page are internal; all others are external
        if urlparse(link).hostname == localhost:
            internal.add(link)
        else:
            external.add(link)

    return {'internal': list(internal),
            'external': list(external)}
Project: ArticleSpider    Author: mtianyan
def parse(self, response):
        """
                ???html??????url ?????url??????
                ?????url???? /question/xxx ?????????????
                """
        all_urls = response.css("a::attr(href)").extract()
        all_urls = [parse.urljoin(response.url, url) for url in all_urls]
        # ??lambda???????url????????true???????false???
        all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)
        for url in all_urls:
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
            if match_obj:
                # ?????question???????????????????
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
                #??
                # break
            else:
                # pass
                # ????question??????????
                yield scrapy.Request(url, headers=self.headers, callback=self.parse)
Project: Cortex-Analyzers    Author: CERT-BDF
def dns_resolve(self, domain):
        payload = {'hostnames': [domain], 'key': self.api_key}
        r = requests.get(urljoin(self.base_url, 'dns/resolve'), params=payload)
        if r.status_code == requests.codes.ok:
            return r.json()
Project: Cortex-Analyzers    Author: CERT-BDF
def reverse_dns(self, ip):
        payload = {'ips': [ip], 'key': self.api_key}
        r = requests.get(urljoin(self.base_url, 'dns/reverse'), params=payload)
        if r.status_code == requests.codes.ok:
            result = r.json()
            if not result[ip]:
                result[ip] = []
            return result
Project: Zoom2Youtube    Author: Welltory
def _request(self, method, params):
        url = urljoin(SlackClient.BASE_URL, method)
        data = {'token': self.token}
        if self.bot_name:
            data['username'] = self.bot_name
        params.update(data)
        return requests.post(
            url,
            data=params,
            headers={'content-type': 'application/x-www-form-urlencoded'}
        )
Project: Zoom2Youtube    Author: Welltory
def _join_url(self, path):
        if path.startswith('/'):
            path = path[1:]
        return urljoin(ZoomClient.BASE_URL, path)
Project: dabdabrevolution    Author: harryparkdotio
def get_url(self, routename, **kargs):
        """ Return a string that matches a named route """
        scriptname = request.environ.get('SCRIPT_NAME', '').strip('/') + '/'
        location = self.router.build(routename, **kargs).lstrip('/')
        return urljoin(urljoin('/', scriptname), location)
Project: dabdabrevolution    Author: harryparkdotio
def fullpath(self):
        """ Request path including :attr:`script_name` (if present). """
        return urljoin(self.script_name, self.path.lstrip('/'))
Project: dabdabrevolution    Author: harryparkdotio
def redirect(url, code=None):
    """ Aborts execution and causes a 303 or 302 redirect, depending on
        the HTTP protocol version. """
    if not code:
        code = 303 if request.get('SERVER_PROTOCOL') == "HTTP/1.1" else 302
    res = response.copy(cls=HTTPResponse)
    res.status = code
    res.body = ""
    res.set_header('Location', urljoin(request.url, url))
    raise res
Project: twentybn-dl    Author: TwentyBN
def url(self, filename):
        full_path = op.join(self.name, self.version, filename)
        return urljoin(DEFAULT_BASE_URL, full_path)
Project: twentybn-dl    Author: TwentyBN
def url(self, filename):
        full_path = op.join(self.name, self.version, filename)
        return urljoin(self.base_url, full_path)
Project: luminance    Author: nginth
def is_safe_url(target):
    ref_url = urlparse(request.host_url)
    test_url = urlparse(urljoin(request.host_url, target))
    return test_url.scheme in ('http', 'https') and \
        ref_url.netloc == test_url.netloc
Project: pymarsys    Author: transcovo
def make_call(self,
                  method,
                  endpoint,
                  headers=None,
                  payload=None,
                  params=None):
        """
        Make an authenticated synchronous HTTP call to the Emarsys api using
        the requests library.
        :param method: HTTP method.
        :param endpoint: Emarsys' api endpoint.
        :param headers: HTTP headers.
        :param payload: HTTP payload.
        :param params: HTTP params.
        :return: Dictionary with the result of the query.
        """
        if not payload:
            payload = {}

        if not params:
            params = {}

        url = urljoin(self.uri, endpoint)
        headers = self.build_headers(headers)
        response = requests.request(
            method,
            url,
            headers=headers,
            json=payload,
            params=params
        )
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            raise ApiCallError(
                'Error message: "{}" \n Error details: "{}"'.format(
                    err,
                    response.text
                )
            )
        return response.json()
Project: ccu_and_eccu_publish    Author: gaofubin
def getProducts(parameter_obj,startdate,enddate):
    print()
    print("Requesting a list of products for the given time period")
    headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Accept': 'application/json'}
    path = "/billing-usage/v1/products"

    parameters = {"reportSources": parameter_obj,
                  "startDate": startdate,
                  "endDate": enddate}

    data_string = encoder.urlencode({p: json.dumps(parameters[p]) for p in parameters})
    products_result = session.post(parse.urljoin(baseurl,path),data=data_string, headers=headers)
    products_obj = json.loads(products_result.text)
    return products_obj['contents']
Project: ccu_and_eccu_publish    Author: gaofubin
def urlJoin(self, url, path):
        return parse.urljoin(url, path)
Project: ccu_and_eccu_publish    Author: gaofubin
def getResult(self, endpoint, parameters=None):
        path = endpoint
        endpoint_result = self.session.get(parse.urljoin(self.baseurl,path), params=parameters)
        if self.verbose: print (">>>\n" + json.dumps(endpoint_result.json(), indent=2) + "\n<<<\n")
        status = endpoint_result.status_code
        if self.verbose: print ("LOG: GET %s %s %s" % (endpoint,status,endpoint_result.headers["content-type"]))
        self.httpErrors(endpoint_result.status_code, path, endpoint_result.json())
        return endpoint_result.json()
Project: ccu_and_eccu_publish    Author: gaofubin
def postResult(self, endpoint, body, parameters=None):
        headers = {'content-type': 'application/json'}
        path = endpoint
        endpoint_result = self.session.post(parse.urljoin(self.baseurl,path), data=body, headers=headers, params=parameters)
        status = endpoint_result.status_code
        if self.verbose: print ("LOG: POST %s %s %s" % (path,status,endpoint_result.headers["content-type"]))
        if status == 204:
           return {}
        self.httpErrors(endpoint_result.status_code, path, endpoint_result.json())

        if self.verbose: print (">>>\n" + json.dumps(endpoint_result.json(), indent=2) + "\n<<<\n")
        return endpoint_result.json()
Project: ccu_and_eccu_publish    Author: gaofubin
def putResult(self, endpoint, body, parameters=None):
        headers = {'content-type': 'application/json'}
        path = endpoint

        endpoint_result = self.session.put(parse.urljoin(self.baseurl,path), data=body, headers=headers, params=parameters)
        status = endpoint_result.status_code
        if self.verbose: print ("LOG: PUT %s %s %s" % (endpoint,status,endpoint_result.headers["content-type"]))
        if status == 204:
            return {}
        if self.verbose: print (">>>\n" + json.dumps(endpoint_result.json(), indent=2) + "\n<<<\n")
        return endpoint_result.json()
Project: ccu_and_eccu_publish    Author: gaofubin
def deleteResult(self, endpoint):
        endpoint_result = self.session.delete(parse.urljoin(self.baseurl,endpoint))
        status = endpoint_result.status_code
        if self.verbose: print ("LOG: DELETE %s %s %s" % (endpoint,status,endpoint_result.headers["content-type"]))
        if status == 204:
            return {}
        if self.verbose: print (">>>\n" + json.dumps(endpoint_result.json(), indent=2) + "\n<<<\n")
        return endpoint_result.json()
Project: pyseeder    Author: PurpleI2P
def run(filename, config):
    API_URL = "https://api.github.com/"
    asset_name = os.path.split(filename)[-1]
    content_type = guess_type(asset_name)[0] or "application/zip"
    creds = (config["username"], config["token"])
    release_info_url = urljoin(API_URL, "/repos/{}/releases/tags/{}".format(
                config["repo"], config["release_tag"]))

    # get release info
    try:
        resp = requests.get(release_info_url, auth=creds)
    except:
        raise TransportException("Failed to connect to GitHub API")

    if resp.status_code != 200:
        raise TransportException("Check your GitHub API auth settings")

    # delete old asset
    for x in resp.json()["assets"]:
        if x["name"] == asset_name:
            r = requests.delete(x["url"], auth=creds)
            if r.status_code != 204:
                raise TransportException("Failed to delete asset from GitHub")

    # upload new asset
    upload_url = resp.json()["upload_url"].split("{")[0] # wat
    headers = {'Content-Type': content_type}
    params = {'name': asset_name}

    data = open(filename, 'rb').read()
    r = requests.post(upload_url, headers=headers, params=params, auth=creds, 
            data=data)

    if r.status_code != 201:
        raise TransportException("Failed to upload asset to GitHub API")
Project: Flask_Blog    Author: sugarguo
def urlopen(self, method, url, redirect=True, **kw):
        """
        Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`
        with custom cross-host redirect logic and only sends the request-uri
        portion of the ``url``.

        The given ``url`` parameter must be absolute, such that an appropriate
        :class:`urllib3.connectionpool.ConnectionPool` can be chosen for it.
        """
        u = parse_url(url)
        conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme)

        kw['assert_same_host'] = False
        kw['redirect'] = False
        if 'headers' not in kw:
            kw['headers'] = self.headers

        if self.proxy is not None and u.scheme == "http":
            response = conn.urlopen(method, url, **kw)
        else:
            response = conn.urlopen(method, u.request_uri, **kw)

        redirect_location = redirect and response.get_redirect_location()
        if not redirect_location:
            return response

        # Support relative URLs for redirecting.
        redirect_location = urljoin(url, redirect_location)

        # RFC 2616, Section 10.3.4
        if response.status == 303:
            method = 'GET'

        log.info("Redirecting %s -> %s" % (url, redirect_location))
        kw['retries'] = kw.get('retries', 3) - 1  # Persist retries countdown
        kw['redirect'] = redirect
        return self.urlopen(method, redirect_location, **kw)
Project: noscrapy    Author: hwms
def combine_urls(self, parent_url, child_url):
        return urljoin(parent_url, child_url)
Project: talonspider    Author: howie6879
def parse(self, res):
        # convert the html into an etree
        etree = self.e_html(res.html)
        # collect the pagination links on the page
        pages = list(set(i.get('href') for i in etree.cssselect('li.pb_list_pager>a')))

        pages.append(self.start_urls[0])
        for page in pages:
            url = urljoin(self.start_urls[0], page)
            yield Request(url, headers=self.headers, callback=self.parse_item)
Project: post-review    Author: ericforbes
def _API(self):
        url = 'https://%s/' % self.origin_domain
        path = 'api/v4/'
        return urljoin(str(url), str(path))
Project: post-review    Author: ericforbes
def parent_branch_exists(self, token):
        path = '/repos/%s/%s/branches/%s' % (self.namespace, self.project, self.parent_branch)
        url = urljoin(str(self.API), str(path))
        try:
            res = requests.get(url)
        except Exception as e:
            self.logger.fatal(e)
            sys.exit()

        if res.status_code == 404:
            return False
        return True
Project: CustomsSpider    Author: orangZC
def parse(self, response):
        # extract the customs-district link from the navigation menu
        custom_district = response.xpath('//div[@id="tmenu_126614"]/table/tr[5]//a/@href').extract()[0]
        if custom_district:
            self.log('custom_district: %s' % custom_district)
            yield scrapy.Request(url=custom_district, callback=self.get_custom_district)
            '''
        # extract the link to the monthly statistics magazine from the navigation menu
        monthly_magazine = response.xpath('//div[@id="tmenu_126614"]/table/tr[2]//a/@href').extract()[0]
        if monthly_magazine:
            monthly_magazine_url = urljoin('http://www.customs.gov.cn', str(monthly_magazine))
            self.log('monthly_magazine: %s' % monthly_magazine_url)
            yield scrapy.Request(url=monthly_magazine_url, callback=self.get_monthly_magazine)

        # determine the total page count and request every list page
        pages_text = response.xpath('//div[@id="ess_ctr175903_ListC_Info_AspNetPager"]/table/tr/td[1]//text()').extract()
        pages_count = str(pages_text[2]).split('/')[1]
        print(pages_count)
        i = 1
        while i <= int(pages_count):
            print(i)
            page_url = urljoin('http://www.customs.gov.cn', '/publish/portal0/tab49666/module175903/page%d.htm' % i)
            self.log('page_url: %s' % page_url)
            i += 1
            yield scrapy.Request(url=page_url, callback=self.get_flash)
            '''
Project: CustomsSpider    Author: orangZC
def get_flash(self, response):
        # links to the detail pages found on the list page
        page_list = response.xpath('//li[@class="liebiaoys24"]/span/a/@href').extract()
        for page in page_list:
            if 'http' in page:
                self.log("flash_url: %s" % str(page))
                yield scrapy.Request(url=str(page), callback=self.parse_flash)
            else:
                flash_url = urljoin('http://www.customs.gov.cn', str(page))
                self.log("flash_url: %s" % flash_url)
                # relative link: join it with the site domain and hand it to parse_flash for parsing
                yield scrapy.Request(url=flash_url, callback=self.parse_flash)