From 155e1e307df2a90f84f1a1783812791661022fa8 Mon Sep 17 00:00:00 2001 From: James Yin Date: Thu, 31 Jan 2019 18:08:08 +0800 Subject: [PATCH] fixed MQDecoder::createMessageId and MQDecoder::decodeMessageId. - make MQDecoder::createMessageId same as MessageDecoder.createMessageId in Java; - fixed MQDecoder::decodeMessageId decode offset error --- src/common/UtilAll.cpp | 32 ++++++++++++-- src/message/MQDecoder.cpp | 93 ++++++++++++++++----------------------- src/message/MQDecoder.h | 6 +-- 3 files changed, 70 insertions(+), 61 deletions(-) diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp index 4932c1d80..9f9126ba1 100644 --- a/src/common/UtilAll.cpp +++ b/src/common/UtilAll.cpp @@ -50,8 +50,33 @@ bool UtilAll::isBlank(const string &str) { return false; } +const int hex2int[256] = { + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, -1, -1, -1, -1, -1, -1, + -1, 10, 11, 12, 13, 14, 15, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, 10, 11, 12, 13, 14, 15, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 +}; + uint64 UtilAll::hexstr2ull(const char *str) { - return boost::lexical_cast(str); + uint64 num = 0; + unsigned char *ch = (unsigned char *) str; + while (*ch != '\0') { + num = (num << 4) + hex2int[*ch]; + ch++; + } + return num; } int64 UtilAll::str2ll(const char *str) { @@ -73,10 +98,11 @@ string UtilAll::bytes2string(const char *bytes, int len) { return buffer; #else - char hex_str[] = "0123456789ABCDEF"; + static const char hex_str[] = "0123456789ABCDEF"; + char result[len * 2 + 1]; - result[len * 2] = 0; + result[len * 2] = 0; for (int i = 0; i < len; i++) { result[i * 2 + 0] = hex_str[(bytes[i] >> 4) & 0x0F]; result[i * 2 + 1] = hex_str[(bytes[i]) & 0x0F]; diff --git a/src/message/MQDecoder.cpp b/src/message/MQDecoder.cpp index d4f0dd4dd..b64ff1704 100755 --- a/src/message/MQDecoder.cpp +++ b/src/message/MQDecoder.cpp @@ -23,7 +23,9 @@ #include "MemoryOutputStream.h" #include "MessageSysFlag.h" #include "UtilAll.h" + namespace rocketmq { + //sin_addr.s_addr); + outputmen.writeRepeatedByte(0, 2); + outputmen.write(&(sa->sin_port), 2); outputmen.writeInt64BigEndian(offset); - const char* bytes = static_cast(outputmen.getData()); + const char *bytes = static_cast(outputmen.getData()); int len = outputmen.getDataSize(); return UtilAll::bytes2string(bytes, len); } -MQMessageId MQDecoder::decodeMessageId(const string& msgId) { - - string ipstr = msgId.substr(0, 8); - string portstr = msgId.substr(8, 8); - string offsetstr = msgId.substr(16); +MQMessageId MQDecoder::decodeMessageId(const string &msgId) { - char* end; - int ipint = strtoul(ipstr.c_str(), &end, 16); - int portint = strtoul(portstr.c_str(), &end, 16); + string ipStr = msgId.substr(0, 8); + string portStr = msgId.substr(8, 8); + string offsetStr = msgId.substr(16); - int64 offset = UtilAll::hexstr2ull(offsetstr.c_str()); + char *end; + int ipInt = strtoul(ipStr.c_str(), &end, 16); + int portInt = strtoul(portStr.c_str(), &end, 16); - offset = n2hll(offset); - - portint = ntohl(portint); - short port = portint; + int64 offset = UtilAll::hexstr2ull(offsetStr.c_str()); struct sockaddr_in sa; sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = ipint; - - sockaddr addr; - memcpy(&addr, &sa, sizeof(sockaddr)); - - MQMessageId id(addr, offset); + sa.sin_port = htons(portInt); + sa.sin_addr.s_addr = htonl(ipInt); + MQMessageId id(*((sockaddr*) &sa), offset); return id; } -MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer) { +MQMessageExt *MQDecoder::decode(MemoryInputStream &byteBuffer) { return decode(byteBuffer, true); } -MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { - MQMessageExt* msgExt = new MQMessageExt(); +MQMessageExt *MQDecoder::decode(MemoryInputStream &byteBuffer, bool readBody) { + MQMessageExt *msgExt = new MQMessageExt(); // 1 TOTALSIZE int storeSize = byteBuffer.readIntBigEndian(); @@ -153,13 +147,12 @@ MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { MemoryBlock block; byteBuffer.readIntoMemoryBlock(block, bodyLen); - const char* const pBody = static_cast(block.getData()); + const char *const pBody = static_cast(block.getData()); int len = block.getSize(); string msgbody(pBody, len); // decompress body - if ((sysFlag & MessageSysFlag::CompressedFlag) == - MessageSysFlag::CompressedFlag) { + if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) { string outbody; if (UtilAll::inflate(msgbody, outbody)) { msgExt->setBody(outbody); @@ -173,10 +166,10 @@ MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { } // 16 TOPIC - int topicLen = (int)byteBuffer.readByte(); + int topicLen = (int) byteBuffer.readByte(); MemoryBlock block; byteBuffer.readIntoMemoryBlock(block, topicLen); - const char* const pTopic = static_cast(block.getData()); + const char *const pTopic = static_cast(block.getData()); topicLen = block.getSize(); msgExt->setTopic(pTopic, topicLen); @@ -185,7 +178,7 @@ MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { if (propertiesLen > 0) { MemoryBlock block; byteBuffer.readIntoMemoryBlock(block, propertiesLen); - const char* const pProperty = static_cast(block.getData()); + const char *const pProperty = static_cast(block.getData()); int len = block.getSize(); string propertiesString(pProperty, len); @@ -196,29 +189,25 @@ MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) { } // 18 msg ID - string offsetMsgId = createMessageId(msgExt->getStoreHost(), - (int64)msgExt->getCommitLogOffset()); + string offsetMsgId = createMessageId(msgExt->getStoreHost(), (int64) msgExt->getCommitLogOffset()); msgExt->setOffsetMsgId(offsetMsgId); string msgId = msgExt->getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); - if (msgId.empty()) - { - msgId = offsetMsgId; + if (msgId.empty()) { + msgId = offsetMsgId; } msgExt->setMsgId(msgId); - // LOG_INFO("get msgExt from remote server, its contents - // are:%s",msgExt->toString().c_str()); + // LOG_INFO("get msgExt from remote server, its contents are:%s", msgExt->toString().c_str()); return msgExt; } -void MQDecoder::decodes(const MemoryBlock* mem, vector& mqvec) { +void MQDecoder::decodes(const MemoryBlock *mem, vector &mqvec) { mqvec.clear(); decodes(mem, mqvec, true); } -void MQDecoder::decodes(const MemoryBlock* mem, vector& mqvec, - bool readBody) { +void MQDecoder::decodes(const MemoryBlock *mem, vector &mqvec, bool readBody) { MemoryInputStream rawInput(*mem, true); while (rawInput.getNumBytesRemaining() > 0) { @@ -227,25 +216,21 @@ void MQDecoder::decodes(const MemoryBlock* mem, vector& mqvec, } } -string MQDecoder::messageProperties2String( - const map& properties) { +string MQDecoder::messageProperties2String(const map &properties) { string os; - map::const_iterator it = properties.begin(); - for (; it != properties.end(); ++it) { - // os << it->first << NAME_VALUE_SEPARATOR << it->second << - // PROPERTY_SEPARATOR; - os.append(it->first); + for (const auto &it : properties) { + // os << it->first << NAME_VALUE_SEPARATOR << it->second << PROPERTY_SEPARATOR; + os.append(it.first); os += NAME_VALUE_SEPARATOR; - os.append(it->second); + os.append(it.second); os += PROPERTY_SEPARATOR; } return os; } -void MQDecoder::string2messageProperties(const string& propertiesString, - map& properties) { +void MQDecoder::string2messageProperties(const string &propertiesString, map &properties) { vector out; UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR); diff --git a/src/message/MQDecoder.h b/src/message/MQDecoder.h index 393e4c72b..d9c94ad48 100755 --- a/src/message/MQDecoder.h +++ b/src/message/MQDecoder.h @@ -32,12 +32,10 @@ class MQDecoder { static void decodes(const MemoryBlock* mem, vector& mqvec); - static void decodes(const MemoryBlock* mem, vector& mqvec, - bool readBody); + static void decodes(const MemoryBlock* mem, vector& mqvec, bool readBody); static string messageProperties2String(const map& properties); - static void string2messageProperties(const string& propertiesString, - map& properties); + static void string2messageProperties(const string& propertiesString, map& properties); private: static MQMessageExt* decode(MemoryInputStream& byteBuffer);