// Source file: server/src/services/SrtGatewayService.ts

import { ChildProcessWithoutNullStreams, spawn, spawnSync } from 'child_process'
import { randomUUID } from 'crypto'
import type { IncomingMessage, Server as HttpServer } from 'http'
import type { Socket } from 'net'
import { URL } from 'url'
import { WebSocketServer, WebSocket } from 'ws'
import Logger from '../utils/Logger'
/**
 * Identifier of the video transport reported to clients through
 * `SrtSessionInfo.videoTransport`. `'srt_mpegts'` is the only value defined.
 */
type VideoTransportMode = 'srt_mpegts'
/**
 * Runtime configuration of the SRT gateway, populated from `SRT_*`
 * environment variables by `SrtGatewayService.loadConfig()`.
 */
interface SrtGatewayConfig {
// True only when SRT_GATEWAY_ENABLED is exactly 'true'; otherwise the gateway stays inert.
enabled: boolean
// Executable used for both the availability probe and the per-device listener (SRT_FFMPEG_PATH, default 'ffmpeg').
ffmpegPath: string
// First port of the ingest port range (SRT_PORT_BASE, default 12000).
portBase: number
// Number of consecutive ports available for ingest, starting at portBase (SRT_PORT_SPAN, default 1000).
portSpan: number
// Value placed in the `latency` query parameter of the generated SRT URLs (SRT_LATENCY_MS, default 80).
latencyMs: number
// Host advertised to devices for ingest; empty string means "fall back to the caller-supplied hint" (SRT_PUBLIC_HOST).
publicHost: string
// HTTP path on which playback WebSocket upgrades are accepted; always starts with '/' (SRT_PLAYBACK_PATH, default '/ws/srt-play').
playbackPath: string
// Origin used to build playback URLs; empty string means "fall back to the caller-supplied hint" (SRT_PLAYBACK_WS_ORIGIN).
playbackWsOrigin: string
// A session with no viewers is stopped once it has been inactive for longer than this many milliseconds (SRT_IDLE_TIMEOUT_MS, default 45000).
idleTimeoutMs: number
// Lifetime of a playback ticket in milliseconds (SRT_TICKET_TTL_MS, default 60000).
ticketTtlMs: number
}
/**
 * Short-lived credential that authorizes one playback WebSocket upgrade.
 * Tickets are created by `issueTicket()` and removed from the ticket table
 * the first time `consumeTicket()` looks them up, so each one works once.
 */
interface PlaybackTicket {
// Random UUID that the viewer presents in the `token` query parameter.
token: string
// Device whose stream this ticket grants access to.
deviceId: string
// Client the ticket was issued for; used to revoke tickets and close sockets per client.
clientId: string
// Expiry instant as a `Date.now()` timestamp (milliseconds since the epoch).
expiresAt: number
}
/**
 * Identity attached to a connected viewer socket. Both fields are copied
 * from the playback ticket when the viewer is registered.
 */
interface ViewerMeta {
// Device the viewer is watching.
deviceId: string
// Client that owns the viewer socket.
clientId: string
}
/**
 * In-memory state of one device's ingest session: the reserved port, the
 * ffmpeg process listening on it, and the viewers receiving its output.
 */
interface SrtSession {
// Device this session belongs to; also the key in the sessions map.
deviceId: string
// Port reserved for this device's SRT listener.
ingestPort: number
// Running ffmpeg listener, or null while it is not running (before start, after exit, or after stop).
ffmpegProcess: ChildProcessWithoutNullStreams | null
// WebSocket viewers that receive every chunk ffmpeg writes to stdout.
viewers: Set<WebSocket>
// `Date.now()` timestamp refreshed on prepare, on ffmpeg output, and on viewer connect/disconnect; drives idle cleanup.
lastActivityAt: number
// `Date.now()` timestamp taken when the session object was created.
createdAt: number
// Consecutive ffmpeg exits since the last output chunk; reset to 0 whenever ffmpeg produces data.
restartCount: number
}
/**
 * Connection details returned by `SrtGatewayService.prepareStream()`:
 * where the device should publish and where the client should play back.
 */
export interface SrtSessionInfo {
// Device the session was prepared for.
deviceId: string
// Host part of the ingest endpoint (port already stripped).
ingestHost: string
// Port of the SRT listener reserved for the device.
ingestPort: number
// Complete `srt://` caller URL built from ingestHost and ingestPort.
ingestUrl: string
// WebSocket URL for playback, including the one-time `token` query parameter.
playbackUrl: string
// Path component of the playback endpoint (same value as the gateway's configured playback path).
playbackPath: string
// Latency value from the gateway configuration.
latencyMs: number
// Transport identifier; always 'srt_mpegts'.
videoTransport: VideoTransportMode
// `Date.now()`-based expiry of the token embedded in playbackUrl.
tokenExpiresAt: number
}
/**
 * Bridges SRT ingest to WebSocket playback.
 *
 * For every device a dedicated ffmpeg process listens for SRT on a port taken
 * from a configured range, remuxes the incoming video to MPEG-TS on stdout,
 * and the service forwards each stdout chunk as a binary WebSocket message to
 * all viewers attached to that device. Viewers authenticate with single-use,
 * short-lived tickets handed out by {@link prepareStream}.
 *
 * The service hooks the HTTP server's `'upgrade'` event but only handles
 * requests whose path equals the configured playback path; all other upgrade
 * requests are left untouched for other handlers.
 */
export class SrtGatewayService {
  /** Highest valid port number; the ingest range is never allowed past it. */
  private static readonly MAX_PORT = 65535
  /** ffmpeg is restarted at most this many times in a row without producing output. */
  private static readonly MAX_FFMPEG_RESTARTS = 5
  /**
   * Per-viewer send backlog (bytes) above which the viewer is disconnected.
   * Without a bound, one stalled viewer makes the process buffer the live
   * stream in memory indefinitely.
   */
  private static readonly MAX_VIEWER_BUFFERED_BYTES = 8 * 1024 * 1024

  private readonly logger: Logger
  private readonly httpServer: HttpServer
  private readonly wsServer: WebSocketServer
  private readonly config: SrtGatewayConfig
  private readonly sessions = new Map<string, SrtSession>()
  private readonly usedPorts = new Set<number>()
  private readonly tickets = new Map<string, PlaybackTicket>()
  private readonly viewerMeta = new Map<WebSocket, ViewerMeta>()
  private readonly cleanupTimer: NodeJS.Timeout
  /** Kept as a field so that shutdown() can detach exactly this listener. */
  private readonly upgradeListener: (request: IncomingMessage, socket: Socket, head: Buffer) => void
  private ffmpegProbeCompleted = false
  private ffmpegAvailable = false

  /**
   * @param httpServer HTTP server whose `'upgrade'` events carry the playback
   *   WebSocket handshakes.
   */
  constructor(httpServer: HttpServer) {
    this.httpServer = httpServer
    this.logger = new Logger('SrtGateway')
    this.config = this.loadConfig()
    this.wsServer = new WebSocketServer({ noServer: true })
    this.cleanupTimer = setInterval(() => this.cleanupExpiredResources(), 10_000)
    this.upgradeListener = (request, socket, head) => {
      this.handleHttpUpgrade(request, socket, head)
    }
    this.httpServer.on('upgrade', this.upgradeListener)
    this.logger.info(
      `Initialized: enabled=${this.config.enabled}, playbackPath=${this.config.playbackPath}, portBase=${this.config.portBase}, portSpan=${this.config.portSpan}`
    )
  }

  /** @returns Whether the gateway was enabled through configuration. */
  public isEnabled(): boolean {
    return this.config.enabled
  }

  /** @returns The HTTP path on which playback upgrades are accepted. */
  public getPlaybackPath(): string {
    return this.config.playbackPath
  }

  /**
   * Ensures an ingest session exists for the device and issues a fresh
   * playback ticket for the client.
   *
   * @param deviceId Device that will publish the stream.
   * @param clientId Client that will play the stream back.
   * @param options Optional host/origin hints, used only when the matching
   *   configuration value is empty.
   * @returns Connection details, or `null` when the gateway is disabled,
   *   ffmpeg is unavailable, or no ingest port is free.
   */
  public prepareStream(deviceId: string, clientId: string, options?: { ingestHostHint?: string, playbackWsOriginHint?: string }): SrtSessionInfo | null {
    if (!this.config.enabled) {
      return null
    }
    if (!this.ensureFfmpegAvailable()) {
      this.logger.error('ffmpeg not available; cannot prepare SRT session')
      return null
    }
    const session = this.ensureSession(deviceId)
    if (!session) {
      return null
    }
    session.lastActivityAt = Date.now()

    const ticket = this.issueTicket(deviceId, clientId)
    const ingestHost = this.resolveIngestHost(options?.ingestHostHint)
    const playbackOrigin = this.resolvePlaybackWsOrigin(options?.playbackWsOriginHint)
    return {
      deviceId,
      ingestHost,
      ingestPort: session.ingestPort,
      ingestUrl: this.buildIngestUrl(ingestHost, session.ingestPort),
      playbackUrl: `${playbackOrigin}${this.config.playbackPath}?token=${encodeURIComponent(ticket.token)}`,
      playbackPath: this.config.playbackPath,
      latencyMs: this.config.latencyMs,
      videoTransport: 'srt_mpegts',
      tokenExpiresAt: ticket.expiresAt
    }
  }

  /**
   * Revokes the client's outstanding tickets for the device and closes the
   * client's viewer sockets. The ingest session itself keeps running.
   */
  public stopClientStream(deviceId: string, clientId: string): void {
    this.revokeTickets((ticket) => ticket.deviceId === deviceId && ticket.clientId === clientId)

    const session = this.sessions.get(deviceId)
    if (!session) {
      return
    }
    // Snapshot the set: close handlers remove entries while we iterate.
    for (const ws of Array.from(session.viewers)) {
      const meta = this.viewerMeta.get(ws)
      if (meta && meta.deviceId === deviceId && meta.clientId === clientId) {
        this.closeViewer(ws, 1000, 'stream_stopped')
      }
    }
    session.lastActivityAt = Date.now()
  }

  /**
   * Tears down everything that belongs to the device: tickets, viewer
   * sockets, the ffmpeg process, and the reserved ingest port.
   */
  public stopSession(deviceId: string): void {
    const session = this.sessions.get(deviceId)
    if (!session) {
      return
    }
    this.revokeTickets((ticket) => ticket.deviceId === deviceId)

    for (const ws of session.viewers) {
      this.closeViewer(ws, 1001, 'session_closed')
      this.viewerMeta.delete(ws)
    }
    session.viewers.clear()

    this.terminateFfmpeg(session)
    this.usedPorts.delete(session.ingestPort)
    this.sessions.delete(deviceId)
    this.logger.info(`Session stopped: device=${deviceId}`)
  }

  /**
   * Stops periodic cleanup, detaches from the HTTP server, stops every
   * session, and closes the WebSocket server.
   */
  public shutdown(): void {
    clearInterval(this.cleanupTimer)
    // Detach first so no new viewers can arrive while sessions are torn down.
    this.httpServer.off('upgrade', this.upgradeListener)
    for (const deviceId of Array.from(this.sessions.keys())) {
      this.stopSession(deviceId)
    }
    try {
      this.wsServer.close()
    } catch {
      // Best effort: the server may already be closed.
    }
  }

  /**
   * Reads the `SRT_*` environment variables, applying defaults and clamping
   * numeric values to their allowed ranges.
   */
  private loadConfig(): SrtGatewayConfig {
    // Stable default: disabled unless explicitly enabled.
    const enabled = process.env.SRT_GATEWAY_ENABLED === 'true'
    const ffmpegPath = process.env.SRT_FFMPEG_PATH?.trim() || 'ffmpeg'
    const portBase = this.parseNumberEnv(process.env.SRT_PORT_BASE, 12000, 1024, 65500)
    const requestedSpan = this.parseNumberEnv(process.env.SRT_PORT_SPAN, 1000, 10, 5000)
    // A high base combined with a large span would run past the last valid port.
    const portSpan = Math.min(requestedSpan, SrtGatewayService.MAX_PORT - portBase + 1)
    const latencyMs = this.parseNumberEnv(process.env.SRT_LATENCY_MS, 80, 20, 800)
    const publicHost = process.env.SRT_PUBLIC_HOST?.trim() || ''
    const playbackPathRaw = process.env.SRT_PLAYBACK_PATH?.trim() || '/ws/srt-play'
    const playbackPath = playbackPathRaw.startsWith('/') ? playbackPathRaw : `/${playbackPathRaw}`
    const playbackWsOrigin = process.env.SRT_PLAYBACK_WS_ORIGIN?.trim() || ''
    const idleTimeoutMs = this.parseNumberEnv(process.env.SRT_IDLE_TIMEOUT_MS, 45_000, 10_000, 10 * 60_000)
    const ticketTtlMs = this.parseNumberEnv(process.env.SRT_TICKET_TTL_MS, 60_000, 5_000, 5 * 60_000)
    return {
      enabled,
      ffmpegPath,
      portBase,
      portSpan,
      latencyMs,
      publicHost,
      playbackPath,
      playbackWsOrigin,
      idleTimeoutMs,
      ticketTtlMs
    }
  }

  /**
   * Parses a base-10 integer and clamps it to `[min, max]`.
   *
   * @returns `defaultValue` when the input is missing or does not start with
   *   an integer.
   */
  private parseNumberEnv(rawValue: string | undefined, defaultValue: number, min: number, max: number): number {
    const parsed = Number.parseInt(rawValue || '', 10)
    if (!Number.isFinite(parsed)) {
      return defaultValue
    }
    return Math.max(min, Math.min(max, parsed))
  }

  /**
   * Runs `ffmpeg -version` once (synchronously, 3 s timeout) and caches the
   * outcome for the lifetime of the service.
   */
  private ensureFfmpegAvailable(): boolean {
    if (this.ffmpegProbeCompleted) {
      return this.ffmpegAvailable
    }
    this.ffmpegProbeCompleted = true
    try {
      const probe = spawnSync(this.config.ffmpegPath, ['-version'], {
        windowsHide: true,
        timeout: 3000,
        stdio: 'ignore'
      })
      // spawnSync reports launch failures and timeouts via `error` instead of throwing.
      if (probe.error) {
        this.logger.error('ffmpeg probe failed', probe.error)
      }
      this.ffmpegAvailable = !probe.error && probe.status === 0
    } catch (error) {
      this.ffmpegAvailable = false
      this.logger.error('ffmpeg probe failed', error)
    }
    if (!this.ffmpegAvailable) {
      this.logger.error(`ffmpeg unavailable: path=${this.config.ffmpegPath}`)
    }
    return this.ffmpegAvailable
  }

  /**
   * Returns the device's session, creating it (and starting ffmpeg) when
   * needed. An existing session whose ffmpeg is not running gets it restarted.
   *
   * @returns `null` when no ingest port is free.
   */
  private ensureSession(deviceId: string): SrtSession | null {
    const existing = this.sessions.get(deviceId)
    if (existing) {
      if (!existing.ffmpegProcess) {
        this.startFfmpeg(existing)
      }
      return existing
    }

    const ingestPort = this.allocatePort(deviceId)
    if (!ingestPort) {
      this.logger.error(`No free SRT ingest port for device=${deviceId}`)
      return null
    }
    const now = Date.now()
    const session: SrtSession = {
      deviceId,
      ingestPort,
      ffmpegProcess: null,
      viewers: new Set<WebSocket>(),
      lastActivityAt: now,
      createdAt: now,
      restartCount: 0
    }
    this.sessions.set(deviceId, session)
    this.usedPorts.add(ingestPort)
    this.startFfmpeg(session)
    return session
  }

  /**
   * Picks a free port from the configured range. The search starts at an
   * offset derived from the device id, so a device tends to get the same port
   * again, and wraps around the range once.
   *
   * @returns A free port, or `null` when the whole range is in use.
   */
  private allocatePort(deviceId: string): number | null {
    const { portBase, portSpan } = this.config
    const seed = this.stringHash(deviceId)
    for (let offset = 0; offset < portSpan; offset += 1) {
      const candidate = portBase + ((seed + offset) % portSpan)
      if (!this.usedPorts.has(candidate)) {
        return candidate
      }
    }
    return null
  }

  /** Deterministic non-negative 32-bit string hash (`hash * 31 + charCode`). */
  private stringHash(value: string): number {
    let hash = 0
    for (let i = 0; i < value.length; i += 1) {
      hash = ((hash << 5) - hash + value.charCodeAt(i)) | 0
    }
    return Math.abs(hash)
  }

  /**
   * Builds the listener URL handed to ffmpeg. ffmpeg's libsrt protocol takes
   * `latency` in microseconds, so the configured millisecond value is
   * converted here; passing it unconverted would request ~0 ms.
   */
  private buildListenerUrl(port: number): string {
    const latencyUs = this.config.latencyMs * 1000
    return `srt://0.0.0.0:${port}?mode=listener&latency=${latencyUs}&transtype=live`
  }

  /**
   * Spawns the ffmpeg listener for a session and wires its output to the
   * session's viewers. Does nothing when ffmpeg is already running or the
   * gateway is disabled.
   */
  private startFfmpeg(session: SrtSession): void {
    if (session.ffmpegProcess || !this.config.enabled) {
      return
    }
    const inputUrl = this.buildListenerUrl(session.ingestPort)
    const args = [
      '-loglevel', 'warning',
      '-fflags', 'nobuffer',
      '-flags', 'low_delay',
      '-analyzeduration', '0',
      '-probesize', '32768',
      '-i', inputUrl,
      '-an',
      '-c:v', 'copy',
      '-f', 'mpegts',
      '-mpegts_flags', 'resend_headers',
      'pipe:1'
    ]
    this.logger.info(`Start ffmpeg session: device=${session.deviceId}, input=${inputUrl}`)
    const child = spawn(this.config.ffmpegPath, args, {
      windowsHide: true,
      stdio: ['pipe', 'pipe', 'pipe']
    })
    // ffmpeg gets no input on stdin; close it right away.
    child.stdin.end()
    session.ffmpegProcess = child

    child.stdout.on('data', (chunk: Buffer) => {
      session.lastActivityAt = Date.now()
      // Output means the process is healthy again; forget earlier failures.
      session.restartCount = 0
      this.broadcastChunk(session, chunk)
    })
    child.stderr.on('data', (data: Buffer) => {
      const line = data.toString().trim()
      if (line) {
        this.logger.debug(`ffmpeg[${session.deviceId}] ${line}`)
      }
    })
    child.on('error', (error) => {
      this.logger.error(`ffmpeg process error: device=${session.deviceId}`, error)
    })
    child.on('close', (code, signal) => {
      this.handleFfmpegClose(session, code, signal)
    })
  }

  /**
   * Reacts to an ffmpeg exit: schedules a restart with linear backoff
   * (1 s per consecutive failure, capped at 5 s) or closes the session once
   * the restart limit is exceeded.
   */
  private handleFfmpegClose(session: SrtSession, code: number | null, signal: NodeJS.Signals | null): void {
    // Ignore exits of processes whose session was stopped or replaced.
    if (this.sessions.get(session.deviceId) !== session) {
      return
    }
    session.ffmpegProcess = null
    this.logger.warn(`ffmpeg exited: device=${session.deviceId}, code=${code}, signal=${signal}`)
    if (!this.config.enabled) {
      return
    }
    session.restartCount += 1
    if (session.restartCount > SrtGatewayService.MAX_FFMPEG_RESTARTS) {
      this.logger.error(`ffmpeg restart limit exceeded, closing session: device=${session.deviceId}`)
      this.stopSession(session.deviceId)
      return
    }
    const delayMs = Math.min(1000 * session.restartCount, 5000)
    setTimeout(() => {
      // Re-read the map: the session may have been stopped while waiting.
      const aliveSession = this.sessions.get(session.deviceId)
      if (aliveSession && !aliveSession.ffmpegProcess) {
        this.startFfmpeg(aliveSession)
      }
    }, delayMs)
  }

  /**
   * Detaches from and signals the session's ffmpeg process, if any.
   * An `'error'` sink stays attached because a failed signal delivery is
   * reported as an `'error'` event, which throws when nobody listens.
   */
  private terminateFfmpeg(session: SrtSession): void {
    const child = session.ffmpegProcess
    if (!child) {
      return
    }
    session.ffmpegProcess = null
    child.removeAllListeners()
    child.on('error', (error) => {
      this.logger.warn(`ffmpeg error during stop: device=${session.deviceId}`, error)
    })
    try {
      child.kill('SIGTERM')
    } catch (error) {
      this.logger.warn(`ffmpeg kill failed: device=${session.deviceId}`, error)
    }
  }

  /** Creates and stores a playback ticket valid for the configured TTL. */
  private issueTicket(deviceId: string, clientId: string): PlaybackTicket {
    const ticket: PlaybackTicket = {
      token: randomUUID(),
      deviceId,
      clientId,
      expiresAt: Date.now() + this.config.ticketTtlMs
    }
    this.tickets.set(ticket.token, ticket)
    return ticket
  }

  /**
   * Looks up a ticket and removes it, so a token works at most once.
   *
   * @returns The ticket, or `null` when it is unknown or already expired.
   */
  private consumeTicket(token: string): PlaybackTicket | null {
    const ticket = this.tickets.get(token)
    if (!ticket) {
      return null
    }
    this.tickets.delete(token)
    return ticket.expiresAt < Date.now() ? null : ticket
  }

  /** Deletes every stored ticket for which `shouldRevoke` returns true. */
  private revokeTickets(shouldRevoke: (ticket: PlaybackTicket) => boolean): void {
    for (const [token, ticket] of this.tickets.entries()) {
      if (shouldRevoke(ticket)) {
        this.tickets.delete(token)
      }
    }
  }

  /**
   * Chooses the ingest host: configured public host first, then the caller's
   * hint, then loopback. Any port suffix is removed.
   */
  private resolveIngestHost(hostHint?: string): string {
    if (this.config.publicHost) {
      return this.stripPort(this.config.publicHost)
    }
    if (hostHint) {
      return this.stripPort(hostHint)
    }
    return '127.0.0.1'
  }

  /**
   * Chooses the playback origin (configuration first, then the caller's hint)
   * and normalizes it to a `ws://` or `wss://` origin without a trailing
   * slash. `http(s)://` is mapped to `ws(s)://`; a bare host gets `ws://`.
   * Input that is empty after trimming falls back to `ws://127.0.0.1:3001`.
   */
  private resolvePlaybackWsOrigin(originHint?: string): string {
    const source = this.config.playbackWsOrigin || originHint || ''
    // Normalize before the emptiness check so '   ' or '/' cannot yield a bare 'ws://'.
    const trimmed = source.trim().replace(/\/+$/, '')
    if (!trimmed) {
      return 'ws://127.0.0.1:3001'
    }
    // URL schemes are case-insensitive.
    if (/^wss?:\/\//i.test(trimmed)) {
      return trimmed
    }
    if (/^http:\/\//i.test(trimmed)) {
      return `ws://${trimmed.slice('http://'.length)}`
    }
    if (/^https:\/\//i.test(trimmed)) {
      return `wss://${trimmed.slice('https://'.length)}`
    }
    return `ws://${trimmed}`
  }

  /**
   * Removes a port suffix from a host string.
   * `[v6]:port` and `[v6]` yield the bare IPv6 address, `host:port` yields
   * `host`, and anything with several colons (unbracketed IPv6) is returned
   * unchanged. Blank input yields `127.0.0.1`.
   */
  private stripPort(hostLike: string): string {
    const value = hostLike.trim()
    if (!value) {
      return '127.0.0.1'
    }
    if (value.startsWith('[')) {
      const closeIndex = value.indexOf(']')
      if (closeIndex > 0) {
        return value.slice(1, closeIndex)
      }
    }
    const firstColon = value.indexOf(':')
    // Exactly one colon means host:port; more than one is an IPv6 literal.
    if (firstColon > 0 && firstColon === value.lastIndexOf(':')) {
      return value.slice(0, firstColon)
    }
    return value
  }

  /**
   * Builds the caller URL a device uses to publish. IPv6 literals are wrapped
   * in brackets so that the port separator stays unambiguous.
   */
  private buildIngestUrl(host: string, port: number): string {
    const needsBrackets = host.includes(':') && !host.startsWith('[')
    const authority = needsBrackets ? `[${host}]` : host
    // NOTE(review): latency is emitted in milliseconds here; confirm the unit the
    // device-side SRT client expects (ffmpeg-based callers read microseconds).
    return `srt://${authority}:${port}?mode=caller&latency=${this.config.latencyMs}&transtype=live`
  }

  /**
   * Sends one ffmpeg output chunk to every open viewer of the session.
   * Viewers that are no longer open, that fail to send, or whose send backlog
   * exceeds the limit are removed.
   */
  private broadcastChunk(session: SrtSession, chunk: Buffer): void {
    if (session.viewers.size === 0) {
      return
    }
    // Snapshot the set because viewers are removed while iterating.
    for (const ws of Array.from(session.viewers)) {
      if (ws.readyState !== WebSocket.OPEN) {
        this.dropViewer(session, ws, false)
        continue
      }
      if (ws.bufferedAmount > SrtGatewayService.MAX_VIEWER_BUFFERED_BYTES) {
        this.logger.warn(`Viewer too slow, dropping: device=${session.deviceId}, buffered=${ws.bufferedAmount}`)
        this.dropViewer(session, ws, true)
        continue
      }
      try {
        ws.send(chunk, { binary: true })
      } catch (error) {
        this.logger.warn(`Viewer send failed: device=${session.deviceId}`, error)
        this.dropViewer(session, ws, true)
      }
    }
  }

  /** Forgets a viewer and optionally force-closes its socket. */
  private dropViewer(session: SrtSession, ws: WebSocket, terminate: boolean): void {
    session.viewers.delete(ws)
    this.viewerMeta.delete(ws)
    if (!terminate) {
      return
    }
    try {
      ws.terminate()
    } catch {
      // Best effort: the socket may already be gone.
    }
  }

  /** Closes a viewer gracefully, falling back to a hard terminate. */
  private closeViewer(ws: WebSocket, code: number, reason: string): void {
    try {
      ws.close(code, reason)
    } catch {
      ws.terminate()
    }
  }

  /**
   * Handles an HTTP upgrade request. Requests for other paths are ignored so
   * that other upgrade listeners can serve them. For the playback path the
   * token is consumed and validated before the WebSocket handshake completes.
   */
  private handleHttpUpgrade(request: IncomingMessage, socket: Socket, head: Buffer): void {
    if (!this.config.enabled) {
      return
    }
    const parsed = this.safeParseUrl(request)
    if (!parsed || parsed.pathname !== this.config.playbackPath) {
      return
    }
    const token = parsed.searchParams.get('token') || ''
    const ticket = this.consumeTicket(token)
    if (!ticket) {
      this.rejectUpgrade(socket, 401, 'invalid_or_expired_token')
      return
    }
    const session = this.sessions.get(ticket.deviceId)
    if (!session) {
      this.rejectUpgrade(socket, 404, 'stream_not_found')
      return
    }
    this.wsServer.handleUpgrade(request, socket, head, (ws: WebSocket) => {
      this.registerViewer(ws, ticket, session)
    })
  }

  /**
   * Attaches an authenticated viewer to a session and installs the handlers
   * that detach it again on close or error.
   */
  private registerViewer(ws: WebSocket, ticket: PlaybackTicket, session: SrtSession): void {
    session.viewers.add(ws)
    session.lastActivityAt = Date.now()
    this.viewerMeta.set(ws, {
      deviceId: ticket.deviceId,
      clientId: ticket.clientId
    })

    const detach = (): void => {
      session.viewers.delete(ws)
      this.viewerMeta.delete(ws)
      session.lastActivityAt = Date.now()
    }
    ws.on('message', () => {
      // Ignore upstream data from viewers; playback socket is read-only.
    })
    ws.on('close', detach)
    ws.on('error', () => {
      detach()
      try {
        ws.terminate()
      } catch {
        // Best effort: the socket may already be gone.
      }
    })
  }

  /**
   * Parses the request URL against the request's Host header.
   *
   * @returns `null` when the URL cannot be parsed.
   */
  private safeParseUrl(request: IncomingMessage): URL | null {
    const requestUrl = request.url || ''
    const host = request.headers.host || 'localhost'
    try {
      return new URL(requestUrl, `http://${host}`)
    } catch {
      return null
    }
  }

  /**
   * Answers a refused upgrade with a minimal HTTP response and closes the
   * socket. The socket is destroyed only after the response was flushed
   * (`'finish'`) or the socket failed (`'error'`); destroying right after a
   * write may discard the response.
   */
  private rejectUpgrade(socket: Socket, statusCode: number, reason: string): void {
    const destroy = (): void => {
      socket.destroy()
    }
    // Without a listener a socket error here would surface as an uncaught exception.
    socket.once('error', destroy)
    socket.once('finish', destroy)
    try {
      socket.end(`HTTP/1.1 ${statusCode} ${reason}\r\nConnection: close\r\n\r\n`)
    } catch {
      destroy()
    }
  }

  /**
   * Periodic housekeeping: drops expired tickets and stops sessions that have
   * no viewers and have been inactive for longer than the idle timeout.
   */
  private cleanupExpiredResources(): void {
    const now = Date.now()
    this.revokeTickets((ticket) => ticket.expiresAt < now)

    for (const [deviceId, session] of this.sessions.entries()) {
      const noViewers = session.viewers.size === 0
      const idleTooLong = now - session.lastActivityAt > this.config.idleTimeoutMs
      if (noViewers && idleTooLong) {
        this.logger.info(`Session idle timeout: device=${deviceId}`)
        this.stopSession(deviceId)
      }
    }
  }
}
// The class is also exported by name above; this default export supports `import SrtGatewayService from ...` call sites.
export default SrtGatewayService