使用库pybloom_live
from pybloom_live import ScalableBloomFilter,BloomFilter # 可自动伸缩的布隆过滤器 bloom = ScalableBloomFilter(initial_capacity=100,error_rate=0.001) # 添加内容 bloom.add('daqi') print('daqi'in bloom) # 定长的布隆过滤器 bloom1 = BloomFilter(capacity=10000) bloom1.add('daqi') print('daqi'in bloom1)
手动实现一个简单的布隆过滤器
使用bitarray实现,将初始数组置为0,根据hash计算出节点置为1,同时写了一个生成随机码的函数用于测试。
import random import mmh3 from bitarray import bitarray import os.path import re # bitarray长度 BIT_SIZE = 50000 class BloomFilter(): def __init__(self): bit_array = bitarray(BIT_SIZE) bit_array.setall(0) self.bit_array = bit_array self.bit_size = self.length() def get_points(self, url): """ 生成需要插入的位置 :param url: :return:节点的列表 """ point_list = [] for i in range(7): point = mmh3.hash(url,30+i) % self.bit_size point_list.append(point) return point_list def add(self, url): """ 添加url到bitarray中 :param url: :return: """ res = self.bitarray_expand() points = self.get_points(url) try: for point in points: self.bit_array[point] = 1 return '注册完成!' except Exception as e: return e def contains(self,url): """ 验证url是否存在 :param url: :return:True or False """ points = self.get_points(url) # 在bitarray中查找对应的点,如果有一个点值为0就说明该url不存在 for p in points: if self.bit_array[p] == 0: return False return True def count(self): """ 获取bitarrray中使用的节点数 :return: bitarray长度 """ return self.bit_array.count() def length(self): """ 获取bitarray的长度 :return:bitarray的长度 """ return len(self.bit_array) def bitarray_expand(self): """ 扩充bitarray长度 :return:bitarray的长度或使用率,布隆过滤器的bitarray的使用最好不要超过50%,这样误判率低一些 """ isusespace = round(int(self.count()) / int(self.length()),4) if 0.50 < isusespace: # 新建bitarray expand_bitarray = bitarray(BIT_SIZE) expand_bitarray.setall(0) # 增加新建的bitarray self.bit_array = self.bit_array + expand_bitarray self.bit_size = self.length() return self.bit_size else: return f'长度尚可,{round(isusespace * 100,2)}%' def get_captcha(): """ 生成用于测试的随机码 :return: """ seed = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' captcha = "" for i in range(10): captcha += random.choice(seed) print(captcha) return captcha if __name__ == '__main__': bloom = BloomFilter() for i in range(100000): bloom.add(f'www.{get_captcha()}.com') print(bloom.length()) print(bloom.count()) print(bloom.count())