Skip to content

Commit e94180f

Browse files
committed
Basic configuration for queues.
1 parent b29a512 commit e94180f

11 files changed

+296
-172
lines changed

src/xact/cfg/__init__.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import xact.cfg.exception
1818
import xact.cfg.load
1919
import xact.cfg.override
20+
import xact.cfg.queue
2021
import xact.cfg.validate
2122
import xact.util
2223
import xact.util.serialization
@@ -98,8 +99,9 @@ def denormalize(cfg):
9899
such information explicit.
99100
100101
"""
101-
return xact.cfg.edge.denormalize(
102-
xact.cfg.data.denormalize(cfg))
102+
return xact.cfg.queue.denormalize(
103+
xact.cfg.edge.denormalize(
104+
xact.cfg.data.denormalize(cfg)))
103105

104106

105107
# -----------------------------------------------------------------------------

src/xact/cfg/data/_meta/conftest.py

+3
Original file line numberDiff line numberDiff line change
@@ -85,5 +85,8 @@ def valid_partly_denormalized_config():
8585
}
8686
],
8787
'some_type_alias': 'py_dict'
88+
},
89+
'queue': {
90+
8891
}
8992
}

src/xact/cfg/queue.py

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
2+
# -*- coding: utf-8 -*-
3+
"""
4+
Module of functions that support the configuration of queues.
5+
6+
"""
7+
8+
# -----------------------------------------------------------------------------
9+
def denormalize(cfg):
10+
"""
11+
Add default queue config.
12+
13+
"""
14+
if 'queue' not in cfg:
15+
cfg['queue'] = dict()
16+
cfg['queue']['inter_process'] = 'xact.queue.multiprocessing'
17+
cfg['queue']['inter_host_server'] = 'xact.queue.zmq_server'
18+
cfg['queue']['inter_host_client'] = 'xact.queue.zmq_client'
19+
20+
return cfg
21+
22+
# default_rules: (
23+
# {'edge_type': 'inter_host_server',
24+
# 'from_node': ('python',),
25+
# 'to_node': ('python',),
26+
# 'tags': ('*',),
27+
# 'logic': 'xact.queue.zmq_server'},
28+
# {'edge_type': 'inter_host_client',
29+
# 'from_node': ('python',),
30+
# 'to_node': ('python',),
31+
# 'tags': ('*',),
32+
# 'logic': 'xact.queue.zmq_client'},
33+
# {'edge_type': 'inter_process_server',
34+
# 'from_node': ('python',),
35+
# 'to_node': ('python',),
36+
# 'tags': ('*',),
37+
# 'logic': 'xact.queue.multiprocessing'},
38+
# {'edge_type': 'inter_process_client',
39+
# 'from_node': ('python',),
40+
# 'to_node': ('python',),
41+
# 'tags': ('*',),
42+
# 'logic': 'xact.queue.multiprocessing'},
43+
# {'edge_type': 'intra_process',
44+
# 'from_node': ('python',),
45+
# 'to_node': ('python',),
46+
# 'tags': ('*',),
47+
# 'logic': None})
48+
# cfg['queue']['default_rules'] = default_rules
49+
50+

src/xact/cfg/validate.py

+4
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ def _normalized_cfg_schema():
196196
'additionalProperties': False
197197
}
198198
},
199+
'queue': {
200+
'type': 'object'
201+
},
199202
'req_host_cfg': {
200203
'type': 'object'
201204
},
@@ -269,6 +272,7 @@ def _denormalized_cfg_schema():
269272
_denormalize_host_section(schema)
270273
_denormalize_node_section(schema)
271274
_denormalize_edge_section(schema)
275+
schema['required'].append('queue')
272276
return schema
273277

274278

src/xact/host/__init__.py

+35-18
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import xact.host.util
2222
import xact.log
2323
import xact.proc
24-
import xact.queue
2524

2625

2726
FIFO_HOST_CONTROL = "xact_host_control"
@@ -153,23 +152,35 @@ def _group_edges_by_class(cfg, id_host_local):
153152
154153
"""
155154
map_id_by_class = {
156-
'ipc': set(),
157-
'server': set(),
158-
'client': set()
155+
'intra_process': set(),
156+
'inter_process': set(),
157+
'inter_host_server': set(),
158+
'inter_host_client': set()
159159
}
160160

161161
for cfg_edge in cfg['edge']:
162162
id_edge = cfg_edge['id_edge']
163163

164-
if id_host_local not in cfg_edge['list_id_host']:
164+
# Ignore edges that don't impact the current host.
165+
#
166+
is_on_host_local = id_host_local in cfg_edge['list_id_host']
167+
if not is_on_host_local:
165168
continue
166-
if cfg_edge['ipc_type'] == 'inter_process':
167-
map_id_by_class['ipc'].add(id_edge)
169+
170+
# Inter-host (i.e. remote) queues have a server end and a client end.
171+
#
168172
if cfg_edge['ipc_type'] == 'inter_host':
169173
if id_host_local == cfg_edge['id_host_owner']:
170-
map_id_by_class['server'].add(id_edge)
174+
queue_type = 'inter_host_server'
171175
else:
172-
map_id_by_class['client'].add(id_edge)
176+
queue_type = 'inter_host_client'
177+
178+
# Inter-process and intra-process queues are the same class both ends.
179+
#
180+
else:
181+
queue_type = cfg_edge['ipc_type'] # inter_process or intra_process
182+
183+
map_id_by_class[cfg_edge['ipc_type']].add(id_edge)
173184

174185
return map_id_by_class
175186

@@ -180,16 +191,22 @@ def _construct_queues(cfg, map_cfg_edge, map_id_by_class, id_host_local):
180191
Return a map from edge id to queue instance.
181192
182193
"""
183-
map_queues = dict()
184-
for id_edge in map_id_by_class['server']:
185-
map_queues[id_edge] = xact.queue.RemoteQueueServer(
186-
cfg, map_cfg_edge[id_edge], id_host_local)
187-
188-
for id_edge in map_id_by_class['ipc']:
189-
map_queues[id_edge] = xact.queue.LocalQueue()
194+
# Ensure queue implementations are loaded.
195+
#
196+
map_queue_impl = dict()
197+
for (id_edge_class, spec_module) in cfg['queue'].items():
198+
module = xact.proc.ensure_imported(spec_module)
199+
if 'Queue' not in module.__dict__:
200+
raise xact.signal.NonRecoverableError(
201+
cause = 'No Queue class in specified module.')
202+
map_queue_impl[id_edge_class] = module.Queue
190203

191-
for id_edge in map_id_by_class['client']:
192-
map_queues[id_edge] = xact.queue.RemoteQueueClient(
204+
# Select the correct queue implementation for each edge.
205+
#
206+
map_queues = dict()
207+
for (id_edge_class, queue_impl) in map_queue_impl.items():
208+
for id_edge in map_id_by_class[id_edge_class]:
209+
map_queues[id_edge] = queue_impl(
193210
cfg, map_cfg_edge[id_edge], id_host_local)
194211

195212
return map_queues

src/xact/node/__init__.py

+2-15
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import xact.log
1313
import xact.signal
1414
import xact.util
15+
import xact.proc
1516

1617

1718
# =============================================================================
@@ -115,7 +116,7 @@ def _load_from_module(spec_module):
115116
Log any syntax errors.
116117
117118
"""
118-
module = _ensure_imported(spec_module)
119+
module = xact.proc.ensure_imported(spec_module)
119120

120121
if _is_step(map_func = module.__dict__):
121122
fcn_reset = module.reset
@@ -143,20 +144,6 @@ def _load_serialized(spec, unpacker):
143144
return (fcn_reset, fcn_step)
144145

145146

146-
# -----------------------------------------------------------------------------
147-
def _ensure_imported(spec_module):
148-
"""
149-
Import the specified module or throw a NonRecoverableError
150-
151-
"""
152-
module = None
153-
with xact.log.logger.catch():
154-
module = importlib.import_module(spec_module)
155-
if module is None:
156-
raise xact.signal.NonRecoverableError(cause = 'Module not found.')
157-
return module
158-
159-
160147
# -----------------------------------------------------------------------------
161148
def _is_step(map_func):
162149
"""

src/xact/proc/__init__.py

+27-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77

88
import collections
9+
import importlib
910
import itertools
1011
import multiprocessing
1112

@@ -57,6 +58,20 @@ def start(cfg, id_process, id_process_host, map_queues):
5758
raise RuntimeError('Termination condition not recognized.')
5859

5960

61+
# -----------------------------------------------------------------------------
62+
def ensure_imported(spec_module):
63+
"""
64+
Import the specified module or throw a NonRecoverableError
65+
66+
"""
67+
module = None
68+
with xact.log.logger.catch():
69+
module = importlib.import_module(spec_module)
70+
if module is None:
71+
raise xact.signal.NonRecoverableError(cause = 'Module not found.')
72+
return module
73+
74+
6075
# -----------------------------------------------------------------------------
6176
def _configure(id_process,
6277
map_cfg_node,
@@ -73,11 +88,13 @@ def _configure(id_process,
7388
map_cfg_node,
7489
map_alloc,
7590
runtime)
76-
_config_edges(id_process,
77-
iter_cfg_edge,
78-
map_node,
79-
map_queues,
80-
map_alloc)
91+
92+
_configure_edges(id_process,
93+
iter_cfg_edge,
94+
map_node,
95+
map_queues,
96+
map_alloc)
97+
8198
list_node = _get_list_node_in_runorder(id_process,
8299
map_cfg_node,
83100
iter_cfg_edge,
@@ -121,7 +138,11 @@ def _instantiate_nodes(id_process, map_cfg_node, map_alloc, runtime):
121138

122139

123140
# -----------------------------------------------------------------------------
124-
def _config_edges(id_process, iter_cfg_edge, map_node, map_queues, map_alloc):
141+
def _configure_edges(id_process,
142+
iter_cfg_edge,
143+
map_node,
144+
map_queues,
145+
map_alloc):
125146
"""
126147
Configure the edges in the data flow graph.
127148

0 commit comments

Comments
 (0)