Probability distribution in Python

algorithm

I have a bunch of keys, each with an "unlikeliness" value. I want to randomly choose one of these keys, but I want unlikely (key, value) pairs to be chosen less often than more likely ones. I am wondering if you have any suggestions, preferably an existing Python module I could use; otherwise I will need to write it myself.

I have checked the random module; it does not seem to provide this.
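
(For completeness: on Python 3.6 and later the standard library does offer this directly via random.choices, which accepts per-item weights. A minimal sketch, inverting the "unlikeliness" values so that larger values are picked less often; the dictionary here is purely illustrative:)

import random  # requires Python 3.6+ for random.choices

unlikeliness = {'one': 20, 'two': 2, 'three': 50}
keys = list(unlikeliness)
# invert: the larger the "unlikeliness", the smaller the selection weight
weights = [max(unlikeliness.values()) + 1 - unlikeliness[k] for k in keys]
picked = random.choices(keys, weights=weights, k=1)[0]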

I have to make such choices many millions of times, for 1000 different sets of objects, each containing 2,455 objects. The sets will exchange objects among each other, so the random chooser needs to be dynamic. With 1000 sets of 2,433 objects, that is 2,433,000 objects in total: low memory consumption is crucial. And since these choices are not the bulk of the algorithm, I need the process to be rather fast; CPU time is limited.

Thanks

Update:

Ok, I tried to be smart about your suggestions, but time is so limited...

I looked at the binary search tree approach and it seems too risky (complex and complicated) for me. The other suggestions all resemble the ActiveState recipe. I took it and modified it a little in the hope of making it more efficient:

import random

def windex(dict, sum, max):
    '''an attempt to make a random.choose() style function that makes
    weighted choices; accepts a dictionary of item_key: certainty_value
    pairs, e.g. {'one': 20, 'two': 2, 'three': 50}, plus the maximum
    certainty value (max) and the sum of all certainties (sum).'''
    n = random.uniform(0, 1)
    # invert the certainties: the higher the certainty, the lower the weight
    sum = max*len(dict)-sum
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            break
        n = n - weight
    return key

I am hoping to gain efficiency by dynamically maintaining the sum of certainties and the maximum certainty. Any further suggestions are welcome. You guys are saving me so much time and effort while increasing my effectiveness; it's insane. Thanks! Thanks! Thanks!
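
For what it's worth, that bookkeeping could look roughly like the sketch below (the helper names are mine, not part of the code above): whenever a certainty is added or changed, the running total and running maximum are adjusted in place instead of being recomputed from the whole dictionary.

def add_certainty(certainties, total, highest, key, certainty):
    # hypothetical O(1) bookkeeping: update the running sum and max
    certainties[key] = certainty
    total += certainty
    if certainty > highest:
        highest = certainty
    return total, highest

def change_certainty(certainties, total, highest, key, certainty):
    # adjust the running sum by the difference; a lowered maximum would
    # still need a full rescan, so only raise it here
    total += certainty - certainties[key]
    certainties[key] = certainty
    if certainty > highest:
        highest = certainty
    return total, highest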

Update 2:

I decided to let it make several choices at once, for efficiency. Since my algorithm is dynamic, this will result in an acceptable loss of precision. Anyway, here is what I have now:

def weightedChoices(dict, sum, max, choices=10):
    '''an attempt to make a random.choose() style function that makes
    several weighted choices at once; accepts a dictionary of
    item_key: certainty_value pairs, e.g. {'one': 20, 'two': 2, 'three': 50},
    plus the maximum certainty value (max) and the sum of all certainties (sum).'''
    samples = sorted(random.uniform(0, 1) for i in range(choices))
    (n, samples) = relavate(samples)
    keys = []
    # invert the certainties: the higher the certainty, the lower the weight
    sum = max*len(dict)-sum
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            keys.append(key)
            if samples: (n, samples) = relavate(samples)
            else: break
        n = n - weight
    return keys

def relavate(samples):
    # shift the remaining sorted samples so the smallest becomes the new threshold
    min = samples[0]
    new = [l - min for l in samples[1:]]
    return (min, new)

I haven't tried it out yet, though. If you have any comments or suggestions, please don't hesitate. Thanks!
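
To illustrate, a hypothetical call might look like this (the certainty dictionary is made up; total and highest stand for the dynamically maintained sum and max mentioned above):

certainties = {'one': 20, 'two': 2, 'three': 50}
total = 72      # sum of all certainties, maintained dynamically
highest = 50    # maximum certainty, maintained dynamically
picked = weightedChoices(certainties, total, highest, choices=10)
print picked    # up to 10 keys, with low-certainty keys favoured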

Update 3:

I spent all day working on a task-tailored version of Rex Logan's answer. Instead of two arrays of objects and weights, it is actually a special dictionary class, which makes things quite complex since Rex's code generates a random index... I also coded a test case that roughly resembles what will happen in my algorithm (but I can't really know until I try!). The basic principle is: the more often a key is randomly generated, the less likely it is to be generated again:

import random, time
import psyco
psyco.full()

class ProbDict():
    """
    Modified version of Rex Logan's RandomObject class. The more often a key is
    randomly chosen, the less likely it is to be chosen again.
    """
    def __init__(self,keys_weights_values={}):
        self._kw=keys_weights_values
        self._keys=self._kw.keys()
        self._len=len(self._keys)
        self._findSeniors()
        self._effort = 0.15
        self._fails = 0
    def __iter__(self):
        return self.next()
    def __getitem__(self, key):
        return self._kw[key]
    def __setitem__(self, key, value):
        self.append(key, value)
    def __len__(self):
        return self._len
    def next(self):
        key=self._key()
        while key:
            yield key
            key = self._key()
    def __contains__(self, key):
        return key in self._kw
    def items(self):
        return self._kw.items()
    def pop(self, key):  
        try:
            (w, value) = self._kw.pop(key)
            self._len -=1
            if w == self._seniorW:
                self._seniors -= 1
                if not self._seniors:
                    #costly but unlikely:
                    self._findSeniors()
            return [w, value]
        except KeyError:
            return None
    def popitem(self):
        return self.pop(self._key())
    def values(self):
        values = []
        for key in self._keys:
            try:
                values.append(self._kw[key][1])
            except KeyError:
                pass
        return values
    def weights(self):
        weights = []
        for key in self._keys:
            try:
                weights.append(self._kw[key][0])
            except KeyError:
                pass
        return weights
    def keys(self, imperfect=False):
        if imperfect: return self._keys
        return self._kw.keys()
    def append(self, key, value=None):
        if key not in self._kw:
            self._len +=1
            self._kw[key] = [0, value]
            self._keys.append(key)
        else:
            self._kw[key][1]=value
    def _key(self):
        for i in range(int(self._effort*self._len)):
            ri=random.randint(0,self._len-1) #choose a random object
            rx=random.uniform(0,self._seniorW)
            rkey = self._keys[ri]
            try:
                w = self._kw[rkey][0]
                if rx >= w: # test to see if that is the value we want
                    w += 1
                    self._warnSeniors(w)
                    self._kw[rkey][0] = w
                    return rkey
            except KeyError:
                self._keys.pop(ri)
        # if no key is found within the allotted number of tries, fall back to a linear scan
        self._fails += 1 #for confirming effectiveness only
        for key in self._keys:
            if key in self._kw:
                w = self._kw[key][0] + 1
                self._warnSeniors(w)
                self._kw[key][0] = w
                return key
        return None
    def _findSeniors(self):
        '''this function finds the seniors (the keys carrying the maximum
        weight), counts them and records that weight. It is costly but
        rarely needed.'''
        seniorW = 0
        seniors = 0
        for w, value in self._kw.itervalues():
            if w >= seniorW:
                if w == seniorW:
                    seniors += 1
                else:
                    seniorW = w
                    seniors = 1
        self._seniors = seniors
        self._seniorW = seniorW
    def _warnSeniors(self, w):
        #a weight can only be incremented...good
        if w >= self._seniorW:
            if w == self._seniorW:
                self._seniors+=1
            else:
                self._seniors = 1
                self._seniorW = w
def test():
    #test code
    iterations = 200000
    size = 2500
    nextkey = size


    pd = ProbDict(dict([(i,[0,i]) for i in xrange(size)]))
    start = time.clock()
    for i in xrange(iterations):
        key=pd._key()
        w=pd[key][0]
        if random.randint(0,1+pd._seniorW-w):
            #the heavier the object, the more unlikely it will be removed
            pd.pop(key)
        probAppend = float(500+(size-len(pd)))/1000
        if random.uniform(0,1) < probAppend:
            nextkey+=1
            pd.append(nextkey)
    print (time.clock()-start)*1000/iterations, "msecs / iteration with", pd._fails, "failures /", iterations, "iterations"
    weights = pd.weights()
    weights.sort()
    print "avg weight:", float(sum(weights))/pd._len, max(weights), pd._seniorW, pd._seniors, len(pd), len(weights)
    print weights
test()

Any comments are still welcome. @Darius: your binary trees are too complex and complicated for me, and I don't think their leaves can be removed efficiently... Thanks all.



1 Answer


This ActiveState recipe gives an easy-to-follow approach, specifically the version in the comments that doesn't require you to pre-normalize your weights:

import random

def weighted_choice(items):
    """items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    n = random.uniform(0, weight_total)
    for item, weight in items:
        if n < weight:
            return item
        n = n - weight
    return item

This will be slow if you have a lot of items. A binary search would probably be better in that case... but would also be more complicated to write, for little gain if you have a small sample size.
Here's an example of the binary search approach in Python, if you want to follow that route.

(I'd recommend doing some quick performance testing of both methods on your dataset. The performance of different approaches to this sort of algorithm is often a bit unintuitive.)


Edit: Since I was curious, I took my own advice and ran a few tests.

I compared four approaches:

1. The weighted_choice function above.

2. A binary-search choice function, like so:

import bisect

def weighted_choice_bisect(items):
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    return items[bisect.bisect(added_weights, random.random() * last_sum)][0]

3. A compiled version of 1:

def weighted_choice_compile(items):
    """returns a function that fetches a random item from items

    items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    def choice(uniform = random.uniform):
        n = uniform(0, weight_total)
        for item, weight in items:
            if n < weight:
                return item
            n = n - weight
        return item
    return choice

4. A compiled version of 2:

def weighted_choice_bisect_compile(items):
    """Returns a function that makes a weighted random choice from items."""
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    def choice(rnd=random.random, bis=bisect.bisect):
        return items[bis(added_weights, rnd() * last_sum)][0]
    return choice
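
For reference, the two compiled variants are meant to be built once and then called many times, roughly like this (the items list is just an example):

items = [('a', 10), ('b', 1), ('c', 5)]
choose = weighted_choice_bisect_compile(items)
picks = [choose() for i in xrange(1000)]  # 'b' should turn up least often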

Then I built a big list of choices, like so:

choices = [(random.choice("abcdefg"), random.uniform(0,50)) for i in xrange(2500)]

And an overly simple profiling function:

import time

def profiler(f, n, *args, **kwargs):
    start = time.time()
    for i in xrange(n):
        f(*args, **kwargs)
    return time.time() - start
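
The uncompiled timings would then have been gathered with calls along these lines (the exact invocations are not shown above):

print "simple:", profiler(weighted_choice, 1000, choices)
print "binary:", profiler(weighted_choice_bisect, 1000, choices)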

The results:

(Seconds taken for 1,000 calls to the function.)

  • Simple, uncompiled: 0.918624162674
  • Binary, uncompiled: 1.01497793198
  • Simple, compiled: 0.287325024605
  • Binary, compiled: 0.00327413797379

The "compiled" results include the average time taken to compile the choice function once. (I timed 1,000 compiles, divided that time by 1,000, and added the result to the choice-function time.)
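
In other words, a compiled figure was assembled roughly like this sketch of the arithmetic just described:

# average cost of building the chooser once, measured over 1,000 builds
compile_cost = profiler(weighted_choice_bisect_compile, 1000, choices) / 1000.0
# cost of 1,000 calls to one pre-built chooser
call_cost = profiler(weighted_choice_bisect_compile(choices), 1000)
binary_compiled = compile_cost + call_cost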

So: if you have a list of items and weights which change very rarely, the compiled binary method is by far the fastest.
