import requests
import time
# Bearer token placed in the Authorization header of every request below.
# NOTE(review): "<API-KEY>" looks like a placeholder -- replace it with a real key.
API_KEY = "<API-KEY>"
# Root URL that every endpoint path used in this file is appended to.
BASE_URL = "https://api.tokenops.ai/v1"
def _detect_subject_masks(image_url, headers, timeout):
    """Run the optional subject-detection call and return its mask URLs.

    Any failure (network error, non-200 status, non-JSON body) yields an
    empty list so that the main workflow can continue without a mask.
    """
    try:
        resp = requests.post(
            f"{BASE_URL}/omnihuman/subject/detection",
            headers=headers,
            json={
                "model": "OmniHuman_1.5",
                "req_key": "jimeng_realman_avatar_object_detection",
                "image_url": image_url,
            },
            timeout=timeout,
        )
        if resp.status_code != 200:
            print(f"主体检测失败 (HTTP {resp.status_code}), 跳过 mask")
            return []
        result = resp.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode errors on older requests versions.
        print(f"主体检测请求异常, 跳过 mask: {exc}")
        return []

    print(f"主体检测完成: status={result.get('status')}")
    # `or []` guards against an explicit `"mask_urls": null` in the response.
    mask_urls = result.get('mask_urls') or []
    if mask_urls:
        print(f"检测到 {len(mask_urls)} 个主体")
    return mask_urls


def _create_video_task(video_data, headers, timeout):
    """Submit the generation task; return its id, or None on any failure."""
    try:
        resp = requests.post(
            f"{BASE_URL}/videos",
            headers=headers,
            json=video_data,
            timeout=timeout,
        )
    except requests.RequestException as exc:
        print(f"创建任务失败: {exc}")
        return None

    if resp.status_code != 200:
        print(f"创建任务失败: {resp.text}")
        return None

    try:
        video_id = resp.json().get('id')
    except (ValueError, AttributeError):
        video_id = None
    if not video_id:
        # Without an id there is nothing to poll; fail now instead of
        # querying ".../videos/None" until the deadline expires.
        print(f"创建任务失败: 响应中缺少任务ID: {resp.text}")
        return None
    return video_id


def _wait_for_video(video_id, headers, timeout, max_wait_time, poll_interval):
    """Poll the task until it completes (True), fails or times out (False)."""
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + max_wait_time
    while time.monotonic() < deadline:
        result = None
        try:
            resp = requests.get(
                f"{BASE_URL}/videos/{video_id}",
                headers=headers,
                timeout=timeout,
            )
            if resp.status_code == 200:
                result = resp.json()
            else:
                print(f"查询状态失败 (HTTP {resp.status_code}), 稍后重试")
        except (requests.RequestException, ValueError) as exc:
            # A single failed poll is treated as transient; keep trying
            # until the overall deadline.
            print(f"查询状态失败, 稍后重试: {exc}")

        if result is not None:
            status = result.get('status')
            print(f"当前状态: {status}")
            if status == 'completed':
                print("视频生成完成!")
                return True
            if status == 'failed':
                print(f"视频生成失败: {result.get('error')}")
                return False
        time.sleep(poll_interval)

    print("等待超时")
    return False


def _download_video(video_id, headers, timeout):
    """Stream the finished video to ``omnihuman_video_<id>.mp4``; return success."""
    import os  # local import: the file's top-level imports are left untouched

    output_path = f"omnihuman_video_{video_id}.mp4"
    # Write to a temporary name first so an interrupted transfer never
    # leaves a truncated file under the final .mp4 name.
    partial_path = output_path + ".part"
    try:
        with requests.get(
            f"{BASE_URL}/videos/{video_id}/content",
            headers=headers,
            timeout=timeout,
            stream=True,
        ) as resp:
            if resp.status_code != 200:
                print(f"下载失败: {resp.status_code}")
                return False
            with open(partial_path, "wb") as f:
                # Stream in 1 MiB chunks instead of buffering the whole
                # video in memory.
                for chunk in resp.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
    except requests.RequestException as exc:
        print(f"下载失败: {exc}")
        try:
            os.remove(partial_path)
        except OSError:
            pass  # best-effort cleanup; the partial file may not exist
        return False

    os.replace(partial_path, output_path)
    print(f"视频已保存: {output_path}")
    return True


def complete_omnihuman_video_generation(
    image_url,
    audio_url,
    prompt="",
    *,
    max_wait_time=600,
    poll_interval=15,
    request_timeout=30,
):
    """Run the complete OmniHuman digital-human video generation workflow.

    The workflow has four steps: (1) optional subject detection to obtain
    masks, (2) creation of the video generation task, (3) polling the task
    until it finishes, and (4) downloading the result to
    ``omnihuman_video_<id>.mp4`` in the current working directory.
    Progress and errors are reported with ``print``.

    Args:
        image_url: URL of the portrait image.
        audio_url: URL of the driving audio.
        prompt: Optional text prompt sent with the task.
        max_wait_time: Maximum number of seconds to poll for completion.
        poll_interval: Seconds to sleep between two status polls.
        request_timeout: Timeout passed to every ``requests`` call, so a
            stalled connection cannot hang the workflow indefinitely.

    Returns:
        True if the video was generated and saved, False otherwise.
    """
    auth_headers = {"Authorization": f"Bearer {API_KEY}"}
    json_headers = {**auth_headers, "Content-Type": "application/json"}

    # Step 1: subject detection (optional, used to obtain masks).
    print("步骤1: 主体检测...")
    mask_urls = _detect_subject_masks(image_url, json_headers, request_timeout)

    # Step 2: create the video generation task.
    print("\n步骤2: 创建视频生成任务...")
    video_data = {
        "model": "OmniHuman_1.5",
        "req_key": "jimeng_realman_avatar_picture_omni_v15",
        "prompt": prompt,
        "generation_type": "omni_human",
        "output_resolution": 1080,
        "content": [
            {"type": "image_url", "image_url": {"url": image_url}},
            {"type": "audio_url", "audio_url": {"url": audio_url}},
        ],
    }
    # A mask is only sent when several subjects were detected; the first
    # mask is then used.
    if len(mask_urls) > 1:
        video_data["mask_urls"] = [mask_urls[0]]
        print(f"使用 mask: {mask_urls[0]}")

    video_id = _create_video_task(video_data, json_headers, request_timeout)
    if video_id is None:
        return False
    print(f"任务ID: {video_id}")

    # Step 3: poll the task status.
    print("\n步骤3: 等待任务完成...")
    if not _wait_for_video(
        video_id, auth_headers, request_timeout, max_wait_time, poll_interval
    ):
        return False

    # Step 4: download the video.
    print("\n步骤4: 下载视频...")
    return _download_video(video_id, auth_headers, request_timeout)
# Usage example: run the whole workflow once with sample inputs and report
# the outcome on stdout.
if __name__ == "__main__":
    example_inputs = {
        "image_url": "https://example.com/portrait.jpg",
        "audio_url": "https://example.com/speech.mp3",
        "prompt": "自然说话,面带微笑",
    }
    success = complete_omnihuman_video_generation(**example_inputs)
    print("\n✅ 数字人视频生成成功!" if success else "\n❌ 数字人视频生成失败!")