package main
import (
	"context"
	"encoding/base64"
	"encoding/binary"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Devpro-Software/vatel-go-sdk"
	"github.com/gordonklaus/portaudio"
)
// sampleRate is the PCM sample rate, in Hz, used for both the microphone
// capture stream and the playback stream.
const sampleRate = 24000

// blockSize is the number of frames per PortAudio buffer. At sampleRate this
// is 480/24000 s = 20 ms of audio per block.
const blockSize = 480
// main parses the command-line flags (each defaulting to its environment
// variable), installs SIGINT/SIGTERM handling, and runs a single session.
//
// A session that ends because the signal context was cancelled is treated as
// a clean exit; any other error terminates the process via log.Fatal.
func main() {
	apiKey := flag.String("api-key", os.Getenv("VATEL_API_KEY"), "Organization API key")
	agentID := flag.String("agent-id", os.Getenv("VATEL_AGENT_ID"), "Agent UUID")
	baseURL := flag.String("base-url", defaultEnv("VATEL_BASE_URL", "https://api.vatel.ai"), "API base URL")
	outputPCM := flag.String("output", "session_out.pcm", "Path to write response audio (raw PCM 16-bit 24kHz mono)")
	flag.Parse()

	if *apiKey == "" || *agentID == "" {
		log.Fatal("error: set -api-key and -agent-id (or VATEL_API_KEY, VATEL_AGENT_ID)")
	}

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	err := run(ctx, runOpts{
		baseURL:    *baseURL,
		apiKey:     *apiKey,
		agentID:    *agentID,
		outputPath: *outputPCM,
	})
	// run wraps errors from its setup steps with %w, so a cancellation that
	// lands during those steps arrives wrapped. errors.Is matches both the
	// bare and the wrapped form; a plain != comparison only matches the bare
	// one and would report Ctrl-C as a fatal error.
	if err != nil && !errors.Is(err, context.Canceled) {
		log.Fatal(err)
	}
}
// runOpts carries the settings for one call to run.
type runOpts struct {
	// baseURL and apiKey are handed to vatel.New to build the API client.
	baseURL string
	apiKey  string
	// agentID is the agent a session token is requested for.
	agentID string
	// outputPath is where decoded response audio is written; when empty no
	// output file is created.
	outputPath string
}
// defaultEnv returns the value of the environment variable key, or fallback
// when that variable is unset or set to the empty string.
func defaultEnv(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}
	return value
}
// run executes one voice session: it obtains a session token, dials the
// connection, initializes PortAudio, starts playback and microphone capture,
// and then dispatches server messages until the session ends, the message
// stream closes, or ctx is cancelled.
//
// It returns nil when the session ends or the message stream closes,
// ctx.Err() when ctx is cancelled while waiting for messages, and a wrapped
// error when any setup step fails. Response audio is played back and, when
// opts.outputPath is non-empty, also written to that file.
func run(ctx context.Context, opts runOpts) error {
	client := vatel.New(opts.baseURL, opts.apiKey)
	tokenResp, err := client.SessionToken(ctx, opts.agentID)
	if err != nil {
		return fmt.Errorf("session token: %w", err)
	}
	conn, err := client.DialConnection(ctx, tokenResp.Token)
	if err != nil {
		return fmt.Errorf("dial connection: %w", err)
	}
	defer conn.Close()

	if err := portaudio.Initialize(); err != nil {
		return fmt.Errorf("portaudio init: %w", err)
	}
	defer portaudio.Terminate()

	// outFile stays nil when recording is disabled (or after a write error).
	var outFile *os.File
	if opts.outputPath != "" {
		f, err := os.Create(opts.outputPath)
		if err != nil {
			return fmt.Errorf("create output file: %w", err)
		}
		defer f.Close()
		outFile = f
	}

	player, err := startPlayback()
	if err != nil {
		return fmt.Errorf("playback: %w", err)
	}
	defer player.Close()

	sendCtx, cancelSend := context.WithCancel(ctx)
	var sendWg sync.WaitGroup
	sendWg.Add(1)
	go func() {
		defer sendWg.Done()
		runMic(sendCtx, conn)
	}()
	// Registered last so it runs first on return: the microphone goroutine is
	// stopped and joined before playback, PortAudio and the connection are
	// torn down by the earlier defers.
	defer func() {
		cancelSend()
		sendWg.Wait()
	}()

	msgs := conn.Messages()
	for {
		select {
		case <-ctx.Done():
			// Waiting on ctx alongside the message stream lets a signal end
			// the session even when the server is sending nothing.
			return ctx.Err()
		case msg, ok := <-msgs:
			if !ok {
				return nil
			}
			// A parse error is deliberately not fatal: the message is still
			// dispatched on its type, and a payload that does not assert to
			// the expected type below is simply skipped.
			data, _ := msg.ParseData()
			switch msg.Type {
			case vatel.TypeSessionStarted:
				if d, ok := data.(vatel.SessionStartedData); ok {
					log.Println("Session started:", d.ID)
				}
			case vatel.TypeResponseAudio:
				d, ok := data.(vatel.ResponseAudioData)
				if !ok {
					continue
				}
				decoded, err := base64.StdEncoding.DecodeString(d.Audio)
				if err != nil {
					log.Printf("decode response audio: %v", err)
					continue
				}
				if outFile != nil {
					if _, err := outFile.Write(decoded); err != nil {
						// Report once and stop recording; playback goes on.
						// The deferred Close above still releases the file.
						log.Printf("write output file: %v", err)
						outFile = nil
					}
				}
				player.Push(decoded)
			case vatel.TypeToolCall:
				if d, ok := data.(vatel.ToolCallData); ok {
					log.Println("Tool:", d.ToolName)
					conn.SendToolCallOutput(d.ToolCallID, "ok")
				}
			case vatel.TypeSessionEnded:
				log.Println("Session ended.")
				return nil
			case vatel.TypeResponseText:
				if d, ok := data.(vatel.ResponseTextData); ok {
					log.Println("Agent:", d.Text)
				}
			default:
				log.Println("Event:", msg.Type)
			}
		}
	}
}
// runMic captures mono 16-bit audio from the default input device in blocks
// of blockSize frames at sampleRate and forwards each block to conn as
// little-endian PCM bytes. It returns when ctx is cancelled, or when the
// stream cannot be opened, started, or read; each of those failures is
// logged rather than returned.
func runMic(ctx context.Context, conn *vatel.Connection) {
	inBuf := make([]int16, blockSize)
	stream, err := portaudio.OpenDefaultStream(1, 0, float64(sampleRate), blockSize, &inBuf)
	if err != nil {
		log.Printf("open mic stream: %v", err)
		return
	}
	defer stream.Close()

	if err := stream.Start(); err != nil {
		log.Printf("start mic: %v", err)
		return
	}
	defer stream.Stop()

	// Cancellation is checked once per block, before the next read.
	for ctx.Err() == nil {
		if err := stream.Read(); err != nil {
			// Log the failure so a dead microphone is visible to the user
			// instead of capture stopping without any message.
			log.Printf("read mic: %v", err)
			return
		}
		// NOTE(review): any result of SendInputAudioBytes is not inspected
		// here — confirm whether it reports errors worth handling.
		conn.SendInputAudioBytes(int16SliceToBytes(inBuf))
	}
}
// playbackPlayer queues raw PCM bytes and feeds them to a PortAudio output
// stream through its callback method.
type playbackPlayer struct {
	// mu guards buf, which is appended to by Push and drained by callback.
	mu sync.Mutex
	// buf holds PCM bytes that have been pushed but not yet played.
	buf []byte
	// stream is the output stream opened by startPlayback; Close resets it
	// to nil.
	stream *portaudio.Stream
}
// Push appends pcm to the pending playback buffer while holding the player's
// mutex. An empty pcm is ignored without taking the lock.
func (p *playbackPlayer) Push(pcm []byte) {
	if len(pcm) > 0 {
		p.mu.Lock()
		defer p.mu.Unlock()
		p.buf = append(p.buf, pcm...)
	}
}
// callback is the output-stream callback registered in startPlayback. It
// fills out with as many complete 16-bit little-endian samples as are
// buffered, pads the remainder of out with silence (zeros), and removes the
// consumed bytes from the buffer. A trailing odd byte stays buffered until
// its other half arrives.
//
// Samples are decoded straight into out, so the callback does not allocate a
// temporary slice on every invocation.
func (p *playbackPlayer) callback(out []int16) {
	const bytesPerSample = 2

	p.mu.Lock()
	defer p.mu.Unlock()

	// Whole samples available, capped at what the device asked for.
	n := min(len(p.buf)/bytesPerSample, len(out))
	for i := 0; i < n; i++ {
		out[i] = int16(binary.LittleEndian.Uint16(p.buf[i*bytesPerSample:]))
	}
	// Underrun: play silence for whatever could not be filled.
	clear(out[n:])
	p.buf = p.buf[n*bytesPerSample:]
}
// Close stops and closes the output stream, then resets it to nil so that a
// second call does nothing. Errors from Stop and Close are not reported.
func (p *playbackPlayer) Close() {
	s := p.stream
	if s == nil {
		return
	}
	s.Stop()
	s.Close()
	p.stream = nil
}
// startPlayback opens the default output device as a mono stream at
// sampleRate with blockSize frames per buffer, driven by the player's
// callback, and starts it. If the stream fails to start it is closed again
// and the error is returned with a nil player.
func startPlayback() (*playbackPlayer, error) {
	player := new(playbackPlayer)
	out, err := portaudio.OpenDefaultStream(0, 1, float64(sampleRate), blockSize, player.callback)
	if err != nil {
		return nil, err
	}
	player.stream = out

	if startErr := out.Start(); startErr != nil {
		out.Close()
		return nil, startErr
	}
	return player, nil
}
// pcm16BytesToInt16LE decodes b as consecutive little-endian 16-bit samples.
// A trailing odd byte is ignored. The result is always a freshly allocated
// slice of len(b)/2 samples.
func pcm16BytesToInt16LE(b []byte) []int16 {
	n := len(b) / 2 // integer division drops a dangling odd byte
	samples := make([]int16, n)
	for i, off := 0, 0; i < n; i, off = i+1, off+2 {
		samples[i] = int16(binary.LittleEndian.Uint16(b[off : off+2]))
	}
	return samples
}
// int16SliceToBytes encodes each sample in s as two little-endian bytes and
// returns the resulting slice of 2*len(s) bytes.
func int16SliceToBytes(s []int16) []byte {
	encoded := make([]byte, 0, 2*len(s))
	for _, sample := range s {
		encoded = binary.LittleEndian.AppendUint16(encoded, uint16(sample))
	}
	return encoded
}