add locked thread queues

This commit is contained in:
stubbfel
2015-09-15 22:23:20 +02:00
parent 5476f0ea56
commit 52a1aa9367
4 changed files with 99 additions and 16 deletions

View File

@@ -9,14 +9,19 @@ namespace otonat {
ranges.clear();
transMap.clear();
reqIpMap.clear();
while (!incommingPduQueue.empty()){
this->incommingQueueMutex.lock();
while (!incommingPduQueue.empty()) {
incommingPduQueue.pop();
}
while (!outgoingPduQueue.empty()){
this->incommingQueueMutex.unlock();
this->outgoingQueueMutex.lock();
while (!outgoingPduQueue.empty()) {
outgoingPduQueue.pop();
}
this->outgoingQueueMutex.unlock();
}
NatMap::NatMap(const NatMap& other) : ranges(other.ranges), transMap(other.transMap), reqIpMap(other.reqIpMap), incommingPduQueue(other.incommingPduQueue), outgoingPduQueue(other.outgoingPduQueue), zeroIp(other.zeroIp) {
@@ -27,8 +32,12 @@ namespace otonat {
ranges = rhs.ranges;
transMap = rhs.transMap;
this->incommingQueueMutex.lock();
incommingPduQueue = rhs.incommingPduQueue;
this->incommingQueueMutex.unlock();
this->outgoingQueueMutex.lock();
outgoingPduQueue = rhs.outgoingPduQueue;
this->outgoingQueueMutex.unlock();
return *this;
}
@@ -42,7 +51,7 @@ namespace otonat {
Tins::ARP * arp = pduCopy->find_pdu<Tins::ARP>();
if (arp != nullptr) {
if (handleArp(arp)) {
outgoingPduQueue.push(pduCopy);
this->pushPduToOutgoingPduQueue(pduCopy);
}
return;
}
@@ -50,7 +59,7 @@ namespace otonat {
Tins::IP * ip = pduCopy->find_pdu<Tins::IP>();
if (ip != nullptr) {
if (handleIp(ip, pduCopy)) {
outgoingPduQueue.push(pduCopy);
this->pushPduToOutgoingPduQueue(pduCopy);
}
}
}
@@ -230,13 +239,13 @@ namespace otonat {
bool NatMap::handleArpReply(Tins::ARP* arp) {
const Tins::IPv4Address targetIp = arp->target_ip_addr();
const Tins::IPv4Address transTargetIp = TranslateArpIp(targetIp);
if(transTargetIp == this->zeroIp){
if (transTargetIp == this->zeroIp) {
return false;
}
const Tins::IPv4Address senderIp = arp->sender_ip_addr();
const Tins::IPv4Address transSenderIp = TranslateArpIp(senderIp);
if(transSenderIp == this->zeroIp){
if (transSenderIp == this->zeroIp) {
return false;
}
@@ -260,7 +269,7 @@ namespace otonat {
this->reqIpMap.erase(transReqArpIpIter);
return transArpIp;
}
return transArpIpIter->second;
}
@@ -318,7 +327,39 @@ namespace otonat {
}
const Tins::EthernetII transArp = Tins::ARP::make_arp_request(transTargetIp, transSenderIp, arp->sender_hw_addr());
this->outgoingPduQueue.push(transArp.clone());
this->pushPduToOutgoingPduQueue(transArp.clone());
}
}
void NatMap::pushPduToIncommingPduQueue(const Tins::PDU* pdu) {
pushPduToPduQueue(pdu, this->incommingPduQueue, this ->incommingQueueMutex);
}
const Tins::PDU * NatMap::popPduIncommingPduQueue() {
return popPduPduQueue(this->incommingPduQueue, this->incommingQueueMutex);
}
void NatMap::pushPduToOutgoingPduQueue(const Tins::PDU* pdu) {
pushPduToPduQueue(pdu, this->outgoingPduQueue, this->outgoingQueueMutex);
}
const Tins::PDU * NatMap::popPduOutgoingPduQueue() {
return popPduPduQueue(this->outgoingPduQueue, this->outgoingQueueMutex);
}
const Tins::PDU * NatMap::popPduPduQueue(PduQueue & queue, std::mutex & mtx) {
mtx.lock();
const Tins::PDU * result = queue.front();
const Tins::PDU * outPut = result->clone();
queue.pop();
mtx.unlock();
delete result;
return outPut;
}
void NatMap::pushPduToPduQueue(const Tins::PDU* pdu, PduQueue& queue, std::mutex& mtx) {
mtx.lock();
queue.push(pdu);
mtx.unlock();
}
}

View File

@@ -6,6 +6,8 @@
#include <queue>
#include <tins/tins.h>
#include "NatRange.h"
#include <mutex>
namespace otonat {
class NatMap {
@@ -28,7 +30,14 @@ namespace otonat {
PduQueue incommingPduQueue;
PduQueue outgoingPduQueue;
void handlePdu(const Tins::PDU * pdu);
void pushPduToIncommingPduQueue(const Tins::PDU * pdu);
const Tins::PDU * popPduIncommingPduQueue();
void pushPduToOutgoingPduQueue(const Tins::PDU * pdu);
const Tins::PDU * popPduOutgoingPduQueue();
static const Tins::PDU * popPduPduQueue(PduQueue & queue, std::mutex & mtx);
static void pushPduToPduQueue(const Tins::PDU * pdu, PduQueue & queue, std::mutex & mtx);
protected:
private:
@@ -49,6 +58,9 @@ namespace otonat {
bool isIpInMyRanges(const Tins::IPv4Address & ipAddr) const;
static bool isIpInMyRanges(const Tins::IPv4Address & ipAddr, const NatRangeList & rangeList);
void SendTranslatedArpRequest(const Tins::ARP * arp);
std::mutex incommingQueueMutex;
std::mutex outgoingQueueMutex;
};
}