Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[psa_switch] first pass at i2e cloning #935

Merged
merged 6 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
*.out
*.app

# Python byet code
# Python byte code
*.pyc

build/*
lib*

# Vim
*.swp

# Emacs
*~

Expand Down
175 changes: 137 additions & 38 deletions targets/psa_switch/psa_switch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,50 @@ namespace bm {

namespace psa {

static constexpr uint16_t MAX_MIRROR_SESSION_ID = (1u << 15) - 1;
packet_id_t PsaSwitch::packet_id = 0;

class PsaSwitch::MirroringSessions {
public:
bool add_session(mirror_id_t mirror_id,
const MirroringSessionConfig &config) {
Lock lock(mutex);
if (0 <= mirror_id && mirror_id <= MAX_MIRROR_SESSION_ID) {
sessions_map[mirror_id] = config;
return true;
} else {
bm::Logger::get()->error("mirror_id out of range. No session added.");
return false;
}
}

bool delete_session(mirror_id_t mirror_id) {
Lock lock(mutex);
if (0 <= mirror_id && mirror_id <= MAX_MIRROR_SESSION_ID) {
return sessions_map.erase(mirror_id) == 1;
} else {
bm::Logger::get()->error("mirror_id out of range. No session deleted.");
return false;
}
}

bool get_session(mirror_id_t mirror_id,
MirroringSessionConfig *config) const {
Lock lock(mutex);
auto it = sessions_map.find(mirror_id);
if (it == sessions_map.end()) return false;
*config = it->second;
return true;
}

private:
using Mutex = std::mutex;
using Lock = std::lock_guard<Mutex>;

mutable std::mutex mutex;
std::unordered_map<mirror_id_t, MirroringSessionConfig> sessions_map;
};

PsaSwitch::PsaSwitch(bool enable_swap)
: Switch(enable_swap),
input_buffer(1024),
Expand All @@ -92,7 +134,8 @@ PsaSwitch::PsaSwitch(bool enable_swap)
this->transmit_fn(port_num, buffer, len);
}),
pre(new McSimplePreLAG()),
start(clock::now()) {
start(clock::now()),
mirroring_sessions(new MirroringSessions()) {
add_component<McSimplePreLAG>(pre);

add_required_field("psa_ingress_parser_input_metadata", "ingress_port");
Expand Down Expand Up @@ -200,6 +243,23 @@ PsaSwitch::reset_target_state_() {
get_component<McSimplePreLAG>()->reset_state();
}

bool
PsaSwitch::mirroring_add_session(mirror_id_t mirror_id,
const MirroringSessionConfig &config) {
return mirroring_sessions->add_session(mirror_id, config);
}

bool
PsaSwitch::mirroring_delete_session(mirror_id_t mirror_id) {
return mirroring_sessions->delete_session(mirror_id);
}

bool
PsaSwitch::mirroring_get_session(mirror_id_t mirror_id,
MirroringSessionConfig *config) const {
return mirroring_sessions->get_session(mirror_id, config);
}

int
PsaSwitch::set_egress_queue_depth(size_t port, const size_t depth_pkts) {
egress_buffers.set_capacity(port, depth_pkts);
Expand Down Expand Up @@ -280,6 +340,30 @@ PsaSwitch::enqueue(port_t egress_port, std::unique_ptr<Packet> &&packet) {
#endif
}

void
PsaSwitch::multicast(Packet *packet, unsigned int mgid, PktInstanceType path, unsigned int class_of_service) {
auto phv = packet->get_phv();
const auto pre_out = pre->replicate({mgid});
auto &f_eg_cos = phv->get_field("psa_egress_input_metadata.class_of_service");
auto &f_instance = phv->get_field("psa_egress_input_metadata.instance");
auto &f_packet_path = phv->get_field("psa_egress_parser_input_metadata.packet_path");
auto packet_size = packet->get_register(PACKET_LENGTH_REG_IDX);
for (const auto &out : pre_out) {
auto egress_port = out.egress_port;
auto instance = out.rid;
BMLOG_DEBUG_PKT(*packet,
"Replicating packet on port {} with instance {}",
egress_port, instance);
f_eg_cos.set(class_of_service);
f_instance.set(instance);
// TODO use appropriate enum member from JSON
f_packet_path.set(path);
std::unique_ptr<Packet> packet_copy = packet->clone_with_phv_ptr();
packet_copy->set_register(PACKET_LENGTH_REG_IDX, packet_size);
enqueue(egress_port, std::move(packet_copy));
}
}

void
PsaSwitch::ingress_thread() {
PHV *phv;
Expand All @@ -301,10 +385,9 @@ PsaSwitch::ingress_thread() {
bytes are parsed by the parser ("lifted" into p4 headers). Here, we
track the buffer_state prior to parsing so that we can put it back
for packets that are cloned or resubmitted, same as in simple_switch.cpp

TODO */

*/
const Packet::buffer_state_t packet_in_state = packet->save_buffer_state();
auto ingress_packet_size = packet->get_register(PACKET_LENGTH_REG_IDX);

// The PSA specification says that for all packets, whether they
// are new ones from a port, or resubmitted, or recirculated, the
Expand Down Expand Up @@ -337,17 +420,57 @@ PsaSwitch::ingress_thread() {
ingress_mau->apply(packet.get());
packet->reset_exit();

// prioritize dropping if marked as such - do not move below other checks
const auto &f_drop = phv->get_field("psa_ingress_output_metadata.drop");
if (f_drop.get_int()) {
const auto &f_ig_cos = phv->get_field("psa_ingress_output_metadata.class_of_service");
const auto ig_cos = f_ig_cos.get_uint();

// ingress cloning - sent in their pre-ingress-thread state to either an egress
// port or a multicast group
peteli3 marked this conversation as resolved.
Show resolved Hide resolved
// - dropped packets should still be cloned - do not move below drop
unsigned int clone = phv->get_field("psa_ingress_output_metadata.clone").get_uint();
if (clone) {
MirroringSessionConfig config;
unsigned int clone_session_id = phv->get_field("psa_ingress_output_metadata.clone_session_id").get_uint();
peteli3 marked this conversation as resolved.
Show resolved Hide resolved
bool is_session_configured = mirroring_get_session(
static_cast<mirror_id_t>(clone_session_id), &config);
BMLOG_DEBUG_PKT(*packet, "Cloning packet at ingress to session id {}", clone_session_id);
peteli3 marked this conversation as resolved.
Show resolved Hide resolved

if (is_session_configured) {
const Packet::buffer_state_t packet_out_state = packet->save_buffer_state();
packet->restore_buffer_state(packet_in_state);

std::unique_ptr<Packet> packet_copy = packet->clone_no_phv_ptr();
packet_copy->set_register(PACKET_LENGTH_REG_IDX, ingress_packet_size);
auto phv_copy = packet_copy->get_phv();
phv_copy->reset_metadata();
phv_copy->get_field("psa_egress_parser_input_metadata.packet_path").set(PACKET_PATH_CLONE_I2E);

if (config.mgid_valid) {
BMLOG_DEBUG_PKT(*packet_copy, "Cloning packet to multicast group {}", config.mgid);
// TODO 0 as the last arg (for class_of_service) is currently a placeholder
// implement cos into cloning session configs
multicast(packet_copy.get(), config.mgid, PACKET_PATH_CLONE_I2E, 0);
}

if (config.egress_port_valid) {
BMLOG_DEBUG_PKT(*packet_copy, "Cloning packet to egress port {}", config.egress_port);
enqueue(config.egress_port, std::move(packet_copy));
}

packet->restore_buffer_state(packet_out_state);
}
}

// drop - packets marked via the ingress_drop action
unsigned int drop = phv->get_field("psa_ingress_output_metadata.drop").get_uint();
if (drop) {
BMLOG_DEBUG_PKT(*packet, "Dropping packet at the end of ingress");
continue;
}

// resubmit - these packets get immediately resub'd to ingress, and skip
// deparsing, do not move below multicast or deparse
const auto &f_resubmit = phv->get_field("psa_ingress_output_metadata.resubmit");
if (f_resubmit.get_int()) {
unsigned int resubmit = phv->get_field("psa_ingress_output_metadata.resubmit").get_uint();
peteli3 marked this conversation as resolved.
Show resolved Hide resolved
if (resubmit) {
BMLOG_DEBUG_PKT(*packet, "Resubmitting packet");

packet->restore_buffer_state(packet_in_state);
Expand All @@ -363,49 +486,25 @@ PsaSwitch::ingress_thread() {
deparser->deparse(packet.get());

auto &f_packet_path = phv->get_field("psa_egress_parser_input_metadata.packet_path");
const auto &f_ig_cos = phv->get_field("psa_ingress_output_metadata.class_of_service");
const auto ig_cos = f_ig_cos.get_uint();

// handling multicast
unsigned int mgid = 0u;
const auto &f_mgid = phv->get_field("psa_ingress_output_metadata.multicast_group");
mgid = f_mgid.get_uint();

unsigned int mgid = phv->get_field("psa_ingress_output_metadata.multicast_group").get_uint();
peteli3 marked this conversation as resolved.
Show resolved Hide resolved
if (mgid != 0) {
BMLOG_DEBUG_PKT(*packet,
"Multicast requested for packet with multicast group {}",
mgid);
const auto pre_out = pre->replicate({mgid});
auto &f_instance = phv->get_field("psa_egress_input_metadata.instance");
auto &f_eg_cos = phv->get_field("psa_egress_input_metadata.class_of_service");
auto packet_size = packet->get_register(PACKET_LENGTH_REG_IDX);
for(const auto &out : pre_out){
auto egress_port = out.egress_port;
auto instance = out.rid;
BMLOG_DEBUG_PKT(*packet,
"Replicating packet on port {} with instance {}",
egress_port, instance);
f_instance.set(instance);
// TODO use appropriate enum member from JSON
f_packet_path.set(PACKET_PATH_NORMAL_MULTICAST);
f_eg_cos.set(ig_cos);
std::unique_ptr<Packet> packet_copy = packet->clone_with_phv_ptr();
packet_copy->set_register(PACKET_LENGTH_REG_IDX, packet_size);
enqueue(egress_port, std::move(packet_copy));
}
multicast(packet.get(), mgid, PACKET_PATH_NORMAL_MULTICAST, ig_cos);
continue;
}

const auto &f_egress_port = phv->get_field("psa_ingress_output_metadata.egress_port");
auto &f_instance = phv->get_field("psa_egress_input_metadata.instance");
auto &f_eg_cos = phv->get_field("psa_egress_input_metadata.class_of_service");
port_t egress_port = f_egress_port.get_uint();
BMLOG_DEBUG_PKT(*packet, "Egress port is {}", egress_port);
f_instance.set(0);
// TODO use appropriate enum member from JSON
f_packet_path.set(PACKET_PATH_NORMAL_UNICAST);
f_eg_cos.set(ig_cos);

f_packet_path.set(PACKET_PATH_NORMAL_UNICAST);
port_t egress_port = phv->get_field("psa_ingress_output_metadata.egress_port").get_uint();
peteli3 marked this conversation as resolved.
Show resolved Hide resolved
BMLOG_DEBUG_PKT(*packet, "Egress port is {}", egress_port);
enqueue(egress_port, std::move(packet));
}
}
Expand Down
20 changes: 18 additions & 2 deletions targets/psa_switch/psa_switch.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ class PsaSwitch : public Switch {
using TransmitFn = std::function<void(port_t, packet_id_t,
const char *, int)>;

struct MirroringSessionConfig {
port_t egress_port;
bool egress_port_valid;
unsigned int mgid;
bool mgid_valid;
};

private:
using clock = std::chrono::high_resolution_clock;

Expand All @@ -80,15 +87,19 @@ class PsaSwitch : public Switch {

void reset_target_state_() override;

bool mirroring_add_session(mirror_id_t mirror_id,
const MirroringSessionConfig &config);
bool mirroring_delete_session(mirror_id_t mirror_id);
bool mirroring_get_session(mirror_id_t mirror_id,
MirroringSessionConfig *config) const;

int mirroring_mapping_add(mirror_id_t mirror_id, port_t egress_port) {
mirroring_map[mirror_id] = egress_port;
return 0;
}

int mirroring_mapping_delete(mirror_id_t mirror_id) {
return mirroring_map.erase(mirror_id);
}

bool mirroring_mapping_get(mirror_id_t mirror_id, port_t *port) const {
return get_mirroring_mapping(mirror_id, port);
}
Expand Down Expand Up @@ -205,6 +216,8 @@ class PsaSwitch : public Switch {
static constexpr port_t PSA_PORT_RECIRCULATE = 0xfffffffa;
static packet_id_t packet_id;

class MirroringSessions;

enum PktInstanceType {
PACKET_PATH_NORMAL,
PACKET_PATH_NORMAL_UNICAST,
Expand All @@ -231,6 +244,8 @@ class PsaSwitch : public Switch {
void egress_thread(size_t worker_id);
void transmit_thread();

void multicast(Packet *packet, unsigned int mgid, PktInstanceType path, unsigned int class_of_service);

bool get_mirroring_mapping(mirror_id_t mirror_id, port_t *port) const {
const auto it = mirroring_map.find(mirror_id);
if (it != mirroring_map.end()) {
Expand Down Expand Up @@ -261,6 +276,7 @@ class PsaSwitch : public Switch {
std::shared_ptr<McSimplePreLAG> pre;
clock::time_point start;
std::unordered_map<mirror_id_t, port_t> mirroring_map;
std::unique_ptr<MirroringSessions> mirroring_sessions;
bool with_queueing_metadata{false};
};

Expand Down
Loading