update doc of core (alibaba#57)
xieyxclack authored May 5, 2022
1 parent 58b6051 commit 2252a58
Showing 4 changed files with 120 additions and 22 deletions.
15 changes: 11 additions & 4 deletions federatedscope/core/fed_runner.py
@@ -11,7 +11,13 @@

class FedRunner(object):
"""
This class is used for building up the Federated Learning course
This class is used to construct an FL course, which includes `_set_up` and `run`.
Arguments:
data: The data used in the FL course, which is formatted as {'ID': data} for the standalone mode. More details can be found in federatedscope.core.auxiliaries.data_builder.
server_class: The server class is used for instantiating a (customized) server.
client_class: The client class is used for instantiating a (customized) client.
config: The configurations of the FL course.
"""
def __init__(self,
data,
@@ -39,7 +45,7 @@ def __init__(self,

def _setup_for_standalone(self):
"""
To set up server and client for standalone mode
To set up server and client for standalone mode.
"""
self.server = self._setup_server()

@@ -65,7 +71,7 @@ def _setup_for_standalone(self):

def _setup_for_distributed(self):
"""
To set up server or client for distributed mode
To set up server or client for distributed mode.
"""
self.server_address = {
'host': self.cfg.distribute.server_host,
@@ -83,7 +89,8 @@

def run(self):
"""
To run an FL course, which is called after server/client has been set up
To run an FL course, which is called after server/client has been set up.
For the standalone mode, a shared message queue will be set up to simulate ``receiving messages``.
"""
if self.mode == 'standalone':
# trigger the FL course
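To make the updated FedRunner docstring concrete, here is a minimal standalone-mode sketch of how the runner can be driven. Only FedRunner(data, server_class, client_class, config) and run() come from this file; the placeholder data and configuration objects are illustrative assumptions.

from federatedscope.core.fed_runner import FedRunner
from federatedscope.core.worker.server import Server
from federatedscope.core.worker.client import Client

# Standalone mode expects data keyed by participant ID, i.e. {'ID': data};
# ID 0 conventionally refers to the server. Replace the None placeholders with
# real datasets (e.g. those built by federatedscope.core.auxiliaries.data_builder).
my_data = {0: None, 1: None, 2: None}
my_cfg = ...  # assumed: a configuration object for the FL course (e.g. parsed from YAML)

runner = FedRunner(data=my_data,
                   server_class=Server,
                   client_class=Client,
                   config=my_cfg)
runner.run()  # sets up the server/clients and simulates message passing via a shared queue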
2 changes: 1 addition & 1 deletion federatedscope/core/worker/base_worker.py
@@ -3,7 +3,7 @@

class Worker(object):
"""
The base worker class
The base worker class.
"""
def __init__(self, ID=-1, state=0, config=None, model=None, strategy=None):
self._ID = ID
55 changes: 48 additions & 7 deletions federatedscope/core/worker/client.py
@@ -16,16 +16,17 @@
class Client(Worker):
"""
The Client class, which describes the behaviors of client in an FL course.
The attributes include:
The behaviors are described by the handling functions (named callback_funcs_for_xxx).
Arguments:
ID: The unique ID of the client, which is assigned by the server when joining the FL course
server_id: (Default) 0
state: The training round
config: the configuration
config: The configuration
data: The data owned by the client
model: The local model
model: The model maintained locally
device: The device to run local training and evaluation
strategy: redundant attribute
The behaviors are described by the handled functions (named as callback_funcs_for_xxx)
"""
def __init__(self,
ID=-1,
@@ -100,7 +101,11 @@ def __init__(self,

def register_handlers(self, msg_type, callback_func):
"""
To bind a message type with a handled function
To bind a message type with a handling function.
Arguments:
msg_type (str): The defined message type
callback_func: The function to handle the received message
"""
self.msg_handlers[msg_type] = callback_func
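Since register_handlers is the documented way to bind a message type to its handling function, a hedged usage sketch follows; the 'gradients' message type and the handler body are invented for illustration, and client stands for an already-instantiated Client.

def callback_funcs_for_gradients(message):
    # Every message carries sender, receiver, state, and content
    # (see federatedscope.core.message for details).
    grads = message.content
    print('Received gradients from #{}'.format(message.sender))

# Bind the (illustrative) 'gradients' message type to the handler defined above.
client.register_handlers('gradients', callback_funcs_for_gradients)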

@@ -119,7 +124,7 @@ def _register_default_handlers(self):

def join_in(self):
"""
To send 'join_in' message to the server
To send 'join_in' message to the server for joining in the FL course.
"""
self.comm_manager.send(
Message(msg_type='join_in',
@@ -129,7 +134,7 @@ def join_in(self):

def run(self):
"""
To wait for the messages and handle them (for distributed mode)
To listen for messages and handle them accordingly (used for the distributed mode)
"""
while True:
msg = self.comm_manager.receive()
@@ -140,6 +145,12 @@ def run(self):
break

def callback_funcs_for_model_para(self, message: Message):
"""
The handling function for receiving model parameters, which triggers the local training process. This handling function is widely used in various FL courses.
Arguments:
message: The received message, which includes sender, receiver, state, and content. More detail can be found in federatedscope.core.message
"""
if 'ss' in message.msg_type:
# A fragment of the shared secret
state, content = message.state, message.content
@@ -240,12 +251,24 @@ def callback_funcs_for_model_para(self, message: Message):
content=(sample_size, model_para_all)))
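As the new docstring for callback_funcs_for_model_para notes, every message carries sender, receiver, state, and content. A small sketch of assembling such a reply is given below; all field values are placeholders and the exact constructor should be checked in federatedscope.core.message.

from federatedscope.core.message import Message

sample_size = 128      # placeholder: the number of local training samples
model_para_all = {}    # placeholder: the updated model parameters

reply = Message(msg_type='model_para',                  # the type a handler is registered for
                sender=3,                               # this client's ID (assumed)
                receiver=[0],                           # the server's ID, 0 by default (list form assumed)
                state=5,                                # the current training round (assumed)
                content=(sample_size, model_para_all))  # payload, as in the snippet above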

def callback_funcs_for_assign_id(self, message: Message):
"""
The handling function for receiving the client_ID assigned by the server (during the joining process), which is used in the distributed mode.
Arguments:
message: The received message
"""
content = message.content
self.ID = int(content)
logger.info('Client (address {}:{}) is assigned with #{:d}.'.format(
self.comm_manager.host, self.comm_manager.port, self.ID))

def callback_funcs_for_join_in_info(self, message: Message):
"""
The handling function for receiving the request for join-in information (such as batch_size, num_of_samples) during the joining process.
Arguments:
message: The received message
"""
requirements = message.content
join_in_info = dict()
for requirement in requirements:
@@ -267,12 +290,24 @@ def callback_funcs_for_join_in_info(self, message: Message):
content=join_in_info))

def callback_funcs_for_address(self, message: Message):
"""
The handling function for receiving other clients' IP addresses, which is used for constructing a complex topology
Arguments:
message: The received message
"""
content = message.content
for neighbor_id, address in content.items():
if int(neighbor_id) != self.ID:
self.comm_manager.add_neighbors(neighbor_id, address)

def callback_funcs_for_evaluate(self, message: Message):
"""
The handling function for receiving the request for evaluation
Arguments:
message: The received message
"""
sender = message.sender
self.state = message.state
if message.content != None:
@@ -321,6 +356,12 @@ def callback_funcs_for_evaluate(self, message: Message):
content=metrics))

def callback_funcs_for_finish(self, message: Message):
"""
The handling function for receiving the signal of finishing the FL course
Arguments:
message: The received message
"""
logger.info(
"================= receiving Finish Message ============================"
)
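Reading the join_in and run docstrings together, a distributed-mode client process boils down to joining the course and then listening for messages. The sketch below assumes the constructor accepts the documented arguments (config, data, model, device); check the actual __init__ signature before relying on it.

from federatedscope.core.worker.client import Client

client = Client(ID=-1,          # placeholder; the real ID is assigned by the server
                config=my_cfg,  # assumed: a configuration with the distribute.* fields set
                data=my_data,   # assumed: the data owned by this client
                model=my_model, # assumed: the locally maintained model
                device='cpu')
client.join_in()  # send a 'join_in' message to the server
client.run()      # block, receive messages, and dispatch them to the registered handlers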
70 changes: 60 additions & 10 deletions federatedscope/core/worker/server.py
@@ -20,7 +20,9 @@
class Server(Worker):
"""
The Server class, which describes the behaviors of server in an FL course.
The attributes include:
The behaviors are described by the handling functions (named callback_funcs_for_xxx).
Arguments:
ID: The unique ID of the server, which is set to 0 by default
state: The training round
config: the configuration
@@ -30,7 +32,6 @@ class Server(Worker):
total_round_num: The total number of the training round
device: The device to run local training and evaluation
strategy: redundant attribute
The behaviors are described by the handled functions (named as callback_funcs_for_xxx)
"""
def __init__(self,
ID=-1,
@@ -158,7 +159,11 @@ def register_noise_injector(self, func):

def register_handlers(self, msg_type, callback_func):
"""
To bind a message type with a handled function
To bind a message type with a handling function.
Arguments:
msg_type (str): The defined message type
callback_func: The function to handle the received message
"""
self.msg_handlers[msg_type] = callback_func

@@ -170,7 +175,7 @@ def _register_default_handlers(self):

def run(self):
"""
To start the FL course, listen and handle messages (for distributed mode)
To start the FL course, listen and handle messages (for distributed mode).
"""

# Begin: Broadcast model parameters and start to FL train
@@ -226,7 +231,10 @@ def check_and_move_on(self,
check_eval_result=False,
min_received_num=None):
"""
To check the message_buffer, when enough messages are receiving, trigger some events (such as perform aggregation, evaluation, and move to the next training round)
To check the message_buffer. When enough messages have been received, some events (such as performing aggregation, evaluation, and moving to the next training round) will be triggered.
Arguments:
check_eval_result (bool): If True, check the message buffer for evaluation; otherwise, check the message buffer for training.
"""
if min_received_num is None:
min_received_num = self._cfg.federate.sample_client_num
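For the check described above, a hedged sketch of the two ways this routine is invoked; server stands for an already-instantiated Server, and in practice both calls are made internally by the message handlers rather than by user code.

server.check_and_move_on()                        # check the training buffer; min_received_num
                                                  # defaults to cfg.federate.sample_client_num
server.check_and_move_on(check_eval_result=True)  # check the evaluation buffer instead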
@@ -307,8 +315,9 @@

def check_and_save(self):
"""
To save the results and save model after each evaluation
To save the results and the model after each evaluation.
"""

# early stopping
should_stop = False

@@ -341,6 +350,10 @@
self.state += 1

def save_best_results(self):
"""
To save the best evaluation results.
"""

if self._cfg.federate.save_to != '':
self.aggregator.save_model(self._cfg.federate.save_to, self.state)
formatted_best_res = self._monitor.format_eval_res(
@@ -359,12 +372,13 @@ def save_formatted_results(self, formatted_res):

def merge_eval_results_from_all_clients(self, final_round=False):
"""
Merge evaluation results from all clients,
update best, log the merged results and save then into eval_results.log
Merge evaluation results from all clients, update the best results, log the merged results and save them into eval_results.log
:param final_round:
:return:
Arguments:
final_round (bool): Whether it is the final round of training
:returns: the formatted merged results
"""

round = max(self.msg_buffer['eval'].keys())
eval_msg_buffer = self.msg_buffer['eval'][round]
metrics_all_clients = dict()
@@ -403,7 +417,12 @@ def broadcast_model_para(self,
sample_client_num=-1):
"""
To broadcast the message to all clients or sampled clients
Arguments:
msg_type: 'model_para' or other user defined msg_type
sample_client_num: The number of sampled clients in the broadcast behavior. sample_client_num = -1 denotes broadcasting to all the clients.
"""

if sample_client_num > 0:
receiver = np.random.choice(np.arange(1, self.client_num + 1),
size=sample_client_num,
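Following the broadcast_model_para docstring, two hedged example calls (server stands for an already-instantiated Server; both message types appear elsewhere in this file):

server.broadcast_model_para(msg_type='model_para', sample_client_num=5)   # broadcast to 5 sampled clients
server.broadcast_model_para(msg_type='evaluate', sample_client_num=-1)    # broadcast to all clients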
@@ -442,6 +461,7 @@ def broadcast_client_address(self):
"""
To broadcast the communication addresses of clients (used for additive secret sharing)
"""

self.comm_manager.send(
Message(msg_type='address',
sender=self.ID,
@@ -463,6 +483,7 @@ def check_buffer(self,
:returns: Whether enough messages have been received or not
:rtype: bool
"""

if check_eval_result:
if 'eval' not in self.msg_buffer.keys() or len(
self.msg_buffer['eval'].keys()) == 0:
@@ -479,6 +500,10 @@ def check_client_join_in(self):
return True

def check_client_join_in(self):
"""
To check whether all the clients have joined in the FL course.
"""

if len(self._cfg.federate.join_in_info) != 0:
return len(self.join_in_info) == self.client_num
else:
@@ -488,6 +513,7 @@ def trigger_for_start(self):
"""
To start the FL course when the expected number of clients has joined
"""

if self.check_client_join_in():
if self._cfg.federate.use_ss:
self.broadcast_client_address()
@@ -518,6 +544,7 @@ def eval(self):
"""
To conduct evaluation. When cfg.federate.make_global_eval=True, a global evaluation is conducted by the server.
"""

if self._cfg.federate.make_global_eval:
# By default, the evaluation is conducted one-by-one for all internal models;
# for other cases such as ensemble, override the eval function
@@ -550,6 +577,14 @@ def eval(self):
self.broadcast_model_para(msg_type='evaluate')

def callback_funcs_model_para(self, message: Message):
"""
The handling function for receiving model parameters, which triggers check_and_move_on (performing aggregation when enough feedback has been received).
This handling function is widely used in various FL courses.
Arguments:
message: The received message, which includes sender, receiver, state, and content. More detail can be found in federatedscope.core.message
"""

round, sender, content = message.state, message.sender, message.content
# For a new round
if round not in self.msg_buffer['train'].keys():
@@ -563,6 +598,14 @@ def callback_funcs_model_para(self, message: Message):
return self.check_and_move_on()

def callback_funcs_for_join_in(self, message: Message):
"""
The handling function for receiving the join-in information. The server might request some information (such as num_of_samples) if necessary, and assigns an ID to the joining client.
If all the clients have joined in, the training process will be triggered.
Arguments:
message: The received message
"""

if 'info' in message.msg_type:
sender, info = message.sender, message.content
for key in self._cfg.federate.join_in_info:
@@ -598,6 +641,13 @@ def callback_funcs_for_join_in(self, message: Message):
self.trigger_for_start()

def callback_funcs_for_metrics(self, message: Message):
"""
The handling function for receiving the evaluation results, which triggers check_and_move_on (performing aggregation when enough feedback has been received).
Arguments:
message: The received message
"""

round, sender, content = message.state, message.sender, message.content

if round not in self.msg_buffer['eval'].keys():
