Python typing 模块,Dict() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Dict()

项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def _send_receive(self, nummsgs: int, outformat: str='json',
                  dataupdate: Optional[Dict[AnyStr, Any]]=None,
                  restart_data: bool=True) -> List[Response]:
        """Run a batch of buffered requests through a processor and load the results.

        :param nummsgs: count forwarded to ``_add_to_buffer``
        :param outformat: format name passed to every helper (default ``'json'``)
        :param dataupdate: optional mapping merged into ``self.data``
        :param restart_data: when true, ``_restart_data`` is called first
        :return: the value of ``self._loadResults(outformat)``
        """
        if restart_data:
            self._restart_data(outformat)
        if dataupdate:
            self.data.update(dataupdate)

        self._add_to_buffer(nummsgs, outformat)
        # Rewind so the processor reads the buffer from its beginning.
        self.sendbuffer.seek(0)

        buffers = dict(custom_outbuffer=self.recvbuffer,
                       custom_inbuffer=self.sendbuffer)
        # Only the first element of the returned pair is used here.
        proc, _unused = get_processor_instance(outformat, **buffers)
        proc.process_requests(self.sendbuffer)
        return self._loadResults(outformat)
项目:trf    作者:aistairc    | 项目源码 | 文件源码
def calc_rs_modality(self) -> Dict[str, float]:
        """Calculate, per modality, the share of sentences that carry it.

        Each sentence is parsed with ``self.knp``; the feature strings of its
        bunsetsu are concatenated and searched for modality tags. A modality
        is counted at most once per sentence.

        Returns:
            dict: modality name -> (sentences containing it) / (all sentences)
        """
        # Loop-invariant, so computed once up front.
        n = len(self.sentences)

        modality_counter = Counter()
        for s in self.sentences:
            chunks = [Chunk(chunk_id=bnst.bnst_id,
                            link=bnst.parent,
                            description=bnst.fstring)
                      for bnst in self.knp.parse(s).bnst_list()]
            features = "".join(chunk.description for chunk in chunks)
            # NOTE(review): the pattern previously read "<?????-(.+?)>", which is
            # mojibake and fails to compile ("multiple repeat"). It is restored
            # here to the five-character KNP tag name - confirm against upstream.
            ms = set(re.findall(r"<モダリティ-(.+?)>", features))
            modality_counter += Counter(ms)

        return {k: float(c) / n for k, c in modality_counter.items()}
项目:trf    作者:aistairc    | 项目源码 | 文件源码
def _load_word_freq(self, threshold: int) -> Tuple[Dict[str, int], int]:
        """Load "<word> <freq>" lines from ``self.rnnlm_model_path``.

        Words whose frequency exceeds ``threshold`` are kept with that
        frequency; every other line adds one to the ``'<unk/>'`` entry.

        Returns:
            tuple: (word -> frequency mapping, number of lines read)
        """
        frequencies = {}
        line_count = 0
        with open(self.rnnlm_model_path, mode='r') as model_file:
            for line_count, entry in enumerate(model_file, start=1):
                word, raw_freq = entry.split(' ')
                count = int(raw_freq)
                if count <= threshold:
                    # Rare words are pooled; each such line counts once.
                    frequencies['<unk/>'] = frequencies.get('<unk/>', 0) + 1
                else:
                    frequencies[word] = count

        return (frequencies, line_count)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, client: Session) -> None:
        """Make new NetworkManager.

        Stores the session, creates empty request bookkeeping containers and
        subscribes the manager's handlers to the session's ``Network.*`` events.
        """
        super().__init__()
        self._client = client
        self._requestIdToRequest: Dict[str, Request] = {}
        self._interceptionIdToRequest: Dict[str, Request] = {}
        self._extraHTTPHeaders: OrderedDict[str, str] = OrderedDict()
        self._requestInterceptionEnabled = False
        self._requestHashToRequestIds = Multimap()
        self._requestHashToInterceptions = Multimap()

        subscriptions = (
            ('Network.requestWillBeSent', self._onRequestWillBeSent),
            ('Network.requestIntercepted', self._onRequestIntercepted),
            ('Network.responseReceived', self._onResponseReceived),
            ('Network.loadingFinished', self._onLoadingFinished),
            ('Network.loadingFailed', self._onLoadingFailed),
        )
        for event_name, callback in subscriptions:
            self._client.on(event_name, callback)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def waitForNavigation(self, options: dict = None, **kwargs: Any
                            ) -> Optional[Response]:
        """Wait until navigation completes.

        ``options`` and ``kwargs`` are merged (keyword arguments win) and passed
        to a ``NavigatorWatcher``. Responses seen while waiting are recorded by
        URL; the one recorded for ``self.url`` is returned, or ``None`` if there
        is none.
        """
        # Merge into a fresh dict so the caller's ``options`` is left untouched.
        options = dict(options or {}, **kwargs)
        watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                                   options)
        responses: Dict[str, Response] = dict()
        listener = helper.addEventListener(
            self._networkManager,
            NetworkManager.Events.Response,
            lambda response: responses.__setitem__(response.url, response)
        )
        try:
            await watcher.waitForNavigation()
        finally:
            # Detach the listener even if waiting raised.
            helper.removeEventListeners([listener])
        return responses.get(self.url)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, options: Dict[str, Any] = None, **kwargs: Any) -> None:
        """Make new launcher.

        ``options`` and ``kwargs`` are merged (keyword arguments win) into
        ``self.options``. Headless flags are appended unless the ``headless``
        option is present and falsy. The executable is taken from the
        ``executablePath`` option, otherwise chromium is downloaded if
        ``check_chromium()`` reports it missing.
        """
        # Copy so the caller's dict is not modified by the merge.
        self.options = dict(options or {}, **kwargs)
        # Copy so later changes to the argument list cannot leak into the
        # module-level DEFAULT_ARGS shared by every launcher.
        self.chrome_args = list(DEFAULT_ARGS)
        self._tmp_user_data_dir: Optional[str] = None
        self._parse_args()
        if self.options.get('headless', True):
            self.chrome_args = self.chrome_args + [
                '--headless',
                '--disable-gpu',
                '--hide-scrollbars',
                '--mute-audio',
            ]
        if 'executablePath' in self.options:
            self.exec = self.options['executablePath']
        else:
            if not check_chromium():
                download_chromium()
            self.exec = str(chromium_excutable())
        self.cmd = [self.exec] + self.chrome_args
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def _visibleCenter(self) -> Dict[str, int]:
        """Scroll the element into view and return the centre of its visible part.

        The work is done by a script passed to ``self.evaluate``; it yields
        ``null`` when the element is not contained in its owner document.

        Raises:
            BrowserError: if the script returned a falsy result.
        """
        center = await self.evaluate('''
element => {
    if (!element.ownerDocument.contains(element))
        return null;
    element.scrollIntoViewIfNeeded();
    let rect = element.getBoundingClientRect();
    return {
        x: (Math.max(rect.left, 0) + Math.min(rect.right, window.innerWidth)) / 2,
        y: (Math.max(rect.top, 0) + Math.min(rect.bottom, window.innerHeight)) / 2
    };
}
        ''')  # noqa: E501
        if not center:
            # No selector is available in this scope, so the message ends
            # after the colon.
            raise BrowserError('No node found for selector: ')
        return center
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def __init__(self, database, tabledesc=None, timestamp=False):
        # type: (str, Optional[Dict[str, str]], bool) -> None
        """Prepare the SQL statements for a key/value table.

        ``tabledesc`` supplies the format fields for the ``_sq*`` templates and
        defaults to ``self.prefixdesc()``. With ``timestamp`` set, the
        ``_sqtimestamp`` fragment is inserted into the create statement.
        """
        self.database = database
        if tabledesc is None:
            tabledesc = self.prefixdesc()
        self.tabledesc = tabledesc

        if timestamp:
            timestampsql = self._sqtimestamp
        else:
            timestampsql = ''
        # The create template takes the timestamp fragment via %-formatting
        # before the table description is filled in.
        self.kv_create = (self._sqcreate % timestampsql).format(**tabledesc)
        for operation in ('get', 'mget', 'put', 'delete'):
            template = getattr(self, '_sq' + operation)
            setattr(self, 'kv_' + operation, template.format(**tabledesc))

        self._connection = None  # type: Optional[sqlite3.Connection]
        self.sqlite_limit_variable_number = 999
        self.support_mget = True
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        """Return the stored value (or None) for every key, in input order.

        Keys are looked up in batches of at most
        ``self.sqlite_limit_variable_number``. If sqlite raises
        ``OperationalError``, batched lookups are switched off and each key is
        fetched individually instead.
        """
        if self.support_mget:
            rows = []
            try:
                with self.conn as conn:
                    for somekeys in grouper(self.sqlite_limit_variable_number, keys):
                        keylist = list(somekeys)
                        placeholders = ','.join('?' for _ in keylist)
                        rows.extend(conn.execute(self.kv_mget % placeholders, keylist))
                found = dict(rows)  # type: Dict[str, bytes]
                return [found.get(k) for k in keys]
            except sqlite3.OperationalError:
                self.support_mget = False
        return [self.__get(k) for k in keys]
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def contains_all_options(optiongroup, parentstyle, matchvalues=False):
    # type: (Dict[Any, Any], Style, bool) -> bool
    """Tell whether every option of optiongroup also occurs in parentstyle.

    Dict-valued options are compared recursively. With matchvalues set, a
    non-dict value must also have exactly the same type and compare equal to
    the value in parentstyle.
    """
    for optionname, value in optiongroup.items():
        if optionname not in parentstyle:
            return False
        if isinstance(value, dict):
            if not contains_all_options(value, parentstyle[optionname],
                                        matchvalues=matchvalues):
                return False
            continue
        if not matchvalues:
            continue
        pvalue = parentstyle.get(optionname)
        # Exact type comparison: True and 1 are deliberately not a match.
        if type(pvalue) != type(value) or pvalue != value:
            return False
    return True
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def misses_to_frame(parsed_lexemes: Iterable,
                    terms: Dict[str, str]=None) -> pd.DataFrame:
    """Tabulate the misses collected from ``parsed_lexemes``.

    Each miss becomes one row indexed by its lower-cased form, with a ``term``
    column (looked up in ``terms``, falling back to the miss itself) and a
    ``lexemes`` column holding the space-joined entries recorded for the miss.
    """
    terms = terms or {}
    miss_dict = collect_misses(parsed_lexemes)
    records = []
    for miss in miss_dict:
        key = miss.lower()
        records.append(OrderedDict([
            ('miss', key),
            ('term', terms.get(key, key)),
            ('lexemes', ' '.join(miss_dict[miss])),
        ]))
    return pd.DataFrame.from_records(
        records, index='miss', columns=['miss', 'term', 'lexemes'])
项目:drf-metadata    作者:night-crawler    | 项目源码 | 文件源码
def get_meta(self) -> t.Generator[t.Dict, None, None]:
        """Yield ``get_field_meta(f)`` for every model field that passes the filters.

        A field is skipped when its name is in ``self.exclude``, or when
        ``self.fields`` is non-empty and does not list it. Fields not listed in
        ``self.fields`` must additionally have their ``concrete``,
        ``auto_created`` and ``editable`` flags contained in the matching
        ``*_in`` collections.
        """
        candidates = self.model._meta.get_fields(
            include_parents=self.include_parents, include_hidden=self.include_hidden
        )
        for f in candidates:
            if f.name in self.exclude:
                continue

            listed = f.name in self.fields
            if not listed:
                if self.fields:
                    continue
                # The flag filters apply only to fields that were not
                # requested by name.
                if (f.concrete not in self.concrete_in
                        or f.auto_created not in self.auto_created_in
                        or f.editable not in self.editable_in):
                    continue

            yield self.get_field_meta(f)
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def update_game_state(self) -> None:
        """Recompute ``nextCardPlay`` and ``maxCardPlay`` for every pile colour.

        ``nextCardPlay`` is one more than the number of cards already played on
        the colour. ``maxCardPlay`` is found by walking the values from highest
        to lowest and taking the first one with fewer discards than
        ``num_copies``; it stays ``Value.V5`` if no value qualifies.
        """
        c: Color
        v: Value
        d: int
        colors = self.game.variant.pile_colors

        # discards[colour][value] = number of discarded cards of that kind.
        discards: Dict[Color, List[int]] = {c: [0] * 6 for c in colors}
        for d in self.game.discards:
            ci: CardInfo = gameCards[d]
            discards[ci.color][ci.value] += 1

        self.nextCardPlay: Dict[Color, int] = {
            c: len(self.game.playedCards[c]) + 1 for c in colors}
        self.maxCardPlay: Dict[Color, Value] = {c: Value.V5 for c in colors}
        # NOTE(review): the scan stops at the highest value that still has a
        # copy left and does not look at lower values - confirm this is intended.
        for c in colors:
            for v in reversed(Value):  # type: ignore
                if discards[c][v] >= v.num_copies:
                    continue
                self.maxCardPlay[c] = v
                break
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def received(self, type: str, resp: Dict[str, Any]) -> None:
        """Dispatch one server message to the matching handler.

        ``'advanced'`` and ``'connected'`` are ignored. ``'init'`` raises
        ``Exception``; any unknown type is printed and then raises as well.
        """
        if type == 'message':
            self.message_received(resp['text'])
            return
        if type in ('advanced', 'connected'):
            # Fast UI animations / players connected: nothing to do.
            return
        if type == 'notify':
            self.handle_notify(resp)
            return
        if type == 'action':
            self.decide_action(resp['can_clue'], resp['can_discard'])
            return
        if type != 'init':
            print(type, resp)
        raise Exception()
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def updateMaxScore(self) -> None:
        """Recompute ``self.maxScore`` from the discard piles.

        Every suit starts at 5 points. Whenever all copies of a rank have been
        discarded, the suit is capped at that rank's value minus one; ranks are
        visited from highest to lowest, so the lowest exhausted rank wins.
        """
        s: Suit
        d: int
        r: Rank
        total: int = 0
        for s in self.variant.pile_suits:
            copies: Dict[Rank, int] = dict.fromkeys(Rank, 0)
            for d in self.discards[s]:
                card: ServerCard = self.deck[d]
                copies[card.rank] += 1

            # In the OneOfEach variant the extra suit has one more copy per rank.
            extra_copy = self.variant == Variant.OneOfEach and s == Suit.Extra
            possible: int = 5
            for r in reversed(Rank):  # type: ignore
                totalCopies: int = r.num_copies + 1 if extra_copy else r.num_copies
                if copies[r] == totalCopies:
                    possible = r.value - 1
            total += possible
        self.maxScore = total
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
        """Map "<xml name><microphone ending>" ids to the label of their xml file.

        Only ``.xml`` files whose extension-less name matches
        ``self.id_filter_regex`` are considered, and an id is emitted only if
        the corresponding ``.wav`` file exists next to the xml file.
        """
        microphone_endings = (
            "_Yamaha",
            "_Kinect-Beam",
            "_Kinect-RAW",
            "_Realtek",
            "_Samson",
            "_Microsoft-Kinect-Raw",
        )

        xml_files = []
        for file in files:
            if file.name.endswith(".xml") and self.id_filter_regex.match(name_without_extension(file)):
                xml_files.append(file)

        labels = OrderedDict()
        for file in xml_files:
            for microphone_ending in microphone_endings:
                wav_name = name_without_extension(file) + microphone_ending + ".wav"
                if not (Path(file.parent) / wav_name).exists():
                    continue
                labels[name_without_extension(file) + microphone_ending] = \
                    self._extract_label_from_xml(file)
        return labels
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def _input_dictionary_for_loss_net(self, labeled_spectrogram_batch: List[LabeledSpectrogram]) -> Dict[str, ndarray]:
        """Assemble the named inputs of the loss net for one labeled batch."""
        spectrograms = []
        labels = []
        for example in labeled_spectrogram_batch:
            spectrograms.append(example.z_normalized_transposed_spectrogram())
            labels.append(example.label)

        input_batch, prediction_lengths = self._input_batch_and_prediction_lengths(spectrograms)

        names = Wav2Letter.InputNames
        # One row per example, holding the length of its label.
        label_lengths = reshape(array([len(label) for label in labels]),
                                (len(labeled_spectrogram_batch), 1))
        return {
            names.input_batch: input_batch,
            names.prediction_lengths: self._prediction_length_batch(
                prediction_lengths, batch_size=len(spectrograms)),
            names.label_batch: self.grapheme_encoding.encode_label_batch(labels),
            names.label_lengths: label_lengths,
            # Sets learning phase to training to enable dropout (see
            # backend.learning_phase documentation for more info):
            'keras_learning_phase': array([True])
        }
项目:scheduled-bots    作者:SuLab    | 项目源码 | 文件源码
def create_articles(pmids: Set[str], login: object, write: bool = True) -> Dict[str, str]:
    """
    Make an article item for each of the given pmids
    :param pmids: pmids to process
    :param login: wdi_core login instance
    :param write: actually perform write; when false every pmid maps to 'Q1'
    :return: map pmid -> wdid (pmids whose creation failed are left out)
    """
    pmid_map = {}
    for pmid in pmids:
        item = wdi_helpers.PubmedItem(pmid)
        if not write:
            pmid_map[pmid] = 'Q1'
            continue
        try:
            wdid = item.get_or_create(login)
        except Exception as e:
            # Best effort: report the failure and carry on with the next pmid.
            print("Error creating article pmid: {}, error: {}".format(pmid, e))
        else:
            pmid_map[pmid] = wdid
    return pmid_map
项目:Lyra    作者:caterinaurban    | 项目源码 | 文件源码
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]],
             arguments: Dict[Type, Dict[str, Any]] = None):
        """Create a mapping Var -> L from each variable in Var to the corresponding element in L.

        :param variables: list of program variables
        :param lattices: dictionary from variable types to the corresponding lattice types
        :param arguments: dictionary from variable types to arguments of the corresponding lattices
            (defaults to no arguments for every type)
        :raises ValueError: if a lookup by variable type raises ``KeyError``
        """
        super().__init__()
        if arguments is None:
            # Built per call: a default created in the signature would be one
            # object shared by every instance and would grow on each lookup.
            arguments = defaultdict(dict)
        self._variables = variables
        self._lattices = lattices
        self._arguments = arguments
        try:
            self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables}
        except KeyError as key:
            error = f"Missing lattice for variable type {repr(key.args[0])}!"
            raise ValueError(error) from key
项目:BlueWhale    作者:caffe2    | 项目源码 | 文件源码
def preprocess_samples(
        self,
        states: List[Dict[str, float]],
        actions: List[Dict[str, float]],
        rewards: List[float],
        next_states: List[Dict[str, float]],
        next_actions: List[Dict[str, float]],
        is_terminals: List[bool],
        possible_next_actions: List[List[Dict[str, float]]],
        reward_timelines: List[Dict[int, float]],
    ) -> TrainingDataPage:
        """Preprocess via ``GridworldContinuous`` and re-encode the state arrays.

        ``states`` and ``next_states`` of the resulting page are replaced by a
        float32 column holding the column index of every entry equal to 1.0.
        """
        tdp = GridworldContinuous.preprocess_samples(
            self, states, actions, rewards, next_states, next_actions,
            is_terminals, possible_next_actions, reward_timelines
        )
        for attr in ('states', 'next_states'):
            column_of_ones = np.where(getattr(tdp, attr) == 1.0)[1]
            setattr(tdp, attr, column_of_ones.reshape(-1, 1).astype(np.float32))
        return tdp
项目:BlueWhale    作者:caffe2    | 项目源码 | 文件源码
def preprocess_samples(
        self,
        states: List[Dict[str, float]],
        actions: List[str],
        rewards: List[float],
        next_states: List[Dict[str, float]],
        next_actions: List[str],
        is_terminals: List[bool],
        possible_next_actions: List[List[str]],
        reward_timelines: Optional[List[Dict[int, float]]],
    ) -> TrainingDataPage:
        """Preprocess via ``preprocess_samples_discrete`` and re-encode the state arrays.

        ``states`` and ``next_states`` of the resulting page are replaced by a
        float32 column holding the column index of every entry equal to 1.0.
        """
        tdp = self.preprocess_samples_discrete(
            states, actions, rewards, next_states, next_actions, is_terminals,
            possible_next_actions, reward_timelines
        )
        for attr in ('states', 'next_states'):
            column_of_ones = np.where(getattr(tdp, attr) == 1.0)[1]
            setattr(tdp, attr, column_of_ones.reshape(-1, 1).astype(np.float32))
        return tdp
项目:kudubot    作者:namboy94    | 项目源码 | 文件源码
def to_dict(self) -> Dict[str, str or float or Dict[str, str or int]]:
        """
        Serializes the message into plain data

        :return: The message data as a dictionary which can be used
                 to store a message as a JSON file
        """
        # NOTE(review): `str or float or ...` in the annotation evaluates to
        # plain `str` at runtime; a Union was probably intended.
        if self.sender_group is None:
            sender_group = None
        else:
            sender_group = self.sender_group.to_dict()

        return dict(
            message_title=self.message_title,
            message_body=self.message_body,
            sender=self.sender.to_dict(),
            sender_group=sender_group,
            receiver=self.receiver.to_dict(),
            timestamp=self.timestamp,
        )
项目:kudubot    作者:namboy94    | 项目源码 | 文件源码
def from_dict(data: Dict[str, str or float or Dict[str, str or int]])\
        -> Message:
    """
    Generates a Message object from a dictionary

    :param data: The dictionary to turn into a Message
    :return: The generated Message object
    """
    group_data = data["sender_group"]
    sender_group = None if group_data is None else contact_from_dict(group_data)

    # Positional order as passed to Message: the receiver precedes the sender.
    title = data["message_title"]
    body = data["message_body"]
    receiver = contact_from_dict(data["receiver"])
    sender = contact_from_dict(data["sender"])
    return Message(title, body, receiver, sender, sender_group,
                   data["timestamp"])
项目:kudubot    作者:namboy94    | 项目源码 | 文件源码
def test_importing(self):
        """
        Tests the String import handler method: the handler must return the
        same objects that regular import statements bind

        :return: None
        """
        handle = self.config_handler.__handle_import_statement__
        imported_os = handle("import os")
        imported_dict = handle("from typing import Dict")
        from typing import Dict

        self.assertEqual(os, imported_os)
        self.assertEqual(imported_dict, Dict)
项目:kudubot    作者:namboy94    | 项目源码 | 文件源码
def get_unsent_reminders(database: sqlite3.Connection) \
        -> List[Dict[str, str or int or datetime]]:
    """
    Retrieves all unsent reminders (rows with ``sent = 0``) from the database

    :param database: The database to use
    :return: A list of dictionaries that contain the reminder information:
             id, message, due_time (converted from its stored string)
             and receiver (a Contact built from the joined address book row)
    """
    query = (
        "SELECT reminder.id, reminder.msg_text, reminder.due_time, "
        "address_book.address, address_book.id, address_book.display_name "
        "FROM reminder "
        "JOIN address_book ON reminder.sender_id = address_book.id "
        "WHERE reminder.sent = 0")
    # Column positions follow the SELECT list above.
    return [
        {
            "id": row[0],
            "message": row[1],
            "due_time": convert_string_to_datetime(row[2]),
            "receiver": Contact(row[4], row[5], row[3])
        }
        for row in database.execute(query)
    ]
项目:kudubot    作者:namboy94    | 项目源码 | 文件源码
def thread_exists(thread: Dict[str, str or int], db: sqlite3.Connection) \
        -> bool:
    """
    Checks if a reddit discussion thread with the given thread parameters
    already exists

    :param thread: The thread to check for; its "show_name", "episode"
                   and "url" entries are compared against the table
    :param db: The database to check
    :return: True if the thread exists, false otherwise
    """
    result = db.execute(
        "SELECT * FROM anime_reminder_threads "
        "WHERE show_name=? AND episode=? AND thread=?",
        (thread["show_name"], thread["episode"], thread["url"])
    )
    # One row is enough to prove existence; no need to fetch them all.
    return result.fetchone() is not None


# noinspection SqlNoDataSourceInspection,SqlDialectInspection,SqlResolve
项目:kudubot    作者:namboy94    | 项目源码 | 文件源码
def define_language_text(self) -> Dict[str, Dict[str, str]]:
        """
        Defines the dictionary with which the text is translated
        in the translate() method. Must be overridden; this
        implementation always raises NotImplementedError.

        The expected shape is:

        { key: {"language": "text_in_language", ...}, ... }

        Keep in mind that every instance of the 'key'
        value is replaced while translating

        :return: The dictionary to create translations with
        """
        raise NotImplementedError()

    # noinspection PyMethodMayBeStatic
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def send_alert_notification(settings: Dict, tmpl_args: Dict):
    """Render the alert templates and send them as one slack attachment.

    The rendered ``tmpl-msg`` is used as both fallback and pretext. The
    ``tmpl-duration`` and ``tmpl-url`` templates each add a field when set.
    """
    message = settings['tmpl-msg'].render(**tmpl_args)
    fields = []
    for title, key in (('Duration', 'tmpl-duration'), ('URL', 'tmpl-url')):
        template = settings[key]
        if template:
            fields.append({
                'title': title,
                'value': template.render(**tmpl_args),
                'short': False,
            })
    attachment = {
        'fallback': message,
        'fields': fields,
        'pretext': message,
    }
    await send_slack_notification(settings['webhook-url'], [attachment])
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse slack notification settings.

    Returns None when the webhook url or the message template is missing;
    otherwise returns the settings with every configured template compiled
    into a ``jinja2.Template``.
    """
    ret = {
        'webhook-url': config.get('slack-webhook-url'),
        'tmpl-msg': config.get('slack-tmpl-msg'),
        'tmpl-duration': config.get('slack-tmpl-duration', fallback=''),
        'tmpl-url': config.get('slack-tmpl-url', fallback='')
    }  # type: Any
    if not (ret['webhook-url'] and ret['tmpl-msg']):
        log.debug('Slack settings missing, no slack notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid slack notification settings found', 'NOTIFICATIONS')
    ret['tmpl-msg'] = jinja2.Template(ret['tmpl-msg'])
    # The remaining templates are optional and compiled only when set.
    for optional_key in ('tmpl-duration', 'tmpl-url'):
        if ret[optional_key]:
            ret[optional_key] = jinja2.Template(ret[optional_key])
    return ret
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse clicksend sms settings.

    Should only be called from sms.parse_settings.

    Returns None when the username, api key, sender or template is missing;
    otherwise returns the settings with ``tmpl`` compiled into a
    ``jinja2.Template``.
    """
    ret = {
        'provider': 'clicksend',
        'username': config.get('sms-clicksend-username'),
        'api-key': config.get('sms-clicksend-api-key'),
        'sender': config.get('sms-clicksend-sender'),
        'tmpl': config.get('sms-tmpl'),
    }  # type: Any
    # All four values are required. The sender and template used to be tested
    # as list literals (`not ['sender']`), which is never true.
    required = ('username', 'api-key', 'sender', 'tmpl')
    if not all(ret[key] for key in required):
        log.msg('SMS settings missing, no sms notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid SMS notification settings found', 'NOTIFICATIONS')
    ret['tmpl'] = jinja2.Template(ret['tmpl'])
    return ret
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def update_monitor_group(dbcon: DBConnection, monitor_group_id: int, data: Dict[str, Any]):
    """Update a monitor group in the database.

    Data is a dict with parent_id/name values that will be updated.

    Raises IrisettError for any other key, and InvalidArguments when the
    new parent is the group itself or does not exist.
    """
    async def _run(cur: Cursor) -> None:
        for key, value in data.items():
            # The key is interpolated into the SQL below, so it must be one
            # of the known column names.
            if key not in ['parent_id', 'name']:
                raise errors.IrisettError('invalid monitor_group key %s' % key)
            if key == 'parent_id' and value:
                if monitor_group_id == int(value):
                    raise errors.InvalidArguments('monitor group can\'t be its own parent')
                if not await monitor_group_exists(dbcon, value):
                    raise errors.InvalidArguments('parent monitor group does not exist')
            q = """update monitor_groups set %s=%%s where id=%%s""" % key
            q_args = (value, monitor_group_id)
            await cur.execute(q, q_args)

    await dbcon.transact(_run)
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def require_dict(value: Optional[Dict[Any, Any]], key_type: Any=None, value_type: Any=None,
                 allow_none: bool=False) -> Any:
    """Make sure a value is a Dict[key_type, value_type].

    Used when dealing with http input data.

    Types are compared exactly (no subclasses). ``None`` is returned as-is
    when ``allow_none`` is set. Raises InvalidData on any mismatch.
    """
    if value is None and allow_none:
        return value
    if type(value) != dict:
        raise InvalidData('value was %s(%s), expected dict' % (type(value), value))
    value = cast(Dict, value)
    if key_type or value_type:
        for k, v in value.items():
            if key_type and type(k) != key_type:
                raise InvalidData('dict key was %s(%s), expected %s' % (type(k), k, key_type))
            if value_type and type(v) != value_type:
                # Report the expected *value* type (this used to print key_type).
                raise InvalidData('dict value was %s(%s), expected %s' % (type(v), v, value_type))
    return value
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def _get_monitor_metadata(self, dbcon: DBConnection) -> Optional[Dict[int, Dict[str, str]]]:
        """Collect active monitor metadata when the request asks for it.

        Returns None unless the ``include_metadata`` request parameter is
        true. The metadata is selected by the first of the ``id``,
        ``meta_key`` and ``monitor_group_id`` query parameters that is present,
        or for all active monitors otherwise, and is returned grouped as
        ``{object_id: {key: value}}``.
        """
        include_metadata = require_bool(
            get_request_param(self.request, 'include_metadata', error_if_missing=False),
            convert=True) or False
        if not include_metadata:
            return None
        if 'id' in self.request.rel_url.query:
            metadata_models = await metadata.get_metadata_for_object(
                dbcon, 'active_monitor', require_int(cast(str, get_request_param(self.request, 'id'))))
        elif 'meta_key' in self.request.rel_url.query:
            meta_key = require_str(get_request_param(self.request, 'meta_key'))
            meta_value = require_str(get_request_param(self.request, 'meta_value'))
            metadata_models = await metadata.get_metadata_for_object_metadata(
                dbcon, meta_key, meta_value, 'active_monitor', 'active_monitors')
        elif 'monitor_group_id' in self.request.rel_url.query:
            metadata_models = await monitor_group.get_active_monitor_metadata_for_monitor_group(
                dbcon, require_int(cast(str, get_request_param(self.request, 'monitor_group_id'))))
        else:
            metadata_models = await metadata.get_metadata_for_object_type(dbcon, 'active_monitor')
        metadata_dict = {}  # type: Dict[int, Dict[str, str]]
        for metadata_model in metadata_models:
            per_object = metadata_dict.setdefault(metadata_model.object_id, {})
            per_object[metadata_model.key] = metadata_model.value
        return metadata_dict
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
def __init__(self, config: Dict):
        """Keep the configuration and start with an empty ``dbs`` mapping."""
        self.dbs = dict()
        self.config = config
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def select(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default') -> List[DictRow]:
        """Run ``query`` with ``values`` on ``db_name`` and return the result of ``_select``."""
        return await self._select(query=query, values=values, db_name=db_name)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def first(self, query: str, values: Union[List, Dict],
                db_name: str = 'default') -> Optional[DictRow]:
        """Run ``query`` with ``values`` on ``db_name`` and return the result of ``_first``."""
        return await self._first(query=query, values=values, db_name=db_name)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def insert(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default', returning: bool = False):
        """Execute an insert ``query`` through ``_execute`` and return its result."""
        return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def update(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default', returning: bool = False):
        """Execute an update ``query`` through ``_execute`` and return its result."""
        return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def _execute(self, query: str, values: Union[List, Dict], db_name: str = 'default',
                   returning: bool = False):
        """Execute ``query`` on the master pool of ``db_name``.

        Returns the first row when ``returning`` is set, otherwise the
        cursor's row count. Raises RuntimeError if the master pool is None.
        """
        pool = self.dbs[db_name]['master']
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                if returning:
                    return await cursor.fetchone()
                return cursor.rowcount
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        """Execute ``query`` and return all rows.

        The slave pool of ``db_name`` is preferred, with the master pool as
        fallback. Raises RuntimeError if neither is available.
        """
        dbs = self.dbs[db_name]
        pool = dbs.get('slave') or dbs.get('master')
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                return await cursor.fetchall()
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def _first(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        """Execute ``query`` and return the first row.

        The slave pool of ``db_name`` is preferred, with the master pool as
        fallback. Raises RuntimeError if neither is available.
        """
        dbs = self.dbs[db_name]
        pool = dbs.get('slave') or dbs.get('master')
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                return await cursor.fetchone()
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
def _create_request_data(request: Request) -> Dict:
        """Summarize a request as a plain dict: url, query string, method and headers."""
        return dict(
            url=str(request.url),
            query_string=request.query_string,
            method=request.method,
            headers=dict(request.headers),
        )
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def assign_data(centroids: Sequence[Centroid], data: Iterable[Point]) -> Dict[Centroid, Sequence[Point]]:
    """Group every data point under the centroid with the smallest dist to it.

    Centroids that attract no point do not appear in the result.
    """
    groups: Dict[Point, List[Point]] = {}
    for point in data:
        nearest: Point = min(centroids, key=lambda candidate: dist(point, candidate))
        groups.setdefault(nearest, []).append(point)
    return groups
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def quality(labeled: Dict[Centroid, Sequence[Point]]) -> float:
    """Return the mean squared dist between each point and its assigned centroid."""
    squared_errors = (
        dist(centroid, point) ** 2
        for centroid, points in labeled.items()
        for point in points
    )
    return mean(squared_errors)
项目:trf    作者:aistairc    | 项目源码 | 文件源码
def calc_rs_pos(self) -> Dict[str, float]:
        """Calculate the ratio of each pos of words in input text
        Returns:
            dict: pos name -> share of all analysed morphemes with that pos
        """
        # TODO: It may take a long time when the number of sentences are large
        pos_counter = Counter(
            mrph.hinsi
            for sentence in self.sentences
            for mrph in self.juman.analysis(sentence).mrph_list()
        )
        total = sum(pos_counter.values())
        ratios = {}
        for name, num in pos_counter.items():
            ratios[name] = float(num) / total
        return ratios
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def setExtraHTTPHeaders(self, extraHTTPHeaders: Dict[str, str]
                              ) -> None:
        """Set extra http headers.

        The headers are stored on the manager (replacing any previous ones)
        and a separate copy is sent with ``Network.setExtraHTTPHeaders``.
        """
        self._extraHTTPHeaders = OrderedDict(extraHTTPHeaders.items())
        # Send a copy so the stored mapping is not shared with the client.
        headers = OrderedDict(self._extraHTTPHeaders)  # type: Dict[str, str]
        await self._client.send('Network.setExtraHTTPHeaders',
                                {'headers': headers})
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def extraHTTPHeaders(self) -> Dict[str, str]:
        """Get extra http headers as a new plain dict."""
        # dict(mapping) copies any mapping; dict(**mapping) would raise
        # TypeError for non-string keys.
        return dict(self._extraHTTPHeaders)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def setExtraHTTPHeaders(self, headers: Dict[str, str]):
        """Set extra http headers by delegating to the network manager."""
        return await self._networkManager.setExtraHTTPHeaders(headers)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
async def goto(self, url: str, options: dict = None, **kwargs: Any
               ) -> Optional[Response]:
        """Go to url.

        ``options`` and ``kwargs`` are merged (keyword arguments win) and passed
        to a ``NavigatorWatcher``. Returns the response recorded for
        ``self.url`` after navigation, or ``None`` if there is none.

        Raises PageError if the main frame failed to load.
        """
        # Merge into a fresh dict so the caller's ``options`` is left untouched.
        options = dict(options or {}, **kwargs)
        watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                                   options)
        responses: Dict[str, Response] = dict()
        listener = helper.addEventListener(
            self._networkManager, NetworkManager.Events.Response,
            lambda response: responses.__setitem__(response.url, response)
        )
        # Start watching before the navigation request is sent.
        result = asyncio.ensure_future(watcher.waitForNavigation())
        referrer = self._networkManager.extraHTTPHeaders().get('referer', '')

        try:
            try:
                await self._client.send('Page.navigate',
                                        dict(url=url, referrer=referrer))
            except Exception:
                watcher.cancel()
                raise
            await result
        finally:
            # Detach the listener on failure as well as on success.
            helper.removeEventListeners([listener])

        if self._frameManager.isMainFrameLoadingFailed():
            raise PageError('Failed to navigate: ' + url)
        return responses.get(self.url)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def addEventListener(emitter: EventEmitter, eventName: str, handler: Callable
                     ) -> Dict[str, Any]:
    """Register handler for eventName on emitter and return a record of the three."""
    emitter.on(eventName, handler)
    record = dict(emitter=emitter, eventName=eventName, handler=handler)
    return record