treetrace
1from .tracing.tracingnode import TracingNode, Tag, current_tracing_node, with_trace 2from treetrace.tracing.serialization import ( 3 register_custom_serializer, 4 unregister_custom_serializer, 5) 6from .tracing.data.format_str import FormatStr 7from .tracing.data.blob import DataWithMime 8from .tracing.data.html import Html 9from .tracing.storage import FileStorage, StorageBase, current_storage 10from .utils.html_color import HtmlColor 11from .utils.text import shorten_str 12from .ui.console_server import ConsoleServer 13from . import ext 14 15__all__ = [ 16 "TracingNode", 17 "Tag", 18 "current_tracing_node", 19 "current_storage", 20 "with_trace", 21 "FileStorage", 22 "StorageBase", 23 "DataWithMime", 24 "Html", 25 "FormatStr", 26 "ext", 27 "HtmlColor", 28 "shorten_str", 29 "ConsoleServer", 30 "register_custom_serializer", 31 "unregister_custom_serializer", 32]
57class TracingNode: 58 """ 59 A tracing object that represents a single request or (sub)task in a nested hierarchy. 60 61 The class has several attributes that are intended as read-only; use setters to modify them. 62 63 The `TracingNode` can be used as context manager, e.g.: 64 65 ```python 66 with TracingNode("my node", inputs={"z": 42}) as c: 67 c.add_input("x", 1) 68 y = do_some_computation(x=1) 69 # The tracing node would also note any exceptions raised here 70 # (letting it propagate upwards), but a result needs to be set manually: 71 c.set_result(y) 72 # <- Here the tracing node is already closed. 73 ``` 74 """ 75 76 def __init__( 77 self, 78 name: str, 79 kind: Optional[str] = None, 80 inputs: Optional[Dict[str, Any]] = None, 81 meta: Optional[Dict[str, Data]] = None, 82 tags: Optional[Sequence[str | Tag]] = None, 83 storage: Optional["StorageBase"] = None, 84 directory=False, 85 result=None, 86 ): 87 """ 88 - `name` - A description or name for the tracing node. 89 - `kind` - Indicates category of the tracing node, may e.g. influence display of the tracing node. 90 - `inputs` - A dictionary of inputs for the tracing node. 91 - `meta` - A dictionary of any metadata for the tracing node, e.g. UI style data. 92 - `tags` - A list of tags for the tracing node 93 - `storage` - A storage object for the tracing node. Set on the root tracing node to log all nodes below it. 94 - `directory` - Whether to create a sub-directory for the tracing node while storing. 95 This allows you to split the stored data across multiple files. 96 - `result` - The result value of the tracing node, if it has already been computed. 97 """ 98 99 if storage is None and current_tracing_node(False) is None: 100 storage = current_storage() 101 102 if inputs: 103 assert isinstance(inputs, dict) 104 assert all(isinstance(key, str) for key in inputs) 105 inputs = serialize_with_type(inputs) 106 107 if meta: 108 meta = serialize_with_type(meta) 109 110 if result: 111 result = serialize_with_type(result) 112 113 if tags is not None: 114 tags = [Tag.into_tag(tag) for tag in tags] 115 116 self.name = name 117 self.kind = kind 118 self.inputs = inputs 119 self.result = result 120 self.error = None 121 self.state: TracingNodeState = ( 122 TracingNodeState.NEW if result is None else TracingNodeState.FINISHED 123 ) 124 self.uid = generate_uid(name) 125 self.children: List[TracingNode] = [] 126 self.tags: List[Tag] = tags 127 self.start_time = None 128 self.end_time = None if result is None else datetime.datetime.now() 129 self.meta = meta 130 self.storage = storage 131 self.directory = directory 132 self._token = None 133 self._depth = 0 134 self._lock = Lock() 135 136 if storage: 137 storage.register_node(self) 138 139 @classmethod 140 def deserialize(cls, data: Data, depth=0): 141 """ 142 Deserialize a `TracingNode` object from given JSON data. 143 144 - `data` - A dictionary containing the serialized tracing data. 145 """ 146 assert isinstance(data, dict) 147 148 # For backward compatibility we also "Context" 149 assert data["_type"] == "TracingNode" or data["_type"] == "Context" 150 self = cls.__new__(cls) 151 self.uid = data["uid"] 152 self.name = data["name"] 153 154 state = data.get("state") 155 if state: 156 state = TracingNodeState(state) 157 else: 158 state = TracingNodeState.FINISHED 159 self.state = state 160 for name in ["kind", "inputs", "result", "error", "tags", "meta"]: 161 setattr(self, name, data.get(name)) 162 self.kind = data.get("kind") 163 self.inputs = data.get("inputs") 164 self.tags = data.get("tags") 165 166 start_time = data.get("start_time") 167 if start_time is None: 168 self.start_time = None 169 else: 170 self.start_time = datetime.datetime.fromisoformat(start_time) 171 172 end_time = data.get("end_time") 173 if end_time is None: 174 self.end_time = None 175 else: 176 self.end_time = datetime.datetime.fromisoformat(end_time) 177 178 children = data.get("children") 179 if children is None: 180 self.children = None 181 else: 182 new_depth = depth + 1 183 self.children = [ 184 TracingNode.deserialize(child, depth=new_depth) for child in children 185 ] 186 187 self._token = None 188 self._depth = depth 189 self._lock = Lock() 190 return self 191 192 def to_dict(self, with_children=True, root=True): 193 """ 194 Serialize `TracingNode` object into JSON structure. 195 196 - `with_children` - If True then children are recursively serialized. 197 If False then serialization of children is skipped and only 198 children UIDs are put into key `children_uids` 199 """ 200 with self._lock: 201 result = {"_type": "TracingNode", "name": self.name, "uid": self.uid} 202 if root: 203 result["version"] = TRACING_FORMAT_VERSION 204 if self.state != TracingNodeState.FINISHED: 205 result["state"] = self.state.value 206 for name in ["kind", "result", "error", "tags"]: 207 value = getattr(self, name) 208 if value is not None: 209 result[name] = value 210 if self.inputs: 211 result["inputs"] = self.inputs 212 if with_children and self.children: 213 result["children"] = [c.to_dict(root=False) for c in self.children] 214 if not with_children and self.children: 215 result["children_uids"] = [c.uid for c in self.children] 216 if self.start_time: 217 result["start_time"] = self.start_time.isoformat() 218 if self.end_time: 219 result["end_time"] = self.end_time.isoformat() 220 if self.meta: 221 result["meta"] = self.meta 222 if self.tags: 223 result["tags"] = serialize_with_type(self.tags) 224 return result 225 226 @property 227 def _pad(self): 228 return " " * self._depth 229 230 def __enter__(self): 231 def _helper(depth): 232 with self._lock: 233 assert not self._token 234 assert self.state == TracingNodeState.NEW 235 self.start_time = datetime.datetime.now() 236 self._depth = depth 237 self._token = _TRACING_STACK.set(parents + (self,)) 238 self.state = TracingNodeState.OPEN 239 _LOG.debug( 240 f"{self._pad}TracingNode {self.kind} inputs={shorten_str(self.inputs, 50)}" 241 ) 242 243 # First we need to get Lock from parent to not get in collision 244 # with to_dict() that goes down the tree 245 parents = _TRACING_STACK.get() 246 if parents: 247 parent = parents[-1] 248 with parent._lock: # noqa 249 _helper(len(parents)) 250 parent.children.append(self) 251 else: 252 _helper(0) 253 return self 254 255 def __exit__(self, _exc_type, exc_val, _exc_tb): 256 with self._lock: 257 assert self._token 258 assert self.state == TracingNodeState.OPEN 259 if exc_val: 260 # Do not call set_error here as it takes a lock 261 self.state = TracingNodeState.ERROR 262 self.error = serialize_with_type(exc_val) 263 _LOG.debug( 264 f"{self._pad}-> ERR {self.kind} error={shorten_str(exc_val, 50)}" 265 ) 266 else: 267 self.state = TracingNodeState.FINISHED 268 _LOG.debug( 269 f"{self._pad}-> OK {self.kind} result={shorten_str(repr(self.result), 50)}" 270 ) 271 self.end_time = datetime.datetime.now() 272 _TRACING_STACK.reset(self._token) 273 self._token = None 274 if self.storage: 275 self.storage.write_node(self) 276 return False # Propagate any exception 277 278 def add_tag(self, tag: str | Tag): 279 """ 280 Add a tag to the tracing node. 281 """ 282 with self._lock: 283 if self.tags is None: 284 self.tags = [Tag.into_tag(tag)] 285 else: 286 self.tags.append(Tag.into_tag(tag)) 287 288 def add_event( 289 self, 290 name: str, 291 kind: Optional[str] = None, 292 data: Optional[Any] = None, 293 meta: Optional[Dict[str, Data]] = None, 294 tags: Optional[List[str | Tag]] = None, 295 ) -> "TracingNode": 296 event = TracingNode(name=name, kind=kind, result=data, meta=meta, tags=tags) 297 with self._lock: 298 self.children.append(event) 299 return event 300 301 def add_input(self, name: str, value: object): 302 """ 303 Add a named input value to the tracing node. 304 305 If an input of the same name already exists, an exception is raised. 306 """ 307 with self._lock: 308 if self.inputs is None: 309 self.inputs = {} 310 if name in self.inputs: 311 raise Exception(f"Input {name} already exists") 312 self.inputs[name] = serialize_with_type(value) 313 314 def add_inputs(self, inputs: dict[str, object]): 315 """ 316 Add a new input values to the tracing node. 317 318 If an input of the same name already exists, an exception is raised. 319 """ 320 with self._lock: 321 if self.inputs is None: 322 self.inputs = {} 323 for name in inputs: 324 if name in self.inputs: 325 raise Exception(f"Input {name} already exists") 326 for name, value in inputs.items(): 327 self.inputs[name] = serialize_with_type(value) 328 329 def set_result(self, value: Any): 330 """ 331 Set the result value of the tracing node. 332 """ 333 with self._lock: 334 self.result = serialize_with_type(value) 335 336 def set_error(self, exc: Any): 337 """ 338 Set the error value of the tracing node (usually an `Exception` instance). 339 """ 340 with self._lock: 341 self.state = TracingNodeState.ERROR 342 self.error = serialize_with_type(exc) 343 344 def has_tag_name(self, tag_name: str): 345 """ 346 Returns `True` if the tracing node has a tag with the given name. 347 """ 348 if not self.tags: 349 return False 350 for tag in self.tags: 351 if tag == tag_name or (isinstance(tag, Tag) and tag.name == tag_name): 352 return True 353 return False 354 355 def find_nodes(self, predicate: Callable) -> List["TracingNode"]: 356 """ 357 Find all nodes matching the given callable `predicate`. 358 359 The predicate is called with a single argument, the `TracingNode` to check, and should return `bool`. 360 """ 361 362 def _helper(node: TracingNode): 363 with node._lock: 364 if predicate(node): 365 result.append(node) 366 if node.children: 367 for child in node.children: 368 _helper(child) 369 370 result = [] 371 _helper(self) 372 return result 373 374 def write_html(self, filename: str): 375 from ..ui.staticview import create_node_static_page 376 377 html = create_node_static_page(self) 378 with open(filename, "w") as f: 379 f.write(html) 380 381 def display(self): 382 """Show tracing in Jupyter notebook""" 383 from IPython.core.display import HTML 384 from IPython.display import display 385 386 from ..ui.staticview import create_node_static_html 387 388 html = create_node_static_html(self) 389 display(HTML(html))
A tracing object that represents a single request or (sub)task in a nested hierarchy.
The class has several attributes that are intended as read-only; use setters to modify them.
The TracingNode
can be used as context manager, e.g.:
with TracingNode("my node", inputs={"z": 42}) as c:
c.add_input("x", 1)
y = do_some_computation(x=1)
# The tracing node would also note any exceptions raised here
# (letting it propagate upwards), but a result needs to be set manually:
c.set_result(y)
# <- Here the tracing node is already closed.
76 def __init__( 77 self, 78 name: str, 79 kind: Optional[str] = None, 80 inputs: Optional[Dict[str, Any]] = None, 81 meta: Optional[Dict[str, Data]] = None, 82 tags: Optional[Sequence[str | Tag]] = None, 83 storage: Optional["StorageBase"] = None, 84 directory=False, 85 result=None, 86 ): 87 """ 88 - `name` - A description or name for the tracing node. 89 - `kind` - Indicates category of the tracing node, may e.g. influence display of the tracing node. 90 - `inputs` - A dictionary of inputs for the tracing node. 91 - `meta` - A dictionary of any metadata for the tracing node, e.g. UI style data. 92 - `tags` - A list of tags for the tracing node 93 - `storage` - A storage object for the tracing node. Set on the root tracing node to log all nodes below it. 94 - `directory` - Whether to create a sub-directory for the tracing node while storing. 95 This allows you to split the stored data across multiple files. 96 - `result` - The result value of the tracing node, if it has already been computed. 97 """ 98 99 if storage is None and current_tracing_node(False) is None: 100 storage = current_storage() 101 102 if inputs: 103 assert isinstance(inputs, dict) 104 assert all(isinstance(key, str) for key in inputs) 105 inputs = serialize_with_type(inputs) 106 107 if meta: 108 meta = serialize_with_type(meta) 109 110 if result: 111 result = serialize_with_type(result) 112 113 if tags is not None: 114 tags = [Tag.into_tag(tag) for tag in tags] 115 116 self.name = name 117 self.kind = kind 118 self.inputs = inputs 119 self.result = result 120 self.error = None 121 self.state: TracingNodeState = ( 122 TracingNodeState.NEW if result is None else TracingNodeState.FINISHED 123 ) 124 self.uid = generate_uid(name) 125 self.children: List[TracingNode] = [] 126 self.tags: List[Tag] = tags 127 self.start_time = None 128 self.end_time = None if result is None else datetime.datetime.now() 129 self.meta = meta 130 self.storage = storage 131 self.directory = directory 132 self._token = None 133 self._depth = 0 134 self._lock = Lock() 135 136 if storage: 137 storage.register_node(self)
name
- A description or name for the tracing node.kind
- Indicates category of the tracing node, may e.g. influence display of the tracing node.inputs
- A dictionary of inputs for the tracing node.meta
- A dictionary of any metadata for the tracing node, e.g. UI style data.tags
- A list of tags for the tracing nodestorage
- A storage object for the tracing node. Set on the root tracing node to log all nodes below it.directory
- Whether to create a sub-directory for the tracing node while storing. This allows you to split the stored data across multiple files.result
- The result value of the tracing node, if it has already been computed.
139 @classmethod 140 def deserialize(cls, data: Data, depth=0): 141 """ 142 Deserialize a `TracingNode` object from given JSON data. 143 144 - `data` - A dictionary containing the serialized tracing data. 145 """ 146 assert isinstance(data, dict) 147 148 # For backward compatibility we also "Context" 149 assert data["_type"] == "TracingNode" or data["_type"] == "Context" 150 self = cls.__new__(cls) 151 self.uid = data["uid"] 152 self.name = data["name"] 153 154 state = data.get("state") 155 if state: 156 state = TracingNodeState(state) 157 else: 158 state = TracingNodeState.FINISHED 159 self.state = state 160 for name in ["kind", "inputs", "result", "error", "tags", "meta"]: 161 setattr(self, name, data.get(name)) 162 self.kind = data.get("kind") 163 self.inputs = data.get("inputs") 164 self.tags = data.get("tags") 165 166 start_time = data.get("start_time") 167 if start_time is None: 168 self.start_time = None 169 else: 170 self.start_time = datetime.datetime.fromisoformat(start_time) 171 172 end_time = data.get("end_time") 173 if end_time is None: 174 self.end_time = None 175 else: 176 self.end_time = datetime.datetime.fromisoformat(end_time) 177 178 children = data.get("children") 179 if children is None: 180 self.children = None 181 else: 182 new_depth = depth + 1 183 self.children = [ 184 TracingNode.deserialize(child, depth=new_depth) for child in children 185 ] 186 187 self._token = None 188 self._depth = depth 189 self._lock = Lock() 190 return self
Deserialize a TracingNode
object from given JSON data.
data
- A dictionary containing the serialized tracing data.
192 def to_dict(self, with_children=True, root=True): 193 """ 194 Serialize `TracingNode` object into JSON structure. 195 196 - `with_children` - If True then children are recursively serialized. 197 If False then serialization of children is skipped and only 198 children UIDs are put into key `children_uids` 199 """ 200 with self._lock: 201 result = {"_type": "TracingNode", "name": self.name, "uid": self.uid} 202 if root: 203 result["version"] = TRACING_FORMAT_VERSION 204 if self.state != TracingNodeState.FINISHED: 205 result["state"] = self.state.value 206 for name in ["kind", "result", "error", "tags"]: 207 value = getattr(self, name) 208 if value is not None: 209 result[name] = value 210 if self.inputs: 211 result["inputs"] = self.inputs 212 if with_children and self.children: 213 result["children"] = [c.to_dict(root=False) for c in self.children] 214 if not with_children and self.children: 215 result["children_uids"] = [c.uid for c in self.children] 216 if self.start_time: 217 result["start_time"] = self.start_time.isoformat() 218 if self.end_time: 219 result["end_time"] = self.end_time.isoformat() 220 if self.meta: 221 result["meta"] = self.meta 222 if self.tags: 223 result["tags"] = serialize_with_type(self.tags) 224 return result
Serialize TracingNode
object into JSON structure.
with_children
- If True then children are recursively serialized. If False then serialization of children is skipped and only children UIDs are put into keychildren_uids
278 def add_tag(self, tag: str | Tag): 279 """ 280 Add a tag to the tracing node. 281 """ 282 with self._lock: 283 if self.tags is None: 284 self.tags = [Tag.into_tag(tag)] 285 else: 286 self.tags.append(Tag.into_tag(tag))
Add a tag to the tracing node.
301 def add_input(self, name: str, value: object): 302 """ 303 Add a named input value to the tracing node. 304 305 If an input of the same name already exists, an exception is raised. 306 """ 307 with self._lock: 308 if self.inputs is None: 309 self.inputs = {} 310 if name in self.inputs: 311 raise Exception(f"Input {name} already exists") 312 self.inputs[name] = serialize_with_type(value)
Add a named input value to the tracing node.
If an input of the same name already exists, an exception is raised.
314 def add_inputs(self, inputs: dict[str, object]): 315 """ 316 Add a new input values to the tracing node. 317 318 If an input of the same name already exists, an exception is raised. 319 """ 320 with self._lock: 321 if self.inputs is None: 322 self.inputs = {} 323 for name in inputs: 324 if name in self.inputs: 325 raise Exception(f"Input {name} already exists") 326 for name, value in inputs.items(): 327 self.inputs[name] = serialize_with_type(value)
Add a new input values to the tracing node.
If an input of the same name already exists, an exception is raised.
329 def set_result(self, value: Any): 330 """ 331 Set the result value of the tracing node. 332 """ 333 with self._lock: 334 self.result = serialize_with_type(value)
Set the result value of the tracing node.
336 def set_error(self, exc: Any): 337 """ 338 Set the error value of the tracing node (usually an `Exception` instance). 339 """ 340 with self._lock: 341 self.state = TracingNodeState.ERROR 342 self.error = serialize_with_type(exc)
Set the error value of the tracing node (usually an Exception
instance).
344 def has_tag_name(self, tag_name: str): 345 """ 346 Returns `True` if the tracing node has a tag with the given name. 347 """ 348 if not self.tags: 349 return False 350 for tag in self.tags: 351 if tag == tag_name or (isinstance(tag, Tag) and tag.name == tag_name): 352 return True 353 return False
Returns True
if the tracing node has a tag with the given name.
355 def find_nodes(self, predicate: Callable) -> List["TracingNode"]: 356 """ 357 Find all nodes matching the given callable `predicate`. 358 359 The predicate is called with a single argument, the `TracingNode` to check, and should return `bool`. 360 """ 361 362 def _helper(node: TracingNode): 363 with node._lock: 364 if predicate(node): 365 result.append(node) 366 if node.children: 367 for child in node.children: 368 _helper(child) 369 370 result = [] 371 _helper(self) 372 return result
Find all nodes matching the given callable predicate
.
The predicate is called with a single argument, the TracingNode
to check, and should return bool
.
381 def display(self): 382 """Show tracing in Jupyter notebook""" 383 from IPython.core.display import HTML 384 from IPython.display import display 385 386 from ..ui.staticview import create_node_static_html 387 388 html = create_node_static_html(self) 389 display(HTML(html))
Show tracing in Jupyter notebook
37@dataclass 38class Tag: 39 """ 40 A simple class representing a tag that can be applied to a tracing node. Optionally with style information. 41 """ 42 43 name: str 44 """The name of the tag; any short string.""" 45 color: Optional[str] = None 46 """HTML hex color code, e.g. `#ff0000`.""" 47 48 @staticmethod 49 def into_tag(obj: Union[str, "Tag"]) -> "Tag": 50 if isinstance(obj, Tag): 51 return obj 52 if isinstance(obj, str): 53 return Tag(obj) 54 raise Exception(f"Object {obj!r} cannot be converted into Tag")
A simple class representing a tag that can be applied to a tracing node. Optionally with style information.
455def current_tracing_node(check: bool = True) -> Optional[TracingNode]: 456 """ 457 Returns the inner-most open tracing node, if any. 458 459 Throws an error if `check` is `True` and there is no current tracing node. If `check` is `False` and there is 460 no current tracing node, it returns `None`. 461 """ 462 stack = _TRACING_STACK.get() 463 if not stack: 464 if check: 465 raise Exception("No current tracing") 466 return None 467 return stack[-1]
Returns the inner-most open tracing node, if any.
Throws an error if check
is True
and there is no current tracing node. If check
is False
and there is
no current tracing node, it returns None
.
392def with_trace( 393 fn: Callable = None, *, name=None, kind=None, tags: Optional[List[str | Tag]] = None 394): 395 """ 396 A decorator wrapping every execution of the function in a new `TracingNode`. 397 398 The `inputs`, `result`, and `error` (if any) are set automatically. 399 Note that you can access the created tracing in your function using `current_tracing_node`. 400 401 *Usage:* 402 403 ```python 404 @with_trace 405 def func(): 406 pass 407 408 @with_trace(name="custom_name", kind="custom_kind", tags=['tag1', 'tag2']) 409 def func(): 410 pass 411 ``` 412 """ 413 if isinstance(fn, str): 414 raise TypeError("use `with_tracing()` with explicit `name=...` parameter") 415 416 def helper(func): 417 signature = inspect.signature(func) 418 419 @functools.wraps(func) 420 def wrapper(*a, **kw): 421 binding = signature.bind(*a, **kw) 422 with TracingNode( 423 name=name or func.__name__, 424 kind=kind or "call", 425 inputs=binding.arguments, 426 tags=tags, 427 ) as node: 428 result = func(*a, **kw) 429 node.set_result(result) 430 return result 431 432 async def async_wrapper(*a, **kw): 433 binding = signature.bind(*a, **kw) 434 with TracingNode( 435 name=name or func.__name__, 436 kind=kind or "acall", 437 inputs=binding.arguments, 438 ) as node: 439 result = await func(*a, **kw) 440 node.set_result(result) 441 return result 442 443 if inspect.iscoroutinefunction(func): 444 return async_wrapper 445 else: 446 return wrapper 447 448 if fn is not None: 449 assert callable(fn) 450 return helper(fn) 451 else: 452 return helper
A decorator wrapping every execution of the function in a new TracingNode
.
The inputs
, result
, and error
(if any) are set automatically.
Note that you can access the created tracing in your function using current_tracing_node
.
Usage:
@with_trace
def func():
pass
@with_trace(name="custom_name", kind="custom_kind", tags=['tag1', 'tag2'])
def func():
pass
118class FileStorage(StorageBase): 119 def __init__(self, directory: PathLike | str): 120 super().__init__() 121 directory = os.path.abspath(directory) 122 os.makedirs(directory, exist_ok=True) 123 self.directory = directory 124 125 def _file_path(self, directory, name) -> str: 126 return os.path.join(directory, f"{name}.gz") 127 128 def _dir_path(self, directory, name: str) -> str: 129 return os.path.join(directory, f"{name}.ctx") 130 131 def write_node(self, node: TracingNode): 132 if not validate_uid(node.uid): 133 raise Exception("Invalid uid") 134 self._write_node_into(self.directory, node, True) 135 with self._lock: 136 self._ephemeral_nodes.pop(node.uid, None) 137 138 def _write_node_into(self, directory: str, node: TracingNode, root_node: bool): 139 if not validate_uid(node.uid): 140 raise Exception("Invalid uid") 141 if node.directory: 142 self._write_node_dir(directory, node, root_node) 143 else: 144 data = json.dumps(node.to_dict(root=root_node)).encode() 145 data_root = json.dumps(node.to_dict(False, root=root_node)).encode() 146 # Write full first, so when root exists, then full is definitely there 147 self._write_node_file(directory, node.uid + ".full", data) 148 self._write_node_file(directory, node.uid + ".root", data_root) 149 150 def _write_node_dir(self, directory: str, node: TracingNode, node_content: bool): 151 path = self._dir_path(directory, node.uid) 152 tmp_path = path + "._tmp" 153 try: 154 os.mkdir(tmp_path) 155 data_root = json.dumps(node.to_dict(False, root=node_content)).encode() 156 self._write_node_file(tmp_path, "_self", data_root) 157 for child in node.children: 158 self._write_node_into(tmp_path, child, False) 159 os.rename(tmp_path, path) 160 finally: 161 if os.path.exists(tmp_path): 162 shutil.rmtree(tmp_path) 163 164 def _write_node_file(self, directory: str, name: str, data: Data): 165 path = self._file_path(directory, name) 166 tmp_path = path + "._tmp" 167 try: 168 with gzip.open(tmp_path, "w") as f: 169 f.write(data) 170 os.rename(tmp_path, path) 171 finally: 172 if os.path.exists(tmp_path): 173 os.unlink(tmp_path) 174 175 def read(self, uid: str) -> Data: 176 if not validate_uid(uid): 177 raise Exception("Invalid uid") 178 node = self.get_ephemeral_node(uid) 179 if node: 180 return node.to_dict() 181 return self._read_from(self.directory, uid) 182 183 def read_root(self, uid) -> Data: 184 if not validate_uid(uid): 185 raise Exception("Invalid uid") 186 node = self.get_ephemeral_node(uid) 187 if node: 188 return node.to_dict(with_children=False) 189 190 dir_path = self._dir_path(self.directory, uid) 191 if os.path.isdir(dir_path): 192 path = self._file_path(dir_path, "_self") 193 else: 194 path = self._file_path(self.directory, uid + ".root") 195 return self._read_file(path) 196 197 def _read_from(self, directory: str, uid: str) -> Data: 198 path = self._dir_path(directory, uid) 199 if os.path.isdir(path): 200 return self._read_dir(path) 201 else: 202 path = self._file_path(directory, uid + ".full") 203 return self._read_file(path) 204 205 def _read_file(self, path: str): 206 with gzip.open(path, "r") as f: 207 data = f.read() 208 return json.loads(data) 209 210 def _read_dir(self, path: str): 211 self_path = self._file_path(path, "_self") 212 self_data = self._read_file(self_path) 213 children_uids = self_data.pop("children_uids", None) 214 if children_uids is None: 215 return self_data 216 self_data["children"] = [self._read_from(path, uid) for uid in children_uids] 217 return self_data 218 219 def list(self) -> List[str]: 220 file_suffix = ".root.gz" 221 dir_suffix = ".ctx" 222 223 with self._lock: 224 ephemeral = list(self._ephemeral_nodes.keys()) 225 226 lst = os.listdir(self.directory) 227 return ( 228 ephemeral 229 + [name[: -len(file_suffix)] for name in lst if name.endswith(file_suffix)] 230 + [name[: -len(dir_suffix)] for name in lst if name.endswith(dir_suffix)] 231 ) 232 233 def remove_node(self, uid: str): 234 if not validate_uid(uid): 235 raise Exception("Invalid uid") 236 # First try to remove .root, so it immediately disappear from listing 237 path = self._file_path(self.directory, uid + ".root") 238 if os.path.isfile(path): 239 os.unlink(path) 240 path = self._file_path(self.directory, uid + ".full") 241 os.unlink(path) 242 else: 243 path = self._dir_path(self.directory, uid) 244 if os.path.isdir(path): 245 shutil.rmtree(path) 246 with self._lock: 247 self._ephemeral_nodes.pop(uid, None) 248 249 def __repr__(self): 250 return f"<FileStorage directory={repr(self.directory)}>"
Helper class that provides a standard way to create an ABC using inheritance.
131 def write_node(self, node: TracingNode): 132 if not validate_uid(node.uid): 133 raise Exception("Invalid uid") 134 self._write_node_into(self.directory, node, True) 135 with self._lock: 136 self._ephemeral_nodes.pop(node.uid, None)
Write tracing node into persistent storage
175 def read(self, uid: str) -> Data: 176 if not validate_uid(uid): 177 raise Exception("Invalid uid") 178 node = self.get_ephemeral_node(uid) 179 if node: 180 return node.to_dict() 181 return self._read_from(self.directory, uid)
Read unserialized tracing from the storage
183 def read_root(self, uid) -> Data: 184 if not validate_uid(uid): 185 raise Exception("Invalid uid") 186 node = self.get_ephemeral_node(uid) 187 if node: 188 return node.to_dict(with_children=False) 189 190 dir_path = self._dir_path(self.directory, uid) 191 if os.path.isdir(dir_path): 192 path = self._file_path(dir_path, "_self") 193 else: 194 path = self._file_path(self.directory, uid + ".root") 195 return self._read_file(path)
Read tracing without children
219 def list(self) -> List[str]: 220 file_suffix = ".root.gz" 221 dir_suffix = ".ctx" 222 223 with self._lock: 224 ephemeral = list(self._ephemeral_nodes.keys()) 225 226 lst = os.listdir(self.directory) 227 return ( 228 ephemeral 229 + [name[: -len(file_suffix)] for name in lst if name.endswith(file_suffix)] 230 + [name[: -len(dir_suffix)] for name in lst if name.endswith(dir_suffix)] 231 )
List all tracing uids in storage
233 def remove_node(self, uid: str): 234 if not validate_uid(uid): 235 raise Exception("Invalid uid") 236 # First try to remove .root, so it immediately disappear from listing 237 path = self._file_path(self.directory, uid + ".root") 238 if os.path.isfile(path): 239 os.unlink(path) 240 path = self._file_path(self.directory, uid + ".full") 241 os.unlink(path) 242 else: 243 path = self._dir_path(self.directory, uid) 244 if os.path.isdir(path): 245 shutil.rmtree(path) 246 with self._lock: 247 self._ephemeral_nodes.pop(uid, None)
Remove tracing from storage
Inherited Members
20class StorageBase(abc.ABC): 21 def __init__(self): 22 self._lock = threading.Lock() 23 self._ephemeral_nodes = {} 24 self._server = None 25 self._token = None 26 27 def register_node(self, node: TracingNode): 28 """Register running tracing node (without writing into the persistent storage)""" 29 with self._lock: 30 self._ephemeral_nodes[node.uid] = node 31 32 def get_ephemeral_node(self, uid: str) -> Optional[TracingNode]: 33 """Get running tracing node (not from persistent storage)""" 34 with self._lock: 35 return self._ephemeral_nodes.get(uid) 36 37 @abc.abstractmethod 38 def write_node(self, node: TracingNode): 39 """Write tracing node into persistent storage""" 40 raise NotImplementedError 41 42 @abc.abstractmethod 43 def read(self, uid: str) -> Data: 44 """Read unserialized tracing from the storage""" 45 raise NotImplementedError 46 47 @abc.abstractmethod 48 def read_root(self, uid: str) -> Data: 49 """Read tracing without children""" 50 raise NotImplementedError 51 52 @abc.abstractmethod 53 def remove_node(self, uid: str): 54 """Remove tracing from storage""" 55 raise NotImplementedError 56 57 @abc.abstractmethod 58 def list(self) -> List[str]: 59 """List all tracing uids in storage""" 60 raise NotImplementedError 61 62 def read_roots(self, uids: Sequence[str]) -> List[Data]: 63 """Read given root nodes without children""" 64 return [self.read_root(uid) for uid in uids] 65 66 def read_node(self, uid: str) -> TracingNode: 67 """Read and serialize tracing node from storage""" 68 return TracingNode.deserialize(self.read(uid)) 69 70 def read_all_nodes(self) -> Iterator[TracingNode]: 71 """Read all nodes from storage""" 72 for uid in self.list(): 73 yield self.read_node(uid) 74 75 def live_display(self, width="95%", height=700): 76 """Show tracing in Jupyter notebook""" 77 if self._server is None: 78 self.start_server() 79 display_iframe(self._server.url, self._server.port, width, height) 80 81 @property 82 def server(self): 83 if self._server is None: 84 raise Exception("Server not stared. Use method 'start_server' on storage") 85 return self._server 86 87 def start_server(self, port=0): 88 if self._server is not None: 89 raise Exception("Server already started") 90 from ..ui.storage_server import start_storage_server 91 92 self._server = start_storage_server(storage=self, port=port) 93 return self._server 94 95 def find_nodes(self, predicate: Callable) -> Iterator[TracingNode]: 96 for node in self.read_all_nodes(): 97 yield from node.find_nodes(predicate) 98 99 def __enter__(self): 100 if self._token: 101 raise Exception("Storage is already active") 102 old = _STORAGE_STACK.get() 103 self._token = _STORAGE_STACK.set(old + (self,)) 104 return self 105 106 def __exit__(self, _exc_type, exc_val, _exc_tb): 107 _STORAGE_STACK.reset(self._token) 108 self._token = False
Helper class that provides a standard way to create an ABC using inheritance.
27 def register_node(self, node: TracingNode): 28 """Register running tracing node (without writing into the persistent storage)""" 29 with self._lock: 30 self._ephemeral_nodes[node.uid] = node
Register running tracing node (without writing into the persistent storage)
32 def get_ephemeral_node(self, uid: str) -> Optional[TracingNode]: 33 """Get running tracing node (not from persistent storage)""" 34 with self._lock: 35 return self._ephemeral_nodes.get(uid)
Get running tracing node (not from persistent storage)
37 @abc.abstractmethod 38 def write_node(self, node: TracingNode): 39 """Write tracing node into persistent storage""" 40 raise NotImplementedError
Write tracing node into persistent storage
42 @abc.abstractmethod 43 def read(self, uid: str) -> Data: 44 """Read unserialized tracing from the storage""" 45 raise NotImplementedError
Read unserialized tracing from the storage
47 @abc.abstractmethod 48 def read_root(self, uid: str) -> Data: 49 """Read tracing without children""" 50 raise NotImplementedError
Read tracing without children
52 @abc.abstractmethod 53 def remove_node(self, uid: str): 54 """Remove tracing from storage""" 55 raise NotImplementedError
Remove tracing from storage
57 @abc.abstractmethod 58 def list(self) -> List[str]: 59 """List all tracing uids in storage""" 60 raise NotImplementedError
List all tracing uids in storage
62 def read_roots(self, uids: Sequence[str]) -> List[Data]: 63 """Read given root nodes without children""" 64 return [self.read_root(uid) for uid in uids]
Read given root nodes without children
66 def read_node(self, uid: str) -> TracingNode: 67 """Read and serialize tracing node from storage""" 68 return TracingNode.deserialize(self.read(uid))
Read and serialize tracing node from storage
2class Html: 3 4 """ 5 Wrapper around HTML code that is directly rendered in Data Browser 6 """ 7 8 def __init__(self, html: str): 9 self.html = html 10 11 def __trace_to_node__(self): 12 return {"_type": "$html", "html": self.html}
Wrapper around HTML code that is directly rendered in Data Browser
75class FormatStr: 76 """ 77 A partially-formatted string that remembers the substitutions made (incl. nesting). Immutable. 78 """ 79 80 _parts: tuple[str | _SubFormatStr | _FormatStrField] 81 _text: str | None 82 83 def __init__(self, fstr: str | None = None, _parts=None, _text=None): 84 if _parts is not None: 85 assert fstr is None 86 self._parts = tuple(_parts) 87 self._text = _text 88 if self._text is None: 89 self._text = self._gen_text() 90 else: 91 assert _text is None 92 if fstr is None: 93 fstr = "" 94 self._parts = tuple(self._parse_fstring(fstr)) 95 self._text = fstr 96 self._initialized = True 97 98 def __setattr__(self, name: str, value: Any) -> None: 99 if hasattr(self, "_initialized") and self._initialized: 100 raise TypeError(f"Instances of {self.__class__.__name__} are immutable") 101 return super().__setattr__(name, value) 102 103 def into_html(self, level=0) -> str: 104 ps = [p if isinstance(p, str) else p.into_html(level) for p in self._parts] 105 return f'<span class="FmtString__span" style="white-space: pre-wrap;">{"".join(ps)}</span>' 106 107 def __trace_to_node__(self): 108 return {"_type": "$html", "html": self.into_html()} 109 110 @classmethod 111 def _parse_fstring(cls, fstr: str) -> list[str | _SubFormatStr | _FormatStrField]: 112 """Split a given string into""" 113 res = [] 114 for literal_text, field_name, format_spec, conversion in _formatter.parse(fstr): 115 if literal_text: 116 res.append(literal_text) 117 if field_name is not None: 118 if "#" in field_name: 119 field_name, color = field_name.split("#", 1) 120 else: 121 color = None 122 if not field_name or field_name.isnumeric(): 123 raise ValueError("Position-based format arguments are unsupported") 124 if "{" in format_spec: 125 raise ValueError( 126 f"Format specifiers with arguments unsupported in {cls.__name__}" 127 ) 128 res.append( 129 _FormatStrField( 130 field_name=field_name, 131 format_spec=format_spec, 132 conversion=conversion, 133 color=color, 134 ) 135 ) 136 return res 137 138 def __str__(self) -> str: 139 return self._text 140 141 def _gen_text(self) -> str: 142 """Generate the underlying (partially formatted) string""" 143 res = [] 144 for p in self._parts: 145 if isinstance(p, str): 146 res.append(p) 147 elif isinstance(p, _FormatStrField): 148 res.append("{" + p.field_format_spec + "}") 149 elif isinstance(p, _SubFormatStr): 150 res.append(str(p.value)) 151 else: 152 assert False 153 return "".join(res) 154 155 def __add__(self, other: str | FormatStr) -> FormatStr: 156 parts = list(self._parts) 157 if isinstance(other, str): 158 parts.append(other) 159 elif isinstance(other, FormatStr): 160 parts.extend(other._parts) 161 else: 162 raise TypeError( 163 f"{self.__class__.__name__}.__add__() only accepts str and FormatStr, got {type(other)}" 164 ) 165 return self.__class__(None, _parts=parts) 166 167 def join(self, iter: Iterable[str | FormatStr]) -> FormatStr: 168 parts = [] 169 for i, s in enumerate(iter): 170 if i > 0: 171 parts.extend(self._parts) 172 if isinstance(s, str): 173 parts.append(s) 174 elif isinstance(s, FormatStr): 175 parts.extend(s._parts) 176 else: 177 raise TypeError( 178 f"{self.__class__.__name__}.join() only accepts str and FormatStr, got {type(s)}" 179 ) 180 return self.__class__(None, _parts=parts) 181 182 def format(self, _recursive=False, _partial=True, **kwargs) -> "FormatStr": 183 assert _recursive is True or _recursive is False 184 assert _partial is True or _partial is False 185 186 res = [] 187 for p in self._parts: 188 if isinstance(p, str): 189 res.append(p) 190 elif isinstance(p, _SubFormatStr): 191 p2 = copy.copy(p) 192 if _recursive: 193 p2.value = p.value.format( 194 _recursive=_recursive, _partial=_partial, **kwargs 195 ) 196 res.append(p2) 197 elif isinstance(p, _FormatStrField): 198 if p.field_name in kwargs: 199 v = kwargs[p.field_name] 200 if p.conversion: 201 v = _formatter.convert_field(v, p.conversion) 202 if p.format_spec: 203 v = _formatter.format_field(v, p.format_spec) 204 # In case v is not already a string or FmtString 205 if not isinstance(v, (FormatStr, str)): 206 v = str(v) 207 res.append( 208 _SubFormatStr( 209 field_name=p.field_name, 210 field_format_spec=p.field_format_spec, 211 value=v, 212 color=p.color, 213 ) 214 ) 215 else: 216 if not _partial: 217 raise KeyError(p.field_name) 218 res.append(p) 219 return FormatStr(None, _parts=res) 220 221 def free_params(self, _recursive=False) -> set[str]: 222 """Return names of all free parameters""" 223 res = set() 224 for p in self._parts: 225 if isinstance(p, _FormatStrField): 226 res.add(p.field_name) 227 if isinstance(p, _SubFormatStr) and _recursive: 228 res.update(p.value.free_params(_recursive=_recursive)) 229 return res
A partially-formatted string that remembers the substitutions made (incl. nesting). Immutable.
221 def free_params(self, _recursive=False) -> set[str]: 222 """Return names of all free parameters""" 223 res = set() 224 for p in self._parts: 225 if isinstance(p, _FormatStrField): 226 res.add(p.field_name) 227 if isinstance(p, _SubFormatStr) and _recursive: 228 res.update(p.value.free_params(_recursive=_recursive)) 229 return res
Return names of all free parameters
9class HtmlColor: 10 """Helper class representing RGB color. Accepts HTML hex notation (3, 4, 6 or 8 hex digits, with optional '#').""" 11 12 def __init__(self, color: str): 13 color = color.strip() 14 m = re.fullmatch(r"[#]?([0-9a-f]{3,8})", color.lower()) 15 if not m: 16 raise ValueError( 17 f"Invalid color spec: {color!r}, expected HTML hex color spec" 18 ) 19 c = m.groups()[0] 20 if len(c) not in (3, 4, 6, 8): 21 raise ValueError( 22 f"Invalid color spec: {color!r}, expected HTML hex color spec" 23 ) 24 if len(c) <= 4: 25 c = "".join(2 * x for x in c) 26 self.r = int(c[0:2], base=16) 27 self.g = int(c[2:4], base=16) 28 self.b = int(c[4:6], base=16) 29 if len(c) == 8: 30 self.a = int(c[6:8], base=16) 31 else: 32 self.a = None 33 34 def as_floats(self) -> tuple: 35 "Returns (r,g,b) or (r,g,b,a), with all values 0..1" 36 cf = (self.r / 255, self.g / 255, self.b / 255) 37 if self.a is not None: 38 cf += (self.a / 255,) 39 return cf 40 41 @classmethod 42 def from_floats(cls, r: float, g: float, b: float, a: float = None) -> "HtmlColor": 43 """Returns (r,g,b) or (r,g,b,a), with all values 0.0 .. 1.0.""" 44 c = cls("000000") 45 c.r = round(r * 255) 46 c.g = round(g * 255) 47 c.b = round(b * 255) 48 if a is not None: 49 c.a = round(a * 255) 50 return c 51 52 def __str__(self): 53 """Returns the color in '#RRGGBB[AA]' HTML hex format.""" 54 s = f"#{self.r:02x}{self.g:02x}{self.b:02x}" 55 if self.a is not None: 56 s += f"{self.a:02x}" 57 return s 58 59 def __repr__(self): 60 return f"{self.__class__.__name__}({str(self)!r})" 61 62 def copy(self): 63 return copy.copy(self) 64 65 def _blend_lightness(self, tgt: float, rate: float) -> "HtmlColor": 66 hue, light, sat = colorsys.rgb_to_hls(*self.as_floats()) 67 light = rate * tgt + (1 - rate) * light 68 c = self.from_floats(*colorsys.hls_to_rgb(hue, light, sat)) 69 c.a = self.a 70 return c 71 72 def lighter(self, rate=0.2) -> "HtmlColor": 73 "Returns a lighter copy of the color: rate=1.0 returns white, rate=0 returns the same color." 74 return self._blend_lightness(1.0, rate) 75 76 def darker(self, rate=0.2) -> "HtmlColor": 77 "Returns a darker copy of the color: rate=1.0 returns black, rate=0 returns the same color." 78 return self._blend_lightness(0.0, rate) 79 80 def with_alpha(self, a: float) -> "HtmlColor": 81 "Returns a copy of the color with given alpha: float 0.0-1.0" 82 assert isinstance(a, (float, int)) and a <= 1.0 and a >= 0.0 83 c = self.copy() 84 c.a = round(a * 255) 85 return c 86 87 @classmethod 88 def random_color( 89 cls, 90 seed: Any | None = None, 91 saturation=0.7, 92 lighness=0.5, 93 *, 94 vary_lightness_rate=0.2, 95 vary_saturation_rate=0.3, 96 ) -> str: 97 """Returns a pseudo-random color for the given object, or a random color. 98 99 The color is primarily determined by a random hue and given saturation and lightness. 100 By default, the lightness and saturation of the generated color slightly varies from the given values. 101 The returned color is guaranteed to be stable across runs for string seeds 102 (or objects with stable __repr__ value). 103 """ 104 if (seed is not None) and (not isinstance(seed, str)): 105 # This can bring instability between versions of your __repr__ or Python 106 # and for some objects this may just vary even among identical objects 107 seed = repr(seed) 108 # Remove any concrete pointer addresses, as these will vary 109 seed = re.sub(f"0x[0-9a-fA-F]{8,20}", "<0xPTR>", seed) 110 rng = random.Random(seed) 111 112 hue = rng.random() 113 tgt_s = rng.random() 114 tgt_l = rng.random() 115 return cls.from_floats( 116 *colorsys.hls_to_rgb( 117 hue, 118 (1 - vary_lightness_rate) * lighness + vary_lightness_rate * tgt_l, 119 (1 - vary_saturation_rate) * saturation + vary_saturation_rate * tgt_s, 120 ) 121 ) 122 123 def __eq__(self, other): 124 if not isinstance(other, self.__class__): 125 return False 126 return ( 127 (self.r == other.r) 128 and (self.g == other.g) 129 and (self.b == other.b) 130 and (self.a == other.a) 131 )
Helper class representing RGB color. Accepts HTML hex notation (3, 4, 6 or 8 hex digits, with optional '#').
34 def as_floats(self) -> tuple: 35 "Returns (r,g,b) or (r,g,b,a), with all values 0..1" 36 cf = (self.r / 255, self.g / 255, self.b / 255) 37 if self.a is not None: 38 cf += (self.a / 255,) 39 return cf
Returns (r,g,b) or (r,g,b,a), with all values 0..1
41 @classmethod 42 def from_floats(cls, r: float, g: float, b: float, a: float = None) -> "HtmlColor": 43 """Returns (r,g,b) or (r,g,b,a), with all values 0.0 .. 1.0.""" 44 c = cls("000000") 45 c.r = round(r * 255) 46 c.g = round(g * 255) 47 c.b = round(b * 255) 48 if a is not None: 49 c.a = round(a * 255) 50 return c
Returns (r,g,b) or (r,g,b,a), with all values 0.0 .. 1.0.
72 def lighter(self, rate=0.2) -> "HtmlColor": 73 "Returns a lighter copy of the color: rate=1.0 returns white, rate=0 returns the same color." 74 return self._blend_lightness(1.0, rate)
Returns a lighter copy of the color: rate=1.0 returns white, rate=0 returns the same color.
76 def darker(self, rate=0.2) -> "HtmlColor": 77 "Returns a darker copy of the color: rate=1.0 returns black, rate=0 returns the same color." 78 return self._blend_lightness(0.0, rate)
Returns a darker copy of the color: rate=1.0 returns black, rate=0 returns the same color.
80 def with_alpha(self, a: float) -> "HtmlColor": 81 "Returns a copy of the color with given alpha: float 0.0-1.0" 82 assert isinstance(a, (float, int)) and a <= 1.0 and a >= 0.0 83 c = self.copy() 84 c.a = round(a * 255) 85 return c
Returns a copy of the color with given alpha: float 0.0-1.0
87 @classmethod 88 def random_color( 89 cls, 90 seed: Any | None = None, 91 saturation=0.7, 92 lighness=0.5, 93 *, 94 vary_lightness_rate=0.2, 95 vary_saturation_rate=0.3, 96 ) -> str: 97 """Returns a pseudo-random color for the given object, or a random color. 98 99 The color is primarily determined by a random hue and given saturation and lightness. 100 By default, the lightness and saturation of the generated color slightly varies from the given values. 101 The returned color is guaranteed to be stable across runs for string seeds 102 (or objects with stable __repr__ value). 103 """ 104 if (seed is not None) and (not isinstance(seed, str)): 105 # This can bring instability between versions of your __repr__ or Python 106 # and for some objects this may just vary even among identical objects 107 seed = repr(seed) 108 # Remove any concrete pointer addresses, as these will vary 109 seed = re.sub(f"0x[0-9a-fA-F]{8,20}", "<0xPTR>", seed) 110 rng = random.Random(seed) 111 112 hue = rng.random() 113 tgt_s = rng.random() 114 tgt_l = rng.random() 115 return cls.from_floats( 116 *colorsys.hls_to_rgb( 117 hue, 118 (1 - vary_lightness_rate) * lighness + vary_lightness_rate * tgt_l, 119 (1 - vary_saturation_rate) * saturation + vary_saturation_rate * tgt_s, 120 ) 121 )
Returns a pseudo-random color for the given object, or a random color.
The color is primarily determined by a random hue and given saturation and lightness. By default, the lightness and saturation of the generated color slightly varies from the given values. The returned color is guaranteed to be stable across runs for string seeds (or objects with stable __repr__ value).