Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
DOCKER := $(shell which docker)
DOCKER_COMPOSE := ${DOCKER} compose
ARGS_TESTS ?=

build:
${DOCKER_COMPOSE} build

format:
${DOCKER_COMPOSE} run --rm -it --entrypoint bash test -c 'ruff format /app && ruff check --fix /app'

requirements:
${DOCKER_COMPOSE} run --rm pip-compile

test:
${DOCKER_COMPOSE} run --rm test ${ARGS_TESTS}
9 changes: 6 additions & 3 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
services:
sync_init:
user: "${UID}:${GID}"
image: git_hg_sync
image: git-hg-sync
entrypoint: ["/app/create_clones.sh", "/clones"]
environment:
# Let the script create .ssh somewhere writable
HOME: /tmp
volumes:
- .:/app
- ./clones:/clones:z
sync: &sync_config
git-hg-sync: &sync_config
user: "${UID}:${GID}"
image: git_hg_sync
image: git-hg-sync
build: .
command: ["--config", "config-docker.toml", "--log-raw-level", "debug"]
volumes:
Expand Down Expand Up @@ -53,6 +53,9 @@ services:
- ./tests_output:/app/tests_output:z
profiles:
- test
depends_on:
pulse:
condition: service_healthy
networks:
- pulse_network

Expand Down
2 changes: 1 addition & 1 deletion git_hg_sync/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def get_connection(config: PulseConfig) -> Connection:
)


def get_queue(config: Config | PulseConfig) -> Queue:
def get_queue(config: PulseConfig) -> Queue:
exchange = Exchange(config.exchange, type="topic")
return Queue(
name=config.queue,
Expand Down
3 changes: 3 additions & 0 deletions git_hg_sync/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Self

import pydantic
import pytest
import tomllib
from mozlog import get_proxy_logger

Expand Down Expand Up @@ -84,6 +85,8 @@ def _update_config_from_env(
@staticmethod
def from_file(file_path: pathlib.Path) -> "Config":
assert file_path.exists(), f"config file {file_path} doesn't exists"
if "config-suite.toml" in str(file_path):
pytest.skip("ignoring placeholder file created by suite")
with file_path.open("rb") as config_file:
config = tomllib.load(config_file)
return Config(**config)
2 changes: 1 addition & 1 deletion rabbitmq/definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"arguments": {},
"destination": "queue/guest/test",
"destination_type": "queue",
"routing_key": "#",
"routing_key": "default",
"source": "exchange/guest/test",
"vhost": "/"
}
Expand Down
36 changes: 24 additions & 12 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,50 @@ def pulse_config() -> PulseConfig:
host="pulse",
port=5672,
exchange="exchange/guest/test",
routing_key="#",
routing_key="default",
queue="queue/guest/test",
password="guest",
heartbeat=30,
ssl=False,
)


@pytest.fixture
def test_config(pulse_config: PulseConfig) -> Config:
return Config(
pulse=pulse_config,
clones=ClonesConfig(directory=Path("clones")),
tracked_repositories=[
TrackedRepository(name="mozilla-central", url="https://github.com/mozilla-firefox/firefox.git"),
TrackedRepository(
name="mozilla-central",
url="https://github.com/mozilla-firefox/firefox.git",
),
],
branch_mappings=[
BranchMapping(
branch_pattern=".*",
source_url="https://github.com/mozilla-firefox/firefox.git",
destination_url="destination_url",
destination_branch="destination_branch",
)
],
branch_mappings=[BranchMapping(
branch_pattern = '.*',
source_url = "https://github.com/mozilla-firefox/firefox.git",
destination_url = 'destination_url',
destination_branch = 'destination_branch',
)],
)


@pytest.fixture
def get_payload() -> Callable:

def get_payload(**kwargs: dict) -> dict:
def get_payload(
request: pytest.FixtureRequest | None = None, **kwargs: dict
) -> dict:
"""Return a default payload, with override via kwargs."""
repo_url = "repo.git"
if request:
repo_url = request.node.name

payload = {
"type": "push",
"repo_url": "repo.git",
"branches": { "main": 40 * "0"},
"repo_url": repo_url,
"branches": {"main": 40 * "0"},
"tags": {},
"time": 0,
"push_id": 0,
Expand Down
10 changes: 9 additions & 1 deletion tests/pulse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@

def send_pulse_message(
pulse_config: PulseConfig, payload: Any, purge: bool = False
) -> None:
) -> tuple[kombu.Connection, kombu.Queue]:
"""Send a pulse message
The routing key will be constructed from the repository URL.
The Pulse message will be constructed from the specified payload
and sent to the requested exchange.

This function takes care of declaring and binding a Queue, so it can purge it before
the start of the test. It returns the Connection and Queue for use by the caller.
"""
userid = pulse_config.userid
password = pulse_config.password
Expand All @@ -40,6 +43,9 @@ def send_pulse_message(

with connection:
ex = kombu.Exchange(exchange, type="topic")

# Declare the queue prior to sending, so we can purge it of potential spurious
# messages from previous tests.
queue = kombu.Queue(
name=queue_name,
exchange=exchange,
Expand Down Expand Up @@ -70,6 +76,8 @@ def send_pulse_message(
print(f"publishing message to {exchange}")
producer.publish(data)

return (connection, queue)


if __name__ == "__main__":
config = Config.from_file(HERE.parent / "config.toml")
Expand Down
9 changes: 4 additions & 5 deletions tests/test_config_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
BASEDIR = Path(__file__).parent.parent


@pytest.mark.parametrize(
"config_file",
list(BASEDIR.glob("config-*.toml"))
)
@pytest.mark.parametrize("config_file", list(BASEDIR.glob("config-*.toml")))
def test_config_files(config_file: Path) -> None:
try:
config = Config.from_file(config_file)
Expand All @@ -21,6 +18,8 @@ def test_config_files(config_file: Path) -> None:
# We just do a shallow verification. What we really care is that the file could be
# loaded correctly.
assert config.pulse, f"`pulse` section missing in {config_file}"
assert config.tracked_repositories, f"`tracked_repositories` section missing in {config_file}"
assert config.tracked_repositories, (
f"`tracked_repositories` section missing in {config_file}"
)
assert config.branch_mappings, f"`branch_mappings` section missing in {config_file}"
assert config.tag_mappings, f"`tag_mappings` section missing in {config_file}"
89 changes: 72 additions & 17 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,73 @@
HERE = Path(__file__).parent


@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
def test_send_and_receive(pulse_config: PulseConfig, get_payload: Callable) -> None:
payload = get_payload()
@pytest.mark.parametrize(
"queue_key",
(
(""), # Use the default from the config.
("one"),
("two.three"),
),
)
@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ")
def test_send_and_receive(
request: pytest.FixtureRequest,
pulse_config: PulseConfig,
get_payload: Callable,
queue_key: str,
) -> None:
payload = get_payload(request=request)

if queue_key:
pulse_config.routing_key = queue_key
pulse_config.queue = f"{pulse_config.queue}-{queue_key}"

def callback(body: Any, message: kombu.Message) -> None:
message.ack()
assert body["payload"] == payload

pulse_utils.send_pulse_message(pulse_config, payload, purge=True)
connection = get_connection(pulse_config)
queue = get_queue(pulse_config)
connection, queue = pulse_utils.send_pulse_message(
pulse_config, payload, purge=True
)

with connection.Consumer(queue, auto_declare=False, callbacks=[callback]):
connection.drain_events(timeout=5)


@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ")
@pytest.mark.parametrize(
"send_key,queue_key",
(
("three", "three.four"),
("five.six", "five"),
),
)
def test_send_and_receive_routing_key_mismatch(
pulse_config: PulseConfig, get_payload: Callable, send_key: str, queue_key: str
) -> None:
payload = get_payload()

def callback(_body: Any, _message: kombu.Message) -> None:
raise AssertionError("No message should be received")

pulse_config.routing_key = queue_key
# We need to create a unique queue for this binding.
pulse_config.queue = f"{pulse_config.queue}-{send_key}-{queue_key}"

pulse_config_sender = pulse_config.model_copy()
pulse_config_sender.routing_key = send_key

connection, queue = _setup_connection_and_queue(pulse_config)
pulse_utils.send_pulse_message(pulse_config_sender, payload, purge=True)

with (
connection.Consumer(queue, auto_declare=False, callbacks=[callback]),
pytest.raises(TimeoutError),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check the TimeoutError for a specific message?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean when a specific message is expected?

I don't think so, except for the structure of the test: here we send a single message with a non-default routing key, and listen on the default queue, to make sure that message doesn't show up there. So our expectations are never to get into the callback for the consumer, and a Timeout when it gives up waiting.

):
connection.drain_events(timeout=5)


@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ")
def test_full_app(
tmp_path: Path,
get_payload: Callable,
Expand Down Expand Up @@ -112,7 +163,7 @@ def test_full_app(
assert hg_rev(hg_remote_repo_path, destination_branch) in tag_log


@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ")
def test_no_duplicated_ack_messages(
test_config: Config,
get_payload: Callable,
Expand All @@ -125,10 +176,7 @@ def test_no_duplicated_ack_messages(

wait = 30

connection = get_connection(test_config.pulse)
queue = get_queue(test_config.pulse)
queue(connection).queue_declare()
queue(connection).queue_bind()
connection, queue = _setup_connection_and_queue(test_config.pulse)

worker = PulseWorker(connection, queue, one_shot=True)

Expand All @@ -142,7 +190,7 @@ def test_no_duplicated_ack_messages(
callback.assert_called_once()


@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ")
def test_messages_in_order(
test_config: Config,
get_payload: Callable,
Expand All @@ -151,10 +199,7 @@ def test_messages_in_order(

It may also timeout, which is likely indicative of the same issue.
"""
connection = get_connection(test_config.pulse)
queue = get_queue(test_config.pulse)
queue(connection).queue_declare()
queue(connection).queue_bind()
connection, queue = _setup_connection_and_queue(test_config.pulse)

worker = PulseWorker(connection, queue, one_shot=False)

Expand Down Expand Up @@ -184,3 +229,13 @@ def event_handler(event: Event) -> None:
worker.run()

assert events_log == [0, 0, 1, 1]


def _setup_connection_and_queue(
pulse_config: PulseConfig,
) -> tuple[kombu.Connection, kombu.Queue]:
connection = get_connection(pulse_config)
queue = get_queue(pulse_config)
queue(connection).queue_declare()
queue(connection).queue_bind()
return connection, queue
Loading