1
0
mirror of https://github.com/mfontanini/libtins synced 2026-01-29 21:14:28 +01:00

Add callbacks for stream termination events

This commit is contained in:
Matias Fontanini
2016-02-13 11:23:08 -08:00
parent 20a3868e82
commit 48c068b84a
7 changed files with 116 additions and 41 deletions

View File

@@ -75,32 +75,40 @@ int seq_compare(uint32_t seq1, uint32_t seq2) {
Flow::Flow(const IPv4Address& dest_address, uint16_t dest_port,
uint32_t sequence_number)
: seq_number_(sequence_number), dest_port_(dest_port), state_(UNKNOWN), mss_(-1) {
: seq_number_(sequence_number), dest_port_(dest_port) {
OutputMemoryStream output(dest_address_.data(), dest_address_.size());
output.write(dest_address);
flags_.is_v6 = false;
initialize();
}
Flow::Flow(const IPv6Address& dest_address, uint16_t dest_port,
uint32_t sequence_number)
: seq_number_(sequence_number), dest_port_(dest_port), state_(UNKNOWN), mss_(-1) {
: seq_number_(sequence_number), dest_port_(dest_port) {
OutputMemoryStream output(dest_address_.data(), dest_address_.size());
output.write(dest_address);
flags_.is_v6 = true;
initialize();
}
void Flow::initialize() {
total_buffered_bytes_ = 0;
state_ = UNKNOWN;
mss_ = -1;
}
void Flow::data_callback(const data_available_callback_type& callback) {
on_data_callback_ = callback;
}
void Flow::out_of_order_callback(const out_of_order_callback_type& callback) {
void Flow::out_of_order_callback(const flow_packet_callback_type& callback) {
on_out_of_order_callback_ = callback;
}
void Flow::process_packet(PDU& pdu) {
TCP* tcp = pdu.find_pdu<TCP>();
RawPDU* raw = pdu.find_pdu<RawPDU>();
// If we sent a packet with RST or FIN on, this flow is done
// Update the internal state first
if (tcp) {
update_state(*tcp);
}
@@ -142,6 +150,8 @@ void Flow::process_packet(PDU& pdu) {
if (comparison > 0) {
// Then slice it
payload_type& payload = iter->second;
// First update this counter
total_buffered_bytes_ -= payload.size();
payload.erase(
payload.begin(),
payload.begin() + (seq_number_ - iter->first)
@@ -164,10 +174,6 @@ void Flow::process_packet(PDU& pdu) {
seq_number_ += iter->second.size();
iter = erase_iterator(iter);
added_some = true;
// If we don't have any other payload, we're done
if (buffered_payload_.empty()) {
break;
}
}
}
if (added_some) {
@@ -182,9 +188,12 @@ void Flow::store_payload(uint32_t seq, payload_type payload) {
buffered_payload_type::iterator iter = buffered_payload_.find(seq);
// New segment, store it
if (iter == buffered_payload_.end()) {
total_buffered_bytes_ += payload.size();
buffered_payload_.insert(make_pair(seq, move(payload)));
}
else if (iter->second.size() < payload.size()) {
// Increment by the diff between sizes
total_buffered_bytes_ += (payload.size() - iter->second.size());
// If we already have payload on this position but it's a shorter
// chunk than the new one, replace it
iter->second = move(payload);
@@ -193,6 +202,7 @@ void Flow::store_payload(uint32_t seq, payload_type payload) {
Flow::buffered_payload_type::iterator Flow::erase_iterator(buffered_payload_type::iterator iter) {
buffered_payload_type::iterator output = iter;
total_buffered_bytes_ -= iter->second.size();
++output;
buffered_payload_.erase(iter);
if (output == buffered_payload_.end()) {
@@ -282,6 +292,10 @@ Flow::buffered_payload_type& Flow::buffered_payload() {
return buffered_payload_;
}
uint32_t Flow::total_buffered_bytes() const {
return total_buffered_bytes_;
}
Flow::payload_type& Flow::payload() {
return payload_;
}

View File

@@ -116,11 +116,11 @@ void Stream::server_data_callback(const stream_callback_type& callback) {
on_server_data_callback_ = callback;
}
void Stream::client_out_of_order_callback(const out_of_order_callback_type& callback) {
void Stream::client_out_of_order_callback(const stream_packet_callback_type& callback) {
on_client_out_of_order_callback_ = callback;
}
void Stream::server_out_of_order_callback(const out_of_order_callback_type& callback) {
void Stream::server_out_of_order_callback(const stream_packet_callback_type& callback) {
on_server_out_of_order_callback_ = callback;
}

View File

@@ -62,10 +62,12 @@ namespace Tins {
namespace TCPIP {
const size_t StreamFollower::DEFAULT_MAX_BUFFERED_CHUNKS = 512;
const uint32_t StreamFollower::DEFAULT_MAX_BUFFERED_BYTES = 3 * 1024 * 1024; // 3MB
const StreamFollower::timestamp_type StreamFollower::DEFAULT_KEEP_ALIVE = minutes(5);
StreamFollower::StreamFollower()
: max_buffered_chunks_(DEFAULT_MAX_BUFFERED_CHUNKS), last_cleanup_(0),
: max_buffered_chunks_(DEFAULT_MAX_BUFFERED_CHUNKS),
max_buffered_bytes_(DEFAULT_MAX_BUFFERED_BYTES), last_cleanup_(0),
stream_keep_alive_(DEFAULT_KEEP_ALIVE), attach_to_flows_(false) {
}
@@ -118,7 +120,15 @@ void StreamFollower::process_packet(PDU& packet, const timestamp_type& ts) {
stream.process_packet(packet, ts);
size_t total_chunks = stream.client_flow().buffered_payload().size() +
stream.server_flow().buffered_payload().size();
if (stream.is_finished() || total_chunks > max_buffered_chunks_) {
uint32_t total_buffered_bytes = stream.client_flow().total_buffered_bytes() +
stream.server_flow().total_buffered_bytes();
bool terminate_stream = total_chunks > max_buffered_chunks_ ||
total_buffered_bytes > max_buffered_bytes_;
if (stream.is_finished() || terminate_stream) {
// If we're terminating the stream, execute the termination callback
if (terminate_stream && on_stream_termination_) {
on_stream_termination_(stream, BUFFERED_DATA);
}
streams_.erase(iter);
}
}
@@ -131,15 +141,19 @@ void StreamFollower::new_stream_callback(const stream_callback_type& callback) {
on_new_connection_ = callback;
}
Stream& StreamFollower::find_stream(IPv4Address client_addr, uint16_t client_port,
IPv4Address server_addr, uint16_t server_port) {
void StreamFollower::stream_termination_callback(const stream_termination_callback_type& callback) {
on_stream_termination_ = callback;
}
Stream& StreamFollower::find_stream(const IPv4Address& client_addr, uint16_t client_port,
const IPv4Address& server_addr, uint16_t server_port) {
stream_id identifier(serialize(client_addr), client_port,
serialize(server_addr), server_port);
return find_stream(identifier);
}
Stream& StreamFollower::find_stream(IPv6Address client_addr, uint16_t client_port,
IPv6Address server_addr, uint16_t server_port) {
Stream& StreamFollower::find_stream(const IPv6Address& client_addr, uint16_t client_port,
const IPv6Address& server_addr, uint16_t server_port) {
stream_id identifier(serialize(client_addr), client_port,
serialize(server_addr), server_port);
return find_stream(identifier);
@@ -195,7 +209,10 @@ void StreamFollower::cleanup_streams(const timestamp_type& now) {
streams_type::iterator iter = streams_.begin();
while (iter != streams_.end()) {
if (iter->second.last_seen() + stream_keep_alive_ <= now) {
// TODO: execute some callback here
// If we have a termination callback, execute it
if (on_stream_termination_) {
on_stream_termination_(iter->second, TIMEOUT);
}
streams_.erase(iter++);
}
else {