Project 'chaipengfei/ydl_ai_recommender' was moved to 'yanfaze/ydl_ai_recommender'. Please update any links and bookmarks that may still have the old path.
Commit db437f1c by 柴鹏飞

Merge branch 'YDL-REC-XGB' into 'master'

Ydl rec xgb

See merge request chaipengfei/ydl_ai_recommender!2
parents 3d58e296 33d575d5
# -*- coding:utf-8 -*-
import time
import requests
import os
import json
import configparser
import xgboost as xgb
import pandas as pd
import numpy as np
from ydl_ai_recommender.src.core.profile import encode_user_profile
from ydl_ai_recommender.src.utils import get_conf_path, get_data_path, read_user_encoder_dict, read_counselors, get_project_path
from ydl_ai_recommender.src.utils.log import create_logger
logger = create_logger(__name__, 'rankByXgb.log')
def cost_time(desc):
    """Decorator factory: log how long each call of the wrapped function takes.

    Args:
        desc: human-readable description appended to the log line.

    Returns:
        A decorator that times the target function with time.perf_counter()
        and logs the elapsed milliseconds via the module logger.
    """
    from functools import wraps  # local import keeps this fix self-contained

    def the_func_cost_time(func):
        @wraps(func)  # fix: preserve __name__/__doc__ of the wrapped function
        def fun(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed_ms = round((time.perf_counter() - start) * 1000, 2)
            logger.info('函数:{},{} 耗时:{} ms'.format(str(func.__name__), desc, elapsed_ms))
            return result
        return fun
    return the_func_cost_time
class RankByXGB:
    """Rank a given list of candidate counselors for a user with a
    pre-trained XGBoost binary classifier.

    Construction loads the DMP profile-service URL from the config file,
    the feature-encoder lookup tables, the counselor profile table and the
    serialized model from ``model_data/xgb_model.bin``.
    """

    def __init__(self, is_use_db=True) -> None:
        # NOTE(review): is_use_db is currently unused; kept for backward
        # compatibility with existing callers.
        self.logger = create_logger(__name__, 'rankByXgb.log')
        config = configparser.RawConfigParser()
        config.read(get_conf_path())
        # Endpoint of the user-profile (DMP) service.
        self.dmp_url = config.get('DMP', 'url')
        select_items = ['uid', 'ffrom_login', 'user_login_city', 'user_preference_cate']
        self.select_fields = {k: True for k in select_items}
        # feature name -> {raw value -> integer code} lookup tables.
        self.user_encoder_convert = read_user_encoder_dict()
        # Counselor profile table; must contain a 'doctor_id' column.
        self.all_counselors = read_counselors()
        # Hyper-parameters must match the ones used when the model was trained.
        self.params = {
            'n_estimators': 150, 'max_depth': 7, 'min_child_weight': 5,
            'gamma': 0, 'subsample': 0.9, 'colsample_bytree': 0.5,
            'reg_alpha': 0, 'reg_lambda': 1, 'learning_rate': 0.1,
            'max_delta_step': 0, 'scale_pos_weight': 1,
        }
        self.model = xgb.XGBClassifier(objective='binary:logistic', nthread=-1, **self.params)
        self.model.load_model(os.path.join(get_project_path(), 'model_data/xgb_model.bin'))

    @cost_time(desc='模型推荐整个流程')
    def rank(self, user_id, counselors):
        """Score the candidate counselors for *user_id*.

        Args:
            user_id: user id string; '0' is treated as anonymous.
            counselors: comma-separated counselor-id string.

        Returns:
            List of {'counselor': str, 'score': float} sorted by score
            descending. When the user profile cannot be fetched, every
            candidate is returned with score 0.0 in the original order.
        """
        if not counselors or not counselors.strip():
            return []
        counselor_ids = counselors.strip().split(',')
        user_profile = self.get_user_profile(user_id)
        if not user_profile:
            # No profile -> neutral scores, candidates in request order.
            return [{'counselor': str(cid), 'score': 0.0} for cid in counselor_ids]
        predit_data = self.trans_feature_data(user_id, user_profile, counselor_ids)
        doctor_ids = predit_data.pop('doctor_id').to_numpy()
        pre_time = time.time()
        # Column 1 of predict_proba is the probability of the positive class.
        predit_result = self.model.predict_proba(predit_data)[:, 1]
        self.logger.info('predit_time:{}ms'.format(int((time.time() - pre_time) * 1000)))
        ranked = sorted(dict(zip(doctor_ids, predit_result)).items(),
                        key=lambda x: x[1], reverse=True)
        return [{
            'counselor': str(c_id),
            'score': round(float(proba), 4),
        } for (c_id, proba) in ranked]

    @cost_time(desc='')
    def trans_feature_data(self, user_id, user_profile, counselors):
        """Build the model-input DataFrame: the encoded user features are
        replicated once per candidate counselor and concatenated column-wise
        with the counselor feature rows."""
        user_feature_data = self.trans_user_feature_data(user_id, user_profile)
        counselor_feature_data = self.trans_counselor_feature_data(counselors)
        counselor_num = len(counselor_feature_data)
        user_feature_columns = [
            'ffrom_login_encoder', 'user_login_city_encoder',
            'cate_id_1_encoder', 'cate_id_2_encoder', 'cate_id_3_encoder',
            'cate_id_4_encoder', 'cate_id_5_encoder',
        ]
        user_feature_frame = pd.DataFrame(
            [user_feature_data] * counselor_num, columns=user_feature_columns)
        return pd.concat([user_feature_frame, counselor_feature_data], axis=1)

    def trans_user_feature_data(self, user_id, user_profile):
        """Encode the user profile into the 7 numeric user features
        (login source, login city, top-5 preferred categories)."""
        if not user_profile:
            user_profile = self.get_user_profile(user_id)
        from_login_encoder = self.get_encoder_from_dict('ffrom_login', user_profile['ffrom_login'])
        user_login_city_encoder = self.get_encoder_from_dict('user_login_city', user_profile['user_login_city'])
        user_preference_cate = user_profile['user_preference_cate']
        preference_top5 = self.process_user_preference_cate(user_preference_cate)
        user_feature_data = [from_login_encoder, user_login_city_encoder]
        user_feature_data.extend(preference_top5)
        return user_feature_data

    def trans_counselor_feature_data(self, counselor_ids):
        """Select the feature rows of the requested counselors.

        Rows come back in table order, not request order.
        """
        mask = self.all_counselors['doctor_id'].isin(counselor_ids)
        return self.all_counselors[mask].reset_index(drop=True)

    @cost_time(desc='获取用户画像')
    def get_user_profile(self, user_id):
        """Fetch the user profile from the DMP service.

        Returns the first matching profile object, or [] for the anonymous
        user id '0' and on any request/parse failure.
        """
        if user_id == '0':
            return []
        headers = {
            'X-App-Id': 'plough_cloud',
            'Content-Type': 'application/json'
        }
        payload = {
            "filter": {
                "uid": user_id,
            },
            "fields": self.select_fields,
            "limit": 10
        }
        resp = None
        try:
            # Fix: a timeout keeps a stuck DMP service from hanging every
            # rank request forever; failures fall through to the empty profile.
            response = requests.request('POST', self.dmp_url, headers=headers,
                                        json=payload, timeout=5)
            resp = response.json()
            return resp['data']['objects'][0]
        except Exception as e:
            self.logger.error('获取用户画像数据失败: %s', e, exc_info=True)
            if resp is not None:
                # Best effort: record whatever the service actually returned.
                self.logger.exception('response json data %s', resp)
            return []

    def process_user_preference_cate(self, preference_cate):
        """Encode the user's top-5 preferred category ids.

        *preference_cate* is a JSON string like '[{"cate_id": ...}, ...]';
        any other type yields all-zero features. Always returns 5 ints.
        """
        result = [0, 0, 0, 0, 0]
        ids = []
        if isinstance(preference_cate, str):
            pref_data = json.loads(preference_cate)
            for info in pref_data:
                ids.append(info['cate_id'])
        for ind, val in enumerate(ids[:5]):
            result[ind] = val
        encoder_result = []
        for ind, val in enumerate(result):
            # NOTE(review): lookup uses the raw cate_id (not str(val)), unlike
            # get_encoder_from_dict — confirm the encoder dicts are keyed so.
            value_convert_dict = self.user_encoder_convert.get('cate_id_{}_encoder'.format(ind + 1))
            if value_convert_dict is not None:
                encoder_result.append(value_convert_dict.get(val, 0))
        if len(encoder_result) < 5:
            # Pad when some encoder tables are missing.
            encoder_result.extend([0] * (5 - len(encoder_result)))
        return encoder_result

    def get_encoder_from_dict(self, feature_name, feature_value):
        """Map a raw feature value to its integer code; 0 when unknown."""
        value_convert_dict = self.user_encoder_convert.get('{}_encoder'.format(feature_name))
        if value_convert_dict is None:
            return 0
        return value_convert_dict.get(str(feature_value), 0)
if __name__ == '__main__':
    # Manual smoke test: score a fixed candidate list for a known user
    # and for the anonymous user id '0'.
    xgb_ranker = RankByXGB()
    candidates = '37298,21144,12019,13010,11038,2830'
    print(xgb_ranker.rank('12047', candidates))
    print(xgb_ranker.rank('0', candidates))
\ No newline at end of file
......@@ -26,17 +26,6 @@ from ydl_ai_recommender.src.utils.log import create_logger
logger = create_logger(__name__, 'recommender.log', is_rotating=True)
def cost_time(desc):
    """Decorator factory that logs the wall-clock duration of each call.

    Args:
        desc: human-readable description appended to the log line.
    """
    def the_func_cost_time(func):
        def fun(*args,**kwargs):
            # perf_counter is a monotonic, high-resolution clock.
            t = time.perf_counter()
            result = func(*args, **kwargs)
            # Logs: function name, description, elapsed milliseconds.
            logger.info('函数:{},{} 耗时:{} ms'.format(str(func.__name__), desc, round((time.perf_counter()-t)*1000, 2)))
            return result
        return fun
    return the_func_cost_time
class Recommender:
def __init__(self) -> None:
......@@ -256,163 +245,8 @@ class ItemCFRecommender(Recommender):
counselors = counselors[:size]
return counselors
class RecommendByXgboost(Recommender):
    """Recommender that scores counselors for a user with a pre-trained
    XGBoost binary classifier and returns the top-scored ones.

    NOTE(review): inherits from Recommender (not visible here); `self.logger`
    and `_recommend_top` are presumably provided by that base — confirm.
    """
    def __init__(self, top_n=5, k=20, is_use_db=True, u2c='combination', c2c=None) -> None:
        # NOTE(review): top_n/k/is_use_db/u2c/c2c are accepted but unused here
        # because the CF recall below is commented out — confirm before removing.
        super().__init__()
        config = configparser.RawConfigParser()
        config.read(get_conf_path())
        # Endpoint of the user-profile (DMP) service.
        self.dmp_url = config.get('DMP', 'url')
        select_items = ['uid', 'ffrom_login', 'user_login_city', 'user_preference_cate']
        self.select_fields = {k: True for k in select_items}
        # feature name -> {raw value -> integer code} lookup tables.
        self.user_encoder_convert = read_user_encoder_dict()
        # Counselor profile table; must contain a 'doctor_id' column.
        self.all_counselors = read_counselors()
        #self.recommender = UserCFRecommender(top_n=top_n, k=k, u2c=u2c)
        # Hyper-parameters must match the ones used when the model was trained.
        self.params = {'n_estimators': 150, 'max_depth': 7, 'min_child_weight': 5, 'gamma': 0, 'subsample': 0.9,
                       'colsample_bytree': 0.5, 'reg_alpha': 0, 'reg_lambda': 1, 'learning_rate': 0.1,
                       'max_delta_step': 0,
                       'scale_pos_weight': 1}
        self.model = xgb.XGBClassifier(objective='binary:logistic', nthread=-1, **self.params)
        self.model.load_model(os.path.join(get_project_path(), 'model_data/xgb_model.bin'))

    @cost_time(desc='召回咨询师')
    def recall_data(self, user_id, size=0, is_merge=True):
        # NOTE(review): self.recommender is never assigned (its creation is
        # commented out in __init__), so calling this raises AttributeError.
        return self.recommender.recommend(user_id, size=size, is_merge=True)

    @cost_time(desc='模型推荐整个流程')
    def recommend(self, user_id, size=0, is_merge=True):
        """Score counselors for *user_id* and return at most 50
        {'counselor', 'score', 'from'} dicts sorted by score descending.

        Falls back to `_recommend_top(size)` when no profile is available.
        """
        user_profile = self.get_user_profile(user_id)
        if not user_profile:
            return self._recommend_top(size)
        # recommend_result = self.recall_data(user_id, size=size, is_merge=True)
        # With recall disabled, all counselors are scored (see
        # trans_counselor_feature_data below).
        recommend_result = None
        data_time = time.time()  # NOTE(review): unused local
        predit_data = self.trans_feature_data(user_id, user_profile, recommend_result)
        doctor_ids = predit_data.pop('doctor_id')
        doctor_ids = doctor_ids.to_numpy()
        pre_time = time.time()
        # Column 1 of predict_proba is the probability of the positive class.
        predit_result = self.model.predict_proba(predit_data)[:, 1]
        self.logger.info('predit_time:{}ms'.format(int((time.time()-pre_time)*1000)))
        result_dict = dict(zip(doctor_ids, predit_result))
        result_dict = sorted(result_dict.items(), key=lambda x:x[1], reverse=True)
        # Keep only the 50 best-scored counselors.
        recommend_data = [{
            'counselor': c_id,
            'score': round(float(proba), 4),
            'from': 'similar_users {}'.format(user_id),
        } for (c_id, proba) in result_dict[0:50]]
        return recommend_data

    @cost_time(desc='')
    def trans_feature_data(self, user_id, user_profile, counselor_data):
        """Build the model-input DataFrame: encoded user features replicated
        once per counselor row, concatenated with counselor features."""
        user_feature_data = self.trans_user_feature_data(user_id, user_profile)
        counselor_feature_data = self.trans_counselor_feature_data(counselor_data)
        counselor_num = len(counselor_feature_data)
        user_feature_data_dataframe = pd.DataFrame([user_feature_data]*counselor_num, columns=['ffrom_login_encoder'
            , 'user_login_city_encoder', 'cate_id_1_encoder', 'cate_id_2_encoder', 'cate_id_3_encoder'
            , 'cate_id_4_encoder', 'cate_id_5_encoder'])
        predit_feature_data = pd.concat([user_feature_data_dataframe, counselor_feature_data], axis=1)
        return predit_feature_data

    def trans_user_feature_data(self, user_id, user_profile):
        """Encode the user profile into the 7 numeric user features
        (login source, login city, top-5 preferred categories)."""
        if not user_profile:
            user_profile = self.get_user_profile(user_id)
        from_login_encoder = self.get_encoder_from_dict('ffrom_login', user_profile['ffrom_login'])
        user_login_city_encoder = self.get_encoder_from_dict('user_login_city', user_profile['user_login_city'])
        user_preference_cate = user_profile['user_preference_cate']
        user_preference_cate_top_5_encoder = self.process_user_preference_cate(user_preference_cate)
        user_feature_data = [from_login_encoder, user_login_city_encoder]
        user_feature_data.extend(user_preference_cate_top_5_encoder)
        return user_feature_data

    def trans_counselor_feature_data(self, counselor_data):
        """Return counselor feature rows.

        NOTE(review): counselor_data is ignored and ALL counselors are scored;
        the id-filtered version is commented out below.
        """
        # counselor_ids = [str(item['counselor']) for item in counselor_data]
        # counselor_profiles = self.all_counselors[self.all_counselors['doctor_id'].isin(counselor_ids)].reset_index(drop=True)
        # return counselor_profiles
        return self.all_counselors

    @cost_time(desc='获取用户画像')
    def get_user_profile(self, user_id):
        """Fetch the user profile from the DMP service.

        Returns the first matching profile object, or [] for the anonymous
        user id '0' and on any request/parse failure.
        NOTE(review): the HTTP call has no timeout — a stuck service blocks
        the whole recommendation request.
        """
        if user_id == '0':
            return []
        headers = {
            'X-App-Id': 'plough_cloud',
            'Content-Type': 'application/json'
        }
        payload = {
            "filter": {
                "uid": user_id,
            },
            "fields": self.select_fields,
            "limit": 10
        }
        try:
            get_profile_time = time.time()
            response = requests.request('POST', self.dmp_url, headers=headers, json=payload)
            resp = response.json()
            return resp['data']['objects'][0]
        except Exception as e:
            self.logger.error('获取用户画像数据失败: %s', e, exc_info=True)
            try:
                # Best effort: `resp` may be unbound when the request itself failed.
                self.logger.exception('response json data %s', resp)
            except:  # NOTE(review): bare except — narrow to Exception.
                pass
            return []

    def process_user_preference_cate(self, preference_cate):
        """Encode the user's top-5 preferred category ids.

        *preference_cate* is a JSON string like '[{"cate_id": ...}, ...]';
        any other type yields all-zero features. Always returns 5 ints.
        """
        result = [0, 0, 0, 0, 0]
        ids = []
        if isinstance(preference_cate, str):
            pref_data = json.loads(preference_cate)
            for info in pref_data:
                ids.append(info['cate_id'])
        ids = ids[0:min(5, len(ids))]
        for ind, val in enumerate(ids):
            result[ind] = val
        encoder_result = []
        for ind, val in enumerate(result):
            # NOTE(review): lookup uses the raw cate_id (not str(val)), unlike
            # get_encoder_from_dict — confirm the encoder dicts are keyed so.
            value_convert_dict = self.user_encoder_convert.get('cate_id_{}_encoder'.format(ind+1))
            if value_convert_dict is not None:
                encoder_result.append(value_convert_dict.get(val, 0))
        if len(encoder_result)<5:
            # Pad when some encoder tables are missing.
            encoder_result.extend([0]*(5-len(encoder_result)))
        return encoder_result

    def get_encoder_from_dict(self, feature_name, feature_value):
        """Map a raw feature value to its integer code; 0 when unknown."""
        value_convert_dict = self.user_encoder_convert.get('{}_encoder'.format(feature_name))
        if value_convert_dict is None:
            return 0
        return value_convert_dict.get(str(feature_value), 0)
if __name__ == '__main__':
# s_time = time.time()
# recommender1 = UserCFRecommender()
# recommender1.recommend('30004410')
# print('all cost time: {}'.format(time.time() - s_time), recommender1.recommend('12047'))
print()
print()
s_time = time.time()
recommender = RecommendByXgboost()
recommender.recommend('3251227')
print('all cost time: '.format(time.time()-s_time), recommender.recommend('12047'))
# print()
# print()
# s_time = time.time()
# recommender.recommend('37298')
# print('all cost time: '.format(time.time() - s_time), recommender.recommend('12047'))
\ No newline at end of file
recommender1 = UserCFRecommender()
recommender1.recommend('30004410')
print('all cost time: {}'.format(time.time() - s_time), recommender1.recommend('12047'))
......@@ -11,11 +11,12 @@ from tornado.concurrent import run_on_executor
from ydl_ai_recommender.src.utils.log import create_logger
from ydl_ai_recommender.src.core.rank.ranker import Ranker
from ydl_ai_recommender.src.core.rank.rankByXgb import RankByXGB
logger = create_logger(__name__, 'rank_service.log', is_rotating=True)
ranker = Ranker()
rankByXgb = RankByXGB()
class RankHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(1)
......@@ -69,12 +70,64 @@ class RankHandler(tornado.web.RequestHandler):
return ret
if __name__ == '__main__':
class RankByXgboostHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(1)
@tornado.gen.coroutine
def get(self):
    """Handle GET: rank candidate counselors for a user.

    Query args: uid (required), counselors (comma-separated id string).
    """
    uid = self.get_argument('uid', None)
    if uid is None:
        # NOTE(review): only logs; the request still proceeds with uid=None.
        logger.warn('请求参数不正确,无uid')
    counselors = self.get_argument('counselors', '')
    ret = yield self.run(uid, counselors)
    self.write(ret)
@tornado.gen.coroutine
def post(self):
    """Handle POST: same as GET, but parameters come from the JSON body
    ({"uid": ..., "counselors": ...})."""
    param = json.loads(self.request.body.decode('utf-8'))
    uid = param.get('uid', None)
    counselors = param.get('counselors', '')
    if uid is None:
        # NOTE(review): only logs; the request still proceeds with uid=None.
        logger.warn('请求参数不正确,无uid')
    ret = yield self.run(uid, counselors)
    self.write(ret)
tornado.options.define('port', default=8868, type=int, help='服务启动的端口号')
@run_on_executor
def run(self, uid, counselors=''):
    """Execute the XGBoost ranking off the IO loop and build the JSON reply.

    Returns a dict with status/code, 'data' (ranked counselors) and
    'total_count'; on any exception returns an error envelope with empty data.
    """
    logger.info('request@@uid=%s@@counselors=%s', uid, counselors)
    try:
        rank_result = rankByXgb.rank(uid, counselors)
        ret = {
            'status': 'success',
            'code': 0,
            'data': rank_result,
            'total_count': len(rank_result),
        }
    except Exception as e:
        logger.error('执行精排报错', exc_info=True)
        ret = {
            'status': 'error',
            'code': 1,
            'data': [],
            'total_count': 0,
        }
    # ensure_ascii=False keeps non-ASCII text readable in the log line.
    ret_str = json.dumps(ret, ensure_ascii=False)
    logger.info('response@@uid=%s@@counselors=%s@@ret=%s', uid, counselors, ret_str)
    return ret
if __name__ == '__main__':
tornado.options.define('port', default=8868, type=int, help='服务启动的端口号')
tornado.options.parse_command_line()
app = tornado.web.Application(handlers=[(r'/ai_counselor_rank', RankHandler)], autoreload=False, debug=False)
app = tornado.web.Application(handlers=[(r'/ai_counselor_rank', RankHandler),
(r'/ai_counselor_rank/xgb/v1', RankByXgboostHandler)],
autoreload=False, debug=False)
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.instance().start()
......@@ -13,12 +13,9 @@ from tornado.concurrent import run_on_executor
from ydl_ai_recommender.src.utils.log import create_logger
from ydl_ai_recommender.src.core.recommender import UserCFRecommender
from ydl_ai_recommender.src.core.recommender import RecommendByXgboost
logger = create_logger(__name__, 'service.log', is_rotating=True)
recommender = UserCFRecommender(top_n=2, k=50, u2c='order')
recommenderByXgb = RecommendByXgboost()
class RecommendHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(1)
......@@ -76,71 +73,13 @@ class RecommendHandler(tornado.web.RequestHandler):
logger.info('response@@uid=%s@@ret=%s', uid, ret_str)
return ret
class RecommendXgbHandler(tornado.web.RequestHandler):
    """HTTP handler: XGBoost-based counselor recommendation for a user."""

    # Single worker thread: recommendation requests are serialized.
    executor = ThreadPoolExecutor(1)

    @tornado.gen.coroutine
    def get(self):
        # Query args: uid (required), size (optional, default 100).
        uid = self.get_argument('uid', None)
        if uid is None:
            # NOTE(review): only logs; the request still proceeds with uid=None.
            logger.warn('请求参数不正确,无uid')
        size = self.get_argument('size', 100)
        try:
            size = int(size)
        except Exception as e:
            # Non-numeric size falls back to the default.
            logger.warn('size=%s 不是数字', size)
            size = 100
        ret = yield self.run(uid, size)
        self.write(ret)

    @tornado.gen.coroutine
    def post(self):
        # JSON body: {"uid": ..., "size": ...}
        param = json.loads(self.request.body.decode('utf-8'))
        uid = param.get('uid', None)
        size = param.get('size', 100)
        if uid is None:
            # NOTE(review): only logs; the request still proceeds with uid=None.
            logger.warn('请求参数不正确,无uid')
        ret = yield self.run(uid, size)
        self.write(ret)

    @run_on_executor
    def run(self, uid, size=100):
        """Run the recommendation off the IO loop and build the JSON reply
        (status/code, 'data', 'total_count'); error envelope on exception."""
        logger.info('request@@uid=%s@@size=%s', uid, size)
        try:
            start_time = time.time()
            recommend_result = recommenderByXgb.recommend(uid, size=size, is_merge=True)
            logger.info('request@@uid=%s@@size=%s, cost %s ms', uid, size, (time.time()-start_time)*1000)
            ret = {
                'status': 'success',
                'code': 0,
                'data': recommend_result,
                'total_count': len(recommend_result),
            }
        except Exception as e:
            logger.error('执行推荐函数报错', exc_info=True)
            ret = {
                'status': 'error',
                'code': 1,
                'data': [],
                'total_count': 0,
            }
        # ensure_ascii=False keeps non-ASCII text readable in the log line.
        ret_str = json.dumps(ret, ensure_ascii=False)
        logger.info('response@@uid=%s@@ret=%s', uid, ret_str)
        return ret
if __name__ == '__main__':
tornado.options.define('port', default=8868, type=int, help='服务启动的端口号')
tornado.options.parse_command_line()
app = tornado.web.Application(handlers=[(r'/ai_counselor_recommend', RecommendHandler),
(r'/ai_counselor_recommend/xgb/v1', RecommendXgbHandler)]
, autoreload=True, debug=False)
app = tornado.web.Application(handlers=[(r'/ai_counselor_recommend', RecommendHandler), ]
, autoreload=False, debug=False)
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.instance().start()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment