From fa51e360b9d61ebf8e931c92c654d3e202922f69 Mon Sep 17 00:00:00 2001 From: limjoe Date: Sat, 17 Aug 2024 22:38:10 +0800 Subject: [PATCH 1/3] Transcript: support custom fix and overlay queue --- DEVELOPER.md | 2 ++ platform/main.go | 8 ++++- platform/transcript.go | 67 ++++++++++++++++++++++++++++++++++++++++-- platform/utils.go | 8 +++++ 4 files changed, 81 insertions(+), 4 deletions(-) diff --git a/DEVELOPER.md b/DEVELOPER.md index 3870e133..5d04d826 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -1190,6 +1190,8 @@ For limit that you can control: * `SRS_FORWARD_LIMIT`: The limit for SRS forward. Default: `10`. * `SRS_VLIVE_LIMIT`: The limit for SRS virtual live. Default: `10`. +* `SRS_TRANSCRIPT_FIX_QUEUE_LIMIT`: The limit for SRS transcript manually fix queue. Default: `2`. +* `SRS_TRANSCRIPT_OVERLAY_QUEUE_LIMIT`: The limit for SRS transcript overlay queue. Default: `9`. For feature control: diff --git a/platform/main.go b/platform/main.go index 9bd97915..1f8dae93 100644 --- a/platform/main.go +++ b/platform/main.go @@ -132,13 +132,18 @@ func doMain(ctx context.Context) error { setEnvDefault("SRS_VLIVE_LIMIT", "10") setEnvDefault("SRS_CAMERA_LIMIT", "10") + // For transcript queue limit. + setEnvDefault("SRS_TRANSCRIPT_FIX_QUEUE_LIMIT", "2") + setEnvDefault("SRS_TRANSCRIPT_OVERLAY_QUEUE_LIMIT", "9") + logger.Tf(ctx, "load .env as MGMT_PASSWORD=%vB, GO_PPROF=%v, "+ "SRS_PLATFORM_SECRET=%vB, CLOUD=%v, REGION=%v, SOURCE=%v, SRT_PORT=%v, RTC_PORT=%v, "+ "NODE_ENV=%v, LOCAL_RELEASE=%v, REDIS_DATABASE=%v, REDIS_HOST=%v, REDIS_PASSWORD=%vB, REDIS_PORT=%v, RTMP_PORT=%v, "+ "PUBLIC_URL=%v, BUILD_PATH=%v, REACT_APP_LOCALE=%v, PLATFORM_LISTEN=%v, HTTP_PORT=%v, "+ "REGISTRY=%v, MGMT_LISTEN=%v, HTTPS_LISTEN=%v, AUTO_SELF_SIGNED_CERTIFICATE=%v, "+ "NAME_LOOKUP=%v, PLATFORM_DOCKER=%v, SRS_FORWARD_LIMIT=%v, SRS_VLIVE_LIMIT=%v, "+ - "SRS_CAMERA_LIMIT=%v, YTDL_PROXY=%v", + "SRS_CAMERA_LIMIT=%v, YTDL_PROXY=%v"+ + "SRS_TRANSCRIPT_FIX_QUEUE_LIMIT=%v, SRS_TRANSCRIPT_OVERLAY_QUEUE_LIMIT=%v", len(envMgmtPassword()), envGoPprof(), len(envApiSecret()), envCloud(), envRegion(), envSource(), envSrtListen(), envRtcListen(), envNodeEnv(), envLocalRelease(), @@ -149,6 +154,7 @@ func doMain(ctx context.Context) error { envSelfSignedCertificate(), envNameLookup(), envPlatformDocker(), envForwardLimit(), envVLiveLimit(), envCameraLimit(), envYtdlProxy(), + envTranscriptFixQueueLimit(), envTranscriptOverlayQueueLimit(), ) // Start the Go pprof if enabled. diff --git a/platform/transcript.go b/platform/transcript.go index 51340915..1cf376c7 100644 --- a/platform/transcript.go +++ b/platform/transcript.go @@ -27,8 +27,39 @@ import ( "github.com/sashabaranov/go-openai" ) -// The total segments in overlay HLS. -const maxOverlaySegments = 9 +// The default total segments in overlay HLS. +const defaultMaxOverlaySegments = 9 + +// Get the total segments in overlay HLS. +func GetMaxOverlaySegments() (int, error) { + var maxOverlaySegments int + if envTranscriptOverlayQueueLimit() != "" { + if iv, err := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64); err != nil { + return defaultMaxOverlaySegments, errors.Wrapf(err, "parse env transcript overlay queue limit %v", envTranscriptOverlayQueueLimit()) + } else { + maxOverlaySegments = int(iv) + } + } + + return maxOverlaySegments, nil +} + +// The default max fix queue limits. +const defaultMaxFixQueueLimit = 2 + +// Get the manually fix queue limit. +func GetMaxFixQueueLimit() (int, error) { + var maxFixQueueLimit int + if envTranscriptFixQueueLimit() != "" { + if iv, err := strconv.ParseInt(envTranscriptFixQueueLimit(), 10, 64); err != nil { + return defaultMaxFixQueueLimit, errors.Wrapf(err, "parse env transcript manually fix queue limit %v", envTranscriptFixQueueLimit()) + } else { + maxFixQueueLimit = int(iv) + } + } + + return maxFixQueueLimit, nil +} var transcriptWorker *TranscriptWorker @@ -1717,6 +1748,12 @@ func (v *TranscriptTask) DriveLiveQueue(ctx context.Context) error { return nil } + // Get the maxOverlaySegments value + maxOverlaySegments, err := GetMaxOverlaySegments() + if err != nil { + logger.Wf(ctx, "transcript: ignore get maxOverlaySegments err %+v, use default value %v", err, defaultMaxOverlaySegments) + } + // Wait if ASR queue is full. if v.AsrQueue.count() >= maxOverlaySegments+1 { return nil @@ -1792,6 +1829,12 @@ func (v *TranscriptTask) DriveAsrQueue(ctx context.Context) error { return nil } + // Get the maxOverlaySegments value + maxOverlaySegments, err := GetMaxOverlaySegments() + if err != nil { + logger.Wf(ctx, "transcript: ignore get maxOverlaySegments err %+v, use default value %v", err, defaultMaxOverlaySegments) + } + // Wait if Fix queue is full. if v.FixQueue.count() >= maxOverlaySegments+1 { return nil @@ -1917,8 +1960,14 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error { return nil } + // Get the maxFixQueueLimit value + maxFixQueueLimit, err := GetMaxFixQueueLimit() + if err != nil { + logger.Wf(ctx, "transcript: ignore get maxFixQueueLimit err %+v, use default value %v", err, defaultMaxFixQueueLimit) + } + // Ignore if not enough segments. - if v.FixQueue.count() <= 2 { + if v.FixQueue.count() <= maxFixQueueLimit { return nil } @@ -1937,6 +1986,12 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error { return nil } + // Get the maxOverlaySegments value + maxOverlaySegments, err := GetMaxOverlaySegments() + if err != nil { + logger.Wf(ctx, "transcript: ignore get maxOverlaySegments err %+v, use default value %v", err, defaultMaxOverlaySegments) + } + // Wait if Overlay queue is full. if v.OverlayQueue.count() >= maxOverlaySegments+1 { return nil @@ -2028,6 +2083,12 @@ func (v *TranscriptTask) DriveOverlayQueue(ctx context.Context) error { return nil } + // Get the maxOverlaySegments value + maxOverlaySegments, err := GetMaxOverlaySegments() + if err != nil { + logger.Wf(ctx, "transcript: ignore get maxOverlaySegments err %+v, use default value %v", err, defaultMaxOverlaySegments) + } + // Ignore if not enough segments. if v.OverlayQueue.count() <= maxOverlaySegments { select { diff --git a/platform/utils.go b/platform/utils.go index 5a116582..805d207a 100644 --- a/platform/utils.go +++ b/platform/utils.go @@ -494,6 +494,14 @@ func envYtdlProxy() string { return os.Getenv("YTDL_PROXY") } +func envTranscriptFixQueueLimit() string { + return os.Getenv("SRS_TRANSCRIPT_FIX_QUEUE_LIMIT") +} + +func envTranscriptOverlayQueueLimit() string { + return os.Getenv("SRS_TRANSCRIPT_OVERLAY_QUEUE_LIMIT") +} + // rdb is a global redis client object. var rdb *redis.Client From 7d3966730881a60fcc2956afcbdc86855809a648 Mon Sep 17 00:00:00 2001 From: limjoe Date: Wed, 28 Aug 2024 18:40:58 +0800 Subject: [PATCH 2/3] Transcript: support custom fix and overlay queue code format --- platform/main.go | 7 ++++ platform/transcript.go | 79 ++++++++---------------------------------- 2 files changed, 22 insertions(+), 64 deletions(-) diff --git a/platform/main.go b/platform/main.go index 1f8dae93..78261122 100644 --- a/platform/main.go +++ b/platform/main.go @@ -14,6 +14,7 @@ import ( "path" "path/filepath" "runtime" + "strconv" "strings" "syscall" "time" @@ -134,7 +135,13 @@ func doMain(ctx context.Context) error { // For transcript queue limit. setEnvDefault("SRS_TRANSCRIPT_FIX_QUEUE_LIMIT", "2") + if _, err := strconv.ParseInt(envTranscriptFixQueueLimit(), 10, 64); err != nil { + return errors.Wrapf(err, "parse env transcript fix queue limit %v", envTranscriptFixQueueLimit()) + } setEnvDefault("SRS_TRANSCRIPT_OVERLAY_QUEUE_LIMIT", "9") + if _, err := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64); err != nil { + return errors.Wrapf(err, "parse env transcript overlay queue limit %v", envTranscriptOverlayQueueLimit()) + } logger.Tf(ctx, "load .env as MGMT_PASSWORD=%vB, GO_PPROF=%v, "+ "SRS_PLATFORM_SECRET=%vB, CLOUD=%v, REGION=%v, SOURCE=%v, SRT_PORT=%v, RTC_PORT=%v, "+ diff --git a/platform/transcript.go b/platform/transcript.go index 1cf376c7..bbaad2a0 100644 --- a/platform/transcript.go +++ b/platform/transcript.go @@ -27,40 +27,6 @@ import ( "github.com/sashabaranov/go-openai" ) -// The default total segments in overlay HLS. -const defaultMaxOverlaySegments = 9 - -// Get the total segments in overlay HLS. -func GetMaxOverlaySegments() (int, error) { - var maxOverlaySegments int - if envTranscriptOverlayQueueLimit() != "" { - if iv, err := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64); err != nil { - return defaultMaxOverlaySegments, errors.Wrapf(err, "parse env transcript overlay queue limit %v", envTranscriptOverlayQueueLimit()) - } else { - maxOverlaySegments = int(iv) - } - } - - return maxOverlaySegments, nil -} - -// The default max fix queue limits. -const defaultMaxFixQueueLimit = 2 - -// Get the manually fix queue limit. -func GetMaxFixQueueLimit() (int, error) { - var maxFixQueueLimit int - if envTranscriptFixQueueLimit() != "" { - if iv, err := strconv.ParseInt(envTranscriptFixQueueLimit(), 10, 64); err != nil { - return defaultMaxFixQueueLimit, errors.Wrapf(err, "parse env transcript manually fix queue limit %v", envTranscriptFixQueueLimit()) - } else { - maxFixQueueLimit = int(iv) - } - } - - return maxFixQueueLimit, nil -} - var transcriptWorker *TranscriptWorker type TranscriptWorker struct { @@ -1748,14 +1714,11 @@ func (v *TranscriptTask) DriveLiveQueue(ctx context.Context) error { return nil } - // Get the maxOverlaySegments value - maxOverlaySegments, err := GetMaxOverlaySegments() - if err != nil { - logger.Wf(ctx, "transcript: ignore get maxOverlaySegments err %+v, use default value %v", err, defaultMaxOverlaySegments) - } + // Get total segments in overlay HLS. + maxOverlaySegments, _ := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64) // Wait if ASR queue is full. - if v.AsrQueue.count() >= maxOverlaySegments+1 { + if v.AsrQueue.count() >= int(maxOverlaySegments)+1 { return nil } @@ -1829,14 +1792,11 @@ func (v *TranscriptTask) DriveAsrQueue(ctx context.Context) error { return nil } - // Get the maxOverlaySegments value - maxOverlaySegments, err := GetMaxOverlaySegments() - if err != nil { - logger.Wf(ctx, "transcript: ignore get maxOverlaySegments err %+v, use default value %v", err, defaultMaxOverlaySegments) - } + // Get total segments in overlay HLS. + maxOverlaySegments, _ := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64) // Wait if Fix queue is full. - if v.FixQueue.count() >= maxOverlaySegments+1 { + if v.FixQueue.count() >= int(maxOverlaySegments)+1 { return nil } @@ -1960,14 +1920,11 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error { return nil } - // Get the maxFixQueueLimit value - maxFixQueueLimit, err := GetMaxFixQueueLimit() - if err != nil { - logger.Wf(ctx, "transcript: ignore get maxFixQueueLimit err %+v, use default value %v", err, defaultMaxFixQueueLimit) - } + // Get total segments in manually fix queue. + maxFixQueueLimit, _ := strconv.ParseInt(envTranscriptFixQueueLimit(), 10, 64) // Ignore if not enough segments. - if v.FixQueue.count() <= maxFixQueueLimit { + if v.FixQueue.count() <= int(maxFixQueueLimit) { return nil } @@ -1986,14 +1943,11 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error { return nil } - // Get the maxOverlaySegments value - maxOverlaySegments, err := GetMaxOverlaySegments() - if err != nil { - logger.Wf(ctx, "transcript: ignore get maxOverlaySegments err %+v, use default value %v", err, defaultMaxOverlaySegments) - } + // Get total segments in overlay HLS. + maxOverlaySegments, _ := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64) // Wait if Overlay queue is full. - if v.OverlayQueue.count() >= maxOverlaySegments+1 { + if v.OverlayQueue.count() >= int(maxOverlaySegments)+1 { return nil } @@ -2083,14 +2037,11 @@ func (v *TranscriptTask) DriveOverlayQueue(ctx context.Context) error { return nil } - // Get the maxOverlaySegments value - maxOverlaySegments, err := GetMaxOverlaySegments() - if err != nil { - logger.Wf(ctx, "transcript: ignore get maxOverlaySegments err %+v, use default value %v", err, defaultMaxOverlaySegments) - } + // Get total segments in overlay HLS. + maxOverlaySegments, _ := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64) // Ignore if not enough segments. - if v.OverlayQueue.count() <= maxOverlaySegments { + if v.OverlayQueue.count() <= int(maxOverlaySegments) { select { case <-ctx.Done(): case <-time.After(1 * time.Second): From c8145aacabfbb031ed02d33a47f2eb3151864af8 Mon Sep 17 00:00:00 2001 From: limjoe Date: Thu, 29 Aug 2024 18:46:22 +0800 Subject: [PATCH 3/3] Transcript: fix queue limit value strconv error --- platform/transcript.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/platform/transcript.go b/platform/transcript.go index bbaad2a0..3c8ec8e3 100644 --- a/platform/transcript.go +++ b/platform/transcript.go @@ -1715,10 +1715,10 @@ func (v *TranscriptTask) DriveLiveQueue(ctx context.Context) error { } // Get total segments in overlay HLS. - maxOverlaySegments, _ := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64) + maxOverlaySegments, _ := strconv.Atoi(envTranscriptOverlayQueueLimit()) // Wait if ASR queue is full. - if v.AsrQueue.count() >= int(maxOverlaySegments)+1 { + if v.AsrQueue.count() >= maxOverlaySegments+1 { return nil } @@ -1793,10 +1793,10 @@ func (v *TranscriptTask) DriveAsrQueue(ctx context.Context) error { } // Get total segments in overlay HLS. - maxOverlaySegments, _ := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64) + maxOverlaySegments, _ := strconv.Atoi(envTranscriptOverlayQueueLimit()) // Wait if Fix queue is full. - if v.FixQueue.count() >= int(maxOverlaySegments)+1 { + if v.FixQueue.count() >= maxOverlaySegments+1 { return nil } @@ -1921,10 +1921,10 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error { } // Get total segments in manually fix queue. - maxFixQueueLimit, _ := strconv.ParseInt(envTranscriptFixQueueLimit(), 10, 64) + maxFixQueueLimit, _ := strconv.Atoi(envTranscriptFixQueueLimit()) // Ignore if not enough segments. - if v.FixQueue.count() <= int(maxFixQueueLimit) { + if v.FixQueue.count() <= maxFixQueueLimit { return nil } @@ -1944,10 +1944,10 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error { } // Get total segments in overlay HLS. - maxOverlaySegments, _ := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64) + maxOverlaySegments, _ := strconv.Atoi(envTranscriptOverlayQueueLimit()) // Wait if Overlay queue is full. - if v.OverlayQueue.count() >= int(maxOverlaySegments)+1 { + if v.OverlayQueue.count() >= maxOverlaySegments+1 { return nil } @@ -2038,10 +2038,10 @@ func (v *TranscriptTask) DriveOverlayQueue(ctx context.Context) error { } // Get total segments in overlay HLS. - maxOverlaySegments, _ := strconv.ParseInt(envTranscriptOverlayQueueLimit(), 10, 64) + maxOverlaySegments, _ := strconv.Atoi(envTranscriptOverlayQueueLimit()) // Ignore if not enough segments. - if v.OverlayQueue.count() <= int(maxOverlaySegments) { + if v.OverlayQueue.count() <= maxOverlaySegments { select { case <-ctx.Done(): case <-time.After(1 * time.Second):