Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(pairing): port queries to xandra #834

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/astarte_pairing/config/config.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -26,4 +26,6 @@ config :lager,
error_logger_redirect: false,
handlers: [level: :critical]

config :astarte_pairing, :cluster_name, :xandra

import_config "#{config_env()}.exs"
8 changes: 2 additions & 6 deletions apps/astarte_pairing/lib/astarte_pairing.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,13 +43,9 @@ defmodule Astarte.Pairing do
Config.validate!()
Config.init!()

xandra_options =
Config.xandra_options!()
|> Keyword.put(:name, :xandra)

children = [
Astarte.PairingWeb.Telemetry,
{Xandra.Cluster, xandra_options},
{Xandra.Cluster, Config.xandra_options!()},
{Astarte.RPC.AMQP.Server, [amqp_queue: Protocol.amqp_queue(), handler: Handler]},
{Astarte.Pairing.CredentialsSecret.Cache, []}
]
Expand Down
26 changes: 9 additions & 17 deletions apps/astarte_pairing/lib/astarte_pairing/config.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,21 +61,13 @@ defmodule Astarte.Pairing.Config do
end
end

@doc """
Returns the cassandra node configuration
"""
@spec cassandra_node!() :: {String.t(), integer()}
def cassandra_node!, do: Enum.random(cqex_nodes!())

@doc """
Returns Cassandra nodes formatted in the Xandra format.
"""
defdelegate xandra_nodes, to: DataAccessConfig
defdelegate xandra_nodes!, to: DataAccessConfig
def xandra_options! do
cluster = Application.get_env(:astarte_pairing, :cluster_name)

defdelegate cqex_nodes, to: DataAccessConfig
defdelegate cqex_nodes!, to: DataAccessConfig

defdelegate xandra_options!, to: DataAccessConfig
defdelegate cqex_options!, to: DataAccessConfig
# Dropping :autodiscovery since the option has been deprecated in xandra v0.15.0
# and is now always enabled.
DataAccessConfig.xandra_options!()
|> Keyword.drop([:autodiscovery])
|> Keyword.put(:name, cluster)
end
end
107 changes: 22 additions & 85 deletions apps/astarte_pairing/lib/astarte_pairing/engine.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -27,7 +27,6 @@ defmodule Astarte.Pairing.Engine do
alias Astarte.Pairing.Config
alias Astarte.Pairing.CredentialsSecret
alias Astarte.Pairing.Queries
alias CQEx.Client

require Logger

Expand Down Expand Up @@ -56,23 +55,10 @@ defmodule Astarte.Pairing.Engine do
end

def get_agent_public_key_pems(realm) do
cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
{:ok, jwt_pems} <- Queries.get_agent_public_key_pems(client) do
{:ok, jwt_pems}
else
{:error, :shutdown} ->
{:error, :realm_not_found}

{:error, reason} ->
{:error, reason}
case Queries.get_agent_public_key_pems(realm) do
{:ok, jwt_pems} -> {:ok, jwt_pems}
{:error, :public_key_not_found} -> {:error, :realm_not_found}
{:error, reason} -> {:error, reason}
end
end

Expand All @@ -90,34 +76,25 @@ defmodule Astarte.Pairing.Engine do

:telemetry.execute([:astarte, :pairing, :get_credentials], %{}, %{realm: realm})

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
{:ok, ip_tuple} <- parse_ip(device_ip),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
{:ok, device_row} <- Queries.select_device_for_credentials_request(client, device_id),
{:ok, device_row} <- Queries.select_device_for_credentials_request(realm, device_id),
{:authorized?, true} <-
{:authorized?,
CredentialsSecret.verify(credentials_secret, device_row[:credentials_secret])},
CredentialsSecret.verify(credentials_secret, device_row["credentials_secret"])},
{:credentials_inhibited?, false} <-
{:credentials_inhibited?, device_row[:inhibit_credentials_request]},
_ <- CFSSLCredentials.revoke(device_row[:cert_serial], device_row[:cert_aki]),
{:credentials_inhibited?, device_row["inhibit_credentials_request"]},
_ <- CFSSLCredentials.revoke(device_row["cert_serial"], device_row["cert_aki"]),
encoded_device_id <- Device.encode_device_id(device_id),
{:ok, %{cert: cert, aki: _aki, serial: _serial} = cert_data} <-
CFSSLCredentials.get_certificate(csr, realm, encoded_device_id),
:ok <-
Queries.update_device_after_credentials_request(
client,
realm,
device_id,
cert_data,
ip_tuple,
device_row[:first_credentials_request]
device_row["first_credentials_request"]
) do
{:ok, %{client_crt: cert}}
else
Expand Down Expand Up @@ -153,20 +130,11 @@ defmodule Astarte.Pairing.Engine do
def get_info(realm, hardware_id, credentials_secret) do
Logger.debug("get_info request for device #{inspect(hardware_id)} in realm #{inspect(realm)}")

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
{:ok, device_row} <- Queries.select_device_for_info(client, device_id),
{:ok, device_row} <- Queries.select_device_for_info(realm, device_id),
{:authorized?, true} <-
{:authorized?,
CredentialsSecret.verify(credentials_secret, device_row[:credentials_secret])} do
CredentialsSecret.verify(credentials_secret, device_row["credentials_secret"])} do
device_status = device_status_string(device_row)
protocols = get_protocol_info()

Expand All @@ -193,19 +161,10 @@ defmodule Astarte.Pairing.Engine do

:telemetry.execute([:astarte, :pairing, :register_new_device], %{}, %{realm: realm})

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
credentials_secret <- CredentialsSecret.generate(),
secret_hash <- CredentialsSecret.hash(credentials_secret),
:ok <- Queries.register_device(client, device_id, hardware_id, secret_hash, opts) do
:ok <- Queries.register_device(realm, device_id, hardware_id, secret_hash, opts) do
{:ok, credentials_secret}
else
{:error, :shutdown} ->
Expand All @@ -222,17 +181,8 @@ defmodule Astarte.Pairing.Engine do
"in realm #{inspect(realm)}"
)

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(encoded_device_id),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
:ok <- Queries.unregister_device(client, device_id) do
:ok <- Queries.unregister_device(realm, device_id) do
:ok
else
{:error, :shutdown} ->
Expand All @@ -248,19 +198,10 @@ defmodule Astarte.Pairing.Engine do
"verify_credentials request for device #{inspect(hardware_id)} in realm #{inspect(realm)}"
)

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
{:ok, device_row} <- Queries.select_device_for_verify_credentials(client, device_id),
{:ok, device_row} <- Queries.select_device_for_verify_credentials(realm, device_id),
{:authorized?, true} <-
{:authorized?, CredentialsSecret.verify(secret, device_row[:credentials_secret])} do
{:authorized?, CredentialsSecret.verify(secret, device_row["credentials_secret"])} do
CertVerifier.verify(client_crt, Config.ca_cert!())
else
{:authorized?, false} ->
Expand All @@ -286,16 +227,12 @@ defmodule Astarte.Pairing.Engine do
end

defp device_status_string(device_row) do
# The device is pending until the first credendtial request
cond do
Keyword.get(device_row, :inhibit_credentials_request) ->
"inhibited"

Keyword.get(device_row, :first_credentials_request) ->
"confirmed"
# The device is pending until the first credential request

true ->
"pending"
case device_row do
%{"inhibit_credentials_request" => true} -> "inhibited"
%{"first_credentials_request" => nil} -> "pending"
%{} -> "confirmed"
end
end

Expand Down
Loading
Loading