diff --git a/examples/stream_dump.cpp b/examples/stream_dump.cpp index 66353ae..c1028f4 100644 --- a/examples/stream_dump.cpp +++ b/examples/stream_dump.cpp @@ -31,6 +31,7 @@ #include #include "tins/tcp_ip/stream_follower.h" #include "tins/sniffer.h" +#include "tins/packet.h" #include "tins/ip_address.h" #include "tins/ipv6_address.h" @@ -44,6 +45,7 @@ using std::exception; using Tins::Sniffer; using Tins::SnifferConfiguration; +using Tins::Packet; using Tins::TCPIP::StreamFollower; using Tins::TCPIP::Stream; @@ -148,7 +150,10 @@ int main(int argc, char* argv[]) { follower.new_stream_callback(&on_new_connection); // Now start capturing. Every time there's a new packet, call // follower.process_packet - sniffer.sniff_loop(bind(&StreamFollower::process_packet, &follower, _1)); + sniffer.sniff_loop([&](Packet& packet) { + follower.process_packet(packet); + return true; + }); } catch (exception& ex) { cerr << "Error: " << ex.what() << endl; diff --git a/include/tins/packet.h b/include/tins/packet.h index e1823a7..13450f7 100644 --- a/include/tins/packet.h +++ b/include/tins/packet.h @@ -148,11 +148,19 @@ public: /** * \brief Constructs a Packet from a PDU* and a Timestamp. * - * The PDU* is cloned using PDU::clone. + * The PDU is cloned using PDU::clone. */ Packet(const PDU* apdu, const Timestamp& tstamp) : pdu_(apdu->clone()), ts_(tstamp) { } + /** + * \brief Constructs a Packet from a PDU& and a Timestamp. + * + * The PDU is cloned using PDU::clone. + */ + Packet(const PDU& apdu, const Timestamp& tstamp) + : pdu_(apdu.clone()), ts_(tstamp) { } + /** * \brief Constructs a Packet from a PDU* and a Timestamp. * diff --git a/include/tins/tcp_ip/flow.h b/include/tins/tcp_ip/flow.h new file mode 100644 index 0000000..69b0858 --- /dev/null +++ b/include/tins/tcp_ip/flow.h @@ -0,0 +1,298 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TINS_TCP_IP_FLOW_H +#define TINS_TCP_IP_FLOW_H + +#include "../cxxstd.h" + +// This classes use C++11 features +#if TINS_IS_CXX11 + +#include +#include +#include +#include +#include +#include "../macros.h" +#include "../hw_address.h" + +namespace Tins { + +class PDU; +class TCP; +class IPv4Address; +class IPv6Address; + +namespace TCPIP { + +/** + * \brief Represents an unidirectional TCP flow between 2 endpoints + * + * This class will keep the state for all the traffic sent by + * one of the peers in a TCP connection. This contains the sequence number, + * payload ready to be read and buffered payload, along with some other + * properties of the flow. + * + * A TCP stream (see class Stream) is made out of 2 Flows, so you should + * probably have a look at that class first. + * + * You shouldn't normally need to interact with this class. Stream already + * provides proxys to most of its Flow's attributes. + */ +class TINS_API Flow { +public: + /** + * \brief Enum that indicates the state of this flow. + * + * Note that although similar, this is not mapped to a TCP state-machine + * state. This is mostly used internally to know which packets the flow is + * expecting and to know when it's done sending data. + */ + enum State { + UNKNOWN, + SYN_SENT, + ESTABLISHED, + FIN_SENT, + RST_SENT + }; + + /** + * The type used to store the payload + */ + typedef std::vector payload_type; + + /** + * The type used to store the buffered payload + */ + typedef std::map buffered_payload_type; + + /** + * The type used to store the callback called when new data is available + */ + typedef std::function data_available_callback_type; + + /** + * \brief The type used to store the callback called when data is buffered + * + * The arguments are the flow, the sequence number and payload that will + * be buffered. + */ + typedef std::function out_of_order_callback_type; + + /** + * Construct a Flow from an IPv4 address + * + * \param dst_address This flow's destination address + * \param dst_port This flow's destination port + * \param sequence_number The initial sequence number to be used + */ + Flow(const IPv4Address& dst_address, uint16_t dst_port, + uint32_t sequence_number); + + /** + * Construct a Flow from an IPv6 address + * + * \param dst_address This flow's destination address + * \param dst_port This flow's destination port + * \param sequence_number The initial sequence number to be used + */ + Flow(const IPv6Address& dst_address, uint16_t dst_port, + uint32_t sequence_number); + + /** + * \brief Sets the callback that will be executed when data is readable + * + * Whenever this flow has readable data, this callback will be executed. + * By readable, this means that there's non-out-of-order data captured. + * + * \param callback The callback to be executed + */ + void data_callback(const data_available_callback_type& callback); + + /** + * \brief Sets the callback that will be executed when out of order data arrives + * + * Whenever this flow receives out-of-order data, this callback will be + * executed. + * + * \param callback The callback to be executed + */ + void out_of_order_callback(const out_of_order_callback_type& callback); + + /** + * \brief Processes a packet. + * + * If this packet contains data and starts or overlaps with the current + * sequence number, then the data will be appended to this flow's payload + * and the data_callback will be executed. + * + * If this packet contains out-of-order data, it will be buffered and the + * buffering_callback will be executed. + * + * \param pdu The packet to be processed + * \sa Flow::data_callback + * \sa Flow::buffering_callback + */ + void process_packet(PDU& pdu); + + /** + * Indicates whether this flow uses IPv6 addresses + */ + bool is_v6() const; + + /** + * \brief Indicates whether this flow is finished + * + * A finished is considered to be finished if either it sent a + * packet with the FIN or RST flags on. + */ + bool is_finished() const; + + /** + * \brief Indicates whether a packet belongs to this flow + * + * Since Flow represents a unidirectional stream, this will only check + * the destination endpoint and not the source one. + * + * \param packet The packet to be checked + */ + bool packet_belongs(const PDU& packet) const; + + /** + * \brief Getter for the IPv4 destination address + * + * Note that it's only safe to execute this method if is_v6() == false + */ + IPv4Address dst_addr_v4() const; + + /** + * \brief Getter for the IPv6 destination address + * + * Note that it's only safe to execute this method if is_v6() == true + */ + IPv6Address dst_addr_v6() const; + + /** + * Getter for this flow's destination port + */ + uint16_t dport() const; + + /** + * Getter for this flow's payload (const) + */ + const payload_type& payload() const; + + /** + * Getter for this flow's destination port + */ + payload_type& payload(); + + /** + * Getter for this flow's state + */ + State state() const; + + /** + * Getter for this flow's sequence number + */ + uint32_t sequence_number() const; + + /** + * Getter for this flow's buffered payload (const) + */ + const buffered_payload_type& buffered_payload() const; + + /** + * Getter for this flow's buffered payload + */ + buffered_payload_type& buffered_payload(); + + /** + * Sets the state of this flow + * + * \param new_state The new state of this flow + */ + void state(State new_state); + + /** + * \brief Sets whether this flow should ignore data packets + * + * If the data packets are ignored then the flow will just be + * followed to keep track of its state. + */ + void ignore_data_packets(); + + /** + * \brief Returns the MSS for this Flow. + * + * If the MSS option wasn't provided by the peer, -1 is returned + */ + int mss() const; + + /** + * \brief Indicates whether this Flow supports selective acknowledgements + */ + bool sack_permitted() const; +private: + // Compress all flags into just one struct using bitfields + struct flags { + flags() : ignore_data_packets(0), sack_permitted(0) { + + } + + uint32_t is_v6:1, + ignore_data_packets:1, + sack_permitted:1; + }; + + void store_payload(uint32_t seq, payload_type payload); + buffered_payload_type::iterator erase_iterator(buffered_payload_type::iterator iter); + void update_state(const TCP& tcp); + + payload_type payload_; + buffered_payload_type buffered_payload_; + uint32_t seq_number_; + std::array dest_address_; + uint16_t dest_port_; + data_available_callback_type on_data_callback_; + out_of_order_callback_type on_out_of_order_callback_; + State state_; + int mss_; + flags flags_; +}; + +} // TCPIP +} // TINS + +#endif // TINS_IS_CXX11 +#endif // TINS_TCP_IP_FLOW_H + diff --git a/include/tins/tcp_ip/stream.h b/include/tins/tcp_ip/stream.h index 7495567..6e5f0b0 100644 --- a/include/tins/tcp_ip/stream.h +++ b/include/tins/tcp_ip/stream.h @@ -39,9 +39,11 @@ #include #include #include +#include #include #include "../macros.h" #include "../hw_address.h" +#include "flow.h" namespace Tins { @@ -52,238 +54,6 @@ class IPv6Address; namespace TCPIP { -/** - * \brief Represents an unidirectional TCP flow between 2 endpoints - * - * This class will keep the state for all the traffic sent by - * one of the peers in a TCP connection. This contains the sequence number, - * payload ready to be read and buffered payload, along with some other - * properties of the flow. - * - * A TCP stream (see class Stream) is made out of 2 Flows, so you should - * probably have a look at that class first. - * - * You shouldn't normally need to interact with this class. Stream already - * provides proxys to most of its Flow's attributes. - */ -class TINS_API Flow { -public: - /** - * \brief Enum that indicates the state of this flow. - * - * Note that although similar, this is not mapped to a TCP state-machine - * state. This is mostly used internally to know which packets the flow is - * expecting and to know when it's done sending data. - */ - enum State { - UNKNOWN, - SYN_SENT, - ESTABLISHED, - FIN_SENT, - RST_SENT - }; - - /** - * The type used to store the payload - */ - typedef std::vector payload_type; - - /** - * The type used to store the buffered payload - */ - typedef std::map buffered_payload_type; - - /** - * The type used to store the callback called when new data is available - */ - typedef std::function data_available_callback_type; - - /** - * \brief The type used to store the callback called when data is buffered - * - * The arguments are the flow, the sequence number and payload that will - * be buffered. - */ - typedef std::function out_of_order_callback_type; - - /** - * Construct a Flow from an IPv4 address - * - * \param dst_address This flow's destination address - * \param dst_port This flow's destination port - * \param sequence_number The initial sequence number to be used - */ - Flow(const IPv4Address& dst_address, uint16_t dst_port, - uint32_t sequence_number); - - /** - * Construct a Flow from an IPv6 address - * - * \param dst_address This flow's destination address - * \param dst_port This flow's destination port - * \param sequence_number The initial sequence number to be used - */ - Flow(const IPv6Address& dst_address, uint16_t dst_port, - uint32_t sequence_number); - - /** - * \brief Sets the callback that will be executed when data is readable - * - * Whenever this flow has readable data, this callback will be executed. - * By readable, this means that there's non-out-of-order data captured. - * - * \param callback The callback to be executed - */ - void data_callback(const data_available_callback_type& callback); - - /** - * \brief Sets the callback that will be executed when out of order data arrives - * - * Whenever this flow receives out-of-order data, this callback will be - * executed. - * - * \param callback The callback to be executed - */ - void out_of_order_callback(const out_of_order_callback_type& callback); - - /** - * \brief Processes a packet. - * - * If this packet contains data and starts or overlaps with the current - * sequence number, then the data will be appended to this flow's payload - * and the data_callback will be executed. - * - * If this packet contains out-of-order data, it will be buffered and the - * buffering_callback will be executed. - * - * \param pdu The packet to be processed - * \sa Flow::data_callback - * \sa Flow::buffering_callback - */ - void process_packet(PDU& pdu); - - /** - * Indicates whether this flow uses IPv6 addresses - */ - bool is_v6() const; - - /** - * \brief Indicates whether this flow is finished - * - * A finished is considered to be finished if either it sent a - * packet with the FIN or RST flags on. - */ - bool is_finished() const; - - /** - * \brief Indicates whether a packet belongs to this flow - * - * Since Flow represents a unidirectional stream, this will only check - * the destination endpoint and not the source one. - * - * \param packet The packet to be checked - */ - bool packet_belongs(const PDU& packet) const; - - /** - * \brief Getter for the IPv4 destination address - * - * Note that it's only safe to execute this method if is_v6() == false - */ - IPv4Address dst_addr_v4() const; - - /** - * \brief Getter for the IPv6 destination address - * - * Note that it's only safe to execute this method if is_v6() == true - */ - IPv6Address dst_addr_v6() const; - - /** - * Getter for this flow's destination port - */ - uint16_t dport() const; - - /** - * Getter for this flow's payload (const) - */ - const payload_type& payload() const; - - /** - * Getter for this flow's destination port - */ - payload_type& payload(); - - /** - * Getter for this flow's state - */ - State state() const; - - /** - * Getter for this flow's sequence number - */ - uint32_t sequence_number() const; - - /** - * Getter for this flow's buffered payload (const) - */ - const buffered_payload_type& buffered_payload() const; - - /** - * Getter for this flow's buffered payload - */ - buffered_payload_type& buffered_payload(); - - /** - * Sets the state of this flow - * - * \param new_state The new state of this flow - */ - void state(State new_state); - - /** - * \brief Sets whether this flow should ignore data packets - * - * If the data packets are ignored then the flow will just be - * followed to keep track of its state. - */ - void ignore_data_packets(); - - /** - * \brief Returns the MSS for this Flow. - * - * If the MSS option wasn't provided by the peer, -1 is returned - */ - int mss() const; -private: - // Compress all flags into just one struct using bitfields - struct flags { - flags() : ignore_data_packets(0) { - - } - - uint32_t is_v6:1, - ignore_data_packets:1; - }; - - void store_payload(uint32_t seq, payload_type payload); - buffered_payload_type::iterator erase_iterator(buffered_payload_type::iterator iter); - void update_state(const TCP& tcp); - - payload_type payload_; - buffered_payload_type buffered_payload_; - uint32_t seq_number_; - std::array dest_address_; - uint16_t dest_port_; - data_available_callback_type on_data_callback_; - out_of_order_callback_type on_out_of_order_callback_; - State state_; - int mss_; - flags flags_; -}; - /** * \brief Represents a TCP stream * @@ -299,16 +69,21 @@ private: */ class TINS_API Stream { public: - /** - * The type used for callbacks - */ - typedef std::function stream_callback_type; - /** * The type used to store payloads */ typedef Flow::payload_type payload_type; + /** + * The type used to represent timestamps + */ + typedef std::chrono::microseconds timestamp_type; + + /** + * The type used for callbacks + */ + typedef std::function stream_callback_type; + /** * The type used for callbacks * @@ -326,14 +101,30 @@ public: /** * \brief Constructs a TCP stream using the provided packet. + * + * \param initial_packet The first packet of the stream + * \param ts The first packet's timestamp */ - Stream(PDU& initial_packet); + Stream(PDU& initial_packet, const timestamp_type& ts = timestamp_type()); /** * \brief Processes this packet. * * This will forward the packet appropriately to the client * or server flow. + * + * \param packet The packet to be processed + * \param ts The packet's timestamp + */ + void process_packet(PDU& packet, const timestamp_type& ts); + + /** + * \brief Processes this packet. + * + * This will forward the packet appropriately to the client + * or server flow. + * + * \param packet The packet to be processed */ void process_packet(PDU& packet); @@ -448,6 +239,16 @@ public: */ payload_type& server_payload(); + /** + * Getter for the creation time of this stream + */ + const timestamp_type& create_time() const; + + /** + * Getter for the last seen time of this stream + */ + const timestamp_type& last_seen() const; + /** * \brief Sets the callback to be executed when the stream is closed * @@ -555,6 +356,8 @@ private: out_of_order_callback_type on_server_out_of_order_callback_; hwaddress_type client_hw_addr_; hwaddress_type server_hw_addr_; + timestamp_type create_time_; + timestamp_type last_seen_; bool auto_cleanup_; }; diff --git a/include/tins/tcp_ip/stream_follower.h b/include/tins/tcp_ip/stream_follower.h index a277f43..5ac2209 100644 --- a/include/tins/tcp_ip/stream_follower.h +++ b/include/tins/tcp_ip/stream_follower.h @@ -44,6 +44,7 @@ class PDU; class TCP; class IPv4Address; class IPv6Address; +class Packet; namespace TCPIP { @@ -90,13 +91,20 @@ public: * and process it, or if it belongs to a new one, in which case it * starts tracking it. * - * This method always returns true so it can be easily plugged as - * the argument to Sniffer::sniff_loop. + * \param packet The packet to be processed + */ + void process_packet(PDU& packet); + + /** + * \brief Processes a packet + * + * This will detect if this packet belongs to an existing stream + * and process it, or if it belongs to a new one, in which case it + * starts tracking it. * * \param packet The packet to be processed - * \return Always true */ - bool process_packet(PDU& packet); + void process_packet(Packet& packet); /** * \brief Sets the callback to be executed when a new stream is captured. @@ -108,6 +116,17 @@ public: */ void new_stream_callback(const stream_callback_type& callback); + /** + * \brief Sets the maximum time a stream will be followed without capturing + * packets that belong to it. + * + * \param keep_alive The maximum time to keep unseen streams + */ + template + void stream_keep_alive(const std::chrono::duration& keep_alive) { + stream_keep_alive_ = keep_alive; + } + /** * Finds the stream identified by the provided arguments. * @@ -130,8 +149,12 @@ public: Stream& find_stream(IPv6Address client_addr, uint16_t client_port, IPv6Address server_addr, uint16_t server_port); private: - static const size_t DEFAULT_MAX_BUFFERED_CHUNKS; typedef std::array address_type; + typedef Stream::timestamp_type timestamp_type; + + static const size_t DEFAULT_MAX_BUFFERED_CHUNKS; + static const timestamp_type DEFAULT_CLEANUP_INTERVAL; + static const timestamp_type DEFAULT_KEEP_ALIVE; struct stream_id { stream_id(const address_type& client_addr, uint16_t client_port, @@ -153,10 +176,14 @@ private: Stream& find_stream(const stream_id& id); static address_type serialize(IPv4Address address); static address_type serialize(const IPv6Address& address); + void process_packet(PDU& packet, const timestamp_type& ts); + void cleanup_streams(const timestamp_type& now); streams_type streams_; stream_callback_type on_new_connection_; size_t max_buffered_chunks_; + timestamp_type last_cleanup_; + timestamp_type stream_keep_alive_; bool attach_to_flows_; }; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d436f0d..5a3da44 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -53,6 +53,7 @@ ADD_LIBRARY( snap.cpp sniffer.cpp tcp.cpp + tcp_ip/flow.cpp tcp_ip/stream.cpp tcp_ip/stream_follower.cpp tcp_stream.cpp diff --git a/src/tcp_ip/flow.cpp b/src/tcp_ip/flow.cpp new file mode 100644 index 0000000..d425e29 --- /dev/null +++ b/src/tcp_ip/flow.cpp @@ -0,0 +1,308 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "tcp_ip/flow.h" + +#if TINS_IS_CXX11 + +#include +#include +#include "memory.h" +#include "ip_address.h" +#include "ipv6_address.h" +#include "tcp.h" +#include "ip.h" +#include "ipv6.h" +#include "rawpdu.h" +#include "exceptions.h" +#include "memory_helpers.h" + +using std::make_pair; +using std::bind; +using std::pair; +using std::runtime_error; +using std::numeric_limits; +using std::max; +using std::swap; + +using Tins::Memory::OutputMemoryStream; +using Tins::Memory::InputMemoryStream; + +namespace Tins { +namespace TCPIP { + +// As defined by RFC 1982 - 2 ^ (SERIAL_BITS - 1) +static const uint32_t seq_number_diff = 2147483648U; + +// Compares sequence numbers as defined by RFC 1982. +int seq_compare(uint32_t seq1, uint32_t seq2) { + if (seq1 == seq2) { + return 0; + } + if (seq1 < seq2) { + return (seq2 - seq1 < seq_number_diff) ? -1 : 1; + } + else { + return (seq1 - seq2 > seq_number_diff) ? -1 : 1; + } +} + +Flow::Flow(const IPv4Address& dest_address, uint16_t dest_port, + uint32_t sequence_number) +: seq_number_(sequence_number), dest_port_(dest_port), state_(UNKNOWN), mss_(-1) { + OutputMemoryStream output(dest_address_.data(), dest_address_.size()); + output.write(dest_address); + flags_.is_v6 = false; +} + +Flow::Flow(const IPv6Address& dest_address, uint16_t dest_port, + uint32_t sequence_number) +: seq_number_(sequence_number), dest_port_(dest_port), state_(UNKNOWN), mss_(-1) { + OutputMemoryStream output(dest_address_.data(), dest_address_.size()); + output.write(dest_address); + flags_.is_v6 = true; +} + +void Flow::data_callback(const data_available_callback_type& callback) { + on_data_callback_ = callback; +} + +void Flow::out_of_order_callback(const out_of_order_callback_type& callback) { + on_out_of_order_callback_ = callback; +} + +void Flow::process_packet(PDU& pdu) { + TCP* tcp = pdu.find_pdu(); + RawPDU* raw = pdu.find_pdu(); + // If we sent a packet with RST or FIN on, this flow is done + if (tcp) { + update_state(*tcp); + } + if (flags_.ignore_data_packets) { + return; + } + if (!tcp || !raw) { + return; + } + const uint32_t chunk_end = tcp->seq() + raw->payload_size(); + // If the end of the chunk ends after our current sequence number, process it. + if (seq_compare(chunk_end, seq_number_) >= 0) { + bool added_some = false; + uint32_t seq = tcp->seq(); + // If we're going to buffer this and we have a buffering callback, execute it + if (seq > seq_number_ && on_out_of_order_callback_) { + on_out_of_order_callback_(*this, seq, raw->payload()); + } + + // If it starts before our sequence number, slice it + if (seq_compare(seq, seq_number_) < 0) { + const uint32_t diff = seq_number_ - seq; + raw->payload().erase( + raw->payload().begin(), + raw->payload().begin() + diff + ); + seq = seq_number_; + } + // Store this payload + store_payload(seq, move(raw->payload())); + // Keep looping while the fragments seq is lower or equal to our seq + buffered_payload_type::iterator iter = buffered_payload_.find(seq_number_); + while (iter != buffered_payload_.end() && seq_compare(iter->first, seq_number_) <= 0) { + // Does this fragment start before our sequence number? + if (seq_compare(iter->first, seq_number_) < 0) { + uint32_t fragment_end = iter->first + iter->second.size(); + int comparison = seq_compare(fragment_end, seq_number_); + // Does it end after our sequence number? + if (comparison > 0) { + // Then slice it + payload_type& payload = iter->second; + payload.erase( + payload.begin(), + payload.begin() + (seq_number_ - iter->first) + ); + store_payload(seq_number_, move(iter->second)); + iter = erase_iterator(iter); + } + else { + // Otherwise, we've seen this part of the payload. Erase it. + iter = erase_iterator(iter); + } + } + else { + // They're equal. Add this payload. + payload_.insert( + payload_.end(), + iter->second.begin(), + iter->second.end() + ); + seq_number_ += iter->second.size(); + iter = erase_iterator(iter); + added_some = true; + // If we don't have any other payload, we're done + if (buffered_payload_.empty()) { + break; + } + } + } + if (added_some) { + if (on_data_callback_) { + on_data_callback_(*this); + } + } + } +} + +void Flow::store_payload(uint32_t seq, payload_type payload) { + buffered_payload_type::iterator iter = buffered_payload_.find(seq); + // New segment, store it + if (iter == buffered_payload_.end()) { + buffered_payload_.insert(make_pair(seq, move(payload))); + } + else if (iter->second.size() < payload.size()) { + // If we already have payload on this position but it's a shorter + // chunk than the new one, replace it + iter->second = move(payload); + } +} + +Flow::buffered_payload_type::iterator Flow::erase_iterator(buffered_payload_type::iterator iter) { + buffered_payload_type::iterator output = iter; + ++output; + buffered_payload_.erase(iter); + if (output == buffered_payload_.end()) { + output = buffered_payload_.begin(); + } + return output; +} + +void Flow::update_state(const TCP& tcp) { + if ((tcp.flags() & TCP::FIN) != 0) { + state_ = FIN_SENT; + } + else if ((tcp.flags() & TCP::RST) != 0) { + state_ = RST_SENT; + } + else if (state_ == SYN_SENT && (tcp.flags() & TCP::ACK) != 0) { + state_ = ESTABLISHED; + seq_number_++; + } + else if (state_ == UNKNOWN && (tcp.flags() & TCP::SYN) != 0) { + state_ = SYN_SENT; + seq_number_ = tcp.seq(); + const TCP::option* mss_option = tcp.search_option(TCP::MSS); + if (mss_option) { + mss_ = mss_option->to(); + } + flags_.sack_permitted = tcp.has_sack_permitted(); + } +} + +bool Flow::is_v6() const { + return flags_.is_v6; +} + +bool Flow::is_finished() const { + return state_ == FIN_SENT || state_ == RST_SENT; +} + +bool Flow::packet_belongs(const PDU& packet) const { + if (is_v6()) { + const IPv6* ip = packet.find_pdu(); + if (!ip || ip->dst_addr() != dst_addr_v6()) { + return false; + } + } + else { + const IP* ip = packet.find_pdu(); + if (!ip || ip->dst_addr() != dst_addr_v4()) { + return false; + } + } + const TCP* tcp = packet.find_pdu(); + return tcp && tcp->dport() == dport(); +} + +IPv4Address Flow::dst_addr_v4() const { + InputMemoryStream stream(dest_address_.data(), dest_address_.size()); + return stream.read(); +} + +IPv6Address Flow::dst_addr_v6() const { + InputMemoryStream stream(dest_address_.data(), dest_address_.size()); + return stream.read(); +} + +uint16_t Flow::dport() const { + return dest_port_; +} + +const Flow::payload_type& Flow::payload() const { + return payload_; +} + +Flow::State Flow::state() const { + return state_; +} + +uint32_t Flow::sequence_number() const { + return seq_number_; +} + +const Flow::buffered_payload_type& Flow::buffered_payload() const { + return buffered_payload_; +} + +Flow::buffered_payload_type& Flow::buffered_payload() { + return buffered_payload_; +} + +Flow::payload_type& Flow::payload() { + return payload_; +} + +void Flow::state(State new_state) { + state_ = new_state; +} + +void Flow::ignore_data_packets() { + flags_.ignore_data_packets = true; +} + +int Flow::mss() const { + return mss_; +} + +bool Flow::sack_permitted() const { + return flags_.sack_permitted; +} + +} // TCPIP +} // Tins + +#endif // TINS_IS_CXX11 diff --git a/src/tcp_ip/stream.cpp b/src/tcp_ip/stream.cpp index 93c109f..cc902f9 100644 --- a/src/tcp_ip/stream.cpp +++ b/src/tcp_ip/stream.cpp @@ -58,253 +58,10 @@ using Tins::Memory::InputMemoryStream; namespace Tins { namespace TCPIP { -// As defined by RFC 1982 - 2 ^ (SERIAL_BITS - 1) -static const uint32_t seq_number_diff = 2147483648U; - -// Compares sequence numbers as defined by RFC 1982. -int seq_compare(uint32_t seq1, uint32_t seq2) { - if (seq1 == seq2) { - return 0; - } - if (seq1 < seq2) { - return (seq2 - seq1 < seq_number_diff) ? -1 : 1; - } - else { - return (seq1 - seq2 > seq_number_diff) ? -1 : 1; - } -} - -// Flow - -Flow::Flow(const IPv4Address& dest_address, uint16_t dest_port, - uint32_t sequence_number) -: seq_number_(sequence_number), dest_port_(dest_port), state_(UNKNOWN), mss_(-1) { - OutputMemoryStream output(dest_address_.data(), dest_address_.size()); - output.write(dest_address); - flags_.is_v6 = false; -} - -Flow::Flow(const IPv6Address& dest_address, uint16_t dest_port, - uint32_t sequence_number) -: seq_number_(sequence_number), dest_port_(dest_port), state_(UNKNOWN), mss_(-1) { - OutputMemoryStream output(dest_address_.data(), dest_address_.size()); - output.write(dest_address); - flags_.is_v6 = true; -} - -void Flow::data_callback(const data_available_callback_type& callback) { - on_data_callback_ = callback; -} - -void Flow::out_of_order_callback(const out_of_order_callback_type& callback) { - on_out_of_order_callback_ = callback; -} - -void Flow::process_packet(PDU& pdu) { - TCP* tcp = pdu.find_pdu(); - RawPDU* raw = pdu.find_pdu(); - // If we sent a packet with RST or FIN on, this flow is done - if (tcp) { - update_state(*tcp); - } - if (flags_.ignore_data_packets) { - return; - } - if (!tcp || !raw) { - return; - } - const uint32_t chunk_end = tcp->seq() + raw->payload_size(); - // If the end of the chunk ends after our current sequence number, process it. - if (seq_compare(chunk_end, seq_number_) >= 0) { - bool added_some = false; - uint32_t seq = tcp->seq(); - // If we're going to buffer this and we have a buffering callback, execute it - if (seq > seq_number_ && on_out_of_order_callback_) { - on_out_of_order_callback_(*this, seq, raw->payload()); - } - - // If it starts before our sequence number, slice it - if (seq_compare(seq, seq_number_) < 0) { - const uint32_t diff = seq_number_ - seq; - raw->payload().erase( - raw->payload().begin(), - raw->payload().begin() + diff - ); - seq = seq_number_; - } - // Store this payload - store_payload(seq, move(raw->payload())); - // Keep looping while the fragments seq is lower or equal to our seq - buffered_payload_type::iterator iter = buffered_payload_.find(seq_number_); - while (iter != buffered_payload_.end() && seq_compare(iter->first, seq_number_) <= 0) { - // Does this fragment start before our sequence number? - if (seq_compare(iter->first, seq_number_) < 0) { - uint32_t fragment_end = iter->first + iter->second.size(); - int comparison = seq_compare(fragment_end, seq_number_); - // Does it end after our sequence number? - if (comparison > 0) { - // Then slice it - payload_type& payload = iter->second; - payload.erase( - payload.begin(), - payload.begin() + (seq_number_ - iter->first) - ); - store_payload(seq_number_, move(iter->second)); - iter = erase_iterator(iter); - } - else { - // Otherwise, we've seen this part of the payload. Erase it. - iter = erase_iterator(iter); - } - } - else { - // They're equal. Add this payload. - payload_.insert( - payload_.end(), - iter->second.begin(), - iter->second.end() - ); - seq_number_ += iter->second.size(); - iter = erase_iterator(iter); - added_some = true; - // If we don't have any other payload, we're done - if (buffered_payload_.empty()) { - break; - } - } - } - if (added_some) { - if (on_data_callback_) { - on_data_callback_(*this); - } - } - } -} - -void Flow::store_payload(uint32_t seq, payload_type payload) { - buffered_payload_type::iterator iter = buffered_payload_.find(seq); - // New segment, store it - if (iter == buffered_payload_.end()) { - buffered_payload_.insert(make_pair(seq, move(payload))); - } - else if (iter->second.size() < payload.size()) { - // If we already have payload on this position but it's a shorter - // chunk than the new one, replace it - iter->second = move(payload); - } -} - -Flow::buffered_payload_type::iterator Flow::erase_iterator(buffered_payload_type::iterator iter) { - buffered_payload_type::iterator output = iter; - ++output; - buffered_payload_.erase(iter); - if (output == buffered_payload_.end()) { - output = buffered_payload_.begin(); - } - return output; -} - -void Flow::update_state(const TCP& tcp) { - if ((tcp.flags() & TCP::FIN) != 0) { - state_ = FIN_SENT; - } - else if ((tcp.flags() & TCP::RST) != 0) { - state_ = RST_SENT; - } - else if (state_ == SYN_SENT && (tcp.flags() & TCP::ACK) != 0) { - state_ = ESTABLISHED; - seq_number_++; - } - else if (state_ == UNKNOWN && (tcp.flags() & TCP::SYN) != 0) { - state_ = SYN_SENT; - seq_number_ = tcp.seq(); - const TCP::option* mss_option = tcp.search_option(TCP::MSS); - if (mss_option) { - mss_ = mss_option->to(); - } - } -} - -bool Flow::is_v6() const { - return flags_.is_v6; -} - -bool Flow::is_finished() const { - return state_ == FIN_SENT || state_ == RST_SENT; -} - -bool Flow::packet_belongs(const PDU& packet) const { - if (is_v6()) { - const IPv6* ip = packet.find_pdu(); - if (!ip || ip->dst_addr() != dst_addr_v6()) { - return false; - } - } - else { - const IP* ip = packet.find_pdu(); - if (!ip || ip->dst_addr() != dst_addr_v4()) { - return false; - } - } - const TCP* tcp = packet.find_pdu(); - return tcp && tcp->dport() == dport(); -} - -IPv4Address Flow::dst_addr_v4() const { - InputMemoryStream stream(dest_address_.data(), dest_address_.size()); - return stream.read(); -} - -IPv6Address Flow::dst_addr_v6() const { - InputMemoryStream stream(dest_address_.data(), dest_address_.size()); - return stream.read(); -} - -uint16_t Flow::dport() const { - return dest_port_; -} - -const Flow::payload_type& Flow::payload() const { - return payload_; -} - -Flow::State Flow::state() const { - return state_; -} - -uint32_t Flow::sequence_number() const { - return seq_number_; -} - -const Flow::buffered_payload_type& Flow::buffered_payload() const { - return buffered_payload_; -} - -Flow::buffered_payload_type& Flow::buffered_payload() { - return buffered_payload_; -} - -Flow::payload_type& Flow::payload() { - return payload_; -} - -void Flow::state(State new_state) { - state_ = new_state; -} - -void Flow::ignore_data_packets() { - flags_.ignore_data_packets = true; -} - -int Flow::mss() const { - return mss_; -} - -// Stream - -Stream::Stream(PDU& packet) +Stream::Stream(PDU& packet, const timestamp_type& ts) : client_flow_(extract_client_flow(packet)), - server_flow_(extract_server_flow(packet)), auto_cleanup_(true) { + server_flow_(extract_server_flow(packet)), create_time_(ts), + last_seen_(ts), auto_cleanup_(true) { // Update client flow state client_flow().process_packet(packet); const EthernetII* eth = packet.find_pdu(); @@ -314,7 +71,8 @@ Stream::Stream(PDU& packet) } } -void Stream::process_packet(PDU& packet) { +void Stream::process_packet(PDU& packet, const timestamp_type& ts) { + last_seen_ = ts; if (client_flow_.packet_belongs(packet)) { client_flow_.process_packet(packet); } @@ -326,6 +84,10 @@ void Stream::process_packet(PDU& packet) { } } +void Stream::process_packet(PDU& packet) { + return process_packet(packet, timestamp_type(0)); +} + Flow& Stream::client_flow() { return client_flow_; } @@ -435,6 +197,14 @@ Stream::payload_type& Stream::server_payload() { return server_flow().payload(); } +const Stream::timestamp_type& Stream::create_time() const { + return create_time_; +} + +const Stream::timestamp_type& Stream::last_seen() const { + return last_seen_; +} + Flow Stream::extract_client_flow(const PDU& packet) { const TCP* tcp = packet.find_pdu(); if (!tcp) { diff --git a/src/tcp_ip/stream_follower.cpp b/src/tcp_ip/stream_follower.cpp index b7b2404..3f643aa 100644 --- a/src/tcp_ip/stream_follower.cpp +++ b/src/tcp_ip/stream_follower.cpp @@ -40,6 +40,7 @@ #include "ip.h" #include "ipv6.h" #include "rawpdu.h" +#include "packet.h" #include "exceptions.h" #include "memory_helpers.h" @@ -50,6 +51,9 @@ using std::runtime_error; using std::numeric_limits; using std::max; using std::swap; +using std::chrono::system_clock; +using std::chrono::minutes; +using std::chrono::duration_cast; using Tins::Memory::OutputMemoryStream; using Tins::Memory::InputMemoryStream; @@ -58,13 +62,25 @@ namespace Tins { namespace TCPIP { const size_t StreamFollower::DEFAULT_MAX_BUFFERED_CHUNKS = 512; +const StreamFollower::timestamp_type StreamFollower::DEFAULT_KEEP_ALIVE = minutes(5); StreamFollower::StreamFollower() -: max_buffered_chunks_(DEFAULT_MAX_BUFFERED_CHUNKS), attach_to_flows_(false) { +: max_buffered_chunks_(DEFAULT_MAX_BUFFERED_CHUNKS), last_cleanup_(0), + stream_keep_alive_(DEFAULT_KEEP_ALIVE), attach_to_flows_(false) { } -bool StreamFollower::process_packet(PDU& packet) { +void StreamFollower::process_packet(PDU& packet) { + // Use current time + const system_clock::duration ts = system_clock::now().time_since_epoch(); + process_packet(packet, duration_cast(ts)); +} + +void StreamFollower::process_packet(Packet& packet) { + process_packet(*packet.pdu(), packet.timestamp()); +} + +void StreamFollower::process_packet(PDU& packet, const timestamp_type& ts) { stream_id identifier = make_stream_id(packet); streams_type::iterator iter = streams_.find(identifier); bool process = true; @@ -73,7 +89,7 @@ bool StreamFollower::process_packet(PDU& packet) { // Start tracking if they're either SYNs or they contain data (attach // to an already running flow). if (tcp.flags() == TCP::SYN || (attach_to_flows_ && tcp.find_pdu() != 0)) { - iter = streams_.insert(make_pair(identifier, Stream(packet))).first; + iter = streams_.insert(make_pair(identifier, Stream(packet, ts))).first; iter->second.setup_flows_callbacks(); if (on_new_connection_) { on_new_connection_(iter->second); @@ -99,14 +115,16 @@ bool StreamFollower::process_packet(PDU& packet) { // it and it contains payload if (process) { Stream& stream = iter->second; - stream.process_packet(packet); + stream.process_packet(packet, ts); size_t total_chunks = stream.client_flow().buffered_payload().size() + stream.server_flow().buffered_payload().size(); if (stream.is_finished() || total_chunks > max_buffered_chunks_) { streams_.erase(iter); } } - return true; + if (last_cleanup_ + stream_keep_alive_ <= ts) { + cleanup_streams(ts); + } } void StreamFollower::new_stream_callback(const stream_callback_type& callback) { @@ -173,6 +191,20 @@ StreamFollower::address_type StreamFollower::serialize(const IPv6Address& addres return addr; } +void StreamFollower::cleanup_streams(const timestamp_type& now) { + streams_type::iterator iter = streams_.begin(); + while (iter != streams_.end()) { + if (iter->second.last_seen() + stream_keep_alive_ <= now) { + // TODO: execute some callback here + streams_.erase(iter++); + } + else { + ++iter; + } + } + last_cleanup_ = now; +} + // stream_id StreamFollower::stream_id::stream_id(const address_type& client_addr, diff --git a/tests/src/tcp_ip.cpp b/tests/src/tcp_ip.cpp index 616649e..cb8581a 100644 --- a/tests/src/tcp_ip.cpp +++ b/tests/src/tcp_ip.cpp @@ -16,9 +16,11 @@ #include "exceptions.h" #include "ethernetII.h" #include "rawpdu.h" +#include "packet.h" #include "utils.h" using namespace std; +using namespace std::chrono; using namespace Tins; using namespace Tins::TCPIP; @@ -331,8 +333,15 @@ TEST_F(FlowTest, StreamFollower_ThreeWayHandshake) { packets[0].dst_addr("05:04:03:02:01:00"); StreamFollower follower; follower.new_stream_callback(bind(&FlowTest::on_new_stream, this, _1)); + + Stream::timestamp_type ts(10000); + Stream::timestamp_type create_time = ts; for (size_t i = 0; i < packets.size(); ++i) { - follower.process_packet(packets[i]); + if (i != 0) { + ts += milliseconds(100); + } + Packet packet(packets[i], ts); + follower.process_packet(packet); } Stream& stream = follower.find_stream(IPv4Address("1.2.3.4"), 22, IPv4Address("4.3.2.1"), 25); @@ -350,6 +359,8 @@ TEST_F(FlowTest, StreamFollower_ThreeWayHandshake) { EXPECT_EQ(HWAddress<6>("05:04:03:02:01:00"), stream.server_hw_addr()); EXPECT_EQ(22, stream.client_port()); EXPECT_EQ(25, stream.server_port()); + EXPECT_EQ(create_time, stream.create_time()); + EXPECT_EQ(ts, stream.last_seen()); IP server_packet = IP("1.2.3.4", "4.3.2.1") / TCP(22, 25); server_packet.rfind_pdu().flags(TCP::ACK); @@ -359,7 +370,7 @@ TEST_F(FlowTest, StreamFollower_ThreeWayHandshake) { EXPECT_EQ(61, stream.server_flow().sequence_number()); } -TEST_F(FlowTest, StreamFollower_MSS) { +TEST_F(FlowTest, StreamFollower_TCPOptions) { using std::placeholders::_1; vector packets = three_way_handshake(29, 60, "1.2.3.4", 22, "4.3.2.1", 25); @@ -367,6 +378,8 @@ TEST_F(FlowTest, StreamFollower_MSS) { packets[0].rfind_pdu().mss(1220); // Server's mss is 1460 packets[1].rfind_pdu().mss(1460); + // Server supports SACK + packets[1].rfind_pdu().sack_permitted(); StreamFollower follower; follower.new_stream_callback(bind(&FlowTest::on_new_stream, this, _1)); for (size_t i = 0; i < packets.size(); ++i) { @@ -376,8 +389,33 @@ TEST_F(FlowTest, StreamFollower_MSS) { IPv4Address("4.3.2.1"), 25); EXPECT_EQ(1220, stream.client_flow().mss()); EXPECT_EQ(1460, stream.server_flow().mss()); + EXPECT_FALSE(stream.client_flow().sack_permitted()); + EXPECT_TRUE(stream.server_flow().sack_permitted()); } +TEST_F(FlowTest, StreamFollower_CleanupWorks) { + using std::placeholders::_1; + + vector packets = three_way_handshake(29, 60, "1.2.3.4", 22, "4.3.2.1", 25); + StreamFollower follower; + follower.new_stream_callback(bind(&FlowTest::on_new_stream, this, _1)); + packets[2].rfind_pdu().src_addr("6.6.6.6"); + auto base_time = duration_cast(system_clock::now().time_since_epoch()); + Packet packet1(packets[0], base_time); + Packet packet2(packets[1], base_time + seconds(50)); + Packet packet3(packets[2], base_time + minutes(10)); + follower.process_packet(packet1); + Stream& stream = follower.find_stream(IPv4Address("1.2.3.4"), 22, + IPv4Address("4.3.2.1"), 25); + EXPECT_EQ(base_time, stream.create_time()); + follower.process_packet(packet2); + follower.process_packet(packet3); + // At this point, it should be cleaned up + EXPECT_THROW( + follower.find_stream(IPv4Address("1.2.3.4"), 22, IPv4Address("4.3.2.1"), 25), + stream_not_found + ); +} TEST_F(FlowTest, StreamFollower_RSTClosesStream) { using std::placeholders::_1;