
[AGIPD] [Correct] Try to simplify & speed up file reading code

Merged Thomas Kluyver requested to merge fix/agipd-perf-read-file into master
2 unresolved threads

Description

Janusz M's investigation showed that simply reading raw AGIPD data is significantly faster than the timings in our notebooks imply, so the file-reading code is wasting time. I investigated this and tried to mitigate it.

The biggest cause seems to be the cell selection. We were constructing an array of frame indexes to use, even when we wanted all of them, and indexing with an array forces an extra memory copy. By using a slice instead of an index array when all frames are selected, these two lines go from ~120 seconds on one file to ~15 seconds (I don't know why copying in memory would be this slow; I suspect it has something to do with Xarray):

data_dict['data'][:n_img] = raw_data[frm_ix, 0]
data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
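
The extra copy follows from NumPy's indexing rules: basic indexing with a slice returns a view of the source array, whereas indexing with an integer array always builds a new array first. A minimal sketch with plain NumPy (the tiny array shape and the helper name `select_frames` are made up for illustration; this is not the cal_tools code):

```python
import numpy as np

def select_frames(img_selected):
    """Return a slice if every frame is selected, else an index array."""
    if img_selected.all():
        return np.s_[:]
    return np.flatnonzero(img_selected)

# Stand-in for raw AGIPD data: [n_imgs, 2, x, y] (hypothetical tiny shape)
raw = np.arange(8 * 2 * 4 * 4, dtype=np.uint16).reshape(8, 2, 4, 4)

all_selected = np.ones(8, dtype=bool)
some_selected = all_selected.copy()
some_selected[::2] = False  # drop every other frame

view = raw[select_frames(all_selected), 0]   # slice: no data copied yet
copy = raw[select_frames(some_selected), 0]  # index array: new array

view_shares = np.shares_memory(raw, view)
copy_shares = np.shares_memory(raw, copy)
print(view_shares, copy_shares)
```

With the slice, the only copy left is the assignment into the shared-memory array itself.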

The second cause is the added complexity of Xarray and the AGIPD component class, which we're not actually using here. I switched to reading plain NumPy arrays to simplify things, which dropped the two lines above from ~15 to ~8 seconds. This includes converting the ints to floats in data.
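
The int-to-float conversion needs no separate step: assigning an integer array into a preallocated float array converts the values as they are copied. A small sketch, assuming a float32 target for `data` and an integer target for `rawgain` (the dtypes, shapes and random input are illustrative, not taken from the real shared-memory setup):

```python
import numpy as np

n_img, nx, ny = 6, 4, 4

# Stand-in for raw data: [n_imgs, 2, x, y], index 0 = signal, 1 = raw gain
rng = np.random.default_rng(0)
raw_data = rng.integers(0, 2**14, size=(n_img, 2, nx, ny), dtype=np.uint16)

# Preallocated targets, larger than needed, like the shared-memory arrays
data = np.zeros((10, nx, ny), dtype=np.float32)
rawgain = np.zeros((10, nx, ny), dtype=np.uint16)

frm_ix = np.s_[:]  # all frames selected
data[:n_img] = raw_data[frm_ix, 0]     # uint16 -> float32 during the copy
rawgain[:n_img] = raw_data[frm_ix, 1]  # same dtype, plain copy
```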

How Has This Been Tested?

Running the notebook for MID data from proposal 6976, run 50:

xfel-calibrate agipd CORRECT \
  --ctrl-source-template '{}/MDL/FPGA_COMP' \
  --karabo-da AGIPD00 AGIPD01 AGIPD02 AGIPD03 AGIPD04 AGIPD05 AGIPD06 AGIPD07 AGIPD08 AGIPD09 AGIPD10 AGIPD11 AGIPD12 AGIPD13 AGIPD14 AGIPD15 \
  --sequences 0-1 \
  --karabo-id-control MID_EXP_AGIPD1M1 --receiver-template '{}CH0' \
  --compress-fields gain mask data --recast-image-data int16 --round-photons \
  --use-litframe-finder auto --use-super-selection final \
  --use-xgm-device SA2_XTD1_XGM/XGM/DOOCS --adjust-mg-baseline \
  --bias-voltage 300 --blc-set-min --blc-stripes --cm-dark-fraction 0.15 \
  --cm-dark-range -30 30 --cm-n-itr 4 --common-mode --ff-gain 1.0 \
  --force-hg-if-below --force-mg-if-below --hg-hard-threshold 1000 \
  --low-medium-gap --mg-hard-threshold 1000 --overwrite --rel-gain \
  --sequences-per-node 1 --slopes-ff-from-files '' --xray-gain --max-tasks-per-worker 1 \
  --in-folder /gpfs/exfel/exp/MID/202325/p006976/raw --run 50 \
  --out-folder /gpfs/exfel/data/scratch/kluyvert/agipd-corr-p6976-r50 \
  --karabo-id MID_DET_AGIPD1M-1

Relevant Documents (optional)

Timing results from running entire notebook

Original correction of p6976 r50:

Total processing time 1181.5 s
Timing summary per batch of 4 files:
Constants were retrieved in: 6.1 +- 0.00 s
Constants were loaded in : 24.2 +- 0.00 s
Started pool: 0.6 +- 0.00 s
Loading data from files: 96.1 +- 3.63 s
Offset correction: 22.2 +- 0.08 s
Base-line shift correction: 31.9 +- 0.17 s
Common-mode correction: 19.7 +- 0.46 s
Applying selected cells after common mode correction: 51.8 +- 0.34 s
Gain corrections: 38.8 +- 0.63 s
Save: 27.2 +- 0.58 s

Re-running with master today:

Total processing time 1280.5 s
Timing summary per batch of 4 files:
Constants were retrieved in: 6.1 +- 0.00 s
Constants were loaded in : 21.8 +- 0.00 s
Started pool: 1.0 +- 0.00 s
Loading data from files: 107.6 +- 5.36 s
Offset correction: 23.1 +- 0.22 s
Base-line shift correction: 32.9 +- 0.58 s
Common-mode correction: 19.1 +- 0.87 s
Applying selected cells after common mode correction: 58.1 +- 0.40 s
Gain corrections: 40.3 +- 0.59 s
Save: 31.8 +- 1.16 s

After:

Total processing time 1042.3 s
Timing summary per batch of 4 files:
Constants were retrieved in: 6.0 +- 0.00 s
Constants were loaded in : 16.9 +- 0.00 s
Started pool: 0.8 +- 0.00 s
Loading data from files: 37.9 +- 2.05 s
Offset correction: 24.4 +- 0.42 s
Base-line shift correction: 34.6 +- 0.44 s
Common-mode correction: 21.4 +- 0.27 s
Applying selected cells after common mode correction: 61.6 +- 0.23 s
Gain corrections: 41.8 +- 0.47 s
Save: 32.9 +- 0.85 s

That is, the loading step is more than 2x faster, but the impact on the overall correction time is relatively small.
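
For a rough sense of scale, the numbers can be worked out from the "Re-running with master today" and "After" summaries above. This is just arithmetic on the reported values; the variable names are mine:

```python
# Timings copied from the summaries above (master today vs. after)
load_before, load_after = 107.6, 37.9        # s, "Loading data from files"
total_before, total_after = 1280.5, 1042.3   # s, "Total processing time"

load_speedup = load_before / load_after
total_saving = total_before - total_after
total_saving_pct = 100 * total_saving / total_before

print(f"Loading speedup: {load_speedup:.1f}x")
print(f"Total time saved: {total_saving:.1f} s ({total_saving_pct:.0f}%)")
```

Loading comes out roughly 2.8x faster, while the total processing time drops by a little under a fifth.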

Types of changes

  • Bug fix (non-breaking change which fixes an issue)

Checklist:

  • My code follows the code style of this project.

Reviewers

@schmidtp @ahmedk

Edited by Thomas Kluyver


Activity

  • Thomas Kluyver changed the description


  • Further detail: line profiles of read_file.

    Before these changes
    Timer unit: 1e-06 s
    
    Total time: 132.418 s
    File: /home/kluyvert/.conda/envs/offline-env/lib/python3.8/site-packages/cal_tools/agipdlib.py
    Function: read_file at line 643
    
    Line #      Hits         Time  Per Hit   % Time  Line Contents
    ==============================================================
       643                                               def read_file(self, i_proc: int, file_name: str,
       644                                                             apply_sel_pulses: Optional[bool] = True
       645                                                             ) -> int:
       646                                                   """Read file with raw data to shared memory
       647
       648                                                   :param file_name: Name of input file including path.
       649                                                   :param i_proc: Index of shared memory array.
       650                                                   :param apply_sel_pulses: apply selected pulses before
       651                                                                            all corrections.
       652                                                   :return:
       653                                                       - n_img: The number of images to correct.
       654                                                   """
       655         1         16.0     16.0      0.0          module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
       656         1          3.0      3.0      0.0          agipd_base = self.h5_data_path.format(module_idx)
       657         1          2.0      2.0      0.0          data_dict = self.shared_dict[i_proc]
       658         1         16.0     16.0      0.0          data_dict['moduleIdx'][0] = module_idx
       659
       660         1      63156.0  63156.0      0.0          h5_dc = H5File(file_name)
       661
       662                                                   # Exclude trains without data.
       663         1    1101338.0 1101338.0      0.8          im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
       664
       665         2        743.0    371.5      0.0          valid_train_ids = self.get_valid_image_idx(
       666         1        322.0    322.0      0.0              im_dc[agipd_base, "image.trainId"])
       667
       668                                                   # filter out trains which will not be selected
       669         2         29.0     14.5      0.0          valid_train_ids = self.cell_sel.filter_trains(
       670         1         16.0     16.0      0.0              np.array(valid_train_ids)).tolist()
       671
       672         1          1.0      1.0      0.0          if not valid_train_ids:
       673                                                       # If there's not a single valid train, exit early.
       674                                                       print(f"WARNING: No valid trains for {im_dc.files} to process.")
       675                                                       data_dict['nImg'][0] = 0
       676                                                       return 0
       677
       678                                                   # Exclude non_valid trains from the selected data collection.
       679         1        834.0    834.0      0.0          im_dc = im_dc.select_trains(by_id(valid_train_ids))
       680
       681                                                   # Just want to be sure that order is correct
       682         1          3.0      3.0      0.0          valid_train_ids = im_dc.train_ids
       683                                                   # Get a count of images in each train
       684         1       1157.0   1157.0      0.0          nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
       685         1          6.0      6.0      0.0          nimg_in_trains = nimg_in_trains.astype(int)
       686
       687                                                   # store valid trains in shared memory
       688         1          2.0      2.0      0.0          n_valid_trains = len(valid_train_ids)
       689         1         12.0     12.0      0.0          data_dict["n_valid_trains"][0] = n_valid_trains
       690         1         91.0     91.0      0.0          data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
       691
       692         1          5.0      5.0      0.0          if "AGIPD500K" in agipd_base:
       693                                                       agipd_comp = components.AGIPD500K(im_dc)
       694                                                   else:
       695         1     221801.0 221801.0      0.2              agipd_comp = components.AGIPD1M(im_dc)
       696
       697         1          1.0      1.0      0.0          kw = {
       698         1          2.0      2.0      0.0              "unstack_pulses": False,
       699                                                   }
       700
       701                                                   # get selection for the images in this file
       702         2          3.0      1.5      0.0          cm = (self.cell_sel.CM_NONE if apply_sel_pulses
       703         1          3.0      3.0      0.0                else self.cell_sel.CM_PRESEL)
       704
       705         1      29553.0  29553.0      0.0          cellid = np.squeeze(im_dc[agipd_base, "image.cellId"].ndarray())
       706
       707         2       4290.0   2145.0      0.0          img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
       708         1        414.0    414.0      0.0              np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
       709         1         34.0     34.0      0.0          data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
       710
       711         1        514.0    514.0      0.0          frm_ix = np.flatnonzero(img_selected)
       712         1         12.0     12.0      0.0          data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
       713         1          2.0      2.0      0.0          n_img = len(frm_ix)
       714
       715                                                   # read raw data
       716                                                   # [n_modules, n_imgs, 2, x, y]
       717         1   11974924.0 11974924.0      9.0          raw_data = agipd_comp.get_array("image.data", **kw)[0]
       718
       719                                                   # store in shmem only selected images
       720         1         13.0     13.0      0.0          data_dict['nImg'][0] = n_img
       721         1   68629020.0 68629020.0     51.8          data_dict['data'][:n_img] = raw_data[frm_ix, 0]
       722         1   50183172.0 50183172.0     37.9          data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
       723         4      82093.0  20523.2      0.1          data_dict['cellId'][:n_img] = agipd_comp.get_array(
       724         2          6.0      3.0      0.0              "image.cellId", **kw)[0, frm_ix]
       725         4      61478.0  15369.5      0.0          data_dict['pulseId'][:n_img] = agipd_comp.get_array(
       726         2          3.0      1.5      0.0              "image.pulseId", **kw)[0, frm_ix]
       727         4      63402.0  15850.5      0.0          data_dict['trainId'][:n_img] = agipd_comp.get_array(
       728         2          3.0      1.5      0.0              "image.trainId", **kw)[0, frm_ix]
       729
       730         1          3.0      3.0      0.0          return n_img

    Indexing change only
    Timer unit: 1e-06 s
    
    Total time: 24.8342 s
    File: <ipython-input-64-689693c0871f>
    Function: read_file at line 4
    
    Line #      Hits         Time  Per Hit   % Time  Line Contents
    ==============================================================
         4                                           def read_file(self, i_proc: int, file_name: str,
         5                                                             apply_sel_pulses: Optional[bool] = True
         6                                                             ) -> int:
         7                                                   """Read file with raw data to shared memory
         8
         9                                                   :param file_name: Name of input file including path.
        10                                                   :param i_proc: Index of shared memory array.
        11                                                   :param apply_sel_pulses: apply selected pulses before
        12                                                                            all corrections.
        13                                                   :return:
        14                                                       - n_img: The number of images to correct.
        15                                                   """
        16         1          7.0      7.0      0.0          module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
        17         1          4.0      4.0      0.0          agipd_base = self.h5_data_path.format(module_idx)
        18         1          2.0      2.0      0.0          data_dict = self.shared_dict[i_proc]
        19         1          6.0      6.0      0.0          data_dict['moduleIdx'][0] = module_idx
        20
        21         1        398.0    398.0      0.0          h5_dc = H5File(file_name)
        22
        23                                                   # Exclude trains without data.
        24         1       5399.0   5399.0      0.0          im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
        25
        26         2        358.0    179.0      0.0          valid_train_ids = self.get_valid_image_idx(
        27         1        308.0    308.0      0.0              im_dc[agipd_base, "image.trainId"])
        28
        29                                                   # filter out trains which will not be selected
        30         2         18.0      9.0      0.0          valid_train_ids = self.cell_sel.filter_trains(
        31         1         17.0     17.0      0.0              np.array(valid_train_ids)).tolist()
        32
        33         1          1.0      1.0      0.0          if not valid_train_ids:
        34                                                       # If there's not a single valid train, exit early.
        35                                                       print(f"WARNING: No valid trains for {im_dc.files} to process.")
        36                                                       data_dict['nImg'][0] = 0
        37                                                       return 0
        38
        39                                                   # Exclude non_valid trains from the selected data collection.
        40         1        814.0    814.0      0.0          im_dc = im_dc.select_trains(by_id(valid_train_ids))
        41
        42                                                   # Just want to be sure that order is correct
        43         1          2.0      2.0      0.0          valid_train_ids = im_dc.train_ids
        44                                                   # Get a count of images in each train
        45         1       1033.0   1033.0      0.0          nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
        46         1          7.0      7.0      0.0          nimg_in_trains = nimg_in_trains.astype(np.int64)
        47
        48                                                   # store valid trains in shared memory
        49         1          1.0      1.0      0.0          n_valid_trains = len(valid_train_ids)
        50         1          3.0      3.0      0.0          data_dict["n_valid_trains"][0] = n_valid_trains
        51         1         38.0     38.0      0.0          data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
        52
        53         1          1.0      1.0      0.0          if "AGIPD500K" in agipd_base:
        54                                                       agipd_comp = components.AGIPD500K(im_dc)
        55                                                   else:
        56         1      52159.0  52159.0      0.2              agipd_comp = components.AGIPD1M(im_dc)
        57
        58         1          2.0      2.0      0.0          kw = {
        59         1          2.0      2.0      0.0              "unstack_pulses": False,
        60                                                   }
        61
        62                                                   # get selection for the images in this file
        63         2          3.0      1.5      0.0          cm = (self.cell_sel.CM_NONE if apply_sel_pulses
        64         1          2.0      2.0      0.0                else self.cell_sel.CM_PRESEL)
        65
        66         1      23985.0  23985.0      0.1          cellid = np.squeeze(im_dc[agipd_base, "image.cellId"].ndarray())
        67
        68         2       4289.0   2144.5      0.0          img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
        69         1        336.0    336.0      0.0              np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
        70         1         20.0     20.0      0.0          data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
        71
        72         1          4.0      4.0      0.0          data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
        73
        74         1         16.0     16.0      0.0          if img_selected.all():
        75                                                       # All frames selected - use slice to skip unnecessary copy
        76         1          4.0      4.0      0.0              frm_ix = np.s_[:]
        77                                                   else:
        78                                                       frm_ix = np.flatnonzero(img_selected)
        79         1        118.0    118.0      0.0          n_img = img_selected.sum()
        80
        81                                                   # read raw data
        82                                                   # [n_modules, n_imgs, 2, x, y]
        83         1    9967626.0 9967626.0     40.1          raw_data = agipd_comp.get_array("image.data", **kw)[0]
        84
        85                                                   # store in shmem only selected images
        86         1          5.0      5.0      0.0          data_dict['nImg'][0] = n_img
        87         1   11935722.0 11935722.0     48.1          data_dict['data'][:n_img] = raw_data[frm_ix, 0]
        88         1    2650431.0 2650431.0     10.7          data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
        89         4      74927.0  18731.8      0.3          data_dict['cellId'][:n_img] = agipd_comp.get_array(
        90         2          3.0      1.5      0.0              "image.cellId", **kw)[0, frm_ix]
        91         4      57820.0  14455.0      0.2          data_dict['pulseId'][:n_img] = agipd_comp.get_array(
        92         2          4.0      2.0      0.0              "image.pulseId", **kw)[0, frm_ix]
        93         4      58312.0  14578.0      0.2          data_dict['trainId'][:n_img] = agipd_comp.get_array(
        94         2          4.0      2.0      0.0              "image.trainId", **kw)[0, frm_ix]
        95
        96         1          3.0      3.0      0.0          return n_img

    Loading to numpy array rather than xarray
    Timer unit: 1e-06 s
    
    Total time: 16.7732 s
    File: <ipython-input-68-a8daa11e503a>
    Function: read_file at line 4
    
    Line #      Hits         Time  Per Hit   % Time  Line Contents
    ==============================================================
         4                                           def read_file(self, i_proc: int, file_name: str,
         5                                                             apply_sel_pulses: Optional[bool] = True
         6                                                             ) -> int:
         7                                                   """Read file with raw data to shared memory
         8
         9                                                   :param file_name: Name of input file including path.
        10                                                   :param i_proc: Index of shared memory array.
        11                                                   :param apply_sel_pulses: apply selected pulses before
        12                                                                            all corrections.
        13                                                   :return:
        14                                                       - n_img: The number of images to correct.
        15                                                   """
        16         1          7.0      7.0      0.0          module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
        17         1          4.0      4.0      0.0          agipd_base = self.h5_data_path.format(module_idx)
        18         1          1.0      1.0      0.0          data_dict = self.shared_dict[i_proc]
        19         1          6.0      6.0      0.0          data_dict['moduleIdx'][0] = module_idx
        20
        21         1        405.0    405.0      0.0          h5_dc = H5File(file_name)
        22
        23                                                   # Exclude trains without data.
        24         1       5248.0   5248.0      0.0          im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
        25
        26         2        359.0    179.5      0.0          valid_train_ids = self.get_valid_image_idx(
        27         1        295.0    295.0      0.0              im_dc[agipd_base, "image.trainId"])
        28
        29                                                   # filter out trains which will not be selected
        30         2         18.0      9.0      0.0          valid_train_ids = self.cell_sel.filter_trains(
        31         1         16.0     16.0      0.0              np.array(valid_train_ids)).tolist()
        32
        33         1          2.0      2.0      0.0          if not valid_train_ids:
        34                                                       # If there's not a single valid train, exit early.
        35                                                       print(f"WARNING: No valid trains for {im_dc.files} to process.")
        36                                                       data_dict['nImg'][0] = 0
        37                                                       return 0
        38
        39                                                   # Exclude non_valid trains from the selected data collection.
        40         1        755.0    755.0      0.0          im_dc = im_dc.select_trains(by_id(valid_train_ids))
        41
        42                                                   # Just want to be sure that order is correct
        43         1          2.0      2.0      0.0          valid_train_ids = im_dc.train_ids
        44                                                   # Get a count of images in each train
        45         1        913.0    913.0      0.0          nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
        46         1          7.0      7.0      0.0          nimg_in_trains = nimg_in_trains.astype(np.int64)
        47
        48                                                   # store valid trains in shared memory
        49         1          1.0      1.0      0.0          n_valid_trains = len(valid_train_ids)
        50         1          2.0      2.0      0.0          data_dict["n_valid_trains"][0] = n_valid_trains
        51         1         37.0     37.0      0.0          data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
        52
        53                                           #         if "AGIPD500K" in agipd_base:
        54                                           #             agipd_comp = components.AGIPD500K(im_dc)
        55                                           #         else:
        56                                           #             agipd_comp = components.AGIPD1M(im_dc)
        57
        58
        59                                                   # get selection for the images in this file
        60         2          2.0      1.0      0.0          cm = (self.cell_sel.CM_NONE if apply_sel_pulses
        61         1          2.0      2.0      0.0                else self.cell_sel.CM_PRESEL)
        62
        63         1          5.0      5.0      0.0          agipd_src = im_dc[agipd_base]
        64
        65         1      23661.0  23661.0      0.1          cellid = agipd_src["image.cellId"].ndarray()[:, 0]
        66
        67         2       4420.0   2210.0      0.0          img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
        68         1         36.0     36.0      0.0              np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
        69         1         25.0     25.0      0.0          data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
        70
        71         1          3.0      3.0      0.0          data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
        72
        73         1        123.0    123.0      0.0          n_img = img_selected.sum()
        74         1         16.0     16.0      0.0          if img_selected.all():
        75                                                       # All frames selected - use slice to skip unnecessary copy
        76         1          4.0      4.0      0.0              frm_ix = np.s_[:]
        77                                                   else:
        78                                                       frm_ix = np.flatnonzero(img_selected)
        79
        80                                                   # read raw data
        81                                                   # [n_imgs, 2, x, y]
        82         1    9430756.0 9430756.0     56.2          raw_data = agipd_src['image.data'].ndarray()
        83
        84                                                   # store in shmem only selected images
        85         1          9.0      9.0      0.0          data_dict['nImg'][0] = n_img
        86         1    4591767.0 4591767.0     27.4          data_dict['data'][:n_img] = raw_data[frm_ix, 0]
        87         1    2649703.0 2649703.0     15.8          data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
        88         1         86.0     86.0      0.0          data_dict['cellId'][:n_img] = cellid[frm_ix]
        89         1      35078.0  35078.0      0.2          data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
        90         1      29424.0  29424.0      0.2          data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]
        91
        92         1          2.0      2.0      0.0          return n_img
  • Nice work! It's unfortunate that it doesn't make a huge dent overall, but it's a big improvement to this particular section. It also means it makes sense to look at the computational sections again.

    Indeed I have encountered similar problems with xarray in the past, e.g. here.

    Currently the call to KeyData.ndarray() is still doing a temporary allocation. Do you think a static buffer may have further benefit here?

    • Currently the call to KeyData.ndarray() is still doing a temporary allocation

      The AGIPD raw data is a mixture of the 'real' data and the raw gain information, which we want to separate out into two different arrays (and convert the 'real' data to floats). When we're keeping all cells, we could read those out with something like .ndarray(out=target, roi=np.s_[0]).

      But I suspect doing two separate reads will be slower than copying from the temporary array, particularly because each chunk contains both parts together, so doing two reads means HDF5 reads all the chunks twice.

    • Yes, I think you're right about the two reads. Originally I actually meant just having a static buffer to read into, from which it is then copied to shared memory.

    • Ah, gotcha.

      My guess is this wouldn't make a big difference. But I'll try to quickly verify that in the notebook.

    • More difference than I thought, but still not massive: one call to read_file() (with no parallelism) goes from ~16 seconds to ~13 seconds on the second go, once it's reusing memory.

      My gut reaction is that this saving isn't worth keeping another copy of the full data in memory for (and adding the minor complexity of allocating and using that array), but... :shrug:

      Profiling current state of the PR (9bd70343)
      Timer unit: 1e-06 s
      
      Total time: 16.1484 s
      File: <ipython-input-27-89ab80a65e56>
      Function: read_file at line 4
      
      Line #      Hits         Time  Per Hit   % Time  Line Contents
      ==============================================================
           4                                           def read_file(self, i_proc: int, file_name: str,
           5                                                             apply_sel_pulses: Optional[bool] = True
           6                                                             ) -> int:
           7                                                   """Read file with raw data to shared memory
           8
           9                                                   :param file_name: Name of input file including path.
          10                                                   :param i_proc: Index of shared memory array.
          11                                                   :param apply_sel_pulses: apply selected pulses before
          12                                                                            all corrections.
          13                                                   :return:
          14                                                       - n_img: The number of images to correct.
          15                                                   """
          16         1          6.0      6.0      0.0          module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
          17         1          4.0      4.0      0.0          agipd_base = self.h5_data_path.format(module_idx)
          18         1          2.0      2.0      0.0          data_dict = self.shared_dict[i_proc]
          19         1          6.0      6.0      0.0          data_dict['moduleIdx'][0] = module_idx
          20
          21         1       7196.0   7196.0      0.0          h5_dc = H5File(file_name)
          22
          23                                                   # Exclude trains without data.
          24         1      74041.0  74041.0      0.5          im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
          25
          26         2        374.0    187.0      0.0          valid_train_ids = self.get_valid_image_idx(
          27         1        290.0    290.0      0.0              im_dc[agipd_base, "image.trainId"])
          28
          29                                                   # filter out trains which will not be selected
          30         2         17.0      8.5      0.0          valid_train_ids = self.cell_sel.filter_trains(
          31         1         16.0     16.0      0.0              np.array(valid_train_ids)).tolist()
          32
          33         1          1.0      1.0      0.0          if not valid_train_ids:
          34                                                       # If there's not a single valid train, exit early.
          35                                                       print(f"WARNING: No valid trains for {im_dc.files} to process.")
          36                                                       data_dict['nImg'][0] = 0
          37                                                       return 0
          38
          39                                                   # Exclude non_valid trains from the selected data collection.
          40         1        816.0    816.0      0.0          im_dc = im_dc.select_trains(by_id(valid_train_ids))
          41
          42                                                   # Just want to be sure that order is correct
          43         1          2.0      2.0      0.0          valid_train_ids = im_dc.train_ids
          44                                                   # Get a count of images in each train
          45         1        940.0    940.0      0.0          nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
          46         1          6.0      6.0      0.0          nimg_in_trains = nimg_in_trains.astype(np.int64)
          47
          48                                                   # store valid trains in shared memory
          49         1          1.0      1.0      0.0          n_valid_trains = len(valid_train_ids)
          50         1          3.0      3.0      0.0          data_dict["n_valid_trains"][0] = n_valid_trains
          51         1         39.0     39.0      0.0          data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
          52
          53                                                   # get selection for the images in this file
          54         2          2.0      1.0      0.0          cm = (self.cell_sel.CM_NONE if apply_sel_pulses
          55         1          2.0      2.0      0.0                else self.cell_sel.CM_PRESEL)
          56
          57         1          5.0      5.0      0.0          agipd_src = im_dc[agipd_base]
          58
          59         1      22992.0  22992.0      0.1          cellid = agipd_src["image.cellId"].ndarray()[:, 0]
          60
          61         2       4712.0   2356.0      0.0          img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
          62         1        114.0    114.0      0.0              np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
          63
          64         1         21.0     21.0      0.0          data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
          65         1          3.0      3.0      0.0          data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
          66
          67         1        120.0    120.0      0.0          n_img = img_selected.sum()
          68         1         15.0     15.0      0.0          if img_selected.all():
          69                                                       # All frames selected - use slice to skip unnecessary copy
          70         1          3.0      3.0      0.0              frm_ix = np.s_[:]
          71                                                   else:
          72                                                       frm_ix = np.flatnonzero(img_selected)
          73
          74                                                   # read raw data
          75                                                   # [n_imgs, 2, x, y]
          76         1    9770154.0 9770154.0     60.5          raw_data = agipd_src['image.data'].ndarray()
          77
          78                                                   # store in shmem only selected images
          79         1         10.0     10.0      0.0          data_dict['nImg'][0] = n_img
          80         1    3809884.0 3809884.0     23.6          data_dict['data'][:n_img] = raw_data[frm_ix, 0]
          81         1    2377308.0 2377308.0     14.7          data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1]
          82         1         96.0     96.0      0.0          data_dict['cellId'][:n_img] = cellid[frm_ix]
          83         1      56848.0  56848.0      0.4          data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
          84         1      22387.0  22387.0      0.1          data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]
          85
          86         1          2.0      2.0      0.0          return n_img
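The slice-versus-index-array branch in the profile above can be checked in isolation. This is a minimal sketch with a small stand-in array; `frame_index` and the array sizes are made up for illustration and are not part of the calibration code. It only shows the NumPy behaviour the branch relies on: indexing with a slice gives a view, while indexing with an integer array produces a copy.

```python
import numpy as np

# Small stand-in for the raw array, shaped [n_imgs, 2, x, y] as in read_file.
raw_data = np.arange(6 * 2 * 4 * 4, dtype=np.uint16).reshape(6, 2, 4, 4)
img_selected = np.ones(6, dtype=bool)  # every frame selected


def frame_index(selected):
    """Hypothetical helper: slice when all frames are kept, else an index array."""
    if selected.all():
        return np.s_[:]
    return np.flatnonzero(selected)


frm_ix = frame_index(img_selected)
view = raw_data[frm_ix, 0]           # basic indexing: a view, no intermediate copy
copied = raw_data[np.arange(6), 0]   # integer index array: a new array is allocated

shares_view = np.shares_memory(view, raw_data)
shares_copy = np.shares_memory(copied, raw_data)
```

Both results hold the same values; only the index-array version allocates a temporary before the assignment into shared memory.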
      With preallocated buffer - first use
      Timer unit: 1e-06 s
      
      Total time: 15.8114 s
      File: <ipython-input-34-114898917ff6>
      Function: read_file at line 6
      
      Line #      Hits         Time  Per Hit   % Time  Line Contents
      ==============================================================
           6                                           def read_file(self, i_proc: int, file_name: str,
           7                                                             apply_sel_pulses: Optional[bool] = True
           8                                                             ) -> int:
           9                                                   """Read file with raw data to shared memory
          10
          11                                                   :param file_name: Name of input file including path.
          12                                                   :param i_proc: Index of shared memory array.
          13                                                   :param apply_sel_pulses: apply selected pulses before
          14                                                                            all corrections.
          15                                                   :return:
          16                                                       - n_img: The number of images to correct.
          17                                                   """
          18         1          8.0      8.0      0.0          module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
          19         1          3.0      3.0      0.0          agipd_base = self.h5_data_path.format(module_idx)
          20         1          2.0      2.0      0.0          data_dict = self.shared_dict[i_proc]
          21         1          7.0      7.0      0.0          data_dict['moduleIdx'][0] = module_idx
          22
          23         1       7792.0   7792.0      0.0          h5_dc = H5File(file_name)
          24
          25                                                   # Exclude trains without data.
          26         1      66134.0  66134.0      0.4          im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
          27
          28         2        350.0    175.0      0.0          valid_train_ids = self.get_valid_image_idx(
          29         1        291.0    291.0      0.0              im_dc[agipd_base, "image.trainId"])
          30
          31                                                   # filter out trains which will not be selected
          32         2         16.0      8.0      0.0          valid_train_ids = self.cell_sel.filter_trains(
          33         1         16.0     16.0      0.0              np.array(valid_train_ids)).tolist()
          34
          35         1          1.0      1.0      0.0          if not valid_train_ids:
          36                                                       # If there's not a single valid train, exit early.
          37                                                       print(f"WARNING: No valid trains for {im_dc.files} to process.")
          38                                                       data_dict['nImg'][0] = 0
          39                                                       return 0
          40
          41                                                   # Exclude non_valid trains from the selected data collection.
          42         1        812.0    812.0      0.0          im_dc = im_dc.select_trains(by_id(valid_train_ids))
          43
          44                                                   # Just want to be sure that order is correct
          45         1          2.0      2.0      0.0          valid_train_ids = im_dc.train_ids
          46                                                   # Get a count of images in each train
          47         1        922.0    922.0      0.0          nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
          48         1          7.0      7.0      0.0          nimg_in_trains = nimg_in_trains.astype(np.int64)
          49
          50                                                   # store valid trains in shared memory
          51         1          2.0      2.0      0.0          n_valid_trains = len(valid_train_ids)
          52         1          3.0      3.0      0.0          data_dict["n_valid_trains"][0] = n_valid_trains
          53         1         37.0     37.0      0.0          data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
          54
          55                                                   # get selection for the images in this file
          56         2          2.0      1.0      0.0          cm = (self.cell_sel.CM_NONE if apply_sel_pulses
          57         1          2.0      2.0      0.0                else self.cell_sel.CM_PRESEL)
          58
          59         1          4.0      4.0      0.0          agipd_src = im_dc[agipd_base]
          60
          61         1      22713.0  22713.0      0.1          cellid = agipd_src["image.cellId"].ndarray()[:, 0]
          62
          63         2       4560.0   2280.0      0.0          img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
          64         1         47.0     47.0      0.0              np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
          65
          66         1         19.0     19.0      0.0          data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
          67         1          3.0      3.0      0.0          data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
          68
          69         1        121.0    121.0      0.0          n_img = img_selected.sum()
          70         1         15.0     15.0      0.0          if img_selected.all():
          71                                                       # All frames selected - use slice to skip unnecessary copy
          72         1          3.0      3.0      0.0              frm_ix = np.s_[:]
          73                                                   else:
          74                                                       frm_ix = np.flatnonzero(img_selected)
          75
          76                                                   # read raw data
          77                                                   # [n_imgs, 2, x, y]
          78         1    9647374.0 9647374.0     61.0          agipd_src['image.data'].ndarray(out=tmp_arr)
          79
          80                                                   # store in shmem only selected images
          81         1         10.0     10.0      0.0          data_dict['nImg'][0] = n_img
          82         1    3695789.0 3695789.0     23.4          data_dict['data'][:n_img] = tmp_arr[frm_ix, 0]
          83         1    2299796.0 2299796.0     14.5          data_dict['rawgain'][:n_img] = tmp_arr[frm_ix, 1]
          84         1        103.0    103.0      0.0          data_dict['cellId'][:n_img] = cellid[frm_ix]
          85         1      33207.0  33207.0      0.2          data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
          86         1      31190.0  31190.0      0.2          data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]
          87
          88         1          2.0      2.0      0.0          return n_img
      With preallocated buffer - reuse
      Timer unit: 1e-06 s
      
      Total time: 12.8709 s
      File: <ipython-input-34-114898917ff6>
      Function: read_file at line 6
      
      Line #      Hits         Time  Per Hit   % Time  Line Contents
      ==============================================================
           6                                           def read_file(self, i_proc: int, file_name: str,
           7                                                             apply_sel_pulses: Optional[bool] = True
           8                                                             ) -> int:
           9                                                   """Read file with raw data to shared memory
          10
          11                                                   :param file_name: Name of input file including path.
          12                                                   :param i_proc: Index of shared memory array.
          13                                                   :param apply_sel_pulses: apply selected pulses before
          14                                                                            all corrections.
          15                                                   :return:
          16                                                       - n_img: The number of images to correct.
          17                                                   """
          18         1          7.0      7.0      0.0          module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
          19         1          3.0      3.0      0.0          agipd_base = self.h5_data_path.format(module_idx)
          20         1          1.0      1.0      0.0          data_dict = self.shared_dict[i_proc]
          21         1          7.0      7.0      0.0          data_dict['moduleIdx'][0] = module_idx
          22
          23         1       7204.0   7204.0      0.1          h5_dc = H5File(file_name)
          24
          25                                                   # Exclude trains without data.
          26         1      36890.0  36890.0      0.3          im_dc = h5_dc.select(agipd_base, "image.*", require_all=True)
          27
          28         2        366.0    183.0      0.0          valid_train_ids = self.get_valid_image_idx(
          29         1        294.0    294.0      0.0              im_dc[agipd_base, "image.trainId"])
          30
          31                                                   # filter out trains which will not be selected
          32         2         17.0      8.5      0.0          valid_train_ids = self.cell_sel.filter_trains(
          33         1         17.0     17.0      0.0              np.array(valid_train_ids)).tolist()
          34
          35         1          1.0      1.0      0.0          if not valid_train_ids:
          36                                                       # If there's not a single valid train, exit early.
          37                                                       print(f"WARNING: No valid trains for {im_dc.files} to process.")
          38                                                       data_dict['nImg'][0] = 0
          39                                                       return 0
          40
          41                                                   # Exclude non_valid trains from the selected data collection.
          42         1        808.0    808.0      0.0          im_dc = im_dc.select_trains(by_id(valid_train_ids))
          43
          44                                                   # Just want to be sure that order is correct
          45         1          2.0      2.0      0.0          valid_train_ids = im_dc.train_ids
          46                                                   # Get a count of images in each train
          47         1        915.0    915.0      0.0          nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
          48         1          6.0      6.0      0.0          nimg_in_trains = nimg_in_trains.astype(np.int64)
          49
          50                                                   # store valid trains in shared memory
          51         1          1.0      1.0      0.0          n_valid_trains = len(valid_train_ids)
          52         1          3.0      3.0      0.0          data_dict["n_valid_trains"][0] = n_valid_trains
          53         1         38.0     38.0      0.0          data_dict["valid_trains"][:n_valid_trains] = valid_train_ids
          54
          55                                                   # get selection for the images in this file
          56         2          2.0      1.0      0.0          cm = (self.cell_sel.CM_NONE if apply_sel_pulses
          57         1          2.0      2.0      0.0                else self.cell_sel.CM_PRESEL)
          58
          59         1          4.0      4.0      0.0          agipd_src = im_dc[agipd_base]
          60
          61         1      22870.0  22870.0      0.2          cellid = agipd_src["image.cellId"].ndarray()[:, 0]
          62
          63         2       4566.0   2283.0      0.0          img_selected, nimg_in_trains = self.cell_sel.get_cells_on_trains(
          64         1        153.0    153.0      0.0              np.array(valid_train_ids), nimg_in_trains, cellid, cm=cm)
          65
          66         1         20.0     20.0      0.0          data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains
          67         1          3.0      3.0      0.0          data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL)
          68
          69         1        126.0    126.0      0.0          n_img = img_selected.sum()
          70         1         16.0     16.0      0.0          if img_selected.all():
          71                                                       # All frames selected - use slice to skip unnecessary copy
          72         1          4.0      4.0      0.0              frm_ix = np.s_[:]
          73                                                   else:
          74                                                       frm_ix = np.flatnonzero(img_selected)
          75
          76                                                   # read raw data
          77                                                   # [n_imgs, 2, x, y]
          78         1    6735341.0 6735341.0     52.3          agipd_src['image.data'].ndarray(out=tmp_arr)
          79
          80                                                   # store in shmem only selected images
          81         1         10.0     10.0      0.0          data_dict['nImg'][0] = n_img
          82         1    3697241.0 3697241.0     28.7          data_dict['data'][:n_img] = tmp_arr[frm_ix, 0]
          83         1    2297465.0 2297465.0     17.9          data_dict['rawgain'][:n_img] = tmp_arr[frm_ix, 1]
          84         1         96.0     96.0      0.0          data_dict['cellId'][:n_img] = cellid[frm_ix]
          85         1      44554.0  44554.0      0.3          data_dict['pulseId'][:n_img] = agipd_src['image.pulseId'].ndarray()[frm_ix, 0]
          86         1      21841.0  21841.0      0.2          data_dict['trainId'][:n_img] = agipd_src['image.trainId'].ndarray()[frm_ix, 0]
          87
          88         1          2.0      2.0      0.0          return n_img
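For reference, the buffer-reuse pattern timed in the last two profiles looks roughly like the sketch below. `read_into` is a hypothetical stand-in for a reader that accepts an `out=` array, and the shapes and the `float32` destination dtype are assumptions chosen to keep the example small. The point is only that the temporary is allocated once and filled in place on every call.

```python
import numpy as np


def read_into(out):
    """Hypothetical reader: fills a caller-supplied buffer in place."""
    out[...] = 7  # stands in for the actual file read
    return out


shape = (6, 2, 4, 4)                         # [n_imgs, 2, x, y]
tmp_arr = np.empty(shape, dtype=np.uint16)   # allocated once, outside the loop
data = np.zeros((6, 4, 4), dtype=np.float32)
rawgain = np.zeros((6, 4, 4), dtype=np.uint16)

for _ in range(3):                           # e.g. one iteration per file
    result = read_into(tmp_arr)              # no new allocation per call
    data[:] = tmp_arr[:, 0]                  # integers converted to float on assignment
    rawgain[:] = tmp_arr[:, 1]
```

In the profiles above, the read line takes about 9.6 s on first use of the buffer and about 6.7 s on reuse, while the two copy lines stay the same.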
    • Quite right, thanks for testing!

  valid_train_ids = im_dc.train_ids
  # Get a count of images in each train
  nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False)
- nimg_in_trains = nimg_in_trains.astype(int)
+ nimg_in_trains = nimg_in_trains.astype(np.int64)
  • Was this previously cast to int64 implicitly further down?

  • NumPy treats int as equivalent to int64 (maybe depending on your CPU or your Python build?). This just makes it more explicit, because I had to check what it actually did.

  • TIL! While I expected np.int32, I think I also mixed this up with np.uint64 and its quirky behaviour (e.g. arithmetic with int yields np.float64).

    Interesting that int is equivalent to int64 on an LP64 data model.

  • NumPy treats int as shorthand for np.int_, which is documented as 'compatible with Python int and C long.' So I think this comes from Python 2, where an int was what C called a long, and long was the unlimited integer type (Python 3 int).

    I find it much easier to always specify the integer size!
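    A quick way to check the points in this thread is the sketch below. The array is made up for illustration, and starting from `uint64` is an assumption, not something taken from the code. It compares the platform-dependent `int` with an explicit `np.int64`, and shows the `uint64`/`int64` promotion to `float64` mentioned above.

```python
import numpy as np

# Made-up per-train image counts; the unsigned dtype is an assumption.
counts = np.array([352, 352, 0], dtype=np.uint64)

default_int = counts.astype(int)    # width depends on platform and NumPy version
explicit = counts.astype(np.int64)  # always a 64-bit signed integer

# uint64 and int64 have no common integer type, so NumPy promotes
# mixed array arithmetic to float64.
promoted = np.result_type(np.uint64, np.int64)
summed = counts + explicit

print(default_int.dtype, explicit.dtype, promoted, summed.dtype)
```

    Spelling out `np.int64` removes the need to know what `int` maps to on a given build.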

  • merged

  • Thomas Kluyver mentioned in commit 2b0ad6ba


  • Philipp Schmidt changed milestone to %3.12.0

