-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtools.py
409 lines (338 loc) · 18.9 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
import argparse
import code128
import csv
import time
from collections import defaultdict
from PIL import Image, ImageDraw, ImageFont
from pprint import pprint
from secrets import *
from tme import TME
from mouser import Mouser
from digikey import DigiKey
from lcsc import LCSC
from partkeepr import PartKeepr
from distributor_common import SUPPORTED_DISTRIBUTORS, get_part_data
# All actions selectable with -a/--action.
_ACTIONS = (
    'sync-distributors',
    'list-empty-part-mf',
    'update-locations-from-csv',
    'generate-labels',
    'rename-from-params',
    'update-project-from-csv',
    'check-stock-from-csv',
)

# Short-description templates used by 'rename-from-params', keyed by part
# category name. This table needs to be customized depending on your
# organization: it generates short part descriptions to print instead of the
# part number for certain kinds of parts, like resistors. Placeholders are
# part parameter names; parameters a part does not have render as "".
_NAME_TEMPLATES = {
    "Resistors": "{Number of resistors} {Resistance} {Tolerance} {Power} {Case - inch} {Mounting}",
    "Ceramic Caps": "{Capacitance} {Tolerance} {Operating voltage} {Dielectric} {Case - inch} {Mounting}",
    "Electrolytic Caps": "{Capacitance} {Tolerance} {Operating voltage} {Mounting}",
    "Tantalum Caps": "{Capacitance} {Tolerance} {Operating voltage} {Case} {Mounting}",
    "Fuses": "{Current rating} {Fuse characteristics} {Rated voltage} {Mounting}",
}


def _build_arg_parser():
    """Return the argument parser describing every command-line option."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-a", "--action", type=str, required=True, choices=_ACTIONS, help="Which action to perform")
    parser.add_argument("-f", "--force", action='store_true', help="Force certain actions")
    parser.add_argument("-o", "--offset", type=int, required=False, help="Offset into parts list (how many parts to skip)")
    parser.add_argument("--id", type=int, required=False, help="Single part ID")
    parser.add_argument("--location", type=str, required=False, help="Single storage location name")
    parser.add_argument("--name-column", type=str, required=False, help="For CSV import: Name column name")
    parser.add_argument("--location-column", type=str, required=False, help="For CSV import: Storage location column name")
    parser.add_argument("--default-location", type=str, required=False, help="For CSV import: Default storage location if none is found")
    parser.add_argument("--order-no-column", type=str, required=False, help="For CSV import: Order number column name")
    parser.add_argument("--qty-column", type=str, required=False, help="For CSV import: Quantity column name")
    parser.add_argument("--refs-column", type=str, required=False, help="For CSV import: References column name")
    parser.add_argument("--csv-file", type=str, required=False, help="For CSV import: CSV file name")
    parser.add_argument("--label-width", type=int, required=False, help="For label generation: Label width in millimeters")
    parser.add_argument("--label-height", type=int, required=False, help="For label generation: Label height in millimeters")
    parser.add_argument("--label-dpi", type=int, required=False, help="For label generation: Label resolution in dpi")
    parser.add_argument("--font-size", type=int, required=False, help="For label generation: Font size")
    parser.add_argument("--max-parts-per-label", type=int, required=False, help="For label generation: Only generate label for maximum of n parts")
    parser.add_argument("--label-file", type=str, required=False, help="For label generation: Label PDF file name")
    parser.add_argument("--project-id", type=int, required=False, help="For project CSV import: Internal project ID (integer)")
    parser.add_argument("--num-boards", type=int, required=False, help="For stock check: Desired number of boards")
    return parser


def _read_csv_rows(csv_file):
    """Read a comma-separated file and return its rows as a list of dicts.

    The file is opened with ``newline=''`` as the csv module requires (so
    quoted fields containing line breaks survive) and decoded as
    ``utf-8-sig`` so a leading byte-order mark, if present, does not end up
    glued to the first column name.
    """
    with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
        return list(csv.DictReader(f, delimiter=',', quotechar='"'))


def _load_parts(pk, part_id):
    """Return a one-element list for ``part_id`` if given, else all parts."""
    if part_id:
        print("Getting part")
        return [pk.get_part(part_id)]
    print("Getting parts")
    return pk.get_parts()


def _index_parts_by_order_no(parts):
    """Map each distributor order number to the part that lists it.

    If several parts list the same order number, the last one wins.
    """
    parts_by_order_no = {}
    for part in parts:
        for distributor in part['distributors'] or ():
            parts_by_order_no[distributor['orderNumber']] = part
    return parts_by_order_no


def _sync_distributors(args, pk, tme, mouser, digikey, lcsc):
    """Action 'sync-distributors': refresh parts from distributor data.

    For every distributor entry of every selected part whose distributor
    name is in SUPPORTED_DISTRIBUTORS, fetch data with get_part_data() and
    hand it to pk.update_part_data(). Names of parts for which a lookup
    returned nothing are printed at the end.
    """
    parts = _load_parts(pk, args.id)
    print("Getting manufacturers")
    manufacturer_ids_by_name = {mf['name'].lower(): mf['@id'] for mf in pk.get_manufacturers()}
    if args.offset:
        parts = parts[args.offset:]
    num_parts = len(parts)
    errors = []
    for i, part in enumerate(parts):
        print(" [{: 5d}/{: 5d}] Processing {}".format(i+1, num_parts, part['name']))
        # Iterate the distributor list captured here; `part` is rebound below
        # to whatever update_part_data() returns.
        for distributor in part['distributors']:
            distributor_name = distributor['distributor']['name']
            if distributor_name not in SUPPORTED_DISTRIBUTORS.values():
                print(" Skipping distributor {}".format(distributor_name))
                continue
            print(" Processing distributor {}".format(distributor_name))
            order_no = distributor['orderNumber']
            part_data = get_part_data(distributor_name, order_no, tme, mouser, digikey, lcsc)
            if not part_data:
                print(" Failed to get part data!")
                errors.append(part['name'])
                continue
            part = pk.update_part_data(part, part_data, distributor, manufacturer_ids_by_name)
            time.sleep(0.2)  # To ensure we don't exceed 5 API calls per second
    if errors:
        print("Parts with errors:")
        print("\n".join(errors))


def _list_empty_part_mf(pk):
    """Action 'list-empty-part-mf': print parts with no manufacturers."""
    print("Getting parts")
    parts = pk.get_parts()
    num_parts = len(parts)
    empty_mf_parts = []
    for i, part in enumerate(parts):
        print(" [{: 5d}/{: 5d}] Processing {}".format(i+1, num_parts, part['name']))
        if not part['manufacturers']:
            empty_mf_parts.append(part['name'])
    print("Parts without part manufacturers:")
    print("\n".join(empty_mf_parts))


def _update_locations_from_csv(args, pk):
    """Action 'update-locations-from-csv': assign storage locations.

    Each CSV row names a part and (optionally) a location; rows without a
    location use --default-location. Part and location names are matched
    case-insensitively. Locations not yet known are created. Parts that
    already have a location are skipped unless --force is given.
    """
    if not all((args.name_column, args.location_column, args.csv_file, args.default_location)):
        print("Error: Missing parameters!")
        return
    parts = _load_parts(pk, args.id)
    parts_by_name = {part['name'].lower(): part for part in parts}
    print("Getting storage locations")
    location_ids_by_name = {loc['name'].lower(): loc['@id'] for loc in pk.get_storage_locations()}
    for entry in _read_csv_rows(args.csv_file):
        name = entry[args.name_column]
        location = entry[args.location_column] or args.default_location
        print("Processing {} located in {}".format(name, location))
        if name.lower() not in parts_by_name:
            print(" Could not find part in database, skipping")
            continue
        part = parts_by_name[name.lower()]
        if part['storageLocation'] and not args.force:
            print(" Part already has storage location assigned, skipping (use -f to override)")
            continue
        if location.lower() in location_ids_by_name:
            print(" Found location in database")
            loc_id = location_ids_by_name[location.lower()]
        else:
            print(" Creating location")
            loc_new = {'name': location, 'category': {'@id': "/api/storage_location_categories/1"}}
            loc_id = pk.create_storage_location(loc_new)['@id']
            # Remember the new location so later rows reuse it.
            location_ids_by_name[location.lower()] = loc_id
        print(" Updating part")
        part['storageLocation'] = {'@id': loc_id}
        pk.update_part(part)


def _generate_labels(args, pk):
    """Action 'generate-labels': write one PDF page per storage location.

    Each page shows the location name followed by one Code 128 barcode
    ("P" + part id) and caption per part stored there. Locations holding
    more than --max-parts-per-label parts are skipped; --location restricts
    output to a single location (case-insensitive). Nothing is written when
    no location qualifies.
    """
    if not all((args.label_width, args.label_height, args.label_dpi,
                args.font_size, args.max_parts_per_label, args.label_file)):
        print("Error: Missing parameters!")
        return
    # Millimeters -> pixels at the requested resolution (25.4 mm per inch).
    label_width_px = round((args.label_width / 25.4) * args.label_dpi)
    label_height_px = round((args.label_height / 25.4) * args.label_dpi)

    print("Getting parts")
    parts_by_location = defaultdict(list)
    for part in pk.get_parts():
        if part['storageLocation']:
            parts_by_location[part['storageLocation']['name']].append(part)

    font = ImageFont.truetype("LiberationSans-Regular.ttf", args.font_size)

    # Layout values that are identical for every label.
    margin = round(max(label_height_px * 0.02, label_width_px * 0.02))
    avail_label_height = label_height_px - 2 * margin
    avail_label_width = label_width_px - 2 * margin
    # Split label into base grid with fixed height location tag and variable height parts areas
    base_x = margin
    loc_area_y = margin
    loc_area_height = round(args.font_size * 1.5)
    parts_area_y = loc_area_y + loc_area_height
    parts_area_height = avail_label_height - loc_area_height
    barcode_thickness = avail_label_width // 100

    labels = []
    for loc_name, loc_parts in sorted(parts_by_location.items(), key=lambda e: e[0]):
        if args.location and loc_name.lower() != args.location.lower():
            continue
        if len(loc_parts) > args.max_parts_per_label:
            print("Skipping storage location {}: {} parts".format(loc_name, len(loc_parts)))
            continue
        print("Processing storage location {}: {} parts".format(loc_name, len(loc_parts)))
        img = Image.new("RGB", (label_width_px, label_height_px), 'white')
        draw = ImageDraw.Draw(img)
        # Split parts area into evenly-spaced grid (loc_parts is never empty here)
        region_height = parts_area_height // len(loc_parts)
        barcode_height = region_height - round(args.font_size * 1.5)
        draw.text((base_x, loc_area_y), "Location: {}".format(loc_name), 'black', font=font)
        for i, part in enumerate(sorted(loc_parts, key=lambda p: p['@id'])):
            barcode_text = "{}: {}".format(part['category']['name'], part['name'])
            part_id = part['@id'].split("/")[-1]
            barcode = code128.image("P" + part_id, height=barcode_height, thickness=barcode_thickness, quiet_zone=False)
            barcode_x = (avail_label_width - (barcode.size[0])) // 2
            barcode_y = parts_area_y + region_height * i
            img.paste(barcode, (barcode_x, barcode_y))
            # Caption sits just below the barcode, left-aligned with it.
            name_y = barcode_y + barcode_height + args.font_size * 0.1
            draw.text((barcode_x, name_y), barcode_text, 'black', font=font)
        labels.append(img)

    if not labels:
        # Without this guard labels[0] below would raise IndexError.
        print("No labels to generate")
        return
    print("Generating PDF")
    labels[0].save(args.label_file, "PDF", resolution=args.label_dpi, save_all=True, append_images=labels[1:])


def _rename_from_params(args, pk):
    """Action 'rename-from-params': interactively rename parts.

    Builds a new name from the part's parameters using the template for its
    category in _NAME_TEMPLATES and asks for confirmation before updating.
    Parts whose category has no template (empty result) or whose name would
    not change are skipped.
    """
    parts = _load_parts(pk, args.id)
    num_parts = len(parts)
    for i, part in enumerate(parts):
        print(" [{: 5d}/{: 5d}] Processing {}".format(i+1, num_parts, part['name']))
        # defaultdict(str) makes parameters the part lacks format as "".
        param_dict = defaultdict(str, ((p['name'], p['stringValue']) for p in part['parameters']))
        template = _NAME_TEMPLATES.get(part['category']['name'], "")
        # Collapse the gaps left behind by empty parameters.
        new_name = " ".join(template.format_map(param_dict).split())
        if not new_name:
            print(" Renaming would result in empty name, skipping")
            continue
        if new_name == part['name']:
            print(" Name unchanged, skipping")
            continue
        accept = input(" Rename {} to {}? [Y/n] ".format(part['name'], new_name)).lower() in ("", "y")
        if accept:
            print(" Updating part")
            part['name'] = new_name
            pk.update_part(part)


def _update_project_from_csv(args, pk):
    """Action 'update-project-from-csv': append BOM rows to a project.

    Every CSV row whose order number matches a known part is appended to the
    project's part list with the row's quantity and references; the project
    is then saved once via pk.update_project().
    """
    if not all((args.order_no_column, args.qty_column, args.refs_column, args.csv_file, args.project_id)):
        print("Error: Missing parameters!")
        return
    print("Getting project")
    project = pk.get_project(args.project_id)
    print("Getting parts")
    parts_by_order_no = _index_parts_by_order_no(pk.get_parts())
    for entry in _read_csv_rows(args.csv_file):
        order_no = entry[args.order_no_column]
        qty = int(entry[args.qty_column])
        refs = entry[args.refs_column]
        print("Processing {} ({})".format(order_no, refs))
        if order_no not in parts_by_order_no:
            print(" Could not find part in database, skipping")
            continue
        project['parts'].append({
            'part': {
                '@id': parts_by_order_no[order_no]['@id']
            },
            'quantity': qty,
            'remarks': refs,
            'overageType': 'absolute',
            'overage': 0
        })
    print("Updating project")
    pk.update_project(project)


def _compute_stock_status(parts, entries, order_no_column, qty_column, num_boards):
    """Compare BOM demand against stock and return a status per order number.

    Args:
        parts: part dicts with 'stockLevel' and 'distributors' keys.
        entries: CSV rows (dicts) of the bill of materials.
        order_no_column: row key holding the distributor order number.
        qty_column: row key holding the per-board quantity.
        num_boards: number of boards to build; multiplies every quantity.

    Returns:
        dict mapping order number to a dict with keys 'status' ('reorder',
        'missing' or 'available'), 'distributor', 'stock', 'needed' and
        'status_text'. Quantities of repeated order numbers are summed, for
        parts found in the database and for missing ones alike.
    """
    parts_by_order_no = _index_parts_by_order_no(parts)
    parts_status = {}
    for entry in entries:
        order_no = entry[order_no_column]
        qty = num_boards * int(entry[qty_column])
        print("Processing {}".format(order_no))
        if order_no not in parts_by_order_no:
            print(" Could not find part in database, skipping")
            status = parts_status.setdefault(order_no, {
                'status': 'missing',
                'distributor': None,
                'stock': 0,
                'needed': 0,
            })
            # Accumulate so an order number repeated in the CSV is not undercounted.
            status['needed'] += qty
            status['status_text'] = "Order {}".format(status['needed'])
            continue
        part = parts_by_order_no[order_no]
        print(" Stock: {}".format(part['stockLevel']))
        print(" Required: {}".format(qty))
        if order_no not in parts_status:
            distributor_name = ""
            for distributor in part['distributors']:
                if distributor['orderNumber'] == order_no:
                    distributor_name = distributor['distributor']['name']
            parts_status[order_no] = {
                'distributor': distributor_name,
                'stock': part['stockLevel'],
                'needed': qty
            }
        else:
            parts_status[order_no]['needed'] += qty
        status = parts_status[order_no]
        if part['stockLevel'] < status['needed']:
            status['status'] = 'reorder'
            status['status_text'] = "{} needed, {} available. Reorder {} at {}".format(
                status['needed'], status['stock'], status['needed'] - status['stock'], status['distributor'])
        else:
            status['status'] = 'available'
            status['status_text'] = "{} needed, {} available".format(status['needed'], status['stock'])
    return parts_status


def _print_stock_report(parts_status):
    """Print the stock report grouped as reorder, missing, then available."""
    for wanted in ('reorder', 'missing', 'available'):
        print("")
        for order_no, status in parts_status.items():
            if status['status'] == wanted:
                print("{}: {}".format(order_no, status['status_text']))


def _check_stock_from_csv(args, pk):
    """Action 'check-stock-from-csv': report whether stock covers a BOM."""
    if not all((args.order_no_column, args.qty_column, args.csv_file, args.num_boards)):
        print("Error: Missing parameters!")
        return
    print("Getting parts")
    parts = pk.get_parts()
    entries = _read_csv_rows(args.csv_file)
    parts_status = _compute_stock_status(parts, entries, args.order_no_column, args.qty_column, args.num_boards)
    _print_stock_report(parts_status)


def main(argv=None):
    """Parse the command line and run the selected maintenance action.

    Args:
        argv: optional list of argument strings; when None, argparse reads
            the process command line.

    All API clients are constructed up front regardless of the chosen
    action, exactly as before (their constructors may have side effects --
    TODO confirm before making this lazy).
    """
    args = _build_arg_parser().parse_args(argv)
    pk = PartKeepr(PK_BASE_URL, PK_USERNAME, PK_PASSWORD)
    tme = TME(TME_APP_KEY, TME_APP_SECRET)
    mouser = Mouser(MOUSER_API_KEY)
    digikey = DigiKey(DIGIKEY_CLIENT_ID, DIGIKEY_CLIENT_SECRET)
    lcsc = LCSC()
    if args.action == 'sync-distributors':
        _sync_distributors(args, pk, tme, mouser, digikey, lcsc)
    elif args.action == 'list-empty-part-mf':
        _list_empty_part_mf(pk)
    elif args.action == 'update-locations-from-csv':
        _update_locations_from_csv(args, pk)
    elif args.action == 'generate-labels':
        _generate_labels(args, pk)
    elif args.action == 'rename-from-params':
        _rename_from_params(args, pk)
    elif args.action == 'update-project-from-csv':
        _update_project_from_csv(args, pk)
    elif args.action == 'check-stock-from-csv':
        _check_stock_from_csv(args, pk)
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()