import { ChildProcessWithoutNullStreams, spawn, spawnSync } from 'child_process'
import { randomUUID } from 'crypto'
import type { IncomingMessage, Server as HttpServer } from 'http'
import type { Socket } from 'net'
import { URL } from 'url'
import { WebSocketServer, WebSocket } from 'ws'
import Logger from '../utils/Logger'

/** Highest valid UDP/TCP port number; used to keep the ingest port range legal. */
const MAX_PORT = 65535

type VideoTransportMode = 'srt_mpegts'

/** Runtime configuration, read once from environment variables in `loadConfig`. */
interface SrtGatewayConfig {
  enabled: boolean
  ffmpegPath: string
  portBase: number
  portSpan: number
  latencyMs: number
  publicHost: string
  playbackPath: string
  playbackWsOrigin: string
  idleTimeoutMs: number
  ticketTtlMs: number
}

/** Single-use token that authorizes one playback WebSocket upgrade. */
interface PlaybackTicket {
  token: string
  deviceId: string
  clientId: string
  expiresAt: number
}

/** Identity recorded for each connected viewer socket. */
interface ViewerMeta {
  deviceId: string
  clientId: string
}

/** Per-device state: the ingest port, the ffmpeg child and the connected viewers. */
interface SrtSession {
  deviceId: string
  ingestPort: number
  ffmpegProcess: ChildProcessWithoutNullStreams | null
  viewers: Set<WebSocket>
  lastActivityAt: number
  createdAt: number
  restartCount: number
}

/** Connection details returned to callers of `prepareStream`. */
export interface SrtSessionInfo {
  deviceId: string
  ingestHost: string
  ingestPort: number
  ingestUrl: string
  playbackUrl: string
  playbackPath: string
  latencyMs: number
  videoTransport: VideoTransportMode
  tokenExpiresAt: number
}

/**
 * Bridges SRT ingest to WebSocket playback.
 *
 * For each device an ffmpeg child process listens on an SRT port, remuxes the
 * video to MPEG-TS on stdout, and every stdout chunk is forwarded as a binary
 * WebSocket message to the viewers registered for that device. Viewers
 * authenticate with single-use tickets issued by `prepareStream`.
 */
export class SrtGatewayService {
  private readonly logger: Logger
  private readonly httpServer: HttpServer
  private readonly wsServer: WebSocketServer
  private readonly config: SrtGatewayConfig
  private readonly sessions = new Map<string, SrtSession>()
  private readonly usedPorts = new Set<number>()
  private readonly tickets = new Map<string, PlaybackTicket>()
  private readonly viewerMeta = new Map<WebSocket, ViewerMeta>()
  private readonly cleanupTimer: NodeJS.Timeout
  private ffmpegProbeCompleted = false
  private ffmpegAvailable = false

  // Kept as a field so shutdown() can detach exactly this listener from the HTTP server.
  private readonly upgradeListener = (request: IncomingMessage, socket: Socket, head: Buffer): void => {
    this.handleHttpUpgrade(request, socket, head)
  }

  /**
   * @param httpServer HTTP server whose `upgrade` events are inspected for playback requests.
   */
  constructor(httpServer: HttpServer) {
    this.httpServer = httpServer
    // The logger must exist before loadConfig(), which may emit a warning.
    this.logger = new Logger('SrtGateway')
    this.config = this.loadConfig()
    this.wsServer = new WebSocketServer({ noServer: true })
    this.cleanupTimer = setInterval(() => this.cleanupExpiredResources(), 10_000)
    this.httpServer.on('upgrade', this.upgradeListener)
    this.logger.info(
      `Initialized: enabled=${this.config.enabled}, playbackPath=${this.config.playbackPath}, portBase=${this.config.portBase}, portSpan=${this.config.portSpan}`
    )
  }

  /** Whether the gateway was enabled via configuration. */
  public isEnabled(): boolean {
    return this.config.enabled
  }

  /** URL path on which playback WebSocket upgrades are accepted. */
  public getPlaybackPath(): string {
    return this.config.playbackPath
  }

  /**
   * Ensures a session (and ffmpeg listener) exists for the device and issues a
   * playback ticket for the client.
   *
   * @param deviceId Device that will publish the SRT stream.
   * @param clientId Client that will consume the playback WebSocket.
   * @param options Optional host/origin hints, used only when the corresponding
   *   configuration value is empty.
   * @returns Ingest and playback details, or `null` when the gateway is disabled,
   *   ffmpeg is unavailable, or no ingest port is free.
   */
  public prepareStream(deviceId: string, clientId: string, options?: { ingestHostHint?: string, playbackWsOriginHint?: string }): SrtSessionInfo | null {
    if (!this.config.enabled) {
      return null
    }

    if (!this.ensureFfmpegAvailable()) {
      this.logger.error('ffmpeg not available; cannot prepare SRT session')
      return null
    }

    const session = this.ensureSession(deviceId)
    if (!session) {
      return null
    }

    session.lastActivityAt = Date.now()
    const ticket = this.issueTicket(deviceId, clientId)
    const ingestHost = this.resolveIngestHost(options?.ingestHostHint)
    const playbackOrigin = this.resolvePlaybackWsOrigin(options?.playbackWsOriginHint)

    return {
      deviceId,
      ingestHost,
      ingestPort: session.ingestPort,
      ingestUrl: this.buildIngestUrl(ingestHost, session.ingestPort),
      playbackUrl: `${playbackOrigin}${this.config.playbackPath}?token=${encodeURIComponent(ticket.token)}`,
      playbackPath: this.config.playbackPath,
      latencyMs: this.config.latencyMs,
      videoTransport: 'srt_mpegts',
      tokenExpiresAt: ticket.expiresAt
    }
  }

  /**
   * Revokes the client's outstanding tickets for the device and closes its
   * viewer sockets. The session itself (and ffmpeg) keeps running.
   */
  public stopClientStream(deviceId: string, clientId: string): void {
    for (const [token, ticket] of this.tickets.entries()) {
      if (ticket.deviceId === deviceId && ticket.clientId === clientId) {
        this.tickets.delete(token)
      }
    }

    const session = this.sessions.get(deviceId)
    if (!session) {
      return
    }

    // Iterate over a copy: close handlers remove entries from the live set.
    for (const ws of Array.from(session.viewers)) {
      const meta = this.viewerMeta.get(ws)
      if (!meta) continue
      if (meta.deviceId === deviceId && meta.clientId === clientId) {
        try {
          ws.close(1000, 'stream_stopped')
        } catch {
          ws.terminate()
        }
      }
    }

    session.lastActivityAt = Date.now()
  }

  /**
   * Tears down everything belonging to a device: tickets, viewer sockets, the
   * ffmpeg child and the reserved ingest port. No-op for unknown devices.
   */
  public stopSession(deviceId: string): void {
    const session = this.sessions.get(deviceId)
    if (!session) {
      return
    }

    for (const [token, ticket] of this.tickets.entries()) {
      if (ticket.deviceId === deviceId) {
        this.tickets.delete(token)
      }
    }

    for (const ws of session.viewers) {
      try {
        ws.close(1001, 'session_closed')
      } catch {
        ws.terminate()
      }
      this.viewerMeta.delete(ws)
    }
    session.viewers.clear()

    const ffmpegProcess = session.ffmpegProcess
    if (ffmpegProcess) {
      // Drop the restart-on-close handler so the kill below does not trigger a respawn.
      ffmpegProcess.removeAllListeners()
      // An EventEmitter 'error' without a listener is thrown; keep one attached
      // because the child may still report an error while it is being killed.
      ffmpegProcess.on('error', (error) => {
        this.logger.warn(`ffmpeg error during stop: device=${deviceId}`, error)
      })
      try {
        ffmpegProcess.kill('SIGTERM')
      } catch {
        ffmpegProcess.kill()
      }
      session.ffmpegProcess = null
    }

    this.usedPorts.delete(session.ingestPort)
    this.sessions.delete(deviceId)
    this.logger.info(`Session stopped: device=${deviceId}`)
  }

  /** Stops the cleanup timer, detaches from the HTTP server and closes every session. */
  public shutdown(): void {
    clearInterval(this.cleanupTimer)
    this.httpServer.removeListener('upgrade', this.upgradeListener)

    for (const deviceId of Array.from(this.sessions.keys())) {
      this.stopSession(deviceId)
    }

    try {
      this.wsServer.close()
    } catch {
      // ignore
    }
  }

  /** Reads configuration from `SRT_*` environment variables, applying defaults and bounds. */
  private loadConfig(): SrtGatewayConfig {
    // Stable default: disabled unless explicitly enabled.
    const enabled = process.env.SRT_GATEWAY_ENABLED === 'true'
    const ffmpegPath = process.env.SRT_FFMPEG_PATH?.trim() || 'ffmpeg'
    const portBase = this.parseNumberEnv(process.env.SRT_PORT_BASE, 12000, 1024, 65500)
    const requestedPortSpan = this.parseNumberEnv(process.env.SRT_PORT_SPAN, 1000, 10, 5000)
    // Keep portBase + portSpan - 1 within the valid port range; otherwise
    // allocatePort() could hand out ports above 65535.
    const portSpan = Math.min(requestedPortSpan, MAX_PORT - portBase + 1)
    if (portSpan < requestedPortSpan) {
      this.logger.warn(
        `SRT port span reduced to stay within valid ports: portBase=${portBase}, requested=${requestedPortSpan}, effective=${portSpan}`
      )
    }
    const latencyMs = this.parseNumberEnv(process.env.SRT_LATENCY_MS, 80, 20, 800)
    const publicHost = process.env.SRT_PUBLIC_HOST?.trim() || ''
    const playbackPathRaw = process.env.SRT_PLAYBACK_PATH?.trim() || '/ws/srt-play'
    const playbackPath = playbackPathRaw.startsWith('/') ? playbackPathRaw : `/${playbackPathRaw}`
    const playbackWsOrigin = process.env.SRT_PLAYBACK_WS_ORIGIN?.trim() || ''
    const idleTimeoutMs = this.parseNumberEnv(process.env.SRT_IDLE_TIMEOUT_MS, 45_000, 10_000, 10 * 60_000)
    const ticketTtlMs = this.parseNumberEnv(process.env.SRT_TICKET_TTL_MS, 60_000, 5_000, 5 * 60_000)

    return {
      enabled,
      ffmpegPath,
      portBase,
      portSpan,
      latencyMs,
      publicHost,
      playbackPath,
      playbackWsOrigin,
      idleTimeoutMs,
      ticketTtlMs
    }
  }

  /**
   * Parses a base-10 integer from an environment value.
   *
   * @returns `defaultValue` when the value is missing or not numeric, otherwise
   *   the parsed value clamped to `[min, max]`.
   */
  private parseNumberEnv(rawValue: string | undefined, defaultValue: number, min: number, max: number): number {
    const parsed = Number.parseInt(rawValue || '', 10)
    if (!Number.isFinite(parsed)) {
      return defaultValue
    }
    return Math.max(min, Math.min(max, parsed))
  }

  /**
   * Runs `ffmpeg -version` once and caches the outcome (success or failure) for
   * the lifetime of this instance.
   */
  private ensureFfmpegAvailable(): boolean {
    if (this.ffmpegProbeCompleted) {
      return this.ffmpegAvailable
    }

    this.ffmpegProbeCompleted = true
    try {
      const probe = spawnSync(this.config.ffmpegPath, ['-version'], {
        windowsHide: true,
        timeout: 3000,
        stdio: 'ignore'
      })
      // spawnSync reports spawn failures (e.g. missing binary) via `error`
      // rather than throwing, so surface it explicitly.
      if (probe.error) {
        this.logger.error('ffmpeg probe failed', probe.error)
      }
      this.ffmpegAvailable = probe.status === 0
    } catch (error) {
      this.ffmpegAvailable = false
      this.logger.error('ffmpeg probe failed', error)
    }

    if (!this.ffmpegAvailable) {
      this.logger.error(`ffmpeg unavailable: path=${this.config.ffmpegPath}`)
    }

    return this.ffmpegAvailable
  }

  /**
   * Returns the device's session, creating it (and starting ffmpeg) if needed.
   * An existing session whose ffmpeg is not running gets it restarted.
   *
   * @returns The session, or `null` when no ingest port is free.
   */
  private ensureSession(deviceId: string): SrtSession | null {
    const existing = this.sessions.get(deviceId)
    if (existing) {
      if (!existing.ffmpegProcess) {
        this.startFfmpeg(existing)
      }
      return existing
    }

    const ingestPort = this.allocatePort(deviceId)
    if (!ingestPort) {
      this.logger.error(`No free SRT ingest port for device=${deviceId}`)
      return null
    }

    const session: SrtSession = {
      deviceId,
      ingestPort,
      ffmpegProcess: null,
      viewers: new Set<WebSocket>(),
      lastActivityAt: Date.now(),
      createdAt: Date.now(),
      restartCount: 0
    }

    this.sessions.set(deviceId, session)
    this.usedPorts.add(ingestPort)
    this.startFfmpeg(session)
    return session
  }

  /**
   * Picks an unused port in `[portBase, portBase + portSpan)`, starting at an
   * offset derived from the device id and probing linearly from there.
   *
   * @returns A free port, or `null` when every port in the range is in use.
   */
  private allocatePort(deviceId: string): number | null {
    const span = this.config.portSpan
    const base = this.config.portBase
    const seed = this.stringHash(deviceId)

    for (let i = 0; i < span; i += 1) {
      const candidate = base + ((seed + i) % span)
      if (!this.usedPorts.has(candidate)) {
        return candidate
      }
    }

    return null
  }

  /** Non-negative 32-bit string hash (`hash * 31 + charCode`, truncated to int32 each step). */
  private stringHash(value: string): number {
    let hash = 0
    for (let i = 0; i < value.length; i += 1) {
      hash = ((hash << 5) - hash + value.charCodeAt(i)) | 0
    }
    return Math.abs(hash)
  }

  /**
   * Spawns the ffmpeg listener for a session and wires its output to viewers.
   * Does nothing when the session already has a process or the gateway is disabled.
   *
   * When the process closes while the session is still registered, it is
   * restarted after a delay of `restartCount` seconds (capped at 5 s); after
   * more than five consecutive failures the session is stopped. Any stdout
   * data resets the failure counter.
   */
  private startFfmpeg(session: SrtSession): void {
    if (session.ffmpegProcess || !this.config.enabled) {
      return
    }

    // FFmpeg's libsrt `latency` URL option is expressed in microseconds, while
    // the configuration value is in milliseconds.
    const latencyUs = this.config.latencyMs * 1000
    const inputUrl = `srt://0.0.0.0:${session.ingestPort}?mode=listener&latency=${latencyUs}&transtype=live`
    const args = [
      '-loglevel', 'warning',
      '-fflags', 'nobuffer',
      '-flags', 'low_delay',
      '-analyzeduration', '0',
      '-probesize', '32768',
      '-i', inputUrl,
      '-an',
      '-c:v', 'copy',
      '-f', 'mpegts',
      '-mpegts_flags', 'resend_headers',
      'pipe:1'
    ]

    this.logger.info(`Start ffmpeg session: device=${session.deviceId}, input=${inputUrl}`)
    const child = spawn(this.config.ffmpegPath, args, {
      windowsHide: true,
      stdio: ['pipe', 'pipe', 'pipe']
    })
    // Nothing is ever written to ffmpeg; close stdin right away.
    child.stdin.end()
    session.ffmpegProcess = child

    child.stdout.on('data', (chunk: Buffer) => {
      session.lastActivityAt = Date.now()
      session.restartCount = 0
      this.broadcastChunk(session, chunk)
    })

    child.stderr.on('data', (data: Buffer) => {
      const line = data.toString().trim()
      if (line) {
        this.logger.debug(`ffmpeg[${session.deviceId}] ${line}`)
      }
    })

    child.on('error', (error) => {
      this.logger.error(`ffmpeg process error: device=${session.deviceId}`, error)
    })

    child.on('close', (code, signal) => {
      // Ignore exits that belong to a session which was stopped or replaced.
      const current = this.sessions.get(session.deviceId)
      if (!current || current !== session) {
        return
      }

      current.ffmpegProcess = null
      this.logger.warn(`ffmpeg exited: device=${session.deviceId}, code=${code}, signal=${signal}`)

      if (!this.config.enabled) {
        return
      }

      current.restartCount += 1
      if (current.restartCount > 5) {
        this.logger.error(`ffmpeg restart limit exceeded, closing session: device=${session.deviceId}`)
        this.stopSession(session.deviceId)
        return
      }

      const delayMs = Math.min(1000 * current.restartCount, 5000)
      setTimeout(() => {
        // Restart only if this exact session is still registered; a replacement
        // session for the same device manages its own ffmpeg process.
        const aliveSession = this.sessions.get(session.deviceId)
        if (aliveSession === session && !aliveSession.ffmpegProcess) {
          this.startFfmpeg(aliveSession)
        }
      }, delayMs)
    })
  }

  /** Creates and stores a ticket that expires `ticketTtlMs` from now. */
  private issueTicket(deviceId: string, clientId: string): PlaybackTicket {
    const token = randomUUID()
    const expiresAt = Date.now() + this.config.ticketTtlMs
    const ticket: PlaybackTicket = { token, deviceId, clientId, expiresAt }
    this.tickets.set(token, ticket)
    return ticket
  }

  /**
   * Removes the ticket for `token` and returns it if it has not expired.
   * Tickets are single-use: a known token is deleted whether or not it is still valid.
   */
  private consumeTicket(token: string): PlaybackTicket | null {
    const ticket = this.tickets.get(token)
    if (!ticket) {
      return null
    }

    this.tickets.delete(token)
    return ticket.expiresAt < Date.now() ? null : ticket
  }

  /** Ingest host precedence: configured public host, then the caller's hint, then loopback. */
  private resolveIngestHost(hostHint?: string): string {
    if (this.config.publicHost) {
      return this.stripPort(this.config.publicHost)
    }
    if (hostHint) {
      return this.stripPort(hostHint)
    }
    return '127.0.0.1'
  }

  /**
   * Builds the WebSocket origin for playback URLs from the configured origin or
   * the caller's hint, mapping `http`/`https` to `ws`/`wss` and defaulting a
   * scheme-less value to `ws://`. Trailing slashes are removed.
   */
  private resolvePlaybackWsOrigin(originHint?: string): string {
    const source = this.config.playbackWsOrigin || originHint || ''
    if (!source) {
      return 'ws://127.0.0.1:3001'
    }

    const trimmed = source.trim().replace(/\/+$/, '')
    if (trimmed.startsWith('ws://') || trimmed.startsWith('wss://')) {
      return trimmed
    }
    if (trimmed.startsWith('http://')) {
      return `ws://${trimmed.slice('http://'.length)}`
    }
    if (trimmed.startsWith('https://')) {
      return `wss://${trimmed.slice('https://'.length)}`
    }
    return `ws://${trimmed}`
  }

  /**
   * Extracts the host from a `host[:port]` string.
   *
   * Bracketed IPv6 literals (`[::1]:9000`) are returned without brackets; a
   * value with more than one colon and no brackets is returned unchanged.
   */
  private stripPort(hostLike: string): string {
    const value = hostLike.trim()
    if (!value) {
      return '127.0.0.1'
    }

    if (value.startsWith('[')) {
      const closeIndex = value.indexOf(']')
      if (closeIndex > 0) {
        return value.slice(1, closeIndex)
      }
    }

    const firstColon = value.indexOf(':')
    const lastColon = value.lastIndexOf(':')
    // Exactly one colon means "host:port"; several colons are left untouched.
    if (firstColon > 0 && firstColon === lastColon) {
      return value.slice(0, firstColon)
    }

    return value
  }

  /**
   * Builds the caller-mode SRT URL handed to the publishing device.
   *
   * NOTE(review): `latency` is passed in milliseconds here, unchanged from the
   * previous behaviour; confirm the unit expected by the publishing client.
   */
  private buildIngestUrl(host: string, port: number): string {
    // stripPort() removes brackets from IPv6 literals; a URL authority needs
    // them back so the address colons are not confused with the port separator.
    const urlHost = host.includes(':') && !host.startsWith('[') ? `[${host}]` : host
    return `srt://${urlHost}:${port}?mode=caller&latency=${this.config.latencyMs}&transtype=live`
  }

  /**
   * Sends one MPEG-TS chunk to every open viewer of the session, dropping
   * viewers that are no longer open or whose send throws.
   */
  private broadcastChunk(session: SrtSession, chunk: Buffer): void {
    if (session.viewers.size === 0) {
      return
    }

    // Iterate over a copy because viewers are removed while looping.
    for (const ws of Array.from(session.viewers)) {
      if (ws.readyState !== WebSocket.OPEN) {
        session.viewers.delete(ws)
        this.viewerMeta.delete(ws)
        continue
      }

      try {
        ws.send(chunk, { binary: true })
      } catch (error) {
        this.logger.warn(`Viewer send failed: device=${session.deviceId}`, error)
        session.viewers.delete(ws)
        this.viewerMeta.delete(ws)
        try {
          ws.terminate()
        } catch {
          // ignore
        }
      }
    }
  }

  /**
   * Handles an HTTP upgrade request for the playback path.
   *
   * Requests are left untouched when the gateway is disabled or the path does
   * not match (presumably so other upgrade listeners on the same server can
   * handle them). Otherwise the `token` query parameter is consumed as a
   * ticket; the request is rejected with 401 for an invalid/expired ticket and
   * 404 when the ticket's session no longer exists.
   */
  private handleHttpUpgrade(request: IncomingMessage, socket: Socket, head: Buffer): void {
    if (!this.config.enabled) {
      return
    }

    const parsed = this.safeParseUrl(request)
    if (!parsed || parsed.pathname !== this.config.playbackPath) {
      return
    }

    const token = parsed.searchParams.get('token') || ''
    const ticket = this.consumeTicket(token)
    if (!ticket) {
      this.rejectUpgrade(socket, 401, 'invalid_or_expired_token')
      return
    }

    const session = this.sessions.get(ticket.deviceId)
    if (!session) {
      this.rejectUpgrade(socket, 404, 'stream_not_found')
      return
    }

    this.wsServer.handleUpgrade(request, socket, head, (ws: WebSocket) => {
      this.registerViewer(ws, ticket, session)
    })
  }

  /** Adds a viewer socket to the session and removes it again on close or error. */
  private registerViewer(ws: WebSocket, ticket: PlaybackTicket, session: SrtSession): void {
    session.viewers.add(ws)
    session.lastActivityAt = Date.now()
    this.viewerMeta.set(ws, { deviceId: ticket.deviceId, clientId: ticket.clientId })

    ws.on('message', () => {
      // Ignore upstream data from viewers; playback socket is read-only.
    })

    ws.on('close', () => {
      session.viewers.delete(ws)
      this.viewerMeta.delete(ws)
      session.lastActivityAt = Date.now()
    })

    ws.on('error', () => {
      session.viewers.delete(ws)
      this.viewerMeta.delete(ws)
      session.lastActivityAt = Date.now()
      try {
        ws.terminate()
      } catch {
        // ignore
      }
    })
  }

  /** Parses the request URL relative to its Host header; returns `null` if it is malformed. */
  private safeParseUrl(request: IncomingMessage): URL | null {
    const requestUrl = request.url || ''
    const host = request.headers.host || 'localhost'
    try {
      return new URL(requestUrl, `http://${host}`)
    } catch {
      return null
    }
  }

  /** Writes a minimal HTTP error response and always destroys the socket. */
  private rejectUpgrade(socket: Socket, statusCode: number, reason: string): void {
    try {
      socket.write(`HTTP/1.1 ${statusCode} ${reason}\r\nConnection: close\r\n\r\n`)
    } finally {
      socket.destroy()
    }
  }

  /**
   * Periodic sweep: drops expired tickets and stops sessions that have no
   * viewers and have been inactive for longer than `idleTimeoutMs`.
   */
  private cleanupExpiredResources(): void {
    const now = Date.now()

    for (const [token, ticket] of this.tickets.entries()) {
      if (ticket.expiresAt < now) {
        this.tickets.delete(token)
      }
    }

    for (const [deviceId, session] of this.sessions.entries()) {
      const noViewers = session.viewers.size === 0
      const idleTooLong = now - session.lastActivityAt > this.config.idleTimeoutMs
      if (noViewers && idleTooLong) {
        this.logger.info(`Session idle timeout: device=${deviceId}`)
        this.stopSession(deviceId)
      }
    }
  }
}

export default SrtGatewayService