How to split one Celery task into multiple tasks
Let's talk about splitting task.process_message into one processing task and multiple tasks that handle the specific events. The main benefits are the ability to use more queues (for example, separate ones for faster and slower tasks) and to tell tasks apart easily (for example, in Sentry).
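To make the queue benefit concrete, here is a minimal sketch in plain Python (no broker needed). It assumes hypothetical task names and the queue names `short-running` and `long-running`. The table follows the shape of Celery's `task_routes` setting (task name or glob pattern mapped to options such as `{"queue": ...}`), but the `queue_for()` resolver only illustrates the lookup and is not Celery's own router.

```python
from fnmatch import fnmatchcase

# Hypothetical routing table in the shape of Celery's ``task_routes`` setting:
# task name (or glob pattern) -> routing options.
TASK_ROUTES = {
    "task.process_message": {"queue": "short-running"},
    "task.run_propose_downstream_handler": {"queue": "long-running"},
    "task.run_*_build_handler": {"queue": "long-running"},
}
DEFAULT_QUEUE = "celery"  # Celery's default queue name


def queue_for(task_name: str) -> str:
    """Return the queue a task would be routed to (first matching entry wins)."""
    for pattern, options in TASK_ROUTES.items():
        if fnmatchcase(task_name, pattern):
            return options["queue"]
    return DEFAULT_QUEUE
```

With Celery, a table like `TASK_ROUTES` would be assigned to `app.conf.task_routes`, and each worker would be started with `-Q` listing the queues it should consume.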
Workflow 1
- get the webhook payload or the message from the message bus
- add additional info that makes determining the event type easier (for GitHub webhooks, the X-GitHub-Event header)
- send it to Celery as
- this task will take care of:
  - parsing the event into an object of class Event (this class provides properties that enable us to make the checks)
  - private repository check
  - getting the handlers that handle the event
  - whitelist check
  - possibly creating records in the DB (explained below)
  - (running
    here to filter out some events?)
  - sending a specific task to Celery
- the specific tasks run the handlers directly
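The Workflow 1 fan-out can be sketched as follows. Everything here is a hypothetical stand-in: a simplified `Event`, a `HANDLERS` table mapping event types to handler task names, a `WHITELIST`, and a `FakeCelery` class that records `send_task()` calls instead of talking to a broker (its `name`/`kwargs` arguments mirror those of Celery's `app.send_task()`).

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Event:
    """Simplified stand-in for the parsed Event object (hypothetical fields)."""
    event_type: str
    repo: str
    private: bool = False


@dataclass
class FakeCelery:
    """Records tasks instead of sending them to a broker."""
    sent: List[Tuple[str, dict]] = field(default_factory=list)

    def send_task(self, name: str, kwargs: dict) -> None:
        self.sent.append((name, kwargs))


# Hypothetical mapping: event type -> names of the handler-specific tasks.
HANDLERS: Dict[str, List[str]] = {
    "pull_request": [
        "task.run_pull_request_copr_build_handler",
        "task.run_github_testing_farm_handler",
    ],
    "release": ["task.run_propose_downstream_handler"],
}
WHITELIST = {"packit/ogr"}  # hypothetical whitelisted repositories


def process_message(payload: dict, celery: FakeCelery) -> List[str]:
    """First-stage task: parse, run the checks, then send one task per handler."""
    event = Event(payload["event_type"], payload["repo"], payload.get("private", False))
    if event.private:  # private repository check
        return []
    task_names = HANDLERS.get(event.event_type, [])  # handlers for this event
    if event.repo not in WHITELIST:  # whitelist check
        return []
    for name in task_names:
        # only JSON-friendly data goes to the handler-specific task
        celery.send_task(name, kwargs={"event": payload})
    return task_names
```

Each handler then runs under its own task name, which is what makes per-queue routing and per-task grouping in Sentry possible.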
How to pass the information needed by handlers (already discussed)
Handlers currently use the information from the event (including the project and properties specific to each event), the service config and the job config object:
class JobHandler(Handler):
    def __init__(
        self, config: ServiceConfig, job_config: Optional[JobConfig], event: Event
    ):
        ...
Possible solutions (they can be combined):
- serialize the info about the objects and pass it into
  - this requires serializing and then deserializing again
- save the info about the objects in the DB and pass the IDs of the models into
  - which models does it make sense to have? Possibilities:
    - project
    - package config
    - service config
    - job config
    - event
      - each subclass of class Event stores a different set of info -> does it make sense to create a model for each?
      - these could be reused since in
        table we store the event dict
- pass only the arguments that are required by the specific handler
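The first option (serialize, then deserialize) could look like this for a config object. The classes are heavily simplified, hypothetical stand-ins for PackageConfig and JobConfig, and `serialize()`/`deserialize()` are the helper names proposed in this document, not an existing API. The point is that the result has to survive a JSON round-trip, since JSON is Celery's default task serializer.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class JobConfig:
    """Hypothetical, simplified job config."""
    job: str
    trigger: str
    targets: List[str] = field(default_factory=list)


@dataclass
class PackageConfig:
    """Hypothetical, simplified package config containing job configs."""
    specfile_path: str
    jobs: List[JobConfig] = field(default_factory=list)

    def serialize(self) -> dict:
        # asdict() recursively converts nested dataclasses and lists to plain data
        return asdict(self)

    @classmethod
    def deserialize(cls, data: dict) -> "PackageConfig":
        # nested objects have to be rebuilt explicitly
        jobs = [JobConfig(**job) for job in data.get("jobs", [])]
        return cls(specfile_path=data["specfile_path"], jobs=jobs)


def to_wire(config: PackageConfig) -> str:
    """Roughly what the task argument looks like after JSON serialization."""
    return json.dumps(config.serialize())


def from_wire(payload: str) -> PackageConfig:
    return PackageConfig.deserialize(json.loads(payload))
```

The second option would instead store these objects in the DB and send only their IDs, trading this serialization code for extra DB models and queries.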
What needs to be done
- create functions for serializing and deserializing the objects needed by each handler:
  - service config
  - package config
  - job config
  - project
- for each event, create a method that serializes and deserializes the event-specific data
- after doing the checks and getting the handlers, pass the arguments to
  for each handler instead of running it:
# serialize the objects
serialized_config = self.config.serialize()
# ... the job config, package config and project are serialized the same way
handler = handler_kls(...)
if handler.pre_check():
    kwargs = {
        "config": serialized_config,
        "job_config": serialized_job_config,
        "package_config": serialized_package_config,
        "project": serialized_project,
        "specific_info": info_dict,
    }
- create a task for each handler -> create functions handling these tasks:
def process_message(self, ...):
    # get the objects from the serialized data
    handler = CoprBuildStartHandler(...)
- change the code in the handlers to handle the changed attributes correctly (keep the project, config, job config, package config and specific data in separate attributes, not everything in the original event object)
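A handler-specific task could then look like this. `Project`, the simplified `CoprBuildStartHandler` and the task function name `run_copr_build_start_handler` are hypothetical; the keyword arguments follow the `kwargs` dict listed above. In Celery the function would additionally be registered with the `@app.task(name=...)` decorator.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Project:
    """Hypothetical, simplified project reference."""
    namespace: str
    repo: str

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "Project":
        return cls(**data)


@dataclass
class CoprBuildStartHandler:
    """Hypothetical handler: every input is a separate attribute, no Event object."""
    config: Dict[str, Any]
    job_config: Optional[Dict[str, Any]]
    package_config: Dict[str, Any]
    project: Project
    specific_info: Dict[str, Any]

    def run(self) -> str:
        build_id = self.specific_info["build_id"]
        return f"build {build_id} started for {self.project.namespace}/{self.project.repo}"


def run_copr_build_start_handler(
    config: Dict[str, Any],
    job_config: Optional[Dict[str, Any]],
    package_config: Dict[str, Any],
    project: Dict[str, Any],
    specific_info: Dict[str, Any],
) -> str:
    """Handler-specific task body: rebuild objects from serialized data, then run."""
    # Only the project is rebuilt here; the configs would get their own
    # deserialize() helpers in real code.
    handler = CoprBuildStartHandler(
        config=config,
        job_config=job_config,
        package_config=package_config,
        project=Project.deserialize(project),
        specific_info=specific_info,
    )
    return handler.run()
```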
How the transition could be done in smaller steps
- implement the helper functions:
  - serializing and deserializing the common data (configs)
  - serializing the event-specific data
- change the code in the handlers so that it no longer uses the Event class: the project and configs become separate attributes and the event-specific info is deserialized into attributes
- once the changed handler code works, split the single task into multiple tasks (described above: create functions for processing each task)
Workflow 2
- get the webhook payload or the message from the message bus
- add additional info that makes determining the event type easier (for GitHub webhooks, the X-GitHub-Event header)
- send it to Celery as
- this task will take care of:
  - parsing the info needed for the event object (doing an event-specific pre-check?)
  - sending an event-specific task to Celery with all the arguments needed to create the object of the specific event
- the event-specific tasks will take care of:
  - private repository check
  - getting the handlers that handle the event
  - whitelist check
  - running the handlers
What needs to be done
- in each specific parser function, send a task to Celery:
def parse_pr_event(event):
    # ... action, pr_id, the repository namespaces and names and base_ref are parsed here as well
    commit_sha = nested_get(event, "pull_request", "head", "sha")
    https_url = event["repository"]["html_url"]
    kwargs = {
        "action": PullRequestAction[action],
        "pr_id": pr_id,
        "base_repo_namespace": base_repo_namespace,
        "base_repo_name": base_repo_name,
        "base_ref": base_ref,
        "target_repo_namespace": target_repo_namespace,
        "target_repo_name": target_repo_name,
        "https_url": https_url,
        "commit_sha": commit_sha,
    }
- create a task for each event -> create functions handling the event-specific tasks, which create the event object and do the things listed above:
def process_message(self, ...):
    event = PullRequestGHEvent(...)
    # the process method would contain the code moved from process_message():
    # the checks and running the jobs
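A runnable sketch of the Workflow 2 split for a GitHub pull request payload. `nested_get` is a local helper written for this sketch, and `PullRequestAction` and `PullRequestGHEvent` are simplified stand-ins with only a few fields. It also illustrates one design choice: the parser passes `action` as a plain string and the event-specific task converts it to the enum, because a plain Enum member such as `PullRequestAction[action]` is not JSON-serializable by default.

```python
import enum
from dataclasses import dataclass
from typing import Any, Optional


class PullRequestAction(enum.Enum):
    """Simplified stand-in for the real enum."""
    opened = "opened"
    reopened = "reopened"
    synchronize = "synchronize"


def nested_get(d: Any, *keys: str, default: Any = None) -> Any:
    """Walk nested dicts; return ``default`` if any key is missing."""
    for key in keys:
        if not isinstance(d, dict) or key not in d:
            return default
        d = d[key]
    return d


@dataclass
class PullRequestGHEvent:
    """Simplified stand-in holding only a few of the real fields."""
    action: PullRequestAction
    pr_id: int
    https_url: str
    commit_sha: Optional[str]

    def process(self) -> str:
        # the checks and running the handlers would move here
        return f"processing PR #{self.pr_id} ({self.action.name}) at {self.commit_sha}"


def parse_pr_event(event: dict) -> dict:
    """Parser: extract plain, JSON-friendly kwargs for the event-specific task."""
    return {
        "action": event["action"],  # kept as a string on purpose
        "pr_id": event["number"],
        "https_url": event["repository"]["html_url"],
        "commit_sha": nested_get(event, "pull_request", "head", "sha"),
    }


def process_pr_event(**kwargs: Any) -> str:
    """Event-specific task body: rebuild the event object and process it."""
    action = PullRequestAction[kwargs.pop("action")]
    return PullRequestGHEvent(action=action, **kwargs).process()
```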
Which tasks do we want to have?
- task x job (what has to be done in general, independently of the trigger and git forge):
  - propose_downstream
  - build/copr_build
  - sync_from_downstream
  - production_build
  - add_to_whitelist
  - tests
  - report_test_results
  - pull_request_action
  - copr_build_finished
  - copr_build_started
- task x handler (which handler handles the task):
  - ReleaseCoprBuildHandler
  - PullRequestCoprBuildHandler
  - PushCoprBuildHandler
  - GitHubPullRequestCommentCoprBuildHandler
  - CoprBuildStartHandler
  - CoprBuildEndHandler
  - GithubAppInstallationHandler
  - GitHubPullRequestCommentTestingFarmHandler
  - GithubTestingFarmHandler
  - TestingFarmResultsHandler
  - GitHubIssueCommentProposeUpdateHandler
  - ProposeDownstreamHandler
  - ReleaseGithubKojiBuildHandler
  - PullRequestGithubKojiBuildHandler
  - PushGithubKojiBuildHandler
  - NewDistGitCommitHandler
  - PagurePullRequestCommentCoprBuildHandler
- task x event (which event triggered the task):
  - InstallationEvent
  - CoprBuildEvent
  - KojiBuildEvent
  - TestingFarmResultEvent
  - ReleaseEvent
  - PushGithubGHEvent
  - PullRequestGHEvent
  - PullRequestCommentGHEvent
  - IssueCommentEvent
  - PushPagureEvent
  - PullRequestCommentPagureEvent
  - PullRequestPagureEvent
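With the task x handler option, the task names could be derived from the handler class names, so that each handler is easy to spot in Sentry and to route to a queue. A minimal sketch; the `task.run_` prefix, the `TASKS` registry and the `register` decorator are hypothetical, and the empty classes only stand in for the real handlers listed above.

```python
import re
from typing import Dict, Type

# Hypothetical registry: task name -> handler class.
TASKS: Dict[str, Type] = {}


def task_name_for(handler_cls: Type) -> str:
    """CamelCase class name -> 'task.run_snake_case' task name."""
    # insert "_" before every capital letter except the first character
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", handler_cls.__name__).lower()
    return f"task.run_{snake}"


def register(handler_cls: Type) -> Type:
    """Class decorator: record the handler under its derived task name."""
    TASKS[task_name_for(handler_cls)] = handler_cls
    return handler_cls


@register
class CoprBuildStartHandler:  # empty stand-in for the real handler
    pass


@register
class ProposeDownstreamHandler:  # empty stand-in for the real handler
    pass
```

Note that a name like GitHubPullRequestCommentCoprBuildHandler comes out as `task.run_git_hub_pull_request_comment_copr_build_handler`; giving each task an explicit name avoids such surprises if that matters.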