diff --git a/pcs/__init__.py b/pcs/__init__.py index d9b9270..801625e 100644 --- a/pcs/__init__.py +++ b/pcs/__init__.py @@ -1315,6 +1315,7 @@ def fromHTML(self): """Create a Packet from HTML.""" pass + class Chain(list): """A chain is simply a list of packets. Chains are used to aggregate related sub packets into one chunk for transmission.""" @@ -1559,6 +1560,7 @@ def fixup(self): self.calc_checksums() self.encode() + class ConnNotImpError(Exception): """Calling a method that is not implemented raises this exception. diff --git a/pcs/packets/__init__.py b/pcs/packets/__init__.py index c4a375a..700b08c 100644 --- a/pcs/packets/__init__.py +++ b/pcs/packets/__init__.py @@ -5,6 +5,7 @@ 'ipv4', 'ipv6', 'icmpv4', + 'icmpv6', 'igmpv2', 'igmpv3', 'tcp', diff --git a/pcs/packets/icmpv6.py b/pcs/packets/icmpv6.py index e289ad2..a1eb809 100644 --- a/pcs/packets/icmpv6.py +++ b/pcs/packets/icmpv6.py @@ -35,8 +35,9 @@ # Description: A class which describes an ICMPv6 packet import pcs -import pcs.packets.pseudoipv6 -import pcs.packets.ipv4 +from pcs.packets.pseudoipv6 import pseudoipv6 +from pcs.packets.ipv4 import ipv4 + import struct @@ -132,6 +133,12 @@ def __init__(self, type = 0, bytes = None, **kv): else: pcs.Packet.__init__(self, [ty, code, cksum], bytes, **kv) + if (bytes is not None): + self.data = self.next(bytes[self.sizeof():len(bytes)], + timestamp = timestamp) + else: + self.data = None + def cksum(self, ip, data = "", nx = 0): """Calculate the checksum for this ICMPv6 header, outside of a chain.""" @@ -153,9 +160,9 @@ def calc_checksum(self): self.checksum = 0 if self._head is not None: payload = self._head.collate_following(self) - ip6 = self._head.find_preceding(self, pcs.packets.ipv6) + ip6 = self._head.find_preceding(self, pcs.packets.ipv6.ipv6)[0] assert ip6 is not None, "No preceding IPv6 header." - pip6 = pseudoipv6.pseudoipv6() + pip6 = pseudoipv6() pip6.src = ip6.src pip6.dst = ip6.dst pip6.next_header = ip6.next_header @@ -168,7 +175,7 @@ def calc_checksum(self): class icmpv6option(pcs.Packet): _layout = pcs.Layout() - + def __init__(self, type = 0, bytes = None, **kv): """add icmp6 option header RFC2461""" ty = pcs.Field("type", 8, default = type) @@ -196,10 +203,15 @@ def __init__(self, type = 0, bytes = None, **kv): elif type == 4: reserved = pcs.StringField("reserved", 48) pcs.Packet.__init__(self, [ty, length, reserved], bytes, **kv) - # MTU + # MTU elif type == 5: reserved = pcs.Field("reserved", 16) mtu = pcs.Field("mtu", 32) pcs.Packet.__init__(self, [ty, length, reserved, mtu], bytes, **kv) else: pcs.Packet.__init__(self, [ty, length], bytes, **kv) + + def calc_length(self): + """Calculate and store the length of the ICMPv6 option into the ICMPv6 + option packet. The length is in units of 8-octets (8-bytes).""" + self.length = len(self) >> 3 diff --git a/pcs/packets/ipv6.py b/pcs/packets/ipv6.py index a9480d6..5225d9c 100644 --- a/pcs/packets/ipv6.py +++ b/pcs/packets/ipv6.py @@ -49,6 +49,7 @@ IPV6_FRAG = 44 IPV6_ESP = 50 IPV6_AH = 51 +IPV6_ICMP = 58 IPV6_NONE = 59 IPV6_DSTOPTS = 60 @@ -57,7 +58,7 @@ class ipv6(pcs.Packet): _layout = pcs.Layout() _map = ipv6_map.map - + def __init__(self, bytes = None, timestamp = None, **kv): """IPv6 Packet from RFC 2460""" version = pcs.Field("version", 4, default = 6) @@ -77,17 +78,17 @@ def __init__(self, bytes = None, timestamp = None, **kv): else: self.timestamp = timestamp - if (bytes is not None): - ## 40 bytes is the standard size of an IPv6 header + # 40 bytes is the standard size of an IPv6 header offset = 40 self.data = self.next(bytes[offset:len(bytes)], timestamp = timestamp) else: self.data = None - + def __str__(self): - """Walk the entire packet and pretty print the values of the fields. Addresses are printed if and only if they are set and not 0.""" + """Walk the entire packet and pretty print the values of the fields. + Addresses are printed if and only if they are set and not 0.""" retval = "" for field in self._layout: if (field.name == "src" or field.name == "dst"): @@ -109,3 +110,9 @@ def getipv6(self, iface): break return v6 + def calc_length(self): + """Calculate the IPv6 payload length and store into header. Payload + length does not include the heads length.""" + self.length = 0 + if self._head is not None: + self.length = len(self._head.collate_following(self)) diff --git a/tests/ptptest.py b/tests/ptptest.py index dc0dd29..c4aa805 100644 --- a/tests/ptptest.py +++ b/tests/ptptest.py @@ -47,7 +47,7 @@ # with extra arguments. from pcs.packets.ptpv1 import * - from pcs.packets.ptpv1_common import Common + from pcs.packets.ptpv1_common import CommonV1 from pcs.packets.ipv4 import ipv4 from pcs.packets.udpv4 import udpv4 import pcs @@ -55,7 +55,7 @@ class ptpTestCase(unittest.TestCase): def test_ptp_header(self): # create one header, copy its bytes, then compare their fields - ptp = Common() + ptp = CommonV1() assert (ptp != None) ptp.versionPTP = 2 @@ -66,7 +66,7 @@ def test_ptp_header(self): ptp.controlField = 0 # Create a packet to compare against - ptpnew = Common() + ptpnew = CommonV1() ptpnew.decode(ptp.bytes) self.assertEqual(ptp.bytes, ptpnew.bytes, "bytes not equal") diff --git a/tests/ptpv1test.py b/tests/ptpv1test.py index 7250aee..9297724 100644 --- a/tests/ptpv1test.py +++ b/tests/ptpv1test.py @@ -47,7 +47,7 @@ # with extra arguments. from pcs.packets.ptpv1 import * - from pcs.packets.ptpv1_common import Common + from pcs.packets.ptpv1_common import CommonV1 from pcs.packets.ipv4 import ipv4 from pcs.packets.udpv4 import udpv4 import pcs @@ -69,7 +69,7 @@ def test_ptp_header(self): ptp.control = 0 # Create a packet to compare against - ptpnew = Common() + ptpnew = CommonV1() ptpnew.decode(ptp.bytes) self.assertEqual(ptp.bytes, ptpnew.bytes, "bytes not equal")