feat: upload latest server source changes
This commit is contained in:
595
src/services/SrtGatewayService.ts
Normal file
595
src/services/SrtGatewayService.ts
Normal file
@@ -0,0 +1,595 @@
|
||||
import { ChildProcessWithoutNullStreams, spawn, spawnSync } from 'child_process'
|
||||
import { randomUUID } from 'crypto'
|
||||
import type { IncomingMessage, Server as HttpServer } from 'http'
|
||||
import type { Socket } from 'net'
|
||||
import { URL } from 'url'
|
||||
import { WebSocketServer, WebSocket } from 'ws'
|
||||
import Logger from '../utils/Logger'
|
||||
|
||||
type VideoTransportMode = 'srt_mpegts'
|
||||
|
||||
interface SrtGatewayConfig {
|
||||
enabled: boolean
|
||||
ffmpegPath: string
|
||||
portBase: number
|
||||
portSpan: number
|
||||
latencyMs: number
|
||||
publicHost: string
|
||||
playbackPath: string
|
||||
playbackWsOrigin: string
|
||||
idleTimeoutMs: number
|
||||
ticketTtlMs: number
|
||||
}
|
||||
|
||||
interface PlaybackTicket {
|
||||
token: string
|
||||
deviceId: string
|
||||
clientId: string
|
||||
expiresAt: number
|
||||
}
|
||||
|
||||
interface ViewerMeta {
|
||||
deviceId: string
|
||||
clientId: string
|
||||
}
|
||||
|
||||
interface SrtSession {
|
||||
deviceId: string
|
||||
ingestPort: number
|
||||
ffmpegProcess: ChildProcessWithoutNullStreams | null
|
||||
viewers: Set<WebSocket>
|
||||
lastActivityAt: number
|
||||
createdAt: number
|
||||
restartCount: number
|
||||
}
|
||||
|
||||
export interface SrtSessionInfo {
|
||||
deviceId: string
|
||||
ingestHost: string
|
||||
ingestPort: number
|
||||
ingestUrl: string
|
||||
playbackUrl: string
|
||||
playbackPath: string
|
||||
latencyMs: number
|
||||
videoTransport: VideoTransportMode
|
||||
tokenExpiresAt: number
|
||||
}
|
||||
|
||||
export class SrtGatewayService {
|
||||
private readonly logger: Logger
|
||||
private readonly httpServer: HttpServer
|
||||
private readonly wsServer: WebSocketServer
|
||||
private readonly config: SrtGatewayConfig
|
||||
private readonly sessions = new Map<string, SrtSession>()
|
||||
private readonly usedPorts = new Set<number>()
|
||||
private readonly tickets = new Map<string, PlaybackTicket>()
|
||||
private readonly viewerMeta = new Map<WebSocket, ViewerMeta>()
|
||||
private readonly cleanupTimer: NodeJS.Timeout
|
||||
private ffmpegProbeCompleted = false
|
||||
private ffmpegAvailable = false
|
||||
|
||||
constructor(httpServer: HttpServer) {
|
||||
this.httpServer = httpServer
|
||||
this.logger = new Logger('SrtGateway')
|
||||
this.config = this.loadConfig()
|
||||
this.wsServer = new WebSocketServer({ noServer: true })
|
||||
this.cleanupTimer = setInterval(() => this.cleanupExpiredResources(), 10_000)
|
||||
|
||||
this.httpServer.on('upgrade', (request: IncomingMessage, socket: Socket, head: Buffer) => {
|
||||
this.handleHttpUpgrade(request, socket, head)
|
||||
})
|
||||
|
||||
this.logger.info(
|
||||
`Initialized: enabled=${this.config.enabled}, playbackPath=${this.config.playbackPath}, portBase=${this.config.portBase}, portSpan=${this.config.portSpan}`
|
||||
)
|
||||
}
|
||||
|
||||
public isEnabled(): boolean {
|
||||
return this.config.enabled
|
||||
}
|
||||
|
||||
public getPlaybackPath(): string {
|
||||
return this.config.playbackPath
|
||||
}
|
||||
|
||||
public prepareStream(deviceId: string, clientId: string, options?: { ingestHostHint?: string, playbackWsOriginHint?: string }): SrtSessionInfo | null {
|
||||
if (!this.config.enabled) {
|
||||
return null
|
||||
}
|
||||
|
||||
if (!this.ensureFfmpegAvailable()) {
|
||||
this.logger.error('ffmpeg not available; cannot prepare SRT session')
|
||||
return null
|
||||
}
|
||||
|
||||
const session = this.ensureSession(deviceId)
|
||||
if (!session) {
|
||||
return null
|
||||
}
|
||||
|
||||
session.lastActivityAt = Date.now()
|
||||
|
||||
const ticket = this.issueTicket(deviceId, clientId)
|
||||
const ingestHost = this.resolveIngestHost(options?.ingestHostHint)
|
||||
const playbackOrigin = this.resolvePlaybackWsOrigin(options?.playbackWsOriginHint)
|
||||
|
||||
return {
|
||||
deviceId,
|
||||
ingestHost,
|
||||
ingestPort: session.ingestPort,
|
||||
ingestUrl: this.buildIngestUrl(ingestHost, session.ingestPort),
|
||||
playbackUrl: `${playbackOrigin}${this.config.playbackPath}?token=${encodeURIComponent(ticket.token)}`,
|
||||
playbackPath: this.config.playbackPath,
|
||||
latencyMs: this.config.latencyMs,
|
||||
videoTransport: 'srt_mpegts',
|
||||
tokenExpiresAt: ticket.expiresAt
|
||||
}
|
||||
}
|
||||
|
||||
public stopClientStream(deviceId: string, clientId: string): void {
|
||||
for (const [token, ticket] of this.tickets.entries()) {
|
||||
if (ticket.deviceId === deviceId && ticket.clientId === clientId) {
|
||||
this.tickets.delete(token)
|
||||
}
|
||||
}
|
||||
|
||||
const session = this.sessions.get(deviceId)
|
||||
if (!session) {
|
||||
return
|
||||
}
|
||||
|
||||
for (const ws of Array.from(session.viewers)) {
|
||||
const meta = this.viewerMeta.get(ws)
|
||||
if (!meta) continue
|
||||
if (meta.deviceId === deviceId && meta.clientId === clientId) {
|
||||
try {
|
||||
ws.close(1000, 'stream_stopped')
|
||||
} catch {
|
||||
ws.terminate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
session.lastActivityAt = Date.now()
|
||||
}
|
||||
|
||||
public stopSession(deviceId: string): void {
|
||||
const session = this.sessions.get(deviceId)
|
||||
if (!session) {
|
||||
return
|
||||
}
|
||||
|
||||
for (const [token, ticket] of this.tickets.entries()) {
|
||||
if (ticket.deviceId === deviceId) {
|
||||
this.tickets.delete(token)
|
||||
}
|
||||
}
|
||||
|
||||
for (const ws of session.viewers) {
|
||||
try {
|
||||
ws.close(1001, 'session_closed')
|
||||
} catch {
|
||||
ws.terminate()
|
||||
}
|
||||
this.viewerMeta.delete(ws)
|
||||
}
|
||||
session.viewers.clear()
|
||||
|
||||
if (session.ffmpegProcess) {
|
||||
session.ffmpegProcess.removeAllListeners()
|
||||
try {
|
||||
session.ffmpegProcess.kill('SIGTERM')
|
||||
} catch {
|
||||
session.ffmpegProcess.kill()
|
||||
}
|
||||
session.ffmpegProcess = null
|
||||
}
|
||||
|
||||
this.usedPorts.delete(session.ingestPort)
|
||||
this.sessions.delete(deviceId)
|
||||
this.logger.info(`Session stopped: device=${deviceId}`)
|
||||
}
|
||||
|
||||
public shutdown(): void {
|
||||
clearInterval(this.cleanupTimer)
|
||||
for (const deviceId of Array.from(this.sessions.keys())) {
|
||||
this.stopSession(deviceId)
|
||||
}
|
||||
try {
|
||||
this.wsServer.close()
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
private loadConfig(): SrtGatewayConfig {
|
||||
// Stable default: disabled unless explicitly enabled.
|
||||
const enabled = process.env.SRT_GATEWAY_ENABLED === 'true'
|
||||
const ffmpegPath = process.env.SRT_FFMPEG_PATH?.trim() || 'ffmpeg'
|
||||
const portBase = this.parseNumberEnv(process.env.SRT_PORT_BASE, 12000, 1024, 65500)
|
||||
const portSpan = this.parseNumberEnv(process.env.SRT_PORT_SPAN, 1000, 10, 5000)
|
||||
const latencyMs = this.parseNumberEnv(process.env.SRT_LATENCY_MS, 80, 20, 800)
|
||||
const publicHost = process.env.SRT_PUBLIC_HOST?.trim() || ''
|
||||
const playbackPathRaw = process.env.SRT_PLAYBACK_PATH?.trim() || '/ws/srt-play'
|
||||
const playbackPath = playbackPathRaw.startsWith('/') ? playbackPathRaw : `/${playbackPathRaw}`
|
||||
const playbackWsOrigin = process.env.SRT_PLAYBACK_WS_ORIGIN?.trim() || ''
|
||||
const idleTimeoutMs = this.parseNumberEnv(process.env.SRT_IDLE_TIMEOUT_MS, 45_000, 10_000, 10 * 60_000)
|
||||
const ticketTtlMs = this.parseNumberEnv(process.env.SRT_TICKET_TTL_MS, 60_000, 5_000, 5 * 60_000)
|
||||
|
||||
return {
|
||||
enabled,
|
||||
ffmpegPath,
|
||||
portBase,
|
||||
portSpan,
|
||||
latencyMs,
|
||||
publicHost,
|
||||
playbackPath,
|
||||
playbackWsOrigin,
|
||||
idleTimeoutMs,
|
||||
ticketTtlMs
|
||||
}
|
||||
}
|
||||
|
||||
private parseNumberEnv(rawValue: string | undefined, defaultValue: number, min: number, max: number): number {
|
||||
const parsed = Number.parseInt(rawValue || '', 10)
|
||||
if (!Number.isFinite(parsed)) {
|
||||
return defaultValue
|
||||
}
|
||||
return Math.max(min, Math.min(max, parsed))
|
||||
}
|
||||
|
||||
private ensureFfmpegAvailable(): boolean {
|
||||
if (this.ffmpegProbeCompleted) {
|
||||
return this.ffmpegAvailable
|
||||
}
|
||||
|
||||
this.ffmpegProbeCompleted = true
|
||||
|
||||
try {
|
||||
const probe = spawnSync(this.config.ffmpegPath, ['-version'], {
|
||||
windowsHide: true,
|
||||
timeout: 3000,
|
||||
stdio: 'ignore'
|
||||
})
|
||||
this.ffmpegAvailable = probe.status === 0
|
||||
} catch (error) {
|
||||
this.ffmpegAvailable = false
|
||||
this.logger.error('ffmpeg probe failed', error)
|
||||
}
|
||||
|
||||
if (!this.ffmpegAvailable) {
|
||||
this.logger.error(`ffmpeg unavailable: path=${this.config.ffmpegPath}`)
|
||||
}
|
||||
return this.ffmpegAvailable
|
||||
}
|
||||
|
||||
private ensureSession(deviceId: string): SrtSession | null {
|
||||
const existing = this.sessions.get(deviceId)
|
||||
if (existing) {
|
||||
if (!existing.ffmpegProcess) {
|
||||
this.startFfmpeg(existing)
|
||||
}
|
||||
return existing
|
||||
}
|
||||
|
||||
const ingestPort = this.allocatePort(deviceId)
|
||||
if (!ingestPort) {
|
||||
this.logger.error(`No free SRT ingest port for device=${deviceId}`)
|
||||
return null
|
||||
}
|
||||
|
||||
const session: SrtSession = {
|
||||
deviceId,
|
||||
ingestPort,
|
||||
ffmpegProcess: null,
|
||||
viewers: new Set<WebSocket>(),
|
||||
lastActivityAt: Date.now(),
|
||||
createdAt: Date.now(),
|
||||
restartCount: 0
|
||||
}
|
||||
|
||||
this.sessions.set(deviceId, session)
|
||||
this.usedPorts.add(ingestPort)
|
||||
this.startFfmpeg(session)
|
||||
return session
|
||||
}
|
||||
|
||||
private allocatePort(deviceId: string): number | null {
|
||||
const span = this.config.portSpan
|
||||
const base = this.config.portBase
|
||||
const seed = this.stringHash(deviceId)
|
||||
|
||||
for (let i = 0; i < span; i += 1) {
|
||||
const candidate = base + ((seed + i) % span)
|
||||
if (!this.usedPorts.has(candidate)) {
|
||||
return candidate
|
||||
}
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
private stringHash(value: string): number {
|
||||
let hash = 0
|
||||
for (let i = 0; i < value.length; i += 1) {
|
||||
hash = ((hash << 5) - hash + value.charCodeAt(i)) | 0
|
||||
}
|
||||
return Math.abs(hash)
|
||||
}
|
||||
|
||||
private startFfmpeg(session: SrtSession): void {
|
||||
if (session.ffmpegProcess || !this.config.enabled) {
|
||||
return
|
||||
}
|
||||
|
||||
const inputUrl = `srt://0.0.0.0:${session.ingestPort}?mode=listener&latency=${this.config.latencyMs}&transtype=live`
|
||||
const args = [
|
||||
'-loglevel', 'warning',
|
||||
'-fflags', 'nobuffer',
|
||||
'-flags', 'low_delay',
|
||||
'-analyzeduration', '0',
|
||||
'-probesize', '32768',
|
||||
'-i', inputUrl,
|
||||
'-an',
|
||||
'-c:v', 'copy',
|
||||
'-f', 'mpegts',
|
||||
'-mpegts_flags', 'resend_headers',
|
||||
'pipe:1'
|
||||
]
|
||||
|
||||
this.logger.info(`Start ffmpeg session: device=${session.deviceId}, input=${inputUrl}`)
|
||||
|
||||
const child = spawn(this.config.ffmpegPath, args, {
|
||||
windowsHide: true,
|
||||
stdio: ['pipe', 'pipe', 'pipe']
|
||||
})
|
||||
child.stdin.end()
|
||||
|
||||
session.ffmpegProcess = child
|
||||
|
||||
child.stdout.on('data', (chunk: Buffer) => {
|
||||
session.lastActivityAt = Date.now()
|
||||
session.restartCount = 0
|
||||
this.broadcastChunk(session, chunk)
|
||||
})
|
||||
|
||||
child.stderr.on('data', (data: Buffer) => {
|
||||
const line = data.toString().trim()
|
||||
if (line) {
|
||||
this.logger.debug(`ffmpeg[${session.deviceId}] ${line}`)
|
||||
}
|
||||
})
|
||||
|
||||
child.on('error', (error) => {
|
||||
this.logger.error(`ffmpeg process error: device=${session.deviceId}`, error)
|
||||
})
|
||||
|
||||
child.on('close', (code, signal) => {
|
||||
const current = this.sessions.get(session.deviceId)
|
||||
if (!current || current !== session) {
|
||||
return
|
||||
}
|
||||
|
||||
current.ffmpegProcess = null
|
||||
this.logger.warn(`ffmpeg exited: device=${session.deviceId}, code=${code}, signal=${signal}`)
|
||||
|
||||
if (!this.config.enabled) {
|
||||
return
|
||||
}
|
||||
|
||||
current.restartCount += 1
|
||||
if (current.restartCount > 5) {
|
||||
this.logger.error(`ffmpeg restart limit exceeded, closing session: device=${session.deviceId}`)
|
||||
this.stopSession(session.deviceId)
|
||||
return
|
||||
}
|
||||
|
||||
const delayMs = Math.min(1000 * current.restartCount, 5000)
|
||||
setTimeout(() => {
|
||||
const aliveSession = this.sessions.get(session.deviceId)
|
||||
if (aliveSession && !aliveSession.ffmpegProcess) {
|
||||
this.startFfmpeg(aliveSession)
|
||||
}
|
||||
}, delayMs)
|
||||
})
|
||||
}
|
||||
|
||||
private issueTicket(deviceId: string, clientId: string): PlaybackTicket {
|
||||
const token = randomUUID()
|
||||
const expiresAt = Date.now() + this.config.ticketTtlMs
|
||||
const ticket: PlaybackTicket = { token, deviceId, clientId, expiresAt }
|
||||
this.tickets.set(token, ticket)
|
||||
return ticket
|
||||
}
|
||||
|
||||
private consumeTicket(token: string): PlaybackTicket | null {
|
||||
const ticket = this.tickets.get(token)
|
||||
if (!ticket) {
|
||||
return null
|
||||
}
|
||||
if (ticket.expiresAt < Date.now()) {
|
||||
this.tickets.delete(token)
|
||||
return null
|
||||
}
|
||||
this.tickets.delete(token)
|
||||
return ticket
|
||||
}
|
||||
|
||||
private resolveIngestHost(hostHint?: string): string {
|
||||
if (this.config.publicHost) {
|
||||
return this.stripPort(this.config.publicHost)
|
||||
}
|
||||
if (hostHint) {
|
||||
return this.stripPort(hostHint)
|
||||
}
|
||||
return '127.0.0.1'
|
||||
}
|
||||
|
||||
private resolvePlaybackWsOrigin(originHint?: string): string {
|
||||
const source = this.config.playbackWsOrigin || originHint || ''
|
||||
if (!source) {
|
||||
return 'ws://127.0.0.1:3001'
|
||||
}
|
||||
|
||||
const trimmed = source.trim().replace(/\/+$/, '')
|
||||
if (trimmed.startsWith('ws://') || trimmed.startsWith('wss://')) {
|
||||
return trimmed
|
||||
}
|
||||
if (trimmed.startsWith('http://')) {
|
||||
return `ws://${trimmed.slice('http://'.length)}`
|
||||
}
|
||||
if (trimmed.startsWith('https://')) {
|
||||
return `wss://${trimmed.slice('https://'.length)}`
|
||||
}
|
||||
return `ws://${trimmed}`
|
||||
}
|
||||
|
||||
private stripPort(hostLike: string): string {
|
||||
const value = hostLike.trim()
|
||||
if (!value) {
|
||||
return '127.0.0.1'
|
||||
}
|
||||
|
||||
if (value.startsWith('[')) {
|
||||
const closeIndex = value.indexOf(']')
|
||||
if (closeIndex > 0) {
|
||||
return value.slice(1, closeIndex)
|
||||
}
|
||||
}
|
||||
|
||||
const firstColon = value.indexOf(':')
|
||||
const lastColon = value.lastIndexOf(':')
|
||||
if (firstColon > 0 && firstColon === lastColon) {
|
||||
return value.slice(0, firstColon)
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
private buildIngestUrl(host: string, port: number): string {
|
||||
return `srt://${host}:${port}?mode=caller&latency=${this.config.latencyMs}&transtype=live`
|
||||
}
|
||||
|
||||
private broadcastChunk(session: SrtSession, chunk: Buffer): void {
|
||||
if (session.viewers.size === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
for (const ws of Array.from(session.viewers)) {
|
||||
if (ws.readyState !== WebSocket.OPEN) {
|
||||
session.viewers.delete(ws)
|
||||
this.viewerMeta.delete(ws)
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(chunk, { binary: true })
|
||||
} catch (error) {
|
||||
this.logger.warn(`Viewer send failed: device=${session.deviceId}`, error)
|
||||
session.viewers.delete(ws)
|
||||
this.viewerMeta.delete(ws)
|
||||
try {
|
||||
ws.terminate()
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private handleHttpUpgrade(request: IncomingMessage, socket: Socket, head: Buffer): void {
|
||||
if (!this.config.enabled) {
|
||||
return
|
||||
}
|
||||
|
||||
const parsed = this.safeParseUrl(request)
|
||||
if (!parsed || parsed.pathname !== this.config.playbackPath) {
|
||||
return
|
||||
}
|
||||
|
||||
const token = parsed.searchParams.get('token') || ''
|
||||
const ticket = this.consumeTicket(token)
|
||||
if (!ticket) {
|
||||
this.rejectUpgrade(socket, 401, 'invalid_or_expired_token')
|
||||
return
|
||||
}
|
||||
|
||||
const session = this.sessions.get(ticket.deviceId)
|
||||
if (!session) {
|
||||
this.rejectUpgrade(socket, 404, 'stream_not_found')
|
||||
return
|
||||
}
|
||||
|
||||
this.wsServer.handleUpgrade(request, socket, head, (ws: WebSocket) => {
|
||||
this.registerViewer(ws, ticket, session)
|
||||
})
|
||||
}
|
||||
|
||||
private registerViewer(ws: WebSocket, ticket: PlaybackTicket, session: SrtSession): void {
|
||||
session.viewers.add(ws)
|
||||
session.lastActivityAt = Date.now()
|
||||
this.viewerMeta.set(ws, {
|
||||
deviceId: ticket.deviceId,
|
||||
clientId: ticket.clientId
|
||||
})
|
||||
|
||||
ws.on('message', () => {
|
||||
// Ignore upstream data from viewers; playback socket is read-only.
|
||||
})
|
||||
|
||||
ws.on('close', () => {
|
||||
session.viewers.delete(ws)
|
||||
this.viewerMeta.delete(ws)
|
||||
session.lastActivityAt = Date.now()
|
||||
})
|
||||
|
||||
ws.on('error', () => {
|
||||
session.viewers.delete(ws)
|
||||
this.viewerMeta.delete(ws)
|
||||
session.lastActivityAt = Date.now()
|
||||
try {
|
||||
ws.terminate()
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private safeParseUrl(request: IncomingMessage): URL | null {
|
||||
const requestUrl = request.url || ''
|
||||
const host = request.headers.host || 'localhost'
|
||||
try {
|
||||
return new URL(requestUrl, `http://${host}`)
|
||||
} catch {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
private rejectUpgrade(socket: Socket, statusCode: number, reason: string): void {
|
||||
try {
|
||||
socket.write(`HTTP/1.1 ${statusCode} ${reason}\r\nConnection: close\r\n\r\n`)
|
||||
} finally {
|
||||
socket.destroy()
|
||||
}
|
||||
}
|
||||
|
||||
private cleanupExpiredResources(): void {
|
||||
const now = Date.now()
|
||||
|
||||
for (const [token, ticket] of this.tickets.entries()) {
|
||||
if (ticket.expiresAt < now) {
|
||||
this.tickets.delete(token)
|
||||
}
|
||||
}
|
||||
|
||||
for (const [deviceId, session] of this.sessions.entries()) {
|
||||
const noViewers = session.viewers.size === 0
|
||||
const idleTooLong = now - session.lastActivityAt > this.config.idleTimeoutMs
|
||||
if (noViewers && idleTooLong) {
|
||||
this.logger.info(`Session idle timeout: device=${deviceId}`)
|
||||
this.stopSession(deviceId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default SrtGatewayService
|
||||
Reference in New Issue
Block a user