Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic levels for MQTT #27

Merged
merged 8 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This workflow will build a golang project
# This workflow will build a Go project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: ARCTIC CAMIO Lab
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
# Go workspace file
go.work

# xperimental results
# Input source → experimental results
*.txt
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
all: build

build:
go build -o diode -ldflags="-X main.SemVer=0.0.7" diode.go
go build -o diode -ldflags="-X main.SemVer=0.0.8" diode.go

test:
go test -v
Expand Down
14 changes: 0 additions & 14 deletions Pipfile

This file was deleted.

445 changes: 0 additions & 445 deletions Pipfile.lock

This file was deleted.

6 changes: 4 additions & 2 deletions config.yaml → config/settings.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Data Diode Settings
input:
ip: "192.168.1.99"
port: 50000
timeout: 60 # seconds
output:
ip: "192.168.1.20"
port: 503
tls: false
broker:
server: "test.mosquitto.org"
port: 1883
topic: "test/message"
message: "Hello, world."
topic: "diode/telemetry"
47 changes: 39 additions & 8 deletions diode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"os"
"time"

"github.com/acep-uaf/data-diode/insights"
analysis "github.com/acep-uaf/data-diode/insights"
"github.com/acep-uaf/data-diode/utility"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v2"
Expand All @@ -25,29 +25,36 @@ var (

type Configuration struct {
Input struct {
IP string
Port int
IP string
Port int
Timeout int
}
Output struct {
IP string
Port int
TLS bool
}
Broker struct {
Server string
Port int
Topic string
Message string
}
}

func exampleContents() {
sample := utility.ReadLineContent("docs/example.txt")
utility.PrintFileContent(sample)
utility.OutputStatistics(sample)
}

func sampleMetrics(server string, port int) {
fmt.Println(">> Local time: ", time.Now())
fmt.Println(">> UTC time: ", time.Now().UTC())
fmt.Println(">> Value: ", analysis.Value())
}

func main() {
data, err := os.ReadFile("config.yaml")
data, err := os.ReadFile("config/settings.yaml")

if err != nil {
panic(err)
Expand All @@ -68,7 +75,6 @@ func main() {

mqttBrokerIP := config.Broker.Server
mqttBrokerPort := config.Broker.Port
mqttBrokerMessage := config.Broker.Message
mqttBrokerTopic := config.Broker.Topic

app := &cli.App{
Expand Down Expand Up @@ -105,7 +111,7 @@ func main() {
Usage: "Testing state synchronization via diode I/O",
Action: func(tCtx *cli.Context) error {
fmt.Println("----- TEST -----")
analysis.Validation()
exampleContents()
return nil
},
},
Expand Down Expand Up @@ -135,7 +141,32 @@ func main() {
Usage: "MQTT (TCP stream) demo",
Action: func(mCtx *cli.Context) error {
fmt.Println("----- MQTT -----")
utility.Republisher(mqttBrokerIP, mqttBrokerPort, mqttBrokerTopic, mqttBrokerMessage)

location := "docs/example.txt"
fileContent := utility.ReadLineContent(location)

fmt.Println(">> Server: ", mqttBrokerIP)
fmt.Println(">> Topic: ", mqttBrokerTopic)
fmt.Println(">> Port: ", mqttBrokerPort)

start := time.Now()

for i := 1; i <= len(fileContent.Lines); i++ {
utility.Observability(mqttBrokerIP, mqttBrokerPort, mqttBrokerTopic, fileContent.Lines[i])
}

t := time.Now()

elapsed := t.Sub(start)

if len(fileContent.Lines) == 0 {
fmt.Println(">> No message content sent.")
} else if len(fileContent.Lines) == 1 {
fmt.Println(">> Sent message from ", location, " to topic: ", mqttBrokerTopic, " in ", elapsed)
} else {
fmt.Println(">> Sent ", len(fileContent.Lines), " messages from ", location, " to topic: ", mqttBrokerTopic, " in ", elapsed)
}

return nil
},
},
Expand Down
9 changes: 4 additions & 5 deletions diode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"os"
"testing"

insights "github.com/acep-uaf/data-diode/insights"
Expand All @@ -17,11 +18,9 @@ func TestCLI(t *testing.T) {
}

func TestConfiguration(t *testing.T) {
got := Configuration{}
want := Configuration{}

if got != want {
t.Errorf("got %q, want %q", got, want)
_, err := os.Stat("config/settings.yaml")
if os.IsNotExist(err) {
t.Errorf("[!] config.yaml does not exist")
}
}

Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ github.com/urfave/cli/v2 v2.27.1 h1:8xSQ6szndafKVRmfyeUMxkNUJQMjL1F2zmsZ+qHpfho=
github.com/urfave/cli/v2 v2.27.1/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e h1:+SOyEddqYF09QP7vr7CgJ1eti3pY9Fn3LHO1M1r/0sI=
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
Expand Down
62 changes: 0 additions & 62 deletions mqtt/republish.py

This file was deleted.

61 changes: 61 additions & 0 deletions utility/content.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package utility

import (
"bufio"
"fmt"
"log"
"os"
)

type LineContent struct {
Number int
Content string
}

type FileContent struct {
Lines map[int]string
}

type Readability struct {
Words int
Characters int
Paragraphs int
Sentences int
}

func ReadLineContent(location string) FileContent {
file, err := os.Open(location)
if err != nil {
log.Fatal(err)
}
defer file.Close()

lines := make(map[int]string)

scanner := bufio.NewScanner(file)

lineNumber := 1

for scanner.Scan() {
lineContent := scanner.Text()
lines[lineNumber] = lineContent
lineNumber++
}

if err := scanner.Err(); err != nil {
log.Fatal(err)
}

return FileContent{Lines: lines}
}

func OutputStatistics(content FileContent) {
// ? Contextual information about the file content.
fmt.Println(">> Number of lines: ", len(content.Lines))
}

func PrintFileContent(content FileContent) {
for i := 1; i <= len(content.Lines); i++ {
fmt.Println(">> ", content.Lines[i])
}
}
60 changes: 60 additions & 0 deletions utility/stream.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,68 @@
package utility

import (
"crypto/md5"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

type Message struct {
Index int
Topic string
Payload string
Checksum string
}

var (
counterMutex sync.Mutex
messageCounter int
)

func Craft(topic, payload string) Message {
counterMutex.Lock()
defer counterMutex.Unlock()

// TODO: Independent of the topic, the message counter should be incremented?
messageCounter++

return Message{
Index: messageCounter,
Topic: topic,
Payload: payload,
Checksum: Verification(payload),
}
}

func Observability(server string, port int, topic string, message string) {
broker := fmt.Sprintf("tcp://%s:%d", server, port)
clientID := "go_mqtt_client"

opts := mqtt.NewClientOptions().AddBroker(broker).SetClientID(clientID)
client := mqtt.NewClient(opts)

if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}

defer client.Disconnect(250) // ms

sample := Craft(topic, message)

jsonPackage, err := json.Marshal(sample)
if err != nil {
panic(err)
}

token := client.Publish(topic, 0, false, jsonPackage)
token.Wait()
}

func Republisher(server string, port int, topic string, message string) {
fmt.Println(">> MQTT")
fmt.Println(">> Broker: ", server)
Expand Down Expand Up @@ -58,3 +112,9 @@ func Republisher(server string, port int, topic string, message string) {
time.Sleep(1 * time.Second)

}

func Verification(data string) string {
hash := md5.New()
hash.Write([]byte(data))
return fmt.Sprintf("%x", hash.Sum(nil))
}