hololinked
is a beginner-friendly pythonic tool suited for instrumentation control and data acquisition over network (IoT & SCADA).
As a beginner, you have a requirement to control and capture data from your hardware, say in your electronics or science lab, and you want to show the data in a dashboard, provide a PyQt GUI or run automated scripts, hololinked
can help. Even for isolated desktop applications or a small setup without networking, one can still separate the concerns of the tools that interact with the hardware & the hardware itself.
If you are a web developer or an industry professional looking for a web standards compatible (high-speed) IoT runtime, hololinked
can be a decent choice as it follows the principles of W3C Web of Things. One can expect a consistent API and flexible bidirectional message flow for interacting with your devices, irrespective of the underlying protocol.
This package is a protocol agnostic RPC framework, currently supporting HTTP & ZMQ, but other protocols like MQTT, websockets are on the way. You can also implement your own protocol bindings. See use cases table.
From pip - pip install hololinked
From conda - conda install -c conda-forge hololinked
Or, clone the repository (main branch for latest codebase) and install pip install .
/ pip install -e .
. The conda env hololinked.yml
or uv environment uv.lock
can also help to setup all dependencies.
(As mentioned earlier) hololinked
is compatible with the W3C Web of Things recommended pattern for developing hardware/instrumentation control software. Each device or thing can be controlled systematically when their design in software is segregated into properties, actions and events. In object oriented terms:
- the hardware is represented by a class
- properties are validated get-set attributes of the class which may be used to model settings, hold captured/computed data or generic network accessible quantities
- actions are methods which issue commands like connect/disconnect, execute a control routine, start/stop measurement, or run arbitray python logic
- events can asynchronously communicate/push arbitrary data to a client, like alarm messages, streaming measured quantities etc.
For example, consider an optical spectrometer, the following code is possible:
from hololinked.core import Thing, Property, action, Event
from hololinked.core.properties import String, Integer, Number, List
from seabreeze.spectrometers import Spectrometer # a device driver
subclass from Thing
class to make a "network accessible Thing":
class OceanOpticsSpectrometer(Thing):
"""
OceanOptics spectrometers using seabreeze library. Device is identified by serial number.
"""
Say, we wish to make device serial number, integration time and the captured intensity as properties. There are certain predefined properties available like String
, Number
, Boolean
etc.
or one may define one's own using pydantic or JSON schema. To create properties:
class OceanOpticsSpectrometer(Thing):
"""class doc"""
serial_number = String(default=None, allow_None=True,
doc="serial number of the spectrometer to connect/or connected")
integration_time = Number(default=1000, bounds=(0.001, None), crop_to_bounds=True,
doc="integration time of measurement in milliseconds")
intensity = List(default=None, allow_None=True, doc="captured intensity", readonly=True,
fget=lambda self: self._intensity)
def __init__(self, id, serial_number, **kwargs):
super().__init__(id=id, serial_number=serial_number, **kwargs)
In non-expert terms, properties look like class attributes however their data containers are instantiated at object instance level by default. This is possible due to python descriptor protocol. For example, the integration_time
property defined above as Number
, whenever set/written, will be validated as a float or int, cropped to bounds and assigned as an attribute to each instance of the OceanOpticsSpectrometer
class with an internally generated name. It is not necessary to know this internally generated name as the property value can be accessed again in any python logic using the dot operator, say, print(self.integration_time)
.
One may overload the get-set (or read-write) of properties to customize their behavior:
class OceanOpticsSpectrometer(Thing):
integration_time = Number(default=1000, bounds=(0.001, None), crop_to_bounds=True,
doc="integration time of measurement in milliseconds")
@integration_time.setter # by default called on http PUT method
def set_integration_time(self, value : float):
self.device.integration_time_micros(int(value*1000))
self._integration_time = int(value)
@integration_time.getter # by default called on http GET method
def get_integration_time(self) -> float:
try:
return self._integration_time
except AttributeError:
return self.properties["integration_time"].default
In this case, instead of generating a data container with an internal name, the setter method is called when integration_time
property is set/written. One might add the hardware device driver logic here (say, supplied by the manufacturer) or a protocol that talks directly to the device to apply the property onto the device. In the above example, there is not a way provided by the device driver library to read the value from the device, so we store it in a variable after applying it and supply the variable back to the getter method. Normally, one would also want the getter to read from the device directly.
Those familiar with Web of Things (WoT) terminology may note that these properties generate the property affordance. An example for integration_time
is as follows:
"integration_time": {
"title": "integration_time",
"description": "integration time of measurement in milliseconds",
"type": "number",
"forms": [{
"href": "https://example.com/spectrometer/integration-time",
"op": "readproperty",
"htv:methodName": "GET",
"contentType": "application/json"
},{
"href": "https://example.com/spectrometer/integration-time",
"op": "writeproperty",
"htv:methodName": "PUT",
"contentType": "application/json"
}
],
"minimum": 0.001
},
If you are not familiar with Web of Things or the term "property affordance", consider the above JSON as a description of what the property represents and how to interact with it from somewhere else (in this case, over HTTP). Such a JSON is both human-readable, yet consumable by any application that may use the property - say, a client provider to create a client object to interact with the property or a GUI application to autogenerate a suitable input field for this property.
For example, the Eclipse ThingWeb
node-wot supports this feature to produce a HTTP(s) client in javascript that can issue readProperty("integration_time")
and writeProperty("integration_time", 1000)
to read and write this property.
decorate with action
decorator on a python method to claim it as a network accessible method:
class OceanOpticsSpectrometer(Thing):
@action(input_schema={"type": "object", "properties": {"serial_number": {"type": "string"}}})
def connect(self, serial_number = None): # by default invoked on HTTP POST
"""connect to spectrometer with given serial number"""
if serial_number is not None:
self.serial_number = serial_number
self.device = Spectrometer.from_serial_number(self.serial_number)
self._wavelengths = self.device.wavelengths().tolist()
@action() # by default invoked on HTTP POST
def disconnect(self):
"""disconnect from the spectrometer"""
self.device.close()
Methods that are neither decorated with action decorator nor acting as getters-setters of properties remain as plain python methods and are not accessible on the network.
In WoT Terminology, again, such a method becomes specified as an action affordance (or a description of what the action represents and how to interact with it):
"connect": {
"title": "connect",
"description": "connect to spectrometer with given serial number",
"forms": [
{
"href": "https://example.com/spectrometer/connect",
"op": "invokeaction",
"htv:methodName": "POST",
"contentType": "application/json"
}
],
"input": {
"type": "object",
"properties": {
"serial_number": {
"type": "string"
}
},
"additionalProperties": false
}
},
input and output schema ("input" field above which describes the argument type
serial_number
) are optional and are discussed in docs
create a named event using Event
object that can push any arbitrary data:
class OceanOpticsSpectrometer(Thing):
# only GET HTTP method possible for events
intensity_measurement_event = Event(name='intensity-measurement-event',
doc="""event generated on measurement of intensity,
max 30 per second even if measurement is faster.""",
schema=intensity_event_schema)
# schema is optional and will be discussed later,
# assume the intensity_event_schema variable is valid
def capture(self): # not an action, but a plain python method
self._run = True
last_time = time.time()
while self._run:
self._intensity = self.device.intensities(
correct_dark_counts=False,
correct_nonlinearity=False
)
curtime = datetime.datetime.now()
measurement_timestamp = curtime.strftime('%d.%m.%Y %H:%M:%S.') + '{:03d}'.format(
int(curtime.microsecond /1000))
if time.time() - last_time > 0.033: # restrict speed to avoid overloading
self.intensity_measurement_event.push({
"timestamp" : measurement_timestamp,
"value" : self._intensity.tolist()
})
last_time = time.time()
@action()
def start_acquisition(self):
if self._acquisition_thread is not None and self._acquisition_thread.is_alive():
return
self._acquisition_thread = threading.Thread(target=self.capture)
self._acquisition_thread.start()
@action()
def stop_acquisition(self):
self._run = False
Events can stream live data without polling or push data to a client whose generation in time is uncontrollable.
In WoT Terminology, such an event becomes specified as an event affordance (or a description of what the event represents and how to subscribe to it) with subprotocol SSE:
"intensity_measurement_event": {
"title": "intensity-measurement-event",
"description": "event generated on measurement of intensity, max 30 per second even if measurement is faster.",
"forms": [
{
"href": "https://example.com/spectrometer/intensity/measurement-event",
"subprotocol": "sse",
"op": "subscribeevent",
"htv:methodName": "GET",
"contentType": "text/plain"
}
],
"data": {
"type": "object",
"properties": {
"value": {
"type": "array",
"items": {
"type": "number"
}
},
"timestamp": {
"type": "string"
}
}
}
}
data schema ("data" field above which describes the event payload) are optional and discussed later
Events follow a pub-sub model with '1 publisher to N subscribers' per Event
object, both through any supported protocol including HTTP server sent events.
One can start the Thing object with one or more protocols simultaneously. Currently HTTP & ZMQ is supported. With HTTP server:
import ssl, os, logging
if __name__ == '__main__':
ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(f'assets{os.sep}security{os.sep}certificate.pem',
keyfile = f'assets{os.sep}security{os.sep}key.pem')
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
OceanOpticsSpectrometer(
id='spectrometer',
serial_number='S14155',
log_level=logging.DEBUG
).run_with_http_server(
port=9000, ssl_context=ssl_context
)
The base URL is constructed as http(s)://<hostname>:<port>/<thing_id>
With ZMQ:
if __name__ == '__main__':
OceanOpticsSpectrometer(
id='spectrometer',
serial_number='S14155',
).run(
access_points=['IPC', 'tcp://*:9999']
)
# both interprocess communication & TCP
Multiple:
if __name__ == '__main__':
OceanOpticsSpectrometer(
id='spectrometer',
serial_number='S14155',
).run(
access_points=['IPC']
)
# HTTP & ZMQ Interprocess Communication
To compose client objects, the JSON description of the properties, actions and events are used, which are summarized into a Thing Description. The following code would be possible:
Import the ClientFactory
and create an instance for the desired protocol:
from hololinked.client import ClientFactory
# for HTTP
thing = ClientFactory.http(url="http://localhost:8000/spectrometer/resources/wot-td")
# For HTTP, one needs to append `/resource/wot-td` to the base URL to construct the full URL as `http(s)://<hostname>:<port>/<thing_id>/resources/wot-td`. At this endpoint, the Thing Description will be autogenerated and loaded to compose a client.
# zmq IPC
thing = ClientFactory.zmq(thing_id='spectrometer', access_point='IPC')
# zmq TCP
thing = ClientFactory.zmq(thing_id='spectrometer', access_point='tcp://localhost:9999')
# For ZMQ Thing Description loading is automatically mediated
To issue operations:
Read Property
thing.read_property("integration_time")
# or use dot operator
thing.integration_time
within an async function:
async def func():
await thing.async_read_property("integration_time")
# dot operator not supported
Write Property
thing.write_property("integration_time", 2000)
# or use dot operator
thing.integration_time = 2000
within an async function:
async def func():
await thing.async_write_property("integration_time", 2000)
# dot operator not supported
Invoke Action
thing.invoke_action("connect", serial_number="S14155")
# or use dot operator
thing.connect(serial_number="S14155")
within an async function:
async def func():
await thing.async_invoke_action("connect", serial_number="S14155")
# dot operator not supported
Subscribe to Event
thing.subscribe_event("intensity_measurement_event", callbacks=lambda value: print(value))
There is no async subscribe, as events by nature appear at arbitrary times only when pushed by the server. Yet, events can be asynchronously listened and callbacks can be asynchronously invoked. Please refer documentation. To unsubscribe:
thing.unsubscribe_event("intensity_measurement_event")
Observe Property
thing.observe_property("integration_time", callbacks=lambda value: print(value))
Only observable properties (property where observable
was set to True
) can be observed. To unobserve:
thing.unobserve_property("integration_time")
Similary, one could consume the Thing Description in a Node.js script using node-wot:
const { Servient } = require("@node-wot/core");
const HttpClientFactory = require("@node-wot/binding-http").HttpClientFactory;
const servient = new Servient();
servient.addClientFactory(new HttpClientFactory());
servient.start().then((WoT) => {
fetch("http://localhost:8000/spectrometer/resources/wot-td")
.then((res) => res.json())
.then((td) => WoT.consume(td))
.then((thing) => {
thing.readProperty("integration_time").then(async(interactionOutput) => {
console.log("Integration Time: ", await interactionOutput.value());
})
)});
If you're using HTTPS, just make sure the server certificate is valid or trusted by the client.
const HttpsClientFactory = require("@node-wot/binding-http").HttpsClientFactory;
servient.addClientFactory(new HttpsClientFactory({ allowSelfSigned: true }));
(example here)
To issue operations:
Read Property
thing.readProperty("integration_time").then(async(interactionOutput) => { console.log("Integration Time:", await interactionOutput.value()); });
Write Property
thing.writeProperty("integration_time", 2000).then(() => { console.log("Integration Time updated"); });
Invoke Action
thing.invokeAction("connect", { serial_number: "S14155" }).then(() => { console.log("Device connected"); });
Subscribe to Event
thing.subscribeEvent("intensity_measurement_event", async (interactionOutput) => { console.log("Received event:", await interactionOutput.value()); });
Links to React Examples
In React, the Thing Description may be fetched inside `useEffect` hook, the client passed via `useContext` hook and the individual operations can be performed in their own callbacks attached to user elements.- examples repository - detailed examples for both clients and servers
- helper GUI - view & interact with your object's actions, properties and events.
- live demo - an example of an oscilloscope available for live test
You may use a script deployment/automation tool to remote stop and start servers, in an attempt to remotely control your hardware scripts.
See organization info for details regarding contributing to this package. There are:
- good first issues
- discord group
- weekly meetings and
- project planning to discuss activities around this repository.
One can setup a development environment with uv as follows:
- Install uv if you don't have it already: https://docs.astral.sh/uv/getting-started/installation/
- Create and activate a virtual environment:
uv venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
- Install the package in development mode with all dependencies:
uv pip install -e .
uv pip install -e ".[dev,test]"
To run the tests with uv:
In linux:
uv run --active coverage run -m unittest discover -s tests -p 'test_*.py'
uv run --active coverage report -m
In windows:
python -m unittest
- control method execution and property write with a custom finite state machine.
- database (Postgres, MySQL, SQLite - based on SQLAlchemy) support for storing and loading properties when the object dies and restarts.
- auto-generate Thing Description for Web of Things applications.
- use serializer of your choice (except for HTTP) - MessagePack, JSON, pickle etc. & extend serialization to suit your requirement. HTTP Server will support only JSON serializer to maintain comptibility with Javascript (MessagePack may be added later). Default is JSON serializer based on msgspec.
- asyncio compatible
Protocol | Plausible Use Cases | Operations |
---|---|---|
HTTP | Web Apps |
readproperty ,
writeproperty ,
observeproperty ,
unobserveproperty ,
invokeaction ,
subscribeevent ,
unsubscribeevent
properties and actions can be operated in a oneway and noblock manner as well |
ZMQ TCP | Networked Control Systems, subnet protected containerized apps like in Kubernetes | |
ZMQ IPC | Desktop Applications, Python Dashboards without exposing device API directly on network | |
ZMQ INPROC | High Speed Desktop Applications (again, not exposed on network), currently you will need some CPP magic or disable GIL to leverage it fully | |
MQTT | Upcoming (October 2025) |
observeproperty ,
unobserveproperty ,
subscribeevent ,
unsubscribeevent
|