Source code for jdit.parallel.parallel_trainer

# coding=utf-8

from multiprocessing import Pool
import traceback
import copy

[docs]class SupParallelTrainer(object): """ Training parallel .. attr::`default_params` is the default params. .. attr::`unfixed_params_list` is the different params. :param default_params: a ``dict()`` like ``{param_1:d1, param_2:d2 ...}`` :param unfixed_params_list: a ``list`` like ``[{param_1:a1, param_2:a2}, {param_1:b1, param_2:b2}, ...]``. .. note :: You must set the value of ``task_id`` and ``gpu_ids_abs``, regardless in ``default_params`` or ``unfixed_params_list``. ``{'task_id': 1`}`` , ``{'gpu_ids_abs': [0,1]}`` . * For the same ``task_id`` , the tasks will be executed **sequentially** on the certain devices. * For the different ``task_id`` , the will be executed **parallelly** on the certain devices. Example: .. code:: unfixed_params_list = [ {'task_id':1, 'lr':1e-3,'gpu_ids_abs': [0] }, {'task_id':1, 'lr':1e-4,'gpu_ids_abs': [0] }, {'task_id':2, 'lr':1e-5,'gpu_ids_abs': [2,3] }] This set of ``unfixed_params_list`` means that: +------+-----------------------+----------------------+---------------------+ | time | 'task_id':1 | 'task_id':2 | | +======+=======================+======================+=====================+ | t | 'lr':1e-3, | 'lr':1e-5, | executed parallelly | | | 'gpu_ids_abs': [0] | 'gpu_ids_abs': [2,3] | | +------+-----------------------+----------------------+---------------------+ | t+1 | 'lr':1e-4, | \ | | | | 'gpu_ids_abs': [0] | | | +------+-----------------------+----------------------+---------------------+ | | executed sequentially | \ | | +------+-----------------------+----------------------+---------------------+ """ def __init__(self, unfixed_params_list: list, train_func=None): """ :param default_params: a ``dict()`` like {param:v1, param:v2 ...} :param unfixed_params_list: a ``list`` like [{param:v1, param:v2}, {param:v1, param:v2}, ...]. You must set the value of `task_id` and `gpu_ids_abs`, like {'task_id': 1}. {'gpu_ids_abs': [0,1]}, regardless in ``default_params`` or ``unfixed_params_list``. .. note :: You must set the value of `task_id` and `gpu_ids_abs`, like {'task_id': 1}. {'gpu_ids_abs': [0,1]} """ if train_func is not None: self.build_task_trainer_ = train_func candidate_params_list = self._add_logdirs_to_unfixed_params(unfixed_params_list) # [{params1..},{params2...}] self.parallel_plans = self._distribute_task(candidate_params_list) # self.parallel_plans = {task_id:[{param1},{param2}]}
[docs] def build_task_trainer(self, unfixed_params: dict): """You need to write this method to build your own ``Trainer``. This will run in a certain subprocess. The keys of ``params`` are compatible with ``dataset`` , ``Model`` , ``Optimizer`` and ``Trainer`` . You can see parameters in the following example. These two parameters are special. * ``params["logdir"]`` controls the log directory. * ``params["gpu_ids_abs"]`` controls the running devices. You should return a ``Trainer`` when you finish you building. :param params: parameters dictionary. :return: Trainer Example:: # Using ``params['key']`` to build your Trainer. logdir = params["logdir"] # necessary! gpu_ids_abs = params["gpu_ids_abs"] # necessary! use_benchmark = params["use_benchmark"] data_root = params["data_root"] batch_shape = params["batch_shape"] opt_name = params["opt_name"] lr = params["lr"] lr_decay = params["lr_decay"] lr_minimum = params["lr_minimum"] weight_decay = params["weight_decay"] momentum = params["momentum"] betas = params["betas"] init_method = params["init_method"] depth = params["depth"] mid_channels = params["mid_channels"] nepochs = params["nepochs"] torch.backends.cudnn.benchmark = use_benchmark mnist = FashionMNIST(root=data_root, batch_shape=batch_shape) T_net = Model(Tresnet18(depth=depth, mid_channels=mid_channels), gpu_ids_abs=gpu_ids_abs, init_method=init_method) opt = Optimizer(T_net.parameters(), lr, lr_decay, weight_decay, momentum, betas, opt_name, lr_minimum=lr_minimum) Trainer = FashingClassTrainer(logdir, nepochs, gpu_ids_abs, T_net, opt, mnist) # You must return a Trainer! return Trainer """ return self.build_task_trainer_(unfixed_params)
[docs] def train(self, max_processes=4): """start parallel task To start the parallel task that were saved in ``self.parallel_plans`` dictionary. :param max_processes: A max amount of processes for setting ``Pool(processes = ?)`` method. """ # print("Main process ID: %d" % os.getpid()) print('Waiting for all subprocesses done...\n%s' % ('=' * 36)) p = Pool(max_processes) # {(gpu_id1):[{param1}, {param2}], (gpu_id2):[{param1}, {param2}]} for position, parallel_plan in enumerate(self.parallel_plans.items()): # task_id, candidate_params = parallel_plan p.apply_async(self._start_train, (parallel_plan, position), callback=self.finish, error_callback=self.error) # p.map_async(self._start_train, self.parallel_plans.items(), callback=self.finish, error_callback=self.error) p.close() p.join() print('All subprocesses done.')
def _start_train(self, parallel_plan: tuple, position: int): task_id, candidate_params = parallel_plan # task_id, candidate_params = parallel_planChild process ID: nums_tasks = len(candidate_params) # print("Task %s child process ID: %d" % (task_id, os.getpid())) for index, params in enumerate(candidate_params): tag = "CPU" if not params["gpu_ids_abs"] else "GPU%s" % str(params["gpu_ids_abs"]) process_bar_header = ">>>T%d:(%d/%d)|%s" % (task_id, index, nums_tasks, tag) trainer = self.build_task_trainer(params) try: trainer.train(process_bar_header=process_bar_header, process_bar_position=position, subbar_disable=True) except Exception as e: print('str(Exception):\t', str(Exception)) print('repr(e):\t', repr(e)) print(traceback.print_exc()) print('traceback.format_exc():\n%s' % traceback.format_exc()) @staticmethod def _distribute_task(candidate_params_list: list): for params in candidate_params_list: if "task_id" not in params: raise ValueError("You must pass params `task_id` as a key to set a task ID") tasks_plan = dict({}) # (task_id):[t3],(task_id):[t1,t2] for candidate_params in candidate_params_list: task_id = candidate_params["task_id"] if task_id in tasks_plan: # if task_id have been used, append to the former tasks. tasks_plan[task_id].append(candidate_params) else: # if task_id have not been used, create a new task list. tasks_plan[task_id] = [candidate_params] # trainers_plan = list(gpu_used_plan.values) # [[t1,t2],[t3]...] return tasks_plan def _add_logdirs_to_unfixed_params(self, unfixed_params_list: list): final_unfixed_params_list = copy.deepcopy(unfixed_params_list) use_auto_logdir = not "logdir" in unfixed_params_list[0] if use_auto_logdir: print("Build log directories automatically!") for index, params_dict in enumerate(unfixed_params_list): # [dict(),dict()] logdir_name = [] for key, value in params_dict.items(): # params_dict = {p1:1, p2:2} if key == "task_id": continue if key == 'gpu_ids_abs': key = 'gpu' param_name = "=".join([str(key), str(value)]) logdir_name.append(param_name) final_unfixed_params_list[index]["logdir"] = "plog/" + ",".join(logdir_name) else: for index, params_dict in enumerate(unfixed_params_list): # [dict(),dict()] final_unfixed_params_list[index]["logdir"] = self._convert_to_dirname( unfixed_params_list[index]["logdir"]) print("logdir names are:\n\t%s" % "\n\t".join([params["logdir"] for params in final_unfixed_params_list])) return final_unfixed_params_list # [dir1, dir2, dir3] def _convert_to_dirname(self, item: str): dir_name = item.strip() replace_dict = {"*": "", ">": "greater", "<": "smaller", "|": "-", ":": "%", "?": "$", "/": "_", "\\": "_", } for key, value in replace_dict.items(): dir_name = str(dir_name).replace(key, value) if len(dir_name) > 50: import warnings warnings.warn("the length of `dir_name`(%d) is greater than 50." "It will be cut to `dir_name[0:50]`" % len(dir_name)) dir_name = dir_name[0:50] return dir_name
[docs] def finish(self, msg): """When a subprocess finished, it will be called. You can rewrite this method for your purpose. :param msg: fin """ # print("%s finished!" % os.getpid(), msg) pass
[docs] def error(self, msg): """When a subprocess failed, it will be called. You can rewrite this method for your purpose. :param msg: error massage """ print(msg)
