Skip to content

Commit 8f12217

Browse files
implemented simplified VSL links
(cherry picked from commit 7126b59)
1 parent 595f9f7 commit 8f12217

File tree

2 files changed

+80
-3
lines changed

2 files changed

+80
-3
lines changed

src/sym_metanet/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"Network",
1515
"Node",
1616
"Origin",
17+
"SimplifiedLinkWithVsl",
1718
"SimplifiedMeteredOnRamp",
1819
]
1920

@@ -39,7 +40,7 @@
3940
del _notfound, _engine
4041

4142
from sym_metanet.blocks.destinations import CongestedDestination, Destination
42-
from sym_metanet.blocks.links import Link, LinkWithVsl
43+
from sym_metanet.blocks.links import Link, LinkWithVsl, SimplifiedLinkWithVsl
4344
from sym_metanet.blocks.nodes import Node
4445
from sym_metanet.blocks.origins import (
4546
MainstreamOrigin,

src/sym_metanet/blocks/links.py

+78-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from collections.abc import Collection
2-
from typing import TYPE_CHECKING, Any, Optional, Union
2+
from typing import TYPE_CHECKING, Any, Literal, Optional, Union
33

44
from sym_metanet.blocks.base import ElementWithVars
55
from sym_metanet.blocks.origins import MeteredOnRamp
@@ -305,7 +305,7 @@ def __init__(
305305
if index >= self.N or index < 0:
306306
raise ValueError(f"Invalid segment index {index} for VSL sign.")
307307

308-
def init_vars( # type: ignore[override]
308+
def init_vars(
309309
self,
310310
init_conditions: Optional[dict[str, VarType]] = None,
311311
engine: Optional[EngineBase] = None,
@@ -351,3 +351,79 @@ def _get_equilibrium_speed(self, engine: EngineBase, rho: VarType) -> VarType:
351351
self.rho_crit,
352352
self.a,
353353
)
354+
355+
356+
class SimplifiedLinkWithVsl(LinkWithVsl[VarType]):
357+
"""A simplified version of a link with VSL sign, where the equilibrium speed of the
358+
link is the direct control action (instead of controlling the displayed speed).
359+
360+
See `LinkWithVsl` for the original version."""
361+
362+
__slots__ = ("vsl", "alpha", "V_eq_type")
363+
_actions = {"V"}
364+
365+
def __init__(
366+
self,
367+
*args: Any,
368+
segments_with_vsl: set[int],
369+
V_eq_type: Literal["limited", "unlimited"] = "limited",
370+
**kwargs: Any,
371+
) -> None:
372+
"""Creates an instance of a (simplified) METANET link with VSL signs.
373+
374+
Parameters
375+
----------
376+
args, kwargs
377+
See base class `Link` for the other parameters.
378+
Average speed of cars when traffic is freely flowing, i.e., `v_free`.
379+
segments_with_vsl : set of ints
380+
The set of segment indices that are equipped with a VSL sign (0-based).
381+
V_eq_type : "limited" or "unlimited"
382+
If "limited", the equilibrium speed action is capped by the standard
383+
equilibrium speed of the link, which is a function of the link density. If
384+
"unlimited", this cap is not forced, and it's up to the user to satisfy it.
385+
By default, "limited" is selected.
386+
387+
References
388+
----------
389+
[1] Hegyi, A., 2004, "Model predictive control for integrating traffic control
390+
measures", Netherlands TRAIL Research School.
391+
"""
392+
super().__init__(
393+
*args, segments_with_vsl=segments_with_vsl, alpha=float("nan"), **kwargs
394+
)
395+
self.V_eq_type = V_eq_type
396+
397+
def init_vars(
398+
self,
399+
init_conditions: Optional[dict[str, VarType]] = None,
400+
engine: Optional[EngineBase] = None,
401+
*args: Any,
402+
**kwargs: Any,
403+
) -> None:
404+
"""Initializes as control action the equilibrium speed `V` of the link instead
405+
of the control speed `v_ctrl`."""
406+
if init_conditions is None:
407+
init_conditions = {}
408+
if engine is None:
409+
engine = get_current_engine()
410+
411+
super().init_vars(init_conditions, engine, *args, **kwargs)
412+
413+
del self.actions["v_ctrl"]
414+
self.actions["V"] = (
415+
init_conditions["V"]
416+
if "V" in init_conditions
417+
else engine.var(f"V_{self.name}", len(self.vsl))
418+
)
419+
420+
def _get_equilibrium_speed(self, engine: EngineBase, rho: VarType) -> VarType:
421+
if engine is None:
422+
engine = get_current_engine()
423+
V_ctrl = self.actions["V"]
424+
Veq = engine.links.Veq(rho, self.v_free, self.rho_crit, self.a)
425+
if self.V_eq_type == "unlimited":
426+
Veq[self.vsl] = V_ctrl
427+
else:
428+
Veq[self.vsl] = engine.min(Veq[self.vsl], V_ctrl)
429+
return Veq

0 commit comments

Comments
 (0)