Automatic shrinking: "abaci.dag.datasize.per.reduce": "20000000", # 20 MB
default_concurrency: set when the pipeline is created (Pipeline.create).
Automatic shrinking can only lower the concurrency, never raise it, so if default_concurrency is already small, no adjustment happens. It is best to set default_concurrency slightly higher than the estimated concurrency; if it is set much too high, DCE shuffle performance suffers.
In other words:

split_size = total_size / datasize_per_reduce
tasks = 0
if split_size > default_concurrency:
    tasks = default_concurrency
else:
    tasks = split_size

That is, tasks = min(split_size, default_concurrency).
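For reference, a minimal sketch of wiring both knobs into pipeline creation. This is an assumption-heavy illustration: the "dce" pipeline type, the default_concurrency keyword, and job_conf as the name of the properties argument are placeholders, not confirmed API; check the Pipeline.create signature of your Bigflow deployment.

from bigflow import base

# Sketch only: argument names below are assumptions, not confirmed API.
pipeline = base.Pipeline.create(
    "dce",                                            # assumed pipeline type
    default_concurrency=1000,                         # slightly above the estimate
    job_conf={
        "abaci.dag.datasize.per.reduce": "20000000",  # 20 MB per reduce
    })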
If jobs still run slowly after that, enable cpu_profile so that the CPU time spent in each computation can be inspected through pprof:
pipeline = base.Pipeline.create(*****, cpu_profile=True)
Check the dce-writer-xxxxxxxxxxxxxxx file; once the following line appears, the writer has crashed:
FATAL [writer.cc:188] 0113 17:27:47.340183 1342 | CHECK failed: key.length() + value.length() < max_length: Too Big Data.Crashing...
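A common mitigation is to guard against oversized records before they reach the DCE writer. Below is a minimal sketch under stated assumptions: records is an existing PCollection of (key, value) string pairs, and MAX_RECORD_BYTES is an illustrative placeholder, not the writer's real max_length.

# Hypothetical guard; tune MAX_RECORD_BYTES to the writer's actual limit.
MAX_RECORD_BYTES = 64 * 1024 * 1024

def fits_writer(record):
    key, value = record
    # Mirror the writer's check: key.length() + value.length() < max_length
    return len(key) + len(value) < MAX_RECORD_BYTES

safe_records = records.filter(fits_writer)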
There are currently three main ways to load an external dictionary:
Method 1: If the dictionary file is tiny (no more than 10 MB), use Python's closure mechanism directly, as sketched right below.
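A minimal sketch of Method 1, assuming keys is an existing PCollection of lookup keys and reusing the load_dict helper defined in the Method 2 demo below: the dictionary is loaded eagerly on the client and captured by the mapper's closure, so the whole dict is serialized and shipped with the task, which is why this only suits tiny files.

small_dict = load_dict("./mydict.txt")  # loaded on the client, not remotely

def get_value_closure(key):
    # small_dict is captured by the closure and serialized into the task
    return small_dict.get(key)

values = keys.map(get_value_closure)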
Method 2: If the dictionary file is larger (10 MB < size < 1 GB), use the lazy_var module provided by Bigflow to load the dictionary on the remote side:
# coding: utf-8
from bigflow import base, lazy_var

"""
Usage notes:
In the demo below, my_lazy_var is a global variable; in practice it can be
used either globally or locally, and we recommend keeping it local.
If several functions depend on my_lazy_var, consider restructuring the
business code into a class with my_lazy_var as a member, accessed as
self.my_lazy_var from all member functions; the effect is almost
equivalent to a global.
"""

"""
mydict.txt
key1 value1
key2 value2
key3 value3
"""

def load_dict(path):
    """Load a whitespace-separated key/value file into a dict."""
    data = dict()
    with open(path, "r") as f:
        for line in f:
            (key, value) = line.split()
            data[key] = value
    return data

# The loader does not run here; it runs lazily on the first get().
my_lazy_var = lazy_var.declare(lambda: load_dict("./mydict.txt"))

def get_value(key):
    # Fetch the lazy_var content
    my_dict = my_lazy_var.get()
    return my_dict.get(key)

def main():
    # The lazy_var content can be fetched directly on the local side
    my_dict = my_lazy_var.get()
    # ["value1", "value2", "value3"]
    print my_dict.values()
    pipeline = base.Pipeline.create("local")
    # Ship the dictionary file so it is visible as ./mydict.txt remotely
    pipeline.add_file("./mydict.txt", "./mydict.txt")
    keys = pipeline.parallelize(["key1", "key2", "key3", "key4"])
    # The lazy_var content can also be fetched inside transforms
    values = keys.map(get_value)
    # ["value1", "value2", "value3", None]
    print values.get()

if __name__ == "__main__":
    main()
In addition, if a custom type (a Python class) runs into errors such as "bad marshal", lazy_var can be used as a workaround: the instance is then constructed by the initializer on the worker side instead of being marshalled together with the closure:
import my_lazy_var
from brand_tagger import BrandTagger

def init_tagger():
    """Construct a BrandTagger pointed at the shipped model files."""
    tagger = BrandTagger()
    fname_model = "./third/model/model"
    fname_bigram = "./third/model/bigram.dat"
    tagger.set_fnames(fname_model, fname_bigram)
    return tagger

def do_tag(line, tagger):
    """Tag brand words in the search_word field of one record."""
    # Load the model lazily, on first use on this worker
    if not tagger.has_loaded():
        tagger.load_model()
    csid = line[0]
    userid = line[1]
    real_rawpicurl = line[2]
    encoded_data = line[3]
    raw_width = line[4]
    raw_height = line[5]
    descs = line[6]
    picurl = line[7]
    srcs = line[8]
    ext_json = line[9]
    to_cmp_descs_str = line[10]
    search_word = line[11]
    search_word_list = search_word.split(";")
    formatted_res_list = []
    for item in search_word_list:
        sub_item = item.split(":")
        word = sub_item[0]
        prob = sub_item[1]
        brands = tagger.do_tag(word)
        brand_tag_res = word
        for brand in brands:
            brand_tag_res = brand_tag_res.replace(brand, "{brand}")
        formatted_res_list.append(":".join([brand_tag_res, prob]))
    formatted_res = ";".join(formatted_res_list)
    # Replace the trailing search_word field with the formatted result
    res_list = line[:-1] + [formatted_res]
    return res_list

_pipeline.add_file(workspace_path + "./opt-feeds-image/image-tagging/postprocess/my_lazy_var.py", "my_lazy_var.py")
tagger = my_lazy_var.declare(init_tagger)
format_brand_res = filter_res\
    .map(lambda x: do_tag(x, tagger.get()))\
    .map(lambda x: "\t".join(x))