Files
2014-06-30 13:58:10 +02:00

209 lines
7.6 KiB
C++

/*
* File: EventService.cc
* Author: jgaebler
*
* Created on February 27, 2014, 4:45 PM
*/
#include <map>
#include "EventService.h"
#include "Moversight.h"
#include "GroupClosedEvent.h"
#include "LocalPeerUpdatedEvent.h"
namespace ubeeme {
namespace moversight {
#undef DEBUG
/*
 * Debug output helper of the event service.
 *
 * Writes <msg> to MOV_DEBUG if module.isPrintDebugES() is set. The line is
 * prefixed with "ES@TA_<local address>" while the local state is DISJOINED and
 * with "ES@<local id>" otherwise.
 *
 * The expansion is wrapped in do { ... } while (0) so that "DEBUG(...);" is
 * exactly one statement and stays correct inside unbraced if/else branches.
 */
#define DEBUG(msg) \
    do { \
        if (module.isPrintDebugES()) { \
            if (getLocalState() == DISJOINED) { \
                MOV_DEBUG << "ES@TA_" << getLocalAddress() << " " << msg << endl; \
            } else { \
                MOV_DEBUG << "ES@" << getLocalID() << " " << msg << endl; \
            } \
        } \
    } while (0)
/**
 * @brief Creates the event service.
 * @param d The dispatcher that is passed on to the MoversightService base class,
 *          together with the service name "EventService".
 */
EventService::EventService(Dispatcher& d)
    : MoversightService(d, "EventService") {}
/**
 * @brief Copy constructor.
 * @param orig The instance to copy.
 *
 * Copy-constructs the base class first and then takes over the remaining
 * state of <orig> through the assignment operator.
 */
EventService::EventService(const EventService& orig)
    : MoversightService(orig) {
    *this = orig;
}
/**
 * @brief Destructor.
 *
 * Performs no explicit clean-up; in particular, objects that are still queued
 * in <pendingObjects> are not deleted here.
 */
EventService::~EventService() = default;
/**
 * @brief Assignment operator.
 * @param other The instance to assign from.
 * @return A reference to this instance.
 *
 * Self-assignment is a no-op. Otherwise the base class state, the queue of
 * pending objects and the subscription table are copied from <other>.
 *
 * NOTE(review): <pendingObjects> holds raw pointers which process() deletes.
 * After a copy both instances appear to reference the same objects - confirm
 * that a copy and its original never both process the same queue.
 */
EventService& EventService::operator=(const EventService& other) {
    // nothing to do when assigning to ourselves
    if (this == &other) {
        return *this;
    }

    MoversightService::operator=(other);
    pendingObjects = other.pendingObjects;
    subscriptions = other.subscriptions;

    return *this;
}
/**
 * @brief Initialises the event service.
 *
 * Intentionally empty: nothing is set up here.
 */
void EventService::initialise() {}
/**
 * @brief Finalises the event service.
 *
 * Removes every subscription. The queue of pending objects is not touched.
 */
void EventService::finalise() {
    DEBUG("finalise - reset list of subscriptions.");

    subscriptions.clear();
}
/**
 * @brief Signal the object <o>.
 * @param o The object to signal. It is queued as a raw pointer and deleted by
 *          process() once it has been handled, so the caller must not delete
 *          it afterwards. A NULL pointer is rejected and nothing is queued.
 * @param recipient (optional) Mark this object as to be delivered only to <recipient>.
 *
 * This will add <o> to the end of <pendingObjects>; the actual delivery
 * happens in process().
 */
void
EventService::signal(DeliverableObject* o, ObjectListener* recipient /* = NULL */) {
    // Guard against a NULL object: it would be dereferenced right below.
    if (o == nullptr) {
        DEBUG("signal - Error: Object is NULL! Nothing signaled.");
        return;
    }

    o->deliverOnlyTo(recipient);
    pendingObjects.push_back(o);

    DEBUG("signal - signaled " << o << ". There are " << pendingObjects.size() << " pending objects.");
}
/**
* @brief Process all objects present in <pendingObjects>.
*
* A single service may send several messages or signal events before
* <process()> is called, also messages or events signaled during timer
* processing get handled properly after the processing has finished.
*
* Further note that services might receive the same event or message several
* times if they have subscribed to a group of objects which includes
* the particular message or event.
*
* Additionally since there is the option for services to send an event
* directly to only one specific service, such an event will
* not be processed as part of a group nor will it be sent multiple times.
* This functionality is provided mainly to be used within the
* Reliable Unicast Transfer (rUT) service so inform other services about
* (un)successfully delivered messages.
*/
void
EventService::process() {
DEBUG("process - Processing " << pendingObjects.size() << " pending objects: " << pendingObjects);
// run through our list of pending objects
DeliverableObject * object;
for (auto o = pendingObjects.begin(); o != pendingObjects.end();) {
object = *o;
if (object != NULL) {
DEBUG("process - Processing " << object << ", groups: " << object->getGroups());
// check if this object shall be delivered directly to only one listener.
if (object->hasRecipient()) {
DEBUG("process - Direct delivery of " << object << " to " << object->getRecipient()->getServiceName());
// create a dummy set containing only one subscriber,
// then deliver the object to it and skip the group checking.
object->sendTo(SubscriberSet(object->getRecipient()), object->getName());
}
else {
// since every object belongs to a group, run through that list of groups and
// deliver the object to whoever subscribed to any of them.
GroupList groups = object->getGroups();
for (auto g = groups.begin(); g != groups.end(); g++) {
SubscriberSet subscribers = subscriptions[*g];
DEBUG("process - Type " << *g << " has " << subscribers.size() << " subscribers");
if (subscribers.size() > 0) {
object->sendTo(subscribers, *g);
}
}
}
}
else {
DEBUG("process - Error: Object is NULL! Delivery aborted.");
}
// The iterator <o> used here has to be reinitialized after each
// erase operation to ensure its sanity.
o = pendingObjects.erase(o);
DEBUG("process - finished " << object);
DEBUG("process - " << pendingObjects.size() << " objects left.");
delete object;
}
DEBUG("process - finished processing pending objects.");
}
/**
 * @brief Subscribe the object listener <ol> to the object type <type>.
 * @param ol The object listener.
 * @param type The type to subscribe to.
 * @throws IllegalParameterException if <type> is recognised as a template
 *         group type (see below).
 *
 * A type is rejected when its last occurrence of "Group" starts exactly six
 * characters before the end of the name.
 *
 * NOTE(review): "Group" is only five characters long, so this matches names
 * ending in "Group" plus one trailing character (for instance a mangled type
 * name ending in "GroupE"), not names ending in plain "Group". The offset is
 * kept as it was - confirm it against the format of the type names in use.
 */
void
EventService::subscribe(ObjectListener * ol, std::string type) {
    // prevent subscriptions to template group types.
    // rfind() yields npos when "Group" does not occur at all. This has to be
    // checked explicitly: comparing against "length() - 6" wraps around to npos
    // for five-character names and would reject every such name.
    const std::string::size_type groupPos = type.rfind("Group");
    if (groupPos != std::string::npos && groupPos + 6 == type.length()) {
        std::stringstream buf;
        buf << "subscribe - Error: You cannot subscribe to the template group type " << type;
        throw IllegalParameterException(buf.str().c_str());
    }

    subscriptions[type].insert(ol);

    DEBUG("subscribe - " << ol->getServiceName() << " subscribed to " << type);
}
/**
 * @brief Unsubscribe the object listener <ol> from the object type <type>.
 * @param ol The object listener.
 * @param type The type to unsubscribe from.
 *
 * Only the listener is removed; the entry for <type> stays in the
 * subscription table even if no subscriber is left for it.
 */
void EventService::unsubscribe(ObjectListener* ol, std::string type) {
    // look up the subscribers of this type and take the listener out
    auto& listeners = subscriptions[type];
    listeners.erase(ol);

    DEBUG("unsubscribe - " << ol->getServiceName() << " unsubscribed from object type " << type);
}
/**
 * @brief Unsubscribe a listener from all object types in one go.
 * @param ol The object listener to unregister.
 *
 * The listener is removed from the subscriber set of every type present in
 * the subscription table; the table entries themselves are kept.
 */
void EventService::unsubscribeAll(ObjectListener* ol) {
    // visit each (type, subscribers) entry and drop the listener from it
    for (auto& entry : subscriptions) {
        entry.second.erase(ol);
    }

    DEBUG("unsubscribeAll - " << ol->getServiceName() << " unsubscribed from all object types");
}
}
}