Erstellen eines einfachen Plugins zur Erzeugung von smartCORE-Kanälen mit
Das einfachste (praktische) Remote-Plugin, das man für smartCORE schreiben kann, ist eines, das einfach seine eigenen Daten ohne Einfluss von außen erzeugt. In diesem Beispiel erstellen wir ein Plugin, das drei verschiedene Funktionssignale erzeugt – eine Sinuswelle, eine Rechteckwelle und eine Haifischzahnwelle.
Konfiguration
Zuerst erstellen wir eine Konfiguration für das smartCORE-Modul, die die von uns erzeugten Signale definiert. Um Signale an smartCORE auszugeben, müssen wir unserer Konfiguration den Schlüssel "producerChannels" hinzufügen. Er enthält eine Liste von Kanälen mit konfiguriertem Namen, Datentyp und physikalischer Einheit. Für unsere Signalnamen wählen wir "remote.test.[sine,square,sharktooth]" und für den Datentyp wählen wir "float". Hier könnten wir auch "int" oder "double" verwenden, je nachdem, was die erzeugten Daten erfordern. Wir geben keine physikalische Einheit an, da diese Kanäle für unsere Testzwecke beliebige Zahlen erzeugen.
{
"factory": "remote",
"module": "remote",
"config": {
"port": 61616,
"localhost": false,
"producerChannels": [{
"name": "remote.test.sine",
"dataType": "float"
},
{
"name": "remote.test.square",
"dataType": "float"
},
{
"name": "remote.test.sharktooth",
"dataType": "float"
}
]
}
}
Implementierung
In unserem Plugin müssen wir uns darauf vorbereiten, unsere Hauptschleife auszuführen. Zuerst stellen wir eine Verbindung zu smartCORE her und prüfen, ob es ausgeführt wird.
# Parse command line arguments
parser = argparse.ArgumentParser(description='Generates test signals and outputs them to smartCORE channels')
parser.add_argument('--port', dest='port', default=61616, type=int, required=False)
parser.add_argument('--addr', dest='addr', default='127.0.0.1', type=str, required=False)
args = parser.parse_args()
# establish connection to smartCORE
addr = (args.addr, args.port)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.settimeout(2.0)
buffer = packetHeader(CommandType.LifeSignRequest)
buffer += msgpack.packb({})
sock.sendto(buffer, addr)
# wait to receive a reply
received = sock.recv(1500)
# needs try errorhandling
header = header_from_buffer(received[:HEADER_SIZE])
# check that reply has the correct header
if header.type != CommandType.LifeSignResponse.value:
# TODO errorhandling
raise NotImplementedError
buf = BytesIO()
buf.write(received[HEADER_SIZE:])
buf.seek(0)
unpacker = msgpack.Unpacker(buf, raw=False)
# ensure smartCORE is running
for unpacked in unpacker:
if unpacked["smartcore-state"] != "Running":
raise RuntimeError("smartcore is not running")
Als Nächstes fordern wir die Liste der Kanäle an, auf die wir Zugriff haben. Sie sollte die Kanäle enthalten, die wir in unserer Konfiguration angegeben haben.
# Request channel list
buffer = packetHeader(CommandType.ChannelListRequest)
buffer += msgpack.packb({})
sock.sendto(buffer, addr)
# TODO: needs better read without cutoff
received = sock.recv(1500)
# TODO: needs try errorhandling
header = header_from_buffer(received[:HEADER_SIZE])
# check that reply has correct header
if header.type != CommandType.ChannelListResponse.value:
# TODO errorhandling
raise NotImplementedError
buf = BytesIO()
buf.write(received[HEADER_SIZE:])
buf.seek(0)
unpacker = msgpack.Unpacker(buf, raw=False)
for unpacked in unpacker:
channels = unpacked['c']
Zusätzlich erstellen wir ein Dict, das die Kanalnamen ihren Indizes zuordnet. Wir verwenden es, um unsere Daten an die richtigen smartCORE-Kanäle zu senden.
# map channel names to their indices
indexDict = {}
for channel in channels:
indexDict[channel['n']] = channel['i']
print(indexDict)
Jetzt, da wir das Grundgerüst aus dem Weg geräumt haben, kommt der spaßige Teil. Wir erstellen eine Endlosschleife und umschließen sie mit einer try-catch-Anweisung, damit wir die Schleife über eine Tastatureingabe (STRG + C) verlassen können. In der Schleife erstellen wir drei Variablen für unsere drei Kanäle. Sine, der den Sinus der aktuellen Zeit in ms nimmt, Square, der alle zwei Sekunden zwischen 6 und 0 wechselt, und Sharktooth, der das Modulo der aktuellen Zeit nimmt.
while True:
# Update signal generators
sine = math.sin(int(time.time() * 1_000))
square = 6 if int(time.time()) % 4 >= 2 else 0
sharktooth = time.time() % 51
Sobald unsere Daten bereit sind, beginnen wir mit der Erstellung eines WriteSamplesRequest-Pakets, das aus dem WriteSamplesRequest-Header und einer Nutzlast mit unseren Daten besteht. Die Nutzlast besteht aus einem "acknowledge"-Feld "a" und einer Liste von Kanälen "c". Die Kanäle selbst bestehen aus einem Index "i", einem Wert "v" und einem Zeitstempel "t". Wir füllen diese Felder mit ihren Indizes, aktuellen Werten und der aktuellen Zeit aus.
buffer = packetHeader(CommandType.WriteSamplesRequest)
payload = { "a": "some_placeholder",
"c": [
{
"i": indexDict["remote.test.sine"],
"v": sine,
"t": int(time.time() * 1_000)
},
{
"i": indexDict["remote.test.square"],
"v": square,
"t": int(time.time() * 1_000)
},
{
"i": indexDict["remote.test.sharktooth"],
"v": sharktooth,
"t": int(time.time() * 1_000)
}
]
}
# print(payload)
sys.stdout.flush()
buffer += msgpack.packb(payload)
sock.sendto(buffer, addr)
# TODO: needs better read without cutoff
Sobald wir das Paket gesendet und das "Ack-Paket" zurückerhalten haben, sollten wir unsere Signale in optiCONTROL oder optiCLOUD sehen können.