Consume
Consume messages from an AMQP queue.
Requires maxDuration or maxRecords.
type: "io.kestra.plugin.amqp.Consume"
Examples
id: amqp_consume
namespace: company.team
tasks:
  - id: consume
    type: io.kestra.plugin.amqp.Consume
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: /my_vhost
    queue: kestramqp.queue
    maxRecords: 1000
Properties
host *Required string
The broker host
queue *Required string
The queue to pull messages from
serdeType *Required string
Default: STRING
Possible values: STRING, JSON
Serialization format
Defines how message payloads are serialized or deserialized. Use STRING for plain text or JSON for structured data.
consumerTag string
Default: Kestra
A client-generated consumer tag to establish context
maxDuration string
Format: duration
Maximum duration
The maximum duration the consumer will run before stopping. This is a soft limit evaluated approximately every 100 milliseconds, so the actual duration may slightly exceed this value.
maxRecords integer | string
Maximum number of records
The maximum number of messages to consume before stopping. This is a soft limit evaluated after each message.
password string
The broker password
port string
Default: 5672
The broker port
username string
The broker username
virtualHost string
Default: /
The broker virtual host
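The serialization format and the two stopping limits described above can be combined in a single task. The following is a minimal sketch rather than a reference configuration: the flow id is hypothetical, the connection values are reused from the example above, and it assumes that maxDuration accepts an ISO 8601 duration string and that both limits may be set together, with the consumer stopping at whichever is reached first.

```yaml
id: amqp_consume_json          # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.amqp.Consume
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: /my_vhost
    queue: kestramqp.queue
    serdeType: JSON            # treat payloads as structured data instead of plain text
    consumerTag: Kestra        # the documented default, written out explicitly
    maxDuration: PT30S         # assumption: ISO 8601 duration, roughly 30 seconds
    maxRecords: 500            # soft limit, checked after each message
```

Because both limits are soft, the task may run slightly longer than 30 seconds before it stops.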
Outputs
count integer
Number of messages consumed
uri string
Format: uri
File URI containing consumed messages
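A downstream task can read these outputs through Kestra's expression syntax. The sketch below is illustrative only: the flow id and the report task are hypothetical, and it assumes a Kestra version that provides the core Log task under io.kestra.plugin.core.log.Log.

```yaml
id: amqp_consume_and_report    # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.amqp.Consume
    host: localhost
    queue: kestramqp.queue
    maxRecords: 1000

  # Hypothetical follow-up task that logs the outputs of the consume task
  - id: report
    type: io.kestra.plugin.core.log.Log
    message: "Consumed {{ outputs.consume.count }} messages into {{ outputs.consume.uri }}"
```

The uri output points to a file in Kestra's internal storage, so it can be passed to any later task that accepts a file URI.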
Metrics
consumed.records counter
The total number of records consumed from the AMQP queue.