Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
Y
ydl_ai_recommender
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
闫发泽
ydl_ai_recommender
Commits
1cc4e6be
Commit
1cc4e6be
authored
May 18, 2023
by
王金柱
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
精排序
parent
487789ef
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
64 additions
and
177 deletions
+64
-177
requirements.txt
requirements.txt
+2
-0
recommender.py
src/core/recommender.py
+3
-170
rank_service.py
src/service/rank_service.py
+57
-4
recommend_service.py
src/service/recommend_service.py
+2
-3
No files found.
requirements.txt
View file @
1cc4e6be
...
@@ -3,6 +3,8 @@ DBUtils
...
@@ -3,6 +3,8 @@ DBUtils
pandas
pandas
numpy
numpy
openpyxl
openpyxl
xgboost
requests
tornado
==6.2
tornado
==6.2
...
...
src/core/recommender.py
View file @
1cc4e6be
...
@@ -26,17 +26,6 @@ from ydl_ai_recommender.src.utils.log import create_logger
...
@@ -26,17 +26,6 @@ from ydl_ai_recommender.src.utils.log import create_logger
logger
=
create_logger
(
__name__
,
'recommender.log'
,
is_rotating
=
True
)
logger
=
create_logger
(
__name__
,
'recommender.log'
,
is_rotating
=
True
)
def
cost_time
(
desc
):
def
the_func_cost_time
(
func
):
def
fun
(
*
args
,
**
kwargs
):
t
=
time
.
perf_counter
()
result
=
func
(
*
args
,
**
kwargs
)
logger
.
info
(
'函数:{},{} 耗时:{} ms'
.
format
(
str
(
func
.
__name__
),
desc
,
round
((
time
.
perf_counter
()
-
t
)
*
1000
,
2
)))
return
result
return
fun
return
the_func_cost_time
class
Recommender
:
class
Recommender
:
def
__init__
(
self
)
->
None
:
def
__init__
(
self
)
->
None
:
...
@@ -256,163 +245,8 @@ class ItemCFRecommender(Recommender):
...
@@ -256,163 +245,8 @@ class ItemCFRecommender(Recommender):
counselors
=
counselors
[:
size
]
counselors
=
counselors
[:
size
]
return
counselors
return
counselors
class
RecommendByXgboost
(
Recommender
):
def
__init__
(
self
,
top_n
=
5
,
k
=
20
,
is_use_db
=
True
,
u2c
=
'combination'
,
c2c
=
None
)
->
None
:
super
()
.
__init__
()
config
=
configparser
.
RawConfigParser
()
config
.
read
(
get_conf_path
())
self
.
dmp_url
=
config
.
get
(
'DMP'
,
'url'
)
select_items
=
[
'uid'
,
'ffrom_login'
,
'user_login_city'
,
'user_preference_cate'
]
self
.
select_fields
=
{
k
:
True
for
k
in
select_items
}
self
.
user_encoder_convert
=
read_user_encoder_dict
()
self
.
all_counselors
=
read_counselors
()
#self.recommender = UserCFRecommender(top_n=top_n, k=k, u2c=u2c)
self
.
params
=
{
'n_estimators'
:
150
,
'max_depth'
:
7
,
'min_child_weight'
:
5
,
'gamma'
:
0
,
'subsample'
:
0.9
,
'colsample_bytree'
:
0.5
,
'reg_alpha'
:
0
,
'reg_lambda'
:
1
,
'learning_rate'
:
0.1
,
'max_delta_step'
:
0
,
'scale_pos_weight'
:
1
}
self
.
model
=
xgb
.
XGBClassifier
(
objective
=
'binary:logistic'
,
nthread
=-
1
,
**
self
.
params
)
self
.
model
.
load_model
(
os
.
path
.
join
(
get_project_path
(),
'model_data/xgb_model.bin'
))
@cost_time
(
desc
=
'召回咨询师'
)
def
recall_data
(
self
,
user_id
,
size
=
0
,
is_merge
=
True
):
return
self
.
recommender
.
recommend
(
user_id
,
size
=
size
,
is_merge
=
True
)
@cost_time
(
desc
=
'模型推荐整个流程'
)
def
recommend
(
self
,
user_id
,
size
=
0
,
is_merge
=
True
):
user_profile
=
self
.
get_user_profile
(
user_id
)
if
not
user_profile
:
return
self
.
_recommend_top
(
size
)
# recommend_result = self.recall_data(user_id, size=size, is_merge=True)
recommend_result
=
None
data_time
=
time
.
time
()
predit_data
=
self
.
trans_feature_data
(
user_id
,
user_profile
,
recommend_result
)
doctor_ids
=
predit_data
.
pop
(
'doctor_id'
)
doctor_ids
=
doctor_ids
.
to_numpy
()
pre_time
=
time
.
time
()
predit_result
=
self
.
model
.
predict_proba
(
predit_data
)[:,
1
]
self
.
logger
.
info
(
'predit_time:{}ms'
.
format
(
int
((
time
.
time
()
-
pre_time
)
*
1000
)))
result_dict
=
dict
(
zip
(
doctor_ids
,
predit_result
))
result_dict
=
sorted
(
result_dict
.
items
(),
key
=
lambda
x
:
x
[
1
],
reverse
=
True
)
recommend_data
=
[{
'counselor'
:
c_id
,
'score'
:
round
(
float
(
proba
),
4
),
'from'
:
'similar_users {}'
.
format
(
user_id
),
}
for
(
c_id
,
proba
)
in
result_dict
[
0
:
50
]]
return
recommend_data
@cost_time
(
desc
=
''
)
def
trans_feature_data
(
self
,
user_id
,
user_profile
,
counselor_data
):
user_feature_data
=
self
.
trans_user_feature_data
(
user_id
,
user_profile
)
counselor_feature_data
=
self
.
trans_counselor_feature_data
(
counselor_data
)
counselor_num
=
len
(
counselor_feature_data
)
user_feature_data_dataframe
=
pd
.
DataFrame
([
user_feature_data
]
*
counselor_num
,
columns
=
[
'ffrom_login_encoder'
\
,
'user_login_city_encoder'
,
'cate_id_1_encoder'
,
'cate_id_2_encoder'
,
'cate_id_3_encoder'
\
,
'cate_id_4_encoder'
,
'cate_id_5_encoder'
])
predit_feature_data
=
pd
.
concat
([
user_feature_data_dataframe
,
counselor_feature_data
],
axis
=
1
)
return
predit_feature_data
def
trans_user_feature_data
(
self
,
user_id
,
user_profile
):
if
not
user_profile
:
user_profile
=
self
.
get_user_profile
(
user_id
)
from_login_encoder
=
self
.
get_encoder_from_dict
(
'ffrom_login'
,
user_profile
[
'ffrom_login'
])
user_login_city_encoder
=
self
.
get_encoder_from_dict
(
'user_login_city'
,
user_profile
[
'user_login_city'
])
user_preference_cate
=
user_profile
[
'user_preference_cate'
]
user_preference_cate_top_5_encoder
=
self
.
process_user_preference_cate
(
user_preference_cate
)
user_feature_data
=
[
from_login_encoder
,
user_login_city_encoder
]
user_feature_data
.
extend
(
user_preference_cate_top_5_encoder
)
return
user_feature_data
def
trans_counselor_feature_data
(
self
,
counselor_data
):
# counselor_ids = [str(item['counselor']) for item in counselor_data]
# counselor_profiles = self.all_counselors[self.all_counselors['doctor_id'].isin(counselor_ids)].reset_index(drop=True)
# return counselor_profiles
return
self
.
all_counselors
@cost_time
(
desc
=
'获取用户画像'
)
def
get_user_profile
(
self
,
user_id
):
if
user_id
==
'0'
:
return
[]
headers
=
{
'X-App-Id'
:
'plough_cloud'
,
'Content-Type'
:
'application/json'
}
payload
=
{
"filter"
:
{
"uid"
:
user_id
,
},
"fields"
:
self
.
select_fields
,
"limit"
:
10
}
try
:
get_profile_time
=
time
.
time
()
response
=
requests
.
request
(
'POST'
,
self
.
dmp_url
,
headers
=
headers
,
json
=
payload
)
resp
=
response
.
json
()
return
resp
[
'data'
][
'objects'
][
0
]
except
Exception
as
e
:
self
.
logger
.
error
(
'获取用户画像数据失败:
%
s'
,
e
,
exc_info
=
True
)
try
:
self
.
logger
.
exception
(
'response json data
%
s'
,
resp
)
except
:
pass
return
[]
def
process_user_preference_cate
(
self
,
preference_cate
):
result
=
[
0
,
0
,
0
,
0
,
0
]
ids
=
[]
if
isinstance
(
preference_cate
,
str
):
pref_data
=
json
.
loads
(
preference_cate
)
for
info
in
pref_data
:
ids
.
append
(
info
[
'cate_id'
])
ids
=
ids
[
0
:
min
(
5
,
len
(
ids
))]
for
ind
,
val
in
enumerate
(
ids
):
result
[
ind
]
=
val
encoder_result
=
[]
for
ind
,
val
in
enumerate
(
result
):
value_convert_dict
=
self
.
user_encoder_convert
.
get
(
'cate_id_{}_encoder'
.
format
(
ind
+
1
))
if
value_convert_dict
is
not
None
:
encoder_result
.
append
(
value_convert_dict
.
get
(
val
,
0
))
if
len
(
encoder_result
)
<
5
:
encoder_result
.
extend
([
0
]
*
(
5
-
len
(
encoder_result
)))
return
encoder_result
def
get_encoder_from_dict
(
self
,
feature_name
,
feature_value
):
value_convert_dict
=
self
.
user_encoder_convert
.
get
(
'{}_encoder'
.
format
(
feature_name
))
if
value_convert_dict
is
None
:
return
0
return
value_convert_dict
.
get
(
str
(
feature_value
),
0
)
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
# s_time = time.time()
# recommender1 = UserCFRecommender()
# recommender1.recommend('30004410')
# print('all cost time: {}'.format(time.time() - s_time), recommender1.recommend('12047'))
print
()
print
()
s_time
=
time
.
time
()
s_time
=
time
.
time
()
recommender
=
RecommendByXgboost
()
recommender1
=
UserCFRecommender
()
recommender
.
recommend
(
'3251227'
)
recommender1
.
recommend
(
'30004410'
)
print
(
'all cost time: '
.
format
(
time
.
time
()
-
s_time
),
recommender
.
recommend
(
'12047'
))
print
(
'all cost time: {}'
.
format
(
time
.
time
()
-
s_time
),
recommender1
.
recommend
(
'12047'
))
# print()
# print()
# s_time = time.time()
# recommender.recommend('37298')
# print('all cost time: '.format(time.time() - s_time), recommender.recommend('12047'))
\ No newline at end of file
src/service/rank_service.py
View file @
1cc4e6be
...
@@ -11,11 +11,12 @@ from tornado.concurrent import run_on_executor
...
@@ -11,11 +11,12 @@ from tornado.concurrent import run_on_executor
from
ydl_ai_recommender.src.utils.log
import
create_logger
from
ydl_ai_recommender.src.utils.log
import
create_logger
from
ydl_ai_recommender.src.core.rank.ranker
import
Ranker
from
ydl_ai_recommender.src.core.rank.ranker
import
Ranker
from
ydl_ai_recommender.src.core.rank.ranker
import
RankByXGB
logger
=
create_logger
(
__name__
,
'rank_service.log'
,
is_rotating
=
True
)
logger
=
create_logger
(
__name__
,
'rank_service.log'
,
is_rotating
=
True
)
ranker
=
Ranker
()
ranker
=
Ranker
()
rankByXgb
=
RankByXGB
()
class
RankHandler
(
tornado
.
web
.
RequestHandler
):
class
RankHandler
(
tornado
.
web
.
RequestHandler
):
executor
=
ThreadPoolExecutor
(
1
)
executor
=
ThreadPoolExecutor
(
1
)
...
@@ -69,12 +70,64 @@ class RankHandler(tornado.web.RequestHandler):
...
@@ -69,12 +70,64 @@ class RankHandler(tornado.web.RequestHandler):
return
ret
return
ret
if
__name__
==
'__main__'
:
class
RankByXgboostHandler
(
tornado
.
web
.
RequestHandler
):
executor
=
ThreadPoolExecutor
(
1
)
@tornado.gen.coroutine
def
get
(
self
):
uid
=
self
.
get_argument
(
'uid'
,
None
)
if
uid
is
None
:
logger
.
warn
(
'请求参数不正确,无uid'
)
counselors
=
self
.
get_argument
(
'counselors'
,
''
)
ret
=
yield
self
.
run
(
uid
,
counselors
)
self
.
write
(
ret
)
@tornado.gen.coroutine
def
post
(
self
):
param
=
json
.
loads
(
self
.
request
.
body
.
decode
(
'utf-8'
))
uid
=
param
.
get
(
'uid'
,
None
)
counselors
=
param
.
get
(
'counselors'
,
''
)
if
uid
is
None
:
logger
.
warn
(
'请求参数不正确,无uid'
)
ret
=
yield
self
.
run
(
uid
,
counselors
)
self
.
write
(
ret
)
tornado
.
options
.
define
(
'port'
,
default
=
8868
,
type
=
int
,
help
=
'服务启动的端口号'
)
@run_on_executor
def
run
(
self
,
uid
,
counselors
=
''
):
logger
.
info
(
'request@@uid=
%
s@@counselors=
%
s'
,
uid
,
counselors
)
try
:
rank_result
=
rankByXgb
.
rank
(
uid
,
counselors
)
ret
=
{
'status'
:
'success'
,
'code'
:
0
,
'data'
:
rank_result
,
'total_count'
:
len
(
rank_result
),
}
except
Exception
as
e
:
logger
.
error
(
'执行精排报错'
,
exc_info
=
True
)
ret
=
{
'status'
:
'error'
,
'code'
:
1
,
'data'
:
[],
'total_count'
:
0
,
}
ret_str
=
json
.
dumps
(
ret
,
ensure_ascii
=
False
)
logger
.
info
(
'response@@uid=
%
s@@counselors=
%
s@@ret=
%
s'
,
uid
,
counselors
,
ret_str
)
return
ret
if
__name__
==
'__main__'
:
tornado
.
options
.
define
(
'port'
,
default
=
8868
,
type
=
int
,
help
=
'服务启动的端口号'
)
tornado
.
options
.
parse_command_line
()
tornado
.
options
.
parse_command_line
()
app
=
tornado
.
web
.
Application
(
handlers
=
[(
r'/ai_counselor_rank'
,
RankHandler
)],
autoreload
=
False
,
debug
=
False
)
app
=
tornado
.
web
.
Application
(
handlers
=
[(
r'/ai_counselor_rank'
,
RankHandler
),
(
r'/ai_counselor_rank/xgb/v1'
,
RankByXgboostHandler
)],
autoreload
=
False
,
debug
=
False
)
http_server
=
tornado
.
httpserver
.
HTTPServer
(
app
)
http_server
=
tornado
.
httpserver
.
HTTPServer
(
app
)
http_server
.
listen
(
tornado
.
options
.
options
.
port
)
http_server
.
listen
(
tornado
.
options
.
options
.
port
)
tornado
.
ioloop
.
IOLoop
.
instance
()
.
start
()
tornado
.
ioloop
.
IOLoop
.
instance
()
.
start
()
src/service/recommend_service.py
View file @
1cc4e6be
...
@@ -138,9 +138,8 @@ if __name__ == '__main__':
...
@@ -138,9 +138,8 @@ if __name__ == '__main__':
tornado
.
options
.
define
(
'port'
,
default
=
8868
,
type
=
int
,
help
=
'服务启动的端口号'
)
tornado
.
options
.
define
(
'port'
,
default
=
8868
,
type
=
int
,
help
=
'服务启动的端口号'
)
tornado
.
options
.
parse_command_line
()
tornado
.
options
.
parse_command_line
()
app
=
tornado
.
web
.
Application
(
handlers
=
[(
r'/ai_counselor_recommend'
,
RecommendHandler
),
app
=
tornado
.
web
.
Application
(
handlers
=
[(
r'/ai_counselor_recommend'
,
RecommendHandler
),
]
(
r'/ai_counselor_recommend/xgb/v1'
,
RecommendXgbHandler
)]
,
autoreload
=
False
,
debug
=
False
)
,
autoreload
=
True
,
debug
=
False
)
http_server
=
tornado
.
httpserver
.
HTTPServer
(
app
)
http_server
=
tornado
.
httpserver
.
HTTPServer
(
app
)
http_server
.
listen
(
tornado
.
options
.
options
.
port
)
http_server
.
listen
(
tornado
.
options
.
options
.
port
)
tornado
.
ioloop
.
IOLoop
.
instance
()
.
start
()
tornado
.
ioloop
.
IOLoop
.
instance
()
.
start
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment