596 lines
16 KiB
TypeScript
596 lines
16 KiB
TypeScript
import { ChildProcessWithoutNullStreams, spawn, spawnSync } from 'child_process'
|
|
import { randomUUID } from 'crypto'
|
|
import type { IncomingMessage, Server as HttpServer } from 'http'
|
|
import type { Socket } from 'net'
|
|
import { URL } from 'url'
|
|
import { WebSocketServer, WebSocket } from 'ws'
|
|
import Logger from '../utils/Logger'
|
|
|
|
type VideoTransportMode = 'srt_mpegts'
|
|
|
|
interface SrtGatewayConfig {
|
|
enabled: boolean
|
|
ffmpegPath: string
|
|
portBase: number
|
|
portSpan: number
|
|
latencyMs: number
|
|
publicHost: string
|
|
playbackPath: string
|
|
playbackWsOrigin: string
|
|
idleTimeoutMs: number
|
|
ticketTtlMs: number
|
|
}
|
|
|
|
interface PlaybackTicket {
|
|
token: string
|
|
deviceId: string
|
|
clientId: string
|
|
expiresAt: number
|
|
}
|
|
|
|
interface ViewerMeta {
|
|
deviceId: string
|
|
clientId: string
|
|
}
|
|
|
|
interface SrtSession {
|
|
deviceId: string
|
|
ingestPort: number
|
|
ffmpegProcess: ChildProcessWithoutNullStreams | null
|
|
viewers: Set<WebSocket>
|
|
lastActivityAt: number
|
|
createdAt: number
|
|
restartCount: number
|
|
}
|
|
|
|
export interface SrtSessionInfo {
|
|
deviceId: string
|
|
ingestHost: string
|
|
ingestPort: number
|
|
ingestUrl: string
|
|
playbackUrl: string
|
|
playbackPath: string
|
|
latencyMs: number
|
|
videoTransport: VideoTransportMode
|
|
tokenExpiresAt: number
|
|
}
|
|
|
|
export class SrtGatewayService {
|
|
private readonly logger: Logger
|
|
private readonly httpServer: HttpServer
|
|
private readonly wsServer: WebSocketServer
|
|
private readonly config: SrtGatewayConfig
|
|
private readonly sessions = new Map<string, SrtSession>()
|
|
private readonly usedPorts = new Set<number>()
|
|
private readonly tickets = new Map<string, PlaybackTicket>()
|
|
private readonly viewerMeta = new Map<WebSocket, ViewerMeta>()
|
|
private readonly cleanupTimer: NodeJS.Timeout
|
|
private ffmpegProbeCompleted = false
|
|
private ffmpegAvailable = false
|
|
|
|
constructor(httpServer: HttpServer) {
|
|
this.httpServer = httpServer
|
|
this.logger = new Logger('SrtGateway')
|
|
this.config = this.loadConfig()
|
|
this.wsServer = new WebSocketServer({ noServer: true })
|
|
this.cleanupTimer = setInterval(() => this.cleanupExpiredResources(), 10_000)
|
|
|
|
this.httpServer.on('upgrade', (request: IncomingMessage, socket: Socket, head: Buffer) => {
|
|
this.handleHttpUpgrade(request, socket, head)
|
|
})
|
|
|
|
this.logger.info(
|
|
`Initialized: enabled=${this.config.enabled}, playbackPath=${this.config.playbackPath}, portBase=${this.config.portBase}, portSpan=${this.config.portSpan}`
|
|
)
|
|
}
|
|
|
|
public isEnabled(): boolean {
|
|
return this.config.enabled
|
|
}
|
|
|
|
public getPlaybackPath(): string {
|
|
return this.config.playbackPath
|
|
}
|
|
|
|
public prepareStream(deviceId: string, clientId: string, options?: { ingestHostHint?: string, playbackWsOriginHint?: string }): SrtSessionInfo | null {
|
|
if (!this.config.enabled) {
|
|
return null
|
|
}
|
|
|
|
if (!this.ensureFfmpegAvailable()) {
|
|
this.logger.error('ffmpeg not available; cannot prepare SRT session')
|
|
return null
|
|
}
|
|
|
|
const session = this.ensureSession(deviceId)
|
|
if (!session) {
|
|
return null
|
|
}
|
|
|
|
session.lastActivityAt = Date.now()
|
|
|
|
const ticket = this.issueTicket(deviceId, clientId)
|
|
const ingestHost = this.resolveIngestHost(options?.ingestHostHint)
|
|
const playbackOrigin = this.resolvePlaybackWsOrigin(options?.playbackWsOriginHint)
|
|
|
|
return {
|
|
deviceId,
|
|
ingestHost,
|
|
ingestPort: session.ingestPort,
|
|
ingestUrl: this.buildIngestUrl(ingestHost, session.ingestPort),
|
|
playbackUrl: `${playbackOrigin}${this.config.playbackPath}?token=${encodeURIComponent(ticket.token)}`,
|
|
playbackPath: this.config.playbackPath,
|
|
latencyMs: this.config.latencyMs,
|
|
videoTransport: 'srt_mpegts',
|
|
tokenExpiresAt: ticket.expiresAt
|
|
}
|
|
}
|
|
|
|
public stopClientStream(deviceId: string, clientId: string): void {
|
|
for (const [token, ticket] of this.tickets.entries()) {
|
|
if (ticket.deviceId === deviceId && ticket.clientId === clientId) {
|
|
this.tickets.delete(token)
|
|
}
|
|
}
|
|
|
|
const session = this.sessions.get(deviceId)
|
|
if (!session) {
|
|
return
|
|
}
|
|
|
|
for (const ws of Array.from(session.viewers)) {
|
|
const meta = this.viewerMeta.get(ws)
|
|
if (!meta) continue
|
|
if (meta.deviceId === deviceId && meta.clientId === clientId) {
|
|
try {
|
|
ws.close(1000, 'stream_stopped')
|
|
} catch {
|
|
ws.terminate()
|
|
}
|
|
}
|
|
}
|
|
|
|
session.lastActivityAt = Date.now()
|
|
}
|
|
|
|
public stopSession(deviceId: string): void {
|
|
const session = this.sessions.get(deviceId)
|
|
if (!session) {
|
|
return
|
|
}
|
|
|
|
for (const [token, ticket] of this.tickets.entries()) {
|
|
if (ticket.deviceId === deviceId) {
|
|
this.tickets.delete(token)
|
|
}
|
|
}
|
|
|
|
for (const ws of session.viewers) {
|
|
try {
|
|
ws.close(1001, 'session_closed')
|
|
} catch {
|
|
ws.terminate()
|
|
}
|
|
this.viewerMeta.delete(ws)
|
|
}
|
|
session.viewers.clear()
|
|
|
|
if (session.ffmpegProcess) {
|
|
session.ffmpegProcess.removeAllListeners()
|
|
try {
|
|
session.ffmpegProcess.kill('SIGTERM')
|
|
} catch {
|
|
session.ffmpegProcess.kill()
|
|
}
|
|
session.ffmpegProcess = null
|
|
}
|
|
|
|
this.usedPorts.delete(session.ingestPort)
|
|
this.sessions.delete(deviceId)
|
|
this.logger.info(`Session stopped: device=${deviceId}`)
|
|
}
|
|
|
|
public shutdown(): void {
|
|
clearInterval(this.cleanupTimer)
|
|
for (const deviceId of Array.from(this.sessions.keys())) {
|
|
this.stopSession(deviceId)
|
|
}
|
|
try {
|
|
this.wsServer.close()
|
|
} catch {
|
|
// ignore
|
|
}
|
|
}
|
|
|
|
private loadConfig(): SrtGatewayConfig {
|
|
// Stable default: disabled unless explicitly enabled.
|
|
const enabled = process.env.SRT_GATEWAY_ENABLED === 'true'
|
|
const ffmpegPath = process.env.SRT_FFMPEG_PATH?.trim() || 'ffmpeg'
|
|
const portBase = this.parseNumberEnv(process.env.SRT_PORT_BASE, 12000, 1024, 65500)
|
|
const portSpan = this.parseNumberEnv(process.env.SRT_PORT_SPAN, 1000, 10, 5000)
|
|
const latencyMs = this.parseNumberEnv(process.env.SRT_LATENCY_MS, 80, 20, 800)
|
|
const publicHost = process.env.SRT_PUBLIC_HOST?.trim() || ''
|
|
const playbackPathRaw = process.env.SRT_PLAYBACK_PATH?.trim() || '/ws/srt-play'
|
|
const playbackPath = playbackPathRaw.startsWith('/') ? playbackPathRaw : `/${playbackPathRaw}`
|
|
const playbackWsOrigin = process.env.SRT_PLAYBACK_WS_ORIGIN?.trim() || ''
|
|
const idleTimeoutMs = this.parseNumberEnv(process.env.SRT_IDLE_TIMEOUT_MS, 45_000, 10_000, 10 * 60_000)
|
|
const ticketTtlMs = this.parseNumberEnv(process.env.SRT_TICKET_TTL_MS, 60_000, 5_000, 5 * 60_000)
|
|
|
|
return {
|
|
enabled,
|
|
ffmpegPath,
|
|
portBase,
|
|
portSpan,
|
|
latencyMs,
|
|
publicHost,
|
|
playbackPath,
|
|
playbackWsOrigin,
|
|
idleTimeoutMs,
|
|
ticketTtlMs
|
|
}
|
|
}
|
|
|
|
private parseNumberEnv(rawValue: string | undefined, defaultValue: number, min: number, max: number): number {
|
|
const parsed = Number.parseInt(rawValue || '', 10)
|
|
if (!Number.isFinite(parsed)) {
|
|
return defaultValue
|
|
}
|
|
return Math.max(min, Math.min(max, parsed))
|
|
}
|
|
|
|
private ensureFfmpegAvailable(): boolean {
|
|
if (this.ffmpegProbeCompleted) {
|
|
return this.ffmpegAvailable
|
|
}
|
|
|
|
this.ffmpegProbeCompleted = true
|
|
|
|
try {
|
|
const probe = spawnSync(this.config.ffmpegPath, ['-version'], {
|
|
windowsHide: true,
|
|
timeout: 3000,
|
|
stdio: 'ignore'
|
|
})
|
|
this.ffmpegAvailable = probe.status === 0
|
|
} catch (error) {
|
|
this.ffmpegAvailable = false
|
|
this.logger.error('ffmpeg probe failed', error)
|
|
}
|
|
|
|
if (!this.ffmpegAvailable) {
|
|
this.logger.error(`ffmpeg unavailable: path=${this.config.ffmpegPath}`)
|
|
}
|
|
return this.ffmpegAvailable
|
|
}
|
|
|
|
private ensureSession(deviceId: string): SrtSession | null {
|
|
const existing = this.sessions.get(deviceId)
|
|
if (existing) {
|
|
if (!existing.ffmpegProcess) {
|
|
this.startFfmpeg(existing)
|
|
}
|
|
return existing
|
|
}
|
|
|
|
const ingestPort = this.allocatePort(deviceId)
|
|
if (!ingestPort) {
|
|
this.logger.error(`No free SRT ingest port for device=${deviceId}`)
|
|
return null
|
|
}
|
|
|
|
const session: SrtSession = {
|
|
deviceId,
|
|
ingestPort,
|
|
ffmpegProcess: null,
|
|
viewers: new Set<WebSocket>(),
|
|
lastActivityAt: Date.now(),
|
|
createdAt: Date.now(),
|
|
restartCount: 0
|
|
}
|
|
|
|
this.sessions.set(deviceId, session)
|
|
this.usedPorts.add(ingestPort)
|
|
this.startFfmpeg(session)
|
|
return session
|
|
}
|
|
|
|
private allocatePort(deviceId: string): number | null {
|
|
const span = this.config.portSpan
|
|
const base = this.config.portBase
|
|
const seed = this.stringHash(deviceId)
|
|
|
|
for (let i = 0; i < span; i += 1) {
|
|
const candidate = base + ((seed + i) % span)
|
|
if (!this.usedPorts.has(candidate)) {
|
|
return candidate
|
|
}
|
|
}
|
|
|
|
return null
|
|
}
|
|
|
|
private stringHash(value: string): number {
|
|
let hash = 0
|
|
for (let i = 0; i < value.length; i += 1) {
|
|
hash = ((hash << 5) - hash + value.charCodeAt(i)) | 0
|
|
}
|
|
return Math.abs(hash)
|
|
}
|
|
|
|
private startFfmpeg(session: SrtSession): void {
|
|
if (session.ffmpegProcess || !this.config.enabled) {
|
|
return
|
|
}
|
|
|
|
const inputUrl = `srt://0.0.0.0:${session.ingestPort}?mode=listener&latency=${this.config.latencyMs}&transtype=live`
|
|
const args = [
|
|
'-loglevel', 'warning',
|
|
'-fflags', 'nobuffer',
|
|
'-flags', 'low_delay',
|
|
'-analyzeduration', '0',
|
|
'-probesize', '32768',
|
|
'-i', inputUrl,
|
|
'-an',
|
|
'-c:v', 'copy',
|
|
'-f', 'mpegts',
|
|
'-mpegts_flags', 'resend_headers',
|
|
'pipe:1'
|
|
]
|
|
|
|
this.logger.info(`Start ffmpeg session: device=${session.deviceId}, input=${inputUrl}`)
|
|
|
|
const child = spawn(this.config.ffmpegPath, args, {
|
|
windowsHide: true,
|
|
stdio: ['pipe', 'pipe', 'pipe']
|
|
})
|
|
child.stdin.end()
|
|
|
|
session.ffmpegProcess = child
|
|
|
|
child.stdout.on('data', (chunk: Buffer) => {
|
|
session.lastActivityAt = Date.now()
|
|
session.restartCount = 0
|
|
this.broadcastChunk(session, chunk)
|
|
})
|
|
|
|
child.stderr.on('data', (data: Buffer) => {
|
|
const line = data.toString().trim()
|
|
if (line) {
|
|
this.logger.debug(`ffmpeg[${session.deviceId}] ${line}`)
|
|
}
|
|
})
|
|
|
|
child.on('error', (error) => {
|
|
this.logger.error(`ffmpeg process error: device=${session.deviceId}`, error)
|
|
})
|
|
|
|
child.on('close', (code, signal) => {
|
|
const current = this.sessions.get(session.deviceId)
|
|
if (!current || current !== session) {
|
|
return
|
|
}
|
|
|
|
current.ffmpegProcess = null
|
|
this.logger.warn(`ffmpeg exited: device=${session.deviceId}, code=${code}, signal=${signal}`)
|
|
|
|
if (!this.config.enabled) {
|
|
return
|
|
}
|
|
|
|
current.restartCount += 1
|
|
if (current.restartCount > 5) {
|
|
this.logger.error(`ffmpeg restart limit exceeded, closing session: device=${session.deviceId}`)
|
|
this.stopSession(session.deviceId)
|
|
return
|
|
}
|
|
|
|
const delayMs = Math.min(1000 * current.restartCount, 5000)
|
|
setTimeout(() => {
|
|
const aliveSession = this.sessions.get(session.deviceId)
|
|
if (aliveSession && !aliveSession.ffmpegProcess) {
|
|
this.startFfmpeg(aliveSession)
|
|
}
|
|
}, delayMs)
|
|
})
|
|
}
|
|
|
|
private issueTicket(deviceId: string, clientId: string): PlaybackTicket {
|
|
const token = randomUUID()
|
|
const expiresAt = Date.now() + this.config.ticketTtlMs
|
|
const ticket: PlaybackTicket = { token, deviceId, clientId, expiresAt }
|
|
this.tickets.set(token, ticket)
|
|
return ticket
|
|
}
|
|
|
|
private consumeTicket(token: string): PlaybackTicket | null {
|
|
const ticket = this.tickets.get(token)
|
|
if (!ticket) {
|
|
return null
|
|
}
|
|
if (ticket.expiresAt < Date.now()) {
|
|
this.tickets.delete(token)
|
|
return null
|
|
}
|
|
this.tickets.delete(token)
|
|
return ticket
|
|
}
|
|
|
|
private resolveIngestHost(hostHint?: string): string {
|
|
if (this.config.publicHost) {
|
|
return this.stripPort(this.config.publicHost)
|
|
}
|
|
if (hostHint) {
|
|
return this.stripPort(hostHint)
|
|
}
|
|
return '127.0.0.1'
|
|
}
|
|
|
|
private resolvePlaybackWsOrigin(originHint?: string): string {
|
|
const source = this.config.playbackWsOrigin || originHint || ''
|
|
if (!source) {
|
|
return 'ws://127.0.0.1:3001'
|
|
}
|
|
|
|
const trimmed = source.trim().replace(/\/+$/, '')
|
|
if (trimmed.startsWith('ws://') || trimmed.startsWith('wss://')) {
|
|
return trimmed
|
|
}
|
|
if (trimmed.startsWith('http://')) {
|
|
return `ws://${trimmed.slice('http://'.length)}`
|
|
}
|
|
if (trimmed.startsWith('https://')) {
|
|
return `wss://${trimmed.slice('https://'.length)}`
|
|
}
|
|
return `ws://${trimmed}`
|
|
}
|
|
|
|
private stripPort(hostLike: string): string {
|
|
const value = hostLike.trim()
|
|
if (!value) {
|
|
return '127.0.0.1'
|
|
}
|
|
|
|
if (value.startsWith('[')) {
|
|
const closeIndex = value.indexOf(']')
|
|
if (closeIndex > 0) {
|
|
return value.slice(1, closeIndex)
|
|
}
|
|
}
|
|
|
|
const firstColon = value.indexOf(':')
|
|
const lastColon = value.lastIndexOf(':')
|
|
if (firstColon > 0 && firstColon === lastColon) {
|
|
return value.slice(0, firstColon)
|
|
}
|
|
return value
|
|
}
|
|
|
|
private buildIngestUrl(host: string, port: number): string {
|
|
return `srt://${host}:${port}?mode=caller&latency=${this.config.latencyMs}&transtype=live`
|
|
}
|
|
|
|
private broadcastChunk(session: SrtSession, chunk: Buffer): void {
|
|
if (session.viewers.size === 0) {
|
|
return
|
|
}
|
|
|
|
for (const ws of Array.from(session.viewers)) {
|
|
if (ws.readyState !== WebSocket.OPEN) {
|
|
session.viewers.delete(ws)
|
|
this.viewerMeta.delete(ws)
|
|
continue
|
|
}
|
|
|
|
try {
|
|
ws.send(chunk, { binary: true })
|
|
} catch (error) {
|
|
this.logger.warn(`Viewer send failed: device=${session.deviceId}`, error)
|
|
session.viewers.delete(ws)
|
|
this.viewerMeta.delete(ws)
|
|
try {
|
|
ws.terminate()
|
|
} catch {
|
|
// ignore
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private handleHttpUpgrade(request: IncomingMessage, socket: Socket, head: Buffer): void {
|
|
if (!this.config.enabled) {
|
|
return
|
|
}
|
|
|
|
const parsed = this.safeParseUrl(request)
|
|
if (!parsed || parsed.pathname !== this.config.playbackPath) {
|
|
return
|
|
}
|
|
|
|
const token = parsed.searchParams.get('token') || ''
|
|
const ticket = this.consumeTicket(token)
|
|
if (!ticket) {
|
|
this.rejectUpgrade(socket, 401, 'invalid_or_expired_token')
|
|
return
|
|
}
|
|
|
|
const session = this.sessions.get(ticket.deviceId)
|
|
if (!session) {
|
|
this.rejectUpgrade(socket, 404, 'stream_not_found')
|
|
return
|
|
}
|
|
|
|
this.wsServer.handleUpgrade(request, socket, head, (ws: WebSocket) => {
|
|
this.registerViewer(ws, ticket, session)
|
|
})
|
|
}
|
|
|
|
private registerViewer(ws: WebSocket, ticket: PlaybackTicket, session: SrtSession): void {
|
|
session.viewers.add(ws)
|
|
session.lastActivityAt = Date.now()
|
|
this.viewerMeta.set(ws, {
|
|
deviceId: ticket.deviceId,
|
|
clientId: ticket.clientId
|
|
})
|
|
|
|
ws.on('message', () => {
|
|
// Ignore upstream data from viewers; playback socket is read-only.
|
|
})
|
|
|
|
ws.on('close', () => {
|
|
session.viewers.delete(ws)
|
|
this.viewerMeta.delete(ws)
|
|
session.lastActivityAt = Date.now()
|
|
})
|
|
|
|
ws.on('error', () => {
|
|
session.viewers.delete(ws)
|
|
this.viewerMeta.delete(ws)
|
|
session.lastActivityAt = Date.now()
|
|
try {
|
|
ws.terminate()
|
|
} catch {
|
|
// ignore
|
|
}
|
|
})
|
|
}
|
|
|
|
private safeParseUrl(request: IncomingMessage): URL | null {
|
|
const requestUrl = request.url || ''
|
|
const host = request.headers.host || 'localhost'
|
|
try {
|
|
return new URL(requestUrl, `http://${host}`)
|
|
} catch {
|
|
return null
|
|
}
|
|
}
|
|
|
|
private rejectUpgrade(socket: Socket, statusCode: number, reason: string): void {
|
|
try {
|
|
socket.write(`HTTP/1.1 ${statusCode} ${reason}\r\nConnection: close\r\n\r\n`)
|
|
} finally {
|
|
socket.destroy()
|
|
}
|
|
}
|
|
|
|
private cleanupExpiredResources(): void {
|
|
const now = Date.now()
|
|
|
|
for (const [token, ticket] of this.tickets.entries()) {
|
|
if (ticket.expiresAt < now) {
|
|
this.tickets.delete(token)
|
|
}
|
|
}
|
|
|
|
for (const [deviceId, session] of this.sessions.entries()) {
|
|
const noViewers = session.viewers.size === 0
|
|
const idleTooLong = now - session.lastActivityAt > this.config.idleTimeoutMs
|
|
if (noViewers && idleTooLong) {
|
|
this.logger.info(`Session idle timeout: device=${deviceId}`)
|
|
this.stopSession(deviceId)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
export default SrtGatewayService
|