treetrace

 1from .tracing.tracingnode import TracingNode, Tag, current_tracing_node, with_trace
 2from treetrace.tracing.serialization import (
 3    register_custom_serializer,
 4    unregister_custom_serializer,
 5)
 6from .tracing.data.format_str import FormatStr
 7from .tracing.data.blob import DataWithMime
 8from .tracing.data.html import Html
 9from .tracing.storage import FileStorage, StorageBase, current_storage
10from .utils.html_color import HtmlColor
11from .utils.text import shorten_str
12from .ui.console_server import ConsoleServer
13from . import ext
14
15__all__ = [
16    "TracingNode",
17    "Tag",
18    "current_tracing_node",
19    "current_storage",
20    "with_trace",
21    "FileStorage",
22    "StorageBase",
23    "DataWithMime",
24    "Html",
25    "FormatStr",
26    "ext",
27    "HtmlColor",
28    "shorten_str",
29    "ConsoleServer",
30    "register_custom_serializer",
31    "unregister_custom_serializer",
32]
class TracingNode:
 57class TracingNode:
 58    """
 59    A tracing object that represents a single request or (sub)task in a nested hierarchy.
 60
 61    The class has several attributes that are intended as read-only; use setters to modify them.
 62
 63    The `TracingNode` can be used as context manager, e.g.:
 64
 65    ```python
 66    with TracingNode("my node", inputs={"z": 42}) as c:
 67        c.add_input("x", 1)
 68        y = do_some_computation(x=1)
 69        # The tracing node would also note any exceptions raised here
 70        # (letting it propagate upwards), but a result needs to be set manually:
 71        c.set_result(y)
 72    # <- Here the tracing node is already closed.
 73    ```
 74    """
 75
 76    def __init__(
 77        self,
 78        name: str,
 79        kind: Optional[str] = None,
 80        inputs: Optional[Dict[str, Any]] = None,
 81        meta: Optional[Dict[str, Data]] = None,
 82        tags: Optional[Sequence[str | Tag]] = None,
 83        storage: Optional["StorageBase"] = None,
 84        directory=False,
 85        result=None,
 86    ):
 87        """
 88        - `name` - A description or name for the tracing node.
 89        - `kind` - Indicates category of the tracing node, may e.g. influence display of the tracing node.
 90        - `inputs` - A dictionary of inputs for the tracing node.
 91        - `meta` - A dictionary of any metadata for the tracing node, e.g. UI style data.
 92        - `tags` - A list of tags for the tracing node
 93        - `storage` - A storage object for the tracing node. Set on the root tracing node to log all nodes below it.
 94        - `directory` - Whether to create a sub-directory for the tracing node while storing.
 95          This allows you to split the stored data across multiple files.
 96        - `result` - The result value of the tracing node, if it has already been computed.
 97        """
 98
 99        if storage is None and current_tracing_node(False) is None:
100            storage = current_storage()
101
102        if inputs:
103            assert isinstance(inputs, dict)
104            assert all(isinstance(key, str) for key in inputs)
105            inputs = serialize_with_type(inputs)
106
107        if meta:
108            meta = serialize_with_type(meta)
109
110        if result:
111            result = serialize_with_type(result)
112
113        if tags is not None:
114            tags = [Tag.into_tag(tag) for tag in tags]
115
116        self.name = name
117        self.kind = kind
118        self.inputs = inputs
119        self.result = result
120        self.error = None
121        self.state: TracingNodeState = (
122            TracingNodeState.NEW if result is None else TracingNodeState.FINISHED
123        )
124        self.uid = generate_uid(name)
125        self.children: List[TracingNode] = []
126        self.tags: List[Tag] = tags
127        self.start_time = None
128        self.end_time = None if result is None else datetime.datetime.now()
129        self.meta = meta
130        self.storage = storage
131        self.directory = directory
132        self._token = None
133        self._depth = 0
134        self._lock = Lock()
135
136        if storage:
137            storage.register_node(self)
138
139    @classmethod
140    def deserialize(cls, data: Data, depth=0):
141        """
142        Deserialize a `TracingNode` object from given JSON data.
143
144        - `data` - A dictionary containing the serialized tracing data.
145        """
146        assert isinstance(data, dict)
147
148        # For backward compatibility we also "Context"
149        assert data["_type"] == "TracingNode" or data["_type"] == "Context"
150        self = cls.__new__(cls)
151        self.uid = data["uid"]
152        self.name = data["name"]
153
154        state = data.get("state")
155        if state:
156            state = TracingNodeState(state)
157        else:
158            state = TracingNodeState.FINISHED
159        self.state = state
160        for name in ["kind", "inputs", "result", "error", "tags", "meta"]:
161            setattr(self, name, data.get(name))
162        self.kind = data.get("kind")
163        self.inputs = data.get("inputs")
164        self.tags = data.get("tags")
165
166        start_time = data.get("start_time")
167        if start_time is None:
168            self.start_time = None
169        else:
170            self.start_time = datetime.datetime.fromisoformat(start_time)
171
172        end_time = data.get("end_time")
173        if end_time is None:
174            self.end_time = None
175        else:
176            self.end_time = datetime.datetime.fromisoformat(end_time)
177
178        children = data.get("children")
179        if children is None:
180            self.children = None
181        else:
182            new_depth = depth + 1
183            self.children = [
184                TracingNode.deserialize(child, depth=new_depth) for child in children
185            ]
186
187        self._token = None
188        self._depth = depth
189        self._lock = Lock()
190        return self
191
192    def to_dict(self, with_children=True, root=True):
193        """
194        Serialize `TracingNode` object into JSON structure.
195
196        - `with_children` - If True then children are recursively serialized.
197                            If False then serialization of children is skipped and only
198                            children UIDs are put into key `children_uids`
199        """
200        with self._lock:
201            result = {"_type": "TracingNode", "name": self.name, "uid": self.uid}
202            if root:
203                result["version"] = TRACING_FORMAT_VERSION
204            if self.state != TracingNodeState.FINISHED:
205                result["state"] = self.state.value
206            for name in ["kind", "result", "error", "tags"]:
207                value = getattr(self, name)
208                if value is not None:
209                    result[name] = value
210            if self.inputs:
211                result["inputs"] = self.inputs
212            if with_children and self.children:
213                result["children"] = [c.to_dict(root=False) for c in self.children]
214            if not with_children and self.children:
215                result["children_uids"] = [c.uid for c in self.children]
216            if self.start_time:
217                result["start_time"] = self.start_time.isoformat()
218            if self.end_time:
219                result["end_time"] = self.end_time.isoformat()
220            if self.meta:
221                result["meta"] = self.meta
222            if self.tags:
223                result["tags"] = serialize_with_type(self.tags)
224            return result
225
226    @property
227    def _pad(self):
228        return " " * self._depth
229
230    def __enter__(self):
231        def _helper(depth):
232            with self._lock:
233                assert not self._token
234                assert self.state == TracingNodeState.NEW
235                self.start_time = datetime.datetime.now()
236                self._depth = depth
237                self._token = _TRACING_STACK.set(parents + (self,))
238                self.state = TracingNodeState.OPEN
239                _LOG.debug(
240                    f"{self._pad}TracingNode {self.kind} inputs={shorten_str(self.inputs, 50)}"
241                )
242
243        # First we need to get Lock from parent to not get in collision
244        # with to_dict() that goes down the tree
245        parents = _TRACING_STACK.get()
246        if parents:
247            parent = parents[-1]
248            with parent._lock:  # noqa
249                _helper(len(parents))
250                parent.children.append(self)
251        else:
252            _helper(0)
253        return self
254
255    def __exit__(self, _exc_type, exc_val, _exc_tb):
256        with self._lock:
257            assert self._token
258            assert self.state == TracingNodeState.OPEN
259            if exc_val:
260                # Do not call set_error here as it takes a lock
261                self.state = TracingNodeState.ERROR
262                self.error = serialize_with_type(exc_val)
263                _LOG.debug(
264                    f"{self._pad}-> ERR  {self.kind} error={shorten_str(exc_val, 50)}"
265                )
266            else:
267                self.state = TracingNodeState.FINISHED
268                _LOG.debug(
269                    f"{self._pad}-> OK   {self.kind} result={shorten_str(repr(self.result), 50)}"
270                )
271            self.end_time = datetime.datetime.now()
272            _TRACING_STACK.reset(self._token)
273            self._token = None
274        if self.storage:
275            self.storage.write_node(self)
276        return False  # Propagate any exception
277
278    def add_tag(self, tag: str | Tag):
279        """
280        Add a tag to the tracing node.
281        """
282        with self._lock:
283            if self.tags is None:
284                self.tags = [Tag.into_tag(tag)]
285            else:
286                self.tags.append(Tag.into_tag(tag))
287
288    def add_event(
289        self,
290        name: str,
291        kind: Optional[str] = None,
292        data: Optional[Any] = None,
293        meta: Optional[Dict[str, Data]] = None,
294        tags: Optional[List[str | Tag]] = None,
295    ) -> "TracingNode":
296        event = TracingNode(name=name, kind=kind, result=data, meta=meta, tags=tags)
297        with self._lock:
298            self.children.append(event)
299        return event
300
301    def add_input(self, name: str, value: object):
302        """
303        Add a named input value to the tracing node.
304
305        If an input of the same name already exists, an exception is raised.
306        """
307        with self._lock:
308            if self.inputs is None:
309                self.inputs = {}
310            if name in self.inputs:
311                raise Exception(f"Input {name} already exists")
312            self.inputs[name] = serialize_with_type(value)
313
314    def add_inputs(self, inputs: dict[str, object]):
315        """
316        Add a new input values to the tracing node.
317
318        If an input of the same name already exists, an exception is raised.
319        """
320        with self._lock:
321            if self.inputs is None:
322                self.inputs = {}
323            for name in inputs:
324                if name in self.inputs:
325                    raise Exception(f"Input {name} already exists")
326            for name, value in inputs.items():
327                self.inputs[name] = serialize_with_type(value)
328
329    def set_result(self, value: Any):
330        """
331        Set the result value of the tracing node.
332        """
333        with self._lock:
334            self.result = serialize_with_type(value)
335
336    def set_error(self, exc: Any):
337        """
338        Set the error value of the tracing node (usually an `Exception` instance).
339        """
340        with self._lock:
341            self.state = TracingNodeState.ERROR
342            self.error = serialize_with_type(exc)
343
344    def has_tag_name(self, tag_name: str):
345        """
346        Returns `True` if the tracing node has a tag with the given name.
347        """
348        if not self.tags:
349            return False
350        for tag in self.tags:
351            if tag == tag_name or (isinstance(tag, Tag) and tag.name == tag_name):
352                return True
353        return False
354
355    def find_nodes(self, predicate: Callable) -> List["TracingNode"]:
356        """
357        Find all nodes matching the given callable `predicate`.
358
359        The predicate is called with a single argument, the `TracingNode` to check, and should return `bool`.
360        """
361
362        def _helper(node: TracingNode):
363            with node._lock:
364                if predicate(node):
365                    result.append(node)
366                if node.children:
367                    for child in node.children:
368                        _helper(child)
369
370        result = []
371        _helper(self)
372        return result
373
374    def write_html(self, filename: str):
375        from ..ui.staticview import create_node_static_page
376
377        html = create_node_static_page(self)
378        with open(filename, "w") as f:
379            f.write(html)
380
381    def display(self):
382        """Show tracing in Jupyter notebook"""
383        from IPython.core.display import HTML
384        from IPython.display import display
385
386        from ..ui.staticview import create_node_static_html
387
388        html = create_node_static_html(self)
389        display(HTML(html))

A tracing object that represents a single request or (sub)task in a nested hierarchy.

The class has several attributes that are intended as read-only; use setters to modify them.

The TracingNode can be used as context manager, e.g.:

with TracingNode("my node", inputs={"z": 42}) as c:
    c.add_input("x", 1)
    y = do_some_computation(x=1)
    # The tracing node would also note any exceptions raised here
    # (letting it propagate upwards), but a result needs to be set manually:
    c.set_result(y)
# <- Here the tracing node is already closed.
TracingNode( name: str, kind: Optional[str] = None, inputs: Optional[Dict[str, Any]] = None, meta: Optional[Dict[str, Union[Dict[str, Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], List[Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], int, float, str, bool, NoneType]]] = None, tags: Optional[Sequence[str | Tag]] = None, storage: Optional[StorageBase] = None, directory=False, result=None)
 76    def __init__(
 77        self,
 78        name: str,
 79        kind: Optional[str] = None,
 80        inputs: Optional[Dict[str, Any]] = None,
 81        meta: Optional[Dict[str, Data]] = None,
 82        tags: Optional[Sequence[str | Tag]] = None,
 83        storage: Optional["StorageBase"] = None,
 84        directory=False,
 85        result=None,
 86    ):
 87        """
 88        - `name` - A description or name for the tracing node.
 89        - `kind` - Indicates category of the tracing node, may e.g. influence display of the tracing node.
 90        - `inputs` - A dictionary of inputs for the tracing node.
 91        - `meta` - A dictionary of any metadata for the tracing node, e.g. UI style data.
 92        - `tags` - A list of tags for the tracing node
 93        - `storage` - A storage object for the tracing node. Set on the root tracing node to log all nodes below it.
 94        - `directory` - Whether to create a sub-directory for the tracing node while storing.
 95          This allows you to split the stored data across multiple files.
 96        - `result` - The result value of the tracing node, if it has already been computed.
 97        """
 98
 99        if storage is None and current_tracing_node(False) is None:
100            storage = current_storage()
101
102        if inputs:
103            assert isinstance(inputs, dict)
104            assert all(isinstance(key, str) for key in inputs)
105            inputs = serialize_with_type(inputs)
106
107        if meta:
108            meta = serialize_with_type(meta)
109
110        if result:
111            result = serialize_with_type(result)
112
113        if tags is not None:
114            tags = [Tag.into_tag(tag) for tag in tags]
115
116        self.name = name
117        self.kind = kind
118        self.inputs = inputs
119        self.result = result
120        self.error = None
121        self.state: TracingNodeState = (
122            TracingNodeState.NEW if result is None else TracingNodeState.FINISHED
123        )
124        self.uid = generate_uid(name)
125        self.children: List[TracingNode] = []
126        self.tags: List[Tag] = tags
127        self.start_time = None
128        self.end_time = None if result is None else datetime.datetime.now()
129        self.meta = meta
130        self.storage = storage
131        self.directory = directory
132        self._token = None
133        self._depth = 0
134        self._lock = Lock()
135
136        if storage:
137            storage.register_node(self)
  • name - A description or name for the tracing node.
  • kind - Indicates category of the tracing node, may e.g. influence display of the tracing node.
  • inputs - A dictionary of inputs for the tracing node.
  • meta - A dictionary of any metadata for the tracing node, e.g. UI style data.
  • tags - A list of tags for the tracing node
  • storage - A storage object for the tracing node. Set on the root tracing node to log all nodes below it.
  • directory - Whether to create a sub-directory for the tracing node while storing. This allows you to split the stored data across multiple files.
  • result - The result value of the tracing node, if it has already been computed.
@classmethod
def deserialize( cls, data: Union[Dict[str, Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], List[Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], int, float, str, bool, NoneType], depth=0):
139    @classmethod
140    def deserialize(cls, data: Data, depth=0):
141        """
142        Deserialize a `TracingNode` object from given JSON data.
143
144        - `data` - A dictionary containing the serialized tracing data.
145        """
146        assert isinstance(data, dict)
147
148        # For backward compatibility we also "Context"
149        assert data["_type"] == "TracingNode" or data["_type"] == "Context"
150        self = cls.__new__(cls)
151        self.uid = data["uid"]
152        self.name = data["name"]
153
154        state = data.get("state")
155        if state:
156            state = TracingNodeState(state)
157        else:
158            state = TracingNodeState.FINISHED
159        self.state = state
160        for name in ["kind", "inputs", "result", "error", "tags", "meta"]:
161            setattr(self, name, data.get(name))
162        self.kind = data.get("kind")
163        self.inputs = data.get("inputs")
164        self.tags = data.get("tags")
165
166        start_time = data.get("start_time")
167        if start_time is None:
168            self.start_time = None
169        else:
170            self.start_time = datetime.datetime.fromisoformat(start_time)
171
172        end_time = data.get("end_time")
173        if end_time is None:
174            self.end_time = None
175        else:
176            self.end_time = datetime.datetime.fromisoformat(end_time)
177
178        children = data.get("children")
179        if children is None:
180            self.children = None
181        else:
182            new_depth = depth + 1
183            self.children = [
184                TracingNode.deserialize(child, depth=new_depth) for child in children
185            ]
186
187        self._token = None
188        self._depth = depth
189        self._lock = Lock()
190        return self

Deserialize a TracingNode object from given JSON data.

  • data - A dictionary containing the serialized tracing data.
def to_dict(self, with_children=True, root=True):
192    def to_dict(self, with_children=True, root=True):
193        """
194        Serialize `TracingNode` object into JSON structure.
195
196        - `with_children` - If True then children are recursively serialized.
197                            If False then serialization of children is skipped and only
198                            children UIDs are put into key `children_uids`
199        """
200        with self._lock:
201            result = {"_type": "TracingNode", "name": self.name, "uid": self.uid}
202            if root:
203                result["version"] = TRACING_FORMAT_VERSION
204            if self.state != TracingNodeState.FINISHED:
205                result["state"] = self.state.value
206            for name in ["kind", "result", "error", "tags"]:
207                value = getattr(self, name)
208                if value is not None:
209                    result[name] = value
210            if self.inputs:
211                result["inputs"] = self.inputs
212            if with_children and self.children:
213                result["children"] = [c.to_dict(root=False) for c in self.children]
214            if not with_children and self.children:
215                result["children_uids"] = [c.uid for c in self.children]
216            if self.start_time:
217                result["start_time"] = self.start_time.isoformat()
218            if self.end_time:
219                result["end_time"] = self.end_time.isoformat()
220            if self.meta:
221                result["meta"] = self.meta
222            if self.tags:
223                result["tags"] = serialize_with_type(self.tags)
224            return result

Serialize TracingNode object into JSON structure.

  • with_children - If True then children are recursively serialized. If False then serialization of children is skipped and only children UIDs are put into key children_uids
def add_tag(self, tag: str | Tag):
278    def add_tag(self, tag: str | Tag):
279        """
280        Add a tag to the tracing node.
281        """
282        with self._lock:
283            if self.tags is None:
284                self.tags = [Tag.into_tag(tag)]
285            else:
286                self.tags.append(Tag.into_tag(tag))

Add a tag to the tracing node.

def add_input(self, name: str, value: object):
301    def add_input(self, name: str, value: object):
302        """
303        Add a named input value to the tracing node.
304
305        If an input of the same name already exists, an exception is raised.
306        """
307        with self._lock:
308            if self.inputs is None:
309                self.inputs = {}
310            if name in self.inputs:
311                raise Exception(f"Input {name} already exists")
312            self.inputs[name] = serialize_with_type(value)

Add a named input value to the tracing node.

If an input of the same name already exists, an exception is raised.

def add_inputs(self, inputs: dict[str, object]):
314    def add_inputs(self, inputs: dict[str, object]):
315        """
316        Add a new input values to the tracing node.
317
318        If an input of the same name already exists, an exception is raised.
319        """
320        with self._lock:
321            if self.inputs is None:
322                self.inputs = {}
323            for name in inputs:
324                if name in self.inputs:
325                    raise Exception(f"Input {name} already exists")
326            for name, value in inputs.items():
327                self.inputs[name] = serialize_with_type(value)

Add a new input values to the tracing node.

If an input of the same name already exists, an exception is raised.

def set_result(self, value: Any):
329    def set_result(self, value: Any):
330        """
331        Set the result value of the tracing node.
332        """
333        with self._lock:
334            self.result = serialize_with_type(value)

Set the result value of the tracing node.

def set_error(self, exc: Any):
336    def set_error(self, exc: Any):
337        """
338        Set the error value of the tracing node (usually an `Exception` instance).
339        """
340        with self._lock:
341            self.state = TracingNodeState.ERROR
342            self.error = serialize_with_type(exc)

Set the error value of the tracing node (usually an Exception instance).

def has_tag_name(self, tag_name: str):
344    def has_tag_name(self, tag_name: str):
345        """
346        Returns `True` if the tracing node has a tag with the given name.
347        """
348        if not self.tags:
349            return False
350        for tag in self.tags:
351            if tag == tag_name or (isinstance(tag, Tag) and tag.name == tag_name):
352                return True
353        return False

Returns True if the tracing node has a tag with the given name.

def find_nodes( self, predicate: Callable) -> List[TracingNode]:
355    def find_nodes(self, predicate: Callable) -> List["TracingNode"]:
356        """
357        Find all nodes matching the given callable `predicate`.
358
359        The predicate is called with a single argument, the `TracingNode` to check, and should return `bool`.
360        """
361
362        def _helper(node: TracingNode):
363            with node._lock:
364                if predicate(node):
365                    result.append(node)
366                if node.children:
367                    for child in node.children:
368                        _helper(child)
369
370        result = []
371        _helper(self)
372        return result

Find all nodes matching the given callable predicate.

The predicate is called with a single argument, the TracingNode to check, and should return bool.

def display(self):
381    def display(self):
382        """Show tracing in Jupyter notebook"""
383        from IPython.core.display import HTML
384        from IPython.display import display
385
386        from ..ui.staticview import create_node_static_html
387
388        html = create_node_static_html(self)
389        display(HTML(html))

Show tracing in Jupyter notebook

@dataclass
class Tag:
37@dataclass
38class Tag:
39    """
40    A simple class representing a tag that can be applied to a tracing node. Optionally with style information.
41    """
42
43    name: str
44    """The name of the tag; any short string."""
45    color: Optional[str] = None
46    """HTML hex color code, e.g. `#ff0000`."""
47
48    @staticmethod
49    def into_tag(obj: Union[str, "Tag"]) -> "Tag":
50        if isinstance(obj, Tag):
51            return obj
52        if isinstance(obj, str):
53            return Tag(obj)
54        raise Exception(f"Object {obj!r} cannot be converted into Tag")

A simple class representing a tag that can be applied to a tracing node. Optionally with style information.

name: str

The name of the tag; any short string.

color: Optional[str] = None

HTML hex color code, e.g. #ff0000.

def current_tracing_node( check: bool = True) -> Optional[TracingNode]:
455def current_tracing_node(check: bool = True) -> Optional[TracingNode]:
456    """
457    Returns the inner-most open tracing node, if any.
458
459    Throws an error if `check` is `True` and there is no current tracing node. If `check` is `False` and there is
460    no current tracing node, it returns `None`.
461    """
462    stack = _TRACING_STACK.get()
463    if not stack:
464        if check:
465            raise Exception("No current tracing")
466        return None
467    return stack[-1]

Returns the inner-most open tracing node, if any.

Throws an error if check is True and there is no current tracing node. If check is False and there is no current tracing node, it returns None.

def with_trace( fn: Callable = None, *, name=None, kind=None, tags: Optional[List[str | Tag]] = None):
392def with_trace(
393    fn: Callable = None, *, name=None, kind=None, tags: Optional[List[str | Tag]] = None
394):
395    """
396    A decorator wrapping every execution of the function in a new `TracingNode`.
397
398    The `inputs`, `result`, and `error` (if any) are set automatically.
399    Note that you can access the created tracing in your function using `current_tracing_node`.
400
401    *Usage:*
402
403    ```python
404    @with_trace
405    def func():
406        pass
407
408    @with_trace(name="custom_name", kind="custom_kind", tags=['tag1', 'tag2'])
409    def func():
410        pass
411    ```
412    """
413    if isinstance(fn, str):
414        raise TypeError("use `with_tracing()` with explicit `name=...` parameter")
415
416    def helper(func):
417        signature = inspect.signature(func)
418
419        @functools.wraps(func)
420        def wrapper(*a, **kw):
421            binding = signature.bind(*a, **kw)
422            with TracingNode(
423                name=name or func.__name__,
424                kind=kind or "call",
425                inputs=binding.arguments,
426                tags=tags,
427            ) as node:
428                result = func(*a, **kw)
429                node.set_result(result)
430                return result
431
432        async def async_wrapper(*a, **kw):
433            binding = signature.bind(*a, **kw)
434            with TracingNode(
435                name=name or func.__name__,
436                kind=kind or "acall",
437                inputs=binding.arguments,
438            ) as node:
439                result = await func(*a, **kw)
440                node.set_result(result)
441                return result
442
443        if inspect.iscoroutinefunction(func):
444            return async_wrapper
445        else:
446            return wrapper
447
448    if fn is not None:
449        assert callable(fn)
450        return helper(fn)
451    else:
452        return helper

A decorator wrapping every execution of the function in a new TracingNode.

The inputs, result, and error (if any) are set automatically. Note that you can access the created tracing in your function using current_tracing_node.

Usage:

@with_trace
def func():
    pass

@with_trace(name="custom_name", kind="custom_kind", tags=['tag1', 'tag2'])
def func():
    pass
class FileStorage(treetrace.StorageBase):
118class FileStorage(StorageBase):
119    def __init__(self, directory: PathLike | str):
120        super().__init__()
121        directory = os.path.abspath(directory)
122        os.makedirs(directory, exist_ok=True)
123        self.directory = directory
124
125    def _file_path(self, directory, name) -> str:
126        return os.path.join(directory, f"{name}.gz")
127
128    def _dir_path(self, directory, name: str) -> str:
129        return os.path.join(directory, f"{name}.ctx")
130
131    def write_node(self, node: TracingNode):
132        if not validate_uid(node.uid):
133            raise Exception("Invalid uid")
134        self._write_node_into(self.directory, node, True)
135        with self._lock:
136            self._ephemeral_nodes.pop(node.uid, None)
137
138    def _write_node_into(self, directory: str, node: TracingNode, root_node: bool):
139        if not validate_uid(node.uid):
140            raise Exception("Invalid uid")
141        if node.directory:
142            self._write_node_dir(directory, node, root_node)
143        else:
144            data = json.dumps(node.to_dict(root=root_node)).encode()
145            data_root = json.dumps(node.to_dict(False, root=root_node)).encode()
146            # Write full first, so when root exists, then full is definitely there
147            self._write_node_file(directory, node.uid + ".full", data)
148            self._write_node_file(directory, node.uid + ".root", data_root)
149
150    def _write_node_dir(self, directory: str, node: TracingNode, node_content: bool):
151        path = self._dir_path(directory, node.uid)
152        tmp_path = path + "._tmp"
153        try:
154            os.mkdir(tmp_path)
155            data_root = json.dumps(node.to_dict(False, root=node_content)).encode()
156            self._write_node_file(tmp_path, "_self", data_root)
157            for child in node.children:
158                self._write_node_into(tmp_path, child, False)
159            os.rename(tmp_path, path)
160        finally:
161            if os.path.exists(tmp_path):
162                shutil.rmtree(tmp_path)
163
164    def _write_node_file(self, directory: str, name: str, data: Data):
165        path = self._file_path(directory, name)
166        tmp_path = path + "._tmp"
167        try:
168            with gzip.open(tmp_path, "w") as f:
169                f.write(data)
170            os.rename(tmp_path, path)
171        finally:
172            if os.path.exists(tmp_path):
173                os.unlink(tmp_path)
174
175    def read(self, uid: str) -> Data:
176        if not validate_uid(uid):
177            raise Exception("Invalid uid")
178        node = self.get_ephemeral_node(uid)
179        if node:
180            return node.to_dict()
181        return self._read_from(self.directory, uid)
182
183    def read_root(self, uid) -> Data:
184        if not validate_uid(uid):
185            raise Exception("Invalid uid")
186        node = self.get_ephemeral_node(uid)
187        if node:
188            return node.to_dict(with_children=False)
189
190        dir_path = self._dir_path(self.directory, uid)
191        if os.path.isdir(dir_path):
192            path = self._file_path(dir_path, "_self")
193        else:
194            path = self._file_path(self.directory, uid + ".root")
195        return self._read_file(path)
196
197    def _read_from(self, directory: str, uid: str) -> Data:
198        path = self._dir_path(directory, uid)
199        if os.path.isdir(path):
200            return self._read_dir(path)
201        else:
202            path = self._file_path(directory, uid + ".full")
203            return self._read_file(path)
204
205    def _read_file(self, path: str):
206        with gzip.open(path, "r") as f:
207            data = f.read()
208        return json.loads(data)
209
210    def _read_dir(self, path: str):
211        self_path = self._file_path(path, "_self")
212        self_data = self._read_file(self_path)
213        children_uids = self_data.pop("children_uids", None)
214        if children_uids is None:
215            return self_data
216        self_data["children"] = [self._read_from(path, uid) for uid in children_uids]
217        return self_data
218
219    def list(self) -> List[str]:
220        file_suffix = ".root.gz"
221        dir_suffix = ".ctx"
222
223        with self._lock:
224            ephemeral = list(self._ephemeral_nodes.keys())
225
226        lst = os.listdir(self.directory)
227        return (
228            ephemeral
229            + [name[: -len(file_suffix)] for name in lst if name.endswith(file_suffix)]
230            + [name[: -len(dir_suffix)] for name in lst if name.endswith(dir_suffix)]
231        )
232
233    def remove_node(self, uid: str):
234        if not validate_uid(uid):
235            raise Exception("Invalid uid")
236        # First try to remove .root, so it immediately disappear from listing
237        path = self._file_path(self.directory, uid + ".root")
238        if os.path.isfile(path):
239            os.unlink(path)
240            path = self._file_path(self.directory, uid + ".full")
241            os.unlink(path)
242        else:
243            path = self._dir_path(self.directory, uid)
244            if os.path.isdir(path):
245                shutil.rmtree(path)
246        with self._lock:
247            self._ephemeral_nodes.pop(uid, None)
248
249    def __repr__(self):
250        return f"<FileStorage directory={repr(self.directory)}>"

Helper class that provides a standard way to create an ABC using inheritance.

def write_node(self, node: TracingNode):
131    def write_node(self, node: TracingNode):
132        if not validate_uid(node.uid):
133            raise Exception("Invalid uid")
134        self._write_node_into(self.directory, node, True)
135        with self._lock:
136            self._ephemeral_nodes.pop(node.uid, None)

Write tracing node into persistent storage

def read( self, uid: str) -> Union[Dict[str, Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], List[Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], int, float, str, bool, NoneType]:
175    def read(self, uid: str) -> Data:
176        if not validate_uid(uid):
177            raise Exception("Invalid uid")
178        node = self.get_ephemeral_node(uid)
179        if node:
180            return node.to_dict()
181        return self._read_from(self.directory, uid)

Read unserialized tracing from the storage

def read_root( self, uid) -> Union[Dict[str, Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], List[Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], int, float, str, bool, NoneType]:
183    def read_root(self, uid) -> Data:
184        if not validate_uid(uid):
185            raise Exception("Invalid uid")
186        node = self.get_ephemeral_node(uid)
187        if node:
188            return node.to_dict(with_children=False)
189
190        dir_path = self._dir_path(self.directory, uid)
191        if os.path.isdir(dir_path):
192            path = self._file_path(dir_path, "_self")
193        else:
194            path = self._file_path(self.directory, uid + ".root")
195        return self._read_file(path)

Read tracing without children

def list(self) -> List[str]:
219    def list(self) -> List[str]:
220        file_suffix = ".root.gz"
221        dir_suffix = ".ctx"
222
223        with self._lock:
224            ephemeral = list(self._ephemeral_nodes.keys())
225
226        lst = os.listdir(self.directory)
227        return (
228            ephemeral
229            + [name[: -len(file_suffix)] for name in lst if name.endswith(file_suffix)]
230            + [name[: -len(dir_suffix)] for name in lst if name.endswith(dir_suffix)]
231        )

List all tracing uids in storage

def remove_node(self, uid: str):
233    def remove_node(self, uid: str):
234        if not validate_uid(uid):
235            raise Exception("Invalid uid")
236        # First try to remove .root, so it immediately disappear from listing
237        path = self._file_path(self.directory, uid + ".root")
238        if os.path.isfile(path):
239            os.unlink(path)
240            path = self._file_path(self.directory, uid + ".full")
241            os.unlink(path)
242        else:
243            path = self._dir_path(self.directory, uid)
244            if os.path.isdir(path):
245                shutil.rmtree(path)
246        with self._lock:
247            self._ephemeral_nodes.pop(uid, None)

Remove tracing from storage

class StorageBase(abc.ABC):
 20class StorageBase(abc.ABC):
 21    def __init__(self):
 22        self._lock = threading.Lock()
 23        self._ephemeral_nodes = {}
 24        self._server = None
 25        self._token = None
 26
 27    def register_node(self, node: TracingNode):
 28        """Register running tracing node (without writing into the persistent storage)"""
 29        with self._lock:
 30            self._ephemeral_nodes[node.uid] = node
 31
 32    def get_ephemeral_node(self, uid: str) -> Optional[TracingNode]:
 33        """Get running tracing node (not from persistent storage)"""
 34        with self._lock:
 35            return self._ephemeral_nodes.get(uid)
 36
 37    @abc.abstractmethod
 38    def write_node(self, node: TracingNode):
 39        """Write tracing node into persistent storage"""
 40        raise NotImplementedError
 41
 42    @abc.abstractmethod
 43    def read(self, uid: str) -> Data:
 44        """Read unserialized tracing from the storage"""
 45        raise NotImplementedError
 46
 47    @abc.abstractmethod
 48    def read_root(self, uid: str) -> Data:
 49        """Read tracing without children"""
 50        raise NotImplementedError
 51
 52    @abc.abstractmethod
 53    def remove_node(self, uid: str):
 54        """Remove tracing from storage"""
 55        raise NotImplementedError
 56
 57    @abc.abstractmethod
 58    def list(self) -> List[str]:
 59        """List all tracing uids in storage"""
 60        raise NotImplementedError
 61
 62    def read_roots(self, uids: Sequence[str]) -> List[Data]:
 63        """Read given root nodes without children"""
 64        return [self.read_root(uid) for uid in uids]
 65
 66    def read_node(self, uid: str) -> TracingNode:
 67        """Read and serialize tracing node from storage"""
 68        return TracingNode.deserialize(self.read(uid))
 69
 70    def read_all_nodes(self) -> Iterator[TracingNode]:
 71        """Read all nodes from storage"""
 72        for uid in self.list():
 73            yield self.read_node(uid)
 74
 75    def live_display(self, width="95%", height=700):
 76        """Show tracing in Jupyter notebook"""
 77        if self._server is None:
 78            self.start_server()
 79        display_iframe(self._server.url, self._server.port, width, height)
 80
 81    @property
 82    def server(self):
 83        if self._server is None:
 84            raise Exception("Server not stared. Use method 'start_server' on storage")
 85        return self._server
 86
 87    def start_server(self, port=0):
 88        if self._server is not None:
 89            raise Exception("Server already started")
 90        from ..ui.storage_server import start_storage_server
 91
 92        self._server = start_storage_server(storage=self, port=port)
 93        return self._server
 94
 95    def find_nodes(self, predicate: Callable) -> Iterator[TracingNode]:
 96        for node in self.read_all_nodes():
 97            yield from node.find_nodes(predicate)
 98
 99    def __enter__(self):
100        if self._token:
101            raise Exception("Storage is already active")
102        old = _STORAGE_STACK.get()
103        self._token = _STORAGE_STACK.set(old + (self,))
104        return self
105
106    def __exit__(self, _exc_type, exc_val, _exc_tb):
107        _STORAGE_STACK.reset(self._token)
108        self._token = False

Helper class that provides a standard way to create an ABC using inheritance.

def register_node(self, node: TracingNode):
27    def register_node(self, node: TracingNode):
28        """Register running tracing node (without writing into the persistent storage)"""
29        with self._lock:
30            self._ephemeral_nodes[node.uid] = node

Register running tracing node (without writing into the persistent storage)

def get_ephemeral_node(self, uid: str) -> Optional[TracingNode]:
32    def get_ephemeral_node(self, uid: str) -> Optional[TracingNode]:
33        """Get running tracing node (not from persistent storage)"""
34        with self._lock:
35            return self._ephemeral_nodes.get(uid)

Get running tracing node (not from persistent storage)

@abc.abstractmethod
def write_node(self, node: TracingNode):
37    @abc.abstractmethod
38    def write_node(self, node: TracingNode):
39        """Write tracing node into persistent storage"""
40        raise NotImplementedError

Write tracing node into persistent storage

@abc.abstractmethod
def read( self, uid: str) -> Union[Dict[str, Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], List[Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], int, float, str, bool, NoneType]:
42    @abc.abstractmethod
43    def read(self, uid: str) -> Data:
44        """Read unserialized tracing from the storage"""
45        raise NotImplementedError

Read unserialized tracing from the storage

@abc.abstractmethod
def read_root( self, uid: str) -> Union[Dict[str, Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], List[Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], int, float, str, bool, NoneType]:
47    @abc.abstractmethod
48    def read_root(self, uid: str) -> Data:
49        """Read tracing without children"""
50        raise NotImplementedError

Read tracing without children

@abc.abstractmethod
def remove_node(self, uid: str):
52    @abc.abstractmethod
53    def remove_node(self, uid: str):
54        """Remove tracing from storage"""
55        raise NotImplementedError

Remove tracing from storage

@abc.abstractmethod
def list(self) -> List[str]:
57    @abc.abstractmethod
58    def list(self) -> List[str]:
59        """List all tracing uids in storage"""
60        raise NotImplementedError

List all tracing uids in storage

def read_roots( self, uids: Sequence[str]) -> List[Union[Dict[str, Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], List[Union[Dict[str, ForwardRef('Data')], List[ForwardRef('Data')], int, float, str, bool, NoneType]], int, float, str, bool, NoneType]]:
62    def read_roots(self, uids: Sequence[str]) -> List[Data]:
63        """Read given root nodes without children"""
64        return [self.read_root(uid) for uid in uids]

Read given root nodes without children

def read_node(self, uid: str) -> TracingNode:
66    def read_node(self, uid: str) -> TracingNode:
67        """Read and serialize tracing node from storage"""
68        return TracingNode.deserialize(self.read(uid))

Read and serialize tracing node from storage

def read_all_nodes(self) -> Iterator[TracingNode]:
70    def read_all_nodes(self) -> Iterator[TracingNode]:
71        """Read all nodes from storage"""
72        for uid in self.list():
73            yield self.read_node(uid)

Read all nodes from storage

def live_display(self, width='95%', height=700):
75    def live_display(self, width="95%", height=700):
76        """Show tracing in Jupyter notebook"""
77        if self._server is None:
78            self.start_server()
79        display_iframe(self._server.url, self._server.port, width, height)

Show tracing in Jupyter notebook

class Html:
 2class Html:
 3
 4    """
 5    Wrapper around HTML code that is directly rendered in Data Browser
 6    """
 7
 8    def __init__(self, html: str):
 9        self.html = html
10
11    def __trace_to_node__(self):
12        return {"_type": "$html", "html": self.html}

Wrapper around HTML code that is directly rendered in Data Browser

class FormatStr:
 75class FormatStr:
 76    """
 77    A partially-formatted string that remembers the substitutions made (incl. nesting). Immutable.
 78    """
 79
 80    _parts: tuple[str | _SubFormatStr | _FormatStrField]
 81    _text: str | None
 82
 83    def __init__(self, fstr: str | None = None, _parts=None, _text=None):
 84        if _parts is not None:
 85            assert fstr is None
 86            self._parts = tuple(_parts)
 87            self._text = _text
 88            if self._text is None:
 89                self._text = self._gen_text()
 90        else:
 91            assert _text is None
 92            if fstr is None:
 93                fstr = ""
 94            self._parts = tuple(self._parse_fstring(fstr))
 95            self._text = fstr
 96        self._initialized = True
 97
 98    def __setattr__(self, name: str, value: Any) -> None:
 99        if hasattr(self, "_initialized") and self._initialized:
100            raise TypeError(f"Instances of {self.__class__.__name__} are immutable")
101        return super().__setattr__(name, value)
102
103    def into_html(self, level=0) -> str:
104        ps = [p if isinstance(p, str) else p.into_html(level) for p in self._parts]
105        return f'<span class="FmtString__span" style="white-space: pre-wrap;">{"".join(ps)}</span>'
106
107    def __trace_to_node__(self):
108        return {"_type": "$html", "html": self.into_html()}
109
110    @classmethod
111    def _parse_fstring(cls, fstr: str) -> list[str | _SubFormatStr | _FormatStrField]:
112        """Split a given string into"""
113        res = []
114        for literal_text, field_name, format_spec, conversion in _formatter.parse(fstr):
115            if literal_text:
116                res.append(literal_text)
117            if field_name is not None:
118                if "#" in field_name:
119                    field_name, color = field_name.split("#", 1)
120                else:
121                    color = None
122                if not field_name or field_name.isnumeric():
123                    raise ValueError("Position-based format arguments are unsupported")
124                if "{" in format_spec:
125                    raise ValueError(
126                        f"Format specifiers with arguments unsupported in {cls.__name__}"
127                    )
128                res.append(
129                    _FormatStrField(
130                        field_name=field_name,
131                        format_spec=format_spec,
132                        conversion=conversion,
133                        color=color,
134                    )
135                )
136        return res
137
138    def __str__(self) -> str:
139        return self._text
140
141    def _gen_text(self) -> str:
142        """Generate the underlying (partially formatted) string"""
143        res = []
144        for p in self._parts:
145            if isinstance(p, str):
146                res.append(p)
147            elif isinstance(p, _FormatStrField):
148                res.append("{" + p.field_format_spec + "}")
149            elif isinstance(p, _SubFormatStr):
150                res.append(str(p.value))
151            else:
152                assert False
153        return "".join(res)
154
155    def __add__(self, other: str | FormatStr) -> FormatStr:
156        parts = list(self._parts)
157        if isinstance(other, str):
158            parts.append(other)
159        elif isinstance(other, FormatStr):
160            parts.extend(other._parts)
161        else:
162            raise TypeError(
163                f"{self.__class__.__name__}.__add__() only accepts str and FormatStr, got {type(other)}"
164            )
165        return self.__class__(None, _parts=parts)
166
167    def join(self, iter: Iterable[str | FormatStr]) -> FormatStr:
168        parts = []
169        for i, s in enumerate(iter):
170            if i > 0:
171                parts.extend(self._parts)
172            if isinstance(s, str):
173                parts.append(s)
174            elif isinstance(s, FormatStr):
175                parts.extend(s._parts)
176            else:
177                raise TypeError(
178                    f"{self.__class__.__name__}.join() only accepts str and FormatStr, got {type(s)}"
179                )
180        return self.__class__(None, _parts=parts)
181
182    def format(self, _recursive=False, _partial=True, **kwargs) -> "FormatStr":
183        assert _recursive is True or _recursive is False
184        assert _partial is True or _partial is False
185
186        res = []
187        for p in self._parts:
188            if isinstance(p, str):
189                res.append(p)
190            elif isinstance(p, _SubFormatStr):
191                p2 = copy.copy(p)
192                if _recursive:
193                    p2.value = p.value.format(
194                        _recursive=_recursive, _partial=_partial, **kwargs
195                    )
196                res.append(p2)
197            elif isinstance(p, _FormatStrField):
198                if p.field_name in kwargs:
199                    v = kwargs[p.field_name]
200                    if p.conversion:
201                        v = _formatter.convert_field(v, p.conversion)
202                    if p.format_spec:
203                        v = _formatter.format_field(v, p.format_spec)
204                    # In case v is not already a string or FmtString
205                    if not isinstance(v, (FormatStr, str)):
206                        v = str(v)
207                    res.append(
208                        _SubFormatStr(
209                            field_name=p.field_name,
210                            field_format_spec=p.field_format_spec,
211                            value=v,
212                            color=p.color,
213                        )
214                    )
215                else:
216                    if not _partial:
217                        raise KeyError(p.field_name)
218                    res.append(p)
219        return FormatStr(None, _parts=res)
220
221    def free_params(self, _recursive=False) -> set[str]:
222        """Return names of all free parameters"""
223        res = set()
224        for p in self._parts:
225            if isinstance(p, _FormatStrField):
226                res.add(p.field_name)
227            if isinstance(p, _SubFormatStr) and _recursive:
228                res.update(p.value.free_params(_recursive=_recursive))
229        return res

A partially-formatted string that remembers the substitutions made (incl. nesting). Immutable.

def free_params(self, _recursive=False) -> set[str]:
221    def free_params(self, _recursive=False) -> set[str]:
222        """Return names of all free parameters"""
223        res = set()
224        for p in self._parts:
225            if isinstance(p, _FormatStrField):
226                res.add(p.field_name)
227            if isinstance(p, _SubFormatStr) and _recursive:
228                res.update(p.value.free_params(_recursive=_recursive))
229        return res

Return names of all free parameters

class HtmlColor:
  9class HtmlColor:
 10    """Helper class representing RGB color. Accepts HTML hex notation (3, 4, 6 or 8 hex digits, with optional '#')."""
 11
 12    def __init__(self, color: str):
 13        color = color.strip()
 14        m = re.fullmatch(r"[#]?([0-9a-f]{3,8})", color.lower())
 15        if not m:
 16            raise ValueError(
 17                f"Invalid color spec: {color!r}, expected HTML hex color spec"
 18            )
 19        c = m.groups()[0]
 20        if len(c) not in (3, 4, 6, 8):
 21            raise ValueError(
 22                f"Invalid color spec: {color!r}, expected HTML hex color spec"
 23            )
 24        if len(c) <= 4:
 25            c = "".join(2 * x for x in c)
 26        self.r = int(c[0:2], base=16)
 27        self.g = int(c[2:4], base=16)
 28        self.b = int(c[4:6], base=16)
 29        if len(c) == 8:
 30            self.a = int(c[6:8], base=16)
 31        else:
 32            self.a = None
 33
 34    def as_floats(self) -> tuple:
 35        "Returns (r,g,b) or (r,g,b,a), with all values 0..1"
 36        cf = (self.r / 255, self.g / 255, self.b / 255)
 37        if self.a is not None:
 38            cf += (self.a / 255,)
 39        return cf
 40
 41    @classmethod
 42    def from_floats(cls, r: float, g: float, b: float, a: float = None) -> "HtmlColor":
 43        """Returns (r,g,b) or (r,g,b,a), with all values 0.0 .. 1.0."""
 44        c = cls("000000")
 45        c.r = round(r * 255)
 46        c.g = round(g * 255)
 47        c.b = round(b * 255)
 48        if a is not None:
 49            c.a = round(a * 255)
 50        return c
 51
 52    def __str__(self):
 53        """Returns the color in '#RRGGBB[AA]' HTML hex format."""
 54        s = f"#{self.r:02x}{self.g:02x}{self.b:02x}"
 55        if self.a is not None:
 56            s += f"{self.a:02x}"
 57        return s
 58
 59    def __repr__(self):
 60        return f"{self.__class__.__name__}({str(self)!r})"
 61
 62    def copy(self):
 63        return copy.copy(self)
 64
 65    def _blend_lightness(self, tgt: float, rate: float) -> "HtmlColor":
 66        hue, light, sat = colorsys.rgb_to_hls(*self.as_floats())
 67        light = rate * tgt + (1 - rate) * light
 68        c = self.from_floats(*colorsys.hls_to_rgb(hue, light, sat))
 69        c.a = self.a
 70        return c
 71
 72    def lighter(self, rate=0.2) -> "HtmlColor":
 73        "Returns a lighter copy of the color: rate=1.0 returns white, rate=0 returns the same color."
 74        return self._blend_lightness(1.0, rate)
 75
 76    def darker(self, rate=0.2) -> "HtmlColor":
 77        "Returns a darker copy of the color: rate=1.0 returns black, rate=0 returns the same color."
 78        return self._blend_lightness(0.0, rate)
 79
 80    def with_alpha(self, a: float) -> "HtmlColor":
 81        "Returns a copy of the color with given alpha: float 0.0-1.0"
 82        assert isinstance(a, (float, int)) and a <= 1.0 and a >= 0.0
 83        c = self.copy()
 84        c.a = round(a * 255)
 85        return c
 86
 87    @classmethod
 88    def random_color(
 89        cls,
 90        seed: Any | None = None,
 91        saturation=0.7,
 92        lighness=0.5,
 93        *,
 94        vary_lightness_rate=0.2,
 95        vary_saturation_rate=0.3,
 96    ) -> str:
 97        """Returns a pseudo-random color for the given object, or a random color.
 98
 99        The color is primarily determined by a random hue and given saturation and lightness.
100        By default, the lightness and saturation of the generated color slightly varies from the given values.
101        The returned color is guaranteed to be stable across runs for string seeds
102        (or objects with stable __repr__ value).
103        """
104        if (seed is not None) and (not isinstance(seed, str)):
105            # This can bring instability between versions of your __repr__ or Python
106            # and for some objects this may just vary even among identical objects
107            seed = repr(seed)
108            # Remove any concrete pointer addresses, as these will vary
109            seed = re.sub(f"0x[0-9a-fA-F]{8,20}", "<0xPTR>", seed)
110        rng = random.Random(seed)
111
112        hue = rng.random()
113        tgt_s = rng.random()
114        tgt_l = rng.random()
115        return cls.from_floats(
116            *colorsys.hls_to_rgb(
117                hue,
118                (1 - vary_lightness_rate) * lighness + vary_lightness_rate * tgt_l,
119                (1 - vary_saturation_rate) * saturation + vary_saturation_rate * tgt_s,
120            )
121        )
122
123    def __eq__(self, other):
124        if not isinstance(other, self.__class__):
125            return False
126        return (
127            (self.r == other.r)
128            and (self.g == other.g)
129            and (self.b == other.b)
130            and (self.a == other.a)
131        )

Helper class representing RGB color. Accepts HTML hex notation (3, 4, 6 or 8 hex digits, with optional '#').

def as_floats(self) -> tuple:
34    def as_floats(self) -> tuple:
35        "Returns (r,g,b) or (r,g,b,a), with all values 0..1"
36        cf = (self.r / 255, self.g / 255, self.b / 255)
37        if self.a is not None:
38            cf += (self.a / 255,)
39        return cf

Returns (r,g,b) or (r,g,b,a), with all values 0..1

@classmethod
def from_floats( cls, r: float, g: float, b: float, a: float = None) -> HtmlColor:
41    @classmethod
42    def from_floats(cls, r: float, g: float, b: float, a: float = None) -> "HtmlColor":
43        """Returns (r,g,b) or (r,g,b,a), with all values 0.0 .. 1.0."""
44        c = cls("000000")
45        c.r = round(r * 255)
46        c.g = round(g * 255)
47        c.b = round(b * 255)
48        if a is not None:
49            c.a = round(a * 255)
50        return c

Returns (r,g,b) or (r,g,b,a), with all values 0.0 .. 1.0.

def lighter(self, rate=0.2) -> HtmlColor:
72    def lighter(self, rate=0.2) -> "HtmlColor":
73        "Returns a lighter copy of the color: rate=1.0 returns white, rate=0 returns the same color."
74        return self._blend_lightness(1.0, rate)

Returns a lighter copy of the color: rate=1.0 returns white, rate=0 returns the same color.

def darker(self, rate=0.2) -> HtmlColor:
76    def darker(self, rate=0.2) -> "HtmlColor":
77        "Returns a darker copy of the color: rate=1.0 returns black, rate=0 returns the same color."
78        return self._blend_lightness(0.0, rate)

Returns a darker copy of the color: rate=1.0 returns black, rate=0 returns the same color.

def with_alpha(self, a: float) -> HtmlColor:
80    def with_alpha(self, a: float) -> "HtmlColor":
81        "Returns a copy of the color with given alpha: float 0.0-1.0"
82        assert isinstance(a, (float, int)) and a <= 1.0 and a >= 0.0
83        c = self.copy()
84        c.a = round(a * 255)
85        return c

Returns a copy of the color with given alpha: float 0.0-1.0

@classmethod
def random_color( cls, seed: Optional[Any] = None, saturation=0.7, lighness=0.5, *, vary_lightness_rate=0.2, vary_saturation_rate=0.3) -> str:
 87    @classmethod
 88    def random_color(
 89        cls,
 90        seed: Any | None = None,
 91        saturation=0.7,
 92        lighness=0.5,
 93        *,
 94        vary_lightness_rate=0.2,
 95        vary_saturation_rate=0.3,
 96    ) -> str:
 97        """Returns a pseudo-random color for the given object, or a random color.
 98
 99        The color is primarily determined by a random hue and given saturation and lightness.
100        By default, the lightness and saturation of the generated color slightly varies from the given values.
101        The returned color is guaranteed to be stable across runs for string seeds
102        (or objects with stable __repr__ value).
103        """
104        if (seed is not None) and (not isinstance(seed, str)):
105            # This can bring instability between versions of your __repr__ or Python
106            # and for some objects this may just vary even among identical objects
107            seed = repr(seed)
108            # Remove any concrete pointer addresses, as these will vary
109            seed = re.sub(f"0x[0-9a-fA-F]{8,20}", "<0xPTR>", seed)
110        rng = random.Random(seed)
111
112        hue = rng.random()
113        tgt_s = rng.random()
114        tgt_l = rng.random()
115        return cls.from_floats(
116            *colorsys.hls_to_rgb(
117                hue,
118                (1 - vary_lightness_rate) * lighness + vary_lightness_rate * tgt_l,
119                (1 - vary_saturation_rate) * saturation + vary_saturation_rate * tgt_s,
120            )
121        )

Returns a pseudo-random color for the given object, or a random color.

The color is primarily determined by a random hue and given saturation and lightness. By default, the lightness and saturation of the generated color slightly varies from the given values. The returned color is guaranteed to be stable across runs for string seeds (or objects with stable __repr__ value).