Skip to content

Commit

Permalink
Kickoff publisher when stream is idle, which means no players. v6.0.3…
Browse files Browse the repository at this point in the history
…1, v5.0.144 (#3105)

For some use scenario, the publisher is invited when player want to view the stream:

1. Publisher connect to system, but does not publish any stream to SRS yet.
2. Player connect to system and start to request the stream.
3. System notifies publisher to publish stream to SRS.
4. Player play the stream from SRS.

Please notice that `system` means your business system, not SRS.

This is what we called `on-demand-live-streaming`, so when the last player stop to view the stream, what happends?

1. System needs to notify publisher to stop publish.
2. Or, SRS disconnect the publisher when idle(the last player stops playing).

This PR is for the solution 2, so that the cleanup is very simple, your system does not need to notify publisher to stop publish, because SRS has already disconnected the publihser.

PICK 8fde036

---------

Co-authored-by: winlin <[email protected]>
Co-authored-by: chundonglinlin <[email protected]>
  • Loading branch information
3 people committed Mar 6, 2023
1 parent 0a0fb23 commit 36a7228
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 20 deletions.
6 changes: 6 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,12 @@ vhost publish.srs.com {
# Overwrite by env SRS_VHOST_PUBLISH_TRY_ANNEXB_FIRST for all vhosts.
# default: on
try_annexb_first on;
# The timeout in seconds to disconnect publisher when idle, which means no players.
# Note that 0 means no timeout or this feature is disabled.
# Note that this feature conflicts with forward, because it disconnect the publisher stream.
# Overwrite by env SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE for all vhosts.
# default: 0
kickoff_for_idle 0;
}
}

Expand Down
13 changes: 13 additions & 0 deletions trunk/conf/rtmp.kickoff.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# the config for srs to delivery RTMP with kicking off publish as no one watching.
# @see https://github.com/ossrs/srs/wiki/v1_CN_SampleRTMP
# @see full.conf for detail config.

listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;
vhost __defaultVhost__ {
publish {
kickoff_for_idle 60000;
}
}
3 changes: 2 additions & 1 deletion trunk/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ The changelog for SRS.

## SRS 5.0 Changelog

* v5.0, 2023-02-25, Merge [#3424](https://github.com/ossrs/srs/pull/3424): API: Add service_id for http_hooks, which identify the process. v5.0.142 (#3424)
* v5.0, 2023-03-04, Merge [#3105](https://github.com/ossrs/srs/pull/3105): Kickoff publisher when stream is idle, which means no players. v5.0.144 (#3105)
* v5.0, 2023-02-25, Merge [#3424](https://github.com/ossrs/srs/pull/3424): API: Add service_id for http_hooks, which identify the process. v5.0.143 (#3424)
* v5.0, 2023-02-22, Compatible with legacy RTMP URL. v5.0.142
* v5.0, 2023-02-12, Merge [#3409](https://github.com/ossrs/srs/pull/3409): SRT: Reduce latency to 200ms of srt2rtc.conf. v5.0.141 (#3409)
* v5.0, 2023-02-08, Merge [#3391](https://github.com/ossrs/srs/pull/3391): Config: Error when both HLS and HTTP-TS enabled. v5.0.140 (#3391)
Expand Down
4 changes: 2 additions & 2 deletions trunk/doc/Features.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ The features of SRS.
- [x] Live: Support origin cluster, please read [#464](https://github.com/ossrs/srs/issues/464), [RTMP 302](https://github.com/ossrs/srs/issues/92).
- [x] Live: Support NGINX HLS Cluster, see [CN](https://ossrs.net/lts/zh-cn/docs/v4/doc/sample-hls-cluster) or [EN](https://ossrs.io/lts/en-us/docs/v4/doc/sample-hls-cluster).
- [x] Live: SRT: Support PUSH SRT by IP and optional port, see [#3198](https://github.com/ossrs/srs/issues/3198). v5.0.76+
- [x] Live: [Experimental] Support MPEG-DASH, the future live streaming protocol, read [#299](https://github.com/ossrs/srs/issues/299).
- [x] Live: [Experimental] Support SRT server, read [#1147](https://github.com/ossrs/srs/issues/1147).
- [x] Live: Kickoff publisher when stream is idle, which means no players. v5.0.144+
- [x] Live: [Experimental] Support SRT server, read [#1147](https://github.com/ossrs/srs/issues/1147). v4.0.143+
- [x] Live: [Experimental] Support Coroutine Native SRT over ST, [#3010](https://github.com/ossrs/srs/pull/3010). v5.0.30+
- [x] RTC: Support playing stream by WebRTC, [#307](https://github.com/ossrs/srs/issues/307).
- [x] RTC: Support publishing stream by WebRTC, [#307](https://github.com/ossrs/srs/issues/307).
Expand Down
39 changes: 38 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2559,7 +2559,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "mr" && m != "mr_latency" && m != "firstpkt_timeout" && m != "normal_timeout"
&& m != "parse_sps" && m != "try_annexb_first") {
&& m != "parse_sps" && m != "try_annexb_first" && m != "kickoff_for_idle") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.publish.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
Expand Down Expand Up @@ -2727,6 +2727,14 @@ srs_error_t SrsConfig::check_normal_config()
}
}

// Check forward dnd kickoff for publsher idle.
for (int n = 0; n < (int)vhosts.size(); n++) {
SrsConfDirective* vhost = vhosts[n];
if (get_forward_enabled(vhost) && get_publish_kickoff_for_idle(vhost)) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "vhost.forward conflicts with vhost.publish.kickoff_for_idle");
}
}

// check ingest id unique.
for (int i = 0; i < (int)vhosts.size(); i++) {
SrsConfDirective* vhost = vhosts[i];
Expand Down Expand Up @@ -5340,6 +5348,35 @@ srs_utime_t SrsConfig::get_publish_normal_timeout(string vhost)
return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS);
}

srs_utime_t SrsConfig::get_publish_kickoff_for_idle(std::string vhost)
{
return get_publish_kickoff_for_idle(get_vhost(vhost));
}

srs_utime_t SrsConfig::get_publish_kickoff_for_idle(SrsConfDirective* vhost)
{
SRS_OVERWRITE_BY_ENV_FLOAT_SECONDS("srs.vhost.publish.kickoff_for_idle"); // SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE

static srs_utime_t DEFAULT = 0 * SRS_UTIME_SECONDS;

SrsConfDirective* conf = vhost;
if (!conf) {
return DEFAULT;
}

conf = conf->get("publish");
if (!conf) {
return DEFAULT;
}

conf = conf->get("kickoff_for_idle");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}

return (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS);
}

int SrsConfig::get_global_chunk_size()
{
SRS_OVERWRITE_BY_ENV_INT("srs.vhost.chunk_size"); // SRS_VHOST_CHUNK_SIZE
Expand Down
3 changes: 3 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,9 @@ class SrsConfig
virtual srs_utime_t get_publish_1stpkt_timeout(std::string vhost);
// The normal packet timeout in srs_utime_t for encoder.
virtual srs_utime_t get_publish_normal_timeout(std::string vhost);
// The kickoff timeout in srs_utime_t for publisher.
virtual srs_utime_t get_publish_kickoff_for_idle(std::string vhost);
virtual srs_utime_t get_publish_kickoff_for_idle(SrsConfDirective* vhost);
private:
// Get the global chunk size.
virtual int get_global_chunk_size();
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_gb28181.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ srs_error_t SrsLazyGbSession::cycle()

// It maybe success with message.
if (srs_error_code(err) == ERROR_SUCCESS) {
srs_trace("client finished%s.", srs_error_summary(err).c_str());
srs_trace("client finished %s.", srs_error_summary(err).c_str());
srs_freep(err);
return err;
}
Expand Down
6 changes: 6 additions & 0 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
// initialize the publish timeout.
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
srs_utime_t publish_kickoff_for_idle = _srs_config->get_publish_kickoff_for_idle(req->vhost);

// set the sock options.
set_sock_options();
Expand All @@ -1008,6 +1009,11 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
return srs_error_wrap(err, "rtmp: thread quit");
}

// Kick off the publisher when idle for a period of timeout.
if (source->publisher_is_idle_for(publish_kickoff_for_idle)) {
return srs_error_new(ERROR_KICKOFF_FOR_IDLE, "kicked for idle, url=%s, timeout=%ds", req->tcUrl.c_str(), srsu2si(publish_kickoff_for_idle));
}

pprint->elapse();

// cond wait for timeout.
Expand Down
49 changes: 40 additions & 9 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1866,7 +1866,7 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut
// @see https://github.com/ossrs/srs/issues/714
#if 0
// When source expired, remove it.
if (source->expired()) {
if (source->stream_is_dead()) {
int cid = source->source_id();
if (cid == -1 && source->pre_source_id() > 0) {
cid = source->pre_source_id();
Expand Down Expand Up @@ -1915,7 +1915,8 @@ SrsLiveSource::SrsLiveSource()
mix_queue = new SrsMixQueue();

_can_publish = true;
die_at = 0;
stream_die_at_ = 0;
publisher_idle_at_ = 0;

handler = NULL;
bridge_ = NULL;
Expand Down Expand Up @@ -1972,10 +1973,10 @@ srs_error_t SrsLiveSource::cycle()
return srs_success;
}

bool SrsLiveSource::expired()
bool SrsLiveSource::stream_is_dead()
{
// unknown state?
if (die_at == 0) {
if (stream_die_at_ == 0) {
return false;
}

Expand All @@ -1990,13 +1991,26 @@ bool SrsLiveSource::expired()
}

srs_utime_t now = srs_get_system_time();
if (now > die_at + SRS_SOURCE_CLEANUP) {
if (now > stream_die_at_ + SRS_SOURCE_CLEANUP) {
return true;
}

return false;
}

bool SrsLiveSource::publisher_is_idle_for(srs_utime_t timeout)
{
if (!publisher_idle_at_ || !timeout) {
return false;
}

srs_utime_t now = srs_get_system_time();
if (now > publisher_idle_at_ + timeout) {
return true;
}
return false;
}

srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -2623,6 +2637,11 @@ srs_error_t SrsLiveSource::on_publish()

SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_publish(req, _source_id.c_str());

// When no players, the publisher is idle now.
if (consumers.empty()) {
publisher_idle_at_ = srs_get_system_time();
}

return err;
}
Expand Down Expand Up @@ -2669,7 +2688,7 @@ void SrsLiveSource::on_unpublish()

// no consumer, stream is die.
if (consumers.empty()) {
die_at = srs_get_system_time();
stream_die_at_ = srs_get_system_time();
}
}

Expand All @@ -2679,7 +2698,11 @@ srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer)

consumer = new SrsLiveConsumer(this);
consumers.push_back(consumer);


// There should be one consumer, so reset the timeout.
stream_die_at_ = 0;
publisher_idle_at_ = 0;

// for edge, when play edge stream, check the state
if (_srs_config->get_vhost_is_edge(req->vhost)) {
// notice edge to start for the first client.
Expand Down Expand Up @@ -2741,10 +2764,18 @@ void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer)
if (it != consumers.end()) {
it = consumers.erase(it);
}

if (consumers.empty()) {
play_edge->on_all_client_stop();
die_at = srs_get_system_time();

// For edge server, the stream die when the last player quit, because the edge stream is created by player
// activities, so it should die when all players quit.
if (_srs_config->get_vhost_is_edge(req->vhost)) {
stream_die_at_ = srs_get_system_time();
}

// When no players, the publisher is idle now.
publisher_idle_at_ = srs_get_system_time();
}
}

Expand Down
13 changes: 8 additions & 5 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,17 +533,20 @@ class SrsLiveSource : public ISrsReloadHandler
private:
// Whether source is avaiable for publishing.
bool _can_publish;
// The last die time, when all consumers quit and no publisher,
// We will remove the source when source die.
srs_utime_t die_at;
// The last die time, while die means neither publishers nor players.
srs_utime_t stream_die_at_;
// The last idle time, while idle means no players.
srs_utime_t publisher_idle_at_;
public:
SrsLiveSource();
virtual ~SrsLiveSource();
public:
virtual void dispose();
virtual srs_error_t cycle();
// Remove source when expired.
virtual bool expired();
// Whether stream is dead, which is no publisher or player.
virtual bool stream_is_dead();
// Whether publisher is idle for a period of timeout.
bool publisher_is_idle_for(srs_utime_t timeout);
public:
// Initialize the hls with handlers.
virtual srs_error_t initialize(SrsRequest* r, ISrsLiveSourceHandler* h);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core_version5.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

#define VERSION_MAJOR 5
#define VERSION_MINOR 0
#define VERSION_REVISION 143
#define VERSION_REVISION 144

#endif
1 change: 1 addition & 0 deletions trunk/src/kernel/srs_kernel_error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
XX(ERROR_RTMP_MESSAGE_CREATE , 2053, "MessageCreate", "Failed to create shared pointer message") \
XX(ERROR_RTMP_PROXY_EXCEED , 2054, "RtmpProxy", "Failed to decode message of RTMP proxy") \
XX(ERROR_RTMP_CREATE_STREAM_DEPTH , 2055, "RtmpIdentify", "Failed to identify RTMP client") \
XX(ERROR_KICKOFF_FOR_IDLE , 2056, "KickoffForIdle", "Kickoff for publisher is idle") \
XX(ERROR_CONTROL_REDIRECT , 2997, "RtmpRedirect", "RTMP 302 redirection") \
XX(ERROR_CONTROL_RTMP_CLOSE , 2998, "RtmpClose", "RTMP connection is closed") \
XX(ERROR_CONTROL_REPUBLISH , 2999, "RtmpRepublish", "RTMP stream is republished")
Expand Down
3 changes: 3 additions & 0 deletions trunk/src/utest/srs_utest_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4458,6 +4458,9 @@ VOID TEST(ConfigEnvTest, CheckEnvValuesVhostPublish)

SrsSetEnvConfig(try_annexb_first, "SRS_VHOST_PUBLISH_TRY_ANNEXB_FIRST", "off");
EXPECT_FALSE(conf.try_annexb_first("__defaultVhost__"));

SrsSetEnvConfig(kickoff_for_idle, "SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE", "30");
EXPECT_EQ(30 * SRS_UTIME_SECONDS, conf.get_publish_kickoff_for_idle("__defaultVhost__"));
}
}

Expand Down

0 comments on commit 36a7228

Please sign in to comment.