Patchwork D6924: ci: store job start parameters in DynamoDB

login
register
mail settings
Submitter phabricator
Date Oct. 1, 2019, 3:58 a.m.
Message ID <differential-rev-PHID-DREV-7kww3efg2bcrzi4qk3gn-req@mercurial-scm.org>
Download mbox | patch
Permalink /patch/41869/
State New
Headers show

Comments

phabricator - Oct. 1, 2019, 3:58 a.m.
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This makes it vastly easier to retry or retrigger a job,
  since the instance launch state will be stored in the
  database instead of in an ad-hoc SQS message.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D6924

AFFECTED FILES
  contrib/ci/lambda_functions/ci.py
  contrib/ci/terraform/job_executor.tf

CHANGE DETAILS




To: indygreg, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/contrib/ci/terraform/job_executor.tf b/contrib/ci/terraform/job_executor.tf
--- a/contrib/ci/terraform/job_executor.tf
+++ b/contrib/ci/terraform/job_executor.tf
@@ -16,6 +16,11 @@ 
   runtime = "python3.7"
   timeout = 60
   role = aws_iam_role.lambda_ci_run_pending_job.arn
+  environment {
+    variables = {
+      DYNAMODB_JOB_TABLE = aws_dynamodb_table.ci_job.name
+    }
+  }
 }
 
 resource "aws_cloudwatch_log_group" "lambda_ci_run_pending_job" {
@@ -63,6 +68,16 @@ 
     ]
     resources = ["*"]
   }
+  # Allow querying job state in DynamoDB.
+  statement {
+    effect = "Allow"
+    actions = [
+      "dynamodb:GetItem",
+    ]
+    resources = [
+      aws_dynamodb_table.ci_job.arn,
+    ]
+  }
 }
 
 resource "aws_iam_role_policy" "lambda_ci_handle_pending_job" {
diff --git a/contrib/ci/lambda_functions/ci.py b/contrib/ci/lambda_functions/ci.py
--- a/contrib/ci/lambda_functions/ci.py
+++ b/contrib/ci/lambda_functions/ci.py
@@ -54,16 +54,17 @@ 
 def handle_pending_job(event, context):
     """Handler for starting a job from an SQS message."""
     ec2 = boto3.client('ec2')
+    dynamodb = boto3.resource('dynamodb')
+
+    job_table = dynamodb.Table(os.environ['DYNAMODB_JOB_TABLE'])
 
     for record in event['Records']:
         body = record['body']
 
         data = json.loads(body)
-        user_data_template = data['user_data_template']
-        user_data_params = data['user_data_params']
-        ec2_instance_config = data['ec2_instance_launch_config']
+        job_id = data['job_id']
 
-        start_pending_job(ec2, user_data_template, user_data_params, ec2_instance_config)
+        start_pending_job(ec2, job_table, job_id)
 
 
 def handle_job_result_s3_artifact(event, context):
@@ -409,12 +410,6 @@ 
             'SecurityGroups': ['hg-linux-dev-1'],
         }
 
-        job_params = {
-            'user_data_template': LINUX_USER_DATA,
-            'user_data_params': user_data_params,
-            'ec2_instance_launch_config': config,
-        }
-
         schedule_time = decimal.Decimal(time.time())
 
         print('registering job in DynamoDB')
@@ -426,17 +421,35 @@ 
             'build_number': build_number,
             'execution_state': 'pending',
             'schedule_time': schedule_time,
+            # We encode as JSON because DynamoDB doesn't like some types
+            # like empty strings.
+            'start_params': json.dumps({
+                'user_data_template': LINUX_USER_DATA,
+                'user_data_params': user_data_params,
+                'ec2_instance_launch_config': config,
+            }, sort_keys=True),
         })
 
         print('adding job to pending queue')
         sqs.send_message(
             QueueUrl=sqs_url,
-            MessageBody=json.dumps(job_params, sort_keys=True)
+            MessageBody=json.dumps({'job_id': job_id})
         )
 
 
-def start_pending_job(ec2, user_data_template, user_data_params, ec2_instance_config):
+def start_pending_job(ec2, job_table, job_id):
     """Called to request the start of a pending job."""
+    res = job_table.get_item(Key={'job_id': job_id}, ConsistentRead=True)
+    if 'Item' not in res:
+        print('unable to find job %s' % job_id)
+        return
+
+    job = res['Item']
+    start_params = json.loads(job['start_params'])
+    user_data_template = start_params['user_data_template']
+    user_data_params = start_params['user_data_params']
+    ec2_instance_config = start_params['ec2_instance_launch_config']
+
     user_data = user_data_template.format(**user_data_params)
 
     print('requesting spot instance for job %s' % user_data_params['job_id'])