Commit 4e1f70cc authored by Boris Pilka's avatar Boris Pilka

Merge branch 'asyncio-and-code-improvement' into 'master'

Asyncio and code improvement

See merge request !17
parents bed6e717 14676f9e
[flake8]
max-line-length = 120
ignore = E203,W503
.scannerwork
*~
.idea
__pycache__
*.swp
*.egg-info
__pycache__
.idea
.scannerwork
# Sphinx
_build
_autosummary
# Log files
*.log
# Install docker
sudo apt install docker.io docker-compose
# Add user to docker group
usermod -a -G docker $USER
# Logout and login for group change above to take effect
groups | grep docker
pydtn adm cdrom sudo dip plugdev lpadmin sambashare docker
# Start topology
docker-compose up --build
# See pyDTN-1 logs
docker logs -f docker_pyDTN-1_1
# See pyDTN-2 logs
docker logs -f docker_pyDTN-2_1
# See uPCN-1 logs
docker logs -f docker_uPCN-1_1
# Send bundle
# Alternatively, you can use Postman and import docker/REST/pyDTN.postman_collection.json
cd tools/schedule_contact/
./send.sh
# List running containers including commands running inside
docker-compose top
# Restart a container
docker-compose restart pyDTN-1
# Cleanup
docker-compose down
docker container prune -f
docker image prune -a -f
# docker-compose up console
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: Connection on TCPCL level established 'dtn://pyDTN-2.dtn' [components/cla/src/posix/TCPCL/cla.c:187]
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: Couldn't find matching contact in existing gs list! (-1) [components/cla/src/posix/TCPCL/cla.c:74]
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: Will create opportunistic contact rx task! (-1) [components/cla/src/posix/TCPCL/cla.c:75]
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: contact_rx_task: Started up opportunistic (-1) [components/cla/src/posix/TCPCL/cla_contact_rx_task.c:417]
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: The peer has disconnected gracefully! (-1) [components/cla/src/posix/TCPCL/cla_io.c:308]
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: Payload size is 69 [components/upcn/src/bundleProcessor.c:362]
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: Received local bundle! (-1) [components/upcn/src/bundleProcessor.c:377]
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: configAgentTask: got routing command! (-1) [components/agents/src/config_agent.c:70]
uPCN-1_1 | [Wed Mar 21 14:41:43 2018]: RouterTask: Command processed successfully (49) [components/upcn/src/routerTask.c:135]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: ContactManager: Scheduled contact added (1184879392) [components/upcn/src/contactManager.c:226]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: contact_rx_task: Started up successfully (-1) [components/cla/src/posix/TCPCL/cla_contact_rx_task.c:419]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: contact_tx_task: Started successfully (1184879936) [components/cla/src/posix/TCPCL/cla_contact_tx_task.c:51]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: Connection on TCPCL level established 'dtn://pyDTN-1.dtn' [components/cla/src/posix/TCPCL/cla.c:187]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: Couldn't find matching contact in existing gs list! (-1) [components/cla/src/posix/TCPCL/cla.c:74]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: Will create opportunistic contact rx task! (-1) [components/cla/src/posix/TCPCL/cla.c:75]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: contact_rx_task: Started up opportunistic (-1) [components/cla/src/posix/TCPCL/cla_contact_rx_task.c:417]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: RouterTask: Processed bundle successfully (2) [components/upcn/src/routerTask.c:183]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: The peer has disconnected gracefully! (-1) [components/cla/src/posix/TCPCL/cla_io.c:308]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: Got dtn! magic! (-1) [components/cla/src/posix/TCPCL/cla_management.c:196]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: Received EID is dtn://node2.dtn [components/cla/src/posix/TCPCL/cla_management.c:220]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: Scheduled contact established on TCPCL level! (-1) [components/cla/src/posix/TCPCL/cla_management.c:222]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: Sending bundle with SN (0) [components/cla/src/posix/TCPCL/cla_contact_tx_task.c:88]
uPCN-1_1 | [Wed Mar 21 14:41:48 2018]: RouterTask: Sent bundle successfully (2) [components/upcn/src/routerTask.c:220]
uPCN-1_1 | [Wed Mar 21 14:42:03 2018]: ContactManager: Contact removed (1184879392) [components/upcn/src/contactManager.c:238]
uPCN-1_1 | [Wed Mar 21 14:42:03 2018]: Set contact pair to opportunistic! (-1) [components/cla/src/posix/TCPCL/cla_management.c:63]
uPCN-1_1 | [Wed Mar 21 14:42:03 2018]: RouterTask: Contact removed from table (1184879392) [components/upcn/src/routerTask.c:201]
# pyDTN-1 console
172.25.0.1 - - [21/Mar/2018 13:41:46] "POST /bundle?destination=dtn:pyDTN-2.dtn HTTP/1.1" 200 -
0000 64 74 6E 21 03 00 00 00 11 64 74 6E 3A 2F 2F 70 dtn!.....dtn://p
0016 79 44 54 4E 2D 31 2E 64 74 6E yDTN-1.dtn
Going to send contact header of length 26
bytearray(b'dtn!\x03\x00\x00\x00\x11dtn://pyDTN-1.dtn')
Message arrived, length 28
0000 64 74 6E 21 03 00 00 00 13 64 74 6E 3A 2F 2F 6F dtn!.....dtn://o
0016 70 73 2D 73 61 74 2E 64 74 6E 00 00 ps-sat.dtn..
# pyDTN-2 console
Received payload block: @
@
@
@
`@` @: `@ @ '@@@@@, @@ @@# @ @@ :@@#
+@ @@ @; @' #@ @@, '@# @@@+, @ @@ ,@,.@@
@@ #@ ;@ @@ @`#@ @: @@: @ @@ @;
@;`@` @` #@@, ;@ @` .@@# ;@ @@ @ `@# @#
,@@+ #@ @.#@ @::@ :@ #@ @ @@ @:@' `@+
@@ #@@@@@ @ ;@ @ `@ +@'+ ::` @ @.@@ @@; `@@,
;@@@ @# @; @@ @# '@@ @':. .;@`@@ @,@' ;@+
@, @, `@.@ @ @ `@#.#,` +,@ @@ @ `@# `@
@@ ;@ @@# @@@ @'@@@@@@`@@ @@ @ @@ @
#@ @@ ;@ .@ `@: ,'@.@ @@ @ @@ @` #@
.@ @+ @ @ .@`:.`:. @@ @ @@ #@@@@
@@+:` `
@@+,
@@
# Overview
This is the PoC (Proof of Concept) implementation of:
* [Bundle Protocol Version 7](https://tools.ietf.org/html/draft-ietf-dtn-bpbis-12)
* [Bundle Protocol Version 7](https://tools.ietf.org/html/draft-ietf-dtn-bpbis-13)
* Send/receive a bundle
* Register EIDs
* Schedule contacts
* Bundle routing
* Send bundle functionality is available via `REST` API, useful for testing
* Schedule contact functionality is available via `REST` API, useful for integration with [µPCN](https://upcn.eu/)
* Scheduling contacts for uPCN nodes is available via `REST` API, useful for integration with [uPCN](https://upcn.eu/)
* [Delay-Tolerant Networking TCP Convergence-Layer Protocol](https://tools.ietf.org/html/rfc7242)
* Send/receive contact headers
* [DTN IP Neighbor Discovery (IPND)](https://tools.ietf.org/html/draft-irtf-dtnrg-ipnd-03)
* Send/receive a beacon, code exists, integration in progress, see %2 milestone
* Send/receive a beacon, code exists, integration halted, since uPCN team decided not to support it
# Repository structure
```
/pydtn - Python library implementing DTN protocols
/pydtn_rest - Python library implementing REST API wrapping pydtn library API
/doc - Project documentation
/demo_app - Python demo application (imports pydtn_rest library)
/tools - Currently contains scripts and data files used for the demo
/docker - Containers for the demo
```
# Installation
Required Python version: 3.7.3+
#### pydtn library
```
pip3 install pydtn/
```
#### pydtn_rest library
```
pip3 install pydtn_rest/
```
# Documentation
# Use Case
![current_state](doc/readme/current_state.png)
#### Install requirements
```
pip3 install sphinx sphinx_rtd_theme
```
#### pydtn library
```
make -C pydtn/docs html
```
output:
pydtn/docs/_build/html/index.html
1. `pyDTN-2` receives `REST` request to schedule contact: `pyDTN-2.dtn` is available on `172.25.1.11:2002`
1. `pyDTN-2` schedules contact with `uPCN`, note following issues:
* Proprietary (not part of any draft) `uPCN` config bundle is used
* `pyDTN-2` receives `uPCN`'s IP address/port on a [command line](docker/docker-compose.yml#L39)
1. `pyDTN-1` receives `REST` request to send a bundle to `pyDTN-2`
1. `pyDTN-1` sends bundle to `uPCN`, note following issues:
* `pyDTN-1` receives `uPCN`'s IP address/port on a [command line](docker/docker-compose.yml#L20)
* IP address/port is then set (kind of hard-coded) in [ipnd/httpd.py](ipnd/httpd.py#L42)
1. `uPCN` receives a bundle from `pyDTN-1` and immediately forwards it to `pyDTN-2`, note following issues:
* `uPCN` currently does not store any bundles, if destination is not available, bundle is simply dropped
#### pydtn_rest library
```
make -C pydtn_rest/docs html
```
output:
pydtn_rest/docs/_build/html/index.html
Issues listed above will be solved as a part of %2 milestone.
# Try it yourself
See [HOWTO.txt](HOWTO.txt) (eventually will be integrated into this file)
# Demonstration
* [Communication with uPCN](UPCN_DEMO.md)
This diff is collapsed.
import argparse
import waitress
from bp_daemon.bp_daemon import BPServer
from ipnd import httpd
parser = argparse.ArgumentParser()
parser.add_argument("--node", type=str)
parser.add_argument("--port", type=int, default=42420)
parser.add_argument("--http-port", type=int, default=8080)
parser.add_argument("--upcn-host", type=str, default="127.0.0.1")
parser.add_argument("--upcn-port", type=int, default=4556)
args = parser.parse_args()
# Start Bundle protocol server
bp = BPServer()
bp.port = args.port
bp.start()
# Start REST endpoint
httpd.node = args.node
httpd.upcn_host = args.upcn_host
httpd.upcn_port = args.upcn_port
waitress.serve(httpd.app, host='0.0.0.0', port=args.http_port)
import select
import socket
import threading
import time
from cbor2 import decoder
from ipnd.hexdump import hexdump
from ipnd import common
from ipnd.sdnv import sdnv_decode
from ipnd.tcpcl import ContactMessage, logging
logging.basicConfig(level=logging.DEBUG)
class Client(object):
"""Internal representation of a BP client as viewed by the server
"""
def __init__(self, socket_, address):
socket_.setblocking(False)
self.socket = socket_
self.address = address
self.buffer = bytearray()
self.send_buffer = bytearray()
self.message_list = []
logging.debug("New client from {}".format(address))
self.send_contact_msg()
self.keepalive_tx_time = None
def send_contact_msg(self):
# TODO: https://tools.ietf.org/html/rfc7242#section-3.2https://tools.ietf.org/html/rfc7242
contact_msg = ContactMessage("dtn://node2.dtn")
self.send(contact_msg.serialize())
def fileno(self):
return self.socket.fileno()
def send(self, data):
self.keepalive_tx_time = time.time()
self.send_buffer += data
def get_ip(self):
return self.address[0]
def handle(self):
try:
msg = self.socket.recv(4096 * 64)
except socket.error:
return False
if len(msg) == 0:
return False
self.buffer += msg
logging.debug("Received msg from {} of length {} (new buffer length: {} message:\n{}):"
.format(self.address, len(msg), len(self.buffer), hexdump(msg)))
try:
# TODO: unused variable: msg_stripped
# msg_stripped = self.decode_tcpcl_data_segment(msg)
self.decode_tcpcl_data_segment(msg)
except Exception as e:
logging.debug("Not a bundle {}".format(e))
return True
def handle_write(self):
if len(self.send_buffer) == 0:
return True
try:
sent = self.socket.send(self.send_buffer)
except socket.error:
return False
self.send_buffer = self.send_buffer[sent:]
return True
def handle_buffer(self):
if not self.buffer:
return
logging.debug("Message: \n{}".format(hexdump(self.buffer)))
self.buffer = bytearray()
def check(self):
"""Perform periodic actions, such as check of ingress buffer."""
self.handle_buffer()
def close(self):
self.socket.close()
@staticmethod
def extract_payload(block):
return block[5]
def decode_bundle(self, data):
obj = decoder.loads(data) # type: list
for block in obj: # type: list
item_no = 0
for item in block:
if item_no == 0 and item == 1:
logging.debug('Received payload block: {}'.format(self.extract_payload(block).decode('ascii')))
break
item_no += 1
def decode_tcpcl_data_segment(self, data):
stripped_header = data[1:]
payload_length, bytes_decoded = sdnv_decode(buffer=stripped_header)
logging.debug("payload payload_length {} decoded bytes {} send for decoding \n{}"
.format(payload_length, bytes_decoded, hexdump(stripped_header[bytes_decoded:])))
return self.decode_bundle(stripped_header[bytes_decoded:])
class BPServer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.host = ""
self.node = ""
self.port = 42420
self.socket = None
self.running = True
self.clients = []
self.clients_lock = threading.Lock()
def connect(self):
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(1)
self.socket.setblocking(False)
except socket.error as e:
logging.error(e)
if self.socket is not None:
self.socket.close()
self.socket = None
return False
return True
def find_client(self, ip):
for client in self.clients:
if client.address[0] == ip:
return client
return None
def is_running(self):
return self.running
def stop(self):
self.running = False
def run(self):
if not self.connect():
return
logging.debug("Bundle protocol server ready, waiting for connections...")
while self.running:
with self.clients_lock:
infds, outfds, exceptfds = select.select([self.socket] +
self.clients, self.clients, self.clients, 0.1)
for node in infds:
if node is self.socket:
client = Client(*self.socket.accept())
self.clients.append(client)
continue
if not node.handle():
logging.debug("Removing peer from clients list")
self.clients.remove(node)
continue
for node in outfds:
if not node.handle_write():
logging.debug("Removing peer from clients list")
if node in self.clients:
self.clients.remove(node)
continue
for node in exceptfds:
logging.error("Exception on {}".format(node))
for client in list(self.clients):
client.check()
time.sleep(0.1)
with self.clients_lock:
for client in self.clients:
client.close()
self.socket.close()
logging.debug("Bundle protocol server shutdown successful")
class BPClient(threading.Thread):
def __init__(self, node=None):
threading.Thread.__init__(self)
self.host = None
self.port = None
self.socket = None
self.running = True
self.buffer = ""
self.node = node
self.contactHeaderSent = False
def connect(self):
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
self.socket.setblocking(False)
except socket.error as e:
logging.error(e)
if self.socket is not None:
self.socket.close()
self.socket = None
return False
return True
def get_receive_buffer(self):
try:
msg = self.socket.recv(4096 * 64)
except socket.error:
return False
if len(msg) < 10:
return False
logging.debug("Message arrived, length {} \n{}".format(len(msg), hexdump(msg)))
def close(self):
self.running = False
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
def run(self):
if not self.connect():
return
while self.running:
self.get_receive_buffer()
time.sleep(1)
def wait_until_ready(self):
if self.socket or not self.running:
return
while True:
if self.socket or not self.running:
return
time.sleep(0.01)
def handle_buffer(self):
if not self.buffer:
return
logging.debug("Buffer: {}").format(self.buffer)
def send_buffer(self, buffer):
self.wait_until_ready()
total_sent = 0
while total_sent < len(buffer):
sent = self.socket.send(buffer[total_sent:])
if sent == 0:
raise RuntimeError("socket connection broken")
total_sent = total_sent + sent
def send_contact_msg(self):
if self.contactHeaderSent:
return
contact_msg = ContactMessage(self.node).serialize()
# TODO: https://tools.ietf.org/html/rfc7242#section-3.2https://tools.ietf.org/html/rfc7242
logging.debug("Going to send contact header of length {} \n {}".format(len(contact_msg), contact_msg))
self.send_buffer(contact_msg)
self.contactHeaderSent = True
def send_config_msg(self, eid, cla_address, start_offset, end_offset):
self.send_contact_msg()
config_msg = common.serialize_upcn_config_message(
eid,
cla_address,
contacts=[
common.make_contact(start_offset, end_offset, 500),
],
)
msg = common.serialize_tcpcl_single_bundle_segment(
common.serialize_bundle(
self.node,
"dtn://ops-sat.dtn/config",
config_msg,
)
)
logging.debug(hexdump(msg))
self.send_buffer(msg)
def send_bundle_msg(self, destination, payload):
self.send_contact_msg()
self.send_buffer(common.serialize_tcpcl_single_bundle_segment(
common.serialize_bundle(
self.node,
destination,
payload,
)
))
# TODO: move to tests
#
# from datetime import timedelta
# from . import Bundle, BundleAgeBlock, BundleProcFlag, BundleStatusReport, CreationTimestamp, HopCountBlock, PayloadBlock,\
# PreviousNodeBlock, PrimaryBlock, ReasonCode, StatusCode, dtn_start, print_hex
#
# primary_block = PrimaryBlock(
# bundle_proc_flags=BundleProcFlag.MUST_NOT_BE_FRAGMENTED
# | BundleProcFlag.CONTAINS_MANIFEST
# | BundleProcFlag.REPORT_DELIVERY
# | BundleProcFlag.REPORT_STATUS_TIME,
# destination="dtn:GS2",
# source="dtn:none",
# report_to="dtn:none",
# creation_time=CreationTimestamp(dtn_start, 0),
# )
#
# payload = PayloadBlock(b"Hello world!")
#
# # Extension blocks
# previous_node = PreviousNodeBlock("dtn:GS4")
# hop_count = HopCountBlock(30, 0)
# bundle_age = BundleAgeBlock(0)
#
# bundle = Bundle([
# primary_block,
# previous_node,
# hop_count,
# bundle_age,
# payload,
# ])
#
# # Bundle Status Report "Received bundle"
# status_report = BundleStatusReport(infos=StatusCode.RECEIVED_BUNDLE,
# reason=ReasonCode.NO_INFO,
# bundle=bundle,
# time=dtn_start + timedelta(seconds=10))
#
# status_report_bundle = Bundle([
# PrimaryBlock(
# bundle_proc_flags=BundleProcFlag.ADMINISTRATIVE_RECORD,
# destination=bundle.primary_block.report_to,
# ),
# status_report,
# ])
#
# print_hex(bundle.serialize())
# # print_hex(status_report_bundle.serialize())
# # print_hex(status_report.serialize())
#
# with open("bundle.cbor", "wb") as fd:
# fd.write(bundle.serialize())
import argparse
import asyncio
import json
import logging.config
import yaml
import pydtn_rest
import app
logging.basicConfig(format="%(asctime)s - [%(name)s] - [%(levelname)-5s] - %(message)s", level=logging.DEBUG)
logger = logging.getLogger(__name__)