/* * File: EventService.cc * Author: jgaebler * * Created on February 27, 2014, 4:45 PM */ #include #include "EventService.h" #include "Moversight.h" #include "GroupClosedEvent.h" #include "LocalPeerUpdatedEvent.h" namespace ubeeme { namespace moversight { #undef DEBUG #define DEBUG(msg) if (module.isPrintDebugES()){ if(getLocalState()== DISJOINED){ MOV_DEBUG <<"ES@TA_"<. * @param o Type of the event * @param recipient (optional) Mark this object as to be delivered only to * * This will add to the end of . */ void EventService::signal(DeliverableObject* o, ObjectListener* recipient /* = NULL */) { o->deliverOnlyTo(recipient); pendingObjects.push_back(o); DEBUG("signal - signaled " << o << ". There are " << pendingObjects.size() << " pending objects."); } /** * @brief Process all objects present in . * * A single service may send several messages or signal events before * is called, also messages or events signaled during timer * processing get handled properly after the processing has finished. * * Further note that services might receive the same event or message several * times if they have subscribed to a group of objects which includes * the particular message or event. * * Additionally since there is the option for services to send an event * directly to only one specific service, such an event will * not be processed as part of a group nor will it be sent multiple times. * This functionality is provided mainly to be used within the * Reliable Unicast Transfer (rUT) service so inform other services about * (un)successfully delivered messages. */ void EventService::process() { DEBUG("process - Processing " << pendingObjects.size() << " pending objects: " << pendingObjects); // run through our list of pending objects DeliverableObject * object; for (auto o = pendingObjects.begin(); o != pendingObjects.end();) { object = *o; if (object != NULL) { DEBUG("process - Processing " << object << ", groups: " << object->getGroups()); // check if this object shall be delivered directly to only one listener. if (object->hasRecipient()) { DEBUG("process - Direct delivery of " << object << " to " << object->getRecipient()->getServiceName()); // create a dummy set containing only one subscriber, // then deliver the object to it and skip the group checking. object->sendTo(SubscriberSet(object->getRecipient()), object->getName()); } else { // since every object belongs to a group, run through that list of groups and // deliver the object to whoever subscribed to any of them. GroupList groups = object->getGroups(); for (auto g = groups.begin(); g != groups.end(); g++) { SubscriberSet subscribers = subscriptions[*g]; DEBUG("process - Type " << *g << " has " << subscribers.size() << " subscribers"); if (subscribers.size() > 0) { object->sendTo(subscribers, *g); } } } } else { DEBUG("process - Error: Object is NULL! Delivery aborted."); } // The iterator used here has to be reinitialized after each // erase operation to ensure its sanity. o = pendingObjects.erase(o); DEBUG("process - finished " << object); DEBUG("process - " << pendingObjects.size() << " objects left."); delete object; } DEBUG("process - finished processing pending objects."); } /** * @brief Subscribe the object listener
    to the object type * @param el The object listener. * @param type The type to subscribe to. */ void EventService::subscribe(ObjectListener * ol, std::string type) { // prevent subscriptions to template group types. if (type.rfind("Group") == type.length() - 6) { std::stringstream buf; buf << "subscribe - Error: You cannot subscribe to the template group type " << type; throw IllegalParameterException(buf.str().c_str()); } subscriptions[type].insert(ol); DEBUG("subscribe - " << ol->getServiceName() << " subscribed to " << type); } /** * @brief Unsubscribe the object listener
      from object type . * @param ol The object listener. * @param type The type to unsubscribe from. */ void EventService::unsubscribe(ObjectListener * ol, std::string type) { // remove ol from the object's list of subscribers. // the key value itself will not be removed even if the list becomes empty. subscriptions[type].erase(ol); DEBUG("unsubscribe - " << ol->getServiceName() << " unsubscribed from object type " << type); } /** * @brief Unsubscribe from all registered events in one go. * @param el EventListener to unregister. */ void EventService::unsubscribeAll(ObjectListener* ol) { // run through key values for (auto k = subscriptions.begin(); k != subscriptions.end(); k++) { // remove ol from the list of subscribers (*k).second.erase(ol); } DEBUG("unsubscribeAll - " << ol->getServiceName() << " unsubscribed from all object types"); } } }