# Copyright (c) 2003-2006 CORE Security Technologies # # This software is provided under under a slightly modified version # of the Apache Software License. See the accompanying LICENSE file # for more information. # # $Id: ImpactDecoder.py,v 1.6 2006/05/23 22:25:34 gera Exp $ # # Description: # Convenience packet unpackers for various network protocols # implemented in the ImpactPacket module. # # Author: # Javier Burroni (javier) # Bruce Leidl (brl) import ImpactPacket """Classes to convert from raw packets into a hierarchy of ImpactPacket derived objects. The protocol of the outermost layer must be known in advance, and the packet must be fed to the corresponding decoder. From there it will try to decode the raw data into a hierarchy of ImpactPacket derived objects; if a layer's protocol is unknown, all the remaining data will be wrapped into a ImpactPacket.Data object. """ class Decoder: def decode(self, aBuffer): pass class EthDecoder(Decoder): def __init__(self): pass def decode(self, aBuffer): e = ImpactPacket.Ethernet(aBuffer) off = e.get_header_size() if e.get_ether_type() == ImpactPacket.IP.ethertype: self.ip_decoder = IPDecoder() packet = self.ip_decoder.decode(aBuffer[off:]) elif e.get_ether_type() == ImpactPacket.ARP.ethertype: self.arp_decoder = ARPDecoder() packet = self.arp_decoder.decode(aBuffer[off:]) else: self.data_decoder = DataDecoder() packet = self.data_decoder.decode(aBuffer[off:]) e.contains(packet) return e # Linux "cooked" capture encapsulation. # Used, for instance, for packets returned by the "any" interface. class LinuxSLLDecoder(Decoder): def __init__(self): pass def decode(self, aBuffer): e = ImpactPacket.LinuxSLL(aBuffer) off = 16 if e.get_ether_type() == ImpactPacket.IP.ethertype: self.ip_decoder = IPDecoder() packet = self.ip_decoder.decode(aBuffer[off:]) elif e.get_ether_type() == ImpactPacket.ARP.ethertype: self.arp_decoder = ARPDecoder() packet = self.arp_decoder.decode(aBuffer[off:]) else: self.data_decoder = DataDecoder() packet = self.data_decoder.decode(aBuffer[off:]) e.contains(packet) return e class IPDecoder(Decoder): def __init__(self): pass def decode(self, aBuffer): i = ImpactPacket.IP(aBuffer) off = i.get_header_size() if i.get_ip_p() == ImpactPacket.UDP.protocol: self.udp_decoder = UDPDecoder() packet = self.udp_decoder.decode(aBuffer[off:]) elif i.get_ip_p() == ImpactPacket.TCP.protocol: self.tcp_decoder = TCPDecoder() packet = self.tcp_decoder.decode(aBuffer[off:]) elif i.get_ip_p() == ImpactPacket.ICMP.protocol: self.icmp_decoder = ICMPDecoder() packet = self.icmp_decoder.decode(aBuffer[off:]) else: self.data_decoder = DataDecoder() packet = self.data_decoder.decode(aBuffer[off:]) i.contains(packet) return i class ARPDecoder(Decoder): def __init__(self): pass def decode(self, aBuffer): arp = ImpactPacket.ARP(aBuffer) off = arp.get_header_size() self.data_decoder = DataDecoder() packet = self.data_decoder.decode(aBuffer[off:]) arp.contains(packet) return arp class UDPDecoder(Decoder): def __init__(self): pass def decode(self, aBuffer): u = ImpactPacket.UDP(aBuffer) off = u.get_header_size() self.data_decoder = DataDecoder() packet = self.data_decoder.decode(aBuffer[off:]) u.contains(packet) return u class TCPDecoder(Decoder): def __init__(self): pass def decode(self, aBuffer): t = ImpactPacket.TCP(aBuffer) off = t.get_header_size() self.data_decoder = DataDecoder() packet = self.data_decoder.decode(aBuffer[off:]) t.contains(packet) return t class IPDecoderForICMP(Decoder): """This class was added to parse the IP header of ICMP unreachables packets If you use the "standard" IPDecoder, it might crash (see bug #4870) ImpactPacket.py because the TCP header inside the IP header is incomplete""" def __init__(self): pass def decode(self, aBuffer): i = ImpactPacket.IP(aBuffer) off = i.get_header_size() if i.get_ip_p() == ImpactPacket.UDP.protocol: self.udp_decoder = UDPDecoder() packet = self.udp_decoder.decode(aBuffer[off:]) else: self.data_decoder = DataDecoder() packet = self.data_decoder.decode(aBuffer[off:]) i.contains(packet) return i class ICMPDecoder(Decoder): def __init__(self): pass def decode(self, aBuffer): ic = ImpactPacket.ICMP(aBuffer) off = ic.get_header_size() if ic.get_icmp_type() == ImpactPacket.ICMP.ICMP_UNREACH: self.ip_decoder = IPDecoderForICMP() packet = self.ip_decoder.decode(aBuffer[off:]) else: self.data_decoder = DataDecoder() packet = self.data_decoder.decode(aBuffer[off:]) ic.contains(packet) return ic class DataDecoder(Decoder): def decode(self, aBuffer): d = ImpactPacket.Data(aBuffer) return d