Skip to content

Commit

Permalink
[psa_switch] first pass at i2e cloning (#935)
Browse files Browse the repository at this point in the history
* first pass at i2e cloning with mirroring sessions

* add support for mirroring sessions to psa switch thrift server so they can be called via stf;
deprecate unused mirroring commands in psa switch thrift server;

* gitignore vim swapfiles

* addressing code review comments with better class_of_service metadata handling in psa switch ingress

* add name argument to parse_int calls that didn't previously have it

* address code review comments:
use auto for variables initialized with values;
update an unclear comment about ingress cloning;
add debug message for when ingress cloning happens on a clone session id that's unconfigured;
  • Loading branch information
peteli3 authored Aug 31, 2020
1 parent 5f9ad70 commit 51dae31
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 74 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
*.out
*.app

# Python byet code
# Python byte code
*.pyc

build/*
lib*

# Vim
*.swp

# Emacs
*~

Expand Down
181 changes: 141 additions & 40 deletions targets/psa_switch/psa_switch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,50 @@ namespace bm {

namespace psa {

// Largest mirror (clone) session id accepted by MirroringSessions::add_session
// and MirroringSessions::delete_session; the bound is inclusive.
// (1u << 15) - 1 == 32767.
static constexpr uint16_t MAX_MIRROR_SESSION_ID = (1u << 15) - 1;
// Definition of the static PsaSwitch::packet_id member, starting at 0.
packet_id_t PsaSwitch::packet_id = 0;

// Mutex-protected table of mirroring (clone) session configurations, keyed by
// mirror id. Every public member holds the internal mutex for the whole call.
class PsaSwitch::MirroringSessions {
 public:
  // Stores `config` under `mirror_id`, overwriting any existing entry.
  // Returns true on success. When `mirror_id` lies outside
  // [0, MAX_MIRROR_SESSION_ID] an error is logged, nothing is stored and
  // false is returned.
  bool add_session(mirror_id_t mirror_id,
                   const MirroringSessionConfig &config) {
    Lock guard(mutex);
    if (!id_in_range(mirror_id)) {
      bm::Logger::get()->error("mirror_id out of range. No session added.");
      return false;
    }
    sessions_map[mirror_id] = config;
    return true;
  }

  // Removes the entry stored under `mirror_id`.
  // Returns true only when exactly one entry was erased. An id outside
  // [0, MAX_MIRROR_SESSION_ID] is logged as an error and yields false.
  bool delete_session(mirror_id_t mirror_id) {
    Lock guard(mutex);
    if (!id_in_range(mirror_id)) {
      bm::Logger::get()->error("mirror_id out of range. No session deleted.");
      return false;
    }
    const auto erased = sessions_map.erase(mirror_id);
    return erased == 1;
  }

  // Copies the configuration stored under `mirror_id` into `*config`.
  // Returns false, leaving `*config` untouched, when no such entry exists.
  // No range check is performed here: an out-of-range id is simply not found.
  bool get_session(mirror_id_t mirror_id,
                   MirroringSessionConfig *config) const {
    Lock guard(mutex);
    const auto found = sessions_map.find(mirror_id);
    if (found != sessions_map.end()) {
      *config = found->second;
      return true;
    }
    return false;
  }

 private:
  using Mutex = std::mutex;
  using Lock = std::lock_guard<Mutex>;

  // True when `mirror_id` is within the inclusive range of valid session ids.
  static bool id_in_range(mirror_id_t mirror_id) {
    return 0 <= mirror_id && mirror_id <= MAX_MIRROR_SESSION_ID;
  }

  // `mutable` so that the const get_session() can still take the lock.
  mutable Mutex mutex;
  std::unordered_map<mirror_id_t, MirroringSessionConfig> sessions_map;
};

PsaSwitch::PsaSwitch(bool enable_swap)
: Switch(enable_swap),
input_buffer(1024),
Expand All @@ -92,7 +134,8 @@ PsaSwitch::PsaSwitch(bool enable_swap)
this->transmit_fn(port_num, buffer, len);
}),
pre(new McSimplePreLAG()),
start(clock::now()) {
start(clock::now()),
mirroring_sessions(new MirroringSessions()) {
add_component<McSimplePreLAG>(pre);

add_required_field("psa_ingress_parser_input_metadata", "ingress_port");
Expand Down Expand Up @@ -151,7 +194,7 @@ PsaSwitch::receive_(port_t port_num, const char *buffer, int len) {
bm::PacketBuffer(len + 512, buffer, len));

BMELOG(packet_in, *packet);
PHV *phv = packet->get_phv();
auto *phv = packet->get_phv();

// many current p4 programs assume this
// from psa spec - PSA does not mandate initialization of user-defined
Expand Down Expand Up @@ -200,6 +243,23 @@ PsaSwitch::reset_target_state_() {
get_component<McSimplePreLAG>()->reset_state();
}

// Adds (or overwrites) the mirroring session `mirror_id` by forwarding to the
// MirroringSessions store. Returns the store's result: false when the id is
// rejected as out of range, true otherwise.
bool
PsaSwitch::mirroring_add_session(mirror_id_t mirror_id,
                                 const MirroringSessionConfig &config) {
  const bool added = mirroring_sessions->add_session(mirror_id, config);
  return added;
}

// Deletes the mirroring session `mirror_id` by forwarding to the
// MirroringSessions store. Returns true only if a session was removed.
bool
PsaSwitch::mirroring_delete_session(mirror_id_t mirror_id) {
  const bool removed = mirroring_sessions->delete_session(mirror_id);
  return removed;
}

// Looks up the mirroring session `mirror_id` in the MirroringSessions store.
// On success the stored configuration is copied into `*config` and true is
// returned; otherwise false is returned.
bool
PsaSwitch::mirroring_get_session(mirror_id_t mirror_id,
                                 MirroringSessionConfig *config) const {
  const bool found = mirroring_sessions->get_session(mirror_id, config);
  return found;
}

int
PsaSwitch::set_egress_queue_depth(size_t port, const size_t depth_pkts) {
egress_buffers.set_capacity(port, depth_pkts);
Expand Down Expand Up @@ -266,7 +326,7 @@ PsaSwitch::enqueue(port_t egress_port, std::unique_ptr<Packet> &&packet) {
packet->set_egress_port(egress_port);

#ifdef SSWITCH_PRIORITY_QUEUEING_ON
size_t priority = phv->has_field(SSWITCH_PRIORITY_QUEUEING_SRC) ?
auto priority = phv->has_field(SSWITCH_PRIORITY_QUEUEING_SRC) ?
phv->get_field(SSWITCH_PRIORITY_QUEUEING_SRC).get<size_t>() : 0u;
if (priority >= SSWITCH_PRIORITY_QUEUEING_NB_QUEUES) {
bm::Logger::get()->error("Priority out of range, dropping packet");
Expand All @@ -280,6 +340,30 @@ PsaSwitch::enqueue(port_t egress_port, std::unique_ptr<Packet> &&packet) {
#endif
}

// Replicates `packet` once for every entry the packet replication engine
// returns for multicast group `mgid`, and enqueues each replica on that
// entry's egress port.
//
// Before each replica is cloned, the egress metadata (class_of_service,
// instance, packet_path) is written into the PHV of `packet` itself, so on
// return those fields of `packet` hold the values used for the last replica.
// If the replication list is empty, `packet` is left unmodified and nothing
// is enqueued.
//
//   packet            source packet; not consumed by this call
//   mgid              multicast group id handed to the replication engine
//   path              value written to the egress packet_path field
//   class_of_service  value written to the egress class_of_service field
void
PsaSwitch::multicast(Packet *packet, unsigned int mgid, PktInstanceType path,
                     unsigned int class_of_service) {
  auto *phv = packet->get_phv();
  const auto replicas = pre->replicate({mgid});

  auto &instance_field =
      phv->get_field("psa_egress_input_metadata.instance");
  auto &packet_path_field =
      phv->get_field("psa_egress_parser_input_metadata.packet_path");
  auto &cos_field =
      phv->get_field("psa_egress_input_metadata.class_of_service");

  // Captured once: every replica carries the length of the source packet.
  const auto source_length = packet->get_register(PACKET_LENGTH_REG_IDX);

  for (const auto &replica : replicas) {
    const auto out_port = replica.egress_port;
    const auto replication_id = replica.rid;
    BMLOG_DEBUG_PKT(*packet,
                    "Replicating packet on port {} with instance {}",
                    out_port, replication_id);

    instance_field.set(replication_id);
    // TODO use appropriate enum member from JSON
    packet_path_field.set(path);
    cos_field.set(class_of_service);

    std::unique_ptr<Packet> replica_packet = packet->clone_with_phv_ptr();
    replica_packet->set_register(PACKET_LENGTH_REG_IDX, source_length);
    enqueue(out_port, std::move(replica_packet));
  }
}

void
PsaSwitch::ingress_thread() {
PHV *phv;
Expand All @@ -301,10 +385,9 @@ PsaSwitch::ingress_thread() {
bytes are parsed by the parser ("lifted" into p4 headers). Here, we
track the buffer_state prior to parsing so that we can put it back
for packets that are cloned or resubmitted, same as in simple_switch.cpp
TODO */

*/
const Packet::buffer_state_t packet_in_state = packet->save_buffer_state();
auto ingress_packet_size = packet->get_register(PACKET_LENGTH_REG_IDX);

// The PSA specification says that for all packets, whether they
// are new ones from a port, or resubmitted, or recirculated, the
Expand Down Expand Up @@ -337,17 +420,59 @@ PsaSwitch::ingress_thread() {
ingress_mau->apply(packet.get());
packet->reset_exit();

// prioritize dropping if marked as such - do not move below other checks
const auto &f_drop = phv->get_field("psa_ingress_output_metadata.drop");
if (f_drop.get_int()) {
const auto &f_ig_cos = phv->get_field("psa_ingress_output_metadata.class_of_service");
const auto ig_cos = f_ig_cos.get_uint();

// ingress cloning - each cloned packet is a copy of the packet as it entered the ingress parser
// - dropped packets should still be cloned - do not move below drop
auto clone = phv->get_field("psa_ingress_output_metadata.clone").get_uint();
if (clone) {
MirroringSessionConfig config;
auto clone_session_id = phv->get_field("psa_ingress_output_metadata.clone_session_id").get_uint();
auto is_session_configured = mirroring_get_session(static_cast<mirror_id_t>(clone_session_id), &config);

if (is_session_configured) {
BMLOG_DEBUG_PKT(*packet, "Cloning packet at ingress to session id {}", clone_session_id);
const Packet::buffer_state_t packet_out_state = packet->save_buffer_state();
packet->restore_buffer_state(packet_in_state);

std::unique_ptr<Packet> packet_copy = packet->clone_no_phv_ptr();
packet_copy->set_register(PACKET_LENGTH_REG_IDX, ingress_packet_size);
auto phv_copy = packet_copy->get_phv();
phv_copy->reset_metadata();
phv_copy->get_field("psa_egress_parser_input_metadata.packet_path").set(PACKET_PATH_CLONE_I2E);

if (config.mgid_valid) {
BMLOG_DEBUG_PKT(*packet_copy, "Cloning packet to multicast group {}", config.mgid);
// TODO 0 as the last arg (for class_of_service) is currently a placeholder
// implement cos into cloning session configs
multicast(packet_copy.get(), config.mgid, PACKET_PATH_CLONE_I2E, 0);
}

if (config.egress_port_valid) {
BMLOG_DEBUG_PKT(*packet_copy, "Cloning packet to egress port {}", config.egress_port);
enqueue(config.egress_port, std::move(packet_copy));
}

packet->restore_buffer_state(packet_out_state);
} else {
BMLOG_DEBUG_PKT(*packet,
"Cloning packet at ingress to unconfigured session id {} causes no clone packets to be created",
clone_session_id);
}
}

// drop - packets marked via the ingress_drop action
auto drop = phv->get_field("psa_ingress_output_metadata.drop").get_uint();
if (drop) {
BMLOG_DEBUG_PKT(*packet, "Dropping packet at the end of ingress");
continue;
}

// resubmit - these packets get immediately resub'd to ingress, and skip
// deparsing, do not move below multicast or deparse
const auto &f_resubmit = phv->get_field("psa_ingress_output_metadata.resubmit");
if (f_resubmit.get_int()) {
auto resubmit = phv->get_field("psa_ingress_output_metadata.resubmit").get_uint();
if (resubmit) {
BMLOG_DEBUG_PKT(*packet, "Resubmitting packet");

packet->restore_buffer_state(packet_in_state);
Expand All @@ -363,49 +488,25 @@ PsaSwitch::ingress_thread() {
deparser->deparse(packet.get());

auto &f_packet_path = phv->get_field("psa_egress_parser_input_metadata.packet_path");
const auto &f_ig_cos = phv->get_field("psa_ingress_output_metadata.class_of_service");
const auto ig_cos = f_ig_cos.get_uint();

// handling multicast
unsigned int mgid = 0u;
const auto &f_mgid = phv->get_field("psa_ingress_output_metadata.multicast_group");
mgid = f_mgid.get_uint();

auto mgid = phv->get_field("psa_ingress_output_metadata.multicast_group").get_uint();
if (mgid != 0) {
BMLOG_DEBUG_PKT(*packet,
"Multicast requested for packet with multicast group {}",
mgid);
const auto pre_out = pre->replicate({mgid});
auto &f_instance = phv->get_field("psa_egress_input_metadata.instance");
auto &f_eg_cos = phv->get_field("psa_egress_input_metadata.class_of_service");
auto packet_size = packet->get_register(PACKET_LENGTH_REG_IDX);
for(const auto &out : pre_out){
auto egress_port = out.egress_port;
auto instance = out.rid;
BMLOG_DEBUG_PKT(*packet,
"Replicating packet on port {} with instance {}",
egress_port, instance);
f_instance.set(instance);
// TODO use appropriate enum member from JSON
f_packet_path.set(PACKET_PATH_NORMAL_MULTICAST);
f_eg_cos.set(ig_cos);
std::unique_ptr<Packet> packet_copy = packet->clone_with_phv_ptr();
packet_copy->set_register(PACKET_LENGTH_REG_IDX, packet_size);
enqueue(egress_port, std::move(packet_copy));
}
multicast(packet.get(), mgid, PACKET_PATH_NORMAL_MULTICAST, ig_cos);
continue;
}

const auto &f_egress_port = phv->get_field("psa_ingress_output_metadata.egress_port");
auto &f_instance = phv->get_field("psa_egress_input_metadata.instance");
auto &f_eg_cos = phv->get_field("psa_egress_input_metadata.class_of_service");
port_t egress_port = f_egress_port.get_uint();
BMLOG_DEBUG_PKT(*packet, "Egress port is {}", egress_port);
f_instance.set(0);
// TODO use appropriate enum member from JSON
f_packet_path.set(PACKET_PATH_NORMAL_UNICAST);
f_eg_cos.set(ig_cos);

f_packet_path.set(PACKET_PATH_NORMAL_UNICAST);
auto egress_port = phv->get_field("psa_ingress_output_metadata.egress_port").get<port_t>();
BMLOG_DEBUG_PKT(*packet, "Egress port is {}", egress_port);
enqueue(egress_port, std::move(packet));
}
}
Expand Down
20 changes: 18 additions & 2 deletions targets/psa_switch/psa_switch.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ class PsaSwitch : public Switch {
using TransmitFn = std::function<void(port_t, packet_id_t,
const char *, int)>;

// Configuration of a single mirroring (clone) session.
//
// A session may name a unicast egress port, a multicast group, both, or
// neither; each destination is only used when its `*_valid` flag is set.
// Every member has a default so that a default-constructed config is a
// well-defined "no destination" session rather than holding indeterminate
// values.
struct MirroringSessionConfig {
  // Egress port that receives the clone; meaningful only if egress_port_valid.
  port_t egress_port{};
  // True when `egress_port` should be used.
  bool egress_port_valid{false};
  // Multicast group the clone is replicated to; meaningful only if mgid_valid.
  unsigned int mgid{0};
  // True when `mgid` should be used.
  bool mgid_valid{false};
};

private:
using clock = std::chrono::high_resolution_clock;

Expand All @@ -80,15 +87,19 @@ class PsaSwitch : public Switch {

void reset_target_state_() override;

// Session-based mirroring API, backed by the MirroringSessions store.
// Adds or overwrites the session `mirror_id`; returns false if the id is
// rejected as out of range.
bool mirroring_add_session(mirror_id_t mirror_id,
const MirroringSessionConfig &config);
// Removes the session `mirror_id`; returns true only if one was removed.
bool mirroring_delete_session(mirror_id_t mirror_id);
// Copies the session `mirror_id` into `*config`; returns false if it does not
// exist.
bool mirroring_get_session(mirror_id_t mirror_id,
MirroringSessionConfig *config) const;

// Maps `mirror_id` to `egress_port` in mirroring_map, replacing any existing
// entry for that id. Always returns 0.
int mirroring_mapping_add(mirror_id_t mirror_id, port_t egress_port) {
  auto &mapped_port = mirroring_map[mirror_id];
  mapped_port = egress_port;
  return 0;
}

// Removes the mirroring_map entry for `mirror_id`.
// Returns the number of entries erased (0 when the id was not mapped).
int mirroring_mapping_delete(mirror_id_t mirror_id) {
  const auto erased_count = mirroring_map.erase(mirror_id);
  return static_cast<int>(erased_count);
}

// Public lookup of the port mapped to `mirror_id`; forwards to
// get_mirroring_mapping() and returns its result.
bool mirroring_mapping_get(mirror_id_t mirror_id, port_t *port) const {
  const bool is_mapped = get_mirroring_mapping(mirror_id, port);
  return is_mapped;
}
Expand Down Expand Up @@ -205,6 +216,8 @@ class PsaSwitch : public Switch {
static constexpr port_t PSA_PORT_RECIRCULATE = 0xfffffffa;
static packet_id_t packet_id;

class MirroringSessions;

enum PktInstanceType {
PACKET_PATH_NORMAL,
PACKET_PATH_NORMAL_UNICAST,
Expand All @@ -231,6 +244,8 @@ class PsaSwitch : public Switch {
void egress_thread(size_t worker_id);
void transmit_thread();

// Replicates `packet` for multicast group `mgid` and enqueues one clone per
// replication-engine result, tagging each clone's egress metadata with `path`
// and `class_of_service`. `packet` itself is not consumed.
void multicast(Packet *packet, unsigned int mgid, PktInstanceType path, unsigned int class_of_service);

bool get_mirroring_mapping(mirror_id_t mirror_id, port_t *port) const {
const auto it = mirroring_map.find(mirror_id);
if (it != mirroring_map.end()) {
Expand Down Expand Up @@ -261,6 +276,7 @@ class PsaSwitch : public Switch {
std::shared_ptr<McSimplePreLAG> pre;
clock::time_point start;
std::unordered_map<mirror_id_t, port_t> mirroring_map;
std::unique_ptr<MirroringSessions> mirroring_sessions;
bool with_queueing_metadata{false};
};

Expand Down
Loading

0 comments on commit 51dae31

Please sign in to comment.