[AGIPD] [Correct] Try to simplify & speed up file reading code
Description
Janusz M's investigation showed that reading the raw AGIPD data itself is significantly faster than the timings in our notebooks imply, so the file-reading code is wasting time somewhere. I investigated this and tried to mitigate it.
The biggest cause seems to be the cell selection. We were constructing an array of frame indices to use even when we want all of them, and indexing with an index array forces an extra in-memory copy. By using a slice instead of an index array when all frames are selected, these two lines go from ~120 seconds on one file to ~15 seconds (I don't know why copying in memory could be this slow - I suspect something about xarray):
data_dict['data'][:n_img] = raw_data[frm_ix, 0]
data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
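The fast path can be sketched in isolation like this (simplified from the change; `img_selected` stands in for the boolean frame mask):

```python
import numpy as np

def frame_selection(img_selected: np.ndarray):
    """Indexing object for the selected frames.

    A plain slice copies contiguously with no intermediate array; a
    fancy index array triggers a gather plus an extra temporary copy.
    """
    if img_selected.all():
        return np.s_[:]                      # all frames: cheap slice
    return np.flatnonzero(img_selected)      # subset: integer indices

raw = np.arange(16).reshape(8, 2)
assert raw[frame_selection(np.ones(8, bool)), 0].shape == (8,)
assert raw[frame_selection(np.array([True, False] * 4)), 0].shape == (4,)
```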
Second, there was the added complexity of xarray and the AGIPD component class, which we're not actually using here. I switched to reading plain numpy arrays to simplify things, which dropped the two lines above from ~15 to ~8 seconds. This includes converting the ints to floats in data.
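That int-to-float conversion happens implicitly while copying into the shared-memory array; a small numpy illustration (the shapes and names here are made up for the sketch):

```python
import numpy as np

# Raw AGIPD values arrive as 16-bit integers; the shared-memory 'data'
# array is float32. Slice assignment casts during the copy, so no
# separate astype() pass or extra allocation is needed.
raw = np.array([[100, 200], [300, 400]], dtype=np.uint16)
shm = np.zeros((4, 2), dtype=np.float32)  # preallocated shared buffer

n_img = len(raw)
shm[:n_img] = raw  # copy + uint16 -> float32 cast in one pass
assert shm.dtype == np.float32
assert shm[1, 1] == 400.0
```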
How Has This Been Tested?
Running the notebook for MID data from proposal 6976, run 50:
xfel-calibrate agipd CORRECT \
--ctrl-source-template '{}/MDL/FPGA_COMP' \
--karabo-da AGIPD00 AGIPD01 AGIPD02 AGIPD03 AGIPD04 AGIPD05 AGIPD06 AGIPD07 AGIPD08 AGIPD09 AGIPD10 AGIPD11 AGIPD12 AGIPD13 AGIPD14 AGIPD15 \
--sequences 0-1 \
--karabo-id-control MID_EXP_AGIPD1M1 --receiver-template '{}CH0' \
--compress-fields gain mask data --recast-image-data int16 --round-photons \
--use-litframe-finder auto --use-super-selection final \
--use-xgm-device SA2_XTD1_XGM/XGM/DOOCS --adjust-mg-baseline \
--bias-voltage 300 --blc-set-min --blc-stripes --cm-dark-fraction 0.15 \
--cm-dark-range -30 30 --cm-n-itr 4 --common-mode --ff-gain 1.0 \
--force-hg-if-below --force-mg-if-below --hg-hard-threshold 1000 \
--low-medium-gap --mg-hard-threshold 1000 --overwrite --rel-gain \
--sequences-per-node 1 --slopes-ff-from-files '' --xray-gain --max-tasks-per-worker 1 \
--in-folder /gpfs/exfel/exp/MID/202325/p006976/raw --run 50 \
--out-folder /gpfs/exfel/data/scratch/kluyvert/agipd-corr-p6976-r50 \
--karabo-id MID_DET_AGIPD1M-1
Relevant Documents (optional)
Timing results from running entire notebook
Original correction of p6976 r50:
Total processing time 1181.5 s
Timing summary per batch of 4 files:
Constants were retrieved in: 6.1 +- 0.00 s
Constants were loaded in : 24.2 +- 0.00 s
Started pool: 0.6 +- 0.00 s
Loading data from files: 96.1 +- 3.63 s
Offset correction: 22.2 +- 0.08 s
Base-line shift correction: 31.9 +- 0.17 s
Common-mode correction: 19.7 +- 0.46 s
Applying selected cells after common mode correction: 51.8 +- 0.34 s
Gain corrections: 38.8 +- 0.63 s
Save: 27.2 +- 0.58 s
Re-running with master today:
Total processing time 1280.5 s
Timing summary per batch of 4 files:
Constants were retrieved in: 6.1 +- 0.00 s
Constants were loaded in : 21.8 +- 0.00 s
Started pool: 1.0 +- 0.00 s
Loading data from files: 107.6 +- 5.36 s
Offset correction: 23.1 +- 0.22 s
Base-line shift correction: 32.9 +- 0.58 s
Common-mode correction: 19.1 +- 0.87 s
Applying selected cells after common mode correction: 58.1 +- 0.40 s
Gain corrections: 40.3 +- 0.59 s
Save: 31.8 +- 1.16 s
After:
Total processing time 1042.3 s
Timing summary per batch of 4 files:
Constants were retrieved in: 6.0 +- 0.00 s
Constants were loaded in : 16.9 +- 0.00 s
Started pool: 0.8 +- 0.00 s
Loading data from files: 37.9 +- 2.05 s
Offset correction: 24.4 +- 0.42 s
Base-line shift correction: 34.6 +- 0.44 s
Common-mode correction: 21.4 +- 0.27 s
Applying selected cells after common mode correction: 61.6 +- 0.23 s
Gain corrections: 41.8 +- 0.47 s
Save: 32.9 +- 0.85 s
i.e. better than a 2x speedup in the loading step, but a relatively small impact on the overall time for correction.
Types of changes
- Bug fix (non-breaking change which fixes an issue)
Checklist:
- My code follows the code style of this project.
Further detail: line profiles of read_file.

Before these changes
Timer unit: 1e-06 s

Total time: 132.418 s
File: /home/kluyvert/.conda/envs/offline-env/lib/python3.8/site-packages/cal_tools/agipdlib.py
Function: read_file at line 643

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   643                                           def read_file(self, i_proc: int, file_name: str,
   644                                                         apply_sel_pulses: Optional[bool] = True
   645                                                         ) -> int:
   646                                               """Read file with raw data to shared memory
   647
   648                                               :param file_name: Name of input file including path.
   649                                               :param i_proc: Index of shared memory array.
   650                                               :param apply_sel_pulses: apply selected pulses before
   651                                                   all corrections.
   652                                               :return:
   653                                                   - n_img: The number of images to correct.
   654                                               """
   655         1         16.0     16.0      0.0      module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
   656         1          3.0      3.0      0.0      agipd_base = self.h5_data_path.format(module_idx)
   657         1          2.0      2.0      0.0      data_dict = self.shared_dict[i_proc]
   658         1         16.0     16.0      0.0      data_dict['moduleIdx'][0] = module_idx
   659
   660         1      63156.0  63156.0      0.0      h5_dc = H5File(file_name)
   661
   662                                               # Exclude trains without data.
   663         1    1101338.0 1101338.0     0.8      im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
   664
   665         2        743.0    371.5      0.0      valid_train_ids = self.get_valid_image_idx(
   666         1        322.0    322.0      0.0          im_dc[agipd_base, "image.trainId"])
   667
   668                                               # filter out trains which will not be selected
   669         2         29.0     14.5      0.0      valid_train_ids = self.cell_sel.filter_trains(
   670         1         16.0     16.0      0.0          np.array(valid_train_ids)).tolist()
   671
   672         1          1.0      1.0      0.0      if not valid_train_ids:
   673                                                   # If there's not a single valid train, exit early.
   674                                                   print(f"WARNING: No valid trains for {im_dc.files} to process.")
   675                                                   data_dict['nImg'][0] = 0
   676                                                   return 0
   677
   678                                               # Exclude non_valid trains from the selected data collection.
   679         1        834.0    834.0      0.0      im_dc = im_dc.select_trains(by_id(valid_train_ids))
   680
   681                                               # Just want to be sure that order is correct
   682         1          3.0      3.0      0.0      valid_train_ids = im_dc.train_ids
   683                                               # Get a count of images in each train
   684         1       1157.0   1157.0      0.0      nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
   685         1          6.0      6.0      0.0      nimg_in_trains = nimg_in_trains.astype(int)
   686
   687                                               # store valid trains in shared memory
   688         1          2.0      2.0      0.0      n_valid_trains = len(valid_train_ids)
   689         1         12.0     12.0      0.0      data_dict["n_valid_trains"][0] = n_valid_trains
   690         1         91.0     91.0      0.0      data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
   691
   692         1          5.0      5.0      0.0      if "AGIPD500K" in agipd_base:
   693                                                   agipd_comp = components.AGIPD500K(im_dc)
   694                                               else:
   695         1     221801.0 221801.0      0.2          agipd_comp = components.AGIPD1M(im_dc)
   696
   697         1          1.0      1.0      0.0      kw = {
   698         1          2.0      2.0      0.0          "unstack_pulses": False,
   699                                               }
   700
   701                                               # get selection for the images in this file
   702         2          3.0      1.5      0.0      cm = (self.cell_sel.CM_NONE if apply_sel_pulses
   703         1          3.0      3.0      0.0            else self.cell_sel.CM_PRESEL)
   704
   705         1      29553.0  29553.0      0.0      cellid = np.squeeze(im_dc[agipd_base, "image.cellId"].ndarray())
   706
   707         2       4290.0   2145.0      0.0      img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
   708         1        414.0    414.0      0.0          np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
   709         1         34.0     34.0      0.0      data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
   710
   711         1        514.0    514.0      0.0      frm_ix = np.flatnonzero(img_selected)
   712         1         12.0     12.0      0.0      data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
   713         1          2.0      2.0      0.0      n_img = len(frm_ix)
   714
   715                                               # read raw data
   716                                               # [n_modules, n_imgs, 2, x, y]
   717         1   11974924.0 11974924.0    9.0      raw_data = agipd_comp.get_array("image.data", **kw)[0]
   718
   719                                               # store in shmem only selected images
   720         1         13.0     13.0      0.0      data_dict['nImg'][0] = n_img
   721         1   68629020.0 68629020.0   51.8      data_dict['data'][:n_img] = raw_data[frm_ix, 0]
   722         1   50183172.0 50183172.0   37.9      data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
   723         4      82093.0  20523.2      0.1      data_dict['cellId'][:n_img] = agipd_comp.get_array(
   724         2          6.0      3.0      0.0          "image.cellId", **kw)[0, frm_ix]
   725         4      61478.0  15369.5      0.0      data_dict['pulseId'][:n_img] = agipd_comp.get_array(
   726         2          3.0      1.5      0.0          "image.pulseId", **kw)[0, frm_ix]
   727         4      63402.0  15850.5      0.0      data_dict['trainId'][:n_img] = agipd_comp.get_array(
   728         2          3.0      1.5      0.0          "image.trainId", **kw)[0, frm_ix]
   729
   730         1          3.0      3.0      0.0      return n_img
Indexing change only
Timer unit: 1e-06 s

Total time: 24.8342 s
File: <ipython-input-64-689693c0871f>
Function: read_file at line 4

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     4                                           def read_file(self, i_proc: int, file_name: str,
     5                                                         apply_sel_pulses: Optional[bool] = True
     6                                                         ) -> int:
     7                                               """Read file with raw data to shared memory
     8
     9                                               :param file_name: Name of input file including path.
    10                                               :param i_proc: Index of shared memory array.
    11                                               :param apply_sel_pulses: apply selected pulses before
    12                                                   all corrections.
    13                                               :return:
    14                                                   - n_img: The number of images to correct.
    15                                               """
    16         1          7.0      7.0      0.0      module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
    17         1          4.0      4.0      0.0      agipd_base = self.h5_data_path.format(module_idx)
    18         1          2.0      2.0      0.0      data_dict = self.shared_dict[i_proc]
    19         1          6.0      6.0      0.0      data_dict['moduleIdx'][0] = module_idx
    20
    21         1        398.0    398.0      0.0      h5_dc = H5File(file_name)
    22
    23                                               # Exclude trains without data.
    24         1       5399.0   5399.0      0.0      im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
    25
    26         2        358.0    179.0      0.0      valid_train_ids = self.get_valid_image_idx(
    27         1        308.0    308.0      0.0          im_dc[agipd_base, "image.trainId"])
    28
    29                                               # filter out trains which will not be selected
    30         2         18.0      9.0      0.0      valid_train_ids = self.cell_sel.filter_trains(
    31         1         17.0     17.0      0.0          np.array(valid_train_ids)).tolist()
    32
    33         1          1.0      1.0      0.0      if not valid_train_ids:
    34                                                   # If there's not a single valid train, exit early.
    35                                                   print(f"WARNING: No valid trains for {im_dc.files} to process.")
    36                                                   data_dict['nImg'][0] = 0
    37                                                   return 0
    38
    39                                               # Exclude non_valid trains from the selected data collection.
    40         1        814.0    814.0      0.0      im_dc = im_dc.select_trains(by_id(valid_train_ids))
    41
    42                                               # Just want to be sure that order is correct
    43         1          2.0      2.0      0.0      valid_train_ids = im_dc.train_ids
    44                                               # Get a count of images in each train
    45         1       1033.0   1033.0      0.0      nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
    46         1          7.0      7.0      0.0      nimg_in_trains = nimg_in_trains.astype(np.int64)
    47
    48                                               # store valid trains in shared memory
    49         1          1.0      1.0      0.0      n_valid_trains = len(valid_train_ids)
    50         1          3.0      3.0      0.0      data_dict["n_valid_trains"][0] = n_valid_trains
    51         1         38.0     38.0      0.0      data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
    52
    53         1          1.0      1.0      0.0      if "AGIPD500K" in agipd_base:
    54                                                   agipd_comp = components.AGIPD500K(im_dc)
    55                                               else:
    56         1      52159.0  52159.0      0.2          agipd_comp = components.AGIPD1M(im_dc)
    57
    58         1          2.0      2.0      0.0      kw = {
    59         1          2.0      2.0      0.0          "unstack_pulses": False,
    60                                               }
    61
    62                                               # get selection for the images in this file
    63         2          3.0      1.5      0.0      cm = (self.cell_sel.CM_NONE if apply_sel_pulses
    64         1          2.0      2.0      0.0            else self.cell_sel.CM_PRESEL)
    65
    66         1      23985.0  23985.0      0.1      cellid = np.squeeze(im_dc[agipd_base, "image.cellId"].ndarray())
    67
    68         2       4289.0   2144.5      0.0      img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
    69         1        336.0    336.0      0.0          np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
    70         1         20.0     20.0      0.0      data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
    71
    72         1          4.0      4.0      0.0      data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
    73
    74         1         16.0     16.0      0.0      if img_selected.all():
    75                                                   # All frames selected - use slice to skip unnecessary copy
    76         1          4.0      4.0      0.0          frm_ix = np.s_[:]
    77                                               else:
    78                                                   frm_ix = np.flatnonzero(img_selected)
    79         1        118.0    118.0      0.0      n_img = img_selected.sum()
    80
    81                                               # read raw data
    82                                               # [n_modules, n_imgs, 2, x, y]
    83         1    9967626.0 9967626.0     40.1     raw_data = agipd_comp.get_array("image.data", **kw)[0]
    84
    85                                               # store in shmem only selected images
    86         1          5.0      5.0      0.0      data_dict['nImg'][0] = n_img
    87         1   11935722.0 11935722.0    48.1     data_dict['data'][:n_img] = raw_data[frm_ix, 0]
    88         1    2650431.0 2650431.0     10.7     data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
    89         4      74927.0  18731.8      0.3      data_dict['cellId'][:n_img] = agipd_comp.get_array(
    90         2          3.0      1.5      0.0          "image.cellId", **kw)[0, frm_ix]
    91         4      57820.0  14455.0      0.2      data_dict['pulseId'][:n_img] = agipd_comp.get_array(
    92         2          4.0      2.0      0.0          "image.pulseId", **kw)[0, frm_ix]
    93         4      58312.0  14578.0      0.2      data_dict['trainId'][:n_img] = agipd_comp.get_array(
    94         2          4.0      2.0      0.0          "image.trainId", **kw)[0, frm_ix]
    95
    96         1          3.0      3.0      0.0      return n_img
Loading to numpy array rather than xarray
Timer unit: 1e-06 s

Total time: 16.7732 s
File: <ipython-input-68-a8daa11e503a>
Function: read_file at line 4

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     4                                           def read_file(self, i_proc: int, file_name: str,
     5                                                         apply_sel_pulses: Optional[bool] = True
     6                                                         ) -> int:
     7                                               """Read file with raw data to shared memory
     8
     9                                               :param file_name: Name of input file including path.
    10                                               :param i_proc: Index of shared memory array.
    11                                               :param apply_sel_pulses: apply selected pulses before
    12                                                   all corrections.
    13                                               :return:
    14                                                   - n_img: The number of images to correct.
    15                                               """
    16         1          7.0      7.0      0.0      module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
    17         1          4.0      4.0      0.0      agipd_base = self.h5_data_path.format(module_idx)
    18         1          1.0      1.0      0.0      data_dict = self.shared_dict[i_proc]
    19         1          6.0      6.0      0.0      data_dict['moduleIdx'][0] = module_idx
    20
    21         1        405.0    405.0      0.0      h5_dc = H5File(file_name)
    22
    23                                               # Exclude trains without data.
    24         1       5248.0   5248.0      0.0      im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
    25
    26         2        359.0    179.5      0.0      valid_train_ids = self.get_valid_image_idx(
    27         1        295.0    295.0      0.0          im_dc[agipd_base, "image.trainId"])
    28
    29                                               # filter out trains which will not be selected
    30         2         18.0      9.0      0.0      valid_train_ids = self.cell_sel.filter_trains(
    31         1         16.0     16.0      0.0          np.array(valid_train_ids)).tolist()
    32
    33         1          2.0      2.0      0.0      if not valid_train_ids:
    34                                                   # If there's not a single valid train, exit early.
    35                                                   print(f"WARNING: No valid trains for {im_dc.files} to process.")
    36                                                   data_dict['nImg'][0] = 0
    37                                                   return 0
    38
    39                                               # Exclude non_valid trains from the selected data collection.
    40         1        755.0    755.0      0.0      im_dc = im_dc.select_trains(by_id(valid_train_ids))
    41
    42                                               # Just want to be sure that order is correct
    43         1          2.0      2.0      0.0      valid_train_ids = im_dc.train_ids
    44                                               # Get a count of images in each train
    45         1        913.0    913.0      0.0      nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
    46         1          7.0      7.0      0.0      nimg_in_trains = nimg_in_trains.astype(np.int64)
    47
    48                                               # store valid trains in shared memory
    49         1          1.0      1.0      0.0      n_valid_trains = len(valid_train_ids)
    50         1          2.0      2.0      0.0      data_dict["n_valid_trains"][0] = n_valid_trains
    51         1         37.0     37.0      0.0      data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
    52
    53                                               # if "AGIPD500K" in agipd_base:
    54                                               #     agipd_comp = components.AGIPD500K(im_dc)
    55                                               # else:
    56                                               #     agipd_comp = components.AGIPD1M(im_dc)
    57
    58
    59                                               # get selection for the images in this file
    60         2          2.0      1.0      0.0      cm = (self.cell_sel.CM_NONE if apply_sel_pulses
    61         1          2.0      2.0      0.0            else self.cell_sel.CM_PRESEL)
    62
    63         1          5.0      5.0      0.0      agipd_src = im_dc[agipd_base]
    64
    65         1      23661.0  23661.0      0.1      cellid = agipd_src["image.cellId"].ndarray()[:, 0]
    66
    67         2       4420.0   2210.0      0.0      img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
    68         1         36.0     36.0      0.0          np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
    69         1         25.0     25.0      0.0      data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
    70
    71         1          3.0      3.0      0.0      data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
    72
    73         1        123.0    123.0      0.0      n_img = img_selected.sum()
    74         1         16.0     16.0      0.0      if img_selected.all():
    75                                                   # All frames selected - use slice to skip unnecessary copy
    76         1          4.0      4.0      0.0          frm_ix = np.s_[:]
    77                                               else:
    78                                                   frm_ix = np.flatnonzero(img_selected)
    79
    80                                               # read raw data
    81                                               # [n_imgs, 2, x, y]
    82         1    9430756.0 9430756.0     56.2     raw_data = agipd_src['image.data'].ndarray()
    83
    84                                               # store in shmem only selected images
    85         1          9.0      9.0      0.0      data_dict['nImg'][0] = n_img
    86         1    4591767.0 4591767.0     27.4     data_dict['data'][:n_img] = raw_data[frm_ix, 0]
    87         1    2649703.0 2649703.0     15.8     data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
    88         1         86.0     86.0      0.0      data_dict['cellId'][:n_img] = cellid[frm_ix]
    89         1      35078.0  35078.0      0.2      data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
    90         1      29424.0  29424.0      0.2      data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]
    91
    92         1          2.0      2.0      0.0      return n_img
Nice work! It's unfortunate it doesn't make a huge dent overall, but a big improvement to this particular section. It also means it makes sense to look at the computational sections again.
Indeed, I have encountered similar problems with xarray in the past, e.g. here.

Currently the call to KeyData.ndarray() is still doing a temporary allocation. Do you think a static buffer may have further benefit here?

> Currently the call to KeyData.ndarray() is still doing a temporary allocation

The AGIPD raw data is a mixture of the 'real' data and the raw gain information, which we want to separate out into two different arrays (and convert the 'real' data to floats). When we're keeping all cells, we could read those out with something like .ndarray(out=target, roi=np.s_[0]). But I suspect doing two separate reads would be slower than copying from the temporary array, particularly because each chunk contains both kinds of data together, so doing two reads means HDF5 reading all the chunks twice.
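The trade-off can be illustrated with a hypothetical h5py sketch (not the cal_tools code; the dataset layout and names are made up to mimic AGIPD raw files, where each chunk holds both the signal and gain parts of a frame):

```python
import tempfile

import h5py
import numpy as np

with tempfile.NamedTemporaryFile(suffix=".h5") as f:
    # Interleaved layout: frames hold (signal, gain) along axis 1, and
    # each chunk contains both, so an ROI read of one component still
    # has to load whole chunks.
    raw = np.random.randint(0, 2**14, size=(16, 2, 32, 32)).astype(np.uint16)
    with h5py.File(f.name, "w") as h5f:
        h5f.create_dataset("image/data", data=raw, chunks=(1, 2, 32, 32))

    with h5py.File(f.name, "r") as h5f:
        ds = h5f["image/data"]

        # Option A: one read, then split the temporary array in memory.
        tmp = ds[:]
        data, rawgain = tmp[:, 0], tmp[:, 1]

        # Option B: two ROI reads straight into the targets - each read
        # still visits every chunk, so the file is traversed twice.
        data_b = np.empty((16, 32, 32), dtype=np.uint16)
        gain_b = np.empty((16, 32, 32), dtype=np.uint16)
        ds.read_direct(data_b, source_sel=np.s_[:, 0])
        ds.read_direct(gain_b, source_sel=np.s_[:, 1])

assert np.array_equal(data, data_b)
assert np.array_equal(rawgain, gain_b)
```

Both options produce the same arrays; the difference is purely in how many times the chunks are read from disk.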
More difference than I thought, but still not massive: one call to read_file() (with no parallelism) goes from ~16 seconds to ~13 seconds on the second go, once it's reusing memory.

My gut reaction is that this saving isn't worth keeping another copy of the full data in memory for (and adding the minor complexity of allocating and using that array), but...
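The reuse pattern under discussion can be sketched in isolation (a minimal stand-in: `fill_from_file` is a hypothetical placeholder for `agipd_src['image.data'].ndarray(out=...)` from the profiles below):

```python
import numpy as np

# Allocate one scratch buffer per worker, sized for the largest file,
# and reuse it for every read instead of allocating a fresh temporary.
MAX_IMG, NY, NX = 64, 16, 16
tmp_arr = np.empty((MAX_IMG, 2, NY, NX), dtype=np.uint16)

def fill_from_file(out, payload):
    # Hypothetical stand-in for KeyData.ndarray(out=out): writes the
    # file's frames into the caller-provided buffer, returns a view.
    n = len(payload)
    out[:n] = payload
    return out[:n]

for n_img in (48, 32):  # successive files reuse the same buffer
    payload = np.ones((n_img, 2, NY, NX), dtype=np.uint16)
    raw = fill_from_file(tmp_arr, payload)
    assert raw.base is tmp_arr          # no new allocation per file
    assert raw.shape == (n_img, 2, NY, NX)
```

The cost is exactly what the comment above weighs: the buffer pins another full-file-sized array in memory for the whole run.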
Profiling current state of the PR (9bd70343)
Timer unit: 1e-06 s

Total time: 16.1484 s
File: <ipython-input-27-89ab80a65e56>
Function: read_file at line 4

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     4                                           def read_file(self, i_proc: int, file_name: str,
     5                                                         apply_sel_pulses: Optional[bool] = True
     6                                                         ) -> int:
     7                                               """Read file with raw data to shared memory
     8
     9                                               :param file_name: Name of input file including path.
    10                                               :param i_proc: Index of shared memory array.
    11                                               :param apply_sel_pulses: apply selected pulses before
    12                                                   all corrections.
    13                                               :return:
    14                                                   - n_img: The number of images to correct.
    15                                               """
    16         1          6.0      6.0      0.0      module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
    17         1          4.0      4.0      0.0      agipd_base = self.h5_data_path.format(module_idx)
    18         1          2.0      2.0      0.0      data_dict = self.shared_dict[i_proc]
    19         1          6.0      6.0      0.0      data_dict['moduleIdx'][0] = module_idx
    20
    21         1       7196.0   7196.0      0.0      h5_dc = H5File(file_name)
    22
    23                                               # Exclude trains without data.
    24         1      74041.0  74041.0      0.5      im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
    25
    26         2        374.0    187.0      0.0      valid_train_ids = self.get_valid_image_idx(
    27         1        290.0    290.0      0.0          im_dc[agipd_base, "image.trainId"])
    28
    29                                               # filter out trains which will not be selected
    30         2         17.0      8.5      0.0      valid_train_ids = self.cell_sel.filter_trains(
    31         1         16.0     16.0      0.0          np.array(valid_train_ids)).tolist()
    32
    33         1          1.0      1.0      0.0      if not valid_train_ids:
    34                                                   # If there's not a single valid train, exit early.
    35                                                   print(f"WARNING: No valid trains for {im_dc.files} to process.")
    36                                                   data_dict['nImg'][0] = 0
    37                                                   return 0
    38
    39                                               # Exclude non_valid trains from the selected data collection.
    40         1        816.0    816.0      0.0      im_dc = im_dc.select_trains(by_id(valid_train_ids))
    41
    42                                               # Just want to be sure that order is correct
    43         1          2.0      2.0      0.0      valid_train_ids = im_dc.train_ids
    44                                               # Get a count of images in each train
    45         1        940.0    940.0      0.0      nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
    46         1          6.0      6.0      0.0      nimg_in_trains = nimg_in_trains.astype(np.int64)
    47
    48                                               # store valid trains in shared memory
    49         1          1.0      1.0      0.0      n_valid_trains = len(valid_train_ids)
    50         1          3.0      3.0      0.0      data_dict["n_valid_trains"][0] = n_valid_trains
    51         1         39.0     39.0      0.0      data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
    52
    53                                               # get selection for the images in this file
    54         2          2.0      1.0      0.0      cm = (self.cell_sel.CM_NONE if apply_sel_pulses
    55         1          2.0      2.0      0.0            else self.cell_sel.CM_PRESEL)
    56
    57         1          5.0      5.0      0.0      agipd_src = im_dc[agipd_base]
    58
    59         1      22992.0  22992.0      0.1      cellid = agipd_src["image.cellId"].ndarray()[:, 0]
    60
    61         2       4712.0   2356.0      0.0      img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
    62         1        114.0    114.0      0.0          np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
    63
    64         1         21.0     21.0      0.0      data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
    65         1          3.0      3.0      0.0      data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
    66
    67         1        120.0    120.0      0.0      n_img = img_selected.sum()
    68         1         15.0     15.0      0.0      if img_selected.all():
    69                                                   # All frames selected - use slice to skip unnecessary copy
    70         1          3.0      3.0      0.0          frm_ix = np.s_[:]
    71                                               else:
    72                                                   frm_ix = np.flatnonzero(img_selected)
    73
    74                                               # read raw data
    75                                               # [n_imgs, 2, x, y]
    76         1    9770154.0 9770154.0     60.5     raw_data = agipd_src['image.data'].ndarray()
    77
    78                                               # store in shmem only selected images
    79         1         10.0     10.0      0.0      data_dict['nImg'][0] = n_img
    80         1    3809884.0 3809884.0     23.6     data_dict['data'][:n_img] = raw_data[frm_ix, 0]
    81         1    2377308.0 2377308.0     14.7     data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
    82         1         96.0     96.0      0.0      data_dict['cellId'][:n_img] = cellid[frm_ix]
    83         1      56848.0  56848.0      0.4      data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
    84         1      22387.0  22387.0      0.1      data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]
    85
    86         1          2.0      2.0      0.0      return n_img
With preallocated buffer - first use
Timer unit: 1e-06 s

Total time: 15.8114 s
File: <ipython-input-34-114898917ff6>
Function: read_file at line 6

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     6                                           def read_file(self, i_proc: int, file_name: str,
     7                                                         apply_sel_pulses: Optional[bool] = True
     8                                                         ) -> int:
     9                                               """Read file with raw data to shared memory
    10
    11                                               :param file_name: Name of input file including path.
    12                                               :param i_proc: Index of shared memory array.
    13                                               :param apply_sel_pulses: apply selected pulses before
    14                                                   all corrections.
    15                                               :return:
    16                                                   - n_img: The number of images to correct.
    17                                               """
    18         1          8.0      8.0      0.0      module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
    19         1          3.0      3.0      0.0      agipd_base = self.h5_data_path.format(module_idx)
    20         1          2.0      2.0      0.0      data_dict = self.shared_dict[i_proc]
    21         1          7.0      7.0      0.0      data_dict['moduleIdx'][0] = module_idx
    22
    23         1       7792.0   7792.0      0.0      h5_dc = H5File(file_name)
    24
    25                                               # Exclude trains without data.
    26         1      66134.0  66134.0      0.4      im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
    27
    28         2        350.0    175.0      0.0      valid_train_ids = self.get_valid_image_idx(
    29         1        291.0    291.0      0.0          im_dc[agipd_base, "image.trainId"])
    30
    31                                               # filter out trains which will not be selected
    32         2         16.0      8.0      0.0      valid_train_ids = self.cell_sel.filter_trains(
    33         1         16.0     16.0      0.0          np.array(valid_train_ids)).tolist()
    34
    35         1          1.0      1.0      0.0      if not valid_train_ids:
    36                                                   # If there's not a single valid train, exit early.
    37                                                   print(f"WARNING: No valid trains for {im_dc.files} to process.")
    38                                                   data_dict['nImg'][0] = 0
    39                                                   return 0
    40
    41                                               # Exclude non_valid trains from the selected data collection.
    42         1        812.0    812.0      0.0      im_dc = im_dc.select_trains(by_id(valid_train_ids))
    43
    44                                               # Just want to be sure that order is correct
    45         1          2.0      2.0      0.0      valid_train_ids = im_dc.train_ids
    46                                               # Get a count of images in each train
    47         1        922.0    922.0      0.0      nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
    48         1          7.0      7.0      0.0      nimg_in_trains = nimg_in_trains.astype(np.int64)
    49
    50                                               # store valid trains in shared memory
    51         1          2.0      2.0      0.0      n_valid_trains = len(valid_train_ids)
    52         1          3.0      3.0      0.0      data_dict["n_valid_trains"][0] = n_valid_trains
    53         1         37.0     37.0      0.0      data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
    54
    55                                               # get selection for the images in this file
    56         2          2.0      1.0      0.0      cm = (self.cell_sel.CM_NONE if apply_sel_pulses
    57         1          2.0      2.0      0.0            else self.cell_sel.CM_PRESEL)
    58
    59         1          4.0      4.0      0.0      agipd_src = im_dc[agipd_base]
    60
    61         1      22713.0  22713.0      0.1      cellid = agipd_src["image.cellId"].ndarray()[:, 0]
    62
    63         2       4560.0   2280.0      0.0      img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
    64         1         47.0     47.0      0.0          np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
    65
    66         1         19.0     19.0      0.0      data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
    67         1          3.0      3.0      0.0      data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
    68
    69         1        121.0    121.0      0.0      n_img = img_selected.sum()
    70         1         15.0     15.0      0.0      if img_selected.all():
    71                                                   # All frames selected - use slice to skip unnecessary copy
    72         1          3.0      3.0      0.0          frm_ix = np.s_[:]
    73                                               else:
    74                                                   frm_ix = np.flatnonzero(img_selected)
    75
    76                                               # read raw data
    77                                               # [n_imgs, 2, x, y]
    78         1    9647374.0 9647374.0     61.0     agipd_src['image.data'].ndarray(out=tmp_arr)
    79
    80                                               # store in shmem only selected images
    81         1         10.0     10.0      0.0      data_dict['nImg'][0] = n_img
    82         1    3695789.0 3695789.0     23.4     data_dict['data'][:n_img] = tmp_arr[frm_ix, 0]
    83         1    2299796.0 2299796.0     14.5     data_dict['rawgain'][:n_img] = tmp_arr[frm_ix, 1]
    84         1        103.0    103.0      0.0      data_dict['cellId'][:n_img] = cellid[frm_ix]
    85         1      33207.0  33207.0      0.2      data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
    86         1      31190.0  31190.0      0.2      data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]
    87
    88         1          2.0      2.0      0.0      return n_img
With preallocated buffer - reuse
Timer unit: 1e-06 s

Total time: 12.8709 s
File: <ipython-input-34-114898917ff6>
Function: read_file at line 6

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     6                                           def read_file(self, i_proc: int, file_name: str,
     7                                                         apply_sel_pulses: Optional[bool] = True
     8                                                         ) -> int:
     9                                               """Read file with raw data to shared memory
    10
    11                                               :param file_name: Name of input file including path.
    12                                               :param i_proc: Index of shared memory array.
    13                                               :param apply_sel_pulses: apply selected pulses before
    14                                                   all corrections.
    15                                               :return:
    16                                                   - n_img: The number of images to correct.
    17                                               """
    18         1          7.0      7.0      0.0      module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
    19         1          3.0      3.0      0.0      agipd_base = self.h5_data_path.format(module_idx)
    20         1          1.0      1.0      0.0      data_dict = self.shared_dict[i_proc]
    21         1          7.0      7.0      0.0      data_dict['moduleIdx'][0] = module_idx
    22
    23         1       7204.0   7204.0      0.1      h5_dc = H5File(file_name)
    24
    25                                               # Exclude trains without data.
    26         1      36890.0  36890.0      0.3      im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
    27
    28         2        366.0    183.0      0.0      valid_train_ids = self.get_valid_image_idx(
    29         1        294.0    294.0      0.0          im_dc[agipd_base, "image.trainId"])
    30
    31                                               # filter out trains which will not be selected
    32         2         17.0      8.5      0.0      valid_train_ids = self.cell_sel.filter_trains(
    33         1         17.0     17.0      0.0          np.array(valid_train_ids)).tolist()
    34
    35         1          1.0      1.0      0.0      if not valid_train_ids:
    36                                                   # If there's not a single valid train, exit early.
    37                                                   print(f"WARNING: No valid trains for {im_dc.files} to process.")
    38                                                   data_dict['nImg'][0] = 0
    39                                                   return 0
    40
    41                                               # Exclude non_valid trains from the selected data collection.
    42         1        808.0    808.0      0.0      im_dc = im_dc.select_trains(by_id(valid_train_ids))
    43
    44                                               # Just want to be sure that order is correct
    45         1          2.0      2.0      0.0      valid_train_ids = im_dc.train_ids
    46                                               # Get a count of images in each train
    47         1        915.0    915.0      0.0      nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
    48         1          6.0      6.0      0.0      nimg_in_trains = nimg_in_trains.astype(np.int64)
    49
    50                                               # store valid trains in shared memory
    51         1          1.0      1.0      0.0      n_valid_trains = len(valid_train_ids)
    52         1          3.0      3.0      0.0      data_dict["n_valid_trains"][0] = n_valid_trains
    53         1         38.0     38.0      0.0      data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
    54
    55                                               # get selection for the images in this file
    56         2          2.0      1.0      0.0      cm = (self.cell_sel.CM_NONE if apply_sel_pulses
    57         1          2.0      2.0      0.0            else self.cell_sel.CM_PRESEL)
    58
    59         1          4.0      4.0      0.0      agipd_src = im_dc[agipd_base]
    60
    61         1      22870.0  22870.0      0.2      cellid = agipd_src["image.cellId"].ndarray()[:, 0]
    62
    63         2       4566.0   2283.0      0.0      img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
    64         1        153.0    153.0      0.0          np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
    65
    66         1         20.0     20.0      0.0      data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
    67         1          3.0      3.0      0.0      data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
    68
    69         1        126.0    126.0      0.0      n_img = img_selected.sum()
    70         1         16.0     16.0      0.0      if img_selected.all():
    71                                                   # All frames selected - use slice to skip unnecessary copy
    72         1          4.0      4.0      0.0          frm_ix = np.s_[:]
    73                                               else:
    74                                                   frm_ix = np.flatnonzero(img_selected)
    75
    76                                               # read raw data
    77                                               # [n_imgs, 2, x, y]
    78         1    6735341.0 6735341.0     52.3     agipd_src['image.data'].ndarray(out=tmp_arr)
    79
    80                                               # store in shmem only selected images
    81         1         10.0     10.0      0.0      data_dict['nImg'][0] = n_img
    82         1    3697241.0 3697241.0     28.7     data_dict['data'][:n_img] = tmp_arr[frm_ix, 0]
    83         1    2297465.0 2297465.0     17.9     data_dict['rawgain'][:n_img] = tmp_arr[frm_ix, 1]
    84         1         96.0     96.0      0.0      data_dict['cellId'][:n_img] = cellid[frm_ix]
    85         1      44554.0  44554.0      0.3      data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
    86         1      21841.0  21841.0      0.2      data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]
    87
    88         1          2.0      2.0      0.0      return n_img
682  682          valid_train_ids = im_dc.train_ids
683  683          # Get a count of images in each train
684  684          nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
685       -       nimg_in_trains = nimg_in_trains.astype(int)
     685  +       nimg_in_trains = nimg_in_trains.astype(np.int64)

TIL! While I expected np.int32, I think I also mixed this up with np.uint64 and its quirky behaviour (e.g. arithmetic with int yields np.float64).

Interesting that int is equivalent to int64 on a LP64 data model.

NumPy treats int as shorthand for np.int_, which is documented as 'compatible with Python int and C long'. So I think this comes from Python 2, where an int was what C called a long, and long was the unlimited integer type (Python 3's int).

I find it much easier to always specify the integer size!
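A quick check of the aliasing (with the platform caveat that a C long, and hence the default integer, is 32-bit on 64-bit Windows but 64-bit on LP64 systems like Linux):

```python
import numpy as np

# The Python builtin int maps to NumPy's default integer type, np.int_.
assert np.dtype(int) == np.dtype(np.int_)

# Spelling out the width removes the platform ambiguity entirely.
counts = np.array([1, 2, 3], dtype=np.int64)
assert counts.dtype.itemsize == 8  # always 64-bit, on every platform
```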
mentioned in commit 2b0ad6ba
changed milestone to %3.12.0