Skip to content

Commit

Permalink
Restructure and add configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Thor77 committed Nov 26, 2019
1 parent 46ab584 commit e411a40
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 149 deletions.
32 changes: 32 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"github.com/BurntSushi/toml"
log "github.com/sirupsen/logrus"
)

// Config stores application configuration
type Config struct {
Addr string
Key string
MsPerSegment int64
LogLevel log.Level
}

func loadConfig(path string) (Config, error) {
var tmpConfig Config
if _, err := toml.DecodeFile(path, &tmpConfig); err != nil {
return tmpConfig, err
}

if tmpConfig.Addr == "" {
tmpConfig.Addr = ":1935"
}
if tmpConfig.LogLevel == 0 {
tmpConfig.LogLevel = log.InfoLevel
}
if tmpConfig.MsPerSegment == 0 {
tmpConfig.MsPerSegment = 15000
}
return tmpConfig, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/netroby/go-rtmp-server
go 1.13

require (
github.com/BurntSushi/toml v0.3.1
github.com/grafov/m3u8 v0.11.1
github.com/nareix/joy4 v0.0.0-20181022032202-3ddbc8f9d431
github.com/sirupsen/logrus v1.4.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/grafov/m3u8 v0.11.1 h1:igZ7EBIB2IAsPPazKwRKdbhxcoBKO3lO1UY57PZDeNA=
Expand Down
154 changes: 154 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package main

import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"

"github.com/grafov/m3u8"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/format/rtmp"
"github.com/nareix/joy4/format/ts"
log "github.com/sirupsen/logrus"
)

func removeOutdatedSegments(streamLogger *log.Entry, streamName string, playlist *m3u8.MediaPlaylist) error {
currentSegments := make(map[string]struct{}, len(playlist.Segments))
for _, segment := range playlist.Segments {
if segment != nil {
currentSegments[segment.URI] = struct{}{}
}
}
segmentFiles, err := filepath.Glob(fmt.Sprintf("%s*.ts", streamName))
if err != nil {
return err
}
for _, segmentFile := range segmentFiles {
if _, ok := currentSegments[segmentFile]; !ok {
if err := os.Remove(segmentFile); err != nil {
streamLogger.Errorln(err)
} else {
streamLogger.Debugf("Removed segment %s\n", segmentFile)
}
}
}
return nil
}

func publishHandler(conn *rtmp.Conn) {
log.Debugf("Handling request %s\n", conn.URL.RequestURI())
if conn.URL.Query().Get("key") != config.Key {
log.Errorln("Key mismatch, aborting request")
return
}

streamName := strings.ReplaceAll(conn.URL.Path, "/", "")
if streamName == "" {
log.Errorln("Invalid stream name")
return
}

streamLogger := log.WithFields(log.Fields{"stream": streamName})

streamLogger.Infoln("Client connected")

// create hls playlist
playlistFileName := fmt.Sprintf("%s.m3u8", streamName)
playlist, err := m3u8.NewMediaPlaylist(5, 10)
if err != nil {
streamLogger.Errorln(err)
return
}

streams, err := conn.Streams()
if err != nil {
streamLogger.Errorln(err)
return
}

i := 0
clientConnected := true
var lastPacketTime time.Duration = 0
for clientConnected {
// create new segment
segmentName := fmt.Sprintf("%s%04d.ts", streamName, i)
outFile, err := os.Create(segmentName)
if err != nil {
streamLogger.Errorln(err)
return
}
tsMuxer := ts.NewMuxer(outFile)

// write header
if err := tsMuxer.WriteHeader(streams); err != nil {
streamLogger.Errorln(err)
return
}
// write some data
var segmentLength time.Duration = 0
//var lastPacketTime time.Duration = 0
var packetLength time.Duration = 0
for segmentLength.Milliseconds() < config.MsPerSegment {
var packet av.Packet
if packet, err = conn.ReadPacket(); err != nil {
if err == io.EOF {
streamLogger.Infoln("Client disconnected")
clientConnected = false
break
}
streamLogger.Errorln(err)
return
}
if err = tsMuxer.WritePacket(packet); err != nil {
streamLogger.Errorln(err)
return
}
packetLength = packet.Time - lastPacketTime
segmentLength += packetLength
lastPacketTime = packet.Time
}
// write trailer
if err := tsMuxer.WriteTrailer(); err != nil {
streamLogger.Errorln(err)
return
}
streamLogger.Debugf("Wrote segment %s\n", segmentName)

// update playlist
playlist.Slide(segmentName, segmentLength.Seconds(), "")
playlistFile, err := os.Create(playlistFileName)
if err != nil {
streamLogger.Errorln(err)
return
}
playlistFile.Write(playlist.Encode().Bytes())
playlistFile.Close()

// cleanup segments
if err := removeOutdatedSegments(streamLogger, streamName, playlist); err != nil {
streamLogger.Errorln(err)
return
}

// increase counter
i++
}

// remove all segments; this is probably not a good idea
for _, segment := range playlist.Segments {
if segment != nil {
if err := os.Remove(segment.URI); err != nil {
streamLogger.Errorln(err)
return
}
}
}
// remove playlist
if err := os.Remove(playlistFileName); err != nil {
streamLogger.Error(err)
return
}
}
167 changes: 18 additions & 149 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,166 +1,35 @@
package main

import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"

"github.com/grafov/m3u8"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/format/rtmp"
"github.com/nareix/joy4/format/ts"
"os"

log "github.com/sirupsen/logrus"
)

const addr = ":1935"
const key = "test"
const msPerSegment = 15000
var config Config

// TODO: replace failln with println / switch to logrus to include stream name in msg
func main() {
var err error

func removeOutdatedSegments(streamLogger *log.Entry, streamName string, playlist *m3u8.MediaPlaylist) error {
currentSegments := make(map[string]struct{}, len(playlist.Segments))
for _, segment := range playlist.Segments {
if segment != nil {
currentSegments[segment.URI] = struct{}{}
}
configPath := "config.toml"
if len(os.Args) >= 2 {
configPath = os.Args[1]
}
segmentFiles, err := filepath.Glob(fmt.Sprintf("%s*.ts", streamName))

config, err = loadConfig(configPath)
if err != nil {
return err
log.Fatalln(err)
}
for _, segmentFile := range segmentFiles {
if _, ok := currentSegments[segmentFile]; !ok {
if err := os.Remove(segmentFile); err != nil {
streamLogger.Errorln(err)
} else {
streamLogger.Infof("Removed segment %s\n", segmentFile)
}
}
}
return nil
}

func main() {
server := &rtmp.Server{Addr: addr}

server.HandlePublish = func(conn *rtmp.Conn) {
log.Infof("Handling request %s\n", conn.URL.RequestURI())
if conn.URL.Query().Get("key") != key {
log.Infoln("Key mismatch, aborting request")
return
}

streamName := strings.ReplaceAll(conn.URL.Path, "/", "")
if streamName == "" {
log.Errorln("Invalid stream name")
return
}

streamLogger := log.WithFields(log.Fields{"stream": streamName})

// create hls playlist
playlistFileName := fmt.Sprintf("%s.m3u8", streamName)
playlist, err := m3u8.NewMediaPlaylist(5, 10)
if err != nil {
streamLogger.Errorln(err)
return
}

streams, err := conn.Streams()
if err != nil {
streamLogger.Errorln(err)
return
}

i := 0
clientConnected := true
var lastPacketTime time.Duration = 0
for clientConnected {
// create new segment
segmentName := fmt.Sprintf("%s%04d.ts", streamName, i)
outFile, err := os.Create(segmentName)
if err != nil {
streamLogger.Errorln(err)
return
}
tsMuxer := ts.NewMuxer(outFile)

// write header
if err := tsMuxer.WriteHeader(streams); err != nil {
streamLogger.Errorln(err)
return
}
// write some data
var segmentLength time.Duration = 0
//var lastPacketTime time.Duration = 0
var packetLength time.Duration = 0
for segmentLength.Milliseconds() < msPerSegment {
var packet av.Packet
if packet, err = conn.ReadPacket(); err != nil {
if err == io.EOF {
streamLogger.Infoln("Client disconnected")
clientConnected = false
break
}
streamLogger.Errorln(err)
return
}
if err = tsMuxer.WritePacket(packet); err != nil {
streamLogger.Errorln(err)
return
}
packetLength = packet.Time - lastPacketTime
segmentLength += packetLength
lastPacketTime = packet.Time
}
// write trailer
if err := tsMuxer.WriteTrailer(); err != nil {
streamLogger.Errorln(err)
return
}
streamLogger.Infof("Wrote segment %s\n", segmentName)

// update playlist
playlist.Slide(segmentName, segmentLength.Seconds(), "")
playlistFile, err := os.Create(playlistFileName)
if err != nil {
streamLogger.Errorln(err)
return
}
playlistFile.Write(playlist.Encode().Bytes())
playlistFile.Close()

// cleanup segments
if err := removeOutdatedSegments(streamLogger, streamName, playlist); err != nil {
streamLogger.Errorln(err)
return
}

// increase counter
i++
}

// remove all segments; this is probably not a good idea
for _, segment := range playlist.Segments {
if segment != nil {
if err := os.Remove(segment.URI); err != nil {
streamLogger.Errorln(err)
return
}
}
}
// remove playlist
if err := os.Remove(playlistFileName); err != nil {
streamLogger.Error(err)
return
}
}
// setup logger
log.SetLevel(config.LogLevel)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})

log.Printf("Listening on %s", server.Addr)
server := &rtmp.Server{Addr: config.Addr}
server.HandlePublish = publishHandler
log.Infof("Listening on %s\n", server.Addr)
server.ListenAndServe()
}

0 comments on commit e411a40

Please sign in to comment.