Skip to main content

Create a simple plugin to generate smartCORE channels with

The simplest (practical) remote plugin you can write for smartCORE is one that simply generates its own data without outside influence. In this example, we will create a plugin that generates three different function signals - a sine wave, a square wave and a shark tooth wave.

Configuration

First, we create a configuration for the smartCORE module that defines the signals we generate. To output signals to smartCORE, we need to add the "producerChannels" key to our configuration. It contains a list of channels with configured name, data type and physical unit. For our signal names we select "remote.test.[sine,square,sharktooth]" and for the data type we select "float". We could also use "int" or "double" here, depending on what the generated data requires. We do not specify a physical unit, as these channels generate arbitrary numbers for our test purposes.

{
"factory": "remote",
"module": "remote",
"config": {
"port": 61616,
"localhost": false,
"producerChannels": [{
"name": "remote.test.sine",
"dataType": "float"
},
{
"name": "remote.test.square",
"dataType": "float"
},
{
"name": "remote.test.sharktooth",
"dataType": "float"
}
]
}
}

Implementation

In our plugin, we need to prepare to execute our main loop. First, we connect to smartCORE and check if it is running.

# Parse command line arguments
parser = argparse.ArgumentParser(description='Generates test signals and outputs them to smartCORE channels')
parser.add_argument('--port', dest='port', default=61616, type=int, required=False)
parser.add_argument('--addr', dest='addr', default='127.0.0.1', type=str, required=False)
args = parser.parse_args()
# establish connection to smartCORE
addr = (args.addr, args.port)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.settimeout(2.0)
buffer = packetHeader(CommandType.LifeSignRequest)
buffer += msgpack.packb({})
sock.sendto(buffer, addr)
# wait to receive a reply
received = sock.recv(1500)
# needs try errorhandling
header = header_from_buffer(received[:HEADER_SIZE])
# check that reply has the correct header
if header.type != CommandType.LifeSignResponse.value:
# TODO errorhandling
raise NotImplementedError
buf = BytesIO()
buf.write(received[HEADER_SIZE:])
buf.seek(0)
unpacker = msgpack.Unpacker(buf, raw=False)
# ensure smartCORE is running
for unpacked in unpacker:
if unpacked["smartcore-state"] != "Running":
raise RuntimeError("smartcore is not running")

Next, we request the list of channels that we have access to. It should contain the channels we have specified in our configuration.

# Request channel list
buffer = packetHeader(CommandType.ChannelListRequest)
buffer += msgpack.packb({})
sock.sendto(buffer, addr)
# TODO: needs better read without cutoff
received = sock.recv(1500)
# TODO: needs try errorhandling
header = header_from_buffer(received[:HEADER_SIZE])
# check that reply has correct header
if header.type != CommandType.ChannelListResponse.value:
# TODO errorhandling
raise NotImplementedError
buf = BytesIO()
buf.write(received[HEADER_SIZE:])
buf.seek(0)
unpacker = msgpack.Unpacker(buf, raw=False)
for unpacked in unpacker:
channels = unpacked['c']

In addition, we create a dict that assigns the channel names to their indices. We use it to send our data to the correct smartCORE channels.

# map channel names to their indices
indexDict = {}

for channel in channels:
indexDict[channel['n']] = channel['i']
print(indexDict)

Now that we have the basic framework out of the way, here comes the fun part. We create an infinite loop and enclose it with a try-catch statement so that we can exit the loop via a keyboard input (CTRL + C). In the loop, we create three variables for our three channels. Sine, which takes the sine of the current time in ms, Square, which alternates between 6 and 0 every two seconds, and Sharktooth, which takes the modulo of the current time.

while True:
# Update signal generators
sine = math.sin(int(time.time() * 1_000))
square = 6 if int(time.time()) % 4 >= 2 else 0
sharktooth = time.time() % 51

Once our data is ready, we start creating a WriteSamplesRequest packet, which consists of the WriteSamplesRequest header and a payload with our data. The payload consists of an "acknowledge" field "a" and a list of channels "c". The channels themselves consist of an index "i", a value "v" and a timestamp "t". We fill in these fields with their indices, current values and the current time.


buffer = packetHeader(CommandType.WriteSamplesRequest)
payload = { "a": "some_placeholder",
"c": [
{
"i": indexDict["remote.test.sine"],
"v": sine,
"t": int(time.time() * 1_000)
},
{
"i": indexDict["remote.test.square"],
"v": square,
"t": int(time.time() * 1_000)
},
{
"i": indexDict["remote.test.sharktooth"],
"v": sharktooth,
"t": int(time.time() * 1_000)
}
]
}
# print(payload)
sys.stdout.flush()
buffer += msgpack.packb(payload)
sock.sendto(buffer, addr)
# TODO: needs better read without cutoff

Once we have sent the packet and received the "Ack packet" back, we should be able to see our signals in optiCONTROL or optiCLOUD.

You can find the complete source code here