Commit 0081c6f3 authored by Yaowentong's avatar Yaowentong

1

parent 282af7b7
......@@ -50,39 +50,38 @@ def get_keyword_ranks(text, keywords):
if isinstance(keywords, str):
keywords = [keywords]
appearance_order = []
start_pos = 0
text_length = len(text)
while start_pos < text_length:
min_position = None
found_key = None
matches = []
for key in keywords:
pos = text.find(key, start_pos)
start = 0
while True:
pos = text.find(key, start)
if pos == -1:
continue
# 1. 更早出现的优先
# 2. 如果起始位置相同,优先更短的关键词
if (
min_position is None
or pos < min_position
or (pos == min_position and len(key) < len(found_key))
):
min_position = pos
found_key = key
if found_key is None:
break
appearance_order.append(found_key)
start_pos = min_position + len(found_key)
matches.append({
"word": key,
"pos": pos
})
# 允许继续查找后面的相同关键词
start = pos + 1
# 按出现位置排序;同位置时短词优先
matches.sort(key=lambda x: (x["pos"], len(x["word"])))
result = []
for key in keywords:
rank_list = [i + 1 for i, k in enumerate(appearance_order) if k == key]
result.append({"word": key, "rank_list": rank_list})
rank_list = [
i + 1
for i, item in enumerate(matches)
if item["word"] == key
]
result.append({
"word": key,
"rank_list": rank_list
})
return result
......@@ -623,6 +622,11 @@ def merge_com_and_ai_by_subset(brand_words_rank, ai_word_list_with_rank):
return brand_list, remain_ai_list
def to_int_trans(v):
try:
return int(v or 0)
except Exception:
return 0
def rebuild_count_and_rank(data_list):
"""
......@@ -648,8 +652,8 @@ def rebuild_count_and_rank(data_list):
if not isinstance(vo, dict):
continue
count = vo.get("count", 0) or 0
rank = vo.get("rank", 0) or 0
count = to_int_trans(vo.get("count", 0))
rank = to_int_trans(vo.get("rank", 0))
total_count += count
......@@ -1399,9 +1403,23 @@ def result_v2(response_content, data):
product_list = list(set(product_list))
product_list = [str(x).strip() for x in product_list if str(x).strip()]
print(response_content)
print('----')
print('----')
print('----')
print(product_list)
# 获取所有产品排名
product_rank_list = get_keyword_ranks(response_content, product_list)
print(product_rank_list)
print('----')
print('----')
print('----')
all_product_list_with_rank = convert_rank_data(product_rank_list)
# 所有产品
product_all = get_all_words(all_product_list_with_rank)
# 获取产品的情感倾向以及正面次负面词
......@@ -1414,17 +1432,25 @@ def result_v2(response_content, data):
# 获取产品词的排名
product_map_list_with_rank = build_brand_summary(all_product_list_with_rank, product_map)
# 获取所有词的情感倾向
product_map_list_with_rank = merge_sentiment_to_brand_words(product_map_list_with_rank,
all_product_sentiment_and_mentions)
# 删除ai中出现的品牌词
product_map_list_with_rank, all_product_list_with_rank = merge_com_and_ai_by_subset(product_map_list_with_rank,
all_product_list_with_rank)
# 获取产品词的分
product_map_list_with_rank = attach_favorability_brand_score(all_product_sentiment_and_mentions_score,
product_map_list_with_rank)
product_map_list_with_rank_result = rebuild_count_and_rank(product_map_list_with_rank)
# 获取所有品牌的分
all_product_list_with_rank = attach_favorability_brand_score(all_product_sentiment_and_mentions_score,
all_product_list_with_rank)
......@@ -1789,6 +1815,8 @@ def task_send_queue(data, queue):
redis_client.lpush(f"{data['platform']}:geo:{queue}:list", json.dumps(data))
if __name__ == '__main__':
# data = {
......@@ -1808,34 +1836,25 @@ if __name__ == '__main__':
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
#
# begin = '2026-05-18'
# end = '2026-05-20'
#
#
# req_list = get_req_id(15100000026,begin,end)
# req_ids = []
# for item in req_list:
# req_id = item.get("req_id")
# req_ids.append(req_id)
#
# req_id_sql = ",".join([f"'{req_id}'" for req_id in req_ids])
# platform = ['DOUBA']
# platform_list = ",".join([f"'{p}'" for p in platform])
# query_sql = f"select * from geo_commit_task where reqId in ({req_id_sql}) and platform in ({platform_list}) and type ='success'"
# data_list = bh_utils.query_data(query_sql)
# print(len(data_list))
data_list = bh_utils.query_data("select * from geo_commit_task where status = 'ING' and type in ('batch','stream_batch') ")
#
# data_list = bh_utils.query_data(f"select * from geo_commit_task where reqId in ('0bf417a5-79f9-47f8-8f62-41ac63fe1a83','4deb09b0-de4b-4067-9bd3-b34cd29a089e','fd11816c-2255-4d15-ab78-5966effb4651','4deb09b0-de4b-4067-9bd3-b34cd29a089e','39e2b623-ee9f-4057-8a8e-6673d411226e','4deb09b0-de4b-4067-9bd3-b34cd29a089e','f81511cc-1c63-438c-99ce-b603a2340c06','4ce38505-257f-4bc5-a0ef-0b37615ec3a0','31a7119d-97b0-44f9-adb4-b67c957e1ec6','ef9b7c91-a60f-4afd-916f-fe8a5cd6c42d','6a4c7971-f455-427b-9fce-054066feecbf','bfeccabb-3a54-42cf-aacc-e481a486816e','e5decb39-f9ab-4eed-9605-83dc847f9b44','0bf417a5-79f9-47f8-8f62-41ac63fe1a83','fd11816c-2255-4d15-ab78-5966effb4651','ba90889e-db64-4df3-bf42-734bb28f02d6','9c45a2e2-b07c-44f7-a72a-05f338169f79','ecbb8c0c-4857-44ea-9703-5813dd6e4685','23543fee-ee44-48fa-8e23-d5a8657ce651','7927c636-d948-46d2-a7b0-4569ae8b5617','bfeccabb-3a54-42cf-aacc-e481a486816e','39e2b623-ee9f-4057-8a8e-6673d411226e','c068a9d4-277a-4428-9195-8174d1a1ff37','bfeccabb-3a54-42cf-aacc-e481a486816e','1ea51e2d-b069-4713-a5b5-bc75e7e0d6dc','e5decb39-f9ab-4eed-9605-83dc847f9b44','ba90889e-db64-4df3-bf42-734bb28f02d6','eb1d4753-e585-4ba0-ae95-28e938afe6c8','5d18240f-143d-4a97-b687-d98e0e2fdc20','fd7294a4-2c9c-4e92-91d7-ff6b5477ba2c','5495ee25-96fb-4ab7-a576-a43601aa1fb7','23543fee-ee44-48fa-8e23-d5a8657ce651')")
begin = '2026-05-22'
end = '2026-05-22'
req_list = get_req_id(15100000026,begin,end)
req_ids = []
for item in req_list:
req_id = item.get("req_id")
req_ids.append(req_id)
req_id_sql = ",".join([f"'{req_id}'" for req_id in req_ids])
query_sql = f"select * from geo_commit_task where reqId in ({req_id_sql}) "
data_list = bh_utils.query_data(query_sql)
#
# # data_list = bh_utils.query_data(f"-- select * from geo_commit_task where reqId = 'bb32742f-c353-4a80-8855-ee097e4e0c91'")
# data_list = bh_utils.query_data(f"select * from geo_commit_task where reqId='92b5611d-8ecf-4dc4-a96e-db41c3f29d05' ")
# #
# #
# # # #
def handle_item(i):
print(i.get('reqId'))
if i.get('comWordsMap'):
i['comWordsMap'] = json.loads(i.get('comWordsMap'))
if i.get('brandWords'):
......@@ -1848,7 +1867,9 @@ if __name__ == '__main__':
i['productWordsMap'] = json.loads(i.get('productWordsMap'))
type_t = i.get('type')
return task_send_queue(i,type_t)
# return task_send_queue(i,type_t)
return platform_process(i)
if data_list:
with ThreadPoolExecutor(max_workers=30) as executor:
futures = [executor.submit(handle_item, i) for i in data_list]
......
......@@ -30,9 +30,7 @@ def get_platform_response(
while retry_count < max_retries:
try:
response = requests.get(platform_config["url"], params=params, timeout=600)
print(platform_config["url"])
response_data = response.json()
storage_path = platform_config["storage_path"](data['taskId'])
if response_data.get("code") == 200:
put_string_to_tos(storage_path, response_data.get("data"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment