Commit bbeff3a0 authored by Juraj Sloboda's avatar Juraj Sloboda

Asyncio and code improvement

- Use asyncio and aiohttp
- Reorganize code
- Separate BP, CLA and application layers
parent 4cd5f30f
......@@ -66,7 +66,7 @@ uPCN-1_1 | [Wed Mar 21 14:42:03 2018]: Set contact pair to opportunistic! (-1)
uPCN-1_1 | [Wed Mar 21 14:42:03 2018]: RouterTask: Contact removed from table (1184879392) [components/upcn/src/routerTask.c:201]
# pyDTN-1 console
172.25.0.1 - - [21/Mar/2018 13:41:46] "POST /bundle?destination=dtn:pyDTN-2.dtn HTTP/1.1" 200 -
172.25.1.1 - - [21/Mar/2018 13:41:46] "POST /bundle?destination=dtn://pyDTN-2.dtn HTTP/1.1" 200 -
0000 64 74 6E 21 03 00 00 00 11 64 74 6E 3A 2F 2F 70 dtn!.....dtn://p
0016 79 44 54 4E 2D 31 2E 64 74 6E yDTN-1.dtn
......
......@@ -12,21 +12,21 @@ This is the PoC (Proof of Concept) implementation of:
* [DTN IP Neighbor Discovery (IPND)](https://tools.ietf.org/html/draft-irtf-dtnrg-ipnd-03)
* Send/receive a beacon, code exists, integration in progress, see %2 milestone
* Send/receive a Beacon, code exists, integration in progress, see %2 milestone
# Use Case
![current_state](doc/readme/current_state.png)
1. `pyDTN-2` receives `REST` request to schedule contact: `pyDTN-2.dtn` is available on `172.25.1.11:2002`
1. `pyDTN-2` receives `REST` request to schedule contact: `//pyDTN-2.dtn` is available on `172.25.2.11:2002`
1. `pyDTN-2` schedules contact with `uPCN`, note following issues:
* Proprietary (not part of any draft) `uPCN` config bundle is used
* `pyDTN-2` receives `uPCN`'s IP address/port on a [command line](docker/docker-compose.yml#L39)
1. `pyDTN-1` receives `REST` request to send a bundle to `pyDTN-2`
1. `pyDTN-1` sends bundle to `uPCN`, note following issues:
* `pyDTN-1` receives `uPCN`'s IP address/port on a [command line](docker/docker-compose.yml#L20)
* IP address/port is then set (kind of hard-coded) in [ipnd/httpd.py](ipnd/httpd.py#L42)
* IP address/port is then set (kind of hard-coded) in [ipnd/httpd.py](src/httpd/httpd.py#L42)
1. `uPCN` receives a bundle from `pyDTN-1` and immediately forwards it to `pyDTN-2`, note following issues:
* `uPCN` currently does not store any bundles, if destination is not available, bundle is simply dropped
......
import argparse
import waitress
from bp_daemon.bp_daemon import BPServer
from ipnd import httpd
parser = argparse.ArgumentParser()
parser.add_argument("--node", type=str)
parser.add_argument("--port", type=int, default=42420)
parser.add_argument("--http-port", type=int, default=8080)
parser.add_argument("--upcn-host", type=str, default="127.0.0.1")
parser.add_argument("--upcn-port", type=int, default=4556)
args = parser.parse_args()
# Start Bundle protocol server
bp = BPServer()
bp.port = args.port
bp.start()
# Start REST endpoint
httpd.node = args.node
httpd.upcn_host = args.upcn_host
httpd.upcn_port = args.upcn_port
waitress.serve(httpd.app, host='0.0.0.0', port=args.http_port)
import select
import socket
import threading
import time
from cbor2 import decoder
from ipnd.hexdump import hexdump
from ipnd import common
from ipnd.sdnv import sdnv_decode
from ipnd.tcpcl import ContactMessage, logging
logging.basicConfig(level=logging.DEBUG)
class Client(object):
"""Internal representation of a BP client as viewed by the server
"""
def __init__(self, socket_, address):
socket_.setblocking(False)
self.socket = socket_
self.address = address
self.buffer = bytearray()
self.send_buffer = bytearray()
self.message_list = []
logging.debug("New client from {}".format(address))
self.send_contact_msg()
self.keepalive_tx_time = None
def send_contact_msg(self):
# TODO: https://tools.ietf.org/html/rfc7242#section-3.2https://tools.ietf.org/html/rfc7242
contact_msg = ContactMessage("dtn://node2.dtn")
self.send(contact_msg.serialize())
def fileno(self):
return self.socket.fileno()
def send(self, data):
self.keepalive_tx_time = time.time()
self.send_buffer += data
def get_ip(self):
return self.address[0]
def handle(self):
try:
msg = self.socket.recv(4096 * 64)
except socket.error:
return False
if len(msg) == 0:
return False
self.buffer += msg
logging.debug("Received msg from {} of length {} (new buffer length: {} message:\n{}):"
.format(self.address, len(msg), len(self.buffer), hexdump(msg)))
try:
# TODO: unused variable: msg_stripped
# msg_stripped = self.decode_tcpcl_data_segment(msg)
self.decode_tcpcl_data_segment(msg)
except Exception as e:
logging.debug("Not a bundle {}".format(e))
return True
def handle_write(self):
if len(self.send_buffer) == 0:
return True
try:
sent = self.socket.send(self.send_buffer)
except socket.error:
return False
self.send_buffer = self.send_buffer[sent:]
return True
def handle_buffer(self):
if not self.buffer:
return
logging.debug("Message: \n{}".format(hexdump(self.buffer)))
self.buffer = bytearray()
def check(self):
"""Perform periodic actions, such as check of ingress buffer."""
self.handle_buffer()
def close(self):
self.socket.close()
@staticmethod
def extract_payload(block):
return block[5]
def decode_bundle(self, data):
obj = decoder.loads(data) # type: list
for block in obj: # type: list
item_no = 0
for item in block:
if item_no == 0 and item == 1:
logging.debug('Received payload block: {}'.format(self.extract_payload(block).decode('ascii')))
break
item_no += 1
def decode_tcpcl_data_segment(self, data):
stripped_header = data[1:]
payload_length, bytes_decoded = sdnv_decode(buffer=stripped_header)
logging.debug("payload payload_length {} decoded bytes {} send for decoding \n{}"
.format(payload_length, bytes_decoded, hexdump(stripped_header[bytes_decoded:])))
return self.decode_bundle(stripped_header[bytes_decoded:])
class BPServer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.host = ""
self.node = ""
self.port = 42420
self.socket = None
self.running = True
self.clients = []
self.clients_lock = threading.Lock()
def connect(self):
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(1)
self.socket.setblocking(False)
except socket.error as e:
logging.error(e)
if self.socket is not None:
self.socket.close()
self.socket = None
return False
return True
def find_client(self, ip):
for client in self.clients:
if client.address[0] == ip:
return client
return None
def is_running(self):
return self.running
def stop(self):
self.running = False
def run(self):
if not self.connect():
return
logging.debug("Bundle protocol server ready, waiting for connections...")
while self.running:
with self.clients_lock:
infds, outfds, exceptfds = select.select([self.socket] +
self.clients, self.clients, self.clients, 0.1)
for node in infds:
if node is self.socket:
client = Client(*self.socket.accept())
self.clients.append(client)
continue
if not node.handle():
logging.debug("Removing peer from clients list")
self.clients.remove(node)
continue
for node in outfds:
if not node.handle_write():
logging.debug("Removing peer from clients list")
if node in self.clients:
self.clients.remove(node)
continue
for node in exceptfds:
logging.error("Exception on {}".format(node))
for client in list(self.clients):
client.check()
time.sleep(0.1)
with self.clients_lock:
for client in self.clients:
client.close()
self.socket.close()
logging.debug("Bundle protocol server shutdown successful")
class BPClient(threading.Thread):
def __init__(self, node=None):
threading.Thread.__init__(self)
self.host = None
self.port = None
self.socket = None
self.running = True
self.buffer = ""
self.node = node
self.contactHeaderSent = False
def connect(self):
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
self.socket.setblocking(False)
except socket.error as e:
logging.error(e)
if self.socket is not None:
self.socket.close()
self.socket = None
return False
return True
def get_receive_buffer(self):
try:
msg = self.socket.recv(4096 * 64)
except socket.error:
return False
if len(msg) < 10:
return False
logging.debug("Message arrived, length {} \n{}".format(len(msg), hexdump(msg)))
def close(self):
self.running = False
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
def run(self):
if not self.connect():
return
while self.running:
self.get_receive_buffer()
time.sleep(1)
def wait_until_ready(self):
if self.socket or not self.running:
return
while True:
if self.socket or not self.running:
return
time.sleep(0.01)
def handle_buffer(self):
if not self.buffer:
return
logging.debug("Buffer: {}").format(self.buffer)
def send_buffer(self, buffer):
self.wait_until_ready()
total_sent = 0
while total_sent < len(buffer):
sent = self.socket.send(buffer[total_sent:])
if sent == 0:
raise RuntimeError("socket connection broken")
total_sent = total_sent + sent
def send_contact_msg(self):
if self.contactHeaderSent:
return
contact_msg = ContactMessage(self.node).serialize()
# TODO: https://tools.ietf.org/html/rfc7242#section-3.2https://tools.ietf.org/html/rfc7242
logging.debug("Going to send contact header of length {} \n {}".format(len(contact_msg), contact_msg))
self.send_buffer(contact_msg)
self.contactHeaderSent = True
def send_config_msg(self, eid, cla_address, start_offset, end_offset):
self.send_contact_msg()
config_msg = common.serialize_upcn_config_message(
eid,
cla_address,
contacts=[
common.make_contact(start_offset, end_offset, 500),
],
)
msg = common.serialize_tcpcl_single_bundle_segment(
common.serialize_bundle(
self.node,
"dtn://ops-sat.dtn/config",
config_msg,
)
)
logging.debug(hexdump(msg))
self.send_buffer(msg)
def send_bundle_msg(self, destination, payload):
self.send_contact_msg()
self.send_buffer(common.serialize_tcpcl_single_bundle_segment(
common.serialize_bundle(
self.node,
destination,
payload,
)
))
# TODO: move to tests
#
# from datetime import timedelta
# from . import Bundle, BundleAgeBlock, BundleProcFlag, BundleStatusReport, CreationTimestamp, HopCountBlock, PayloadBlock,\
# PreviousNodeBlock, PrimaryBlock, ReasonCode, StatusCode, dtn_start, print_hex
#
# primary_block = PrimaryBlock(
# bundle_proc_flags=BundleProcFlag.MUST_NOT_BE_FRAGMENTED
# | BundleProcFlag.CONTAINS_MANIFEST
# | BundleProcFlag.REPORT_DELIVERY
# | BundleProcFlag.REPORT_STATUS_TIME,
# destination="dtn:GS2",
# source="dtn:none",
# report_to="dtn:none",
# creation_time=CreationTimestamp(dtn_start, 0),
# )
#
# payload = PayloadBlock(b"Hello world!")
#
# # Extension blocks
# previous_node = PreviousNodeBlock("dtn:GS4")
# hop_count = HopCountBlock(30, 0)
# bundle_age = BundleAgeBlock(0)
#
# bundle = Bundle([
# primary_block,
# previous_node,
# hop_count,
# bundle_age,
# payload,
# ])
#
# # Bundle Status Report "Received bundle"
# status_report = BundleStatusReport(infos=StatusCode.RECEIVED_BUNDLE,
# reason=ReasonCode.NO_INFO,
# bundle=bundle,
# time=dtn_start + timedelta(seconds=10))
#
# status_report_bundle = Bundle([
# PrimaryBlock(
# bundle_proc_flags=BundleProcFlag.ADMINISTRATIVE_RECORD,
# destination=bundle.primary_block.report_to,
# ),
# status_report,
# ])
#
# print_hex(bundle.serialize())
# # print_hex(status_report_bundle.serialize())
# # print_hex(status_report.serialize())
#
# with open("bundle.cbor", "wb") as fd:
# fd.write(bundle.serialize())
# pyDTN-1 (172.25.0.11) --- uPCN-1 (172.25.0.21) --- pyDTN-2 (172.25.0.12)
# pyDTN-1 (172.25.1.11) --- uPCN-1 (172.25.1.21, 172.25.2.21) --- pyDTN-2 (172.25.2.12)
version: "2"
services:
......@@ -13,14 +13,11 @@ services:
tty: true
restart: always
volumes:
- ../bp_daemon:/bp_daemon
- ../bundle7:/bundle7
- ../cbor2:/cbor2
- ../ipnd:/ipnd
command: python3 -m bp_daemon --node dtn://pyDTN-1.dtn --port 2001 --http-port 8081 --upcn-host 172.25.0.21 --upcn-port 4556
- ../src:/pyDTN
command: --node dtn://pyDTN-1.dtn --port 2001 --http-port 8081 --upcn-host 172.25.1.21 --upcn-port 4556
networks:
dtnnet1:
ipv4_address: 172.25.0.11
ipv4_address: 172.25.1.11
pyDTN-2:
image: pydtn
......@@ -32,35 +29,32 @@ services:
tty: true
restart: always
volumes:
- ../bp_daemon:/bp_daemon
- ../bundle7:/bundle7
- ../cbor2:/cbor2
- ../ipnd:/ipnd
command: python3 -m bp_daemon --node dtn://pyDTN-2.dtn --port 2002 --http-port 8082 --upcn-host 172.25.1.21 --upcn-port 4556
- ../src:/pyDTN
command: --node dtn://pyDTN-2.dtn --port 2002 --http-port 8082 --upcn-host 172.25.2.21 --upcn-port 4556
networks:
dtnnet2:
ipv4_address: 172.25.1.11
ipv4_address: 172.25.2.11
uPCN-1:
image: upcn
build:
context: uPCN
hostname: uPCN-1
tty: true
networks:
dtnnet1:
ipv4_address: 172.25.0.21
dtnnet2:
ipv4_address: 172.25.1.21
tty: true
dtnnet2:
ipv4_address: 172.25.2.21
networks:
dtnnet1:
driver: bridge
ipam:
config:
- subnet: 172.25.0.0/24
- subnet: 172.25.1.0/24
dtnnet2:
driver: bridge
ipam:
config:
- subnet: 172.25.1.0/24
- subnet: 172.25.2.0/24
......@@ -4,4 +4,8 @@ FROM python:3
# Update aptitude with new repo
RUN apt-get update
RUN apt-get install -y python3-pip joe tcpdump
RUN pip install cbor cbor2 Flask waitress
RUN pip install cbor cbor2 aiohttp
WORKDIR /pyDTN
ENTRYPOINT ["python3", "-m", "main"]
import logging
from flask import Flask, jsonify, request, abort
from bp_daemon import bp_daemon
logging.basicConfig(level=logging.DEBUG)
node = ""
upcn_host = ""
upcn_port = 0
app = Flask(__name__)
@app.route('/config', methods=['POST'])
def handle_config():
if not request.json or 'messages' not in request.json:
abort(400) # HTTP Bad Request Error
client = bp_daemon.BPClient()
client.node = node
client.host = upcn_host
client.port = upcn_port
client.start()
for msg in request.json['messages']:
client.send_config_msg(msg["eid"], msg["cla_address"],
int(msg["start_offset"]),
int(msg["end_offset"]))
client.close()
return jsonify({'success': True})
@app.route('/bundle', methods=['POST'])
def handle_bundle():
if 'destination' not in request.args:
abort(400) # HTTP Bad Request Error
client = bp_daemon.BPClient()
client.node = node
client.host = upcn_host
client.port = upcn_port
client.start()
client.send_bundle_msg(request.args['destination'], request.data)
client.close()
return jsonify({'success': True})
# ******************** PyLint ***********************
#-----------------------------------------------------
#Path to the pylint executable to use in pylint analysis. Set to empty to use the default one (default is pylint).
#It must to be set on Windows. Use either masked backslashes or slashes as path separators; examples: C:\\Python26\\Scripts\\pylint.bat or C:/Python/26/Scripts/pylint.bat
#default : /usr/local/bin/pylint
#sonar.python.pylint=/usr/bin/pylint
#-----------------------------------------------------
#Path to the pylint configuration file (relative to project root or absolute) to use in pylint analysis. Set to empty to use the default.
#default: .pylintrc
#sonar.python.pylint_config
#-----------------------------------------------------
#Ant pattern describing the path to Pylint report, relative to projects root. No default value. The report have to conform to format "{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}".
#defaut: pylint-report.txt
#sonar.python.pylint.reportPath=reports/pylint.log
# ******************** PyUnit ***********************
#-----------------------------------------------------
#Ant pattern describing the path to unit tests execution reports, relative to projects root. Leave unset to use the default ("xunit-reports/xunit-result-*.xml").
#Reports have to conform to the JUnit Report XML format.
#default : nosetests.xml
#sonar.python.xunit.reportPath=reports/unittests.xml
#-----------------------------------------------------
#When enabled the test execution statistics is provided only on project level. Use this mode when paths in report are not found. Disabled by default.
#default : true
#sonar.python.xunit.skipDetails
# ******************** Coverage ***********************
#-----------------------------------------------------
#Ant pattern describing the path to coverage reports, relative to projects root. Leave unset to use the default ("coverage-reports/coverage-*.xml").
#The reports have to conform to the Cobertura XML format.
#default: coverage.xml
#sonar.python.coverage.reportPath=reports/unit/coverage.xml
#-----------------------------------------------------
#Ant pattern describing the path to coverage reports for integration tests, relative to projects root. Leave unset to use the default ("coverage-reports/it-coverage-*.xml").
#The reports have to conform to the Cobertura XML format.
#default:it-coverage.xml
#sonar.python.coverage.itReportPath=reports/integration/coverage.xml
# ******************** Project ***********************
sonar.sources=.
sonar.exclusions=deployment/**/*,tests.py,integration_tests/*
#sonar.tests=.
#sonar.test.inclusions=tests.py
sonar.sourceEncoding=UTF-8
sonar.verbose=true
sonar.language=py
sonar.projectName=Bundle protocol
sonar.projectKey=eu.ifne.space:bp
sonar.projectVersion=1.0
sonar.host.url=https://status.ifne.eu/sonar
sonar.login=a7710c2e4fdff38ad469fe2a068d702593ed7976
import logging
from bundle_protocol import bp_agent
from bundle_protocol.bp import EndOfData
our_eid = None
bp_app = None
async def recv_callback(ctx, src_eid):
global our_eid
logging.info(f'BP client "{our_eid}" receiving ADU from "{src_eid}"')
while True:
try:
data = await bp_agent.receive(ctx)
except EndOfData:
break
print(data.decode('ascii'), end='', flush=True)
print()
async def start(args):
global bp_app, our_eid
our_eid = args.node
bp_app = bp_agent.register(our_eid, recv_callback)
from .bp import BPAgent
bp_agent = BPAgent()
import logging
from cbor2 import CBORDecoder
from bundle_protocol.convergence_layers.api import ConnectFailedError
from encoding.bundle7 import EID
from bundle_protocol.bp_node import get_bp_node_id
from bundle_protocol.conn_manager import ConnectionManager, UnknownEID, NoContact
from bundle_protocol import misc
from util.exceptions import AlreadyRegisteredError
from util.io_util import CountingBytesIO
from util.hexdump import hexdump
class EndOfData(Exception):
pass
class ClientNotRegistered(Exception):
def __init__(self, eid):
self.eid = eid
class BPAgent:
def __init__(self):
self._eid_registrations = []
self._routes = []
self._bundles_to_send = []
self._conn_manager = ConnectionManager(self._recv_callback)
def register_cl_adapter(self, adapter):
self._conn_manager.register_cl_adapter(adapter)
async def run_forever(self, args):
await self._conn_manager.run_forever()
def register(self, eid, recv_callback):
if eid in (reg._app_singleton_eid for reg in self._eid_registrations):
raise AlreadyRegisteredError
logging.info(f'BP Agent is registering client EID "{eid}"')
pa = BPPrivateAPI(self, eid, recv_callback)
self._eid_registrations.append(pa)
return pa
def _get_client_by_eid(self, eid):
for client in self._eid_registrations:
if client._app_singleton_eid == eid:
return client
return None
def add_route(self, target_eid, via_eid):
if (target_eid, via_eid) in self._routes:
raise AlreadyRegisteredError
if target_eid is not None:
logging.info(f'BP Agent adding route to "{target_eid}" via "{via_eid}"')
else:
logging.info(f'BP Agent adding default route via "{via_eid}"')
self._routes.append((target_eid, via_eid))
def _get_via_eid(self, target_eid):
for target, via in self._routes:
if target is None or target == target_eid:
return via
return None
async def _send_bundle(self, dst_eid, bundle):
try:
await self._conn_manager.try_to_send(dst_eid, bundle)
except UnknownEID:
logging.info(f'BP Agent doesn\'t know how to contact "{dst_eid}" node')
except NoContact:
logging.info(f'BP Agent is unable to contact "{dst_eid}" node right now, saving bundle')
self._bundles_to_send.append(bundle)
except ConnectFailedError:
logging.info(f'BP Agent failed to connect to "{dst_eid}" node')
async def _send_as_bundle(self, source, destination, payload):
data = misc.serialize_bundle(source, destination, payload)
via_eid = self._get_via_eid(destination)
if via_eid is None: