/* * File: MergeApplication.h * Author: sgaebler * * Created on July 8, 2011, 03:37 PM */ #include "MergeApplication.h" #include "Dispatcher.h" #include "Moversight.h" #include "app/Application.h" #include "common/transport/TransportAddress.h" #include "merge/events/MergeRequestEvent.h" #include "merge/events/MergeDoneEvent.h" #include "ms/Invitation.h" #include "ms/PeerResources.h" #include "mt/msg/GroupData.h" #undef DEBUG #define DEBUG(msg) if (module.isPrintDebugAPP()) \ MOV_DEBUG << "APP@"<<(dis->getLocalState()==DISJOINED?"TA_":"");\ if(dis->getLocalState()==DISJOINED){MOV_DEBUG << module.getLocalAddress();}\ else{MOV_DEBUG<< dis->getMembershipService().getLocalID();}MOV_DEBUG<<" "<subscribe(this); dis->subscribe(this); } /** * @brief Invites a peer * @param ta The address to invite */ void MergeApplication::invitePeer(TransportAddress & ta) { std::stringstream buf; buf << "invitePeer - invite peer at address " << ta; DEBUG(buf.str().c_str()); PeerDescription pDesc; dis->invitePeer(ta, pDesc); } /** * @brief Leaves the group */ void MergeApplication::leaveGroup() { DEBUG("leaveGroup - peer leave the group"); dis->leaveGroup(); } /** * @brief Try to merge with second group */ void MergeApplication::mergeRequest(TransportAddress & ta) { DEBUG("mergeRequest - group wants to merge with other group"); dis->mergeGroup(ta); } /** * @brief Sends dummy test data to the group */ void MergeApplication::sendData() { DEBUG("sendData - send 24 byte dummy data to the group"); GroupData data; dis->sendMessage(data); } /** * @brief Tries to start the given test case. * @param i The test case to start */ void MergeApplication::startTestCase(unsigned int i) { switch (i) { case 0: testCase00(); break; case 1: testCase01(); break; case 2: testCase02(); break; default: break; }//End switch } /** * @brief Test case 0: Master-master */ void MergeApplication::testCase00() { state = dis->getLocalState(); // init app if (initApp) { //set up group for each peer if (module.getLocalAddress().getHostAddress().get4().getDByte(3) < (numberOfPeers / 2 + 1)) { createRoster(1); } else { createRoster(2); } initApp = false; dt = simTime(); } // 1 is master if (module.getLocalAddress().getHostAddress().get4().getDByte(3) == 1) { if (dis->getLocalState() == JOINED) { if (!mergeOnce) { mergeOnce = true; // defining the otherMergeDirector - here Master! TransportAddress ta = getNewTransportAddress(numberOfPeers / 2 + 1); ta.setPort(module.getLocalAddress().getPort()); mergeRequest(ta); module.scheduleTestCase(45); }//End if }//End if } } /** * @brief Test case 1: master-slave */ void MergeApplication::testCase01() { state = dis->getLocalState(); // init app if (initApp) { //set up group for each peer if (module.getLocalAddress().getHostAddress().get4().getDByte(3) < (numberOfPeers / 2 + 1)) { createRoster(1); } else { createRoster(2); } initApp = false; dt = simTime(); } // 1 is master if (module.getLocalAddress().getHostAddress().get4().getDByte(3) == 1) { if (dis->getLocalState() == JOINED) { if (!mergeOnce) { mergeOnce = true; // defining the otherMergeDirector - here Slave! TransportAddress ta = getNewTransportAddress(numberOfPeers / 2 + 2); ta.setPort(module.getLocalAddress().getPort()); mergeRequest(ta); module.scheduleTestCase(45); }//End if }//End if } } /** * @brief Test case 2: slave - slave */ void MergeApplication::testCase02() { state = dis->getLocalState(); // init app if (initApp) { //set up group for each peer if (module.getLocalAddress().getHostAddress().get4().getDByte(3) < (numberOfPeers / 2 + 1)) { createRoster(1); } else { createRoster(2); } initApp = false; dt = simTime(); } // 2 is slave if (module.getLocalAddress().getHostAddress().get4().getDByte(3) == 2) { if (dis->getLocalState() == JOINED) { if (!mergeOnce) { mergeOnce = true; // defining the otherMergeDirector - here Slave! TransportAddress ta = getNewTransportAddress(numberOfPeers / 2 + 2); ta.setPort(module.getLocalAddress().getPort()); mergeRequest(ta); module.scheduleTestCase(45); }//End if }//End if } } /** * @brief Creating a new TransportAddress depending on the given parameter i. * @param i - stands for the 3rd bytePosition * @return the created ta */ TransportAddress MergeApplication::getNewTransportAddress(int i) { std::stringstream taStream; taStream << "192.168.0." << i; std::string s(taStream.str()); return TransportAddress(IPvXAddress(s.c_str())); } void MergeApplication::receiveGroupData(const GroupData & data, const PeerID sender) { std::stringstream buf; buf << "receiveGroupData - receive group data from peer ID " << sender; DEBUG(buf.str().c_str()); } /** * @brief Creates a new roster for the given group * @param rosterNumber - if first or second roster to create */ void MergeApplication::createRoster(int rosterNumber) { Roster roster; roster.setNextPeerID(numberOfPeers / 2 + 1); roster.setViewID(numberOfPeers / 2 + 1); PeerDescription dummyPDesc; PeerResources dummyPRes; ClusterID cID = 0; if (clusterSize <= 1) { // necessary to make sure if the numberOfPeers is equal or smaller than // the maxPeerCount that only one cluster is created! roster.setViewID(numberOfPeers + 1); roster.setNextPeerID(numberOfPeers + 1); } if (rosterNumber == 1) { for (int i = 1; i <= numberOfPeers / 2; i++) { TransportAddress ta = getNewTransportAddress(i); ta.setPort(module.getLocalAddress().getPort()); MemberDescription mDesc(i, ta, JOINED, cID, dummyPDesc, dummyPRes); roster.addMemberDescription(mDesc); if ((i % module.getMaxPeerCount()) == 0) { cID++; }//End if } dis->setupGroupFromRoster(roster, module.getLocalAddress().getHostAddress().get4().getDByte(3)); } else { for (int i = (numberOfPeers / 2 + 1); i <= numberOfPeers; i++) { TransportAddress ta = getNewTransportAddress(i); ta.setPort(module.getLocalAddress().getPort()); MemberDescription mDesc(i - (numberOfPeers / 2), ta, JOINED, cID, dummyPDesc, dummyPRes); roster.addMemberDescription(mDesc); if ((i % module.getMaxPeerCount()) == 0) { cID++; }//End if } dis->setupGroupFromRoster(roster, module.getLocalAddress().getHostAddress().get4().getDByte(3) - numberOfPeers / 2); } } /** * @brief Handle an incoming MergeRequestEvent * @param e The event. */ void MergeApplication::handleEvent(const MergeRequestEvent & e) { dis->acceptGroupMerge(); } /** * @brief Handle an incoming MergeDoneEvent. * @param e The event. */ void MergeApplication::handleEvent(const MergeDoneEvent & e) { #if OMNETPP mergeDuration.recordWithTimestamp(simTime() - dt, numberOfPeers); #endif } }//End namespace moversight }//End namespace ubeeme