Commit ca1fb0b4 by 柴鹏飞

数据库连接改为连接池

parent 9f715802
...@@ -39,7 +39,7 @@ class Indexer(): ...@@ -39,7 +39,7 @@ class Indexer():
index = json.load(f) index = json.load(f)
self.index_data = index self.index_data = index
def index(self, q='', count=0) -> List[Tuple[str, float]]: def index(self, q='', count=-1) -> List[Tuple[str, float]]:
""" """
返回值类型:[[相似id, score], [相似id, score], ...] 返回值类型:[[相似id, score], [相似id, score], ...]
""" """
...@@ -47,7 +47,7 @@ class Indexer(): ...@@ -47,7 +47,7 @@ class Indexer():
self.logger.error('未加载索引数据,使用`index`函数之前,确认对应已执行执行 `load_index_data()` 方法') self.logger.error('未加载索引数据,使用`index`函数之前,确认对应已执行执行 `load_index_data()` 方法')
raise raise
if count == 0: if count < 0:
return self.index_data.get(q, []) return self.index_data.get(q, [])
else: else:
return self.index_data.get(q, [])[:count] return self.index_data.get(q, [])[:count]
...@@ -307,14 +307,14 @@ class CounselorCounselorCFIndexer(Indexer): ...@@ -307,14 +307,14 @@ class CounselorCounselorCFIndexer(Indexer):
counselor_user_dict = self.load_pair_by_order() counselor_user_dict = self.load_pair_by_order()
self.logger.info('基于订单的[用户->咨询师]数据加载完成') self.logger.info('基于订单的[用户->咨询师]数据加载完成')
_counselor_user_dict2 = self.load_pair_by_chat() # _counselor_user_dict2 = self.load_pair_by_chat()
self.logger.info('基于询单的[用户->咨询师]数据加载完成') # self.logger.info('基于询单的[用户->咨询师]数据加载完成')
for key, val in _counselor_user_dict2.items(): # for key, val in _counselor_user_dict2.items():
if key not in counselor_user_dict: # if key not in counselor_user_dict:
counselor_user_dict[key] = val # counselor_user_dict[key] = val
else: # else:
counselor_user_dict[key].update(val) # counselor_user_dict[key].update(val)
self.logger.info('数据合并完成,共有咨询师 %s', len(counselor_user_dict)) self.logger.info('数据合并完成,共有咨询师 %s', len(counselor_user_dict))
index = {} index = {}
...@@ -344,17 +344,17 @@ class CounselorCounselorCFIndexer(Indexer): ...@@ -344,17 +344,17 @@ class CounselorCounselorCFIndexer(Indexer):
if __name__ == '__main__': if __name__ == '__main__':
indexer = UserCounselorDefaultIndexer() # indexer = UserCounselorDefaultIndexer()
indexer.make_index() # indexer.make_index()
indexer = UserCounselorOrderIndexer() # indexer = UserCounselorOrderIndexer()
indexer.make_index() # indexer.make_index()
indexer = UserCounselorChatIndexer() # indexer = UserCounselorChatIndexer()
indexer.make_index() # indexer.make_index()
indexer = UserCounselorCombinationIndexer() # indexer = UserCounselorCombinationIndexer()
indexer.make_index() # indexer.make_index()
indexer = CounselorCounselorCFIndexer() indexer = CounselorCounselorCFIndexer()
indexer.make_index() indexer.make_index()
\ No newline at end of file
...@@ -15,7 +15,7 @@ from ydl_ai_recommender.src.core.indexer import ( ...@@ -15,7 +15,7 @@ from ydl_ai_recommender.src.core.indexer import (
CounselorCounselorCFIndexer, CounselorCounselorCFIndexer,
) )
from ydl_ai_recommender.src.core.profile import encode_profile from ydl_ai_recommender.src.core.profile import encode_profile
from ydl_ai_recommender.src.data.mysql_client import MySQLClient from ydl_ai_recommender.src.data.mysql_client import MySQLClientPool
from ydl_ai_recommender.src.utils import get_conf_path, get_data_path from ydl_ai_recommender.src.utils import get_conf_path, get_data_path
from ydl_ai_recommender.src.utils.log import create_logger from ydl_ai_recommender.src.utils.log import create_logger
...@@ -23,10 +23,19 @@ from ydl_ai_recommender.src.utils.log import create_logger ...@@ -23,10 +23,19 @@ from ydl_ai_recommender.src.utils.log import create_logger
class Recommender(): class Recommender():
def __init__(self) -> None: def __init__(self) -> None:
pass self.logger = create_logger(__name__, 'recommender.log')
self.default_indexer = UserCounselorDefaultIndexer(self.logger)
self.default_indexer.load_index_data()
def _recommend_top(self, size=0):
return [{
'counselor': str(c_id),
'score': score,
'from': 'default',
} for [c_id, score] in self.default_indexer.index(size)]
def recommend(self, user) -> List: def recommend(self, user, size=100) -> List:
raise NotImplementedError raise self._recommend_top(size)
class UserCFRecommender(Recommender): class UserCFRecommender(Recommender):
...@@ -48,13 +57,10 @@ class UserCFRecommender(Recommender): ...@@ -48,13 +57,10 @@ class UserCFRecommender(Recommender):
self.logger = create_logger(__name__, 'recommender.log') self.logger = create_logger(__name__, 'recommender.log')
if is_use_db: if is_use_db:
self.client = MySQLClient.create_from_config_file(get_conf_path()) self.client = MySQLClientPool.create_from_config_file(get_conf_path())
else: else:
self.logger.warn('未连接数据库') self.logger.warn('未连接数据库')
self.default_indexer = UserCounselorDefaultIndexer(self.logger)
self.default_indexer.load_index_data()
if u2c == 'chat': if u2c == 'chat':
self.indexer = UserCounselorChatIndexer(self.logger) self.indexer = UserCounselorChatIndexer(self.logger)
elif u2c == 'order': elif u2c == 'order':
...@@ -91,6 +97,9 @@ class UserCFRecommender(Recommender): ...@@ -91,6 +97,9 @@ class UserCFRecommender(Recommender):
def get_user_profile(self, user_id): def get_user_profile(self, user_id):
if user_id == '0':
return []
sql = 'SELECT * FROM ads.ads_register_user_profiles' sql = 'SELECT * FROM ads.ads_register_user_profiles'
sql += ' WHERE uid={}'.format(user_id) sql += ' WHERE uid={}'.format(user_id)
try: try:
...@@ -103,15 +112,6 @@ class UserCFRecommender(Recommender): ...@@ -103,15 +112,6 @@ class UserCFRecommender(Recommender):
return [] return []
def _recommend_top(self, size=100):
return [{
'counselor': str(c_id),
'score': score,
'from': 'default',
} for [c_id, score] in self.default_indexer.index(size)]
def _recommend(self, user_embedding): def _recommend(self, user_embedding):
D, I = self.index.search(np.array([user_embedding]), self.k) D, I = self.index.search(np.array([user_embedding]), self.k)
counselors = [] counselors = []
...@@ -132,7 +132,8 @@ class UserCFRecommender(Recommender): ...@@ -132,7 +132,8 @@ class UserCFRecommender(Recommender):
supplement_data.extend([{ supplement_data.extend([{
'counselor': sc_id, 'counselor': sc_id,
'score': ro['score'] * score, 'score': ro['score'] * score,
'from': '{} supplement {}'.format(ro['from'], ro['counselor']), # 'score': (ro['score'] + score) / 2,
'from': 'supplement {} {}'.format(ro['counselor'], ro['from']),
} for (sc_id, score) in self.c2c_indexer.index(ro['counselor'], count=int(self.top_n))]) } for (sc_id, score) in self.c2c_indexer.index(ro['counselor'], count=int(self.top_n))])
# } for (sc_id, score) in self.c2c_indexer.index(ro['counselor'], count=int(self.top_n / len(recommend_data)))]) # } for (sc_id, score) in self.c2c_indexer.index(ro['counselor'], count=int(self.top_n / len(recommend_data)))])
...@@ -176,6 +177,48 @@ class UserCFRecommender(Recommender): ...@@ -176,6 +177,48 @@ class UserCFRecommender(Recommender):
return self.recommend_with_profile(user_profile, size, is_merge) return self.recommend_with_profile(user_profile, size, is_merge)
class ItemCFRecommender(Recommender):
def __init__(self, top_n=10) -> None:
super().__init__()
self.top_n = top_n
self.logger = create_logger(__name__, 'item_cf_recommender.log')
self.indexer = UserCounselorChatIndexer(self.logger)
self.indexer.load_index_data()
self.c2c_indexer = CounselorCounselorCFIndexer(self.logger)
self.c2c_indexer.load_index_data()
def recommend(self, user_id, size=0) -> List:
similar_user_counselor = self.indexer.index(q=user_id)
if not similar_user_counselor:
return self._recommend_top(size)
recommend_data = [{
'counselor': c_id,
'score': score * 100,
'from': 'similar_users {}'.format(user_id),
} for (c_id, score) in similar_user_counselor]
supplement_data = []
for ro in recommend_data:
supplement_data.extend([{
'counselor': sc_id,
'score': ro['score'] * score,
'from': 'supplement {} {}'.format(ro['counselor'], ro['from']),
} for (sc_id, score) in self.c2c_indexer.index(ro['counselor'], count=int(self.top_n))])
counselors = recommend_data + supplement_data + self._recommend_top()
counselors.sort(key=lambda x: x['score'], reverse=True)
if size > 0:
counselors = counselors[:size]
return counselors
if __name__ == '__main__': if __name__ == '__main__':
recommender = UserCFRecommender() recommender = ItemCFRecommender()
print(recommender.recommend('10957910')) print(recommender.recommend('10957910'))
\ No newline at end of file
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
import configparser import configparser
import pymysql import pymysql
from dbutils.pooled_db import PooledDB
from ydl_ai_recommender.src.utils.log import create_logger from ydl_ai_recommender.src.utils.log import create_logger
...@@ -57,3 +58,59 @@ class MySQLClient(): ...@@ -57,3 +58,59 @@ class MySQLClient():
config.get(section, 'user'), config.get(section, 'user'),
config.get(section, 'password') config.get(section, 'password')
) )
class MySQLClientPool():
def __init__(self, host, port, user, password) -> None:
self.logger = create_logger(__name__, 'mysql_client.log', is_rotating=True)
try:
self.pool = PooledDB(
creator=pymysql,
maxconnections=8, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
ping=0, # ping MySQL服务端,检查是否服务可用。如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host=host,
port=port,
user=user,
password=password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
except Exception as e:
self.logger.error('数据库连接失败', exc_info=True)
raise e
self.logger.info('数据库连接成功')
def open(self):
conn = self.pool.connection() # 去连接池中获取一个连接
cur = conn.cursor()
return conn, cur
def close(self, conn, cur):
cur.close()
conn.close() # 将连接放回到连接池,并不会关闭连接,当线程终止时,连接自动关闭
def query(self, sql, args=None):
# sql += ' limit 1000'
self.logger.debug('begin execute sql: %s', sql)
conn, cur = self.open()
row_count = cur.execute(sql, args)
data = cur.fetchall()
self.close(conn, cur)
self.logger.debug('fetch row count: %s', row_count)
return row_count, data
@classmethod
def create_from_config_file(cls, config_file, section='ADB'):
config = configparser.RawConfigParser()
config.read(config_file)
return cls(
config.get(section, 'host'),
config.getint(section, 'port'),
config.get(section, 'user'),
config.get(section, 'password')
)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment