Skip to content

Commit

Permalink
* added code to make it more difficult to inadvertently start two
Browse files Browse the repository at this point in the history
  instances of MQTT engine using the same ClientID.
  • Loading branch information
johanix committed Nov 12, 2024
1 parent e847681 commit 5f2741d
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 15 deletions.
2 changes: 1 addition & 1 deletion bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/spf13/viper"
)

func (td *PopData) BootstrapMqttSource(s *tapir.WBGlist, src SourceConf) (*tapir.WBGlist, error) {
func (td *PopData) BootstrapMqttSource(xxxs *tapir.WBGlist, src SourceConf) (*tapir.WBGlist, error) {
// Initialize the API client
api := &tapir.ApiClient{
BaseUrl: fmt.Sprintf(src.BootstrapUrl, src.Bootstrap[0]), // Must specify a valid BaseUrl
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ module tapir-pop

go 1.22.0

replace (
github.com/dnstapir/tapir => ../tapir
)

require (
github.com/dnstapir/tapir v0.0.0-20240927111630-589bd474c6e4
github.com/go-playground/validator/v10 v10.22.1
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func main() {
mqttclientid = "tapir-pop-" + uuid.New().String()
flag.BoolVarP(&tapir.GlobalCF.Debug, "debug", "d", false, "Debug mode")
flag.BoolVarP(&tapir.GlobalCF.Verbose, "verbose", "v", false, "Verbose mode")
flag.StringVarP(&mqttclientid, "client-id", "", mqttclientid, "MQTT client id, default is a random string")
// flag.StringVarP(&mqttclientid, "client-id", "", mqttclientid, "MQTT client id, default is a random string")

flag.Parse()

Expand Down Expand Up @@ -185,7 +185,7 @@ func main() {
POPExiter("Error unmarshalling config into struct: %v", err)
}

fmt.Printf("%s (TAPIR Edge Manager) version %s (%s) starting.\n", appName, appVersion, appDate)
fmt.Printf("%s (TAPIR Policy Processor) version %s (%s) starting.\n", appName, appVersion, appDate)

var stopch = make(chan struct{}, 10)

Expand Down
10 changes: 8 additions & 2 deletions mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ import (

func (pd *PopData) CreateMqttEngine(clientid string, statusch chan tapir.ComponentStatusUpdate, lg *log.Logger) error {
if clientid == "" {
POPExiter("Error starting MQTT Engine: clientid not specified in config")
POPExiter("CreateMqttEngine: Error: clientid not specified in config")
}

if pd.MqttEngine != nil && pd.MqttEngine.ClientID == clientid {
POPExiter("CreateMqttEngine: Error: clientid %s already in use", clientid)
}

var err error
pd.Logger.Printf("Creating MQTT Engine with clientid %s", clientid)
pd.MqttEngine, err = tapir.NewMqttEngine("tapir-pop", clientid, tapir.TapirSub, statusch, lg) // sub, but no pub
Expand All @@ -31,9 +36,10 @@ func (pd *PopData) StartMqttEngine(meng *tapir.MqttEngine) error {
return nil
}

pd.Logger.Printf("StartMqttEngine: starting MQTT Engine")
cmnder, outbox, inbox, err := meng.StartEngine()
if err != nil {
log.Fatalf("Error from StartEngine(): %v", err)
POPExiter("StartMqttEngine: Error from StartEngine(): %v", err)
}
pd.TapirMqttCmdCh = cmnder
pd.TapirMqttPubCh = outbox
Expand Down
14 changes: 7 additions & 7 deletions sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (pd *PopData) ParseSourcesNG() error {
// defer func() {
//pd.Logger.Printf("<--Thread %d: source \"%s\" (%s) is now complete. %d remaining", thread, name, src.Source, threads)
// }()
pd.Logger.Printf("-->Thread %d: parsing source \"%s\" (source %s)", thread, name, src.Source)
pd.Logger.Printf("ParseSourcesNG: Thread %d: parsing source \"%s\" (source %s)", thread, name, src.Source)

newsource := tapir.WBGlist{
Name: src.Name,
Expand Down Expand Up @@ -251,12 +251,12 @@ func (pd *PopData) ParseSourcesNG() error {
pd.Logger.Printf("ParseSources: source \"%s\" is now complete. %d remaining", tmp, threads)
}

if pd.MqttEngine != nil && !pd.TapirMqttEngineRunning {
err := pd.StartMqttEngine(pd.MqttEngine)
if err != nil {
POPExiter("Error starting MQTT Engine: %v", err)
}
}
// if pd.MqttEngine != nil && !pd.TapirMqttEngineRunning {
// err := pd.StartMqttEngine(pd.MqttEngine)
// if err != nil {
// POPExiter("Error starting MQTT Engine: %v", err)
// }
// }

pd.Logger.Printf("ParseSources: static sources done.")

Expand Down
9 changes: 6 additions & 3 deletions statusupdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ func (pd *PopData) StatusUpdater(conf *Config, stopch chan struct{}) {
}
pd.Logger.Printf("StatusUpdater: Topic status for MQTT engine %s: %+v", me.Creator, msg)

_, outbox, _, err := me.StartEngine()
if err != nil {
POPExiter("StatusUpdater: Error starting MQTT Engine: %v", err)
var outbox chan tapir.MqttPkgOut
if !me.Running {
_, outbox, _, err = me.StartEngine()
if err != nil {
POPExiter("StatusUpdater: Error starting MQTT Engine: %v", err)
}
}

log.Printf("StatusUpdater: Starting")
Expand Down

0 comments on commit 5f2741d

Please sign in to comment.