diff --git a/.github/workflows/measureMemory.yml b/.github/workflows/measureMemory.yml index a99e43a11..2d9815d64 100644 --- a/.github/workflows/measureMemory.yml +++ b/.github/workflows/measureMemory.yml @@ -276,7 +276,7 @@ jobs: - name: Run a test continue-on-error: true run: | - make test/mem TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE=${{ matrix.resourceDataSize }} TEST_MEMORY_COAP_GATEWAY_TIMEOUT=${{ matrix.timeout }} TEST_MEMORY_COAP_GATEWAY_NUM_DEVICES=${{ matrix.numDevices }} TEST_MEMORY_COAP_GATEWAY_NUM_RESOURCES=${{ matrix.numResources }} | tee >(grep "TestMemoryWithDevices.result:" | sed -e "s/.*TestMemoryWithDevices.result://g" | jq -r -c > out.json) + make test/mem TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE=${{ matrix.resourceDataSize }} TEST_TIMEOUT=${{ matrix.timeout }} TEST_MEMORY_COAP_GATEWAY_NUM_DEVICES=${{ matrix.numDevices }} TEST_MEMORY_COAP_GATEWAY_NUM_RESOURCES=${{ matrix.numResources }} | tee >(grep "TestMemoryWithDevices.result:" | sed -e "s/.*TestMemoryWithDevices.result://g" | jq -r -c > out.json) - name: Dump file if: success() diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6d98cca76..efd2bf7ba 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,12 +22,42 @@ jobs: fail-fast: false matrix: include: + # test with check race with coverage and sonarcloud - name: test cmd: test - codecov: "true" - sonarcloud: "true" + checkRace: "true" + coapGateway: + log: + level: "debug" + dumpBody: "true" + + # test without check race - name: test/norace - cmd: test/norace + cmd: test + coapGateway: + log: + level: "debug" + dumpBody: "true" + + # test without check race with logs from all services + - name: test/norace/logs + cmd: test + coapGateway: + log: + level: "debug" + dumpBody: "true" + grpcGateway: + log: + level: "debug" + dumpBody: "true" + resourceAggregate: + log: + level: "debug" + dumpBody: "true" + identityStore: + log: + level: "debug" + dumpBody: "true" # Steps represent a sequence of tasks that will be executed as part of the job steps: @@ -41,27 +71,55 @@ jobs: with: fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis - - name: Setup kernel for multicast messages, increase read buffer to 8MB - run: sudo sysctl -w net.core.rmem_max=8388608 + - name: Run a test + run: | + make ${{ matrix.cmd }} TEST_CHECK_RACE=${{ matrix.checkRace }} \ + TEST_COAP_GATEWAY_LOG_LEVEL=${{ matrix.coapGateway.log.level }} TEST_COAP_GATEWAY_LOG_DUMP_BODY=${{ matrix.coapGateway.log.dumpBody }} \ + TEST_RESOURCE_AGGREGATE_LOG_LEVEL=${{ matrix.resourceAggregate.log.level }} TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY=${{ matrix.resourceAggregate.log.dumpBody }} \ + TEST_GRPC_GATEWAY_LOG_LEVEL=${{ matrix.grpcGateway.log.level }} TEST_GRPC_GATEWAY_LOG_DUMP_BODY=${{ matrix.grpcGateway.log.dumpBody }} \ + TEST_IDENTITY_STORE_LOG_LEVEL=${{ matrix.identityStore.log.level }} TEST_IDENTITY_STORE_LOG_DUMP_BODY=${{ matrix.identityStore.log.dumpBody }} - - name: Get file limit - run: sudo sysctl fs.file-max + - name: Prepare upload files + run: | + mkdir -p ./outputs + cp -r .tmp/coverage ./outputs/ + cp -r .tmp/report ./outputs/ + + - name: Upload coverage and report files + uses: actions/upload-artifact@v3 + with: + name: ${{ hashFiles('./outputs') || 'none' }} + path: ./outputs + retention-days: 1 + if-no-files-found: warn - - name: Get hard ulimits - run: ulimit -a + coverage-sonar-cloud-scan: + needs: test + # The type of runner that the job will run on + runs-on: ubuntu-latest + steps: + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - uses: actions/checkout@v3 + with: + fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis - - name: Get soft ulimits open files - run: ulimit -S + - name: Download artifacts + uses: actions/download-artifact@v3 + with: + path: ./outputs - - name: Run a test - run: make ${{ matrix.cmd }} ${{ matrix.args }} + - name: Prepare coverage and report files + run: | + mkdir -p .tmp/coverage + mkdir -p .tmp/report + find ./outputs -name "*.coverage.txt" -exec sh -c 'cp $1 .tmp/coverage/$(echo $1 | sed "s/[\/.]/-/g" ).coverage.txt' _ {} \; + find ./outputs -name "*.report.json" -exec sh -c 'cp $1 .tmp/report/$(echo $1 | sed "s/[\/.]/-/g" ).report.json' _ {} \; - name: Publish the coverage for main branch - if: ${{ github.ref == 'refs/heads/main' && matrix.codecov == 'true' }} + if: ${{ github.ref == 'refs/heads/main' }} run: bash <(curl -s https://codecov.io/bash) - name: SonarCloud Scan - if: ${{ matrix.sonarcloud == 'true' }} uses: SonarSource/sonarcloud-github-action@master env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.vscode/settings.json b/.vscode/settings.json index 775b6368a..a6badd5c2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -25,9 +25,15 @@ "TEST_MEMORY_COAP_GATEWAY_NUM_DEVICES": "1", "TEST_MEMORY_COAP_GATEWAY_NUM_RESOURCES": "1", "TEST_MEMORY_COAP_GATEWAY_EXPECTED_RSS_IN_MB": "1000", - "TEST_MEMORY_COAP_GATEWAY_LOG_LEVEL": "info", - "TEST_MEMORY_COAP_GATEWAY_LOG_DUMP_BODY": "false", "TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE": "200", + "TEST_COAP_GATEWAY_LOG_LEVEL": "info", + "TEST_COAP_GATEWAY_LOG_DUMP_BODY": "false", + "TEST_RESOURCE_AGGREGATE_LOG_LEVEL": "info", + "TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY": "false", + "TEST_GRPC_GATEWAY_LOG_LEVEL": "info", + "TEST_GRPC_GATEWAY_LOG_DUMP_BODY": "false", + "TEST_IDENTITY_STORE_LOG_LEVEL": "info", + "TEST_IDENTITY_STORE_LOG_DUMP_BODY": "false", // "GODEBUG": "scavtrace=1", // "TEST_COAP_GATEWAY_UDP_ENABLED": "true", // "GOMAXPROCS": 1, diff --git a/Makefile b/Makefile index e942dc73c..d088d660e 100644 --- a/Makefile +++ b/Makefile @@ -11,13 +11,26 @@ GOPATH ?= $(shell go env GOPATH) WORKING_DIRECTORY := $(shell pwd) USER_ID := $(shell id -u) GROUP_ID := $(shell id -g) -TEST_MEMORY_COAP_GATEWAY_LOG_LEVEL ?= info -TEST_MEMORY_COAP_GATEWAY_LOG_DUMP_BODY ?= false +TEST_CHECK_RACE ?= false +ifeq ($(TEST_CHECK_RACE),true) +GO_BUILD_ARG := -race +else +GO_BUILD_ARG := $(GO_BUILD_ARG) +endif +TEST_TIMEOUT ?= 45m +TEST_COAP_GATEWAY_UDP_ENABLED ?= true +TEST_COAP_GATEWAY_LOG_LEVEL ?= info +TEST_COAP_GATEWAY_LOG_DUMP_BODY ?= false +TEST_RESOURCE_AGGREGATE_LOG_LEVEL ?= info +TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY ?= false +TEST_GRPC_GATEWAY_LOG_LEVEL ?= info +TEST_GRPC_GATEWAY_LOG_DUMP_BODY ?= false +TEST_IDENTITY_STORE_LOG_LEVEL ?= info +TEST_IDENTITY_STORE_LOG_DUMP_BODY ?= false TEST_MEMORY_COAP_GATEWAY_NUM_DEVICES ?= 1 TEST_MEMORY_COAP_GATEWAY_NUM_RESOURCES ?= 1 TEST_MEMORY_COAP_GATEWAY_EXPECTED_RSS_IN_MB ?= 50 TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE ?= 200 -TEST_MEMORY_COAP_GATEWAY_TIMEOUT ?= 120m #$(error MY_FLAG=$(BUILD_TAG)AAA) @@ -204,23 +217,32 @@ test: env @mkdir -p $(WORKING_DIRECTORY)/.tmp/home @mkdir -p $(WORKING_DIRECTORY)/.tmp/home/certificate-authority @mkdir -p $(WORKING_DIRECTORY)/.tmp/report - @$(call RUN-TESTS,hub,./...,-timeout=45m -race -p 1 -v -tags=test,"") - @$(call RUN-TESTS,grpc-gateway-dtls,./grpc-gateway/service,-timeout=45m -race -p 1 -v -tags=test,"TEST_COAP_GATEWAY_UDP_ENABLED=true") + @$(call RUN-TESTS,hub,./...,-timeout=$(TEST_TIMEOUT) $(GO_BUILD_ARG) -p 1 -v -tags=test,\ + TEST_COAP_GATEWAY_LOG_LEVEL=$(TEST_COAP_GATEWAY_LOG_LEVEL) TEST_COAP_GATEWAY_LOG_DUMP_BODY=$(TEST_COAP_GATEWAY_LOG_DUMP_BODY) \ + TEST_RESOURCE_AGGREGATE_LEVEL=$(TEST_RESOURCE_AGGREGATE_LEVEL) TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY=$(TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY) \ + TEST_GRPC_GATEWAY_LOG_LEVEL=$(TEST_GRPC_GATEWAY_LOG_LEVEL) TEST_GRPC_GATEWAY_LOG_DUMP_BODY=$(TEST_GRPC_GATEWAY_LOG_DUMP_BODY) \ + TEST_IDENTITY_STORE_LOG_LEVEL=$(TEST_IDENTITY_STORE_LOG_LEVEL) TEST_IDENTITY_STORE_LOG_DUMP_BODY=$(TEST_IDENTITY_STORE_LOG_DUMP_BODY)) + @$(call RUN-TESTS,grpc-gateway-dtls,./grpc-gateway/service,-timeout=$(TEST_TIMEOUT) $(GO_BUILD_ARG) -p 1 -v -tags=test,\ + TEST_COAP_GATEWAY_UDP_ENABLED=$(TEST_COAP_GATEWAY_UDP_ENABLED) \ + TEST_COAP_GATEWAY_LOG_LEVEL=$(TEST_COAP_GATEWAY_LOG_LEVEL) TEST_COAP_GATEWAY_LOG_DUMP_BODY=$(TEST_COAP_GATEWAY_LOG_DUMP_BODY) \ + TEST_RESOURCE_AGGREGATE_LEVEL=$(TEST_RESOURCE_AGGREGATE_LEVEL) TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY=$(TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY) \ + TEST_GRPC_GATEWAY_LOG_LEVEL=$(TEST_GRPC_GATEWAY_LOG_LEVEL) TEST_GRPC_GATEWAY_LOG_DUMP_BODY=$(TEST_GRPC_GATEWAY_LOG_DUMP_BODY) \ + TEST_IDENTITY_STORE_LOG_LEVEL=$(TEST_IDENTITY_STORE_LOG_LEVEL) TEST_IDENTITY_STORE_LOG_DUMP_BODY=$(TEST_IDENTITY_STORE_LOG_DUMP_BODY)) .PHONY: test -test/norace: env - @mkdir -p $(WORKING_DIRECTORY)/.tmp/home - @mkdir -p $(WORKING_DIRECTORY)/.tmp/home/certificate-authority - @mkdir -p $(WORKING_DIRECTORY)/.tmp/report - @$(call RUN-TESTS,hub,./...,-timeout=45m -p 1 -v,"") - @$(call RUN-TESTS,grpc-gateway-dtls,./grpc-gateway/service,-timeout=45m -p 1 -v -tags=test,"TEST_COAP_GATEWAY_UDP_ENABLED=true") -.PHONY: test/norace - test/mem: env/test/mem @mkdir -p $(WORKING_DIRECTORY)/.tmp/home @mkdir -p $(WORKING_DIRECTORY)/.tmp/home/certificate-authority @mkdir -p $(WORKING_DIRECTORY)/.tmp/report - @$(call RUN-TESTS,coap-gateway-mem,./coap-gateway/service,-timeout=$(TEST_MEMORY_COAP_GATEWAY_TIMEOUT) -p 1 -v -tags=test_mem,TEST_MEMORY_COAP_GATEWAY_NUM_DEVICES=$(TEST_MEMORY_COAP_GATEWAY_NUM_DEVICES) TEST_MEMORY_COAP_GATEWAY_NUM_RESOURCES=$(TEST_MEMORY_COAP_GATEWAY_NUM_RESOURCES) TEST_MEMORY_COAP_GATEWAY_EXPECTED_RSS_IN_MB=$(TEST_MEMORY_COAP_GATEWAY_EXPECTED_RSS_IN_MB) TEST_MEMORY_COAP_GATEWAY_LOG_LEVEL=$(TEST_MEMORY_COAP_GATEWAY_LOG_LEVEL) TEST_MEMORY_COAP_GATEWAY_LOG_DUMP_BODY=$(TEST_MEMORY_COAP_GATEWAY_LOG_DUMP_BODY) TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE=$(TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE)) + @$(call RUN-TESTS,coap-gateway-mem,./coap-gateway/service,-timeout=$(TEST_TIMEOUT) -p 1 -v -tags=test_mem,\ + TEST_MEMORY_COAP_GATEWAY_NUM_DEVICES=$(TEST_MEMORY_COAP_GATEWAY_NUM_DEVICES) \ + TEST_MEMORY_COAP_GATEWAY_NUM_RESOURCES=$(TEST_MEMORY_COAP_GATEWAY_NUM_RESOURCES) \ + TEST_MEMORY_COAP_GATEWAY_EXPECTED_RSS_IN_MB=$(TEST_MEMORY_COAP_GATEWAY_EXPECTED_RSS_IN_MB) \ + TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE=$(TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE) \ + TEST_COAP_GATEWAY_LOG_LEVEL=$(TEST_COAP_GATEWAY_LOG_LEVEL) TEST_COAP_GATEWAY_LOG_DUMP_BODY=$(TEST_COAP_GATEWAY_LOG_DUMP_BODY) \ + TEST_RESOURCE_AGGREGATE_LEVEL=$(TEST_RESOURCE_AGGREGATE_LEVEL) TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY=$(TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY) \ + TEST_GRPC_GATEWAY_LOG_LEVEL=$(TEST_GRPC_GATEWAY_LOG_LEVEL) TEST_GRPC_GATEWAY_LOG_DUMP_BODY=$(TEST_GRPC_GATEWAY_LOG_DUMP_BODY) \ + TEST_IDENTITY_STORE_LOG_LEVEL=$(TEST_IDENTITY_STORE_LOG_LEVEL) TEST_IDENTITY_STORE_LOG_DUMP_BODY=$(TEST_IDENTITY_STORE_LOG_DUMP_BODY)) .PHONY: test/mem test-targets := $(addprefix test-,$(patsubst ./%/,%,$(DIRECTORIES))) diff --git a/charts/plgd-hub/README.md b/charts/plgd-hub/README.md index c3be36729..bbb12853b 100644 --- a/charts/plgd-hub/README.md +++ b/charts/plgd-hub/README.md @@ -53,7 +53,7 @@ global: | Key | Type | Default | Description | |-----|------|---------|-------------| | certificateauthority.affinity | string | `nil` | Affinity definition | -| certificateauthority.apis | object | `{"grpc":{"address":null,"authorization":{"audience":null,"authority":null,"http":{"idleConnTimeout":"30s","maxConnsPerHost":32,"maxIdleConns":16,"maxIdleConnsPerHost":16,"timeout":"10s","tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":true}},"ownerClaim":null},"enforcementPolicy":{"minTime":"5s","permitWithoutStream":true},"keepAlive":{"maxConnectionAge":"0s","maxConnectionAgeGrace":"0s","maxConnectionIdle":"0s","time":"2h","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194305,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":false,"keyFile":null}},"http":{"address":null,"idleTimeout":"30s","readHeaderTimeout":"4s","readTimeout":"8s","writeTimeout":"16s"}}` | For complete certificate-authority service configuration see [plgd/certificate-authority](https://github.com/plgd-dev/hub/tree/main/certificate-authority) | +| certificateauthority.apis | object | `{"grpc":{"address":null,"authorization":{"audience":null,"authority":null,"http":{"idleConnTimeout":"30s","maxConnsPerHost":32,"maxIdleConns":16,"maxIdleConnsPerHost":16,"timeout":"10s","tls":{"caPool":null,"certFile":null,"keyFile":null,"useSystemCAPool":true}},"ownerClaim":null},"enforcementPolicy":{"minTime":"5s","permitWithoutStream":true},"keepAlive":{"maxConnectionAge":"0s","maxConnectionAgeGrace":"0s","maxConnectionIdle":"0s","time":"2h","timeout":"20s"},"recvMsgSize":4194304,"sendMsgSize":4194304,"tls":{"caPool":null,"certFile":null,"clientCertificateRequired":false,"keyFile":null}},"http":{"address":null,"idleTimeout":"30s","readHeaderTimeout":"4s","readTimeout":"8s","writeTimeout":"16s"}}` | For complete certificate-authority service configuration see [plgd/certificate-authority](https://github.com/plgd-dev/hub/tree/main/certificate-authority) | | certificateauthority.ca | object | `{"cert":"tls.crt","key":"tls.key","secret":{"name":null},"volume":{"mountPath":"/certs/coap-device-ca","name":"coap-device-ca"}}` | CA section | | certificateauthority.ca.cert | string | `"tls.crt"` | Cert file name | | certificateauthority.ca.key | string | `"tls.key"` | Cert key file name | @@ -91,6 +91,7 @@ global: | certificateauthority.ingress.http.secretName | string | `nil` | Override name of host/tls secret. If not specified, it will be generated | | certificateauthority.initContainersTpl | string | `nil` | Init containers definition | | certificateauthority.livenessProbe | string | `nil` | Liveness probe. certificate-authority doesn't have any default liveness probe | +| certificateauthority.log.dumpBody | bool | `false` | Dump grpc messages | | certificateauthority.log.encoderConfig.timeEncoder | string | `"rfc3339nano"` | Time format for logs. The supported values are: "rfc3339nano", "rfc3339" | | certificateauthority.log.encoding | string | `"json"` | The supported values are: "json", "console" | | certificateauthority.log.level | string | `"info"` | Logging enabled from level | @@ -298,6 +299,7 @@ global: | grpcgateway.ingress.secretName | string | `nil` | Override name of host/tls secret. If not specified, it will be generated | | grpcgateway.initContainersTpl | object | `{}` | Init containers definition | | grpcgateway.livenessProbe | object | `{}` | Liveness probe. grpc-gateway doesn't have any default liveness probe | +| grpcgateway.log.dumpBody | bool | `false` | Dump grpc messages | | grpcgateway.log.encoderConfig.timeEncoder | string | `"rfc3339nano"` | Time format for logs. The supported values are: "rfc3339nano", "rfc3339" | | grpcgateway.log.encoding | string | `"json"` | The supported values are: "json", "console" | | grpcgateway.log.level | string | `"info"` | Logging enabled from level | @@ -413,6 +415,7 @@ global: | identitystore.imagePullSecrets | object | `{}` | Image pull secrets | | identitystore.initContainersTpl | object | `{}` | Init containers definition. Resolved as template | | identitystore.livenessProbe | object | `{}` | Liveness probe. Identity doesn't have any default liveness probe | +| identitystore.log.dumpBody | bool | `false` | Dump grpc messages | | identitystore.log.encoderConfig.timeEncoder | string | `"rfc3339nano"` | Time format for logs. The supported values are: "rfc3339nano", "rfc3339" | | identitystore.log.encoding | string | `"json"` | The supported values are: "json", "console" | | identitystore.log.level | string | `"info"` | Logging enabled from level | @@ -508,7 +511,7 @@ global: | mockoauthserver.service.targetPort | string | `"http"` | Target port | | mockoauthserver.service.type | string | `"ClusterIP"` | | | mockoauthserver.tolerations | object | `{}` | Toleration definition | -| mongodb | object | `{"arbiter":{"enabled":false},"architecture":"replicaset","auth":{"enabled":false},"customLivenessProbe":{"exec":{"command":["mongosh","--tls","--tlsCertificateKeyFile=/certs/cert.pem","--tlsCAFile=/certs/ca.pem","--eval","db.adminCommand('ping')"]},"failureThreshold":6,"initialDelaySeconds":30,"periodSeconds":10,"successThreshold":1,"timeoutSeconds":5},"customReadinessProbe":{"exec":{"command":["bash","-ec","TLS_OPTIONS='--tls --tlsCertificateKeyFile=/certs/cert.pem --tlsCAFile=/certs/ca.pem'\nmongosh $TLS_OPTIONS --eval 'db.hello().isWritablePrimary || db.hello().secondary' | grep -q 'true'\n"]},"failureThreshold":6,"initialDelaySeconds":5,"periodSeconds":10,"successThreshold":1,"timeoutSeconds":5},"enabled":true,"extraEnvVars":[{"name":"MONGODB_EXTRA_FLAGS","value":"--tlsMode=requireTLS --tlsCertificateKeyFile=/certs/cert.pem --tlsCAFile=/certs/ca.pem"},{"name":"MONGODB_CLIENT_EXTRA_FLAGS","value":"--tls --tlsCertificateKeyFile=/certs/cert.pem --tlsCAFile=/certs/ca.pem"}],"extraVolumeMounts":[{"mountPath":"/certs","name":"mongodb-crt"}],"extraVolumes":[{"emptyDir":{},"name":"mongodb-crt"},{"name":"mongodb-cm-crt","secret":{"secretName":"mongodb-cm-crt"}}],"fullnameOverride":"mongodb","image":{"debug":true,"net":{"port":27017}},"initContainers":[{"command":["sh","-c","/bin/bash <<'EOF'\ncat /tmp/certs/tls.crt >> /certs/cert.pem\ncat /tmp/certs/tls.key >> /certs/cert.pem\ncp /tmp/certs/ca.crt /certs/ca.pem\nEOF\n"],"image":"docker.io/bitnami/nginx:1.20.2-debian-10-r63","imagePullPolicy":"IfNotPresent","name":"convert-cm-crt","volumeMounts":[{"mountPath":"/certs","name":"mongodb-crt"},{"mountPath":"/tmp/certs","name":"mongodb-cm-crt"}]}],"livenessProbe":{"enabled":false},"persistence":{"enabled":true},"readinessProbe":{"enabled":false},"replicaCount":3,"replicaSetName":"rs0","tls":{"enabled":false}}` | External mongodb-replica dependency setup | +| mongodb | object | `{"arbiter":{"enabled":false},"architecture":"replicaset","auth":{"enabled":false},"customLivenessProbe":{"exec":{"command":["mongosh","--tls","--tlsCertificateKeyFile=/certs/cert.pem","--tlsCAFile=/certs/ca.pem","--eval","db.adminCommand('ping')"]},"failureThreshold":6,"initialDelaySeconds":30,"periodSeconds":20,"successThreshold":1,"timeoutSeconds":10},"customReadinessProbe":{"exec":{"command":["bash","-ec","TLS_OPTIONS='--tls --tlsCertificateKeyFile=/certs/cert.pem --tlsCAFile=/certs/ca.pem'\nmongosh $TLS_OPTIONS --eval 'db.hello().isWritablePrimary || db.hello().secondary' | grep -q 'true'\n"]},"failureThreshold":6,"initialDelaySeconds":10,"periodSeconds":20,"successThreshold":1,"timeoutSeconds":10},"enabled":true,"extraEnvVars":[{"name":"MONGODB_EXTRA_FLAGS","value":"--tlsMode=requireTLS --tlsCertificateKeyFile=/certs/cert.pem --tlsCAFile=/certs/ca.pem"},{"name":"MONGODB_CLIENT_EXTRA_FLAGS","value":"--tls --tlsCertificateKeyFile=/certs/cert.pem --tlsCAFile=/certs/ca.pem"}],"extraVolumeMounts":[{"mountPath":"/certs","name":"mongodb-crt"}],"extraVolumes":[{"emptyDir":{},"name":"mongodb-crt"},{"name":"mongodb-cm-crt","secret":{"secretName":"mongodb-cm-crt"}}],"fullnameOverride":"mongodb","image":{"debug":true,"net":{"port":27017}},"initContainers":[{"command":["sh","-c","/bin/bash <<'EOF'\ncat /tmp/certs/tls.crt >> /certs/cert.pem\ncat /tmp/certs/tls.key >> /certs/cert.pem\ncp /tmp/certs/ca.crt /certs/ca.pem\nEOF\n"],"image":"docker.io/bitnami/nginx:1.20.2-debian-10-r63","imagePullPolicy":"IfNotPresent","name":"convert-cm-crt","volumeMounts":[{"mountPath":"/certs","name":"mongodb-crt"},{"mountPath":"/tmp/certs","name":"mongodb-cm-crt"}]}],"livenessProbe":{"enabled":false},"persistence":{"enabled":true},"readinessProbe":{"enabled":false},"replicaCount":3,"replicaSetName":"rs0","tls":{"enabled":false}}` | External mongodb-replica dependency setup | | nats | object | `{"cluster":{"enabled":false,"noAdvertise":false},"enabled":true,"leafnodes":{"enabled":false,"noAdvertise":false},"nats":{"tls":{"ca":"ca.crt","cert":"tls.crt","key":"tls.key","secret":{"name":"nats-service-crt"},"verify":true}},"natsbox":{"enabled":false}}` | External nats dependency setup | | resourceaggregate.affinity | object | `{}` | Affinity definition | | resourceaggregate.apis.grpc.address | string | `nil` | | @@ -558,6 +561,7 @@ global: | resourceaggregate.imagePullSecrets | object | `{}` | Image pull secrets | | resourceaggregate.initContainersTpl | object | `{}` | Init containers definition. Resolved as template | | resourceaggregate.livenessProbe | object | `{}` | Liveness probe. resource-aggregate doesn't have any default liveness probe | +| resourceaggregate.log.dumpBody | bool | `false` | Dump grpc messages | | resourceaggregate.log.encoderConfig.timeEncoder | string | `"rfc3339nano"` | Time format for logs. The supported values are: "rfc3339nano", "rfc3339" | | resourceaggregate.log.encoding | string | `"json"` | The supported values are: "json", "console" | | resourceaggregate.log.level | string | `"info"` | Logging enabled from level | @@ -608,7 +612,8 @@ global: | resourcedirectory.image.tag | string | `nil` | Image tag. | | resourcedirectory.initContainersTpl | object | `{}` | Init containers definition. Resolved as template | | resourcedirectory.livenessProbe | object | `{}` | Liveness probe. resource-directory doesn't have any default liveness probe | -| resourcedirectory.log | object | `{"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}}` | Log section | +| resourcedirectory.log | object | `{"dumpBody":false,"encoderConfig":{"timeEncoder":"rfc3339nano"},"encoding":"json","level":"info","stacktrace":{"enabled":false,"level":"warn"}}` | Log section | +| resourcedirectory.log.dumpBody | bool | `false` | Dump grpc messages | | resourcedirectory.log.encoderConfig.timeEncoder | string | `"rfc3339nano"` | Time format for logs. The supported values are: "rfc3339nano", "rfc3339" | | resourcedirectory.log.encoding | string | `"json"` | The supported values are: "json", "console" | | resourcedirectory.log.level | string | `"info"` | Logging enabled from level | diff --git a/charts/plgd-hub/templates/certificate-authority/config.yaml b/charts/plgd-hub/templates/certificate-authority/config.yaml index b777c88c8..bed4427db 100644 --- a/charts/plgd-hub/templates/certificate-authority/config.yaml +++ b/charts/plgd-hub/templates/certificate-authority/config.yaml @@ -10,6 +10,7 @@ data: {{- with .Values.certificateauthority }} log: level: {{ .log.level }} + dumpBody: {{ .log.dumpBody }} encoding: {{ .log.encoding }} stacktrace: enabled: {{ .log.stacktrace.enabled }} diff --git a/charts/plgd-hub/templates/grpc-gateway/config.yaml b/charts/plgd-hub/templates/grpc-gateway/config.yaml index f0a70eda1..912a7c14a 100644 --- a/charts/plgd-hub/templates/grpc-gateway/config.yaml +++ b/charts/plgd-hub/templates/grpc-gateway/config.yaml @@ -10,6 +10,7 @@ data: {{- with .Values.grpcgateway }} log: level: {{ .log.level }} + dumpBody: {{ .log.dumpBody }} encoding: {{ .log.encoding }} stacktrace: enabled: {{ .log.stacktrace.enabled }} diff --git a/charts/plgd-hub/templates/identity-store/config.yaml b/charts/plgd-hub/templates/identity-store/config.yaml index 702fd90b3..463e7bc02 100644 --- a/charts/plgd-hub/templates/identity-store/config.yaml +++ b/charts/plgd-hub/templates/identity-store/config.yaml @@ -10,6 +10,7 @@ data: {{- with .Values.identitystore }} log: level: {{ .log.level }} + dumpBody: {{ .log.dumpBody }} encoding: {{ .log.encoding }} stacktrace: enabled: {{ .log.stacktrace.enabled }} diff --git a/charts/plgd-hub/templates/resource-aggregate/config.yaml b/charts/plgd-hub/templates/resource-aggregate/config.yaml index ae55b0242..bd8bc9a73 100644 --- a/charts/plgd-hub/templates/resource-aggregate/config.yaml +++ b/charts/plgd-hub/templates/resource-aggregate/config.yaml @@ -10,6 +10,7 @@ data: {{- with .Values.resourceaggregate }} log: level: {{ .log.level }} + dumpBody: {{ .log.dumpBody }} encoding: {{ .log.encoding }} stacktrace: enabled: {{ .log.stacktrace.enabled }} diff --git a/charts/plgd-hub/templates/resource-directory/config.yaml b/charts/plgd-hub/templates/resource-directory/config.yaml index cbf00445c..298205811 100644 --- a/charts/plgd-hub/templates/resource-directory/config.yaml +++ b/charts/plgd-hub/templates/resource-directory/config.yaml @@ -10,6 +10,7 @@ data: {{- with .Values.resourcedirectory }} log: level: {{ .log.level }} + dumpBody: {{ .log.dumpBody }} encoding: {{ .log.encoding }} stacktrace: enabled: {{ .log.stacktrace.enabled }} diff --git a/charts/plgd-hub/values.yaml b/charts/plgd-hub/values.yaml index 733c4cf28..a5163f143 100644 --- a/charts/plgd-hub/values.yaml +++ b/charts/plgd-hub/values.yaml @@ -391,6 +391,8 @@ resourcedirectory: log: # -- Logging enabled from level level: info + # -- Dump grpc messages + dumpBody: false # -- The supported values are: "json", "console" encoding: json stacktrace: @@ -610,6 +612,8 @@ resourceaggregate: log: # -- Logging enabled from level level: info + # -- Dump grpc messages + dumpBody: false # -- The supported values are: "json", "console" encoding: json stacktrace: @@ -1029,6 +1033,8 @@ identitystore: log: # -- Logging enabled from level level: info + # -- Dump grpc messages + dumpBody: false # -- The supported values are: "json", "console" encoding: json stacktrace: @@ -1416,6 +1422,8 @@ grpcgateway: log: # -- Logging enabled from level level: info + # -- Dump grpc messages + dumpBody: false # -- The supported values are: "json", "console" encoding: json stacktrace: @@ -1686,6 +1694,8 @@ certificateauthority: log: # -- Logging enabled from level level: info + # -- Dump grpc messages + dumpBody: false # -- The supported values are: "json", "console" encoding: json stacktrace: diff --git a/cloud2cloud-gateway/service/subscribeToDevice_test.go b/cloud2cloud-gateway/service/subscribeToDevice_test.go index cae46e042..45e4bfd6e 100644 --- a/cloud2cloud-gateway/service/subscribeToDevice_test.go +++ b/cloud2cloud-gateway/service/subscribeToDevice_test.go @@ -139,7 +139,6 @@ func TestRequestHandlerSubscribeToDevice(t *testing.T) { switchResources := test.AddDeviceSwitchResources(ctx, t, deviceID, c, switchIDs...) publishedSwitches := test.ResourceLinksToResources(deviceID, switchResources) events = c2cTest.WaitForEvents(dataChan, 3*time.Second) - require.Len(t, events, len(switchIDs)) var links schema.ResourceLinks for _, ev := range events { require.Equal(t, c2cEvents.EventType_ResourcesPublished, ev.GetHeader().EventType) diff --git a/coap-gateway/cmd/service/main.go b/coap-gateway/cmd/service/main.go index c95ed1f76..fbfa9c69c 100644 --- a/coap-gateway/cmd/service/main.go +++ b/coap-gateway/cmd/service/main.go @@ -35,7 +35,7 @@ func main() { if err != nil { log.Fatalf("cannot load config: %v", err) } - logger := log.NewLogger(cfg.Log.Config) + logger := log.NewLogger(cfg.Log) log.Set(logger) logger.Infof("config: %v", cfg.String()) diff --git a/coap-gateway/service/clientCreateHandler_test.go b/coap-gateway/service/clientCreateHandler_test.go index e1d35a8ec..344b04c8e 100644 --- a/coap-gateway/service/clientCreateHandler_test.go +++ b/coap-gateway/service/clientCreateHandler_test.go @@ -28,9 +28,7 @@ func TestClientCreateHandler(t *testing.T) { coapgwCfg.Log.Level = zap.DebugLevel shutdown := setUp(t, coapgwCfg) defer shutdown() - - log.Setup(coapgwCfg.Log.Config) - + log.Setup(coapgwCfg.Log) co := testCoapDial(t, "", false, time.Now().Add(time.Minute)) if co == nil { return @@ -79,7 +77,6 @@ func TestClientCreateHandler(t *testing.T) { } testPrepareDevice(t, co) - time.Sleep(time.Second) // for publish content of device resources for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/coap-gateway/service/clientDeleteHandler_test.go b/coap-gateway/service/clientDeleteHandler_test.go index d65b220b5..3cff53694 100644 --- a/coap-gateway/service/clientDeleteHandler_test.go +++ b/coap-gateway/service/clientDeleteHandler_test.go @@ -68,7 +68,6 @@ func TestClientDeleteHandler(t *testing.T) { } testPrepareDevice(t, co) - time.Sleep(time.Second) // for publish content of device resources for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/coap-gateway/service/clientObserveHandler_test.go b/coap-gateway/service/clientObserveHandler_test.go index abe2cc893..8710f099e 100644 --- a/coap-gateway/service/clientObserveHandler_test.go +++ b/coap-gateway/service/clientObserveHandler_test.go @@ -96,7 +96,6 @@ func TestClientObserveHandler(t *testing.T) { } testPrepareDevice(t, co) - time.Sleep(time.Second) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/coap-gateway/service/clientResetHandler_test.go b/coap-gateway/service/clientResetHandler_test.go index b06e92d26..38ce37c42 100644 --- a/coap-gateway/service/clientResetHandler_test.go +++ b/coap-gateway/service/clientResetHandler_test.go @@ -70,7 +70,6 @@ func TestClientResetHandler(t *testing.T) { } testPrepareDevice(t, co) - time.Sleep(time.Second) messagePool := pool.New(0, 0) for _, tt := range tests { diff --git a/coap-gateway/service/clientRetrieveHandler_test.go b/coap-gateway/service/clientRetrieveHandler_test.go index 119c04a62..f391860c3 100644 --- a/coap-gateway/service/clientRetrieveHandler_test.go +++ b/coap-gateway/service/clientRetrieveHandler_test.go @@ -68,7 +68,6 @@ func TestClientRetrieveHandler(t *testing.T) { } testPrepareDevice(t, co) - time.Sleep(time.Second) // for publish content of device resources for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/coap-gateway/service/clientUpdateHandler_test.go b/coap-gateway/service/clientUpdateHandler_test.go index ba17bbe7e..04d06dee6 100644 --- a/coap-gateway/service/clientUpdateHandler_test.go +++ b/coap-gateway/service/clientUpdateHandler_test.go @@ -72,7 +72,6 @@ func TestClientUpdateHandler(t *testing.T) { } testPrepareDevice(t, co) - time.Sleep(time.Second) // for publish content of device resources for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/coap-gateway/service/config.go b/coap-gateway/service/config.go index 20ee10bca..33337e6d2 100644 --- a/coap-gateway/service/config.go +++ b/coap-gateway/service/config.go @@ -39,11 +39,7 @@ func (c *Config) Validate() error { return nil } -// Config represent application configuration -type LogConfig struct { - log.Config `yaml:",inline"` - DumpBody bool `yaml:"dumpBody" json:"dumpBody"` -} +type LogConfig = log.Config type APIsConfig struct { COAP COAPConfig `yaml:"coap" json:"coap"` diff --git a/coap-gateway/service/mem_test.go b/coap-gateway/service/mem_test.go index c627c6adc..ca98578d0 100644 --- a/coap-gateway/service/mem_test.go +++ b/coap-gateway/service/mem_test.go @@ -12,7 +12,6 @@ import ( "runtime" "runtime/debug" "strconv" - "strings" "sync" "syscall" "testing" @@ -34,8 +33,6 @@ import ( "github.com/plgd-dev/kit/v2/codec/json" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) @@ -163,14 +160,10 @@ func TestMemoryWithDevices(t *testing.T) { require.NoError(t, err) resourceDataSize, err := strconv.Atoi(os.Getenv("TEST_MEMORY_COAP_GATEWAY_RESOURCE_DATA_SIZE")) require.NoError(t, err) - - logLvl, err := zap.ParseAtomicLevel(os.Getenv("TEST_MEMORY_COAP_GATEWAY_LOG_LEVEL")) - require.NoError(t, err) - logDumpBody := strings.ToLower(os.Getenv("TEST_MEMORY_COAP_GATEWAY_LOG_DUMP_BODY")) == "true" - testDevices(t, numDevices, numResources, expRSSInMB, logLvl.Level(), logDumpBody, resourceDataSize) + testDevices(t, numDevices, numResources, expRSSInMB, resourceDataSize) } -func testDevices(t *testing.T, numDevices, numResources, expRSSInMB int, logLvl zapcore.Level, logDumpBody bool, resourceDataSize int) { +func testDevices(t *testing.T, numDevices, numResources, expRSSInMB int, resourceDataSize int) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -238,12 +231,10 @@ func testDevices(t *testing.T, numDevices, numResources, expRSSInMB int, logLvl ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t)) cfg := coapgwTest.MakeConfig(t) - cfg.Log.DumpBody = logDumpBody - cfg.Log.Level = logLvl cfg.APIs.COAP.MaxMessageSize = uint32(numResources*(resourceDataSize+1024) + 8*1024) cfg.APIs.COAP.BlockwiseTransfer.Enabled = true - logger := log.NewLogger(cfg.Log.Config) + logger := log.NewLogger(cfg.Log) fileWatcher, err := fsnotify.NewWatcher() require.NoError(t, err) @@ -363,8 +354,8 @@ func testDevices(t *testing.T, numDevices, numResources, expRSSInMB int, logLvl ExpectedMemRSS: expRSSInMB, CurrentMemRSS: int(rssMb), InitMemRSS: int(beforeTestRSSMB), - LogLevel: logLvl.String(), - LogDumpBody: logDumpBody, + LogLevel: cfg.Log.Level.String(), + LogDumpBody: cfg.Log.DumpBody, Duration: duration, ResourceDataSize: resourceDataSize, } diff --git a/coap-gateway/service/utils_test.go b/coap-gateway/service/utils_test.go index 897a63d73..7dd431b09 100644 --- a/coap-gateway/service/utils_test.go +++ b/coap-gateway/service/utils_test.go @@ -272,6 +272,7 @@ func testPrepareDevice(t *testing.T, co *coapTcpClient.Conn) { for _, tt := range publishResEl { testPostHandler(t, uri.ResourceDirectory, tt, co) } + time.Sleep(time.Second) // for publish content of device resources } func testCoapDial(t *testing.T, deviceID string, withTLS bool, validTo time.Time) *coapTcpClient.Conn { diff --git a/coap-gateway/test/test.go b/coap-gateway/test/test.go index 017d2fb5c..96916445c 100644 --- a/coap-gateway/test/test.go +++ b/coap-gateway/test/test.go @@ -12,14 +12,11 @@ import ( coapService "github.com/plgd-dev/hub/v2/pkg/net/coap/service" "github.com/plgd-dev/hub/v2/test/config" "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" ) func MakeConfig(t require.TestingT) service.Config { var cfg service.Config - cfg.Log.Config = log.MakeDefaultConfig() - cfg.Log.DumpBody = true - cfg.Log.Level = zapcore.DebugLevel + cfg.Log = config.MakeLogConfig(t, "TEST_COAP_GATEWAY_LOG_LEVEL", "TEST_COAP_GATEWAY_LOG_DUMP_BODY") cfg.TaskQueue.GoPoolSize = 1600 cfg.TaskQueue.Size = 2 * 1024 * 1024 cfg.APIs.COAP.Addr = config.COAP_GW_HOST @@ -71,7 +68,7 @@ func SetUp(t require.TestingT) (tearDown func()) { // New creates test coap-gateway. func New(t require.TestingT, cfg service.Config) func() { ctx := context.Background() - logger := log.NewLogger(cfg.Log.Config) + logger := log.NewLogger(cfg.Log) fileWatcher, err := fsnotify.NewWatcher() require.NoError(t, err) diff --git a/grpc-gateway/service/getResourceFromDevice_test.go b/grpc-gateway/service/getResourceFromDevice_test.go index c12f79a5e..09087035b 100644 --- a/grpc-gateway/service/getResourceFromDevice_test.go +++ b/grpc-gateway/service/getResourceFromDevice_test.go @@ -150,6 +150,8 @@ func TestRequestHandlerGetResourceFromDevice(t *testing.T) { _, shutdownDevSim := test.OnboardDevSim(ctx, t, c, deviceID, config.ACTIVE_COAP_SCHEME+"://"+config.COAP_GW_HOST, test.GetAllBackendResourceLinks()) defer shutdownDevSim() test.AddDeviceSwitchResources(ctx, t, deviceID, c, switchID) + // for update resource-directory cache + time.Sleep(time.Second) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/grpc-gateway/test/test.go b/grpc-gateway/test/test.go index a4a33e523..468aad325 100644 --- a/grpc-gateway/test/test.go +++ b/grpc-gateway/test/test.go @@ -15,7 +15,7 @@ import ( func MakeConfig(t require.TestingT) service.Config { var cfg service.Config - cfg.Log = log.MakeDefaultConfig() + cfg.Log = config.MakeLogConfig(t, "TEST_GRPC_GATEWAY_LOG_LEVEL", "TEST_GRPC_GATEWAY_LOG_DUMP_BODY") cfg.APIs.GRPC.Config = config.MakeGrpcServerConfig(config.GRPC_GW_HOST) cfg.APIs.GRPC.OwnerCacheExpiration = time.Minute diff --git a/identity-store/service/addDevice.go b/identity-store/service/addDevice.go index 4423e03a9..d19f38500 100644 --- a/identity-store/service/addDevice.go +++ b/identity-store/service/addDevice.go @@ -13,12 +13,26 @@ import ( "github.com/plgd-dev/hub/v2/pkg/opentelemetry/propagation" "github.com/plgd-dev/hub/v2/pkg/security/jwt" pkgTime "github.com/plgd-dev/hub/v2/pkg/time" + "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/publisher" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/utils" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -func (s *Service) publishDevicesRegistered(ctx context.Context, owner, userID string, deviceID []string) error { +func (s *Service) publishEvent(subject string, event utils.ProtobufMarshaler) error { + data, err := utils.Marshal(event) + if err != nil { + return err + } + err = s.publisher.PublishData(subject, data) + if err != nil { + return err + } + // timeout si driven by flusherTimeout. + return s.publisher.Flush(context.Background()) +} + +func (s *Service) publishDevicesRegistered(ctx context.Context, owner, userID string, deviceID []string) { v := events.Event{ Type: &events.Event_DevicesRegistered{ DevicesRegistered: &events.DevicesRegistered{ @@ -32,22 +46,9 @@ func (s *Service) publishDevicesRegistered(ctx context.Context, owner, userID st }, }, } - data, err := utils.Marshal(&v) - if err != nil { - return err - } - - err = s.publisher.PublishData(events.GetDevicesRegisteredSubject(owner), data) - if err != nil { - return err - } - - // timeout si driven by flusherTimeout. - err = s.publisher.Flush(context.Background()) - if err != nil { - return err - } - return nil + subject := events.GetDevicesRegisteredSubject(owner) + err := s.publishEvent(subject, &v) + publisher.LogPublish(s.logger, &v, []string{subject}, err) } func (s *Service) parseTokenMD(ctx context.Context) (owner, subject string, err error) { @@ -104,10 +105,7 @@ func (s *Service) AddDevice(ctx context.Context, request *pb.AddDeviceRequest) ( return nil, log.LogAndReturnError(status.Errorf(codes.Internal, "cannot add device up: %v", err.Error())) } - err = s.publishDevicesRegistered(ctx, owner, userID, []string{request.DeviceId}) - if err != nil { - log.Errorf("cannot publish devices registered event with device('%v') and owner('%v'): %w", request.DeviceId, owner, err) - } + s.publishDevicesRegistered(ctx, owner, userID, []string{request.DeviceId}) return &pb.AddDeviceResponse{}, nil } diff --git a/identity-store/service/deleteDevices.go b/identity-store/service/deleteDevices.go index 8891f3d2b..2cda6acfa 100644 --- a/identity-store/service/deleteDevices.go +++ b/identity-store/service/deleteDevices.go @@ -12,7 +12,7 @@ import ( "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/pkg/opentelemetry/propagation" pkgTime "github.com/plgd-dev/hub/v2/pkg/time" - "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/utils" + "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/publisher" "github.com/plgd-dev/kit/v2/strings" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -39,7 +39,7 @@ func getOwnerDevices(tx persistence.PersistenceTx, owner string) ([]string, erro return deviceIds, nil } -func (s *Service) publishDevicesUnregistered(ctx context.Context, owner, userID string, deviceIDs []string) error { +func (s *Service) publishDevicesUnregistered(ctx context.Context, owner, userID string, deviceIDs []string) { v := events.Event{ Type: &events.Event_DevicesUnregistered{ DevicesUnregistered: &events.DevicesUnregistered{ @@ -53,22 +53,10 @@ func (s *Service) publishDevicesUnregistered(ctx context.Context, owner, userID }, }, } - data, err := utils.Marshal(&v) - if err != nil { - return err - } - - err = s.publisher.PublishData(events.GetDevicesUnregisteredSubject(owner), data) - if err != nil { - return err - } - // timeout si driven by flusherTimeout. - err = s.publisher.Flush(context.Background()) - if err != nil { - return err - } - return nil + subject := events.GetDevicesUnregisteredSubject(owner) + err := s.publishEvent(subject, &v) + publisher.LogPublish(s.logger, &v, []string{subject}, err) } func getDeviceIDs(request *pb.DeleteDevicesRequest, tx persistence.PersistenceTx, owner string) ([]string, error) { @@ -133,9 +121,7 @@ func (s *Service) DeleteDevices(ctx context.Context, request *pb.DeleteDevicesRe deletedDeviceIDs = append(deletedDeviceIDs, deviceID) } - if err := s.publishDevicesUnregistered(ctx, owner, userID, deletedDeviceIDs); err != nil { - log.Errorf("cannot publish devices unregistered event with devices('%v') and owner('%v'): %w", deletedDeviceIDs, owner, err) - } + s.publishDevicesUnregistered(ctx, owner, userID, deletedDeviceIDs) return &pb.DeleteDevicesResponse{ DeviceIds: deletedDeviceIDs, diff --git a/identity-store/service/service.go b/identity-store/service/service.go index 420911819..b215ee832 100644 --- a/identity-store/service/service.go +++ b/identity-store/service/service.go @@ -32,6 +32,7 @@ type Service struct { persistence Persistence publisher *publisher.Publisher ownerClaim string + logger log.Logger } // Server is an HTTP server for the Service. @@ -41,11 +42,12 @@ type Server struct { cfg Config } -func NewService(persistence Persistence, publisher *publisher.Publisher, ownerClaim string) *Service { +func NewService(persistence Persistence, publisher *publisher.Publisher, ownerClaim string, logger log.Logger) *Service { return &Service{ persistence: persistence, ownerClaim: ownerClaim, publisher: publisher, + logger: logger, } } @@ -71,7 +73,7 @@ func NewServer(ctx context.Context, cfg Config, fileWatcher *fsnotify.Watcher, l } }) - service := NewService(persistence, publisher, cfg.APIs.GRPC.Authorization.OwnerClaim) + service := NewService(persistence, publisher, cfg.APIs.GRPC.Authorization.OwnerClaim, logger) pb.RegisterIdentityStoreServer(grpcServer.Server, service) diff --git a/identity-store/test/config.go b/identity-store/test/config.go index 422744a08..cacc57d59 100644 --- a/identity-store/test/config.go +++ b/identity-store/test/config.go @@ -2,7 +2,6 @@ package test import ( "github.com/plgd-dev/hub/v2/identity-store/service" - "github.com/plgd-dev/hub/v2/pkg/log" "github.com/plgd-dev/hub/v2/test/config" "github.com/stretchr/testify/require" ) @@ -10,7 +9,7 @@ import ( func MakeConfig(t require.TestingT) service.Config { var cfg service.Config - cfg.Log = log.MakeDefaultConfig() + cfg.Log = config.MakeLogConfig(t, "TEST_IDENTITY_STORE_LOG_LEVEL", "TEST_IDENTITY_STORE_LOG_DUMP_BODY") cfg.APIs.GRPC = config.MakeGrpcServerConfig(config.IDENTITY_STORE_HOST) diff --git a/pkg/log/log.go b/pkg/log/log.go index fe7bd004f..17baaafd9 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -185,7 +185,7 @@ type StacktraceConfig struct { } func (c *StacktraceConfig) Validate() error { - if c.Level < zapcore.DebugLevel || c.Level > zap.FatalLevel { + if c.Level < DebugLevel || c.Level > FatalLevel { return fmt.Errorf("level('%v')", c.Level) } return nil @@ -206,10 +206,13 @@ type Config struct { // zapcore. EncoderConfig EncoderConfig `json:"encoderConfig" yaml:"encoderConfig"` // zap.Config `yaml:",inline"` + + // DumpBody if true, dump request/response/stream body to log. + DumpBody bool `json:"dumpBody" yaml:"dumpBody"` } func (c *Config) Validate() error { - if c.Level < zapcore.DebugLevel || c.Level > zap.FatalLevel { + if c.Level < DebugLevel || c.Level > FatalLevel { return fmt.Errorf("level('%v')", c.Level) } if err := c.Stacktrace.Validate(); err != nil { @@ -245,6 +248,7 @@ type Logger interface { LogAndReturnError(err error) error Config() Config Check(lvl zapcore.Level) bool + GetLogFunc(lvl zapcore.Level) func(args ...interface{}) DTLSLoggerFactory() logging.LoggerFactory } @@ -278,6 +282,25 @@ func (l *WrapSuggarLogger) Check(lvl zapcore.Level) bool { return l.logger.Desugar().Core().Enabled(lvl) } +var getLogFuncMap = map[zapcore.Level]func(l *WrapSuggarLogger) func(args ...interface{}){ + DebugLevel: func(l *WrapSuggarLogger) func(args ...interface{}) { return l.Debug }, + InfoLevel: func(l *WrapSuggarLogger) func(args ...interface{}) { return l.Info }, + WarnLevel: func(l *WrapSuggarLogger) func(args ...interface{}) { return l.Warn }, + ErrorLevel: func(l *WrapSuggarLogger) func(args ...interface{}) { return l.Error }, + FatalLevel: func(l *WrapSuggarLogger) func(args ...interface{}) { return l.Fatal }, +} + +var emptyLogFunc = func(args ...interface{}) { + // do nothing +} + +func (l *WrapSuggarLogger) GetLogFunc(lvl zapcore.Level) func(args ...interface{}) { + if f, ok := getLogFuncMap[lvl]; ok { + return f(l) + } + return emptyLogFunc +} + func (l *WrapSuggarLogger) with(args ...interface{}) *WrapSuggarLogger { return &WrapSuggarLogger{ logger: l.logger.With(args...), @@ -481,12 +504,12 @@ func NewLogger(config Config) *WrapSuggarLogger { ) opts := make([]zap.Option, 0, 16) if config.Stacktrace.Enabled { - opts = append(opts, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.WarnLevel))) + opts = append(opts, zap.AddStacktrace(zap.NewAtomicLevelAt(WarnLevel))) } // From a zapcore.Core, it's easy to construct a Logger. logger := zap.New(core, opts...) - return &WrapSuggarLogger{logger: logger.Sugar()} + return &WrapSuggarLogger{logger: logger.Sugar(), config: config} } func Get() Logger { diff --git a/pkg/log/logKeys.go b/pkg/log/logKeys.go index d8a35c4a5..caf1246d7 100644 --- a/pkg/log/logKeys.go +++ b/pkg/log/logKeys.go @@ -6,7 +6,6 @@ var ( CorrelationIDKey = "correlationId" DeviceIDKey = "deviceId" ResourceHrefKey = "href" - JWTSubKey = "jwt.sub" CommandFilterKey = "commandFilter" DeviceIDFilterKey = "deviceIdFilter" ResourceIDFilterKey = "resourceIdFilter" @@ -27,6 +26,8 @@ var ( ErrorKey = "error" X509Key = "x509" TraceIDKey = "traceId" + MessageKey = "message" + SubjectsKey = "subjects" ) func DurationToMilliseconds(duration time.Duration) float32 { diff --git a/pkg/net/grpc/server/logInterceptors.go b/pkg/net/grpc/server/logInterceptors.go new file mode 100644 index 000000000..4476d3fe3 --- /dev/null +++ b/pkg/net/grpc/server/logInterceptors.go @@ -0,0 +1,316 @@ +package server + +import ( + context "context" + "encoding/json" + "errors" + "io" + "strings" + "time" + + "github.com/google/uuid" + "github.com/plgd-dev/hub/v2/http-gateway/serverMux" + "github.com/plgd-dev/hub/v2/pkg/log" + kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" + "github.com/plgd-dev/hub/v2/resource-aggregate/commands" + "github.com/plgd-dev/hub/v2/resource-aggregate/events" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func tryToMoveData(req interface{}) interface{} { + switch v := req.(type) { + case interface { + GetData() *events.ResourceUpdated + }: + return v.GetData() + case interface { + GetData() *events.ResourceRetrieved + }: + return v.GetData() + case interface { + GetData() *events.ResourceChanged + }: + return v.GetData() + case interface { + GetData() *events.ResourceDeleted + }: + return v.GetData() + case interface { + GetData() *events.ResourceCreated + }: + return v.GetData() + case interface { + GetData() *events.DeviceMetadataUpdated + }: + return v.GetData() + } + return req +} + +func FillLoggerWithDeviceIDHrefCorrelationID(logger log.Logger, req interface{}) log.Logger { + if req == nil { + return logger + } + req = tryToMoveData(req) + if d, ok := req.(interface{ GetDeviceId() string }); ok && d.GetDeviceId() != "" { + logger = logger.With(log.DeviceIDKey, d.GetDeviceId()) + } + if r, ok := req.(interface{ GetResourceId() *commands.ResourceId }); ok { + logger = logger.With(log.DeviceIDKey, r.GetResourceId().GetDeviceId(), log.ResourceHrefKey, r.GetResourceId().GetHref()) + } + if r, ok := req.(interface{ GetCorrelationId() string }); ok && r.GetCorrelationId() != "" { + logger = logger.With(log.CorrelationIDKey, r.GetCorrelationId()) + } + if r, ok := req.(interface{ GetAuditContext() *commands.AuditContext }); ok && r.GetAuditContext().GetCorrelationId() != "" { + logger = logger.With(log.CorrelationIDKey, r.GetAuditContext().GetCorrelationId()) + } + + return logger +} + +func DecodeToJsonObject(m interface{}) map[string]interface{} { + if m == nil { + return nil + } + marshaler := serverMux.NewJsonMarshaler() + marshaler.JSONPb.MarshalOptions.EmitUnpopulated = false + data, err := marshaler.Marshal(m) + if err == nil && len(data) > 2 { + var v map[string]interface{} + err = json.Unmarshal(data, &v) + if err == nil && len(v) > 0 { + return v + } + } + return nil +} + +type jwtMember struct { + Sub string `json:"sub,omitempty"` +} + +type logGrpcMessage struct { + // request + JWT *jwtMember `json:"jwt,omitempty"` + Method string `json:"method,omitempty"` + Service string `json:"service,omitempty"` + + // response + Code string `json:"code,omitempty"` + Error string `json:"error,omitempty"` + + // request/response/stream + Body map[string]interface{} `json:"body,omitempty"` + + // stream + // for pairing streams + StreamID string `json:"streamId,omitempty"` +} + +func (m logGrpcMessage) IsEmpty() bool { + return m.Body == nil && m.JWT == nil && m.Method == "" && m.Service == "" && m.Code == "" && m.Error == "" && m.StreamID == "" +} + +func parseServiceAndMethod(fullMethod string) (string, string) { + elems := strings.SplitAfterN(fullMethod, "/", 3) + if len(elems) == 3 { + service := strings.ReplaceAll(elems[1], "/", "") + method := strings.ReplaceAll(elems[2], "/", "") + return service, method + } + return "", fullMethod +} + +func logUnary(ctx context.Context, logger log.Logger, req interface{}, resp interface{}, code *codes.Code, err error, fullMethod string, dumpBody bool, startTime time.Time, duration time.Duration, streamID string) log.Logger { + if !startTime.IsZero() { + logger = logger.With(log.StartTimeKey, startTime) + } + if duration > 0 { + logger = logger.With(log.DurationMSKey, log.DurationToMilliseconds(duration)) + } + deadline, ok := ctx.Deadline() + if ok { + logger = logger.With(log.DeadlineKey, deadline) + } + logger = FillLoggerWithDeviceIDHrefCorrelationID(logger, req) + spanCtx := trace.SpanContextFromContext(ctx) + if spanCtx.HasTraceID() { + logger = logger.With(log.TraceIDKey, spanCtx.TraceID().String()) + } + + reqData := logGrpcMessage{ + StreamID: streamID, + } + reqData.Service, reqData.Method = parseServiceAndMethod(fullMethod) + sub := getSub(ctx) + if sub != "" { + reqData.JWT = &jwtMember{ + Sub: sub, + } + } + if dumpBody { + reqData.Body = DecodeToJsonObject(req) + } + if !reqData.IsEmpty() { + logger = logger.With(log.RequestKey, reqData) + } + + respData := logGrpcMessage{} + if code != nil { + respData.Code = code.String() + } + if err != nil { + respData.Error = err.Error() + } + if dumpBody { + respData.Body = DecodeToJsonObject(resp) + } + if !respData.IsEmpty() { + logger = logger.With(log.ResponseKey, respData) + } + + return logger +} + +func handleUnary(ctx context.Context, logger log.Logger, dumpBody bool, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + resp, err := handler(ctx, req) + code := status.Code(err) + level := DefaultCodeToLevel(code) + if !logger.Check(level) { + return resp, err + } + duration := time.Since(startTime) + logger = logUnary(ctx, logger, req, resp, &code, err, info.FullMethod, dumpBody, startTime, duration, "") + logger.GetLogFunc(level)("finished unary call with code " + code.String()) + return resp, err +} + +func NewLogUnaryServerInterceptor(logger log.Logger, dumpBody bool) grpc.UnaryServerInterceptor { + logger = logger.With(log.ProtocolKey, "GRPC") + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return handleUnary(ctx, logger, dumpBody, req, info, handler) + } +} + +// serverStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and +// SendMsg method call. +type logServerStream struct { + grpc.ServerStream + logger log.Logger + dumpBody bool + streamID string + service string + method string + sub string +} + +func (w *logServerStream) SendMsg(m interface{}) error { + err := w.ServerStream.SendMsg(m) + code := status.Code(err) + level := DefaultCodeToLevel(code) + if errors.Is(err, io.EOF) { + level = log.DebugLevel + } + if !w.logger.Check(level) { + return err + } + logger := w.getLoggerForStreamMessage(m, err) + logger.Debug("sent a streaming message") + return err +} + +func (w *logServerStream) getLoggerForStreamMessage(m interface{}, err error) log.Logger { + r := logGrpcMessage{ + Service: w.service, + Method: w.method, + StreamID: w.streamID, + } + if err != nil { + r.Error = err.Error() + } + if w.sub != "" { + r.JWT = &jwtMember{ + Sub: w.sub, + } + } + if w.dumpBody { + r.Body = DecodeToJsonObject(m) + } + logger := w.logger + logger = FillLoggerWithDeviceIDHrefCorrelationID(logger, m) + if !r.IsEmpty() { + logger = logger.With(log.MessageKey, r) + } + return logger +} + +func (w *logServerStream) RecvMsg(m interface{}) error { + err := w.ServerStream.RecvMsg(m) + code := status.Code(err) + level := DefaultCodeToLevel(code) + if errors.Is(err, io.EOF) { + return err + } + if !w.logger.Check(level) { + return err + } + logger := w.getLoggerForStreamMessage(m, err) + logger.Debug("received a streaming message") + return err +} + +func getSub(ctx context.Context) string { + if sub, err := kitNetGrpc.OwnerFromTokenMD(ctx, "sub"); err == nil { + return sub + } + return "" +} + +func wrapServerStreamNew(logger log.Logger, dumpBody bool, fullMethod string, ss grpc.ServerStream) *logServerStream { + service, method := parseServiceAndMethod(fullMethod) + sub := getSub(ss.Context()) + return &logServerStream{ + ServerStream: ss, + logger: logger, + dumpBody: dumpBody, + streamID: uuid.New().String(), + service: service, + method: method, + sub: sub, + } +} + +func logStartStream(ctx context.Context, logger log.Logger, startTime time.Time, fullMethod, streamID string) { + if logger.Check(log.DebugLevel) { + logger = logUnary(ctx, logger, nil, nil, nil, nil, fullMethod, false, startTime, 0, streamID) + logger.Debug("started streaming call") + } +} + +func handleStream(logger log.Logger, dumpBody bool, srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + startTime := time.Now() + logServerStream := wrapServerStreamNew(logger, dumpBody, info.FullMethod, stream) + logStartStream(logServerStream.Context(), logger, startTime, info.FullMethod, logServerStream.streamID) + err := handler(srv, logServerStream) + code := status.Code(err) + level := DefaultCodeToLevel(code) + if !logger.Check(level) { + return err + } + duration := time.Since(startTime) + logger = logUnary(stream.Context(), logger, nil, nil, &code, err, info.FullMethod, false, startTime, duration, logServerStream.streamID) + logger.GetLogFunc(level)("finished streaming call with code " + code.String()) + return err +} + +// StreamServerInterceptor returns a new streaming server interceptor that adds zap.Logger to the context. +func NewLogStreamServerInterceptor(logger log.Logger, dumpBody bool) grpc.StreamServerInterceptor { + logger = logger.With(log.ProtocolKey, "GRPC") + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handleStream(logger, dumpBody, srv, stream, info, handler) + } +} diff --git a/pkg/net/grpc/server/makeDefaultOptions.go b/pkg/net/grpc/server/makeDefaultOptions.go index e8205c722..0471a974f 100644 --- a/pkg/net/grpc/server/makeDefaultOptions.go +++ b/pkg/net/grpc/server/makeDefaultOptions.go @@ -7,7 +7,6 @@ import ( "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" "github.com/plgd-dev/hub/v2/grpc-gateway/pb" @@ -31,23 +30,23 @@ const ( ) var defaultCodeToLevel = map[codes.Code]zapcore.Level{ - codes.OK: zap.DebugLevel, - codes.Canceled: zap.DebugLevel, - codes.Unknown: zap.ErrorLevel, - codes.InvalidArgument: zap.DebugLevel, - codes.DeadlineExceeded: zap.WarnLevel, - codes.NotFound: zap.DebugLevel, - codes.AlreadyExists: zap.DebugLevel, - codes.PermissionDenied: zap.WarnLevel, - codes.Unauthenticated: zap.DebugLevel, - codes.ResourceExhausted: zap.WarnLevel, - codes.FailedPrecondition: zap.WarnLevel, - codes.Aborted: zap.WarnLevel, - codes.OutOfRange: zap.WarnLevel, - codes.Unimplemented: zap.ErrorLevel, - codes.Internal: zap.ErrorLevel, - codes.Unavailable: zap.WarnLevel, - codes.DataLoss: zap.ErrorLevel, + codes.OK: log.DebugLevel, + codes.Canceled: log.DebugLevel, + codes.Unknown: log.ErrorLevel, + codes.InvalidArgument: log.DebugLevel, + codes.DeadlineExceeded: log.WarnLevel, + codes.NotFound: log.DebugLevel, + codes.AlreadyExists: log.DebugLevel, + codes.PermissionDenied: log.WarnLevel, + codes.Unauthenticated: log.DebugLevel, + codes.ResourceExhausted: log.WarnLevel, + codes.FailedPrecondition: log.WarnLevel, + codes.Aborted: log.WarnLevel, + codes.OutOfRange: log.WarnLevel, + codes.Unimplemented: log.ErrorLevel, + codes.Internal: log.ErrorLevel, + codes.Unavailable: log.WarnLevel, + codes.DataLoss: log.ErrorLevel, } // DefaultCodeToLevel is the default implementation of gRPC return codes and interceptor log level for server side. @@ -56,7 +55,7 @@ func DefaultCodeToLevel(code codes.Code) zapcore.Level { if ok { return lvl } - return zap.ErrorLevel + return log.ErrorLevel } func setLogBasicLabels(m map[string]interface{}, req interface{}) { @@ -256,17 +255,14 @@ func MakeDefaultOptions(auth kitNetGrpc.AuthInterceptors, logger log.Logger, tra return handler(ctx, req) }, } - zapLogger, ok := logger.Unwrap().(*zap.SugaredLogger) - if ok { - cfg := logger.Config() - if cfg.EncoderConfig.EncodeTime.TimeEncoder == nil { - cfg.EncoderConfig.EncodeTime = log.MakeDefaultConfig().EncoderConfig.EncodeTime - } - streamInterceptors = append(streamInterceptors, grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(CodeGenRequestFieldExtractor)), - grpc_zap.StreamServerInterceptor(zapLogger.Desugar(), grpc_zap.WithTimestampFormat(cfg.EncoderConfig.EncodeTime.TimeEncoder.TimeString()), grpc_zap.WithLevels(DefaultCodeToLevel), grpc_zap.WithMessageProducer(MakeDefaultMessageProducer(zapLogger.Desugar())))) - unaryInterceptors = append(unaryInterceptors, grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(CodeGenRequestFieldExtractor)), - grpc_zap.UnaryServerInterceptor(zapLogger.Desugar(), grpc_zap.WithTimestampFormat(cfg.EncoderConfig.EncodeTime.TimeEncoder.TimeString()), grpc_zap.WithLevels(DefaultCodeToLevel), grpc_zap.WithMessageProducer(MakeDefaultMessageProducer(zapLogger.Desugar())))) + + cfg := logger.Config() + if cfg.EncoderConfig.EncodeTime.TimeEncoder == nil { + cfg.EncoderConfig.EncodeTime = log.MakeDefaultConfig().EncoderConfig.EncodeTime } + streamInterceptors = append(streamInterceptors, NewLogStreamServerInterceptor(logger, cfg.DumpBody)) + unaryInterceptors = append(unaryInterceptors, NewLogUnaryServerInterceptor(logger, cfg.DumpBody)) + streamInterceptors = append(streamInterceptors, auth.Stream()) unaryInterceptors = append(unaryInterceptors, auth.Unary()) diff --git a/pkg/net/http/loggingMiddleware.go b/pkg/net/http/loggingMiddleware.go index 8f1646359..029bcb87b 100644 --- a/pkg/net/http/loggingMiddleware.go +++ b/pkg/net/http/loggingMiddleware.go @@ -13,7 +13,6 @@ import ( "github.com/plgd-dev/hub/v2/pkg/log" "github.com/plgd-dev/hub/v2/pkg/security/jwt" "go.opentelemetry.io/otel/trace" - "go.uber.org/zap/zapcore" rpcStatus "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" @@ -76,21 +75,21 @@ var toNil = func(args ...interface{}) { } func toDebug(logger log.Logger) func(args ...interface{}) { - if logger.Check(zapcore.DebugLevel) { + if logger.Check(log.DebugLevel) { return logger.Debug } return nil } func toWarn(logger log.Logger) func(args ...interface{}) { - if logger.Check(zapcore.WarnLevel) { + if logger.Check(log.WarnLevel) { return logger.Warn } return nil } func toError(logger log.Logger) func(args ...interface{}) { - if logger.Check(zapcore.ErrorLevel) { + if logger.Check(log.ErrorLevel) { return logger.Error } return nil diff --git a/resource-aggregate/cqrs/eventbus/nats/publisher/logPublishEvents.go b/resource-aggregate/cqrs/eventbus/nats/publisher/logPublishEvents.go new file mode 100644 index 000000000..44698e9ac --- /dev/null +++ b/resource-aggregate/cqrs/eventbus/nats/publisher/logPublishEvents.go @@ -0,0 +1,31 @@ +package publisher + +import ( + "github.com/plgd-dev/hub/v2/pkg/log" + "github.com/plgd-dev/hub/v2/pkg/net/grpc/server" +) + +func LogPublish(logger log.Logger, event interface{}, subjects []string, err error) { + lvl := log.DebugLevel + if err != nil { + lvl = log.ErrorLevel + } + if !logger.Check(lvl) { + return + } + v := struct { + Subjects []string `json:"subjects,omitempty"` + Body interface{} `json:"body,omitempty"` + Error string `json:"error,omitempty"` + }{} + v.Subjects = subjects + if logger.Config().DumpBody { + v.Body = server.DecodeToJsonObject(event) + } + if err != nil { + v.Error = err.Error() + } + logger = logger.With(log.ProtocolKey, "NATS", log.MessageKey, v) + logger = server.FillLoggerWithDeviceIDHrefCorrelationID(logger, event) + logger.GetLogFunc(lvl)("published event message") +} diff --git a/resource-aggregate/service/aggregate_test.go b/resource-aggregate/service/aggregate_test.go index 6198f1658..af8402d25 100644 --- a/resource-aggregate/service/aggregate_test.go +++ b/resource-aggregate/service/aggregate_test.go @@ -123,8 +123,7 @@ func TestAggregateHandlePublishResourceLinks(t *testing.T) { assert.Equal(t, tt.want, s.Code()) } else { require.NoError(t, err) - err = service.PublishEvents(publisher, tt.args.userID, tt.args.request.GetDeviceId(), ag.ResourceID(), events) - assert.NoError(t, err) + service.PublishEvents(publisher, tt.args.userID, tt.args.request.GetDeviceId(), ag.ResourceID(), events, logger) } } t.Run(tt.name, tfunc) @@ -138,8 +137,7 @@ func testHandlePublishResource(ctx context.Context, t *testing.T, publisher *pub assert.NoError(t, err) events, err := ag.PublishResourceLinks(ctx, pc) require.NoError(t, err) - err = service.PublishEvents(publisher, userID, deviceID, ag.ResourceID(), events) - assert.NoError(t, err) + service.PublishEvents(publisher, userID, deviceID, ag.ResourceID(), events, log.Get()) } func TestAggregateDuplicitPublishResource(t *testing.T) { @@ -246,8 +244,7 @@ func TestAggregateHandleUnpublishResource(t *testing.T) { events, err := ag.UnpublishResourceLinks(ctx, pc) assert.NoError(t, err) - err = service.PublishEvents(publisher, userID, deviceID, ag.ResourceID(), events) - assert.NoError(t, err) + service.PublishEvents(publisher, userID, deviceID, ag.ResourceID(), events, logger) _, err = ag.UnpublishResourceLinks(ctx, pc) assert.NoError(t, err) @@ -307,8 +304,7 @@ func TestAggregateHandleUnpublishAllResources(t *testing.T) { assert.Equal(t, 3, len(unpublishedResourceLinks)) assert.Contains(t, unpublishedResourceLinks, resourceID1, resourceID2, resourceID3) - err = service.PublishEvents(publisher, userID, deviceID, ag.ResourceID(), events) - assert.NoError(t, err) + service.PublishEvents(publisher, userID, deviceID, ag.ResourceID(), events, logger) events, err = ag.UnpublishResourceLinks(ctx, pc) require.NoError(t, err) @@ -366,8 +362,7 @@ func TestAggregateHandleUnpublishResourceSubset(t *testing.T) { assert.Equal(t, 1, len(events)) assert.Equal(t, []string{resourceID1, resourceID3}, (events[0].(*raEvents.ResourceLinksUnpublished)).Hrefs) - err = service.PublishEvents(publisher, userID, deviceID, ag.ResourceID(), events) - assert.NoError(t, err) + service.PublishEvents(publisher, userID, deviceID, ag.ResourceID(), events, logger) pc = testMakeUnpublishResourceRequest(deviceID, []string{resourceID1, resourceID4, resourceID4}) events, err = ag.UnpublishResourceLinks(ctx, pc) diff --git a/resource-aggregate/service/cancelDeviceMetadataUpdates.go b/resource-aggregate/service/cancelDeviceMetadataUpdates.go index b59f5402b..9a5110d8d 100644 --- a/resource-aggregate/service/cancelDeviceMetadataUpdates.go +++ b/resource-aggregate/service/cancelDeviceMetadataUpdates.go @@ -54,10 +54,7 @@ func (r RequestHandler) CancelPendingMetadataUpdates(ctx context.Context, reques return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.InvalidArgument, "cannot cancel resource('%v') metadata updates: %v", request.GetDeviceId(), err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), cancelEvents) - if err != nil { - log.Errorf("cannot publish device ('%v') metadata events: %w", request.GetDeviceId(), err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), cancelEvents, r.logger) correlationIDs := make([]string, 0, len(cancelEvents)) for _, e := range cancelEvents { diff --git a/resource-aggregate/service/cancelDeviceMetadataUpdates_test.go b/resource-aggregate/service/cancelDeviceMetadataUpdates_test.go index db16e5bdb..fd66e20de 100644 --- a/resource-aggregate/service/cancelDeviceMetadataUpdates_test.go +++ b/resource-aggregate/service/cancelDeviceMetadataUpdates_test.go @@ -133,8 +133,7 @@ func TestAggregateHandleCancelPendingMetadataUpdates(t *testing.T) { return } require.NoError(t, err) - err = service.PublishEvents(publisher, tt.args.userID, tt.args.request.GetDeviceId(), ag.ResourceID(), events) - assert.NoError(t, err) + service.PublishEvents(publisher, tt.args.userID, tt.args.request.GetDeviceId(), ag.ResourceID(), events, logger) } t.Run(tt.name, tfunc) } @@ -245,7 +244,7 @@ func TestRequestHandlerCancelPendingMetadataUpdates(t *testing.T) { })), testMakeUpdateDeviceMetadataRequest(deviceID, correlationID2, nil, newTwinEnabled(false), 0)) require.NoError(t, err) - requestHandler := service.NewRequestHandler(cfg, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(cfg, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range test { tfunc := func(t *testing.T) { diff --git a/resource-aggregate/service/cancelResourceCommands.go b/resource-aggregate/service/cancelResourceCommands.go index 8973b237f..9d3547fae 100644 --- a/resource-aggregate/service/cancelResourceCommands.go +++ b/resource-aggregate/service/cancelResourceCommands.go @@ -57,10 +57,7 @@ func (r RequestHandler) CancelPendingCommands(ctx context.Context, request *comm return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.InvalidArgument, "cannot cancel resource('%v') command: %v", request.GetResourceId().ToString(), err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), cancelEvents) - if err != nil { - log.Errorf("cannot publish resource('%v') events: %w", request.GetResourceId().ToString(), err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), cancelEvents, r.logger) correlationIDs := make([]string, 0, len(cancelEvents)) for _, e := range cancelEvents { diff --git a/resource-aggregate/service/cancelResourceCommands_test.go b/resource-aggregate/service/cancelResourceCommands_test.go index e8bb3ede7..cba1d382a 100644 --- a/resource-aggregate/service/cancelResourceCommands_test.go +++ b/resource-aggregate/service/cancelResourceCommands_test.go @@ -195,7 +195,7 @@ func TestRequestHandlerCancelPendingCommands(t *testing.T) { _, err = ag1.DeleteResource(ctx, testMakeDeleteResourceRequest(deviceID, resID1, correlationID3, 0)) require.NoError(t, err) - requestHandler := service.NewRequestHandler(cfg, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(cfg, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/resource-aggregate/service/confirmDeviceMetadataUpdate.go b/resource-aggregate/service/confirmDeviceMetadataUpdate.go index 1bb881e12..dabacd362 100644 --- a/resource-aggregate/service/confirmDeviceMetadataUpdate.go +++ b/resource-aggregate/service/confirmDeviceMetadataUpdate.go @@ -59,10 +59,8 @@ func (r RequestHandler) ConfirmDeviceMetadataUpdate(ctx context.Context, request return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot confirm device('%v') metadata update: %v", request.GetDeviceId(), err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish device('%v') metadata events: %w", request.GetDeviceId(), err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) + return &commands.ConfirmDeviceMetadataUpdateResponse{ AuditContext: commands.NewAuditContext(owner, ""), }, nil diff --git a/resource-aggregate/service/confirmDeviceMetadataUpdate_test.go b/resource-aggregate/service/confirmDeviceMetadataUpdate_test.go index 32287a749..a6bdba058 100644 --- a/resource-aggregate/service/confirmDeviceMetadataUpdate_test.go +++ b/resource-aggregate/service/confirmDeviceMetadataUpdate_test.go @@ -124,8 +124,7 @@ func TestAggregateHandleConfirmDeviceMetadataUpdate(t *testing.T) { return } require.NoError(t, err) - err = service.PublishEvents(publisher, tt.args.userID, tt.args.request.GetDeviceId(), ag.ResourceID(), events) - assert.NoError(t, err) + service.PublishEvents(publisher, tt.args.userID, tt.args.request.GetDeviceId(), ag.ResourceID(), events, logger) } t.Run(tt.name, tfunc) } @@ -207,7 +206,7 @@ func TestRequestHandlerConfirmDeviceMetadataUpdate(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) _, err = requestHandler.UpdateDeviceMetadata(ctx, testMakeUpdateDeviceMetadataRequest(deviceID, "", newConnectionStatus(commands.Connection_ONLINE), nil, time.Hour)) require.NoError(t, err) diff --git a/resource-aggregate/service/deleteDevices_test.go b/resource-aggregate/service/deleteDevices_test.go index 5b8b59282..c909d294d 100644 --- a/resource-aggregate/service/deleteDevices_test.go +++ b/resource-aggregate/service/deleteDevices_test.go @@ -50,7 +50,7 @@ func TestRequestHandler_DeleteDevices(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(cfg, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(cfg, eventstore, publisher, mockGetOwnerDevices, logger) type args struct { req *commands.DeleteDevicesRequest diff --git a/resource-aggregate/service/grpcApi.go b/resource-aggregate/service/grpcApi.go index c82f901cc..8353685df 100644 --- a/resource-aggregate/service/grpcApi.go +++ b/resource-aggregate/service/grpcApi.go @@ -3,12 +3,12 @@ package service import ( "context" - "github.com/hashicorp/go-multierror" "github.com/plgd-dev/hub/v2/pkg/log" kitNetGrpc "github.com/plgd-dev/hub/v2/pkg/net/grpc" "github.com/plgd-dev/hub/v2/resource-aggregate/commands" cqrsAggregate "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/aggregate" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus" + "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/publisher" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventstore" "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/utils" raEvents "github.com/plgd-dev/hub/v2/resource-aggregate/events" @@ -24,28 +24,27 @@ type RequestHandler struct { eventstore EventStore publisher eventbus.Publisher getOwnerDevicesFunc getOwnerDevicesFunc + logger log.Logger } // NewRequestHandler factory for new RequestHandler -func NewRequestHandler(config Config, eventstore EventStore, publisher eventbus.Publisher, getOwnerDevicesFunc getOwnerDevicesFunc) *RequestHandler { +func NewRequestHandler(config Config, eventstore EventStore, publisher eventbus.Publisher, getOwnerDevicesFunc getOwnerDevicesFunc, logger log.Logger) *RequestHandler { return &RequestHandler{ config: config, eventstore: eventstore, publisher: publisher, getOwnerDevicesFunc: getOwnerDevicesFunc, + logger: logger, } } -func PublishEvents(publisher eventbus.Publisher, owner, deviceID, resourceID string, events []eventbus.Event) error { - var errors *multierror.Error +func PublishEvents(pub eventbus.Publisher, owner, deviceID, resourceID string, events []eventbus.Event, logger log.Logger) { for _, event := range events { // timeout si driven by flusherTimeout. - err := publisher.Publish(context.Background(), utils.GetPublishSubject(owner, event), deviceID, resourceID, event) - if err != nil { - errors = multierror.Append(errors, err) - } + subjects := utils.GetPublishSubject(owner, event) + err := pub.Publish(context.Background(), subjects, deviceID, resourceID, event) + publisher.LogPublish(logger, event, subjects, err) } - return errors.ErrorOrNil() } // Check if device with given ID belongs to given owner @@ -118,10 +117,7 @@ func (r RequestHandler) PublishResourceLinks(ctx context.Context, request *comma return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot publish resource links: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource links published events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) auditContext := commands.NewAuditContext(owner, "") return newPublishResourceLinksResponse(events, aggregate.DeviceID(), auditContext), nil } @@ -159,10 +155,7 @@ func (r RequestHandler) UnpublishResourceLinks(ctx context.Context, request *com return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot unpublish resource links: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource links unpublished events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) auditContext := commands.NewAuditContext(owner, "") resp := newUnpublishResourceLinksResponse(events, aggregate.DeviceID(), auditContext) @@ -210,10 +203,7 @@ func (r RequestHandler) notifyResourceChanged(ctx context.Context, request *comm return log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot notify about resource content change: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource content changed notification events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) return nil } @@ -251,10 +241,7 @@ func (r RequestHandler) UpdateResource(ctx context.Context, request *commands.Up return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot update resource content: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource content update events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) var validUntil int64 for _, e := range events { @@ -286,10 +273,7 @@ func (r RequestHandler) ConfirmResourceUpdate(ctx context.Context, request *comm return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot confirm resource content update: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource content update confirmation events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) auditContext := commands.NewAuditContext(owner, request.GetCorrelationId()) return &commands.ConfirmResourceUpdateResponse{ AuditContext: auditContext, @@ -313,10 +297,7 @@ func (r RequestHandler) RetrieveResource(ctx context.Context, request *commands. return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot retrieve resource content: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource content retrieve events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) var validUntil int64 for _, e := range events { @@ -348,10 +329,7 @@ func (r RequestHandler) ConfirmResourceRetrieve(ctx context.Context, request *co return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot confirm resource content retrieve: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource content retrieve confirmation events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) auditContext := commands.NewAuditContext(owner, request.GetCorrelationId()) return &commands.ConfirmResourceRetrieveResponse{ @@ -376,10 +354,7 @@ func (r RequestHandler) DeleteResource(ctx context.Context, request *commands.De return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot delete resource: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish delete resource events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) var validUntil int64 for _, e := range events { @@ -412,10 +387,7 @@ func (r RequestHandler) ConfirmResourceDelete(ctx context.Context, request *comm return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot confirm resource deletion: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource delete confirmation events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) auditContext := commands.NewAuditContext(owner, request.GetCorrelationId()) return &commands.ConfirmResourceDeleteResponse{ @@ -440,10 +412,7 @@ func (r RequestHandler) CreateResource(ctx context.Context, request *commands.Cr return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot create resource: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource create events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) var validUntil int64 for _, e := range events { @@ -476,10 +445,8 @@ func (r RequestHandler) ConfirmResourceCreate(ctx context.Context, request *comm return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot confirm resource creation: %v", err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events) - if err != nil { - log.Errorf("cannot publish resource create confirmation events: %v", err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), events, r.logger) + auditContext := commands.NewAuditContext(owner, request.GetCorrelationId()) return &commands.ConfirmResourceCreateResponse{ AuditContext: auditContext, diff --git a/resource-aggregate/service/grpcApi_test.go b/resource-aggregate/service/grpcApi_test.go index 09e389cee..86bec42d9 100644 --- a/resource-aggregate/service/grpcApi_test.go +++ b/resource-aggregate/service/grpcApi_test.go @@ -125,7 +125,7 @@ func TestRequestHandlerPublishResource(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range test { tfunc := func(t *testing.T) { @@ -240,7 +240,7 @@ func TestRequestHandlerUnpublishResource(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(cfg, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(cfg, eventstore, publisher, mockGetOwnerDevices, logger) pubReq := testMakePublishResourceRequest(deviceID, []string{href}) _, err = requestHandler.PublishResourceLinks(ctx, pubReq) @@ -325,7 +325,7 @@ func TestRequestHandlerNotifyResourceChanged(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range test { tfunc := func(t *testing.T) { response, err := requestHandler.NotifyResourceChanged(ctx, tt.args.request) @@ -420,7 +420,7 @@ func TestRequestHandlerUpdateResourceContent(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range test { tfunc := func(t *testing.T) { if tt.args.request.GetResourceId().GetDeviceId() != "" && tt.args.request.GetResourceId().GetHref() != "" { @@ -513,7 +513,7 @@ func TestRequestHandlerConfirmResourceUpdate(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) _, err = requestHandler.NotifyResourceChanged(ctx, testMakeNotifyResourceChangedRequest(deviceID, resID, 0)) require.NoError(t, err) _, err = requestHandler.UpdateResource(ctx, testMakeUpdateResourceRequest(deviceID, resID, "", correlationID, time.Hour)) @@ -603,7 +603,7 @@ func TestRequestHandlerRetrieveResource(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range test { tfunc := func(t *testing.T) { if tt.args.request.GetResourceId().GetDeviceId() != "" && tt.args.request.GetResourceId().GetHref() != "" { @@ -696,7 +696,7 @@ func TestRequestHandlerConfirmResourceRetrieve(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) _, err = requestHandler.NotifyResourceChanged(ctx, testMakeNotifyResourceChangedRequest(deviceID, resID, 0)) require.NoError(t, err) _, err = requestHandler.RetrieveResource(ctx, testMakeRetrieveResourceRequest(deviceID, resID, correlationID, time.Hour)) @@ -789,7 +789,7 @@ func TestRequestHandlerDeleteResource(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range test { tfunc := func(t *testing.T) { if tt.args.request.GetResourceId().GetDeviceId() != "" && tt.args.request.GetResourceId().GetHref() != "" { @@ -882,7 +882,7 @@ func TestRequestHandlerConfirmResourceDelete(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) _, err = requestHandler.NotifyResourceChanged(ctx, testMakeNotifyResourceChangedRequest(deviceID, resID, 0)) require.NoError(t, err) _, err = requestHandler.DeleteResource(ctx, testMakeDeleteResourceRequest(deviceID, resID, correlationID, time.Hour)) @@ -975,7 +975,7 @@ func TestRequestHandlerCreateResource(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range test { tfunc := func(t *testing.T) { if tt.args.request.GetResourceId().GetDeviceId() != "" && tt.args.request.GetResourceId().GetHref() != "" { @@ -1068,7 +1068,7 @@ func TestRequestHandlerConfirmResourceCreate(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) _, err = requestHandler.NotifyResourceChanged(ctx, testMakeNotifyResourceChangedRequest(deviceID, resID, 0)) require.NoError(t, err) _, err = requestHandler.CreateResource(ctx, testMakeCreateResourceRequest(deviceID, resID, correlationID, time.Hour)) diff --git a/resource-aggregate/service/service.go b/resource-aggregate/service/service.go index 8bb0972c8..914d0d238 100644 --- a/resource-aggregate/service/service.go +++ b/resource-aggregate/service/service.go @@ -149,7 +149,7 @@ func NewService(ctx context.Context, config Config, fileWatcher *fsnotify.Watche return ownerCache.GetSelectedDevices(ctx, deviceIDs) } return ownerCache.GetDevices(ctx) - }) + }, logger) RegisterResourceAggregateServer(grpcServer.Server, requestHandler) return service.New(grpcServer), nil diff --git a/resource-aggregate/service/service_test.go b/resource-aggregate/service/service_test.go index 6436dffe9..a92508c15 100644 --- a/resource-aggregate/service/service_test.go +++ b/resource-aggregate/service/service_test.go @@ -17,7 +17,6 @@ import ( oauthTest "github.com/plgd-dev/hub/v2/test/oauth-server/test" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" ) func TestPublishUnpublish(t *testing.T) { @@ -31,7 +30,7 @@ func TestPublishUnpublish(t *testing.T) { idShutdown := idService.SetUp(t) defer idShutdown() logCfg := log.MakeDefaultConfig() - logCfg.Level = zap.DebugLevel + logCfg.Level = log.DebugLevel log.Setup(logCfg) raShutdown := test.New(t, cfg) defer raShutdown() @@ -72,7 +71,7 @@ func TestPublishUnpublish(t *testing.T) { require.NoError(t, err) }() - logCfg.Level = zap.DebugLevel + logCfg.Level = log.DebugLevel log.Setup(logCfg) pubReq := testMakePublishResourceRequest(deviceID, []string{href}) _, err = raClient.PublishResourceLinks(ctx, pubReq) diff --git a/resource-aggregate/service/updateDeviceMetadata.go b/resource-aggregate/service/updateDeviceMetadata.go index e5c82d786..2971ee341 100644 --- a/resource-aggregate/service/updateDeviceMetadata.go +++ b/resource-aggregate/service/updateDeviceMetadata.go @@ -82,10 +82,7 @@ func (r RequestHandler) UpdateDeviceMetadata(ctx context.Context, request *comma return nil, log.LogAndReturnError(kitNetGrpc.ForwardErrorf(codes.Internal, "cannot update device('%v') metadata: %v", request.GetDeviceId(), err)) } - err = PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), publishEvents) - if err != nil { - log.Errorf("cannot publish device('%v') metadata events: %w", request.GetDeviceId(), err) - } + PublishEvents(r.publisher, owner, aggregate.DeviceID(), aggregate.ResourceID(), publishEvents, r.logger) var validUntil int64 for _, e := range publishEvents { diff --git a/resource-aggregate/service/updateDeviceMetadata_test.go b/resource-aggregate/service/updateDeviceMetadata_test.go index 6ad77388c..5ff635944 100644 --- a/resource-aggregate/service/updateDeviceMetadata_test.go +++ b/resource-aggregate/service/updateDeviceMetadata_test.go @@ -130,8 +130,7 @@ func TestAggregateHandleUpdateDeviceMetadata(t *testing.T) { return } require.NoError(t, err) - err = service.PublishEvents(publisher, tt.args.userID, tt.args.request.GetDeviceId(), ag.ResourceID(), events) - assert.NoError(t, err) + service.PublishEvents(publisher, tt.args.userID, tt.args.request.GetDeviceId(), ag.ResourceID(), events, logger) } t.Run(tt.name, tfunc) } @@ -243,7 +242,7 @@ func TestRequestHandlerUpdateDeviceMetadata(t *testing.T) { naClient.Close() }() - requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices) + requestHandler := service.NewRequestHandler(config, eventstore, publisher, mockGetOwnerDevices, logger) for _, tt := range test { tfunc := func(t *testing.T) { diff --git a/resource-aggregate/test/service.go b/resource-aggregate/test/service.go index 8247b740e..a4c618bff 100644 --- a/resource-aggregate/test/service.go +++ b/resource-aggregate/test/service.go @@ -15,7 +15,7 @@ import ( func MakeConfig(t require.TestingT) service.Config { var cfg service.Config - cfg.Log = log.MakeDefaultConfig() + cfg.Log = config.MakeLogConfig(t, "TEST_RESOURCE_AGGREGATE_LOG_LEVEL", "TEST_RESOURCE_AGGREGATE_LOG_DUMP_BODY") cfg.APIs.GRPC.OwnerCacheExpiration = time.Minute cfg.APIs.GRPC.Config = config.MakeGrpcServerConfig(config.RESOURCE_AGGREGATE_HOST) diff --git a/test/coap-gateway/test/test.go b/test/coap-gateway/test/test.go index c8c7facd1..58dfd513d 100644 --- a/test/coap-gateway/test/test.go +++ b/test/coap-gateway/test/test.go @@ -10,12 +10,11 @@ import ( "github.com/plgd-dev/hub/v2/test/coap-gateway/service" "github.com/plgd-dev/hub/v2/test/config" "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" ) func MakeConfig(t require.TestingT) service.Config { var cfg service.Config - cfg.Log.Config.Level = zapcore.DebugLevel + cfg.Log.Config.Level = log.DebugLevel cfg.Log.DumpCoapMessages = true cfg.APIs.COAP.Addr = config.COAP_GW_HOST cfg.APIs.COAP.TLS.Config = config.MakeTLSServerConfig() diff --git a/test/config/config.go b/test/config/config.go index 21eb61022..e8e4c06f6 100644 --- a/test/config/config.go +++ b/test/config/config.go @@ -1,13 +1,16 @@ package config import ( + "fmt" "os" + "strings" "testing" "time" "github.com/golang-jwt/jwt/v4" "github.com/plgd-dev/device/v2/schema" c2curi "github.com/plgd-dev/hub/v2/cloud2cloud-connector/uri" + "github.com/plgd-dev/hub/v2/pkg/log" pkgMongo "github.com/plgd-dev/hub/v2/pkg/mongodb" grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client" grpcServer "github.com/plgd-dev/hub/v2/pkg/net/grpc/server" @@ -25,6 +28,7 @@ import ( "github.com/plgd-dev/hub/v2/test/http" "github.com/plgd-dev/hub/v2/test/oauth-server/uri" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) const ( @@ -47,6 +51,7 @@ const ( HTTP_GW_HOST = "localhost:20010" DEVICE_PROVIDER = "plgd" OPENTELEMETRY_COLLECTOR_HOST = "localhost:55690" + TRUE_STRING = "true" ) var ( @@ -56,9 +61,9 @@ var ( MONGODB_URI = "mongodb://localhost:27017" NATS_URL = "nats://localhost:4222" OWNER_CLAIM = "sub" - COAP_GATEWAY_UDP_ENABLED = os.Getenv("TEST_COAP_GATEWAY_UDP_ENABLED") == "true" + COAP_GATEWAY_UDP_ENABLED = os.Getenv("TEST_COAP_GATEWAY_UDP_ENABLED") == TRUE_STRING ACTIVE_COAP_SCHEME = func() string { - if os.Getenv("TEST_COAP_GATEWAY_UDP_ENABLED") == "true" { + if os.Getenv("TEST_COAP_GATEWAY_UDP_ENABLED") == TRUE_STRING { return string(schema.UDPSecureScheme) } return string(schema.TCPSecureScheme) @@ -231,3 +236,25 @@ func CreateJwtToken(t *testing.T, claims jwt.MapClaims) string { require.NoError(t, err) return tokenString } + +func MakeLogConfig(t require.TestingT, envLogLevel, envLogDumpBody string) log.Config { + cfg := log.MakeDefaultConfig() + logLvlString := os.Getenv(envLogLevel) + logLvl := zap.NewAtomicLevelAt(log.InfoLevel) + if logLvlString != "" { + var err error + logLvl, err = zap.ParseAtomicLevel(logLvlString) + require.NoError(t, err) + } + cfg.Level = logLvl.Level() + logDumpBodyStr := strings.ToLower(os.Getenv(envLogDumpBody)) + switch logDumpBodyStr { + case TRUE_STRING, "false": + cfg.DumpBody = logDumpBodyStr == TRUE_STRING + case "": + cfg.DumpBody = false + default: + require.NoError(t, fmt.Errorf("invalid value %v for %v", logDumpBodyStr, envLogDumpBody)) + } + return cfg +} diff --git a/test/test.go b/test/test.go index 0d0b91d85..546dd882d 100644 --- a/test/test.go +++ b/test/test.go @@ -506,9 +506,12 @@ func WaitForDevice(ctx context.Context, t *testing.T, client pb.GrpcGateway_Subs CheckProtobufs(t, expectedEvent, ev, RequireToCheckFunc(require.Equal)) delete(expectedEvents, getID(ev)) if len(expectedEvents) == 0 { - return + break } } + + // wait for sync RD + time.Sleep(500 * time.Millisecond) } func MustGetHostname() string {