背景

推荐漏斗

推荐漏斗由上图召回、粗排、精排、重排、曝光级联构成,item量级从千万到个逐级递减,同时获得从用户行为开始的逐级反向隐式反馈。

为了简化问题,这里暂不考虑重排反馈,直接将行为反馈作为精排输入,但实际上是需要考虑的。

一般情况下,召回、粗排、精排可能是三个小组在各自优化,没有考虑联合建模,简单使用用户行为反馈数据进行各自模型训练,这里会存在以下问题:

  1. 离线用户行为反馈数据vs线上实际预估数据,数据量级分布存在gap,影响模型效果,尤其对于召回和粗排

  2. 没有考虑联合建模,先不表多模型资源和维护成本,没考虑下游模型和数据反馈,对于召回和粗排在最终目标对齐上可能也有gap

方案

单任务

  1. 模型维度

有一拖二、一拖三、蒸馏等各种玩法,这里列举2个实用有代表性的:

精排蒸馏召回(粗排)

精排蒸馏召回和粗排

  1. 样本维度

利用下游的级联反馈可以补充有价值的正负样本,以召回为例:

负样本:可以将随机负采样、粗排尾部、精排尾部、曝光负样本按不同置信权重构建几个等级负样本。

样本

正样本:类似地,可以将粗排头部、精排头部、曝光正样本按不同置信权重构建几个等级正样本。

多任务

类比单任务,可以对多任务中每个目标进行单任务联合建模,然后在漏斗的每一层执行和重排一致的多目标融合。

另一种思路是,只训练一个综合模型,不以某个特定任务为训练目标,召回、粗排以多目标融合后头部为正样本,尾部及随机负采样为负样本。

18年搞了半年大规模分布式深度学习框架研发,那时细节还很鲜活,本该一年前写此文,奈何懒是原罪😓。本文仅针对parameter server部分进行highlight小结,距工业界可用分布式学习框架还差很多组件(数据流,算子,优化器,failover,servering等),不在本文范围。具体细节需参考相关paper和代码,本文更多是个人时间线上的备忘。

问题规模

数据

按100亿instance算,每个样本平均100个key,每个key 20B,共计20TB。

特征

按100亿feature算,每个feature kv占内存50B,共计500GB。

模型

DNN部分存储可忽略。
按6层FC算,网络宽度分别512-256-128-128-128-1,一次前向计算需要20万次float乘法。

逻辑图

很好理解不展开,主要是异步。
ps-逻辑图

流程图

流程图如下图,可以抽象出以下模块:
异步离不开队列,这里抽象出channel,流转各种callback,类似go的channel;RpcServer作为生产者,customer thread group作为消费者,server端主要是request handler,worker端主要是response handler。
Table路由逻辑可以设计的比较简单,比如按余,这样可以取消proxy模块;线程组可以和table shard一一对应,可以简单无锁。

ps-流程图

ps-lite时序图

这里以开源ps-lite为例,基本也是上图的抽象范式,同时引入了scheduler进行集群状态管理。barrier前主要是集群的准备/同步,具体的pull/push训练逻辑可以参考例子进行定制。
ps-lite时序图

Actor-Critic算法是一种结合策略梯度policy gradient时序差分学习TD learning的强化学习方法。其中actor(演员)是指策略函数π(s, a),即学习一个策略来得到尽量高的回报;critic(评论员)是指值函数Vϕ(s),对当前策略的值函数进行估计,即评估actor的好坏。然后交替学习至收敛。借助于值函数,actor-critic 算法可以进行单步更新参数,不需要等到回合结合才进行更新。

数学推导

伪代码示例

1
2
3
4
5
6
7
8
9
10
class Critic(object):
def __init__(self, sess, n_features, lr=0.01):
"""
Critic网络结构,输出V(s),略
"""
with tf.variable_scope('squared_TD_error'):
self.td_error = self.r + GAMMA * self.v_next - self.v
self.loss = tf.square(self.td_error) # TD_error = (r+gamma*V_next) - V_eval
with tf.variable_scope('train'):
self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)
1
2
3
4
5
6
7
8
9
10
11
12
class Actor(object):
def __init__(self, sess, n_features, n_actions, lr=0.001):
"""
Actor网络结构,输出p(a|s),略
"""
with tf.variable_scope('exp_v'):
log_prob = tf.log(self.acts_prob[0, self.a])
self.exp_v = tf.reduce_mean(log_prob * self.td_error)
with tf.variable_scope('train'):
self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v)#minimize(-exp_v) = maximize(exp_v)
def choose_action(self, s):
pass

应用场景

离线没有足够label数据时,无法进行监督学习,可考虑通过线上try-and-error,进行Actor-Critic强化学习,Reward恒正也ok。

背景与目的

本着知其然知其所以然的原则,对TensorFlow的核心特性进行探索,希望通过代码落地方式加强认知与备忘。toytensorflow是对TensorFlow python API模拟的玩具小轮子,包括DAG、惰性求值、链式法则、自动求偏导、前向/后项算法等特性。
github地址:https://github.com/nanjunxiao/toytensorflow

抽象四元素

1.operation
操作符、Variable、Constant、Placeholder统一抽象为operation。
2.graph
有向无环图DAG
3.session
会话,sess.run(op)才真正计算,惰性求值
4.optimizer
优化算子,比如梯度下降

实现notes

以linear regression为例,loss = reduce_mean(square(matmul(X,W)+b - Y) ),构建的DAG如下图所示,实线表示前向计算,虚线表示BP反向传播

1.operation:为了支持向量化表达,操作符包括matmul等矩阵操作及求导
2.graph:通过邻接链表构建DAG,singleton实现default_graph
3.session:sess.run(op)才真正计算,DFS递归求值
4.optimizer:目前只实现了GradientDescentOptimizer
5.sess.run(train_op)时启动BP反向传播,为避免重复计算,采用BFS实现链式求导

例子及效果

以linear regression为例,左图是TensorFlow结果,右图是toytensorflow结果,结果是一致的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import toytensorflow as tf
import numpy as np

#real data
np.random.seed(1) #for the same data
x_data = np.float32(np.random.rand(2,100) )
y_data = np.dot([0.1,0.2], x_data) + 0.3
#ops & DAG
W = tf.Variable([[0.0,0.0] ], name='weight')
b = tf.Variable(0.0, name='bias')
X = tf.placeholder(tf.float32)
Y = tf.placeholder(tf.float32)
predict = tf.matmul(W, X) + b

loss = tf.reduce_mean(tf.square(predict - Y) )
learning_rate = 0.5
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
train_op = optimizer.minimize(loss)

init = tf.initialize_all_variables()
#run
feed_dict = {X:x_data, Y:y_data}
with tf.Session() as sess:
sess.run(init)
for epoch in range(30):
mse = sess.run(loss, feed_dict=feed_dict)
print 'epoch: {}, mse: {}, w: {}, b: {}'.format(epoch, mse, sess.run(W), sess.run(b))
sess.run(train_op, feed_dict=feed_dict)
w_value = sess.run(W, feed_dict=feed_dict)
b_value = sess.run(b, feed_dict=feed_dict)
print('W: {}, b: {}'.format(w_value, b_value) )

TODO

1.添加更多的操作符
2.添加更多的优化算法,比如momentum/adam/adagrad
3.丰富更多例子,比如logisticregression/mlp

  纸上得来终觉浅,光看明白还是不够的,常用算法还是要亲自推导下印象才深刻。请忽略我丑陋的字迹,好久没写过字了-_-!

Logistic Regression and Softmax Regression

    LR and Softmax

SVM

    SVM

Bernoulli and Multinomial Naive Bayes

    NB

BP神经网络

    BP

Expectation-Maximization and Gaussian Mixture Models

    EM and GMM

Hidden Markov Model

    HMM

Latent Dirichlet Allocation

  详见Topic Model-LDA理论篇

GBRT

  详见GBM之GBRT总结

To be continued…

  Session Track是TREC的一个子任务,目标是根据用户和搜索引擎的交互Session信息提高检索质量。当时抱陈博大腿一起做的这个任务,学到了不少东西。

数据

  Session Track提供了4种测试数据RL1-RL4,其中每一种都比前一种多一个信息。RL1只有最后需要查询的query词组,RL2相比RL1多了前几次session的query词组,RL3相比RL2多了前几次session的部分query结果,RL4相比RL3多了点击文档和停留时间信息。下面是2011年的部分数据示例。
        RL1
        RL2
        RL3
        RL4

  Session track目标是想检验利用query的增量prior信息是否能持续提高检索效果,理想情况应该是效果RL4>RL3>RL2>RL1,其实这也应该是客观结果。

我们的做法

  针对RL1没有什么附加信息,只能直接利用检索系统拉结果,这也是最终的召回结果了,后面只是在此基础上进行重排序而已。这里我们使用的是实验室的检索系统,说实话在这个任务上效果不太好。

  RL2由于多了prior的query信息,所以我们可以参考查询扩展重构query,将所有的query term按照exp权重加权累加,得到一个新的带权重的query term list,然后和RL1结果计算BM25重排序。

  RL3由于多了部分结果,我们可以基于此构建虚拟文档,利用该虚拟文档和RL1结果计算cosine重排序。

  RL4多了点击和停留时间信息,这个没有太好的想法,参考了BUPT的方法,通过文档和被点击文档的相似度换算出停留时间,再重排序。

  接下来我们采用pairwise的ranking svm融合上面的各种得分来学习重排序模型,利用2011年的relevance judgments结果数据进行训练,将模型应用到2012年数据输出score进行排序。2011结果数据中label包含-2/0/1/2/3,分布代表spam/not relevant/relevant/high relevant/navigational。

  下面是比较重头的特征了,这里我们没有采用标题/url长度等raw feature,使用的是各种similarity等high level feature,如下图所示
        

  最终我们提交了3组结果,每组包括RL1-RL4四个结果,具体方法和特征如下表所示:
        

结果

  从下图可知RL4最好NDCG@10结果为0.2857,比直接通过检索系统返回结果0.1586提高了80.14%,可见ranking svm能利用prior session信息,很大程度的提高检索质量。直接检索结果比较差,如果该结果提高,再加入标题/url长度等各种raw feature,效果应该还可以提高。
        

Ranking SVM

  Ranking svm是一种pairwise的排序算法,它将排序问题转化为一个分类问题,在给定query下,如果文档d1比d2更相关,我们把pair<d1,d2>作为一个新的正样本,否则作为一个负样本。

  将排序问题转化为分类问题后,学习原理同svm。Ranking svm目标是学习一个排序函数f(x)=w*x+b,如果xi比xj更相关,我们希望f(xi)>f(xj),转换为文档对表示就是<w,xi-xj> > 0。ranking svm的优化目标和svm类似,差别只在于svm是单点,ranking svm是pair:
        

  其中w为参数向量, x为文档的特征,y为文档对之间的相对相关性, ξ为松弛变量,m为pair个数。

  产生pair代码如下图所示
            

关于正负pair效果相同没有影响,这个问题其实很简单,可以这样理解:考虑pair< xi,xj>,约束条件(yi-yj)*w*(xi-xj) >= 1

  1. 正例xi > xj, yi-yj==1. –> 1*w*(xi-xj) >= 1
  2. 负例xi < xj, yi-yj==-1. –> -1*w*(xi-xj) >= 1 等价于 1*w*(xj-xi) >=1,这就相当于把负例反转变为正例(SVM样本没有Ranking SVM这种对称性,由于bias b),所以整个训练过程中全部使用正例(负例反转为正例),全部使用负例,正负混合都是等价的。

使用obj=hinge loss + regularization同理可证。

系统架构

  软件设计界有一句很经典的话:“大部分的耦合问题,都可以通过在中间加一层解决”。

  参照MVC,可以把消息分析系统大致归结为接入层、数据层、分析层、服务层。其中数据层主要是各种冷热数据的存储及检索的构建,服务层对应view,主要是前端的展示,分析层对应各种后台离线、在线分析。
        

数据流

  利用消息队列和Nosql数据库两大部件,我们能很快地把分析系统搭起来,利用消息队列作为pipeline,各种业务流程能很快跑通,而且scale-out。

  消息队列可以选择kafka、rabbitmq等,Nosql可以选择mongodb、hbase等,具体还要根据实际业务来决定。

  一种简单数据流示意如下图所示:
        
  其中直接从消息队列接入数据可以进行类似online的实时分析,比如分类、打tag等。从Nosql接入数据主要进行离线分析,比如聚类(当然可以做成online)、历史数据统计、用户影响力训练/预测等。

  这里给出我们一个实际业务的具体数据流,可在此基础上进行各种插拔伸缩。其中消息队里中数据采用pull方式主动获取,db_writer除了入Nosql外,还负责生成全局唯一ID。
  

部分性能指标

  Mongo2.6读写压力测试,单台机器可承受两亿条消息,消息平均10K。

  Mongo3.0增加了压缩,存储空间是mongo2.6的20%-40%。

  一般分布式消息队列可以把千兆带宽跑满,按消息平均20K计算,每秒可吞吐6000+条。

  百科有很多新添加和待完善词条,需要将这些词条推送给有能力有意愿的用户进行编辑完善。这是个推荐问题,这里我大致说下我想到的方法,欢迎吐槽探讨~

数据准备

  1. 640万词条内容清洗/抽取/分词

  2. Session日志中筛选600万高质量用户,基于编辑浏览词条BOW构建用户profile

  3. 构建各种词典:比如User->list< Item>,Item->list< User>等。

  4. 构建倒排:根据Item->list< term>,User->list< term>正排,构建倒排term->list< Item>,term->list< User>

  为了表示方便,下面User/Item/term(切词后的词)分别缩写为U/I/t.

关联关系

  该方法比较直接简单,词条I下面有同义词列表,利用同义词的历史编辑用户U,可以为I推荐编辑用户U。

  该方法示意图如下,最终得到的是I->list< U>结果,需要转置为U->list< I>供前端展示使用。
  

  该方法有局限性,当词条没有同义词或者很少时,召回会很低,一般把结果用来和其他方法结果进行融合后使用。

ItemCF

  ItemCF首先找到词条A的相似词条B,然后将编辑过B的用户U作为结果推荐给A。

  这里强调下,由于词条有很丰富的文本信息,这里计算相似度并没有采用传统的利用用户的行为记录为向量的方式,而是直接利用文本的BOW计算cosine相似度。下面的UserCF同,采用user的profile计算相似度。

  该方法示意图如下,最终得到的是I->list< U>结果,需要转置为U->list< I>供前端展示使用。
  

  这里需要计算Item[i][j]的相似度稀疏矩阵,首先拉取t->list< I>倒排,比如t->list< I1,I2,I5>,这时I1/I2/I5两两之间对应的Item[i][j]分别加1。当t->list< I>都处理完了,Item[i][j]表就打完了,这就是Item之间cosine相似度的分子部分–笛卡尔积。分母部分L1 Norm可以提前计算好,附在I->list< t>的最后。这样相比两两Item的list< t>直接计算cosine相似度,计算量大大降低了,原因在于很多Item之间根本没有term交集,根本没必要浪费时间在这些相似度为0的Item之间。

  当Item量不是很大时,上面方法可以单机完成,但当Item量很大时,尽管Item[][]是稀疏矩阵,单机也是存储不下的,这时就需要分治了。举例来说假如需要计算I1和其他Item的相似度,我们可以只存储原来稀疏矩阵的一行Item[],通过I1->list< t1,t2>拉取t->list< I>倒排,

  假如t1->list< I1,I2,I3>,t2->list< I1,I2,I5>,可以得到I1和I2笛卡尔积为2,I1和I3/I1和I5笛卡尔积均为1,这样I1和其他Item的相似度向量Item[]就都求出来了。同样计算量相比O(N)也大大减少了。

  得到Item之间的相似度后,给词条A推荐最相似topK词条的历史编辑用户。用户u对词条i的兴趣为P(u,i)=sum( cosine(i,j) ),其中j为用户u编辑过的词条和topK相似词条的交集。

UserCF

  UserCF将用户A的相似用户B编辑过的词条I,作为结果推荐给A。

  该方法示意图如下,得到的就是U->list< I>最终结果,可以直接用来展示。
  

  由于User量很大,参照ItemCF存储User[][]稀疏矩阵是不可行的,直接采用分治来填充User[]向量。举例来说假如需要计算U1和其他User的相似度,通过U1->list< t1,t2>拉取t->list< U>倒排,假如t1->list< U1,U2,U3>,t2->list< U1,U2,U5>,可以得到U1和U2笛卡尔积为2,U1和U3/U1和U5笛卡尔积均为1,这样U1和其他User的相似度向量User[]就更新完了。同样计算量相比O(N)也大大减少了。

  得到User之间的相似度后,给用户A推荐最相似topK用户的历史编辑词条。用户u对词条i的兴趣为P(u,i)=sum( cosine(u,v) ),其中v为编辑过词条i的用户和topK相似用户的交集。

  UserCF相比ItemCF效果相差不大,但由于用户量大于词条量,用户相似度矩阵计算量大于词条相似度矩阵,所以采用ItemCF性能会好一些。

Content-based

  由于User和Item都有文本描述,所以可以直接计算用户和词条的内容相似度,选取topK词条进行推荐,输出就是U->list< I>最终结果。

  同样的可以利用t->list< U>,t->list< I>倒排计算有term交集的用户和词条相似度,减小计算量。

小结

  上面只是大致思路,距真正页面展示还有很长一段路要优化。比如有的用户推荐词条多有的少,如何平均?不基于内容而是利用用户行为记录效果会怎样?推荐词条以何种方式展示对用户最友好?如何冷启动,直接random推送最需要完善的词条?