pocket_coffea.executors package

Submodules#

pocket_coffea.executors.executors_CLAIX module#

pocket_coffea.executors.executors_DESY_NAF module#

class pocket_coffea.executors.executors_DESY_NAF.ParslCondorExecutorFactory(run_options, outputdir, **kwargs)#

Bases: ExecutorFactoryABC

Parsl executor based on HTCondor for DESY NAF.

close()#
customized_args()#
get()#
get_worker_env()#
setup()#

Start the Condor cluster here

pocket_coffea.executors.executors_DESY_NAF.get_executor_factory(executor_name, **kwargs)#

pocket_coffea.executors.executors_RWTH module#

pocket_coffea.executors.executors_T3_CH_PSI module#

class pocket_coffea.executors.executors_T3_CH_PSI.DaskExecutorFactory(run_options, outputdir, **kwargs)#

Bases: ExecutorFactoryABC

At T3_CH_PSI, the Dask executor is based on Slurm.

close()#
customized_args()#
get()#
get_worker_env()#
setup()#

Start the DASK cluster here

pocket_coffea.executors.executors_T3_CH_PSI.get_executor_factory(executor_name, **kwargs)#

pocket_coffea.executors.executors_base module#

class pocket_coffea.executors.executors_base.ExecutorFactoryABC(run_options, **kwargs)#

Bases: ABC

close()#
customized_args()#
abstract get()#
set_env()#
setup()#
setup_proxyfile()#
class pocket_coffea.executors.executors_base.FuturesExecutorFactory(run_options, **kwargs)#

Bases: ExecutorFactoryABC

customized_args()#
get()#
class pocket_coffea.executors.executors_base.IterativeExecutorFactory(run_options, **kwargs)#

Bases: ExecutorFactoryABC

get()#
pocket_coffea.executors.executors_base.get_executor_factory(executor_name, **kwargs)#

pocket_coffea.executors.executors_brux module#

pocket_coffea.executors.executors_casa module#

class pocket_coffea.executors.executors_casa.DaskExecutorFactory(run_options, outputdir, **kwargs)#

Bases: ExecutorFactoryABC

close()#
customized_args()#
get()#
setup()#

Start the DASK cluster here

pocket_coffea.executors.executors_casa.get_executor_factory(executor_name, **kwargs)#

pocket_coffea.executors.executors_cern_swan module#

pocket_coffea.executors.executors_infn_af module#

pocket_coffea.executors.executors_lxplus module#

class pocket_coffea.executors.executors_lxplus.DaskExecutorFactory(run_options, outputdir, **kwargs)#

Bases: ExecutorFactoryABC

close()#
customized_args()#
get()#
setup()#

Start the DASK cluster here

class pocket_coffea.executors.executors_lxplus.ExecutorFactoryCondorCERN(run_options, outputdir, **kwargs)#

Bases: ExecutorFactoryManualABC

get()#
prepare_jobs(splits)#
recreate_jobs(jobs_to_recreate)#
submit_jobs(jobs_config)#

Prepare the job configuration and script, then submit the jobs to the cluster.

pocket_coffea.executors.executors_lxplus.get_executor_factory(executor_name, **kwargs)#
pocket_coffea.executors.executors_lxplus.get_worker_env(run_options, x509_path, exec_name='dask')#
pocket_coffea.executors.executors_lxplus.set_queue(subfile, job, new_queue)#

Rewrite the +JobFlavour line of subfile to new_queue (no-op if already set).
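
A minimal sketch of such an idempotent rewrite, assuming the submit file carries a single-line entry of the form +JobFlavour = "<queue>". The helper name set_job_flavour and its boolean return value are hypothetical; the real set_queue also takes a job argument whose role is not documented here.

```python
from pathlib import Path


def set_job_flavour(subfile, new_queue):
    """Hypothetical helper: point the +JobFlavour line at new_queue.

    Returns True if the file was rewritten, False if nothing had to change.
    """
    path = Path(subfile)
    lines = path.read_text().splitlines(keepends=True)
    target = f'+JobFlavour = "{new_queue}"\n'
    changed = False
    for i, line in enumerate(lines):
        # Only touch +JobFlavour lines that do not already match the target.
        if line.lstrip().startswith("+JobFlavour") and line.strip() != target.strip():
            lines[i] = target
            changed = True
    if changed:
        path.write_text("".join(lines))
    return changed
```

Calling the helper twice with the same queue leaves the file untouched on the second call, which is the no-op behaviour described above.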

pocket_coffea.executors.executors_lxplus.update_queue(subfile, job)#

pocket_coffea.executors.executors_manual_jobs module#

class pocket_coffea.executors.executors_manual_jobs.ExecutorFactoryManualABC(run_options, outputdir, **kwargs)#

Bases: ABC

close()#
customized_args()#
abstract get()#
abstract prepare_jobs(splits)#
prepare_splitting(filesets)#

Depending on the run options, the fileset can be split in different ways. The goal is a uniform splitting across jobs; datasets can be both split and merged.

Three splitting modes are supported, in order of precedence:

  1. max-events-per-job as a dict mapping sample name -> events-per-job (with an optional default fallback). In this mode each dataset is split independently so a single job contains files from exactly one sample/dataset. Useful when sample sizes differ by orders of magnitude.

  2. max-events-per-job as a scalar: same as (1) but with a single limit applied to every sample, and jobs may pack multiple datasets together (legacy behaviour).

  3. scaleout: target total number of jobs; the per-job event budget is derived as tot_n_events // scaleout.
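
The precedence of the three modes can be sketched as follows. This is an illustration only, not the prepare_splitting implementation: the function name events_per_job is hypothetical, the fallback entry of the dict is assumed to be stored under the key "default", and raising an error when neither a sample entry nor a fallback exists is an assumption too.

```python
def events_per_job(run_options, sample, tot_n_events):
    """Hypothetical helper: derive the per-job event budget for one sample."""
    limit = run_options.get("max-events-per-job")

    if isinstance(limit, dict):
        # Mode 1: per-sample limits; each dataset is split on its own.
        # The "default" key name is an assumption made for this sketch.
        budget = limit.get(sample, limit.get("default"))
        if budget is None:
            raise ValueError(f"no events-per-job limit for sample {sample!r}")
        return int(budget)

    if limit is not None:
        # Mode 2: one scalar limit applied to every sample.
        return int(limit)

    # Mode 3: derive the budget from the target number of jobs.
    return tot_n_events // run_options["scaleout"]
```

With tot_n_events = 1_000_000 and scaleout = 8, mode 3 yields a budget of 125000 events per job.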

abstract recreate_jobs(jobs)#
set_env()#
setup()#
setup_proxyfile()#
submit(config, filesets, outputdir)#
abstract submit_jobs(jobs)#
pocket_coffea.executors.executors_manual_jobs.ensure_job_sh_forwards_inner_yaml(job_sh_path, yaml_filename='inner_run_options.yaml')#

Idempotently rewrite job.sh so the inner pocket-coffea run is invoked with --custom-run-options inner_run_options.yaml.

No-op when the wrapper already references the YAML. Used by --recreate-jobs so that an existing jobs_dir (whose wrapper predates this feature) starts honouring inner run-options without a fresh submission.

Returns True when the file was modified, False when already up-to-date.

pocket_coffea.executors.executors_manual_jobs.ensure_sub_transfers_inner_yaml(sub_path, abs_jobdir_path, yaml_filename='inner_run_options.yaml')#

Idempotently append the inner-run-options YAML to a .sub file’s transfer_input_files line. No-op when already present.

Returns True when the file was modified, False when already up-to-date.
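
A minimal sketch of the idempotent append, assuming the .sub file holds a single-line, comma-separated transfer_input_files entry and that the YAML is referenced by its absolute path inside abs_jobdir_path. The helper name append_transfer_input is hypothetical and this is not the library code.

```python
from pathlib import Path


def append_transfer_input(sub_path, abs_jobdir_path,
                          yaml_filename="inner_run_options.yaml"):
    """Hypothetical helper: add the YAML to transfer_input_files once."""
    path = Path(sub_path)
    yaml_path = str(Path(abs_jobdir_path) / yaml_filename)
    lines = path.read_text().splitlines()
    changed = False
    for i, line in enumerate(lines):
        key, sep, value = line.partition("=")
        if sep and key.strip() == "transfer_input_files":
            entries = [e.strip() for e in value.split(",") if e.strip()]
            # Append only when the YAML is not listed yet (idempotent).
            if yaml_path not in entries:
                entries.append(yaml_path)
                lines[i] = f"transfer_input_files = {', '.join(entries)}"
                changed = True
    if changed:
        path.write_text("\n".join(lines) + "\n")
    return changed
```

Returning a boolean mirrors the documented contract: True on the first call that modifies the file, False on every later call.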

pocket_coffea.executors.executors_manual_jobs.write_inner_run_options(jobs_dir, run_options, whitelist=('skip-bad-files', 'tree-reduction'))#

Filter run_options to the whitelist and dump to {jobs_dir}/inner_run_options.yaml.

The file is shipped to every condor worker via transfer_input_files and consumed by the inner pocket-coffea run --custom-run-options inner_run_options.yaml invocation. The whitelist keeps the user’s -ro overrides for Coffea-Runner-side options (e.g. skip-bad-files) flowing into the job while leaving HTCondor / per-job-sizing keys strictly outer-only.

The file is always written (even if no whitelisted key is set) so that job.sh can unconditionally reference it without runtime fallbacks.

Returns the absolute path to the written YAML.
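
The whitelist filtering can be sketched as below. The function name write_filtered_options is hypothetical, and the sketch serialises with the standard-library json module only to stay dependency-free (its output for simple scalar values is also a valid YAML flow mapping); how the actual function dumps the YAML is not shown in this page.

```python
import json
import os


def write_filtered_options(jobs_dir, run_options,
                           whitelist=("skip-bad-files", "tree-reduction")):
    """Hypothetical helper: keep whitelisted keys and write them to disk."""
    # Keys outside the whitelist (e.g. per-job sizing) never reach the job.
    inner = {k: v for k, v in run_options.items() if k in whitelist}
    path = os.path.abspath(os.path.join(jobs_dir, "inner_run_options.yaml"))
    with open(path, "w") as fh:
        # Written even when `inner` is empty, so job.sh can always reference it.
        json.dump(inner, fh)
        fh.write("\n")
    return path
```

For run_options such as {"skip-bad-files": True, "scaleout": 50}, only the skip-bad-files entry ends up in the written file.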

pocket_coffea.executors.executors_oscar module#

pocket_coffea.executors.executors_purdue_af module#

class pocket_coffea.executors.executors_purdue_af.DaskGatewayExecutorFactory(run_options, outputdir, **kwargs)#

Bases: ExecutorFactoryABC

close()#
customized_args()#
get()#
get_worker_env()#
setup()#

Start the DASK cluster here

setup_proxyfile()#
pocket_coffea.executors.executors_purdue_af.get_executor_factory(executor_name, **kwargs)#

pocket_coffea.executors.executors_rubin module#

Module contents#