From 52a1aa9367ccce6ebcc336095c2a575296d31eab Mon Sep 17 00:00:00 2001 From: stubbfel Date: Tue, 15 Sep 2015 22:23:20 +0200 Subject: [PATCH] add locked thread queues --- src/map/natmap.cpp | 57 +++++++++++++++++++++++++++++++++++++++------- src/map/natmap.h | 12 ++++++++++ tests/nattest.cpp | 42 +++++++++++++++++++++++++++------- tests/nattest.h | 4 ++++ 4 files changed, 99 insertions(+), 16 deletions(-) diff --git a/src/map/natmap.cpp b/src/map/natmap.cpp index abd9a0b..d9a8a84 100644 --- a/src/map/natmap.cpp +++ b/src/map/natmap.cpp @@ -9,14 +9,19 @@ namespace otonat { ranges.clear(); transMap.clear(); reqIpMap.clear(); - while (!incommingPduQueue.empty()){ + this->incommingQueueMutex.lock(); + while (!incommingPduQueue.empty()) { incommingPduQueue.pop(); } - while (!outgoingPduQueue.empty()){ + this->incommingQueueMutex.unlock(); + + this->outgoingQueueMutex.lock(); + while (!outgoingPduQueue.empty()) { outgoingPduQueue.pop(); } + this->outgoingQueueMutex.unlock(); } NatMap::NatMap(const NatMap& other) : ranges(other.ranges), transMap(other.transMap), reqIpMap(other.reqIpMap), incommingPduQueue(other.incommingPduQueue), outgoingPduQueue(other.outgoingPduQueue), zeroIp(other.zeroIp) { @@ -27,8 +32,12 @@ namespace otonat { ranges = rhs.ranges; transMap = rhs.transMap; + this->incommingQueueMutex.lock(); incommingPduQueue = rhs.incommingPduQueue; + this->incommingQueueMutex.unlock(); + this->outgoingQueueMutex.lock(); outgoingPduQueue = rhs.outgoingPduQueue; + this->outgoingQueueMutex.unlock(); return *this; } @@ -42,7 +51,7 @@ namespace otonat { Tins::ARP * arp = pduCopy->find_pdu(); if (arp != nullptr) { if (handleArp(arp)) { - outgoingPduQueue.push(pduCopy); + this->pushPduToOutgoingPduQueue(pduCopy); } return; } @@ -50,7 +59,7 @@ namespace otonat { Tins::IP * ip = pduCopy->find_pdu(); if (ip != nullptr) { if (handleIp(ip, pduCopy)) { - outgoingPduQueue.push(pduCopy); + this->pushPduToOutgoingPduQueue(pduCopy); } } } @@ -230,13 +239,13 @@ namespace otonat { bool NatMap::handleArpReply(Tins::ARP* arp) { const Tins::IPv4Address targetIp = arp->target_ip_addr(); const Tins::IPv4Address transTargetIp = TranslateArpIp(targetIp); - if(transTargetIp == this->zeroIp){ + if (transTargetIp == this->zeroIp) { return false; } const Tins::IPv4Address senderIp = arp->sender_ip_addr(); const Tins::IPv4Address transSenderIp = TranslateArpIp(senderIp); - if(transSenderIp == this->zeroIp){ + if (transSenderIp == this->zeroIp) { return false; } @@ -260,7 +269,7 @@ namespace otonat { this->reqIpMap.erase(transReqArpIpIter); return transArpIp; } - + return transArpIpIter->second; } @@ -318,7 +327,39 @@ namespace otonat { } const Tins::EthernetII transArp = Tins::ARP::make_arp_request(transTargetIp, transSenderIp, arp->sender_hw_addr()); - this->outgoingPduQueue.push(transArp.clone()); + this->pushPduToOutgoingPduQueue(transArp.clone()); } } + + void NatMap::pushPduToIncommingPduQueue(const Tins::PDU* pdu) { + pushPduToPduQueue(pdu, this->incommingPduQueue, this ->incommingQueueMutex); + } + + const Tins::PDU * NatMap::popPduIncommingPduQueue() { + return popPduPduQueue(this->incommingPduQueue, this->incommingQueueMutex); + } + + void NatMap::pushPduToOutgoingPduQueue(const Tins::PDU* pdu) { + pushPduToPduQueue(pdu, this->outgoingPduQueue, this->outgoingQueueMutex); + } + + const Tins::PDU * NatMap::popPduOutgoingPduQueue() { + return popPduPduQueue(this->outgoingPduQueue, this->outgoingQueueMutex); + } + + const Tins::PDU * NatMap::popPduPduQueue(PduQueue & queue, std::mutex & mtx) { + mtx.lock(); + const Tins::PDU * result = queue.front(); + const Tins::PDU * outPut = result->clone(); + queue.pop(); + mtx.unlock(); + delete result; + return outPut; + } + + void NatMap::pushPduToPduQueue(const Tins::PDU* pdu, PduQueue& queue, std::mutex& mtx) { + mtx.lock(); + queue.push(pdu); + mtx.unlock(); + } } diff --git a/src/map/natmap.h b/src/map/natmap.h index 43932fc..e011ff9 100644 --- a/src/map/natmap.h +++ b/src/map/natmap.h @@ -6,6 +6,8 @@ #include #include #include "NatRange.h" +#include + namespace otonat { class NatMap { @@ -28,7 +30,14 @@ namespace otonat { PduQueue incommingPduQueue; PduQueue outgoingPduQueue; void handlePdu(const Tins::PDU * pdu); + void pushPduToIncommingPduQueue(const Tins::PDU * pdu); + const Tins::PDU * popPduIncommingPduQueue(); + void pushPduToOutgoingPduQueue(const Tins::PDU * pdu); + const Tins::PDU * popPduOutgoingPduQueue(); + static const Tins::PDU * popPduPduQueue(PduQueue & queue, std::mutex & mtx); + static void pushPduToPduQueue(const Tins::PDU * pdu, PduQueue & queue, std::mutex & mtx); + protected: private: @@ -49,6 +58,9 @@ namespace otonat { bool isIpInMyRanges(const Tins::IPv4Address & ipAddr) const; static bool isIpInMyRanges(const Tins::IPv4Address & ipAddr, const NatRangeList & rangeList); void SendTranslatedArpRequest(const Tins::ARP * arp); + + std::mutex incommingQueueMutex; + std::mutex outgoingQueueMutex; }; } diff --git a/tests/nattest.cpp b/tests/nattest.cpp index 6e6429b..0f4ab14 100644 --- a/tests/nattest.cpp +++ b/tests/nattest.cpp @@ -8,7 +8,7 @@ #include "nattest.h" #include "../src/map/NatRange.h" #include - +#include CPPUNIT_TEST_SUITE_REGISTRATION(nattest); nattest::nattest() { @@ -73,7 +73,7 @@ void nattest::testIpCalcEth2() { CPPUNIT_ASSERT_EQUAL(expetedIp, resultIp); } -void nattest::testTranslateArpIp(){ +void nattest::testTranslateArpIp() { natMap.reqIpMap.clear(); natMap.transMap.clear(); Tins::EthernetII arp1 = Tins::ARP::make_arp_request("172.17.0.20", "172.16.3.55", "00:00:00:00:00:02"); @@ -98,7 +98,7 @@ void nattest::testTranslateArpIp(){ const Tins::PDU * resultArp1 = natMap.outgoingPduQueue.front(); checkArp(resultArp1->rfind_pdu(), Tins::ARP::REQUEST, "00:00:00:00:00:00", "00:00:00:00:00:02", "10.0.0.20", "10.0.3.55"); natMap.outgoingPduQueue.pop(); - + natMap.handlePdu(arp3.clone()); CPPUNIT_ASSERT(natMap.outgoingPduQueue.size() == 1); const Tins::PDU * resultArp2 = natMap.outgoingPduQueue.front(); @@ -107,7 +107,7 @@ void nattest::testTranslateArpIp(){ natMap.handlePdu(arp4.clone()); CPPUNIT_ASSERT(natMap.outgoingPduQueue.empty()); - + natMap.handlePdu(ethW.clone()); CPPUNIT_ASSERT(natMap.outgoingPduQueue.empty()); @@ -166,7 +166,7 @@ void nattest::testTranslateArpIp(){ checkEth(result4Ack->rfind_pdu(), "00:00:00:00:00:05", "00:00:00:00:00:02", "10.0.1.41", "10.0.3.55"); natMap.outgoingPduQueue.pop(); CPPUNIT_ASSERT(natMap.outgoingPduQueue.empty()); - + CPPUNIT_ASSERT(natMap.transMap.size() == 9); CPPUNIT_ASSERT(natMap.reqIpMap.empty()); /*for (auto& entry : natMap.transMap){ @@ -191,13 +191,13 @@ void nattest::testTranslateArp() { const Tins::PDU * resultArp1 = natMap.outgoingPduQueue.front(); checkArp(resultArp1->rfind_pdu(), Tins::ARP::REQUEST, "00:00:00:00:00:00", "00:00:00:00:00:02", "10.0.0.20", "10.0.3.55"); natMap.outgoingPduQueue.pop(); - + natMap.handlePdu(arp3.clone()); CPPUNIT_ASSERT(natMap.outgoingPduQueue.size() == 1); const Tins::PDU * resultArp2 = natMap.outgoingPduQueue.front(); checkArp(resultArp2->rfind_pdu(), Tins::ARP::REPLY, "00:00:00:00:00:02", "00:00:00:00:00:01", "172.16.3.55", "172.27.0.20"); natMap.outgoingPduQueue.pop(); - + natMap.handlePdu(arp4.clone()); CPPUNIT_ASSERT(natMap.outgoingPduQueue.empty()); } @@ -299,6 +299,32 @@ void nattest::testNatInterfaces() { CPPUNIT_ASSERT(!natMap.ranges.empty()); } +void nattest::testQueues() { + Tins::EthernetII forMe = Tins::EthernetII("00:00:00:00:00:01", "00:00:00:00:00:03") / Tins::IP("172.16.0.1", "172.17.3.55") / Tins::TCP(); + Tins::EthernetII FromMe = Tins::EthernetII("00:00:00:00:00:01", "00:00:00:00:00:03") / Tins::IP("172.27.0.20", "172.16.0.1") / Tins::TCP(); + natMap.pushPduToIncommingPduQueue(forMe.clone()); + natMap.pushPduToOutgoingPduQueue(FromMe.clone()); + const Tins::PDU * result1 = natMap.popPduOutgoingPduQueue(); + checkEth(result1->rfind_pdu(), "00:00:00:00:00:01", "00:00:00:00:00:03", "172.27.0.20", "172.16.0.1"); + const Tins::PDU * result2 = natMap.popPduIncommingPduQueue(); + checkEth(result2->rfind_pdu(), "00:00:00:00:00:01", "00:00:00:00:00:03", "172.16.0.1", "172.17.3.55"); + delete result1; + delete result2; +} + +void nattest::testThreadQueues() { + std::thread threads[10]; + for (int i = 0; i < 10; ++i) + { + threads[i] = std::thread(&nattest::testQueues, this); + } + + for (auto& th : threads) + { + th.join(); + } +} + void nattest::printIp(const Tins::IP & ip) { std::cout << std::endl << "### Ip-Packet ###" << std::endl; std::cout << "ip_dst: " << ip.dst_addr() << std::endl; @@ -321,7 +347,7 @@ void nattest::printEth(const Tins::EthernetII & eth) { } else { std::cout << std::endl; } - + std::cout << "++++++++++++++++++++++" << std::endl; } diff --git a/tests/nattest.h b/tests/nattest.h index ffc6ec4..3dcec10 100644 --- a/tests/nattest.h +++ b/tests/nattest.h @@ -23,6 +23,8 @@ class nattest : public CPPUNIT_NS::TestFixture { CPPUNIT_TEST(testForMeFromMe); CPPUNIT_TEST(testTranslateArp); CPPUNIT_TEST(testTranslateArpIp); + CPPUNIT_TEST(testQueues); + CPPUNIT_TEST(testThreadQueues); CPPUNIT_TEST_SUITE_END(); public: @@ -68,6 +70,8 @@ private: void testTranslateArp(); void testTranslateArpIp(); void testForMeFromMe(); + void testQueues(); + void testThreadQueues(); void printArp(const Tins::ARP & arp); void printIp(const Tins::IP & ip);