aio_microservice.amqp.testing

Attributes

ServiceT

Classes

AmqpBroker

TestAmqpBroker

A class to test RabbitMQ brokers.

Module Contents

aio_microservice.amqp.testing.ServiceT
class aio_microservice.amqp.testing.AmqpBroker(broker)
Parameters:

broker (faststream.rabbit.RabbitBroker)

__getattr__(attr)
Parameters:

attr (str)

Return type:

Any

__setattr__(attr, val)

Implement setattr(self, name, value).

Parameters:
  • attr (str)

  • val (Any)

Return type:

None
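
The __getattr__/__setattr__ pair suggests that AmqpBroker acts as a thin proxy around the wrapped RabbitBroker. This page does not document that behaviour, so the following is only a minimal sketch of the general delegation pattern. FakeBroker and BrokerProxy are hypothetical stand-ins, not part of aio_microservice or FastStream, and the real class may behave differently.

```python
from typing import Any


class FakeBroker:
    """Hypothetical stand-in for the wrapped broker object."""

    def __init__(self) -> None:
        self.url = "amqp://guest:guest@localhost:5672/"


class BrokerProxy:
    """Hypothetical proxy that forwards attribute access to a wrapped object."""

    def __init__(self, broker: Any) -> None:
        # Bypass our own __setattr__ so `_broker` is stored on the proxy itself.
        object.__setattr__(self, "_broker", broker)

    def __getattr__(self, attr: str) -> Any:
        # Only called when normal lookup on the proxy fails.
        return getattr(self._broker, attr)

    def __setattr__(self, attr: str, val: Any) -> None:
        # Assumption: every assignment is forwarded to the wrapped object.
        setattr(self._broker, attr, val)


inner = FakeBroker()
proxy = BrokerProxy(inner)
proxy.url = "amqp://example.invalid/"  # lands on `inner`, not on `proxy`
```

With this design, test code can treat the wrapper as if it were the broker itself, while the wrapper remains free to add helpers of its own.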

get_published_messages(queue=None, exchange=None)
Parameters:
  • queue (str | None)

  • exchange (str | None)

Return type:

list[Any]

reset_published_messages()
Return type:

None
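
The signatures of get_published_messages and reset_published_messages point to a record of published messages that can be filtered and cleared between assertions. The sketch below illustrates one way such a recorder could work. It rests on several assumptions: that a filter left as None matches everything, that both filters combine with a logical AND, and that message bodies are what is returned. MessageRecorder, PublishedMessage, and record are hypothetical names, not the library's API.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class PublishedMessage:
    """Hypothetical record of a single publish call."""

    body: Any
    queue: str | None = None
    exchange: str | None = None


@dataclass
class MessageRecorder:
    """Hypothetical in-memory store of published messages."""

    _messages: list[PublishedMessage] = field(default_factory=list)

    def record(self, body: Any, queue: str | None = None,
               exchange: str | None = None) -> None:
        self._messages.append(PublishedMessage(body, queue, exchange))

    def get_published_messages(self, queue: str | None = None,
                               exchange: str | None = None) -> list[Any]:
        # Assumption: a filter left as None matches every message.
        return [
            m.body
            for m in self._messages
            if (queue is None or m.queue == queue)
            and (exchange is None or m.exchange == exchange)
        ]

    def reset_published_messages(self) -> None:
        self._messages.clear()


recorder = MessageRecorder()
recorder.record({"id": 1}, queue="orders")
recorder.record({"id": 2}, queue="payments")
recorder.record({"id": 3}, queue="orders", exchange="events")

all_bodies = recorder.get_published_messages()
orders = recorder.get_published_messages(queue="orders")
order_events = recorder.get_published_messages(queue="orders", exchange="events")

recorder.reset_published_messages()
after_reset = recorder.get_published_messages()
```

Resetting between test steps keeps each assertion scoped to the messages published since the previous reset.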

class aio_microservice.amqp.testing.TestAmqpBroker(service, with_real=False)

Bases: faststream.rabbit.TestRabbitBroker

A class to test RabbitMQ brokers.

Parameters:
  • service (ServiceT)

  • with_real (bool)

async __aenter__()
Return type:

AmqpBroker
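
Because __aenter__ is asynchronous and returns an AmqpBroker, TestAmqpBroker is meant to be used with async with, and the bound name is the wrapper rather than the test broker itself. The sketch below shows that shape with hypothetical stand-ins (FakeTestBroker, RecordingBroker) so that it runs without RabbitMQ or the library installed. The publish method and the active flag are illustrative assumptions, not documented behaviour. In FastStream's TestRabbitBroker, with_real=True targets a real broker instead of the in-memory one; whether TestAmqpBroker forwards the flag unchanged is also an assumption.

```python
import asyncio
from typing import Any


class RecordingBroker:
    """Hypothetical stand-in for the object returned by __aenter__."""

    def __init__(self) -> None:
        self.published: list[Any] = []

    async def publish(self, message: Any) -> None:
        self.published.append(message)


class FakeTestBroker:
    """Hypothetical async context manager that hands out a RecordingBroker."""

    def __init__(self, service: Any, with_real: bool = False) -> None:
        self.service = service
        self.with_real = with_real
        self.active = False

    async def __aenter__(self) -> RecordingBroker:
        self.active = True
        return RecordingBroker()

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        self.active = False


async def main() -> tuple[list[Any], bool]:
    test_broker = FakeTestBroker(service=object())
    async with test_broker as broker:
        # `broker` is the wrapper returned by __aenter__, not `test_broker`.
        await broker.publish("hello")
        published = list(broker.published)
    # The context has exited, so the test broker is no longer active.
    return published, test_broker.active


published, still_active = asyncio.run(main())
```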