Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limit queue lengths to prevent overloading Redis #884

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: 2
jobs:
bot_test:
docker:
- image: python:3.7-slim
- image: python:3.7
auth:
username: $DOCKER_USER
password: $DOCKER_PASS
Expand All @@ -12,11 +12,6 @@ jobs:
username: $DOCKER_USER
password: $DOCKER_PASS
steps:
- run:
name: install git
command: |
apt update
apt install -y git
- checkout
- run:
name: skip build if no changes
Expand Down Expand Up @@ -89,7 +84,7 @@ jobs:
# https://circleci.com/docs/2.0/building-docker-images/
bot_build_container:
docker:
- image: docker:20.10.23
- image: docker:18.05.0-ce
auth:
username: $DOCKER_USER
password: $DOCKER_PASS
Expand Down
42 changes: 21 additions & 21 deletions bot/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
FROM python:3.7-slim

RUN apt update && \
apt-get install --no-install-recommends -y \
supervisor \
git && \
python -m pip install --upgrade pip && \
pip install \
--no-cache-dir \
--root-user-action=ignore \
cryptography===37.0.4 \
poetry===1.1.15 && \
poetry config virtualenvs.in-project true && \
groupadd kodiak && \
useradd --uid 1000 --gid kodiak kodiak && \
mkdir -p /var/app && \
chown -R kodiak:kodiak /var/app
FROM python:3.7@sha256:6eaf19442c358afc24834a6b17a3728a45c129de7703d8583392a138ecbdb092

WORKDIR /var/app
RUN set -ex && mkdir -p /var/app

RUN apt-get update && apt-get install -y supervisor

RUN mkdir -p /var/log/supervisor

# use cryptography version for poetry that doesn't require Rust
RUN python3 -m pip install cryptography===37.0.4
RUN python3 -m pip install poetry===1.1.13

RUN poetry config virtualenvs.in-project true

COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf

COPY --chown=kodiak pyproject.toml poetry.lock ./
WORKDIR /var/app

COPY pyproject.toml poetry.lock /var/app/

# install deps
RUN poetry install

COPY --chown=kodiak . ./
COPY . /var/app

USER kodiak
# workaround for: https://github.com/sdispater/poetry/issues/1123
RUN rm -rf /var/app/pip-wheel-metadata/

# install cli
RUN poetry install

CMD ["/usr/bin/supervisord"]
2 changes: 2 additions & 0 deletions bot/kodiak/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def __call__(
"USAGE_REPORTING_QUEUE_LENGTH", cast=int, default=10_000
)
INGEST_QUEUE_LENGTH = config("INGEST_QUEUE_LENGTH", cast=int, default=1_000)
WEBHOOK_QUEUE_LENGTH = config("WEBHOOK_QUEUE_LENGTH", cast=int, default=250)
MERGE_QUEUE_LENGTH = config("MERGE_QUEUE_LENGTH", cast=int, default=100)
REDIS_BLOCKING_POP_TIMEOUT_SEC = config(
"REDIS_BLOCKING_POP_TIMEOUT_SEC", cast=int, default=10
)
Expand Down
19 changes: 14 additions & 5 deletions bot/kodiak/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def get_ingest_queue(installation_id: int) -> str:


RETRY_RATE_SECONDS = 2
ONE_DAY = int(timedelta(days=1).total_seconds())
TWELVE_HOURS_IN_SECONDS = int(timedelta(hours=12).total_seconds())


def installation_id_from_queue(queue_name: str) -> str:
Expand Down Expand Up @@ -321,11 +323,14 @@ async def dequeue() -> None:
await redis_bot.zrem(webhook_event.get_merge_queue_name(), webhook_event.json())

async def requeue() -> None:
queue_name = webhook_event.get_webhook_queue_name()
await redis_bot.zadd(
webhook_event.get_webhook_queue_name(),
queue_name,
{webhook_event.json(): time.time()},
nx=True,
)
await redis_bot.zremrangebyrank(queue_name, conf.WEBHOOK_QUEUE_LENGTH, -1)
await redis_bot.expire(queue_name, TWELVE_HOURS_IN_SECONDS)

async def queue_for_merge(*, first: bool) -> Optional[int]:
return await webhook_queue.enqueue_for_repo(event=webhook_event, first=first)
Expand Down Expand Up @@ -390,11 +395,14 @@ async def dequeue() -> None:
await redis_bot.zrem(webhook_event.get_merge_queue_name(), webhook_event.json())

async def requeue() -> None:
queue_name = webhook_event.get_webhook_queue_name()
await redis_bot.zadd(
webhook_event.get_webhook_queue_name(),
queue_name,
{webhook_event.json(): time.time()},
nx=True,
)
await redis_bot.zremrangebyrank(queue_name, conf.WEBHOOK_QUEUE_LENGTH, -1)
await redis_bot.expire(queue_name, TWELVE_HOURS_IN_SECONDS)

async def queue_for_merge(*, first: bool) -> Optional[int]:
raise NotImplementedError
Expand Down Expand Up @@ -449,9 +457,6 @@ def find_position(x: typing.Iterable[T], v: T) -> typing.Optional[int]:
return None


ONE_DAY = int(timedelta(days=1).total_seconds())


@dataclass(frozen=True)
class TaskMeta:
kind: Literal["repo", "webhook"]
Expand Down Expand Up @@ -523,6 +528,8 @@ async def enqueue(self, *, event: WebhookEvent) -> None:
async with redis_bot.pipeline(transaction=True) as pipe:
pipe.sadd(WEBHOOK_QUEUE_NAMES, queue_name)
pipe.zadd(queue_name, {event.json(): time.time()}, nx=True)
pipe.zremrangebyrank(queue_name, conf.WEBHOOK_QUEUE_LENGTH, -1)
pipe.expire(queue_name, TWELVE_HOURS_IN_SECONDS)
await pipe.execute()
log = logger.bind(
owner=event.repo_owner,
Expand Down Expand Up @@ -558,6 +565,8 @@ async def enqueue_for_repo(
# use only_if_not_exists to prevent changing queue positions on new
# webhook events.
pipe.zadd(queue_name, {event.json(): time.time()}, nx=True)
pipe.zremrangebyrank(queue_name, conf.WEBHOOK_QUEUE_LENGTH, -1)
pipe.expire(queue_name, TWELVE_HOURS_IN_SECONDS)
pipe.zrange(queue_name, 0, 1000, withscores=True)
results = await pipe.execute()
log = logger.bind(
Expand Down
5 changes: 5 additions & 0 deletions bot/kodiak/refresh_pull_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
a GitHub webhook would for each pull request to trigger the bot to reevaluate
the mergeability.
"""

from __future__ import annotations

import asyncio
import logging
import sys
import time
from datetime import timedelta
from typing import cast

import sentry_sdk
Expand Down Expand Up @@ -60,6 +62,8 @@

logger = structlog.get_logger()

TWELVE_HOURS_IN_SECONDS = int(timedelta(hours=12).total_seconds())

# we query for both organization repositories and user repositories because we
# do not know of the installation is a user or an organization. We filter to
# private repositories only because we may not have access to all public
Expand Down Expand Up @@ -162,6 +166,7 @@ async def refresh_pull_requests_for_installation(*, installation_id: str) -> Non
{event.json(): time.time()},
nx=True,
)
await redis_bot.expire(event.get_webhook_queue_name(), TWELVE_HOURS_IN_SECONDS)
logger.info(
"pull_requests_refreshed",
installation_id=installation_id,
Expand Down
3 changes: 0 additions & 3 deletions bot/supervisord.conf
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
[supervisord]
nodaemon=true
user=kodiak

[program:ingest]
command=/var/app/.venv/bin/python -m kodiak.entrypoints.ingest
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stdout
stdout_logfile_maxbytes=0

[program:worker]
command=/var/app/.venv/bin/python -m kodiak.entrypoints.worker
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stdout
stdout_logfile_maxbytes=0