From 51dae313390e3613f37fb8233ad493a306c0613b Mon Sep 17 00:00:00 2001 From: "Peter J. Li" Date: Mon, 31 Aug 2020 12:13:14 -0700 Subject: [PATCH] [psa_switch] first pass at i2e cloning (#935) * first pass at i2e cloning with mirroring sessions * add support for mirroring sessions to psa switch thrift server so they can be called via stf; deprecate unused mirroring commands in psa switch thrift server; * gitignore vim swapfiles * addressing code review comments with better class_of_service metadata handling in psa switch ingress * add name argument to parse_int calls that didnt previously have it * address code review comments: use auto for variables initialized with values; update an unclear comment about ingress cloning; add debug message for when ingress cloning happens on a clone session id thats unconfigured; --- .gitignore | 5 +- targets/psa_switch/psa_switch.cpp | 181 ++++++++++++++---- targets/psa_switch/psa_switch.h | 20 +- targets/psa_switch/pswitch_CLI.py | 57 +++++- targets/psa_switch/thrift/psa_switch.thrift | 22 ++- .../thrift/src/PsaSwitch_server.cpp | 45 +++-- targets/simple_switch/sswitch_CLI.py | 6 - tools/runtime_CLI.py | 8 +- 8 files changed, 270 insertions(+), 74 deletions(-) diff --git a/.gitignore b/.gitignore index 04181dadb..7c762859b 100644 --- a/.gitignore +++ b/.gitignore @@ -27,12 +27,15 @@ *.out *.app -# Python byet code +# Python byte code *.pyc build/* lib* +# Vim +*.swp + # Emacs *~ diff --git a/targets/psa_switch/psa_switch.cpp b/targets/psa_switch/psa_switch.cpp index 9ccf70c65..1d8fade9f 100644 --- a/targets/psa_switch/psa_switch.cpp +++ b/targets/psa_switch/psa_switch.cpp @@ -70,8 +70,50 @@ namespace bm { namespace psa { +static constexpr uint16_t MAX_MIRROR_SESSION_ID = (1u << 15) - 1; packet_id_t PsaSwitch::packet_id = 0; +class PsaSwitch::MirroringSessions { + public: + bool add_session(mirror_id_t mirror_id, + const MirroringSessionConfig &config) { + Lock lock(mutex); + if (0 <= mirror_id && mirror_id <= MAX_MIRROR_SESSION_ID) { + sessions_map[mirror_id] = config; + return true; + } else { + bm::Logger::get()->error("mirror_id out of range. No session added."); + return false; + } + } + + bool delete_session(mirror_id_t mirror_id) { + Lock lock(mutex); + if (0 <= mirror_id && mirror_id <= MAX_MIRROR_SESSION_ID) { + return sessions_map.erase(mirror_id) == 1; + } else { + bm::Logger::get()->error("mirror_id out of range. No session deleted."); + return false; + } + } + + bool get_session(mirror_id_t mirror_id, + MirroringSessionConfig *config) const { + Lock lock(mutex); + auto it = sessions_map.find(mirror_id); + if (it == sessions_map.end()) return false; + *config = it->second; + return true; + } + + private: + using Mutex = std::mutex; + using Lock = std::lock_guard; + + mutable std::mutex mutex; + std::unordered_map sessions_map; +}; + PsaSwitch::PsaSwitch(bool enable_swap) : Switch(enable_swap), input_buffer(1024), @@ -92,7 +134,8 @@ PsaSwitch::PsaSwitch(bool enable_swap) this->transmit_fn(port_num, buffer, len); }), pre(new McSimplePreLAG()), - start(clock::now()) { + start(clock::now()), + mirroring_sessions(new MirroringSessions()) { add_component(pre); add_required_field("psa_ingress_parser_input_metadata", "ingress_port"); @@ -151,7 +194,7 @@ PsaSwitch::receive_(port_t port_num, const char *buffer, int len) { bm::PacketBuffer(len + 512, buffer, len)); BMELOG(packet_in, *packet); - PHV *phv = packet->get_phv(); + auto *phv = packet->get_phv(); // many current p4 programs assume this // from psa spec - PSA does not mandate initialization of user-defined @@ -200,6 +243,23 @@ PsaSwitch::reset_target_state_() { get_component()->reset_state(); } +bool +PsaSwitch::mirroring_add_session(mirror_id_t mirror_id, + const MirroringSessionConfig &config) { + return mirroring_sessions->add_session(mirror_id, config); +} + +bool +PsaSwitch::mirroring_delete_session(mirror_id_t mirror_id) { + return mirroring_sessions->delete_session(mirror_id); +} + +bool +PsaSwitch::mirroring_get_session(mirror_id_t mirror_id, + MirroringSessionConfig *config) const { + return mirroring_sessions->get_session(mirror_id, config); +} + int PsaSwitch::set_egress_queue_depth(size_t port, const size_t depth_pkts) { egress_buffers.set_capacity(port, depth_pkts); @@ -266,7 +326,7 @@ PsaSwitch::enqueue(port_t egress_port, std::unique_ptr &&packet) { packet->set_egress_port(egress_port); #ifdef SSWITCH_PRIORITY_QUEUEING_ON - size_t priority = phv->has_field(SSWITCH_PRIORITY_QUEUEING_SRC) ? + auto priority = phv->has_field(SSWITCH_PRIORITY_QUEUEING_SRC) ? phv->get_field(SSWITCH_PRIORITY_QUEUEING_SRC).get() : 0u; if (priority >= SSWITCH_PRIORITY_QUEUEING_NB_QUEUES) { bm::Logger::get()->error("Priority out of range, dropping packet"); @@ -280,6 +340,30 @@ PsaSwitch::enqueue(port_t egress_port, std::unique_ptr &&packet) { #endif } +void +PsaSwitch::multicast(Packet *packet, unsigned int mgid, PktInstanceType path, unsigned int class_of_service) { + auto phv = packet->get_phv(); + const auto pre_out = pre->replicate({mgid}); + auto &f_eg_cos = phv->get_field("psa_egress_input_metadata.class_of_service"); + auto &f_instance = phv->get_field("psa_egress_input_metadata.instance"); + auto &f_packet_path = phv->get_field("psa_egress_parser_input_metadata.packet_path"); + auto packet_size = packet->get_register(PACKET_LENGTH_REG_IDX); + for (const auto &out : pre_out) { + auto egress_port = out.egress_port; + auto instance = out.rid; + BMLOG_DEBUG_PKT(*packet, + "Replicating packet on port {} with instance {}", + egress_port, instance); + f_eg_cos.set(class_of_service); + f_instance.set(instance); + // TODO use appropriate enum member from JSON + f_packet_path.set(path); + std::unique_ptr packet_copy = packet->clone_with_phv_ptr(); + packet_copy->set_register(PACKET_LENGTH_REG_IDX, packet_size); + enqueue(egress_port, std::move(packet_copy)); + } +} + void PsaSwitch::ingress_thread() { PHV *phv; @@ -301,10 +385,9 @@ PsaSwitch::ingress_thread() { bytes are parsed by the parser ("lifted" into p4 headers). Here, we track the buffer_state prior to parsing so that we can put it back for packets that are cloned or resubmitted, same as in simple_switch.cpp - - TODO */ - + */ const Packet::buffer_state_t packet_in_state = packet->save_buffer_state(); + auto ingress_packet_size = packet->get_register(PACKET_LENGTH_REG_IDX); // The PSA specification says that for all packets, whether they // are new ones from a port, or resubmitted, or recirculated, the @@ -337,17 +420,59 @@ PsaSwitch::ingress_thread() { ingress_mau->apply(packet.get()); packet->reset_exit(); - // prioritize dropping if marked as such - do not move below other checks - const auto &f_drop = phv->get_field("psa_ingress_output_metadata.drop"); - if (f_drop.get_int()) { + const auto &f_ig_cos = phv->get_field("psa_ingress_output_metadata.class_of_service"); + const auto ig_cos = f_ig_cos.get_uint(); + + // ingress cloning - each cloned packet is a copy of the packet as it entered the ingress parser + // - dropped packets should still be cloned - do not move below drop + auto clone = phv->get_field("psa_ingress_output_metadata.clone").get_uint(); + if (clone) { + MirroringSessionConfig config; + auto clone_session_id = phv->get_field("psa_ingress_output_metadata.clone_session_id").get_uint(); + auto is_session_configured = mirroring_get_session(static_cast(clone_session_id), &config); + + if (is_session_configured) { + BMLOG_DEBUG_PKT(*packet, "Cloning packet at ingress to session id {}", clone_session_id); + const Packet::buffer_state_t packet_out_state = packet->save_buffer_state(); + packet->restore_buffer_state(packet_in_state); + + std::unique_ptr packet_copy = packet->clone_no_phv_ptr(); + packet_copy->set_register(PACKET_LENGTH_REG_IDX, ingress_packet_size); + auto phv_copy = packet_copy->get_phv(); + phv_copy->reset_metadata(); + phv_copy->get_field("psa_egress_parser_input_metadata.packet_path").set(PACKET_PATH_CLONE_I2E); + + if (config.mgid_valid) { + BMLOG_DEBUG_PKT(*packet_copy, "Cloning packet to multicast group {}", config.mgid); + // TODO 0 as the last arg (for class_of_service) is currently a placeholder + // implement cos into cloning session configs + multicast(packet_copy.get(), config.mgid, PACKET_PATH_CLONE_I2E, 0); + } + + if (config.egress_port_valid) { + BMLOG_DEBUG_PKT(*packet_copy, "Cloning packet to egress port {}", config.egress_port); + enqueue(config.egress_port, std::move(packet_copy)); + } + + packet->restore_buffer_state(packet_out_state); + } else { + BMLOG_DEBUG_PKT(*packet, + "Cloning packet at ingress to unconfigured session id {} causes no clone packets to be created", + clone_session_id); + } + } + + // drop - packets marked via the ingress_drop action + auto drop = phv->get_field("psa_ingress_output_metadata.drop").get_uint(); + if (drop) { BMLOG_DEBUG_PKT(*packet, "Dropping packet at the end of ingress"); continue; } // resubmit - these packets get immediately resub'd to ingress, and skip // deparsing, do not move below multicast or deparse - const auto &f_resubmit = phv->get_field("psa_ingress_output_metadata.resubmit"); - if (f_resubmit.get_int()) { + auto resubmit = phv->get_field("psa_ingress_output_metadata.resubmit").get_uint(); + if (resubmit) { BMLOG_DEBUG_PKT(*packet, "Resubmitting packet"); packet->restore_buffer_state(packet_in_state); @@ -363,49 +488,25 @@ PsaSwitch::ingress_thread() { deparser->deparse(packet.get()); auto &f_packet_path = phv->get_field("psa_egress_parser_input_metadata.packet_path"); - const auto &f_ig_cos = phv->get_field("psa_ingress_output_metadata.class_of_service"); - const auto ig_cos = f_ig_cos.get_uint(); - - // handling multicast - unsigned int mgid = 0u; - const auto &f_mgid = phv->get_field("psa_ingress_output_metadata.multicast_group"); - mgid = f_mgid.get_uint(); + auto mgid = phv->get_field("psa_ingress_output_metadata.multicast_group").get_uint(); if (mgid != 0) { BMLOG_DEBUG_PKT(*packet, "Multicast requested for packet with multicast group {}", mgid); - const auto pre_out = pre->replicate({mgid}); - auto &f_instance = phv->get_field("psa_egress_input_metadata.instance"); - auto &f_eg_cos = phv->get_field("psa_egress_input_metadata.class_of_service"); - auto packet_size = packet->get_register(PACKET_LENGTH_REG_IDX); - for(const auto &out : pre_out){ - auto egress_port = out.egress_port; - auto instance = out.rid; - BMLOG_DEBUG_PKT(*packet, - "Replicating packet on port {} with instance {}", - egress_port, instance); - f_instance.set(instance); - // TODO use appropriate enum member from JSON - f_packet_path.set(PACKET_PATH_NORMAL_MULTICAST); - f_eg_cos.set(ig_cos); - std::unique_ptr packet_copy = packet->clone_with_phv_ptr(); - packet_copy->set_register(PACKET_LENGTH_REG_IDX, packet_size); - enqueue(egress_port, std::move(packet_copy)); - } + multicast(packet.get(), mgid, PACKET_PATH_NORMAL_MULTICAST, ig_cos); continue; } - const auto &f_egress_port = phv->get_field("psa_ingress_output_metadata.egress_port"); auto &f_instance = phv->get_field("psa_egress_input_metadata.instance"); auto &f_eg_cos = phv->get_field("psa_egress_input_metadata.class_of_service"); - port_t egress_port = f_egress_port.get_uint(); - BMLOG_DEBUG_PKT(*packet, "Egress port is {}", egress_port); f_instance.set(0); // TODO use appropriate enum member from JSON - f_packet_path.set(PACKET_PATH_NORMAL_UNICAST); f_eg_cos.set(ig_cos); + f_packet_path.set(PACKET_PATH_NORMAL_UNICAST); + auto egress_port = phv->get_field("psa_ingress_output_metadata.egress_port").get(); + BMLOG_DEBUG_PKT(*packet, "Egress port is {}", egress_port); enqueue(egress_port, std::move(packet)); } } diff --git a/targets/psa_switch/psa_switch.h b/targets/psa_switch/psa_switch.h index 21f11516c..c7e50bf6f 100644 --- a/targets/psa_switch/psa_switch.h +++ b/targets/psa_switch/psa_switch.h @@ -65,6 +65,13 @@ class PsaSwitch : public Switch { using TransmitFn = std::function; + struct MirroringSessionConfig { + port_t egress_port; + bool egress_port_valid; + unsigned int mgid; + bool mgid_valid; + }; + private: using clock = std::chrono::high_resolution_clock; @@ -80,15 +87,19 @@ class PsaSwitch : public Switch { void reset_target_state_() override; + bool mirroring_add_session(mirror_id_t mirror_id, + const MirroringSessionConfig &config); + bool mirroring_delete_session(mirror_id_t mirror_id); + bool mirroring_get_session(mirror_id_t mirror_id, + MirroringSessionConfig *config) const; + int mirroring_mapping_add(mirror_id_t mirror_id, port_t egress_port) { mirroring_map[mirror_id] = egress_port; return 0; } - int mirroring_mapping_delete(mirror_id_t mirror_id) { return mirroring_map.erase(mirror_id); } - bool mirroring_mapping_get(mirror_id_t mirror_id, port_t *port) const { return get_mirroring_mapping(mirror_id, port); } @@ -205,6 +216,8 @@ class PsaSwitch : public Switch { static constexpr port_t PSA_PORT_RECIRCULATE = 0xfffffffa; static packet_id_t packet_id; + class MirroringSessions; + enum PktInstanceType { PACKET_PATH_NORMAL, PACKET_PATH_NORMAL_UNICAST, @@ -231,6 +244,8 @@ class PsaSwitch : public Switch { void egress_thread(size_t worker_id); void transmit_thread(); + void multicast(Packet *packet, unsigned int mgid, PktInstanceType path, unsigned int class_of_service); + bool get_mirroring_mapping(mirror_id_t mirror_id, port_t *port) const { const auto it = mirroring_map.find(mirror_id); if (it != mirroring_map.end()) { @@ -261,6 +276,7 @@ class PsaSwitch : public Switch { std::shared_ptr pre; clock::time_point start; std::unordered_map mirroring_map; + std::unique_ptr mirroring_sessions; bool with_queueing_metadata{false}; }; diff --git a/targets/psa_switch/pswitch_CLI.py b/targets/psa_switch/pswitch_CLI.py index 417a564f6..d4af47d71 100644 --- a/targets/psa_switch/pswitch_CLI.py +++ b/targets/psa_switch/pswitch_CLI.py @@ -22,10 +22,23 @@ import runtime_CLI +from functools import wraps import sys import os from pswitch_runtime import PsaSwitch +from pswitch_runtime.ttypes import MirroringSessionConfig + +def handle_bad_input(f): + @wraps(f) + @runtime_CLI.handle_bad_input + def handle(*args, **kwargs): + try: + return f(*args, **kwargs) + except InvalidMirroringOperation as e: + error = MirroringOperationErrorCode._VALUES_TO_NAMES[e.code] + print "Invalid mirroring operation (%s)" % error + return handle class PsaSwitchAPI(runtime_CLI.RuntimeAPI): @staticmethod @@ -37,41 +50,69 @@ def __init__(self, pre_type, standard_client, mc_client, pswitch_client): standard_client, mc_client) self.pswitch_client = pswitch_client + @handle_bad_input def do_set_queue_depth(self, line): "Set depth of one / all egress queue(s): set_queue_depth []" args = line.split() - depth = int(args[0]) + depth = self.parse_int(args[0], "nb_pkts") if len(args) > 1: - port = int(args[1]) + port = self.parse_int(args[1], "egress_port") self.pswitch_client.set_egress_queue_depth(port, depth) else: self.pswitch_client.set_all_egress_queue_depths(depth) + @handle_bad_input def do_set_queue_rate(self, line): "Set rate of one / all egress queue(s): set_queue_rate []" args = line.split() - rate = int(args[0]) + rate = self.parse_int(args[0], "rate_pps") if len(args) > 1: - port = int(args[1]) + port = self.parse_int(args[1], "egress_port") self.pswitch_client.set_egress_queue_rate(port, rate) else: self.pswitch_client.set_all_egress_queue_rates(rate) + @handle_bad_input def do_mirroring_add(self, line): "Add mirroring mapping: mirroring_add " args = line.split() - mirror_id, egress_port = int(args[0]), int(args[1]) - self.pswitch_client.mirroring_mapping_add(mirror_id, egress_port) - + mirror_id = self.parse_int(args[0], "mirror_id") + egress_port = self.parse_int(args[1], "egress_port") + config = MirroringSessionConfig(port=egress_port) + self.pswitch_client.mirroring_session_add(mirror_id, config) + + @handle_bad_input + def do_mirroring_add_mc(self, line): + "Add mirroring session to multicast group: mirroring_add_mc " + args = line.split() + self.exactly_n_args(args, 2) + mirror_id = self.parse_int(args[0], "mirror_id") + mgrp = self.parse_int(args[1], "mgrp") + config = MirroringSessionConfig(mgid=mgrp) + self.pswitch_client.mirroring_session_add(mirror_id, config) + print "Associating multicast group", mgrp, "to mirroring session", mirror_id + + @handle_bad_input def do_mirroring_delete(self, line): "Delete mirroring mapping: mirroring_delete " - mirror_id = int(line) + mirror_id = self.parse_int(line, "mirror_id") self.pswitch_client.mirroring_mapping_delete(mirror_id) + @handle_bad_input + def do_mirroring_get(self, line): + "Display mirroring session: mirroring_get " + args = line.split() + self.exactly_n_args(args, 1) + mirror_id = self.parse_int(args[0], "mirror_id") + config = self.pswitch_client.mirroring_session_get(mirror_id) + print config + + @handle_bad_input def do_get_time_elapsed(self, line): "Get time elapsed (in microseconds) since the switch started: get_time_elapsed" print self.pswitch_client.get_time_elapsed_us() + @handle_bad_input def do_get_time_since_epoch(self, line): "Get time elapsed (in microseconds) since the switch clock's epoch: get_time_since_epoch" print self.pswitch_client.get_time_since_epoch_us() diff --git a/targets/psa_switch/thrift/psa_switch.thrift b/targets/psa_switch/thrift/psa_switch.thrift index f38612aca..c32043b10 100644 --- a/targets/psa_switch/thrift/psa_switch.thrift +++ b/targets/psa_switch/thrift/psa_switch.thrift @@ -21,11 +21,27 @@ namespace cpp pswitch_runtime namespace py pswitch_runtime +struct MirroringSessionConfig { + 1:optional i32 port; + 2:optional i32 mgid; +} + +enum MirroringOperationErrorCode { + SESSION_NOT_FOUND = 1, +} + +exception InvalidMirroringOperation { + 1:MirroringOperationErrorCode code; +} + service PsaSwitch { - i32 mirroring_mapping_add(1:i32 mirror_id, 2:i32 egress_port); - i32 mirroring_mapping_delete(1:i32 mirror_id); - i32 mirroring_mapping_get_egress_port(1:i32 mirror_id); + void mirroring_session_add(1:i32 mirror_id, 2:MirroringSessionConfig config) + throws (1:InvalidMirroringOperation ouch); + void mirroring_session_delete(1:i32 mirror_id) + throws (1:InvalidMirroringOperation ouch); + MirroringSessionConfig mirroring_session_get(1:i32 mirror_id) + throws (1:InvalidMirroringOperation ouch); i32 set_egress_queue_depth(1:i32 port_num, 2:i32 depth_pkts); i32 set_all_egress_queue_depths(1:i32 depth_pkts); diff --git a/targets/psa_switch/thrift/src/PsaSwitch_server.cpp b/targets/psa_switch/thrift/src/PsaSwitch_server.cpp index 6db6f754b..efe0d4872 100644 --- a/targets/psa_switch/thrift/src/PsaSwitch_server.cpp +++ b/targets/psa_switch/thrift/src/PsaSwitch_server.cpp @@ -43,24 +43,43 @@ class PsaSwitchHandler : virtual public PsaSwitchIf { explicit PsaSwitchHandler(PsaSwitch *sw) : switch_(sw) { } - int32_t mirroring_mapping_add(const int32_t mirror_id, - const int32_t egress_port) { - bm::Logger::get()->trace("mirroring_mapping_add"); - return switch_->mirroring_mapping_add(mirror_id, egress_port); + void mirroring_session_add(const int32_t mirror_id, + const MirroringSessionConfig &config) { + bm::Logger::get()->trace("mirroring_session_add"); + PsaSwitch::MirroringSessionConfig config_ = {}; // value-initialization + if (config.__isset.port) { + config_.egress_port = config.port; + config_.egress_port_valid = true; + } + if (config.__isset.mgid) { + config_.mgid = config.mgid; + config_.mgid_valid = true; + } + switch_->mirroring_add_session(mirror_id, config_); } - int32_t mirroring_mapping_delete(const int32_t mirror_id) { - bm::Logger::get()->trace("mirroring_mapping_delete"); - return switch_->mirroring_mapping_delete(mirror_id); + void mirroring_session_delete(const int32_t mirror_id) { + bm::Logger::get()->trace("mirroring_session_delete"); + auto session_found = switch_->mirroring_delete_session(mirror_id); + if (!session_found) { + InvalidMirroringOperation e; + e.code = MirroringOperationErrorCode::SESSION_NOT_FOUND; + throw e; + } } - int32_t mirroring_mapping_get_egress_port(const int32_t mirror_id) { - bm::Logger::get()->trace("mirroring_mapping_get_egress_port"); - bm::port_t port; - if (switch_->mirroring_mapping_get(mirror_id, &port)) { - return port; + void mirroring_session_get(MirroringSessionConfig& _return, + const int32_t mirror_id) { + bm::Logger::get()->trace("mirroring_session_get"); + PsaSwitch::MirroringSessionConfig config; + if (switch_->mirroring_get_session(mirror_id, &config)) { + if (config.egress_port_valid) _return.__set_port(config.egress_port); + if (config.mgid_valid) _return.__set_mgid(config.mgid); + } else { + InvalidMirroringOperation e; + e.code = MirroringOperationErrorCode::SESSION_NOT_FOUND; + throw e; } - return -1; } int32_t set_egress_queue_depth(const int32_t port_num, diff --git a/targets/simple_switch/sswitch_CLI.py b/targets/simple_switch/sswitch_CLI.py index e243209ea..0a754b975 100644 --- a/targets/simple_switch/sswitch_CLI.py +++ b/targets/simple_switch/sswitch_CLI.py @@ -51,12 +51,6 @@ def __init__(self, pre_type, standard_client, mc_client, sswitch_client): standard_client, mc_client) self.sswitch_client = sswitch_client - def parse_int(self, arg, name): - try: - return int(arg) - except: - raise UIn_Error("Bad format for {}, expected integer".format(name)) - @handle_bad_input def do_set_queue_depth(self, line): "Set depth of one / all egress queue(s): set_queue_depth []" diff --git a/tools/runtime_CLI.py b/tools/runtime_CLI.py index 794a2d2ed..ba74df66a 100755 --- a/tools/runtime_CLI.py +++ b/tools/runtime_CLI.py @@ -868,6 +868,12 @@ def exactly_n_args(self, args, n): "Wrong number of args, expected %d but got %d" % (n, len(args)) ) + def parse_int(self, arg, name): + try: + return int(arg) + except: + raise UIn_Error("Bad format for {}, expected integer".format(name)) + def _complete_res(self, array, text): res = sorted(array.keys()) if not text: @@ -1987,7 +1993,7 @@ def do_counter_write(self, line): else: self.client.bm_counter_write(0, counter_name, index, BmCounterValue(packets=pkts, bytes = byts)) print "%s[%d] has been updated" % (counter_name, index) - + def complete_counter_write(self, text, line, start_index, end_index): return self._complete_counters(text)