Orpheus TTS

An incredibly lifelike speech synthesis model by Canopy Labs.

Deploy Orpheus TTS behind an API endpoint in seconds.

Example usage

Orpheus TTS must generate ~83 tokens/second for real-time streaming. This implementation supports streaming and, on an H100 MIG GPU, can produce:

  • 16 concurrent real-time streams with variable traffic

  • 24 concurrent real-time streams with consistent traffic

  • 128 concurrent non-real-time generations for cost-efficient batching
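As a rough sanity check on these figures: each real-time stream consumes about 83 tokens per second, so the number of streams a deployment can sustain in real time is its aggregate token throughput divided by 83. A minimal sketch, with an illustrative (not benchmarked) throughput figure:

# Back-of-the-envelope real-time capacity check.
# ~83 tokens must be generated per second of audio (stated above);
# the aggregate throughput figure below is hypothetical.
TOKENS_PER_AUDIO_SECOND = 83
aggregate_tokens_per_second = 2_000  # hypothetical measured throughput
realtime_streams = aggregate_tokens_per_second // TOKENS_PER_AUDIO_SECOND
print(realtime_streams)  # -> 24 streams served in real time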

Input
import asyncio
import aiohttp
import uuid
import time
import os
from concurrent.futures import ProcessPoolExecutor

# Configuration
MODEL = "dq4rlnkw"
BASETEN_HOST = f"https://model-{MODEL}.api.baseten.co/environments/production/predict"
BASETEN_API_KEY = os.environ["BASETEN_API_KEY"]
PAYLOADS_PER_PROCESS = 5000
NUM_PROCESSES = 8
MAX_REQUESTS_PER_PROCESS = 1

# Sample prompts
prompts = [
    """Hello there.
Thank you for calling our support line.
My name is Sarah and I'll be helping you today.
Could you please provide your account number and tell me what issue you're experiencing?"""
]
# Labels for each prompt above (only "short" is used with a single prompt)
prompt_types = ["short", "medium", "long"]

base_request_payload = {
    "max_tokens": 4096,
    "voice": "tara",
    "stop_token_ids": [128258, 128009],
}


async def stream_to_buffer(
    session: aiohttp.ClientSession, label: str, payload: dict
) -> bytes:
    """Send one streaming request, accumulate into bytes, and log timings."""
    req_id = str(uuid.uuid4())
    payload = {**payload, "request_id": req_id}

    t0 = time.perf_counter()

    try:
        async with session.post(
            BASETEN_HOST,
            json=payload,
            headers={"Authorization": f"Api-Key {BASETEN_API_KEY}"},
        ) as resp:
            if resp.status != 200:
                print(f"[{label}] ← HTTP {resp.status}")
                return b""

            buf = bytearray()
            idx = 0
            # Stream the response body chunk by chunk as it arrives
            async for chunk in resp.content.iter_chunked(4_096):
                elapsed_ms = (time.perf_counter() - t0) * 1_000
                if idx == 0:
                    # Time to first chunk: the key latency metric for streaming TTS
                    print(
                        f"[{label}] ← chunk#{idx} ({len(chunk)} B) @ {elapsed_ms:.1f} ms"
                    )
                buf.extend(chunk)
                idx += 1

            total_s = time.perf_counter() - t0
            print(f"[{label}] ← done {len(buf)} B in {total_s:.2f}s")
            return bytes(buf)

    except Exception as e:
        print(f"[{label}] ⚠️ exception: {e!r}")
        return b""


async def run_session(
    session: aiohttp.ClientSession,
    prompt: str,
    ptype: str,
    run_id: int,
    semaphore: asyncio.Semaphore,
) -> None:
    """Wrap a single prompt run in its own error-safe block."""
    label = f"{ptype}_run{run_id}"
    async with semaphore:
        try:
            payload = {**base_request_payload, "prompt": f"Chapter {run_id}: {prompt}"}
            buf = await stream_to_buffer(session, label, payload)
            # Save the first few runs to disk for spot-checking audio quality
            if run_id < 3 and buf:
                fn = f"output_{ptype}_run{run_id}.wav"
                with open(fn, "wb") as f:
                    f.write(buf)
                print(f"[{label}] ➔ saved {fn}")

        except Exception as e:
            print(f"[{label}] 🛑 failed: {e!r}")


async def run_with_offset(offset: int) -> None:
    semaphore = asyncio.Semaphore(MAX_REQUESTS_PER_PROCESS)
    connector = aiohttp.TCPConnector(limit_per_host=128, limit=128)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Warm up once per worker so cold starts don't skew timings
        await run_session(session, "warmup", "warmup", 90 + offset, semaphore)

        tasks = []
        for i, prompt in enumerate(prompts):
            ptype = prompt_types[i]
            print(f"\nWorker@offset {offset}: {ptype} prompt starts…")
            for run_id in range(offset, offset + PAYLOADS_PER_PROCESS):
                tasks.append(run_session(session, prompt, ptype, run_id, semaphore))

        await asyncio.gather(*tasks)
        print(f"Worker@offset {offset} ✅ all done.")


def run_with_offset_sync(offset: int) -> None:
    try:
        # Create and run a fresh event loop in each process
        asyncio.run(run_with_offset(offset))
    except Exception as e:
        print(f"Worker@offset {offset} ❌ error: {e}")


def main():
    offsets = [i * PAYLOADS_PER_PROCESS for i in range(NUM_PROCESSES)]
    with ProcessPoolExecutor() as exe:
        # Map each offset to its own process
        exe.map(run_with_offset_sync, offsets)

    print("🎉 All processes completed.")


if __name__ == "__main__":
    main()
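A note on the concurrency knobs above: total in-flight requests are capped at NUM_PROCESSES × MAX_REQUESTS_PER_PROCESS (8 × 1 = 8 with these values), since each worker process gates its requests behind its own semaphore. Raising MAX_REQUESTS_PER_PROCESS adds concurrency within a worker's event loop, while NUM_PROCESSES spreads load across CPU cores.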
JSON output

The response is a binary audio stream rather than a JSON body, so there is no JSON output to show; the script above writes the streamed bytes to WAV files.
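For a quick smoke test before launching the full benchmark, a single blocking request is enough. The sketch below assumes the same endpoint, Api-Key header, and payload fields as the script above; the prompt text is illustrative:

import os
import requests

# Minimal single-request sketch (assumes the same endpoint and payload
# shape as the benchmark script above; prompt text is illustrative).
resp = requests.post(
    "https://model-dq4rlnkw.api.baseten.co/environments/production/predict",
    headers={"Authorization": f"Api-Key {os.environ['BASETEN_API_KEY']}"},
    json={
        "prompt": "Hello there. Thank you for calling our support line.",
        "voice": "tara",
        "max_tokens": 4096,
        "stop_token_ids": [128258, 128009],
    },
    stream=True,  # receive audio bytes as they are generated
)
resp.raise_for_status()
with open("output.wav", "wb") as f:
    for chunk in resp.iter_content(chunk_size=4096):
        f.write(chunk)
print(f"saved output.wav ({os.path.getsize('output.wav')} B)")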

Deploy any model in just a few commands

Avoid getting tangled in complex deployment processes. Deploy best-in-class open-source models and take advantage of optimized serving for your own models.

$ truss init --example stable-diffusion-2-1-base ./my-sd-truss
$ cd ./my-sd-truss
$ export BASETEN_API_KEY=MdNmOCXc.YBtEZD0WFOYKso2A6NEQkRqTe
$ truss push
INFO Serializing Stable Diffusion 2.1 truss.
INFO Making contact with Baseten 👋 👽
INFO 🚀 Uploading model to Baseten 🚀
Upload progress: 0% | | 0.00G/2.39G