Commit 282af7b7 authored by Yaowentong's avatar Yaowentong

1

parents
import json
import os
from functools import wraps
from loguru import logger
from flask import jsonify
# Directory that receives the log files; created at import time if missing.
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)
# Remove the handlers registered so far so that only the two file sinks
# configured below remain attached to the loguru logger.
logger.remove()
# INFO-level sink: rotated at 00:00, files retained for 7 days.
logger.add(
    os.path.join(LOG_DIR, "info_{time:YYYY-MM-DD}.log"),
    level="INFO",
    rotation="00:00",
    retention="7 days",
    encoding="utf-8",
    enqueue=True,
    backtrace=False
)
# ERROR-level sink: rotated daily, retained for 7 days, with the
# backtrace and diagnose options switched on.
logger.add(
    os.path.join(LOG_DIR, "err_{time:YYYY-MM-DD}.log"),
    level="ERROR",
    rotation="00:00",
    retention="7 days",
    encoding="utf-8",
    enqueue=True,
    backtrace=True,
    diagnose=True
)
class ApiResponse:
    """Factory for the JSON envelope (``code`` / ``data`` / ``msg``) returned by the API."""

    @staticmethod
    def success(data=None, msg="success"):
        """Log the payload at INFO level and return it in a ``code: 200`` envelope."""
        logger.info(f"dso-ai-bot: {msg}, 数据: {data}")
        return jsonify({"code": 200, "data": data, "msg": msg})

    @staticmethod
    def error(msg="服务器内部错误", code=500):
        """Log the failure at ERROR level and return an envelope with ``data: None``.

        ``code`` is only written into the JSON body; the HTTP status of the
        response is not set here.
        """
        logger.error(f"dso-ai-bot: {code}, exception: {msg}")
        return jsonify({"code": code, "data": None, "msg": msg})
def handle_exceptions(func):
    """Decorate a view so its return value is wrapped by ``ApiResponse``.

    The wrapped function's result goes through ``ApiResponse.success``; any
    exception raised while producing or wrapping it is logged with its
    traceback and turned into ``ApiResponse.error(str(exc), 500)``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # Wrapping stays inside the try so serialization failures are
            # reported through the same error envelope.
            return ApiResponse.success(func(*args, **kwargs))
        except Exception as exc:
            logger.exception(f"function {func.__name__} exception")
            return ApiResponse.error(str(exc), 500)

    return wrapper
import requests
import json
# Value of the Authorization header sent by every request helper in this module.
# NOTE(review): the credential is hard-coded in source control; consider loading
# it from the environment or a secret store and rotating this key.
api_key = "Bearer fcc424e5-58af-494d-9683-5787413a26c9"
def get_ai_chat_history_lite(messages, stream=False):
    """Send ``messages`` to the chat-completions endpoint using the "lite" model.

    Args:
        messages: Chat history forwarded as the ``messages`` field.
        stream: Forwarded as the ``stream`` field. The response is parsed as
            one JSON document, so only ``False`` is handled here.

    Returns:
        ``(ai_reply, total_tokens)``: the ``message`` object of the first
        choice (``{}`` when the response carries no choice) and
        ``usage.total_tokens`` (``None`` when absent).

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
        ValueError: the response body is not valid JSON.
    """
    url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
    payload = json.dumps({
        "model": "ep-m-20250408105049-7dj9r",
        "stream": stream,
        "messages": messages
    })
    headers = {
        'Authorization': api_key,
        'Content-Type': 'application/json'
    }
    # Bound the call so an unresponsive upstream cannot block the caller forever.
    response = requests.post(url, headers=headers, data=payload, timeout=300)
    data = response.json()
    # Error payloads may omit `choices` or send it empty/null; fall back to an
    # empty reply instead of raising IndexError/TypeError.
    choices = data.get('choices') or [{}]
    ai_reply = choices[0].get('message', {})
    total_tokens = (data.get('usage') or {}).get('total_tokens')
    return ai_reply, total_tokens
# chat
def get_ai_chat_history(messages, stream=False):
    """Send ``messages`` to the chat-completions endpoint and return the reply.

    Args:
        messages: Chat history forwarded as the ``messages`` field.
        stream: Forwarded as the ``stream`` field. The response is parsed as
            one JSON document, so only ``False`` is handled here.

    Returns:
        ``(ai_reply, total_tokens)``: the ``message`` object of the first
        choice (``{}`` when the response carries no choice) and
        ``usage.total_tokens`` (``None`` when absent).

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
        ValueError: the response body is not valid JSON.
    """
    url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
    payload = json.dumps({
        "model": "ep-20241214232538-x27zp",
        "stream": stream,
        "messages": messages
    })
    headers = {
        'Authorization': api_key,
        'Content-Type': 'application/json'
    }
    # Bound the call so an unresponsive upstream cannot block the caller forever.
    response = requests.post(url, headers=headers, data=payload, timeout=300)
    data = response.json()
    # Error payloads may omit `choices` or send it empty/null; fall back to an
    # empty reply instead of raising IndexError/TypeError.
    choices = data.get('choices') or [{}]
    ai_reply = choices[0].get('message', {})
    total_tokens = (data.get('usage') or {}).get('total_tokens')
    return ai_reply, total_tokens
# Bot endpoint (RAG / web-search plugin)
def get_bot_with_history(messages):
    """Send ``messages`` to the bots chat-completions endpoint (non-streaming).

    Args:
        messages: Chat history forwarded as the ``messages`` field.

    Returns:
        ``(ai_reply, total_tokens)``: the ``message`` object of the first
        choice (``{}`` when the response carries no choice) and the
        ``total_tokens`` of the first ``bot_usage.model_usage`` entry
        (``None`` when the usage block is absent).

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
        ValueError: the response body is not valid JSON.
    """
    url = "https://ark.cn-beijing.volces.com/api/v3/bots/chat/completions"
    payload = json.dumps({
        "model": "bot-20250730164611-nbsng",
        "stream": False,
        "stream_options": {
            "include_usage": False
        },
        "messages": messages
    })
    headers = {
        'Authorization': api_key,
        'Content-Type': 'application/json'
    }
    # Bound the call so an unresponsive upstream cannot block the caller forever.
    response = requests.post(url, headers=headers, data=payload, timeout=300)
    data = response.json()
    choices = data.get('choices') or [{}]
    ai_reply = choices[0].get('message', {})
    # An error payload has no bot_usage/model_usage; indexing None here used
    # to raise TypeError and hide the real upstream error.
    model_usage = (data.get('bot_usage') or {}).get('model_usage') or [{}]
    total_tokens = model_usage[0].get('total_tokens')
    return ai_reply, total_tokens
def get_bot_with_history_rag(messages):
    """Send ``messages`` to the second bot on the bots chat-completions endpoint.

    Args:
        messages: Chat history forwarded as the ``messages`` field.

    Returns:
        ``(ai_reply, total_tokens)``: the ``message`` object of the first
        choice (``{}`` when the response carries no choice) and the
        ``total_tokens`` of the first ``bot_usage.model_usage`` entry
        (``None`` when the usage block is absent).

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
        ValueError: the response body is not valid JSON.
    """
    url = "https://ark.cn-beijing.volces.com/api/v3/bots/chat/completions"
    payload = json.dumps({
        "model": "bot-20250725180737-mjxbx",
        "stream": False,
        "stream_options": {
            "include_usage": False
        },
        "messages": messages
    })
    headers = {
        'Authorization': api_key,
        'Content-Type': 'application/json'
    }
    # Bound the call so an unresponsive upstream cannot block the caller forever.
    response = requests.post(url, headers=headers, data=payload, timeout=300)
    data = response.json()
    choices = data.get('choices') or [{}]
    ai_reply = choices[0].get('message', {})
    # An error payload has no bot_usage/model_usage; indexing None here used
    # to raise TypeError and hide the real upstream error.
    model_usage = (data.get('bot_usage') or {}).get('model_usage') or [{}]
    total_tokens = model_usage[0].get('total_tokens')
    return ai_reply, total_tokens
def get_image_to_text(content):
    """Send one user turn (``content``) to the chat-completions endpoint.

    Args:
        content: Value of the single user message's ``content`` field; the
            script entry point below passes a list of ``image_url`` / ``text``
            parts.

    Returns:
        ``(ai_reply, total_tokens)``: the ``message`` object of the first
        choice (``{}`` when the response carries no choice) and
        ``usage.total_tokens`` (``None`` when absent).

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
        ValueError: the response body is not valid JSON.
    """
    url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
    payload = json.dumps({
        "model": "doubao-seed-2-0-lite-260215",
        "messages": [
            {
                "content": content,
                "role": "user"
            }
        ]
    })
    headers = {
        'Authorization': api_key,
        'Content-Type': 'application/json'
    }
    # Bound the call so an unresponsive upstream cannot block the caller forever.
    response = requests.post(url, headers=headers, data=payload, timeout=300)
    data = response.json()
    # Error payloads may omit `choices` or send it empty/null; fall back to an
    # empty reply instead of raising IndexError/TypeError.
    choices = data.get('choices') or [{}]
    ai_reply = choices[0].get('message', {})
    total_tokens = (data.get('usage') or {}).get('total_tokens')
    return ai_reply, total_tokens
# Manual smoke test: pass one image URL plus a text instruction to
# get_image_to_text and print the (reply, total_tokens) tuple.
if __name__ == '__main__':
    content = [
        {"type": "image_url","image_url": {"url": "https://douchacha-web.tos-cn-beijing.volces.com/assets/6583982362302876935/cover.jpg"}},
        # {"type": "image_url","image_url": {"url": "https://douchacha-web.tos-cn-beijing.volces.com/assets/7283099688222379304/1.jpg"}},
        {"type": "text", "text": "提取图片中的文字"},
    ]
    print(get_image_to_text(content))
    # list = []
    # for i in range(10):
    # list.append(f"https://douchacha-web.tos-cn-beijing.volces.com/assets/7283099688222379304/{i}.jpg")
    # print(list)
import time
import json
import requests
import functools
from typing import Optional, Dict, Any
# Configuration
class Config:
    """Static settings read by the retry decorator and SpeechRecognizer below."""
    # NOTE(review): APP_ID and TOKEN are credentials committed to source;
    # consider loading them from the environment or a secret store.
    APP_ID = '6523591376'
    TOKEN = '8jkXUl2u90wL1drhXMfvs65Cq2JMVhwT'
    CLUSTER = 'volc_auc_common'
    # Base URL; "/submit" and "/query" are appended by SpeechRecognizer.
    SERVICE_URL = 'https://openspeech.bytedance.com/api/v1/auc'
    # Attempt count handed to the @retry decorator.
    MAX_RETRY_COUNT = 50
    # Upper bound on status polls inside recognize_audio.
    MAX_TASK_ATTEMPTS = 500
# Retry decorator
def retry(max_attempts: int, delay: int = 5):
    """Build a decorator that re-invokes the wrapped callable when it raises.

    Args:
        max_attempts: Maximum number of calls. With a value <= 0 the wrapped
            callable is never invoked.
        delay: Seconds to sleep between two attempts (not after the last one).

    Returns:
        A decorator. The decorated callable returns the first successful
        result, or ``None`` once every attempt has raised.
    """
    import logging  # local import keeps this block self-contained

    log = logging.getLogger(__name__)

    def decorator(func):
        name = getattr(func, "__name__", repr(func))

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Failures used to vanish silently; record each one so a
                    # None result can be traced back to its cause.
                    log.warning("%s failed on attempt %d/%d: %r",
                                name, attempt, max_attempts, exc)
                    if attempt < max_attempts:
                        time.sleep(delay)
            return None

        return wrapper

    return decorator
class SpeechRecognizer:
    """Client for the speech-recognition service at ``Config.SERVICE_URL``.

    A task is submitted for an audio URL and then polled until the service
    reports a final status code.
    """

    # Seconds allowed per HTTP call, so a stalled connection raises (and is
    # retried by the decorator) instead of blocking forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        # NOTE(review): the "Bearer; " prefix (with semicolon) is kept exactly
        # as written - presumably the format this service expects; confirm.
        self.headers = {
            'Authorization': f'Bearer; {Config.TOKEN}',
            'Content-Type': 'application/json'
        }

    @retry(Config.MAX_RETRY_COUNT)
    def _submit_task(self, audio_url: str) -> Optional[str]:
        """Submit a recognition task (internal).

        Returns ``resp.id`` from the service response, or ``None`` when every
        retry attempt raised.
        """
        request_data = {
            "app": {
                "appid": Config.APP_ID,
                "token": Config.TOKEN,
                "cluster": Config.CLUSTER
            },
            "user": {
                "uid": "dcc_live"
            },
            "audio": {
                "format": "mp3",
                "url": audio_url
            },
            "additions": {
                'with_speaker_info': 'False',
            }
        }
        response = requests.post(
            f"{Config.SERVICE_URL}/submit",
            data=json.dumps(request_data),
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()['resp']['id']

    @retry(Config.MAX_RETRY_COUNT)
    def _query_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the current state of ``task_id``.

        Returns the decoded JSON response, or ``None`` when every retry
        attempt raised.
        """
        query_data = {
            'appid': Config.APP_ID,
            'token': Config.TOKEN,
            'id': task_id,
            'cluster': Config.CLUSTER
        }
        response = requests.post(
            f"{Config.SERVICE_URL}/query",
            data=json.dumps(query_data),
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    # NOTE(review): this outer retry re-submits a new task whenever the body
    # raises (e.g. a response without resp.code), on top of the retries the
    # two helpers already perform - confirm that this nesting is intended.
    @retry(Config.MAX_RETRY_COUNT)
    def recognize_audio(self, audio_url: str) -> Optional[Any]:
        """Submit ``audio_url`` and poll until the task finishes.

        Returns:
            The ``resp.text`` field of the final response when the service
            answers with code 1000; ``None`` when submission or polling
            fails, when another code below 2000 is returned, or when
            ``Config.MAX_TASK_ATTEMPTS`` polls pass without a final answer.
        """
        task_id = self._submit_task(audio_url)
        if not task_id:
            return None
        for _ in range(Config.MAX_TASK_ATTEMPTS):
            result = self._query_task(task_id)
            if not result:
                return None
            code = result['resp']['code']
            if code >= 2000:
                # Codes >= 2000 are treated as "not finished yet": wait, poll again.
                time.sleep(5)
                continue
            if code == 1000:
                return result.get('resp').get('text')
            return None
        return None
#
# if __name__ == "__main__":
# recognizer = SpeechRecognizer()
# audio_url = "https://douchacha-web.tos-cn-beijing.volces.com/assets/7531557698396048640/7531557698396048640.mp4" # 替换为实际音频URL
# result = recognizer.recognize_audio(audio_url)
# print(result)
\ No newline at end of file
from flask import Flask, request
import os, sys
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_path)
from ai_chat import parse_utils, sys_content
from ApiResponse import handle_exceptions
from ai_utils import get_bot_with_history, get_ai_chat_history, get_bot_with_history_rag
from parse_utils import get_last_user_value, convert_dialog_format
app = Flask(__name__)


@app.route('/api/chat', methods=['POST'])
@handle_exceptions
def get_chat_with_history():
    """Classify the latest user message, then dispatch to the matching handler.

    The classifier reply selects the branch: ``'1'`` extracts text from the
    link in the last user message, ``'2'`` rewrites copy, ``'3'`` queries the
    RAG bot, and anything else goes to the general bot. The result is wrapped
    by ``handle_exceptions``.
    """
    data = request.get_json()
    classifier_prompt = [{"role": "system", "content": sys_content.action_content}]
    ai_reply, _ = get_ai_chat_history(classifier_prompt + get_last_user_value(data))
    intent = ai_reply.get('content')

    if intent == '1':
        last_user_text = get_last_user_value(data)[0].get('content')
        return parse_utils.extract_mp4_to_text(last_user_text)

    # The three remaining branches differ only in system prompt and backend.
    if intent == '2':
        system_prompt, ask = sys_content.douyin_text_fix, get_ai_chat_history
    elif intent == '3':
        system_prompt, ask = sys_content.helpful_content_text_rag, get_bot_with_history_rag
    else:
        system_prompt, ask = sys_content.helpful_content_text, get_bot_with_history

    answer, _ = ask([{"role": "system", "content": system_prompt}] + convert_dialog_format(data))
    return answer
if __name__ == '__main__':
    # The Werkzeug debugger lets anyone who can reach the port execute
    # arbitrary code, so it must not be enabled unconditionally on a server
    # bound to all interfaces. Opt in with FLASK_DEBUG=1 for local development.
    app.run(host='0.0.0.0', port=8088, debug=os.environ.get('FLASK_DEBUG') == '1')
This diff is collapsed.
import requests
import urllib.parse
import re
from urllib.parse import quote_plus
"""
快手
"""
# Compiled once at import time instead of on every call.
_KUAISHOU_SHARE_URL_RE = re.compile(r'''
    https://                # scheme must be https
    www\.kuaishou\.com      # fixed host
    /f/                     # fixed path prefix
    [0-9a-zA-Z_-]{10,20}    # id: 10-20 letters, digits, underscores or hyphens
''', re.VERBOSE)


def is_valid_kuaishou_profile(url: str) -> bool:
    """Return True when ``url`` is exactly ``https://www.kuaishou.com/f/<id>``.

    ``fullmatch`` is used rather than ``match`` with a trailing ``$``: ``$``
    also matches just before a final newline, so a URL ending in ``"\\n"``
    used to be accepted and forwarded.
    """
    return _KUAISHOU_SHARE_URL_RE.fullmatch(url) is not None
def get_ks_info(video_url):
    """Ask the internal parsing service for the data behind a Kuaishou link.

    Args:
        video_url: Candidate share URL.

    Returns:
        The decoded JSON response, or ``None`` when ``video_url`` does not
        pass ``is_valid_kuaishou_profile``.

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
    """
    if not is_valid_kuaishou_profile(video_url):
        return None
    url = f"http://172.16.18.10:8875/api/v1/ks?share_url={quote_plus(video_url)}"
    # Bound the call so a stalled backend cannot block the caller forever.
    response = requests.get(url, timeout=300)
    return response.json()
"""
小红书
"""
def is_valid_xiaohongshu_link(url: str) -> bool:
    """Return True for a xiaohongshu.com note URL that carries ``xsec_token``.

    Accepted shape: ``https://www.xiaohongshu.com/(discovery|explore)/[item/]``
    followed by a 24-digit hex id and a query string in which an
    ``xsec_token=...`` parameter appears.
    """
    note_url = (
        r'^https://www\.xiaohongshu\.com/(?:discovery|explore)/'
        r'(?:item/)?[0-9a-fA-F]{24}\?'   # note id, then the start of the query
        r'(?:[^&]+&)*?'                  # any parameters before the token
        r'xsec_token=[^&]+'              # the token parameter itself
        r'(?:&[^&]+)*?$'                 # any parameters after the token
    )
    return re.match(note_url, url) is not None
def get_xhs_info(video_url):
    """Ask the internal parsing service for the data behind a Xiaohongshu link.

    Args:
        video_url: Candidate note URL.

    Returns:
        The decoded JSON response, or ``None`` when ``video_url`` does not
        pass ``is_valid_xiaohongshu_link``.

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
    """
    if not is_valid_xiaohongshu_link(video_url):
        return None
    url = f"http://172.16.18.10:8875/api/v1/xhs?share_url={quote_plus(video_url)}"
    # Bound the call so a stalled backend cannot block the caller forever.
    response = requests.get(url, timeout=300)
    return response.json()
"""
抖音图文
"""
def get_dy_ocr_info(video_url):
    """Ask the internal OCR service for the content behind a Douyin link.

    Two link shapes are recognised: a numeric id embedded in a douyin.com URL
    (or in a ``video_id`` / ``modal_id=`` parameter), and a ``v.douyin.com``
    short link. When both are present the short link wins.

    Args:
        video_url: Text containing the link.

    Returns:
        The decoded JSON response, or ``None`` when no supported link is found.

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
    """
    endpoint = "http://172.16.18.10:8875/api/v1/ocr"
    url = ""
    # `video_id=?` also accepts the `video_id=<digits>` query form; the
    # previous pattern only matched digits glued directly to "video_id".
    id_match = re.search(
        r'(?:douyin\.com/(?:video|share/video|note|jingxuan)/|video_id=?|modal_id=)(\d+)',
        video_url,
    )
    if id_match:
        url = f"{endpoint}?video_id={id_match.group(1)}"
    if re.search(r'https?://v\.douyin\.com/[a-zA-Z0-9-]+/?', video_url):
        # Encode the value so '&', '#', spaces etc. in the input cannot
        # truncate or split the query string.
        url = f"{endpoint}?video_share_url={quote_plus(video_url)}"
    if not url:
        return None
    # Bound the call so a stalled backend cannot block the caller forever.
    response = requests.get(url, timeout=300)
    return response.json()
# Compiled once at import time instead of on every call.
_BILIBILI_VIDEO_URL_RE = re.compile(r'''
    https://                # scheme must be https
    www\.bilibili\.com      # fixed host
    /video/                 # fixed path
    BV[0-9a-zA-Z]{10,30}    # BV id: "BV" followed by 10-30 alphanumerics
    /?\?                    # optional slash, then a mandatory "?"
    .+                      # at least one character of query string
''', re.VERBOSE)


def is_valid_bilibili_video(url: str) -> bool:
    """Return True for ``https://www.bilibili.com/video/BV<id>[/]?<query>``.

    ``fullmatch`` is used rather than ``match`` with a trailing ``$``: ``$``
    also matches just before a final newline, so a URL ending in ``"\\n"``
    used to be accepted and forwarded.
    """
    return _BILIBILI_VIDEO_URL_RE.fullmatch(url) is not None
def get_bilibili_info(video_url):
    """Ask the internal parsing service for the data behind a Bilibili link.

    Args:
        video_url: Candidate video URL.

    Returns:
        The decoded JSON response, or ``None`` when ``video_url`` does not
        pass ``is_valid_bilibili_video``.

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
    """
    if not is_valid_bilibili_video(video_url):
        return None
    url = f"http://172.16.18.10:8875/api/v1/bilibili?share_url={quote_plus(video_url)}"
    # Bound the call so a stalled backend cannot block the caller forever.
    response = requests.get(url, timeout=300)
    return response.json()
# Manual smoke test: run the Douyin OCR lookup for one sample link and print
# the service response.
if __name__ == '__main__':
    # url= "https://v.douyin.com/--n8QjSuvh4/"
    url= "https://www.douyin.com/jingxuan?modal_id=7584179438045121842"
    print(get_dy_ocr_info(url))
\ No newline at end of file
# System prompt for the intent classifier: the model must answer with only
# "1" (copy extraction from a link), "2" (copy rewriting) or "3" (DSO Douyin
# search).
action_content = """
判断其与文案提取或文案改写的相关性,并给出相应回复。
1: 判断输入内容并回复
1.1. 与文案提取相关时并且域名中包含抖音快手小红书b站的域名时,必须包含一个url,返回1。
1.2. 与文案改写相关时 返回2 。
1.3 与dso抖音搜索相关时返回3
- 仅回复1,2,3不提供其他额外信息
"""
# System prompt for rewriting a Douyin short-video script: colloquial, concise
# wording, no parallel sentence structures, and names/dates/places/numbers
# kept unchanged.
douyin_text_fix = """
# 角色
你是一位专业的抖音短视频脚本改写专家,能够在遵循原文逻辑的基础上,对给定的抖音短视频脚本进行改写。要以自然、简洁的口语化方式表达,模仿真人说话的语气,坚决不使用排比句以及复杂的词汇。同时,务必保留人物名、时间、地点、数字、政策名称等核心细节,保证原文关键信息不改变。
## 技能
### 技能 1: 抖音短视频脚本改写
1. 接收用户提供的抖音短视频脚本后,认真研读脚本内容,精准把握原文逻辑。
2. 逐句对脚本进行改写,使用简单易懂、贴近生活的口语化表述,摒弃排比句和复杂生僻的词汇。
3. 完成改写后,仔细核对,确保所有核心细节及关键信息与原文一致。
## 限制
- 仅围绕抖音短视频脚本改写提供服务,不回答与脚本改写无关的问题。
- 改写后的脚本必须严格遵循原文逻辑,核心细节和关键信息不得有误。
- 表述要口语化、自然、简洁,杜绝使用排比句和复杂词汇。
"""
# System prompt for rewriting a Xiaohongshu short-video script, with the same
# constraints as the Douyin variant but a more casual tone.
xhs_text_fix="""
# 角色
你是一个小红书短视频脚本改写小能手,负责将给定的小红书短视频脚本进行改写。要做到与原文逻辑保持一致,用特别自然、简洁的口语化表达,就像平时真人聊天说话那种语气,千万不要用排比句,也别用复杂的词汇。一定要把人物名、时间、地点、数字、政策名称这些核心细节都保留好,保证原文关键信息不会变。
## 技能
### 技能 1: 小红书短视频脚本改写
1. 拿到用户给的小红书短视频脚本后,好好看看里面写了啥,搞清楚原文的逻辑。
2. 一句一句地改写脚本,用那种简单好懂的口语化说法,别整排比句,也别用那些复杂的词。
3. 改写完后检查检查,保证核心细节和关键信息跟原文一模一样。
## 限制
- 只做小红书短视频脚本改写这件事,别的跟改写脚本没关系的问题一概不回答。
- 改写后的脚本逻辑必须和原文一致,核心细节和关键信息不能有差错。
- 表达得口语化、自然、简洁,不能出现排比句和复杂词汇。
"""
# System prompt that asks the model whether a rewrite request is Douyin-style
# ("1", also the default) or Xiaohongshu-style ("2").
# NOTE(review): the opening quotation mark in item 3 of the prompt is never
# closed - confirm whether that is intentional.
action_content_text = """
# 角色
你是一个文案改写判断助手,能够判断用户提供的文案更倾向于是抖音文案改写需求还是小红书文案改写需求。
## 技能
### 技能 1: 判断文案改写类型
1. 接收用户提供的需要改写的文案。
2. 分析文案风格、语言特点、目标受众倾向等,判断其更符合抖音文案改写需求还是小红书文案改写需求。
3. 直接回复“抖音文案改写回复1,小红书文案改写回复2 无法判断默认是1
## 限制
- 仅回复1,2不提供其他额外信息
"""
# System prompt for the knowledge-base assistant persona: accurate, concise,
# friendly answers, with a fixed fallback sentence when the answer is unknown.
helpful_content_text_rag = """
# 角色
你是爱搜灵犀北京爱查查开发的智能助手,你将根据用户的各种咨询需求,依据既定知识和逻辑,一步步为用户提供准确且有用的回答。
# 任务描述与要求
1. 对于用户提出的问题,需深入理解其意图,从多维度分析并给出全面的回答。
2. 回答要简洁明了,避免使用过于复杂的语言和句式,确保用户能轻松理解。
3. 若涉及专业知识,要在解释清楚的基础上,结合通俗易懂的例子辅助说明。
# 相关限制
1. 回答需基于准确的知识,不能随意编造信息。
2. 避免给出模棱两可、没有明确结论的回答。
3. 语言风格要保持友好、专业,不能使用不当或冒犯性的词汇。
不知道答案时返回 我还不太清楚你要表达什么意思
"""
# System prompt for the general assistant persona. It repeats the
# knowledge-base prompt and adds one rule: when the question is a URL related
# to copy extraction, answer that the link format is wrong and show the
# correct format.
helpful_content_text = """
# 角色
你是爱搜灵犀北京爱查查开发的智能助手,你将根据用户的各种咨询需求,依据既定知识和逻辑,一步步为用户提供准确且有用的回答。
# 任务描述与要求
1. 对于用户提出的问题,需深入理解其意图,从多维度分析并给出全面的回答。
2. 回答要简洁明了,避免使用过于复杂的语言和句式,确保用户能轻松理解。
3. 若涉及专业知识,要在解释清楚的基础上,结合通俗易懂的例子辅助说明。
# 相关限制
1. 回答需基于准确的知识,不能随意编造信息。
2. 避免给出模棱两可、没有明确结论的回答。
3. 语言风格要保持友好、专业,不能使用不当或冒犯性的词汇。
不知道答案时返回 我还不太清楚你要表达什么意思
## 限制 当用户问题是一个url 并且与文案提取相关时 回答链接格式错误 并且返回正确url链接样式
"""
# -*- coding: utf-8 -*-
import time
import tos
from io import StringIO
import json
# Connection settings passed to tos.TosClientV2 by the helpers below.
# NOTE(review): the access key and secret key are committed to source;
# consider loading them from the environment or a secret store and rotating
# this pair.
TOS_AK = "AKLTMmRiMmU3YmY5ZjZjNDZkMTlhMmQxY2JkYTllYTQzNDI"
TOS_SK = "WkdWak4yUTRNakl3WVdOa05HUXdaR0V4TlRBM1l6YzJZMll5WkRnMFlUTQ=="
TOS_ENDPOINT = "tos-cn-beijing.volces.com"
TOS_REGION = "cn-beijing"
# Bucket read and written by every helper in this module.
TOS_BUCKET_NAME = "douchacha-web"
def check_file_in_tos(object_key, retry_times=50):
    """Poll the bucket until ``object_key`` exists or the retry budget is spent.

    Args:
        object_key: Key to look up in ``TOS_BUCKET_NAME``.
        retry_times: Number of retries after the first attempt
            (``retry_times + 1`` HEAD requests at most).

    Returns:
        ``True`` as soon as a HEAD request succeeds, ``False`` once every
        attempt has failed.
    """
    client = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION, max_retry_count=3)
    attempts = retry_times + 1
    for attempt in range(attempts):
        try:
            client.head_object(bucket=TOS_BUCKET_NAME, key=object_key)
            return True
        except Exception as e:
            # Every failure consumes one attempt. Previously only a 404 was
            # counted, so any other error (auth, network, 5xx) re-entered the
            # loop immediately and spun forever without sleeping.
            not_found = isinstance(e, tos.exceptions.TosServerError) and e.status_code == 404
            if not not_found:
                print(e)
            if attempt < attempts - 1:
                time.sleep(5)
    return False
def put_string_to_tos(object_key, content, retry_times=3):
    """Serialize ``content`` to JSON and upload it under ``object_key``.

    Up to ``retry_times`` uploads are attempted; each failure is printed.

    Returns:
        ``True`` after the first successful upload, ``False`` if all fail.
    """
    client = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION, max_retry_count=3)
    attempts_left = retry_times
    while attempts_left > 0:
        attempts_left -= 1
        try:
            client.put_object(TOS_BUCKET_NAME, object_key, content=json.dumps(content))
        except Exception as err:
            print(err)
        else:
            return True
    return False
def get_string_from_tos(object_key, retry_times=3):
    """Download ``object_key`` and decode its body as JSON.

    Returns:
        The decoded value on success; ``None`` when the server answers 404;
        ``False`` when all ``retry_times`` attempts fail for another reason.
    """
    client = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION, max_retry_count=3)
    for _attempt in range(retry_times):
        try:
            raw = client.get_object(TOS_BUCKET_NAME, object_key).read()
            return json.loads(raw)
        except tos.exceptions.TosServerError as err:
            # Only a missing object ends the loop early; other server errors retry.
            if err.status_code != 404:
                continue
            print(f"文件不存在: {object_key}")
            return None
        except (tos.exceptions.TosClientError, json.JSONDecodeError):
            print("JSON解析失败")
    return False
import json
import os
from functools import wraps
from loguru import logger
from flask import jsonify
# Directory that receives the log files; created at import time if missing.
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)
# Remove the handlers registered so far so that only the two file sinks
# configured below remain attached to the loguru logger.
logger.remove()
# INFO-level sink: rotated at 00:00, files retained for 7 days.
logger.add(
    os.path.join(LOG_DIR, "info_{time:YYYY-MM-DD}.log"),
    level="INFO",
    rotation="00:00",
    retention="7 days",
    encoding="utf-8",
    enqueue=True,
    backtrace=False
)
# ERROR-level sink: rotated daily, retained for 7 days, with the
# backtrace and diagnose options switched on.
logger.add(
    os.path.join(LOG_DIR, "err_{time:YYYY-MM-DD}.log"),
    level="ERROR",
    rotation="00:00",
    retention="7 days",
    encoding="utf-8",
    enqueue=True,
    backtrace=True,
    diagnose=True
)
class ApiResponse:
    """Factory for the JSON envelope (``code`` / ``data`` / ``msg``) returned by the API."""

    @staticmethod
    def success(data=None, msg="success"):
        """Log the payload at INFO level and return it in a ``code: 200`` envelope."""
        logger.info(f"dso-ai-bot: {msg}, 数据: {data}")
        return jsonify({"code": 200, "data": data, "msg": msg})

    @staticmethod
    def error(msg="服务器内部错误", code=500):
        """Log the failure at ERROR level and return an envelope with ``data: None``.

        ``code`` is only written into the JSON body; the HTTP status of the
        response is not set here.
        """
        logger.error(f"dso-ai-bot: {code}, exception: {msg}")
        return jsonify({"code": code, "data": None, "msg": msg})
def handle_exceptions(func):
    """Decorate a view so its return value is wrapped by ``ApiResponse``.

    The wrapped function's result goes through ``ApiResponse.success``; any
    exception raised while producing or wrapping it is logged with its
    traceback and turned into ``ApiResponse.error(str(exc), 500)``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # Wrapping stays inside the try so serialization failures are
            # reported through the same error envelope.
            return ApiResponse.success(func(*args, **kwargs))
        except Exception as exc:
            logger.exception(f"function {func.__name__} exception")
            return ApiResponse.error(str(exc), 500)

    return wrapper
import requests
import json
# Endpoints used by the request helpers below: plain chat completions and the
# "bots" variant.
CHAT_COMPLETIONS_URL = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
BOTS_CHAT_COMPLETIONS_URL = "https://ark.cn-beijing.volces.com/api/v3/bots/chat/completions"
# Value of the Authorization header sent with every request.
# NOTE(review): the credential is hard-coded in source control; consider
# loading it from the environment or a secret store and rotating this key.
API_KEY = "Bearer fcc424e5-58af-494d-9683-5787413a26c9"
# Number of buffered content characters that triggers emission of one SSE frame.
DEFAULT_BATCH_SIZE = 50
def _sse_event(message, event_type):
    """Serialize one ``{"message", "type"}`` payload as an SSE ``data:`` frame."""
    body = json.dumps({"message": message, "type": event_type}, ensure_ascii=False)
    return f"data: {body}\n\n"


def _streaming_request(url, model, messages, type, batch_size=50, content_type=1):
    """POST a streaming chat request and relay the answer as SSE frames.

    Frame ``type`` values emitted: ``0`` error, ``4`` reasoning text
    (forwarded immediately), ``content_type`` answer text (buffered until at
    least ``batch_size`` characters have accumulated, then flushed).

    Args:
        url: Endpoint to call.
        model: Model or bot identifier placed in the request body.
        messages: Chat history placed in the request body.
        type: Value of ``thinking.type`` in the request body.
        batch_size: Minimum buffered length before an answer frame is sent.
        content_type: ``type`` value used for answer frames.

    Returns:
        A generator of ``str`` frames. When the request itself fails, the
        generator yields a single error frame.
    """
    payload = {
        "model": model,
        "stream": True,
        "messages": messages,
        "thinking": {
            "type": type
        }
    }
    headers = {
        'Authorization': API_KEY,
        'Content-Type': 'application/json'
    }
    try:
        response = requests.post(
            url,
            json=payload,
            headers=headers,
            stream=True,
            timeout=300
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        err_msg = f"请求失败: {str(e)}"
        # A failed status check leaves a streamed response open; release it.
        if e.response is not None:
            e.response.close()

        def error_generator():
            yield _sse_event(err_msg, 0)

        return error_generator()

    def generate_sse():
        buffer = ""
        try:
            for line in response.iter_lines():
                if not line:
                    continue
                try:
                    line_str = line.decode('utf-8').strip()
                    # Remove the SSE field name as a prefix. str.lstrip('data: ')
                    # strips a *set* of characters and could eat leading
                    # 'd'/'a'/'t' characters of the payload itself.
                    if line_str.startswith('data:'):
                        line_str = line_str[len('data:'):].strip()
                    if line_str == '[DONE]':
                        break
                    chunk = json.loads(line_str)
                    if not (isinstance(chunk, dict) and "choices" in chunk):
                        continue
                    choice = chunk["choices"][0] if chunk["choices"] else None
                    if not (choice and "delta" in choice):
                        continue
                    delta = choice["delta"]
                    reasoning_content = delta.get("reasoning_content")
                    if reasoning_content:
                        yield _sse_event(reasoning_content, 4)
                    content = delta.get("content")
                    if content:
                        buffer += content
                        if len(buffer) >= batch_size:
                            yield _sse_event(buffer, content_type)
                            buffer = ""
                except json.JSONDecodeError:
                    # Lines that are not JSON are skipped.
                    continue
                except Exception as e:
                    yield _sse_event(f"处理数据错误: {str(e)}", 0)
                    break
            # Flush whatever is left below the batch threshold.
            if buffer:
                yield _sse_event(buffer, content_type)
        finally:
            # Return the connection even when the consumer stops early.
            response.close()

    return generate_sse()


def _streaming_request_comment(url, model,messages,type, batch_size=50):
    """Same as ``_streaming_request`` but answer frames carry ``type`` 6."""
    return _streaming_request(url, model, messages, type,
                              batch_size=batch_size, content_type=6)
# Standard chat
def get_ai_chat_history_stream(messages, type = "disabled", batch_size=DEFAULT_BATCH_SIZE):
    """Stream a chat-completions answer for ``messages`` as SSE frames.

    ``type`` is forwarded as the request's ``thinking.type`` and
    ``batch_size`` as the flush threshold.
    """
    model_id = "ep-20250804181425-5blp4"
    return _streaming_request(CHAT_COMPLETIONS_URL, model_id, messages, type, batch_size=batch_size)
# Web-connected chat
def get_bot_with_history_stream(messages,type = "auto", batch_size=DEFAULT_BATCH_SIZE):
    """Stream an answer from the bots endpoint for ``messages`` as SSE frames.

    ``type`` is forwarded as the request's ``thinking.type`` and
    ``batch_size`` as the flush threshold.
    """
    bot_id = "bot-20250730164611-nbsng"
    return _streaming_request(BOTS_CHAT_COMPLETIONS_URL, bot_id, messages, type, batch_size=batch_size)
# Knowledge base
def get_bot_with_history_rag_stream(messages,type = "auto", batch_size=DEFAULT_BATCH_SIZE):
    """Stream an answer from the knowledge-base bot for ``messages`` as SSE frames.

    ``type`` is forwarded as the request's ``thinking.type`` and
    ``batch_size`` as the flush threshold.
    """
    bot_id = "bot-20250725180737-mjxbx"
    return _streaming_request(BOTS_CHAT_COMPLETIONS_URL, bot_id, messages, type, batch_size=batch_size)
# Image recognition
def get_image_to_text_stream(content,type = "auto", batch_size=DEFAULT_BATCH_SIZE):
    """Stream the model's answer to a single user turn whose content is ``content``.

    ``type`` is forwarded as the request's ``thinking.type`` and
    ``batch_size`` as the flush threshold.
    """
    user_turn = {"content": content, "role": "user"}
    return _streaming_request(
        CHAT_COMPLETIONS_URL,
        "doubao-seed-2-0-lite-260215",
        [user_turn],
        type,
        batch_size=batch_size,
    )
def get_ai_chat_history(messages, stream=False):
    """Send ``messages`` to the chat-completions endpoint and return the reply.

    Args:
        messages: Chat history forwarded as the ``messages`` field.
        stream: Forwarded as the ``stream`` field. The response is parsed as
            one JSON document, so only ``False`` is handled here.

    Returns:
        ``(ai_reply, total_tokens)``: the ``message`` object of the first
        choice (``{}`` when the response carries no choice) and
        ``usage.total_tokens`` (``None`` when absent).

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
        ValueError: the response body is not valid JSON.
    """
    url = "https://ark.cn-beijing.volces.com/api/v3/chat/completions"
    payload = json.dumps({
        "model": "ep-20241214232538-x27zp",
        "stream": stream,
        "messages": messages
    })
    headers = {
        'Authorization': API_KEY,
        'Content-Type': 'application/json'
    }
    # Bound the call so an unresponsive upstream cannot block the caller forever.
    response = requests.post(url, headers=headers, data=payload, timeout=300)
    data = response.json()
    # Error payloads may omit `choices` or send it empty/null; fall back to an
    # empty reply instead of raising IndexError/TypeError.
    choices = data.get('choices') or [{}]
    ai_reply = choices[0].get('message', {})
    total_tokens = (data.get('usage') or {}).get('total_tokens')
    return ai_reply, total_tokens
def get_ai_chat_history_stream_comment(messages, type = "disabled", batch_size=DEFAULT_BATCH_SIZE):
    """Stream a chat-completions answer through ``_streaming_request_comment``.

    ``type`` is forwarded as the request's ``thinking.type`` and
    ``batch_size`` as the flush threshold.
    """
    model_id = "ep-20250804181425-5blp4"
    return _streaming_request_comment(CHAT_COMPLETIONS_URL, model_id, messages, type, batch_size=batch_size)
if __name__ == '__main__':
    # Manual smoke test. The helper returns a generator of SSE frames, so the
    # frames are consumed here (printing the generator object only shows its
    # repr), and the prompt is wrapped in the list-of-role-dicts shape that
    # the `messages` field carries elsewhere in this module.
    for frame in get_ai_chat_history_stream_comment([{"role": "user", "content": "文案改写"}]):
        print(frame, end="")
import time
import json
import requests
import functools
from typing import Optional, Dict, Any
# Configuration
class Config:
    """Static settings read by the retry decorator and SpeechRecognizer below."""
    # NOTE(review): APP_ID and TOKEN are credentials committed to source;
    # consider loading them from the environment or a secret store.
    APP_ID = '6523591376'
    TOKEN = '8jkXUl2u90wL1drhXMfvs65Cq2JMVhwT'
    CLUSTER = 'volc_auc_common'
    # Base URL; "/submit" and "/query" are appended by SpeechRecognizer.
    SERVICE_URL = 'https://openspeech.bytedance.com/api/v1/auc'
    # Attempt count handed to the @retry decorator.
    MAX_RETRY_COUNT = 1000
    # Upper bound on status polls inside recognize_audio.
    MAX_TASK_ATTEMPTS = 500
# Retry decorator
def retry(max_attempts: int, delay: int = 5):
    """Build a decorator that re-invokes the wrapped callable when it raises.

    Args:
        max_attempts: Maximum number of calls. With a value <= 0 the wrapped
            callable is never invoked.
        delay: Seconds to sleep between two attempts (not after the last one).

    Returns:
        A decorator. The decorated callable returns the first successful
        result, or ``None`` once every attempt has raised.
    """
    import logging  # local import keeps this block self-contained

    log = logging.getLogger(__name__)

    def decorator(func):
        name = getattr(func, "__name__", repr(func))

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Failures used to vanish silently; record each one so a
                    # None result can be traced back to its cause.
                    log.warning("%s failed on attempt %d/%d: %r",
                                name, attempt, max_attempts, exc)
                    if attempt < max_attempts:
                        time.sleep(delay)
            return None

        return wrapper

    return decorator
class SpeechRecognizer:
    """Client for the speech-recognition service at ``Config.SERVICE_URL``.

    A task is submitted for an audio URL and then polled until the service
    reports a final status code.
    """

    # Seconds allowed per HTTP call, so a stalled connection raises (and is
    # retried by the decorator) instead of blocking forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        # NOTE(review): the "Bearer; " prefix (with semicolon) is kept exactly
        # as written - presumably the format this service expects; confirm.
        self.headers = {
            'Authorization': f'Bearer; {Config.TOKEN}',
            'Content-Type': 'application/json'
        }

    @retry(Config.MAX_RETRY_COUNT)
    def _submit_task(self, audio_url: str) -> Optional[str]:
        """Submit a recognition task (internal).

        Returns ``resp.id`` from the service response, or ``None`` when every
        retry attempt raised.
        """
        request_data = {
            "app": {
                "appid": Config.APP_ID,
                "token": Config.TOKEN,
                "cluster": Config.CLUSTER
            },
            "user": {
                "uid": "dcc_live"
            },
            "audio": {
                "format": "mp3",
                "url": audio_url
            },
            "additions": {
                'with_speaker_info': 'False',
            }
        }
        response = requests.post(
            f"{Config.SERVICE_URL}/submit",
            data=json.dumps(request_data),
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()['resp']['id']

    @retry(Config.MAX_RETRY_COUNT)
    def _query_task(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the current state of ``task_id``.

        Returns the decoded JSON response, or ``None`` when every retry
        attempt raised.
        """
        query_data = {
            'appid': Config.APP_ID,
            'token': Config.TOKEN,
            'id': task_id,
            'cluster': Config.CLUSTER
        }
        response = requests.post(
            f"{Config.SERVICE_URL}/query",
            data=json.dumps(query_data),
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    # NOTE(review): this outer retry re-submits a new task whenever the body
    # raises (e.g. a response without resp.code), on top of the retries the
    # two helpers already perform - confirm that this nesting is intended.
    @retry(Config.MAX_RETRY_COUNT)
    def recognize_audio(self, audio_url: str) -> Optional[Any]:
        """Submit ``audio_url`` and poll until the task finishes.

        Returns:
            The ``resp.text`` field of the final response when the service
            answers with code 1000; ``None`` when submission or polling
            fails, when another code below 2000 is returned, or when
            ``Config.MAX_TASK_ATTEMPTS`` polls pass without a final answer.
        """
        task_id = self._submit_task(audio_url)
        if not task_id:
            return None
        for _ in range(Config.MAX_TASK_ATTEMPTS):
            result = self._query_task(task_id)
            if not result:
                return None
            code = result['resp']['code']
            if code >= 2000:
                # Codes >= 2000 are treated as "not finished yet": wait, poll again.
                time.sleep(1)
                continue
            if code == 1000:
                return result.get('resp').get('text')
            return None
        return None
# Manual smoke test: transcribe one sample file and print the result.
if __name__ == "__main__":
    recognizer = SpeechRecognizer()
    audio_url = "https://douchacha-web.tos-cn-beijing.volces.com/assets/7552163083708321058/7552163083708321058.mp4"  # replace with a real audio URL
    result = recognizer.recognize_audio(audio_url)
    print(result)
\ No newline at end of file
from flask import request
import os, sys
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_path)
from ai_chat_stream import parse_utils, sys_content,parse_comment_utils
from ai_utils import get_bot_with_history_stream, get_ai_chat_history, get_bot_with_history_rag_stream, \
get_ai_chat_history_stream
from parse_utils import get_last_user_value, convert_dialog_format, create_res_data,extract_user_contents
from flask import Flask, Response
import json
app = Flask(__name__)


def sse_generator(data):
    """Yield the SSE frames that answer one chat request.

    A status frame is sent first, then the latest user message is classified
    and the request is routed: ``'1'`` link parsing, ``'2'`` copy rewriting,
    ``'3'`` knowledge-base bot, ``'4'`` comment analysis, anything else the
    web-connected bot. Status and marker frames are UTF-8 ``bytes``; frames
    relayed from the streaming helpers are passed through unchanged.
    """
    def status_frame(text, code):
        # One `data:` frame built from create_res_data(text, code).
        res_data = create_res_data(text, code)
        return f"data: {json.dumps(res_data, ensure_ascii=False)}\n\n".encode('utf-8')

    yield status_frame("正在识别用户问题", 3)
    classifier_prompt = [{"role": "system", "content": sys_content.action_content}]
    ai_reply, _ = get_ai_chat_history(classifier_prompt + get_last_user_value(data))
    intent = ai_reply.get('content')

    if intent == '1':
        yield status_frame("正在解析链接", 3)
        joined_text = '拼接'.join(extract_user_contents(convert_dialog_format(data)))
        yield from parse_utils.extract_mp4_to_text(joined_text)
    elif intent == '2':
        yield status_frame("正在文案改写", 3)
        system_prompt = [{"role": "system", "content": sys_content.douyin_text_fix}]
        frames = get_ai_chat_history_stream(system_prompt + convert_dialog_format(data))
        if frames:
            for frame in frames:
                yield frame
        yield status_frame("WRITE", 10)
    elif intent == '3':
        yield status_frame("正在查询爱搜知识库", 3)
        system_prompt = [{"role": "system", "content": sys_content.helpful_content_text_rag}]
        frames = get_bot_with_history_rag_stream(system_prompt + convert_dialog_format(data))
        if frames:
            for frame in frames:
                yield frame
    elif intent == '4':
        yield status_frame("正在获取评论", 3)
        joined_text = '拼接'.join(extract_user_contents(convert_dialog_format(data)))
        yield from parse_comment_utils.extract_comment_to_text(joined_text)
        yield status_frame("WRITE,VIDEO", 10)
    else:
        yield status_frame("正在联网搜索", 3)
        system_prompt = [{"role": "system", "content": sys_content.helpful_content_text}]
        frames = get_bot_with_history_stream(system_prompt + convert_dialog_format(data))
        if frames:
            for frame in frames:
                yield frame
@app.route('/api/chat', methods=['POST'])
def sse_stream():
    """Handle POST /api/chat by streaming ``sse_generator`` output as SSE."""
    # Headers asking intermediaries not to cache or buffer the stream.
    stream_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no"
    }
    body = request.get_json()
    return Response(sse_generator(body), mimetype="text/event-stream", headers=stream_headers)
# Script entry point: serve the app on all interfaces, port 8087.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8087)
This diff is collapsed.
This diff is collapsed.
import requests
import urllib.parse
import re
from urllib.parse import quote_plus
"""
快手
"""
# Compiled once at import time instead of on every call.
_KUAISHOU_SHARE_URL_RE = re.compile(r'''
    https://                # scheme must be https
    www\.kuaishou\.com      # fixed host
    /f/                     # fixed path prefix
    [0-9a-zA-Z_-]{10,20}    # id: 10-20 letters, digits, underscores or hyphens
''', re.VERBOSE)


def is_valid_kuaishou_profile(url: str) -> bool:
    """Return True when ``url`` is exactly ``https://www.kuaishou.com/f/<id>``.

    ``fullmatch`` is used rather than ``match`` with a trailing ``$``: ``$``
    also matches just before a final newline, so a URL ending in ``"\\n"``
    used to be accepted and forwarded.
    """
    return _KUAISHOU_SHARE_URL_RE.fullmatch(url) is not None
def get_ks_info(video_url):
    """Ask the internal parsing service for the data behind a Kuaishou link.

    Args:
        video_url: Candidate share URL.

    Returns:
        The decoded JSON response, or ``None`` when ``video_url`` does not
        pass ``is_valid_kuaishou_profile``.

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
    """
    if not is_valid_kuaishou_profile(video_url):
        return None
    url = f"http://172.16.18.10:8875/api/v1/ks?share_url={quote_plus(video_url)}"
    # Bound the call so a stalled backend cannot block the caller forever.
    response = requests.get(url, timeout=300)
    return response.json()
"""
小红书
"""
def is_valid_xiaohongshu_link(url: str) -> bool:
    """Return True for a supported Xiaohongshu link.

    Two shapes are accepted: a ``www.xiaohongshu.com`` note URL (24-digit hex
    id plus a query string containing ``xsec_token=...``), or an
    ``http://xhslink.com/m/`` short link with an 11-character alphanumeric code.
    """
    note_url = (
        r'^https://www\.xiaohongshu\.com/(?:discovery|explore)/'
        r'(?:item/)?[0-9a-fA-F]{24}\?'   # note id, then the start of the query
        r'(?:[^&]+&)*?'                  # any parameters before the token
        r'xsec_token=[^&]+'              # the token parameter itself
        r'(?:&[^&]+)*?$'                 # any parameters after the token
    )
    short_url = r'^http://xhslink\.com/m/[A-Za-z0-9]{11}$'
    if re.match(note_url, url):
        return True
    return re.match(short_url, url) is not None
def get_xhs_info(video_url):
    """Ask the internal parsing service for the data behind a Xiaohongshu link.

    Args:
        video_url: Candidate note URL or short link.

    Returns:
        The decoded JSON response, or ``None`` when ``video_url`` does not
        pass ``is_valid_xiaohongshu_link``.

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
    """
    if not is_valid_xiaohongshu_link(video_url):
        return None
    url = f"http://172.16.18.10:8875/api/v1/xhs?share_url={quote_plus(video_url)}"
    # Bound the call so a stalled backend cannot block the caller forever.
    response = requests.get(url, timeout=300)
    return response.json()
"""
抖音图文
"""
def get_dy_ocr_info(video_url):
    """Ask the internal OCR service for the content behind a Douyin link.

    Two link shapes are recognised: a numeric id embedded in a douyin.com URL
    (or in a ``video_id`` / ``modal_id=`` parameter), and a ``v.douyin.com``
    short link. When both are present the short link wins.

    Args:
        video_url: Text containing the link.

    Returns:
        The decoded JSON response, or ``None`` when no supported link is found.

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
    """
    endpoint = "http://172.16.18.10:8875/api/v1/ocr"
    url = ""
    # `video_id=?` also accepts the `video_id=<digits>` query form; the
    # previous pattern only matched digits glued directly to "video_id".
    id_match = re.search(
        r'(?:douyin\.com/(?:video|share/video|note|jingxuan)/|video_id=?|modal_id=)(\d+)',
        video_url,
    )
    if id_match:
        url = f"{endpoint}?video_id={id_match.group(1)}"
    if re.search(r'https?://v\.douyin\.com/[a-zA-Z0-9_\-]+/?', video_url):
        # Encode the value so '&', '#', spaces etc. in the input cannot
        # truncate or split the query string.
        url = f"{endpoint}?video_share_url={quote_plus(video_url)}"
    if not url:
        return None
    # Bound the call so a stalled backend cannot block the caller forever.
    response = requests.get(url, timeout=300)
    return response.json()
# Compiled once at import time instead of on every call.
_BILIBILI_VIDEO_URL_RE = re.compile(r'''
    https://                # scheme must be https
    www\.bilibili\.com      # fixed host
    /video/                 # fixed path
    BV[0-9a-zA-Z]{10,30}    # BV id: "BV" followed by 10-30 alphanumerics
    /?\?                    # optional slash, then a mandatory "?"
    .+                      # at least one character of query string
''', re.VERBOSE)


def is_valid_bilibili_video(url: str) -> bool:
    """Return True for ``https://www.bilibili.com/video/BV<id>[/]?<query>``.

    ``fullmatch`` is used rather than ``match`` with a trailing ``$``: ``$``
    also matches just before a final newline, so a URL ending in ``"\\n"``
    used to be accepted and forwarded.
    """
    return _BILIBILI_VIDEO_URL_RE.fullmatch(url) is not None
def get_bilibili_info(video_url):
    """Ask the internal parsing service for the data behind a Bilibili link.

    Args:
        video_url: Candidate video URL.

    Returns:
        The decoded JSON response, or ``None`` when ``video_url`` does not
        pass ``is_valid_bilibili_video``.

    Raises:
        requests.exceptions.RequestException: connection failure or timeout.
    """
    if not is_valid_bilibili_video(video_url):
        return None
    url = f"http://172.16.18.10:8875/api/v1/bilibili?share_url={quote_plus(video_url)}"
    # Bound the call so a stalled backend cannot block the caller forever.
    response = requests.get(url, timeout=300)
    return response.json()
# Manual smoke test: run the Douyin OCR lookup for one sample short link and
# print the service response.
# NOTE(review): the sample URL literal ends with a space - confirm that this
# is deliberate.
if __name__ == '__main__':
    # #
    # # url= "https://v.kuaishou.com/KFy0JkVY"
    # # "29 【要当可爱的水手啦~~~ - 夹克本人 | 小红书 - 你的生活兴趣社区】 😆 tx3lJkm14vxxx1A 😆 https://www.xiaohongshu.com/discovery/item/6894c614000000000403dcad?source=webshare&xhsshare=pc_web&xsec_token=CBmYaPr2HJVyffdsKGD_zkN5gOTLXgasc3dPOzNRGjKhs=&xsec_source=pc_share"
    # # #
    url= "https://v.douyin.com/MXCfDjwnbBs/ "
    result = get_dy_ocr_info(url)
    print(result)
    # # print(len(result))
\ No newline at end of file
# System prompt for the intent classifier: the model must answer with exactly
# one of "1" (copy extraction), "2" (copy rewriting), "3" (DSO Douyin search)
# or "4" (comment analysis), following the priority rules listed in the prompt.
action_content = """
# 角色
你是一个文本判断回复助手,用户将输入内容,你需依据规则判断其对应类别并回复。
# 任务描述与要求
1. 仔细分析用户输入内容,判断其与以下类别之一的相关性:
- 文案提取 → 回复1
- 文案改写 → 回复2
- dso抖音搜索 → 回复3
- 评论分析 → 回复4
2. 判断逻辑优先级:
- 若输入中包含“文案提取”“提取文案”等明确指向文案提取的表述,回复1
- 若输入中包含“文案改写”“改写文案”等明确指向文案改写的表述,回复2
- 若输入中包含“dso抖音搜索”“抖音dso搜索”“抖音搜索”等明确指向dso抖音搜索的表述,回复3
- 若输入中包含“评论分析”“分析评论”等明确指向评论分析的表述,回复4
- 若输入中包含链接时优先返回1
- 若不包含上述表述,再判断是否为“纯平台链接”:仅当输入内容为单一链接(无任何其他字符,包括汉字、符号、空格等),且域名包含抖音、快手、小红书、B站时,回复1
# 相关限制
1. 回复必须严格依据上述规则,不得随意给出其他回复
2. 回复内容只能是1、2、3、4中的一个,保持简洁
"""
# Alternative classifier prompt that works on the whole dialogue history and
# maps it to the same "1".."4" categories.
# NOTE(review): the `#` characters inside the prompt are part of the string
# (markdown headings), not Python comments.
last = """
# 角色
你是一个文本判断回复助手,用户将输入与ai的对话历史
# 任务描述与要求
# 仔细分析用户输入内容,判断其与文案提取、文案改写、dso抖音搜索、评论分析的相关性。
# 若与文案提取相关,回复:1
# 若与文案改写相关,回复:2
# 若与dso抖音搜索相关,回复:3
# 若与评论分析相关,回复:4
# 相关限制
用户只输入了一个链接并且域名包含抖音、快手、小红书、b站并且没有任何汉字时回复:1
1. 回复必须准确依据规则,不能随意给出其他回复。
2. 回复内容必须简洁,只能是1、2、3、4
"""
# System prompt for rewriting a Douyin short-video script: colloquial, concise
# wording, no parallel sentence structures, and names/dates/places/numbers
# kept unchanged.
douyin_text_fix = """
# 角色
你是一位专业的抖音短视频脚本改写专家,能够在遵循原文逻辑的基础上,
对给定的短视频脚本进行改写。要以自然、简洁的口语化方式表达,模仿真人说话的语气,
坚决不使用排比句以及复杂的词汇。同时,务必保留人物名、时间、地点、数字、政策名称等核心细节,保证原文关键信息不改变。
## 技能
### 技能 1: 抖音短视频脚本改写
1. 接收用户提供的抖音短视频脚本后,认真研读脚本内容,精准把握原文逻辑。
2. 逐句对脚本进行改写,使用简单易懂、贴近生活的口语化表述,摒弃排比句和复杂生僻的词汇。
3. 完成改写后,仔细核对,确保所有核心细节及关键信息与原文一致。
## 限制
- 仅围绕抖音短视频脚本改写提供服务,不回答与脚本改写无关的问题。
- 改写后的脚本必须严格遵循原文逻辑,核心细节和关键信息不得有误。
- 表述要口语化、自然、简洁,杜绝使用排比句和复杂词汇。
"""
# System prompt for the knowledge-base assistant persona: accurate, concise,
# friendly answers, with a fixed fallback sentence when the answer is unknown.
helpful_content_text_rag = """
# 角色
你是爱搜灵犀北京爱查查开发的智能助手,你将根据用户的各种咨询需求,依据既定知识和逻辑,一步步为用户提供准确且有用的回答。
# 任务描述与要求
1. 对于用户提出的问题,需深入理解其意图,从多维度分析并给出全面的回答。
2. 回答要简洁明了,避免使用过于复杂的语言和句式,确保用户能轻松理解。
3. 若涉及专业知识,要在解释清楚的基础上,结合通俗易懂的例子辅助说明。
# 相关限制
1. 回答需基于准确的知识,不能随意编造信息。
2. 避免给出模棱两可、没有明确结论的回答。
3. 语言风格要保持友好、专业,不能使用不当或冒犯性的词汇。
不知道答案时返回 我还不太清楚你要表达什么意思
"""
# System prompt for the general assistant persona. It repeats the
# knowledge-base prompt and adds one rule: when the question is a URL related
# to copy extraction, answer that the link format is wrong and show the
# correct format.
helpful_content_text = """
# 角色
你是爱搜灵犀北京爱查查开发的智能助手,你将根据用户的各种咨询需求,依据既定知识和逻辑,一步步为用户提供准确且有用的回答。
# 任务描述与要求
1. 对于用户提出的问题,需深入理解其意图,从多维度分析并给出全面的回答。
2. 回答要简洁明了,避免使用过于复杂的语言和句式,确保用户能轻松理解。
3. 若涉及专业知识,要在解释清楚的基础上,结合通俗易懂的例子辅助说明。
# 相关限制
1. 回答需基于准确的知识,不能随意编造信息。
2. 避免给出模棱两可、没有明确结论的回答。
3. 语言风格要保持友好、专业,不能使用不当或冒犯性的词汇。
不知道答案时返回 我还不太清楚你要表达什么意思
## 限制 当用户问题是一个url 并且与文案提取相关时 回答链接格式错误 并且返回正确url链接样式
"""
# Prompt asking for a plain-text analysis report of video comments: core
# discussion points and hot topics, keywords, sentiment distribution,
# positive/negative viewpoints, and a summary.
comment_content= """
请根据提供的视频评论内容,生成一份结构清晰、内容详实的分析报告。报告需包含以下核心板块:
核心讨论信息与热点话题:分别提炼出核心讨论信息和热点话题。核心讨论信息要围绕评论中反复提及的关键表述、相关概念对比及用户重点关注的语言特征等方面;热点话题需聚焦用户对评论内容迭代、不同场景下评论表述差异等的反馈。
核心关键词:汇总评论中出现的与评论内容本身、用户表达感受、相关概念对比等紧密相关的关键词汇。
讨论内容情感分布:按正面、负面、中性情感分类,说明各情感占比,并列举相应的典型评论内容,体现不同情感的核心表达。
与评论相关的观点分析:从正面观点和负面观点两方面阐述,正面观点涵盖评论内容的优势、表述清晰度、情感传递效果等;负面观点指出评论内容存在的问题,同时挖掘潜在的优化机会点。
总结:概括评论内容的整体舆情概况,提炼重点信息,包括评论内容的差异化特征、引发关注的核心因素以及需要改进或明确的表述方面等。
整体报告需逻辑连贯,语言简洁明了,能清晰呈现视频评论本身的关键信息和用户对评论的态度。
返回文字就可以 不需要 样式
"""
# -*- coding: utf-8 -*-
import time
import tos
from io import StringIO
import json
TOS_AK = "AKLTMmRiMmU3YmY5ZjZjNDZkMTlhMmQxY2JkYTllYTQzNDI"
TOS_SK = "WkdWak4yUTRNakl3WVdOa05HUXdaR0V4TlRBM1l6YzJZMll5WkRnMFlUTQ=="
TOS_ENDPOINT = "tos-cn-beijing.volces.com"
TOS_REGION = "cn-beijing"
TOS_BUCKET_NAME = "douchacha-web"
def check_file_in_tos(object_key, retry_times=1000):
    """Poll TOS until ``object_key`` exists or the retry budget is used up.

    The object is probed with ``head_object`` up to ``retry_times + 1`` times,
    sleeping one second between probes.

    Args:
        object_key: Key of the object inside ``TOS_BUCKET_NAME``.
        retry_times: Number of retries after the first probe.

    Returns:
        True as soon as the object is found, False once every probe failed.
    """
    client = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION, max_retry_count=3)
    for attempt in range(retry_times + 1):
        try:
            client.head_object(bucket=TOS_BUCKET_NAME, key=object_key)
            return True
        except Exception as e:
            not_found = isinstance(e, tos.exceptions.TosServerError) and e.status_code == 404
            if not not_found:
                # Anything other than "not found" is unexpected; report it, but
                # still count the attempt so the loop can never spin forever.
                print(f"check_file_in_tos error for {object_key}: {e}")
            if attempt < retry_times:
                time.sleep(1)
    return False
def check_file_in_tos_buff(object_key):
    """Probe once whether ``object_key`` exists in ``TOS_BUCKET_NAME``.

    Returns:
        True when ``head_object`` succeeds, False when it raises anything.
    """
    tos_client = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION, max_retry_count=3)
    try:
        tos_client.head_object(bucket=TOS_BUCKET_NAME, key=object_key)
    except Exception:
        # Every failure (missing object, client or server error) maps to False.
        return False
    return True
def put_string_to_tos(object_key, content, retry_times=3):
    """Serialize ``content`` as JSON and upload it to ``object_key``.

    Args:
        object_key: Destination key inside ``TOS_BUCKET_NAME``.
        content: Any JSON-serializable value.
        retry_times: Maximum number of upload attempts.

    Returns:
        True after the first successful upload, False if every attempt raised.
    """
    tos_client = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION, max_retry_count=3)
    attempts_left = retry_times
    while attempts_left > 0:
        attempts_left -= 1
        try:
            tos_client.put_object(TOS_BUCKET_NAME, object_key, content=json.dumps(content))
        except Exception as exc:
            # Report the failure and fall through to the next attempt.
            print(exc)
        else:
            return True
    return False
def get_string_from_tos(object_key, retry_times=3):
    """Download ``object_key`` from TOS and decode its body as JSON.

    Args:
        object_key: Key of the object inside ``TOS_BUCKET_NAME``.
        retry_times: Maximum number of download attempts.

    Returns:
        The decoded JSON value on success, None when the object does not
        exist (HTTP 404), and False when the download kept failing or the
        body is not valid JSON.
    """
    client = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION, max_retry_count=3)
    for _ in range(retry_times):
        try:
            response = client.get_object(TOS_BUCKET_NAME, object_key)
            return json.loads(response.read())
        except tos.exceptions.TosServerError as e:
            if e.status_code == 404:
                print(f"文件不存在: {object_key}")
                return None
            # Other server-side failures may be transient: report and retry.
            print(f"TOS服务端错误: {object_key}, {e}")
        except tos.exceptions.TosClientError as e:
            print(f"TOS客户端错误: {object_key}, {e}")
        except json.JSONDecodeError as e:
            # Re-downloading the same object cannot repair malformed JSON.
            print(f"JSON解析失败: {object_key}, {e}")
            return False
    return False
#
# if __name__ == '__main__':
#
import json
from flask import request,Flask, jsonify
import redis
redis_client = redis.Redis(host='172.16.0.24', port=6379,db=8, password='aiyingli@@123', socket_timeout=10,decode_responses=True)
app = Flask(__name__)
@app.route('/dso/video_gen', methods=['POST'])
def video_gen():
    """Accept a video-generation request and enqueue it on Redis.

    The JSON body is serialized and pushed onto the ``dso_video_gen`` list.
    A missing, unparsable or ``null`` body is answered with code 400 in the
    usual response envelope instead of being queued as the string "null".
    """
    data = request.get_json(silent=True)
    if data is None:
        return jsonify({
            "code": 400,
            "data": '',
            "msg": "请求体必须是合法的JSON"
        }), 400
    redis_client.lpush('dso_video_gen', json.dumps(data))
    return jsonify({
        "code": 200,
        "data": '',
        "msg": "数据接收成功"
    })
@app.route('/dso/image_gen', methods=['POST'])
def image_gen():
    """Accept an image-generation request and enqueue it on Redis.

    The JSON body is serialized and pushed onto the ``dso_image_gen`` list.
    A missing, unparsable or ``null`` body is answered with code 400 in the
    usual response envelope instead of being queued as the string "null".
    """
    data = request.get_json(silent=True)
    if data is None:
        return jsonify({
            "code": 400,
            "data": '',
            "msg": "请求体必须是合法的JSON"
        }), 400
    redis_client.lpush('dso_image_gen', json.dumps(data))
    return jsonify({
        "code": 200,
        "data": '',
        "msg": "数据接收成功"
    })
if __name__ == '__main__':
    # Debug mode enables the interactive Werkzeug debugger, which allows
    # arbitrary code execution; it must stay off while listening on 0.0.0.0.
    app.run(host='0.0.0.0', port=8086, debug=False)
\ No newline at end of file
# -*- coding: utf-8 -*-
import requests
import json
import os, sys
import redis
from concurrent.futures import ThreadPoolExecutor, as_completed
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_path)
import tos_util
redis_client = redis.Redis(host='172.16.0.24', port=6379, db=8, password='aiyingli@@123', socket_timeout=10,
decode_responses=True)
doubao_api_token = 'Bearer fcc424e5-58af-494d-9683-5787413a26c9'
dir = '/Users/yaowentong/Desktop/image/'
tos_dir = ''
size_map = {
'1:1': '2048x2048',
'4:3': '2304x1728',
'3:4': '1728x2304',
'16:9': '2560x1440',
'9:16': '1440x2560',
'3:2': '2496x1664',
'2:3': '1664x2496',
'21:9': '3024x1296',
}
def gen_image(prompt, size, task_id):
    """Request one generated image from the Ark image-generation API.

    Args:
        prompt: Text prompt sent to the model.
        size: Aspect-ratio key of ``size_map`` (for example ``'16:9'``).
        task_id: Task identifier used to build the ``info.json`` key that
            receives the failure marker.

    Returns:
        The ``data`` field of the API response on success, otherwise None.
        On any failure, including a response that carries no ``data``, the
        failure marker is written to ``dso/ai_gen/single/{task_id}/info.json``.

    Raises:
        KeyError: If ``size`` is not a key of ``size_map``.
    """
    url = "https://ark.cn-beijing.volces.com/api/v3/images/generations"
    payload = json.dumps({
        "model": "doubao-seedream-4-0-250828",
        "prompt": prompt,
        "size": size_map[size],
        "sequential_image_generation": "disabled",
        "sequential_image_generation_options": {
            "max_images": 1
        },
        "stream": False,
        "response_format": "url",
        "watermark": False
    })
    headers = {
        'Authorization': doubao_api_token,
        'Content-Type': 'application/json'
    }
    try:
        # NOTE(review): requests timeouts are in seconds, so 10000 is close to
        # three hours - confirm this was not meant to be milliseconds.
        response = requests.request("POST", url, headers=headers, data=payload, timeout=10000)
        data = response.json().get('data')
        if not data:
            # An error payload has no usable "data"; treat it like any other
            # failure so the task is marked as failed instead of hanging.
            raise ValueError(f"no image data in response: {response.text[:500]}")
        return data
    except Exception as e:
        json_url = f'dso/ai_gen/single/{task_id}/info.json'
        tos_util.put_string_to_tos(json_url,
                                   ["异常:你输入的文字不符合制作规则,请修改后重试"])
        print(f"图片生成失败:{str(e)}")
        return None
def download_image(url, save_path):
    """Stream the file at ``url`` to ``save_path``.

    Missing parent directories of ``save_path`` are created. Errors are
    printed, never raised.

    Args:
        url: HTTP(S) address of the image.
        save_path: Local file path to write.

    Returns:
        True when the file was written completely, False on any failure.
    """
    try:
        # The timeout keeps a stalled server from blocking the worker forever;
        # the context manager releases the connection on every exit path.
        with requests.get(url, stream=True, timeout=(10, 120)) as response:
            response.raise_for_status()
            save_dir = os.path.dirname(save_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            with open(save_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:  # skip empty chunks
                        file.write(chunk)
        print(f"图片已成功下载至:{save_path}")
        return True
    except Exception as e:
        print(f"下载失败:{str(e)}")
        return False
def process_gen_image(prompt, ratio, task_id):
    """Generate four images for a task, upload them and publish their URLs.

    Four ``gen_image`` calls run concurrently. Every image that was generated
    and downloaded is uploaded under ``dso/ai_gen/single/{task_id}/`` and the
    list of public URLs is written to ``info.json`` in the same folder.

    Failed generations (falsy results) and failed downloads are skipped. When
    no image survives, ``info.json`` is left alone so that a failure marker
    written by ``gen_image`` is not overwritten by an empty list.

    Args:
        prompt: Text prompt for the model.
        ratio: Aspect-ratio key understood by ``gen_image``.
        task_id: Task identifier used in local and remote paths.
    """
    url_list = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(gen_image, prompt, ratio, task_id) for _ in range(4)]
        # Results are collected in completion order.
        for future in as_completed(futures):
            url_list_result = future.result()
            if not url_list_result:
                continue
            url_list.append(url_list_result[0])
    if not url_list:
        print(f"任务 {task_id} 未生成任何图片")
        return

    url_list_json = []
    for index, item in enumerate(url_list):
        save_path = f'{dir}{task_id}/{index}.jpeg'
        download_image(item['url'], save_path)
        if not os.path.exists(save_path):
            # Download failed; there is nothing to upload for this index.
            continue
        tos_util.put_video(f'dso/ai_gen/single/{task_id}/{index}.jpeg', save_path)
        url_list_json.append(f'https://static2.douchacha.com/dso/ai_gen/single/{task_id}/{index}.jpeg')
    if not url_list_json:
        print(f"任务 {task_id} 的图片全部下载失败")
        return

    json_url = f'dso/ai_gen/single/{task_id}/info.json'
    tos_util.put_string_to_tos(json_url, url_list_json)
if __name__ == '__main__':
    # Each run handles exactly one task taken from the hand-off list.
    task = redis_client.rpop('dso_image_gen_task')
    if task is None:
        # Nothing queued: say so rather than failing inside json.loads(None).
        print('dso_image_gen_task 队列为空,无任务可处理')
    else:
        try:
            task_json = json.loads(task)
            prompt = task_json['prompt']
            ratio = task_json['ratio']
            task_id = task_json['task_id']
            process_gen_image(prompt, ratio, task_id)
        except Exception as err:
            print(f'{err}')
import json
import redis
import subprocess
import time
import os
import psutil # 用于通过PID检查进程是否存活
def main():
    """Dispatch image-generation tasks from Redis to worker subprocesses.

    Tasks are popped from the tail of ``dso_image_gen``, handed to a freshly
    started ``image_gen_process.py`` worker through ``dso_image_gen_task``,
    and at most ``max_concurrent`` workers are kept alive. When a worker
    cannot be started, the task is taken back out of the hand-off list and
    returned to the source list. Runs until interrupted with Ctrl+C.
    """
    # Redis connection settings (adjust for the deployment).
    redis_host = '172.16.0.24'
    redis_port = 6379
    redis_db = 8
    redis_password = 'aiyingli@@123'
    redis_list_key = 'dso_image_gen'  # list holding incoming tasks
    redis_list_key_task = 'dso_image_gen_task'  # list the workers pop from
    # Maximum number of concurrently running workers.
    max_concurrent = 10
    try:
        r = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            password=redis_password,
            decode_responses=True  # return str instead of bytes
        )
        r.ping()
        print("成功连接到Redis")
    except Exception as e:
        print(f"Redis连接失败: {e}")
        return
    # PIDs of the actual worker processes (not of the launching shell).
    task_pids = []
    try:
        print("开始监控任务队列... (按Ctrl+C停止)")
        while True:
            # Forget workers whose PID no longer exists.
            active_pids = []
            for pid in task_pids:
                if psutil.pid_exists(pid):
                    print(f"任务进程PID {pid} 正在执行")
                    active_pids.append(pid)
                else:
                    print(f"任务进程PID {pid} 已结束")
            task_pids = active_pids

            if len(task_pids) >= max_concurrent:
                print(f"已达到最大并发数 {max_concurrent},等待10秒...")
                time.sleep(10)
                continue

            # Take one task from the tail (right side) of the source list.
            task = r.rpop(redis_list_key)
            if not task:
                # Nothing queued; back off briefly.
                time.sleep(5)
                continue

            # Hand the payload to the worker. The return value of lpush (the
            # new list length) is ignored so `task` keeps the payload that the
            # failure path below has to re-queue.
            r.lpush(redis_list_key_task, task)
            try:
                # The shell backgrounds the worker with nohup and writes the
                # worker's PID ($!) into a temp file that is read back below.
                # Including our own PID and a nanosecond timestamp keeps the
                # file name unique across dispatchers sharing /tmp.
                pid_file = f"/tmp/video_task_{os.getpid()}_{time.time_ns()}.pid"
                cmd = (
                    f'nohup python3.9 ./image_gen_process.py '
                    f'> /dev/null 2>&1 & echo $! > {pid_file}'
                )
                process = subprocess.Popen(
                    cmd,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True
                )
                # Wait for the shell so the PID file is guaranteed to exist.
                stdout, stderr = process.communicate()
                if stderr:
                    raise RuntimeError(f"命令执行错误: {stderr}")
                if not os.path.exists(pid_file):
                    raise RuntimeError("未生成PID文件,无法获取任务进程ID")
                with open(pid_file, 'r') as f:
                    pid = int(f.read().strip())
                os.remove(pid_file)  # temp file is no longer needed
                task_pids.append(pid)
                print(f"启动任务进程(PID: {pid}),当前并发数: {len(task_pids)}/{max_concurrent}")
            except Exception as e:
                print(f"启动任务失败: {e}")
                # Re-queue only if the payload is still in the hand-off list;
                # otherwise a worker already took it and re-queuing would
                # duplicate the task.
                if r.lrem(redis_list_key_task, 1, task):
                    r.rpush(redis_list_key, task)
                time.sleep(5)
    except KeyboardInterrupt:
        print("\n收到停止信号,正在等待所有任务结束...")
        # Let running workers finish on their own (use
        # psutil.Process(pid).terminate() instead to force them to stop).
        for pid in task_pids:
            if psutil.pid_exists(pid):
                try:
                    psutil.Process(pid).wait()
                except psutil.NoSuchProcess:
                    # The worker exited between the check and the wait.
                    pass
                print(f"任务进程PID {pid} 已终止")
        print("所有任务已处理完毕,程序退出")


if __name__ == "__main__":
    main()
\ No newline at end of file
import os
import tos
import json
TOS_AK = "AKLTMmRiMmU3YmY5ZjZjNDZkMTlhMmQxY2JkYTllYTQzNDI"
TOS_SK = "WkdWak4yUTRNakl3WVdOa05HUXdaR0V4TlRBM1l6YzJZMll5WkRnMFlUTQ=="
TOS_ENDPOINT = "tos-cn-beijing.volces.com"
TOS_REGION = "cn-beijing"
TOS_BUCKET_NAME = "douchacha-web"
def put_video(object_key, file_name):
    """Upload the local file ``file_name`` to ``object_key`` in the bucket.

    A ``TosClientError`` is reported on stdout and swallowed; any other
    exception propagates to the caller.
    """
    try:
        uploader = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION)
        uploader.put_object_from_file(TOS_BUCKET_NAME, object_key, file_name)
    except tos.exceptions.TosClientError as err:
        print(f'fail with client error, message:{err.message}, cause: {err.cause}')
def put_string_to_tos(object_key, content, retry_times=3):
    """Upload ``content`` as UTF-8 encoded JSON to ``object_key``.

    Non-ASCII characters are kept as-is in the JSON text.

    Args:
        object_key: Destination key inside ``TOS_BUCKET_NAME``.
        content: Any JSON-serializable value.
        retry_times: Maximum number of upload attempts.

    Returns:
        True after the first successful upload, False if every attempt raised.
    """
    tos_client = tos.TosClientV2(TOS_AK, TOS_SK, TOS_ENDPOINT, TOS_REGION, max_retry_count=3)
    attempts_left = retry_times
    while attempts_left > 0:
        attempts_left -= 1
        try:
            body = json.dumps(content, ensure_ascii=False).encode('utf-8')
            tos_client.put_object(TOS_BUCKET_NAME, object_key, content=body)
        except Exception as exc:
            # Report the failure and fall through to the next attempt.
            print(exc)
        else:
            return True
    return False
This diff is collapsed.
This diff is collapsed.
import json
import redis
import subprocess
import time
import os
import psutil # 用于通过PID检查进程是否存活
def main():
    """Dispatch video-generation tasks from Redis to worker subprocesses.

    Tasks are popped from the tail of ``dso_video_gen``, handed to a freshly
    started ``video_gen_progress.py`` worker through ``dso_video_gen_task``,
    and at most ``max_concurrent`` workers are kept alive. When a worker
    cannot be started, the task is taken back out of the hand-off list and
    returned to the source list. Runs until interrupted with Ctrl+C.
    """
    # Redis connection settings (adjust for the deployment).
    redis_host = '172.16.0.24'
    redis_port = 6379
    redis_db = 8
    redis_password = 'aiyingli@@123'
    redis_list_key = 'dso_video_gen'  # list holding incoming tasks
    redis_list_key_task = 'dso_video_gen_task'  # list the workers pop from
    # Maximum number of concurrently running workers.
    max_concurrent = 10
    try:
        r = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            password=redis_password,
            decode_responses=True  # return str instead of bytes
        )
        r.ping()
        print("成功连接到Redis")
    except Exception as e:
        print(f"Redis连接失败: {e}")
        return
    # PIDs of the actual worker processes (not of the launching shell).
    task_pids = []
    try:
        print("开始监控任务队列... (按Ctrl+C停止)")
        while True:
            # Forget workers whose PID no longer exists.
            active_pids = []
            for pid in task_pids:
                if psutil.pid_exists(pid):
                    print(f"任务进程PID {pid} 正在执行")
                    active_pids.append(pid)
                else:
                    print(f"任务进程PID {pid} 已结束")
            task_pids = active_pids

            if len(task_pids) >= max_concurrent:
                print(f"已达到最大并发数 {max_concurrent},等待10秒...")
                time.sleep(10)
                continue

            # Take one task from the tail (right side) of the source list.
            task = r.rpop(redis_list_key)
            if not task:
                # Nothing queued; back off briefly.
                time.sleep(5)
                continue

            print(task)
            # Hand the payload to the worker. The return value of lpush (the
            # new list length) is ignored so `task` keeps the payload that the
            # failure path below has to re-queue.
            r.lpush(redis_list_key_task, task)
            try:
                # The shell backgrounds the worker with nohup and writes the
                # worker's PID ($!) into a temp file that is read back below.
                # Including our own PID and a nanosecond timestamp keeps the
                # file name unique across dispatchers sharing /tmp.
                pid_file = f"/tmp/video_task_{os.getpid()}_{time.time_ns()}.pid"
                cmd = (
                    f'nohup python3.9 ./video_gen_progress.py '
                    f'> /dev/null 2>&1 & echo $! > {pid_file}'
                )
                process = subprocess.Popen(
                    cmd,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True
                )
                # Wait for the shell so the PID file is guaranteed to exist.
                stdout, stderr = process.communicate()
                if stderr:
                    raise RuntimeError(f"命令执行错误: {stderr}")
                if not os.path.exists(pid_file):
                    raise RuntimeError("未生成PID文件,无法获取任务进程ID")
                with open(pid_file, 'r') as f:
                    pid = int(f.read().strip())
                os.remove(pid_file)  # temp file is no longer needed
                task_pids.append(pid)
                print(f"启动任务进程(PID: {pid}),当前并发数: {len(task_pids)}/{max_concurrent}")
            except Exception as e:
                print(f"启动任务失败: {e}")
                # Re-queue only if the payload is still in the hand-off list;
                # otherwise a worker already took it and re-queuing would
                # duplicate the task.
                if r.lrem(redis_list_key_task, 1, task):
                    r.rpush(redis_list_key, task)
                time.sleep(5)
    except KeyboardInterrupt:
        print("\n收到停止信号,正在等待所有任务结束...")
        # Let running workers finish on their own (use
        # psutil.Process(pid).terminate() instead to force them to stop).
        for pid in task_pids:
            if psutil.pid_exists(pid):
                try:
                    psutil.Process(pid).wait()
                except psutil.NoSuchProcess:
                    # The worker exited between the check and the wait.
                    pass
                print(f"任务进程PID {pid} 已终止")
        print("所有任务已处理完毕,程序退出")


if __name__ == "__main__":
    main()
\ No newline at end of file
# clients/tos_client.py
import tos
from aidso_geo.config import TOSConfig # 从配置中读取参数
class TOSClient:
    """Holder for a ``tos.TosClientV2`` built from ``TOSConfig`` settings."""

    def __init__(self):
        settings = TOSConfig.TOSConfig
        self.client = tos.TosClientV2(
            settings.TOS_AK,
            settings.TOS_SK,
            settings.TOS_ENDPOINT,
            settings.TOS_REGION,
            max_retry_count=3,  # retry count passed to the SDK client
        )
        self.bucket_name = settings.TOS_BUCKET_NAME

    def get_client(self):
        """Return the wrapped SDK client."""
        return self.client


# Module-level instance intended for shared use.
tos_client = TOSClient()

if __name__ == '__main__':
    print(TOSConfig.TOSConfig.TOS_AK)
\ No newline at end of file
from .base_config import BaseConfig
class TOSConfig(BaseConfig):
    """Connection settings for the TOS object-storage client."""

    # NOTE(review): the access key and secret key are committed in plain text;
    # consider loading them from the environment or a secret store.
    TOS_AK = "AKLTMmRiMmU3YmY5ZjZjNDZkMTlhMmQxY2JkYTllYTQzNDI"
    TOS_SK = "WkdWak4yUTRNakl3WVdOa05HUXdaR0V4TlRBM1l6YzJZMll5WkRnMFlUTQ=="
    TOS_ENDPOINT = "tos-cn-beijing.volces.com"
    TOS_REGION = "cn-beijing"
    TOS_BUCKET_NAME = "douchacha-web"
\ No newline at end of file
# config/base_config.py
import json
import os
import time
from enum import Enum
import redis
def init_redis():
    """Create a Redis client for database 0.

    Returns:
        The ``redis.Redis`` instance, or None if constructing it raises.
    """
    # NOTE(review): no command is sent here, so an unreachable server is
    # presumably only noticed on first use - confirm against redis-py.
    connection_options = {
        "host": '172.16.0.24',
        # alternative host: 'redis-cnlfmu7rl14awitrz.redis.ivolces.com'
        "port": 6379,
        "db": 0,
        "password": 'aiyingli@@123',
        "socket_timeout": 5,
        "decode_responses": True,  # decode replies to str instead of bytes
    }
    try:
        return redis.Redis(**connection_options)
    except Exception:
        return None
_redis8_client = None
_redis8_pool = None


def init_redis8():
    """Return the shared Redis client for database 8, creating it on first use.

    The client is backed by a connection pool and verified with ``ping``
    before it is returned. On any failure the cached client and pool are
    reset, the error is printed, and None is returned, so a later call
    tries again.
    """
    global _redis8_client, _redis8_pool
    if _redis8_client is None:
        pool_options = dict(
            host='redis-cnlfmu7rl14awitrz.redis.ivolces.com',
            port=6379,
            db=8,
            password='aiyingli@@123',
            socket_timeout=5,
            socket_connect_timeout=5,
            decode_responses=True,
            retry_on_timeout=True,
            health_check_interval=30,
            socket_keepalive=True,
            max_connections=20,
        )
        try:
            _redis8_pool = redis.ConnectionPool(**pool_options)
            _redis8_client = redis.Redis(connection_pool=_redis8_pool)
            _redis8_client.ping()
        except Exception as e:
            print(f"Redis 初始化失败: {e}")
            _redis8_client = None
            _redis8_pool = None
            return None
    return _redis8_client
class PlatformType(Enum):
    """Platform identifiers, each mapped to its short string code."""

    DEEPSEEK = "DP"
    DOUBAO = "DB"
    YUANBAO = "TXYB"
    QIANWEN = "TYQW"
    KIMI = "KIMI"
    WENXINYIYAN = "WXYY"
    BAIDUAI = "BDAI"
    DOUYINAI = "DYAI"
    DOUBAOANDROID = "DOUBA"
    DEEPSEEKANDROID = "DPA"
    QIANWENANDROID = "TYQWA"
    YUANBAOANDROID = "TXYBA"
    XIAOHONGSHUANDROID = "XHSA"

    @classmethod
    def from_str(cls, platform_str: str) -> "PlatformType":
        """Return the member whose value equals ``platform_str``.

        Raises:
            ValueError: If no member has that value; the message lists the
                accepted codes.
        """
        match = next((m for m in cls if m.value == platform_str), None)
        if match is None:
            raise ValueError(f"无效的平台字符串:{platform_str},可选值:{[m.value for m in cls]}")
        return match
def _storage_path_builder(platform_code):
    """Return a callable that maps a task id to the platform's storage path."""
    def storage_path(tid):
        return f"geo/{tid}/{platform_code}/original.text"
    return storage_path


class BaseConfig:
    """Per-platform endpoint URL and storage-path configuration.

    ``PLATFORM_CONFIGS`` maps each platform code to a dict with the keys
    ``"url"`` (endpoint under ``base_url``) and ``"storage_path"`` (callable
    taking a task id and returning ``geo/<tid>/<code>/original.text``).
    """

    base_url = "http://172.16.1.37:10444/api/v1/"

    # Built in a loop; entry order matches the member order listed here.
    PLATFORM_CONFIGS = {}
    for _platform, _endpoint in (
        (PlatformType.DEEPSEEK, "deepseek"),
        (PlatformType.DOUBAO, "doubao"),
        (PlatformType.YUANBAO, "yuanbao"),
        (PlatformType.QIANWEN, "qianwen"),
        (PlatformType.KIMI, "kimi"),
        (PlatformType.WENXINYIYAN, "wenxinyiyan"),
        (PlatformType.BAIDUAI, "baiduai"),
        (PlatformType.DOUYINAI, "aidouyin"),
        (PlatformType.DOUBAOANDROID, "doubao_android"),
        (PlatformType.DEEPSEEKANDROID, "deepseek_android"),
        (PlatformType.QIANWENANDROID, "qianwen_android"),
        (PlatformType.YUANBAOANDROID, "yuanbao_android"),
        (PlatformType.XIAOHONGSHUANDROID, "hongshu_android"),
    ):
        PLATFORM_CONFIGS[_platform.value] = {
            "url": f"{base_url}{_endpoint}",
            "storage_path": _storage_path_builder(_platform.value),
        }
    # Keep the loop variables from becoming class attributes.
    del _platform, _endpoint
if __name__ == '__main__':
    # Print the length of every monitored Redis list every 10 seconds.
    platform_codes = [
        'BDAI', 'DB', 'DOUBA', 'DP', 'DPA', 'DYAI',
        'KIMI', 'TXYB', 'TXYBA', 'TYQW', 'TYQWA', 'WXYY',
    ]
    key_list = []
    for code in platform_codes:
        key_list.append(f'{code}:geo:stream_batch:list')
        key_list.append(f'{code}:geo:batch:list')
    key_list.append('geo:task_commit:list')
    t = init_redis()
    while True:
        for k in key_list:
            print(f"{k}----{t.llen(k)}")
        time.sleep(10)
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from loguru import logger
from apscheduler.schedulers.blocking import BlockingScheduler
from aidso_geo.config.base_config import init_redis
from aidso_geo.models.process import main_process
redis_client = init_redis()
QUEUE_KEY = "geo:task_commit:list"
# 每轮最多拉取任务数量
BATCH_SIZE = 1000
# 每 60 秒执行一次
INTERVAL_SECONDS = 60
# 每轮内部并发数
CONCURRENT_WORKERS = 40
def parse_task(raw):
    """Decode one raw queue entry into a Python object.

    Args:
        raw: JSON text as ``str`` or UTF-8 encoded ``bytes``.

    Returns:
        The decoded JSON value, or None when decoding fails (the failure is
        logged together with the raw entry).
    """
    try:
        raw = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        return json.loads(raw)
    except Exception as e:
        logger.exception(f"解析队列数据失败: {e}, raw={raw}")
        return None
def pull_tasks():
    """Pop up to ``BATCH_SIZE`` tasks from the Redis queue.

    Stops early when the queue returns nothing. Entries that fail to parse,
    or that parse to a falsy value, are dropped.

    Returns:
        List of parsed tasks, in the order they were popped.
    """
    collected = []
    remaining = BATCH_SIZE
    while remaining > 0:
        remaining -= 1
        raw = redis_client.rpop(QUEUE_KEY)
        if not raw:
            break
        parsed = parse_task(raw)
        if parsed:
            collected.append(parsed)
    return collected
def handle_one_task(task):
    """Run ``main_process`` for one task and re-queue the task on failure.

    Args:
        task: Parsed task dict; ``reqId`` and ``platform`` are read for the
            failure log line.

    Returns:
        True when ``main_process`` finished without raising, False when it
        raised and the task was pushed back onto the Redis queue.
    """
    req_id = task.get("reqId", "")
    platform = task.get("platform", "")
    try:
        main_process(task)
    except Exception as e:
        logger.exception(f"{req_id}--{platform}--处理失败,重新放回队列: {e}")
        payload = json.dumps(task, ensure_ascii=False)
        redis_client.lpush(QUEUE_KEY, payload)
        return False
    else:
        return True
def consume_task_queue():
    """Scheduled job: drain one batch from Redis and process it concurrently.

    Pulls a batch with ``pull_tasks``, runs ``handle_one_task`` for every
    task in a thread pool of ``CONCURRENT_WORKERS`` threads, then logs the
    batch size and elapsed time. Exceptions are logged, never raised.
    """
    started_at = time.time()
    try:
        tasks = pull_tasks()
        if not tasks:
            logger.info("本轮没有任务")
            return
        with ThreadPoolExecutor(max_workers=CONCURRENT_WORKERS) as pool:
            pending = [pool.submit(handle_one_task, item) for item in tasks]
            for finished in as_completed(pending):
                try:
                    # The result is only fetched to surface worker exceptions.
                    finished.result()
                except Exception as e:
                    logger.exception(f"线程任务异常: {e}")
        cost = round(time.time() - started_at, 2)
        logger.success(f"本轮处理完成,总数: {len(tasks)}, 耗时: {cost}s")
    except Exception as e:
        logger.exception(f"定时消费任务异常: {e}")
if __name__ == "__main__":
    # Up to this many scheduled batches may run at the same time.
    max_instances = 5
    job_options = {
        "trigger": "interval",
        "seconds": INTERVAL_SECONDS,
        "id": "consume_geo_task_commit_queue",
        "max_instances": max_instances,
        "coalesce": False,  # do not merge missed runs
        "misfire_grace_time": 60,
    }
    scheduler = BlockingScheduler(timezone="Asia/Shanghai")
    scheduler.add_job(consume_task_queue, **job_options)
    startup_message = (
        f"geo task commit consumer 启动,每 {INTERVAL_SECONDS}s 执行一次,"
        f"QUEUE_KEY={QUEUE_KEY}, "
        f"BATCH_SIZE={BATCH_SIZE}, "
        f"CONCURRENT_WORKERS={CONCURRENT_WORKERS}, "
        f"max_instances={max_instances}"
    )
    logger.success(startup_message)
    # Run one batch immediately, then hand control to the scheduler.
    consume_task_queue()
    scheduler.start()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from flask import Flask
from aidso_geo.core.routes.interface import line_app
from aidso_geo.core.routes.feishu_interface import feishu_app
from aidso_geo.core.routes.third_interface import third_app
# Application assembly: create the Flask app and attach the route blueprints.
app = Flask(__name__)
# Emit non-ASCII characters in JSON responses as-is instead of \u escapes.
app.json.ensure_ascii = False
for blueprint in (line_app, third_app, feishu_app):
    app.register_blueprint(blueprint)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8086)
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment