Welcome to Gollum’s documentation!

What is Gollum?

Gollum originally started as a tool to MUL-tiplex LOG-files (read it backwards to get the name). It quickly evolved into a one-way router for all kinds of messages, not limited to just logs. Gollum is written in Go to make it scalable and easy to extend without the need to use a scripting language.

Terminology

The main components of Gollum are consumers, streams and producers. To explain these, it helps to look at Gollum “from the outside”.

  • A consumer “consumes” messages, i.e. it reads data from some external service or e.g. listens to a port.
  • A producer “produces” messages, i.e. it writes data to some external service or e.g. to disk.
  • A stream defines a path between one or more consumers and one or more producers.
  • A single set of data passing over a stream is called a message.

These main components (consumers, producers and streams) are built upon a plugin architecture. This allows each component to be exchanged and configured individually. Every plugin has a different set of options. Streams, for example, may define filters that can inspect a message to decide whether to drop it or let it pass. Producers and streams may use formatters to modify a message’s content and e.g. convert a plain-text log to JSON. Filters and formatters are plugins, too, but can only be configured in the context of another plugin such as a stream. Because of this they are called “nested plugins”. These plugins have access to all configuration options of their “host” plugin.
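
For illustration, the sketch below shows how nested plugins are selected: a stream names its formatter and filter via its own options, and any settings a nested plugin requires are placed in the same configuration block as its host. Only plugin names documented in this manual are used.

- "stream.Broadcast":
    Stream: "foo"                # the stream this configuration applies to
    Formatter: "format.Forward"  # nested formatter plugin
    Filter: "filter.All"         # nested filter plugin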

Configuration

A Gollum configuration file is written in YAML and may contain any number of plugins. Multiple plugins of the same type are possible, too. The Gollum core does not make any assumptions about the type of data you are processing; plugins, however, may do so. It is therefore up to the person configuring Gollum to ensure that valid data is passed from consumers to producers. Formatters can help to achieve this.
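
As a minimal sketch that only uses plugins and parameters documented below, a configuration reading lines from stdin and writing them back to stdout over a stream named "console" (an arbitrary name) could look like this:

# Read from stdin and send each line as a message to the "console" stream.
- "consumer.Console":
    Console: "stdin"
    Stream: "console"

# Listen to the "console" stream and write each message to stdout.
- "producer.Console":
    Console: "stdout"
    Stream: "console"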

Running Gollum

Gollum goes into an infinite loop once started. You can shut down Gollum by sending SIGINT (i.e. Ctrl+C), SIGTERM or SIGKILL. Gollum has several command line options that are listed when starting Gollum without any parameters:

-c, --config=""
Use a given configuration file.
-h, --help
Print this help message.
-ll, --loglevel=0
Set the loglevel [0-3]. Higher levels produce more messages.
-m, --metrics=0
Port to use for metric queries. Set 0 to disable.
-n, --numcpu=0
Number of CPUs to use. Set 0 for all CPUs.
-p, --pidfile=""
Write the process id into a given file.
-pc, --profilecpu=""
Write CPU profiler results to a given file.
-pm, --profilemem=""
Write heap profile results to a given file.
-ps, --profilespeed
Write msg/sec measurements to log.
-r, --report
Print detailed version report and exit.
-tc, --testconfig=""
Test a given configuration file and exit.
-tr, --trace
Write trace results to a given file.
-v, --version
Print version information and exit.

Table of contents

Consumers

Console

This consumer reads from stdin. A message is generated after each newline character. When attached to a fuse, this consumer will stop accepting messages in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
Console
Console defines the pipe to read from. This can be “stdin” or the name of a named pipe that is created if it does not exist. The default is “stdin”.
Permissions
Permissions accepts an octal number string that contains the unix file permissions used when creating a named pipe. By default this is set to “0664”.
ExitOnEOF
ExitOnEOF can be set to true to trigger an exit signal if StdIn is closed (e.g. when a pipe is closed). This is set to false by default.
Example
- "consumer.Console":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    Console: "stdin"
    Permissions: "0664"
    ExitOnEOF: false

File

The file consumer allows reading from files while looking for a delimiter that marks the end of a message. If the file is part of e.g. a log rotation, the file consumer can be set to a symbolic link of the latest file and (optionally) be told to reopen the file by sending a SIGHUP. A symlink to a file will automatically be reopened if the underlying file is changed. When attached to a fuse, this consumer will stop accepting messages in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
File
File is a mandatory setting and contains the file to read. The file will be read from beginning to end and the reader will stay attached until the consumer is stopped, i.e. data appended to the file will be recognized automatically.
DefaultOffset
DefaultOffset defines where to start reading the file. Valid values are “oldest” and “newest”. If OffsetFile is defined the DefaultOffset setting will be ignored unless the file does not exist. By default this is set to “newest”.
OffsetFile
OffsetFile defines the path to a file that stores the current offset inside the given file. If the consumer is restarted that offset is used to continue reading. By default this is set to “” which disables the offset file.
Delimiter
Delimiter defines the end of a message inside the file. By default this is set to “\n”.
Example
- "consumer.File":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    File: "/var/run/system.log"
    DefaultOffset: "Newest"
    OffsetFile: ""
    Delimiter: "\n"

Http

This consumer opens up an HTTP 1.1 server and processes the contents of any incoming HTTP request. When attached to a fuse, this consumer will return error 503 in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
Address
Address stores the host and port to bind to. This may be any IP address or DNS name plus port, like “localhost:5880”. By default this is set to “:80”.
ReadTimeoutSec
ReadTimeoutSec specifies the maximum duration in seconds before timing out the HTTP read request. By default this is set to 3 seconds.
WithHeaders
WithHeaders can be set to false to only read the HTTP body instead of passing the whole HTTP message. By default this setting is set to true.
Htpasswd
Htpasswd can be set to the path of an htpasswd formatted file to enable HTTP BasicAuth. By default this is set to “” (disabled).
BasicRealm
BasicRealm can be set for HTTP BasicAuth.
Certificate
Certificate defines a path to a root certificate file to make this consumer handle HTTPS connections. Left empty by default (disabled). If a Certificate is given, a PrivateKey must be given, too.
PrivateKey
PrivateKey defines a path to the private key used for HTTPS connections. Left empty by default (disabled). If a Certificate is given, a PrivateKey must be given, too.
Example
- "consumer.Http":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    Address: ":80"
    ReadTimeoutSec: 3
    WithHeaders: true
    Htpasswd: ""
    BasicRealm: ""
    Certificate: ""
    PrivateKey: ""

Kafka

This consumer reads data from a given Kafka topic. It is based on the sarama library, so most settings map to the settings of that library. When attached to a fuse, this consumer will stop processing messages in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
Topic
Topic defines the kafka topic to read from. By default this is set to “default”.
ClientId
ClientId sets the client id of this consumer. By default this is “gollum”.
GroupId
GroupId sets the consumer group of this consumer. By default this is “” which disables consumer groups. This requires Version to be >= 0.9.
Version
Version defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. Defaults to “0.8.2”, or if GroupId is set “0.9.0.1”. If the version given is not known, the closest possible version is chosen. If GroupId is set and this is < “0.9”, “0.9.0.1” will be used.
DefaultOffset
DefaultOffset defines where to start reading the topic. Valid values are “oldest” and “newest”. If OffsetFile is defined the DefaultOffset setting will be ignored unless the file does not exist. By default this is set to “newest”. Ignored when using GroupId.
OffsetFile
OffsetFile defines the path to a file that stores the current offset inside a given partition. If the consumer is restarted that offset is used to continue reading. By default this is set to “” which disables the offset file. Ignored when using GroupId.
FolderPermissions
FolderPermissions is used to create the offset file path if necessary. Set to 0755 by default. Ignored when using GroupId.
Ordered
Ordered can be set to enforce partitions to be read one-by-one in a round robin fashion instead of reading in parallel from all partitions. Set to false by default. Ignored when using GroupId.
PrependKey
PrependKey can be enabled to prefix the read message with the key from the kafka message. A separator will be appended to the key. See KeySeparator. By default this option is set to false.
KeySeparator
KeySeparator defines the separator that is appended to the kafka message key if PrependKey is set to true. Set to “:” by default.
MaxOpenRequests
MaxOpenRequests defines the number of simultaneous connections allowed. By default this is set to 5.
ServerTimeoutSec
ServerTimeoutSec defines the time after which a connection is set to timed out. By default this is set to 30 seconds.
MaxFetchSizeByte
MaxFetchSizeByte sets the maximum size of a message to fetch. Larger messages will be ignored. By default this is set to 0 (fetch all messages).
MinFetchSizeByte
MinFetchSizeByte defines the minimum amount of data to fetch from Kafka per request. If less data is available the broker will wait. By default this is set to 1.
FetchTimeoutMs
FetchTimeoutMs defines the time in milliseconds the broker will wait for MinFetchSizeByte to be reached before processing data anyway. By default this is set to 250ms.
MessageBufferCount
MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 8192.
PresistTimoutMs
PresistTimoutMs defines the time in milliseconds between writes to OffsetFile. By default this is set to 5000. Shorter durations reduce the amount of duplicate messages after a crash but increase I/O. When using GroupId this only controls how long to pause after receiving errors.
ElectRetries
ElectRetries defines how many times to retry during a leader election. By default this is set to 3.
ElectTimeoutMs
ElectTimeoutMs defines the number of milliseconds to wait for the cluster to elect a new leader. Defaults to 250.
MetadataRefreshMs
MetadataRefreshMs sets the interval in milliseconds for fetching cluster metadata. By default this is set to 10000. This corresponds to the JVM setting topic.metadata.refresh.interval.ms.
TlsEnable
TlsEnable defines whether to use TLS to communicate with brokers. Defaults to false.
TlsKeyLocation
TlsKeyLocation defines the path to the client’s private key (PEM) used for authentication. Defaults to “”.
TlsCertificateLocation
TlsCertificateLocation defines the path to the client’s public key (PEM) used for authentication. Defaults to “”.
TlsCaLocation
TlsCaLocation defines the path to CA certificate(s) for verifying the broker’s key. Defaults to “”.
TlsServerName
TlsServerName is used to verify the hostname on the server’s certificate unless TlsInsecureSkipVerify is true. Defaults to “”.
TlsInsecureSkipVerify
TlsInsecureSkipVerify controls whether to verify the server’s certificate chain and host name. Defaults to false.
SaslEnable
SaslEnable defines whether to use SASL for authentication. Defaults to false.
SaslUsername
SaslUsername is the user for SASL/PLAIN authentication. Defaults to “gollum”.
SaslPassword
SaslPassword is the password for SASL/PLAIN authentication. Defaults to “”.
Servers
Servers contains the list of all kafka servers to connect to. By default this is set to contain only “localhost:9092”.
Example
- "consumer.Kafka":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    Topic: "default"
    ClientId: "gollum"
    Version: "0.8.2"
    GroupId: ""
    DefaultOffset: "newest"
    OffsetFile: ""
    FolderPermissions: "0755"
    Ordered: true
    MaxOpenRequests: 5
    ServerTimeoutSec: 30
    MaxFetchSizeByte: 0
    MinFetchSizeByte: 1
    FetchTimeoutMs: 250
    MessageBufferCount: 256
    PresistTimoutMs: 5000
    ElectRetries: 3
    ElectTimeoutMs: 250
    MetadataRefreshMs: 10000
    TlsEnabled: true
    TlsKeyLocation: ""
    TlsCertificateLocation: ""
    TlsCaLocation: ""
    TlsServerName: ""
    TlsInsecureSkipVerify: false
    SaslEnabled: false
    SaslUsername: "gollum"
    SaslPassword: ""
    PrependKey: false
    KeySeparator: ":"
    Servers:
        - "localhost:9092"

Kinesis

This consumer reads messages from an AWS Kinesis stream. When attached to a fuse, this consumer will stop processing messages in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
KinesisStream
KinesisStream defines the stream to read from. By default this is set to “default”.
Region
Region defines the amazon region of your kinesis stream. By default this is set to “eu-west-1”.
Endpoint
Endpoint defines the amazon endpoint for your kinesis stream. By default this is set to “kinesis.eu-west-1.amazonaws.com”.
CredentialType
CredentialType defines the credentials that are to be used when connecting to Kinesis. This can be one of the following: environment, static, shared, none. Static enables the parameters CredentialId, CredentialToken and CredentialSecret; shared enables the parameters CredentialFile and CredentialProfile. None will not use any credentials and environment will pull the credentials from environmental settings. By default this is set to none.
DefaultOffset
DefaultOffset defines the message index to start reading from. Valid values are either “Newest”, “Oldest”, or a number. The default value is “Newest”.
OffsetFile
OffsetFile defines a file to store the current offset per shard. By default this is set to “”, i.e. it is disabled. If a file is set and found consuming will start after the stored offset.
RecordsPerQuery
RecordsPerQuery defines the number of records to pull per query. By default this is set to 100.
RecordMessageDelimiter
RecordMessageDelimiter defines the string to delimit messages within a record. By default this is set to “”, i.e. it is disabled.
QuerySleepTimeMs
QuerySleepTimeMs defines the number of milliseconds to sleep before trying to pull new records from a shard that did not return any records. By default this is set to 1000.
RetrySleepTimeSec
RetrySleepTimeSec defines the number of seconds to wait after trying to reconnect to a shard. By default this is set to 4.
Example
- "consumer.Kinesis":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    KinesisStream: "default"
    Region: "eu-west-1"
    Endpoint: "kinesis.eu-west-1.amazonaws.com"
    DefaultOffset: "Newest"
    OffsetFile: ""
    RecordsPerQuery: 100
    RecordMessageDelimiter: ""
    QuerySleepTimeMs: 1000
    RetrySleepTimeSec: 4
    CredentialType: "none"
    CredentialId: ""
    CredentialToken: ""
    CredentialSecret: ""
    CredentialFile: ""
    CredentialProfile: ""

Profiler

The profiler plugin generates Runs x Batches messages and sends them to the configured streams as fast as possible. This consumer can be used to profile producers and/or configurations. When attached to a fuse, this consumer will stop processing messages in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
Runs
Runs defines the number of messages per batch. By default this is set to 10000.
Batches
Batches defines the number of measurement runs to do. By default this is set to 10.
TemplateCount
TemplateCount defines the number of message templates to be generated. A random message template will be chosen when a message is sent. Templates are generated in advance. By default this is set to 10.
Characters
Characters defines the characters to be used in generated strings. By default these are “abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890 “.
Message
Message defines a go format string to be used to generate the message payloads. The length of the generated values is derived from the format size parameter, i.e. “%200d” will generate a number between 0 and 200, “%10s” will generate a string with 10 characters, etc. By default this is set to “%256s”.
DelayMs
DelayMs defines the number of milliseconds of sleep between messages. By default this is set to 0.
KeepRunning
KeepRunning can be set to true to disable automatic shutdown of gollum after profiling is done. This can be used to e.g. read metrics after a profile run. By default this is set to false.
Example
- "consumer.Profile":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    Runs: 10000
    Batches: 10
    TemplateCount: 10
    Characters: "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890"
    Message: "%256s"
    DelayMs: 0
    KeepRunning: false

Proxy

The proxy consumer reads messages directly as-is from a given socket. Messages are extracted by standard message size algorithms (see Partitioner). This consumer can be used with any compatible proxy producer to establish a two-way communication. When attached to a fuse, this consumer will stop accepting new connections and close all existing connections in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
Address
Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. By default this is set to “:5880”. UDP is not supported.
Partitioner

Partitioner defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this is set to “delimiter”.

  • “delimiter” separates messages by looking for a delimiter string. The delimiter is included into the left hand message.
  • “ascii” reads an ASCII number at a given offset until a given delimiter is found. Everything to the right of and including the delimiter is removed from the message.
  • “binary” reads a binary number at a given offset and size.
  • “binary_le” is an alias for “binary”.
  • “binary_be” is the same as “binary” but uses big endian encoding.
  • “fixed” assumes fixed size messages.
Delimiter
Delimiter defines the delimiter used by the text and delimiter partitioner. By default this is set to “\n”.
Offset
Offset defines the offset used by the binary and text partitioner. By default this is set to 0. This setting is ignored by the fixed partitioner.
Size
Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1,2,4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.
Example
- "consumer.Proxy":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    Address: ":5880"
    Partitioner: "delimiter"
    Delimiter: "\n"
    Offset: 0
    Size: 1
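
For comparison, a hedged sketch of a proxy consumer using the “ascii” partitioner, assuming each incoming message carries its length as a decimal prefix terminated by “:” (e.g. “5:hello”); the address value is illustrative:

- "consumer.Proxy":
    Address: "unix:///var/gollum.socket"
    Partitioner: "ascii"  # the message size is read as an ASCII number ...
    Offset: 0             # ... starting at byte 0 ...
    Delimiter: ":"        # ... and terminated by this delimiter
    Stream: "foo"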

Socket

The socket consumer reads messages directly as-is from a given socket. Messages are separated from the stream by using a specific partitioner method. When attached to a fuse, this consumer will stop accepting new connections (closing the socket) and close all existing connections in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
Address
Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. By default this is set to “:5880”.
Permissions
Permissions sets the file permissions for “unix://” based connections as a four digit octal number string. By default this is set to “0770”.
Acknowledge
Acknowledge can be set to a non-empty value to inform the writer on success or error. On success the given string is sent. Any error will close the connection. This setting is disabled by default, i.e. set to “”. If Acknowledge is enabled and an IP address is given to Address, TCP is used to open the connection, otherwise UDP is used. If an error occurs during write, “NOT <Acknowledge>” is returned.
Partitioner

Partitioner defines the algorithm used to read messages from the stream. By default this is set to “delimiter”.

  • “delimiter” separates messages by looking for a delimiter string. The delimiter is removed from the message.
  • “ascii” reads an ASCII number at a given offset until a given delimiter is found. Everything to the right of and including the delimiter is removed from the message.
  • “binary” reads a binary number at a given offset and size.
  • “binary_le” is an alias for “binary”.
  • “binary_be” is the same as “binary” but uses big endian encoding.
  • “fixed” assumes fixed size messages.
Delimiter
Delimiter defines the delimiter used by the text and delimiter partitioner. By default this is set to “\n”.
Offset
Offset defines the offset used by the binary and text partitioner. By default this is set to 0. This setting is ignored by the fixed partitioner.
Size
Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1,2,4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.
ReconnectAfterSec
ReconnectAfterSec defines the number of seconds to wait before trying to reopen a connection. By default this is set to 2.
AckTimoutSec
AckTimoutSec defines the number of seconds to wait for an acknowledge to succeed. Set to 2 by default.
ReadTimoutSec
ReadTimoutSec defines the number of seconds to wait for data to be received. Set to 5 by default.
RemoveOldSocket
RemoveOldSocket toggles removing existing files with the same name as the socket (unix://<path>) prior to connecting. Enabled by default.
Example
- "consumer.Socket":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    Address: ":5880"
    Permissions: "0770"
    Acknowledge: ""
    Partitioner: "delimiter"
    Delimiter: "\n"
    Offset: 0
    Size: 1
    ReconnectAfterSec: 2
    AckTimoutSec: 2
    ReadTimeoutSec: 5

Syslogd

The syslogd consumer accepts messages from a syslogd compatible socket. When attached to a fuse, this consumer will stop the syslogd service in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Stream
Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to “*” which means only producers set to consume “all streams” will get these messages.
Fuse
Fuse defines the name of a fuse to observe for this consumer. Producers may “burn” the fuse when they encounter errors. Consumers may react to this by e.g. closing connections to notify any writing services of the problem. Set to “” by default which disables the fuse feature for this consumer. It is up to the consumer implementation to react to a broken fuse in an appropriate manner.
Address
Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. By default this is set to “udp://0.0.0.0:514”. The protocol can be defined along with the address, e.g. “tcp://…” but this may be ignored if a certain protocol format does not support the desired transport protocol.
Format

Format defines the syslog standard to expect for message encoding. Three standards are currently supported; by default this is set to “RFC6587”.

Example
- "consumer.Syslogd":
    Enable: true
    ID: ""
    Fuse: ""
    Stream:
        - "foo"
        - "bar"
    Address: "udp://0.0.0.0:514"
    Format: "RFC6587"

SystemdConsumer

NOTICE: This consumer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled. The systemd consumer allows reading from the systemd journal. When attached to a fuse, this consumer will stop reading messages in case that fuse is burned.

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this consumer to be found by other plugins by name. By default this is set to “” which does not register this consumer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
SystemdUnit
SystemdUnit defines what journal will be followed. This uses journal.add_match with _SYSTEMD_UNIT. By default this is set to “”, which disables the filter.
DefaultOffset
DefaultOffset defines where to start reading the file. Valid values are “oldest” and “newest”. If OffsetFile is defined the DefaultOffset setting will be ignored unless the file does not exist. By default this is set to “newest”.
OffsetFile
OffsetFile defines the path to a file that stores the current offset. If the consumer is restarted that offset is used to continue reading. By default this is set to “” which disables the offset file.
Example
- "native.Systemd":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    SystemdUnit: "sshd.service"
    DefaultOffset: "Newest"
    OffsetFile: ""

Consumers are plugins that read data from external sources. Data is packed into messages and passed to a stream.

Streams

Broadcast

Messages will be sent to all producers attached to this stream.

Parameters
Enable
Enable can be set to false to disable this stream configuration but leave it in the config for future use. Set to true by default.
Stream
Stream defines the stream to configure. This is a mandatory setting and has no default value.
Formatter
Formatter defines the first formatter to apply to the messages passing through this stream. By default this is set to “format.Forward”.
Filter
Filter defines the filter to apply to the messages passing through this stream. By default this is set to “filter.All”.
TimeoutMs
TimeoutMs defines an optional timeout that can be used to wait for producers attached to this stream to unblock. This setting overwrites the corresponding producer setting for this (and only this) stream.
Example
- "stream.Foobar"
    Enable: true
    Stream: "streamToConfigure"
    Formatter: "format.Forward"
    Filter: "filter.All"
    TimeoutMs: 0

Random

Messages will be sent to one of the producers attached to this stream. The concrete producer is chosen randomly with each message.

Parameters
Enable
Enable can be set to false to disable this stream configuration but leave it in the config for future use. Set to true by default.
Stream
Stream defines the stream to configure. This is a mandatory setting and has no default value.
Formatter
Formatter defines the first formatter to apply to the messages passing through this stream. By default this is set to “format.Forward”.
Filter
Filter defines the filter to apply to the messages passing through this stream. By default this is set to “filter.All”.
TimeoutMs
TimeoutMs defines an optional timeout that can be used to wait for producers attached to this stream to unblock. This setting overwrites the corresponding producer setting for this (and only this) stream.
Example
- "stream.Foobar"
    Enable: true
    Stream: "streamToConfigure"
    Formatter: "format.Forward"
    Filter: "filter.All"
    TimeoutMs: 0

RoundRobin

Messages will be sent to one of the producers attached to this stream. Producers will be switched one-by-one.

Parameters
Enable
Enable can be set to false to disable this stream configuration but leave it in the config for future use. Set to true by default.
Stream
Stream defines the stream to configure. This is a mandatory setting and has no default value.
Formatter
Formatter defines the first formatter to apply to the messages passing through this stream. By default this is set to “format.Forward”.
Filter
Filter defines the filter to apply to the messages passing through this stream. By default this is set to “filter.All”.
TimeoutMs
TimeoutMs defines an optional timeout that can be used to wait for producers attached to this stream to unblock. This setting overwrites the corresponding producer setting for this (and only this) stream.
Example
- "stream.Foobar"
    Enable: true
    Stream: "streamToConfigure"
    Formatter: "format.Forward"
    Filter: "filter.All"
    TimeoutMs: 0

Route

Messages will be routed to all streams configured. Each target stream can hold another stream configuration, too, so this is not directly sending to the producers attached to the target streams.

Parameters
Enable
Enable can be set to false to disable this stream configuration but leave it in the config for future use. Set to true by default.
Stream
Stream defines the stream to configure. This is a mandatory setting and has no default value.
Formatter
Formatter defines the first formatter to apply to the messages passing through this stream. By default this is set to “format.Forward”.
Filter
Filter defines the filter to apply to the messages passing through this stream. By default this is set to “filter.All”.
TimeoutMs
TimeoutMs defines an optional timeout that can be used to wait for producers attached to this stream to unblock. This setting overwrites the corresponding producer setting for this (and only this) stream.
Routes
Routes defines a 1:n stream remapping. Messages are reassigned to all streams in this list. If no route is set messages are forwarded on the incoming stream. When routing to multiple streams, the incoming stream has to be listed explicitly to be used.
Example
- "stream.Route":
    Enable: true
    Stream: "streamToConfigure"
    Formatter: "format.Forward"
    Filter: "filter.All"
    TimeoutMs: 0
    Routes:
        - "foo"
        - "bar"

Streams manage the transfer of messages between consumers and producers. Streams can act as a kind of proxy that may filter, modify and define the distribution algorithm of messages. Streams can be referred to by cleartext names. These names are free to choose, but there are several reserved names for internal or special purpose streams:

  • “_GOLLUM_” is used for internal log messages
  • “_DROPPED_” is used for messages that could not be sent, e.g. because of a channel timeout
  • “*” is a placeholder for “all streams but the internal streams”. In some cases “*” means “all streams” without exception. This is denoted in the corresponding documentation whenever this is the case.

Producers

Benchmark

This producer is similar to producer.Null but will use the standard buffer mechanism before discarding messages. This producer is used for benchmarking the core system. If you require a /dev/null style producer you should prefer producer.Null instead as it is way more performant.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Example
- "producer.Foobar":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"

Console

The console producer writes messages to the standard output streams. This producer does not implement a fuse breaker.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Console
Console may either be “stdout” or “stderr”. By default it is set to “stdout”.
Example
- "producer.Console":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Console: "stdout"

ElasticSearch

The ElasticSearch producer sends messages to Elasticsearch using the bulk HTTP API. This producer uses a fuse breaker when the cluster health reports a “red” status or the connection is down.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
RetrySec
RetrySec denotes the time in seconds after which a failed dataset will be transmitted again. By default this is set to 5.
Connections
Connections defines the number of simultaneous connections allowed to an Elasticsearch server. This is set to 6 by default.
TTL
TTL defines the TTL set in elasticsearch messages. By default this is set to “” which means no TTL.
DayBasedIndex
DayBasedIndex can be set to true to append the date of the message to the index as in “<index>_YYYY-MM-DD”. By default this is set to false.
Servers
Servers defines a list of servers to connect to. The first server in the list is used as the server passed to the “Domain” setting. The Domain setting can be overwritten, too.
Port
Port defines the elasticsearch port, which has to be the same for all servers. By default this is set to 9200.
User
User and Password can be used to pass credentials to the elasticsearch server. By default both settings are empty.
Index
Index maps a stream to a specific index. You can define the wildcard stream (*) here, too. If set, all streams that do not have a specific mapping will go to this index (including _GOLLUM_). If no mappings are set the stream name is used as the index.
Type
Type maps a stream to a specific type. This behaves like the index map and is used to assign a _type to an elasticsearch message. By default the type “log” is used.
DataTypes
DataTypes allows defining Elasticsearch type mappings for indexes that are being created by this producer (e.g. day based indexes). You can define mappings per index.
Settings
Settings allows defining Elasticsearch index settings for indexes that are being created by this producer (e.g. day based indexes). You can define settings per index.
BatchSizeByte
BatchSizeByte defines the size in bytes required to trigger a flush. By default this is set to 32768 (32KB).
BatchMaxCount
BatchMaxCount defines the number of documents required to trigger a flush. By default this is set to 256.
BatchTimeoutSec
BatchTimeoutSec defines the time in seconds after which a flush will be triggered. By default this is set to 5.
Example
- "producer.ElasticSearch":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Connections: 6
    RetrySec: 5
    TTL: ""
    DayBasedIndex: false
    User: ""
    Password: ""
    BatchSizeByte: 32768
    BatchMaxCount: 256
    BatchTimeoutSec: 5
    Port: 9200
    Servers:
        - "localhost"
    Index:
        "console" : "console"
        "_GOLLUM_"  : "_GOLLUM_"
    Settings:
        "console":
            "number_of_shards": 1
    DataTypes:
        "console":
            "source": "ip"
    Type:
        "console" : "log"
        "_GOLLUM_"  : "log"

File

The file producer writes messages to a file. This producer also allows log rotation and compression of the rotated logs. Folders in the file path will be created if necessary. This producer does not implement a fuse breaker.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
File
File contains the path to the log file to write. The wildcard character “*” can be used as a placeholder for the stream name. By default this is set to /var/log/gollum.log.
FileOverwrite
FileOverwrite enables files to be overwritten instead of appending new data to them. This is set to false by default.
Permissions
Permissions accepts an octal number string that contains the unix file permissions used when creating a file. By default this is set to “0664”.
FolderPermissions
FolderPermissions accepts an octal number string that contains the unix file permissions used when creating a folder. By default this is set to “0755”.
BatchMaxCount
BatchMaxCount defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this is set to 8192.
BatchFlushCount
BatchFlushCount defines the number of messages to be buffered before they are written to disk. This setting is clamped to BatchMaxCount. By default this is set to BatchMaxCount / 2.
BatchTimeoutSec
BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.
FlushTimeoutSec
FlushTimeoutSec sets the maximum number of seconds to wait before a flush is aborted during shutdown. By default this is set to 0, which does not abort the flushing procedure.
Rotate
Rotate, if set to true, causes logs to rotate after reaching certain thresholds. By default this is set to false.
RotateTimeoutMin
RotateTimeoutMin defines a timeout in minutes that will cause the logs to rotate. Can be set in parallel with RotateSizeMB. By default this is set to 1440 (i.e. 1 Day).
RotateAt
RotateAt defines a specific time of day, as in “HH:MM”, at which the log should be rotated. Hours must be given in 24h format. When left empty this setting is ignored. By default this setting is disabled.
RotateSizeMB
RotateSizeMB defines the maximum file size in MB that triggers a file rotate. Files can get bigger than this size. By default this is set to 1024.
RotateTimestamp
RotateTimestamp sets the timestamp added to the filename when file rotation is enabled. The format is based on Go’s time.Format function and set to “2006-01-02_15” by default.
RotatePruneCount
RotatePruneCount removes old logfiles upon rotate so that only the given number of logfiles remain. Logfiles are located by the name defined by “File” and are pruned by date (followed by name). By default this is set to 0 which disables pruning.
RotatePruneAfterHours
RotatePruneAfterHours removes old logfiles that are older than a given number of hours. By default this is set to 0 which disables pruning.
RotatePruneTotalSizeMB
RotatePruneTotalSizeMB removes old logfiles upon rotate so that only the given number of MBs are used by logfiles. Logfiles are located by the name defined by “File” and are pruned by date (followed by name). By default this is set to 0 which disables pruning.
RotateZeroPadding
RotateZeroPadding sets the number of leading zeros when rotating files with an existing name. Setting this to 0 won’t add zeros; any other number defines the number of leading zeros to be used. By default this is set to 0.
Compress
Compress defines if a rotated logfile is to be gzip compressed or not. By default this is set to false.
Example
- "producer.File":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    File: "/var/log/gollum.log"
    FileOverwrite: false
    Permissions: "0664"
    FolderPermissions: "0755"
    BatchMaxCount: 8192
    BatchFlushCount: 4096
    BatchTimeoutSec: 5
    FlushTimeoutSec: 0
    Rotate: false
    RotateTimeoutMin: 1440
    RotateSizeMB: 1024
    RotateAt: ""
    RotateTimestamp: "2006-01-02_15"
    RotatePruneCount: 0
    RotatePruneAfterHours: 0
    RotatePruneTotalSizeMB: 0
    Compress: false
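
As a focused variant of the example above, the following sketch enables size- and time-based rotation together with pruning and compression. It is only an illustration; the path, thresholds and stream name are placeholders, and all other options keep their documented defaults.
- "producer.File":
    File: "/var/log/gollum/*.log"
    Stream: "accesslog"
    Rotate: true
    RotateSizeMB: 512
    RotateTimeoutMin: 1440
    RotateAt: "00:00"
    RotatePruneCount: 7
    Compress: true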

Firehose

This producer sends data to an AWS Firehose stream.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Firehose
Firehose defines the firehose stream to write to. By default this is set to “default”.
Region
Region defines the amazon region of your firehose stream. By default this is set to “eu-west-1”.
Endpoint
Endpoint defines the amazon endpoint for your firehose stream. By default this is set to “firehose.eu-west-1.amazonaws.com”.
CredentialType
CredentialType defines the credentials that are to be used when connecting to firehose. This can be one of the following: environment, static, shared, none. Static enables the parameters CredentialId, CredentialToken and CredentialSecret; shared enables the parameters CredentialFile and CredentialProfile; none will not use any credentials and environment will pull the credentials from environment variables. By default this is set to none.
BatchMaxMessages
BatchMaxMessages defines the maximum number of messages to send per batch. By default this is set to 500.
RecordMaxMessages
RecordMaxMessages defines the maximum number of messages to join into a firehose record. By default this is set to 500.
RecordMessageDelimiter
RecordMessageDelimiter defines the string to delimit messages within a firehose record. By default this is set to “\n”.
SendTimeframeMs
SendTimeframeMs defines the timeframe in milliseconds in which a second batch send can be triggered. By default this is set to 1000, i.e. one send operation per second.
BatchTimeoutSec
BatchTimeoutSec defines the number of seconds after which a batch is flushed automatically. By default this is set to 3.
StreamMapping
StreamMapping defines a translation from gollum stream to firehose stream name. If no mapping is given the gollum stream name is used as firehose stream name.
Example
- "producer.Firehose":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Region: "eu-west-1"
    Endpoint: "firehose.eu-west-1.amazonaws.com"
    CredentialType: "none"
    CredentialId: ""
    CredentialToken: ""
    CredentialSecret: ""
    CredentialFile: ""
    CredentialProfile: ""
    BatchMaxMessages: 500
    RecordMaxMessages: 1
    RecordMessageDelimiter: "\n"
    SendTimeframeMs: 1000
    BatchTimeoutSec: 3
    StreamMapping:
        "*" : "default"

HTTPRequest

The HTTPRequest producer sends messages as HTTP requests to a given webserver. This producer uses a fuse breaker when a request fails with an error code > 400 or the connection is down.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Address
Address defines the webserver to send http requests to. Set to “localhost:80” by default.
RawData
RawData switches between creating POST data from the incoming message (false) and passing the message through as an HTTP request without changes (true). This setting is enabled by default.
Encoding
Encoding defines the payload encoding when RawData is set to false. Set to “text/plain; charset=utf-8” by default.
Example
- "producer.HTTPRequest":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    RawData: true
    Encoding: "text/plain; charset=utf-8"
    Address: "localhost:80"
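
For example, to build POST requests from the message payload instead of forwarding raw requests, RawData can be disabled and a content type set via Encoding. This is only a sketch; the address, stream name and content type are illustrative.
- "producer.HTTPRequest":
    Address: "webservice.example.com:8080"
    RawData: false
    Encoding: "application/json; charset=utf-8"
    Stream: "webhooks"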

InfluxDB

This producer writes data to an InfluxDB cluster. The data is expected to be in a valid InfluxDB format. As the data format changed between InfluxDB versions it is advisable to use a formatter for the specific InfluxDB version you want to write to. There are collectd-to-InfluxDB formatters available that can be used as an example. This producer uses a fuse breaker if the connection to the InfluxDB cluster is lost.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Host
Host defines the host (and port) of the InfluxDB server. Defaults to “localhost:8086”.
User
User defines the InfluxDB username to use to login. If this name is left empty credentials are assumed to be disabled. Defaults to empty.
Password
Password defines the user’s password. Defaults to empty.
Database
Database sets the InfluxDB database to write to. By default this is set to “default”.
TimeBasedName
TimeBasedName enables using time.Format based formatting of database names. E.g. you can use something like “metrics-2006-01-02” to switch databases for each day. This setting is enabled by default.
RetentionPolicy
RetentionPolicy correlates to the InfluxDB retention policy setting. This is left empty by default (no retention policy used).
UseVersion08
UseVersion08 has to be set to true when writing data to InfluxDB 0.8.x. By default this is set to false. DEPRECATED. Use Version instead.
Version
Version defines the InfluxDB version to use as in Mmp (Major, minor, patch). For version 0.8.x use 80, for version 0.9.0 use 90, for version 1.0.0 use 100 and so on. Defaults to 100.
BatchMaxCount
BatchMaxCount defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered out of other reasons, the producer will block. By default this is set to 8192.
BatchFlushCount
BatchFlushCount defines the number of messages to be buffered before they are written to InfluxDB. This setting is clamped to BatchMaxCount. By default this is set to BatchMaxCount / 2.
BatchTimeoutSec
BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.
Example
- "producer.InfluxDB":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Host: "localhost:8086"
    User: ""
    Password: ""
    Database: "default"
    TimeBasedName: true
    UseVersion08: false
    Version: 100
    RetentionPolicy: ""
    BatchMaxCount: 8192
    BatchFlushCount: 4096
    BatchTimeoutSec: 5
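
As an illustration of TimeBasedName, the sketch below writes to a database that switches every day, against an InfluxDB 1.x server. Host, credentials, retention policy and stream name are placeholders; unlisted options keep their documented defaults.
- "producer.InfluxDB":
    Host: "influx01.example.com:8086"
    User: "gollum"
    Password: "secret"
    Database: "metrics-2006-01-02"
    TimeBasedName: true
    Version: 100
    RetentionPolicy: "autogen"
    Stream: "metrics"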

Kafka

The kafka producer writes messages to a kafka cluster. This producer is backed by the sarama library so most settings relate to that library. This producer uses a fuse breaker if any connection reports an error.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
ClientId
ClientId sets the client id of this producer. By default this is “gollum”.
Version
Version defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. Defaults to “0.8.2”. If the version given is not known, the closest possible version is chosen.
Partitioner
Partitioner sets the distribution algorithm to use. Valid values are: “Random”,”Roundrobin” and “Hash”. By default “Roundrobin” is set.
KeyFormatter
KeyFormatter can define a formatter that extracts the key for a kafka message from the message payload. By default this is an empty string, which disables this feature. A good formatter for this can be format.Identifier.
KeyFormatterFirst
KeyFormatterFirst can be set to true to apply the key formatter to the unformatted message. By default this is set to false, so that key formatter uses the message after Formatter has been applied. KeyFormatter does never affect the payload of the message sent to kafka.
FilterAfterFormat
FilterAfterFormat behaves like Filter but allows filters to be executed after the formatter has run. By default no such filter is set.
RequiredAcks
RequiredAcks defines the acknowledgment level required by the broker. 0 = No responses required. 1 = wait for the local commit. -1 = wait for all replicas to commit. >1 = wait for a specific number of commits. By default this is set to 1.
TimeoutMs
TimeoutMs denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this is set to 10 seconds.
SendRetries
SendRetries defines how many times to retry sending data before marking a server as not reachable. By default this is set to 1.
Compression
Compression sets the method of compression to use. Valid values are: “None”,”Zip” and “Snappy”. By default “None” is set.
MaxOpenRequests
MaxOpenRequests defines the number of simultaneous connections allowed. By default this is set to 5.
BatchMinCount
BatchMinCount sets the minimum number of messages required to trigger a flush. By default this is set to 1.
BatchMaxCount
BatchMaxCount defines the maximum number of messages processed per request. By default this is set to 0 for “unlimited”.
BatchSizeByte
BatchSizeByte sets the minimum number of bytes to collect before a new flush is triggered. By default this is set to 8192.
BatchSizeMaxKB
BatchSizeMaxKB defines the maximum allowed message size. By default this is set to 1024.
BatchTimeoutMs
BatchTimeoutMs sets the minimum time in milliseconds to pass after which a new flush will be triggered. By default this is set to 3.
MessageBufferCount
MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 8192.
ServerTimeoutSec
ServerTimeoutSec defines the time after which a connection is considered timed out. By default this is set to 30 seconds.
SendTimeoutMs
SendTimeoutMs defines the number of milliseconds to wait for a server to respond before triggering a timeout. Defaults to 250.
ElectRetries
ElectRetries defines how many times to retry during a leader election. By default this is set to 3.
ElectTimeoutMs
ElectTimeoutMs defines the number of milliseconds to wait for the cluster to elect a new leader. Defaults to 250.
GracePeriodMs
GracePeriodMs defines the number of milliseconds to wait for Sarama to accept a single message. After this period a message is dropped. By default this is set to 100ms.
MetadataRefreshMs
MetadataRefreshMs sets the interval in milliseconds for fetching cluster metadata. By default this is set to 600000 (10 minutes). This corresponds to the JVM setting topic.metadata.refresh.interval.ms.
TlsEnable
TlsEnable defines whether to use TLS to communicate with brokers. Defaults to false.
TlsKeyLocation
TlsKeyLocation defines the path to the client’s private key (PEM) used for authentication. Defaults to “”.
TlsCertificateLocation
TlsCertificateLocation defines the path to the client’s public key (PEM) used for authentication. Defaults to “”.
TlsCaLocation
TlsCaLocation defines the path to CA certificate(s) for verifying the broker’s key. Defaults to “”.
TlsServerName
TlsServerName is used to verify the hostname on the server’s certificate unless TlsInsecureSkipVerify is true. Defaults to “”.
TlsInsecureSkipVerify
TlsInsecureSkipVerify controls whether to verify the server’s certificate chain and host name. Defaults to false.
SaslEnable
SaslEnable is whether to use SASL for authentication. Defaults to false.
SaslUsername
SaslUsername is the user for SASL/PLAIN authentication. Defaults to “gollum”.
SaslPassword
SaslPassword is the password for SASL/PLAIN authentication. Defaults to “”.
Servers
Servers contains the list of all kafka servers to connect to.
By default this is set to contain only “localhost:9092”.
Topic
Topic maps a stream to a specific kafka topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). If no topic mappings are set the stream names will be used as topic.
Example
- "producer.Kafka":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    ClientId: "gollum"
    Version: "0.8.2"
    Partitioner: "Roundrobin"
    RequiredAcks: 1
    TimeoutMs: 1500
    GracePeriodMs: 10
    SendRetries: 0
    Compression: "None"
    MaxOpenRequests: 5
    MessageBufferCount: 256
    BatchMinCount: 1
    BatchMaxCount: 0
    BatchSizeByte: 8192
    BatchSizeMaxKB: 1024
    BatchTimeoutMs: 3000
    ServerTimeoutSec: 30
    SendTimeoutMs: 250
    ElectRetries: 3
    ElectTimeoutMs: 250
    MetadataRefreshMs: 10000
    TlsEnable: true
    TlsKeyLocation: ""
    TlsCertificateLocation: ""
    TlsCaLocation: ""
    TlsServerName: ""
    TlsInsecureSkipVerify: false
    SaslEnable: false
    SaslUsername: "gollum"
    SaslPassword: ""
    KeyFormatter: ""
    KeyFormatterFirst: false
    Servers:
        - "localhost:9092"
    Topic:
        "console" : "console"

KafkaProducer

NOTICE: This producer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled. The kafka producer writes messages to a kafka cluster. This producer is backed by the native librdkafka (0.8.6) library so most settings relate to that library. This producer does not implement a fuse breaker.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
SendRetries
SendRetries is mapped to message.send.max.retries. This defines the number of times librdkafka will try to re-send a message if it did not succeed. Set to 0 by default (don’t retry).
Compression
Compression is mapped to compression.codec. Please note that “zip” has to be used instead of “gzip”. Possible values are “none”, “zip” and “snappy”. By default this is set to “none”.
TimeoutMs
TimeoutMs is mapped to request.timeout.ms. This defines the number of milliseconds to wait until a request is marked as failed. By default this is set to 1.5sec.
BatchSizeMaxKB
BatchSizeMaxKB is mapped to message.max.bytes (x1024). This defines the maximum message size in KB. By default this is set to 1 MB. Messages above this size are rejected.
BatchMaxMessages
BatchMaxMessages is mapped to queue.buffering.max.messages. This defines the maximum number of messages that can be pending at any given moment in time. If this limit is hit additional messages will be rejected. This value is set to 100,000 by default and should be adjusted according to your average message throughput.
BatchMinMessages
BatchMinMessages is mapped to batch.num.messages. This defines the minimum number of messages required for a batch to be sent. This is set to 1000 by default and should be significantly lower than BatchMaxMessages to avoid messages being rejected.
BatchTimeoutMs
BatchTimeoutMs is mapped to queue.buffering.max.ms. This defines the number of milliseconds to wait until a batch is flushed to kafka. Set to 1sec by default.
ServerTimeoutSec
ServerTimeoutSec is mapped to socket.timeout.ms. Defines the time in seconds after which a server is considered not reachable. Set to 1 minute by default.
ServerMaxFails
ServerMaxFails is mapped to socket.max.fails. Number of retries after a server is marked as “failing”.
MetadataTimeoutMs
MetadataTimeoutMs is mapped to metadata.request.timeout.ms. Number of milliseconds a metadata request may take until considered as failed. Set to 1.5 seconds by default.
MetadataRefreshMs
MetadataRefreshMs is mapped to topic.metadata.refresh.interval.ms. Interval in milliseconds for querying metadata. Set to 5 minutes by default.
SecurityProtocol
SecurityProtocol is mapped to security.protocol. Protocol used to communicate with brokers. Set to plaintext by default.
SslCipherSuites
SslCipherSuites is mapped to ssl.cipher.suites. Cipher suites to use when connecting via TLS/SSL. Not set by default.
SslKeyLocation
SslKeyLocation is mapped to ssl.key.location. Path to the client’s private key (PEM) used for authentication. Not set by default.
SslKeyPassword
SslKeyPassword is mapped to ssl.key.password. Private key passphrase. Not set by default.
SslCertificateLocation
SslCertificateLocation is mapped to ssl.certificate.location. Path to client’s public key (PEM) used for authentication. Not set by default.
SslCaLocation
SslCaLocation is mapped to ssl.ca.location. File or directory path to CA certificate(s) for verifying the broker’s key. Not set by default.
SslCrlLocation
SslCrlLocation is mapped to ssl.crl.location. Path to CRL for verifying broker’s certificate validity. Not set by default.
SaslMechanism
SaslMechanism is mapped to sasl.mechanisms. SASL mechanism to use for authentication. Not set by default.
SaslUsername
SaslUsername is mapped to sasl.username. SASL username for use with the PLAIN mechanism. Not set by default.
SaslPassword
SaslPassword is mapped to sasl.password. SASL password for use with the PLAIN mechanism. Not set by default.
Servers
Servers defines the list of brokers to produce messages to.
Topic
Topic defines a stream to topic mapping. If a stream is not mapped a topic named like the stream is assumed.
KeyFormatter
KeyFormatter can define a formatter that extracts the key for a kafka message from the message payload. By default this is an empty string, which disables this feature. A good formatter for this can be format.Identifier.
KeyFormatterFirst
KeyFormatterFirst can be set to true to apply the key formatter to the unformatted message. By default this is set to false, so that key formatter uses the message after Formatter has been applied. KeyFormatter does never affect the payload of the message sent to kafka.
FilterAfterFormat
FilterAfterFormat behaves like Filter but allows filters to be executed after the formatter has run. By default no such filter is set.
Example
- "native.KafkaProducer":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    ClientId: "weblog"
    RequiredAcks: 1
    TimeoutMs: 1500
    SendRetries: 0
    Compression: "none"
    BatchSizeMaxKB: 1024
    BatchMaxMessages: 100000
    BatchMinMessages: 1000
    BatchTimeoutMs: 1000
    ServerTimeoutSec: 60
    ServerMaxFails: 3
    MetadataTimeoutMs: 1500
    MetadataRefreshMs: 300000
    SecurityProtocol: "plaintext"
    SslCipherSuites: ""
    SslKeyLocation: ""
    SslKeyPassword: ""
    SslCertificateLocation: ""
    SslCaLocation: ""
    SslCrlLocation: ""
    SaslMechanism: ""
    SaslUsername: ""
    SaslPassword: ""
    KeyFormatter: ""
    Servers:
        - "localhost:9092"
    Topic:
        "console" : "console"

Kinesis

This producer sends data to an AWS kinesis stream.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
KinesisStream
KinesisStream defines the kinesis stream to write to. By default this is set to “default”.
Region
Region defines the amazon region of your kinesis stream. By default this is set to “eu-west-1”.
Endpoint
Endpoint defines the amazon endpoint for your kinesis stream. By default this is set to “kinesis.eu-west-1.amazonaws.com”.
CredentialType
CredentialType defines the credentials that are to be used when connecting to kinesis. This can be one of the following: environment, static, shared, none. Static enables the parameters CredentialId, CredentialToken and CredentialSecret; shared enables the parameters CredentialFile and CredentialProfile; none will not use any credentials and environment will pull the credentials from environment variables. By default this is set to none.
BatchMaxMessages
BatchMaxMessages defines the maximum number of messages to send per batch. By default this is set to 500.
RecordMaxMessages
RecordMaxMessages defines the maximum number of messages to join into a kinesis record. By default this is set to 500.
RecordMessageDelimiter
RecordMessageDelimiter defines the string to delimit messages within a kinesis record. By default this is set to “\n”.
SendTimeframeMs
SendTimeframeMs defines the timeframe in milliseconds in which a second batch send can be triggered. By default this is set to 1000, i.e. one send operation per second.
BatchTimeoutSec
BatchTimeoutSec defines the number of seconds after which a batch is flushed automatically. By default this is set to 3.
StreamMapping
StreamMapping defines a translation from gollum stream to kinesis stream name. If no mapping is given the gollum stream name is used as kinesis stream name.
Example
- "producer.Kinesis":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Region: "eu-west-1"
    Endpoint: "kinesis.eu-west-1.amazonaws.com"
    CredentialType: "none"
    CredentialId: ""
    CredentialToken: ""
    CredentialSecret: ""
    CredentialFile: ""
    CredentialProfile: ""
    BatchMaxMessages: 500
    RecordMaxMessages: 1
    RecordMessageDelimiter: "\n"
    SendTimeframeMs: 1000
    BatchTimeoutSec: 3
    StreamMapping:
        "*" : "default"

Null

This producer does nothing and provides only a bare-bones configuration (i.e. enabled and streams). Use this producer to test consumer performance. This producer does not implement a fuse breaker.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Example
- "producer.Foobar":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"

PcapHTTPConsumer

NOTICE: This producer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled. This plugin utilizes libpcap to listen for network traffic and reassemble HTTP requests from it. As it uses a CGO based library it will break cross platform builds (i.e. you will have to compile it on the correct platform).

Parameters
Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Interface
Interface defines the network interface to listen on. By default this is set to eth0; you can get your specific value from ifconfig.
Filter
Filter defines a libpcap filter for the incoming packets. You can filter for specific ports, protocols, IPs, etc. The documentation can be found here: http://www.tcpdump.org/manpages/pcap-filter.7.txt (manpage). By default this is set to listen on port 80 for packets addressed to localhost.
Promiscuous
Promiscuous switches the network interface defined by Interface into promiscuous mode. This is required if you want to listen for all packets coming from the network, even those that were not meant for the IP bound to the interface you listen on. Enabling this can increase your CPU load. This setting is enabled by default.
TimeoutMs
TimeoutMs defines a timeout after which a tcp session is considered to have dropped, i.e. the (remaining) packets will be discarded. Every incoming packet will restart the timer for the specific client session. By default this is set to 3000, i.e. 3 seconds.
Example
- "native.PcapHTTPConsumer":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Interface: eth0
    Filter: "dst port 80 and dst host 127.0.0.1"
    Promiscuous: true
    TimeoutMs: 3000
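
For instance, to capture requests for a service on port 8080 bound to a different interface, only the interface and the pcap filter need to change. This is a sketch; the interface name, filter expression and stream are illustrative.
- "native.PcapHTTPConsumer":
    Interface: "eth1"
    Filter: "dst port 8080 and dst host 10.0.0.5"
    Promiscuous: false
    TimeoutMs: 5000
    Stream: "http_traffic"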

Proxy

This producer is compatible with consumer.Proxy. Responses to messages sent to the given address are sent back to the original consumer if it is a compatible message source. As with consumer.Proxy, the returned messages are partitioned by common message length algorithms. This producer does not implement a fuse breaker.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Address
Address stores the identifier to connect to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.Proxy”. By default this is set to “:5880”.
ConnectionBufferSizeKB
ConnectionBufferSizeKB sets the connection buffer size in KB. This also defines the size of the buffer used by the message parser. By default this is set to 1024, i.e. 1 MB buffer.
TimeoutSec
TimeoutSec defines the maximum time in seconds a client is allowed to take for a response. By default this is set to 1.
Partitioner

Partitioner defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this is set to “delimiter”.

  • “delimiter” separates messages by looking for a delimiter string. The delimiter is included into the left hand message.
  • “ascii” reads an ASCII encoded number at a given offset until a given delimiter is found.
  • “binary” reads a binary number at a given offset and size.
  • “binary_le” is an alias for “binary”.
  • “binary_be” is the same as “binary” but uses big endian encoding.
  • “fixed” assumes fixed size messages.
Delimiter
Delimiter defines the delimiter used by the text and delimiter partitioner. By default this is set to “\n”.
Offset
Offset defines the offset used by the binary and text partitioner. By default this is set to 0. This setting is ignored by the fixed partitioner.
Size
Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1,2,4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.
Example
- "producer.Proxy":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Address: ":5880"
    ConnectionBufferSizeKB: 1024
    TimeoutSec: 1
    Partitioner: "delimiter"
    Delimiter: "\n"
    Offset: 0
    Size: 1
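
As another sketch, a length-prefixed binary protocol could be parsed with the “binary_be” partitioner; the address, offset and size are illustrative, and unlisted options keep their documented defaults.
- "producer.Proxy":
    Address: "localhost:5880"
    Partitioner: "binary_be"
    Offset: 0
    Size: 4
    TimeoutSec: 2
    Stream: "proxydata"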

Redis

This producer sends data to a redis server. Different redis storage types and database indexes are supported. This producer does not implement support for redis 3.0 cluster. This producer does not implement a fuse breaker.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Address
Address stores the identifier to connect to. This can either be any ip address and port like “localhost:6379” or a file like “unix:///var/redis.socket”. By default this is set to “:6379”.
Database
Database defines the redis database to connect to. By default this is set to 0.
Key
Key defines the redis key to store the values in. This field is ignored when “KeyFormatter” is set. By default this is set to “default”.
Storage
Storage defines the type of the storage to use. Valid values are: “hash”, “list”, “set”, “sortedset”, “string”. By default this is set to “hash”.
FieldFormatter
FieldFormatter defines an extra formatter used to define an additional field or score value if required by the storage type. If no field value is required this value is ignored. By default this is set to “format.Identifier”.
FieldAfterFormat
FieldAfterFormat will send the formatted message to the FieldFormatter if set to true. If this is set to false the message will be sent to the FieldFormatter before it has been formatted. By default this is set to false.
KeyFormatter
KeyFormatter defines an extra formatter used to allow generating the key from a message. If this value is set the “Key” field will be ignored. By default this field is not used.
KeyAfterFormat
KeyAfterFormat will send the formatted message to the KeyFormatter if set to true. If this is set to false the message will be sent to the KeyFormatter before it has been formatted. By default this is set to false.
Example
- "producer.Redis":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Address: ":6379"
    Database: 0
    Key: "default"
    Storage: "hash"
    FieldFormatter: "format.Identifier"
    FieldAfterFormat: false
    KeyFormatter: "format.Forward"
    KeyAfterFormat: false
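
For example, messages could be stored in a sorted set with the score extracted by a field formatter applied after formatting. This is only a sketch; the address, database index, key and stream name are placeholders.
- "producer.Redis":
    Address: "redis.example.com:6379"
    Database: 1
    Storage: "sortedset"
    Key: "loglines"
    FieldFormatter: "format.Identifier"
    FieldAfterFormat: true
    Stream: "applog"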

S3

This producer sends data to an AWS S3 Bucket.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Region
Region defines the amazon region of your s3 bucket. By default this is set to “eu-west-1”.
Endpoint
Endpoint defines the amazon endpoint for your s3 bucket. By default this is set to “s3-eu-west-1.amazonaws.com”.
StorageClass
StorageClass defines the amazon s3 storage class for objects created (see http://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html). By default this is set to “STANDARD”.
CredentialType
CredentialType defines the credentials that are to be used when connecting to s3. This can be one of the following: environment, static, shared, none. Static enables the parameters CredentialId, CredentialToken and CredentialSecret; shared enables the parameters CredentialFile and CredentialProfile; none will not use any credentials and environment will pull the credentials from environment variables. By default this is set to none.
BatchMaxMessages
BatchMaxMessages defines the maximum number of messages to upload per batch. By default this is set to 5000.
ObjectMaxMessages
ObjectMaxMessages defines the maximum number of messages to join into an s3 object. By default this is set to 5000.
ObjectMessageDelimiter
ObjectMessageDelimiter defines the string to delimit messages within an s3 object. By default this is set to “\n”.
SendTimeframeMs
SendTimeframeMs defines the timeframe in milliseconds in which a second batch send can be triggered. By default this is set to 10000, i.e. at most one upload operation every ten seconds per s3 path.
BatchTimeoutSec
BatchTimeoutSec defines the number of seconds after which a batch is flushed automatically. By default this is set to 30.
TimestampWrite
TimestampWrite defines the go timestamp format that will be used in naming objects. Objects are named <s3_path><timestamp><sha1>. By default timestamp is set to “2006-01-02T15:04:05”.
PathFormatter
PathFormatter can define a formatter that extracts the path suffix for an s3 object from the object data. By default this uses the sha1 of the object. A good formatter for this can be format.Identifier.
Compress
Compress defines whether to gzip compress the object before uploading. This adds a “.gz” extension to objects. By default this is set to false.
LocalPath
LocalPath defines the local output directory for temporary object files. Files will be stored as “<path>/<number>”. Compressed files will have a .gz extension. State will be stored in “<path>/state”. By default this is not set, and objects will be built in memory.
UploadOnShutdown
UploadOnShutdown defines whether to upload all temporary object files on shutdown. This has no effect if LocalPath is not set. By default this is false.
FileMaxAgeSec
FileMaxAgeSec defines the maximum age of a local file before it is uploaded. This defaults to 3600 (1 hour).
FileMaxMB
FileMaxMB defines the maximum size of a local file before it is uploaded. This limit is imposed before compression occurs. This defaults to 1000 (1 GB).
StreamMapping
StreamMapping defines a translation from gollum stream to s3 bucket/path. If no mapping is given the gollum stream name is used as the s3 bucket. Values are of the form bucket/path or bucket; the s3:// prefix is not allowed. The full path of the object will be s3://<StreamMapping><Timestamp><PathFormat>, where Timestamp is the time the object is written, formatted with TimestampWrite, and PathFormat is the output of PathFormatter when passed the object data.
Example
- "producer.S3":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Region: "eu-west-1"
    Endpoint: "s3-eu-west-1.amazonaws.com"
    StorageClass: "STANDARD"
    CredentialType: "none"
    CredentialId: ""
    CredentialToken: ""
    CredentialSecret: ""
    CredentialFile: ""
    CredentialProfile: ""
    BatchMaxMessages: 5000
    ObjectMaxMessages: 5000
    ObjectMessageDelimiter: "\n"
    SendTimeframeMs: 10000
    BatchTimeoutSec: 30
    TimestampWrite: "2006-01-02T15:04:05"
    PathFormatter: ""
    Compress: false
    LocalPath: ""
    UploadOnShutdown: false
    FileMaxAgeSec: 3600
    FileMaxMB: 1000
    StreamMapping:
        "*" : "bucket/path"

Scribe

The scribe producer allows sending messages to Facebook’s scribe. This producer uses a fuse breaker if the connection to the scribe server is lost.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter, as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary/applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Address
Address defines the host and port to connect to. By default this is set to “localhost:1463”.
ConnectionBufferSizeKB
ConnectionBufferSizeKB sets the connection buffer size in KB. By default this is set to 1024, i.e. 1 MB buffer.
BatchMaxCount
BatchMaxCount defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this is set to 8192.
BatchFlushCount
BatchFlushCount defines the number of messages to be buffered before they are flushed. This setting is clamped to BatchMaxCount. By default this is set to BatchMaxCount / 2.
BatchTimeoutSec
BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5. This also defines the maximum time allowed for messages to be sent to the server.
HeartBeatIntervalSec
HeartBeatIntervalSec defines the interval in seconds used to query scribe for status updates. By default this is set to 5 seconds.
Category
Category maps a stream to a specific scribe category. You can define the wildcard stream (*) here, too. When set, all streams that do not have a specific mapping will go to this category (including _GOLLUM_). If no category mappings are set the stream name is used.
Example
- "producer.Scribe":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Address: "localhost:1463"
    ConnectionBufferSizeKB: 1024
    BatchMaxCount: 8192
    BatchFlushCount: 4096
    BatchTimeoutSec: 5
    HeartBeatIntervalSec: 5
    Category:
        "console" : "console"
        "_GOLLUM_"  : "_GOLLUM_"

Socket

The socket producer connects to a service over a TCP, UDP or unix domain socket based connection. This producer uses a fuse breaker when the service to connect to goes down.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter, as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary/applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Address
Address stores the identifier to connect to. This can either be any IP address and port like “localhost:5880” or a unix domain socket file like “unix:///var/gollum.socket”. By default this is set to “:5880”.
ConnectionBufferSizeKB
ConnectionBufferSizeKB sets the connection buffer size in KB. By default this is set to 1024, i.e. 1 MB buffer.
BatchMaxCount
BatchMaxCount defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this is set to 8192.
BatchFlushCount
BatchFlushCount defines the number of messages to be buffered before they are flushed. This setting is clamped to BatchMaxCount. By default this is set to BatchMaxCount / 2.
BatchTimeoutSec
BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.
Acknowledge
Acknowledge can be set to a non-empty value to expect the given string as a response from the server after a batch has been sent. This setting is disabled by default, i.e. set to “”. If Acknowledge is enabled and an IP address is given as Address, TCP is used to open the connection, otherwise UDP is used.
AckTimeoutMs
AckTimeoutMs defines the time in milliseconds to wait for a response from the server. After this timeout the send is marked as failed. Defaults to 2000.
Example
- "producer.Socket":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Address: ":5880"
    ConnectionBufferSizeKB: 1024
    BatchMaxCount: 8192
    BatchFlushCount: 4096
    BatchTimeoutSec: 5
    Acknowledge: ""

Spooling

The Spooling producer buffers messages and sends them again to the previous stream stored in the message. This means the message must have been routed at least once before reaching the spooling producer. If the previous and current stream are identical the message is dropped. The Formatter configuration value is forced to “format.Serialize” and cannot be changed. This producer does not implement a fuse breaker.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter, as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary/applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Path
Path sets the output directory for spooling files. Files will be stored as “<path>/<stream>/<number>.spl”. By default this is set to “/var/run/gollum/spooling”.
BatchMaxCount
BatchMaxCount defines the maximum number of messages stored in memory before a write to file is triggered. Set to 100 by default.
BatchTimeoutSec
BatchTimeoutSec defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this is set to 5.
MaxFileSizeMB
MaxFileSizeMB sets the size in MB when a spooling file is rotated. Reading will start only after a file is rotated. Set to 512 MB by default.
MaxFileAgeMin
MaxFileAgeMin defines the time in minutes after which a spooling file is rotated. Reading will start only after a file is rotated. This setting divided by two will be used to define the wait time for reading, too. Set to 1 minute by default.
BufferSizeByte
BufferSizeByte defines the initial size of the buffer that is used to parse messages from a spool file. If a message is larger than this size, the buffer will be resized. By default this is set to 8192.
RespoolDelaySec
RespoolDelaySec sets the number of seconds to wait before trying to load existing spool files after a restart. This is useful for configurations that contain dynamic streams. By default this is set to 10.
MaxMessagesSec
MaxMessagesSec sets the maximum number of messages that can be respooled per second. By default this is set to 100. Setting this value to 0 will cause respooling to work as fast as possible.
RevertStreamOnDrop
RevertStreamOnDrop can be used to revert the message stream before dropping the message. This can be useful if you e.g. want to write messages that could not be spooled to stream separated files on disk. Set to false by default.
Example
- "producer.Spooling":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Path: "/var/run/gollum/spooling"
    BatchMaxCount: 100
    BatchTimeoutSec: 5
    MaxFileSizeMB: 512
    MaxFileAgeMin: 1
    BufferSizeByte: 8192
    RespoolDelaySec: 10
    MaxMessagesSec: 100
    RevertStreamOnDrop: false

Statsd

This producer sends increment events to a statsd server.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter, as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary/applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
BatchMaxMessages
BatchMaxMessages defines the maximum number of messages to send per batch. By default this is set to 500.
BatchTimeoutSec
BatchTimeoutSec defines the number of seconds after which a batch is flushed automatically. By default this is set to 10.
Prefix
Prefix defines the prefix for stats metric names. By default this is set to “gollum.”.
Server
Server defines the server and port to send statsd metrics to. By default this is set to “localhost:8125”.
UseMessage
UseMessage defines whether to cast the message to string and increment the metric by that value. If this is set to true and the message fails to cast to an integer, the message will be ignored. If this is set to false then each message will increment by 1. By default this is set to false.
StreamMapping
StreamMapping defines a translation from gollum stream to statsd metric name. If no mapping is given the gollum stream name is used as the metric name.
Example
- "producer.Statsd":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    BatchMaxMessages: 500
    BatchTimeoutSec: 10
    Prefix: "gollum."
    Server: "localhost:8125"
    UseMessage: false
    StreamMapping:
        "*" : "default"

Websocket

The websocket producer opens up a websocket. This producer does not implement a fuse breaker.

Parameters
Enable
Enable switches the producer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be sent to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is sent to the message queue. If a producer requires filtering after formatting it has to define a separate filter, as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary/applicable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
Address
Address defines the host and port to bind to. This may be any IP address/DNS name and port like “localhost:5880”. By default this is set to “:81”.
Path
Path defines the url path to listen for. By default this is set to “/”.
ReadTimeoutSec
ReadTimeoutSec specifies the maximum duration in seconds before a request read times out. By default this is set to 3 seconds.
Example
- "producer.Websocket":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    Address: ":81"
    Path:    "/"
    ReadTimeoutSec: 3

Producers are plugins that transfer messages to external services. Data arrives in the form of messages and can be converted by using a formatter.

Filters

All

This filter passes all messages.

Any

This plugin applies a list of nested filters and lets a message pass if at least one of them passes.

Parameters
AnyFilter
AnyFilter defines a list of filters that should be checked before dropping a message. Filters are checked in order, and if the message passes then no further filters are checked. By default this list is empty.
Example
- "stream.Broadcast":
    Filter: "filter.Any"
    AnyFilter:
        - "filter.JSON"
        - "filter.RegEx"

JSON

This plugin allows filtering of JSON messages by looking at certain fields. Note that this filter is quite expensive due to JSON marshaling and regexp testing of every message passing through it.

Parameters
FilterReject
FilterReject defines fields that will cause a message to be rejected if the given regular expression matches. Rejects are checked before Accepts. Field paths can be defined in a format accepted by shared.MarshalMap.Path.
FilterAccept
FilterAccept defines fields that will cause a message to be rejected if the given regular expression does not match. Field paths can be defined in a format accepted by shared.MarshalMap.Path.
Example
- "stream.Broadcast":
    Filter: "filter.JSON"
    FilterReject:
        "command" : "state\d\..*"
    FilterAccept:
        "args/results[0]value" : "true"
        "args/results[1]" : "true"
        "command" : "state\d\..*"

None

This plugin blocks all messages.

Rate

This plugin blocks messages after a certain number of messages per second has been reached.

Parameters
RateLimitPerSec
RateLimitPerSec defines the maximum number of messages per second allowed to pass through this filter. By default this is set to 100.
RateLimitDropToStream
RateLimitDropToStream is an optional stream messages are sent to when the limit is reached. By default this is disabled and set to “”.
RateLimitIgnore
RateLimitIgnore defines a list of streams that should not be affected by rate limiting. This is useful for e.g. producers listening to “*”. By default this list is empty.
Example
- "stream.Broadcast":
    Filter: "filter.Rate"
    RateLimitPerSec: 100
    RateLimitDropToStream: ""
    RateLimitIgnore:
        - "foo"

RegExp

This plugin allows filtering messages using regular expressions.

Parameters
FilterExpression
FilterExpression defines the regular expression used for matching the message payload. If the expression matches, the message is passed. FilterExpression is evaluated after FilterExpressionNot.
FilterExpressionNot
FilterExpressionNot defines a negated regular expression used for matching the message payload. If the expression matches, the message is blocked. FilterExpressionNot is evaluated before FilterExpression.
Example
- "stream.Broadcast":
    Filter: "filter.RegExp"
    FilterExpression: "\d+-.*"
    FilterExpressionNot: "\d+-.*"

Sample

This plugin lets only a subset of messages pass, based on a configurable sampling rate per group of messages.

Parameters
SampleRatePerGroup
SampleRatePerGroup defines how many messages are passed through the filter in each group. By default this is set to 1.
SampleGroupSize
SampleGroupSize defines how many messages make up a group. Messages over SampleRatePerGroup within a group are dropped. By default this is set to 1.
SampleDropToStream
SampleDropToStream is an optional stream messages are sent to when they are sampled. By default this is disabled and set to “”.
SampleRateIgnore
SampleRateIgnore defines a list of streams that should not be affected by sampling. This is useful for e.g. producers listening to “*”. By default this list is empty.
Example
- "stream.Broadcast":
    Filter: "filter.Sample"
    SampleRatePerGroup: 1
    SampleGroupSize: 1
    SampleDropToStream: ""
    SampleRateIgnore:
        - "foo"

Stream

This plugin filters messages by stream based on a blacklist and a whitelist. The blacklist is checked first.

Parameters
FilterBlockStreams
FilterBlockStreams sets a list of streams that are blocked. If a message’s stream is not in that list, the FilterOnlyStreams list is tested. This list is empty by default.
FilterOnlyStreams
FilterOnlyStreams sets a list of streams that may pass. Messages from streams that are not in this list are blocked unless the list is empty. By default this list is empty.
Example
- "stream.Broadcast":
    Filter: "filter.Stream"
    FilterBlockStreams:
        - "foo"
    FilterOnlyStreams:
        - "test1"
        - "test2"

Filters are plugins that are embedded into stream plugins. Filters can analyze messages and decide whether to let them pass to a producer or to block them.

Formatters

Base64Decode

Base64Decode is a formatter that decodes a base64 message. If a message is not or only partly base64 encoded an error will be logged and the decoded part is returned. RFC 4648 is expected.

Parameters
Base64Dictionary
Base64Dictionary defines the 64-character base64 lookup dictionary to use. When left empty a dictionary as defined by RFC4648 is used. This is the default.
Base64DataFormatter
Base64DataFormatter defines a formatter that is applied before the base64 decoding takes place. By default this is set to “format.Forward” .
Example
- "stream.Broadcast":
    Formatter: "format.Base64Decode"
    Base64Formatter: "format.Forward"
    Base64Dictionary: "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz01234567890+/"

Base64Encode

Base64Encode is a formatter that encodes a message as base64.

Parameters
Base64Dictionary
Base64Dictionary defines the 64-character base64 lookup dictionary to use. When left empty a dictionary as defined by RFC4648 is used. This is the default.
Base64DataFormatter
Base64DataFormatter defines a formatter that is applied before the base64 encoding takes place. By default this is set to “format.Forward” .
Example
- "stream.Broadcast":
    Formatter: "format.Base64Encode"
    Base64Formatter: "format.Forward"
    Base64Dictionary: "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz01234567890+/"

Clear

Clear is a formatter that clears the message.

CollectdToInflux08

CollectdToInflux08 provides a transformation from collectd JSON data to InfluxDB 0.8.x compatible JSON data. Trailing and leading commas are removed from the Collectd message beforehand.

Parameters
CollectdToInfluxFormatter
CollectdToInfluxFormatter defines the formatter applied before the conversion from Collectd to InfluxDB. By default this is set to format.Forward.
Example
- "stream.Broadcast":
    Formatter: "format.CollectdToInflux08"
    CollectdToInfluxFormatter: "format.Forward"

CollectdToInflux09

CollectdToInflux09 provides a transformation from collectd JSON data to InfluxDB 0.9.x compatible JSON data. Trailing and leading commas are removed from the Collectd message beforehand.

Parameters
CollectdToInfluxFormatter
CollectdToInfluxFormatter defines the formatter applied before the conversion from Collectd to InfluxDB. By default this is set to format.Forward.
Example
- "stream.Broadcast":
    Formatter: "format.CollectdToInflux09"
    CollectdToInfluxFormatter: "format.Forward"

CollectdToInflux10

CollectdToInflux10 provides a transformation from collectd JSON data to InfluxDB 0.9.1+ compatible line protocol data. Trailing and leading commas are removed from the Collectd message beforehand.

Parameters
CollectdToInfluxFormatter
CollectdToInfluxFormatter defines the formatter applied before the conversion from Collectd to InfluxDB. By default this is set to format.Forward.
Example
- "stream.Broadcast":
    Formatter: "format.CollectdToInflux10"
    CollectdToInfluxFormatter: "format.Forward"

Envelope

Envelope is a formatter that allows prefixing and/or postfixing a message with configurable strings.

Parameters
EnvelopePrefix
EnvelopePrefix defines the message prefix. By default this is set to “”. Special characters like \n, \r and \t will be transformed into the actual control characters.
EnvelopePostfix
EnvelopePostfix defines the message postfix. By default this is set to “\n”. Special characters like \n, \r and \t will be transformed into the actual control characters.
EnvelopeFormatter
EnvelopeFormatter defines the formatter for the data transferred as message. By default this is set to “format.Forward” .
Example
- "stream.Broadcast":
    Formatter: "format.Envelope"
    EnvelopeFormatter: "format.Forward"
    EnvelopePrefix: ""
    EnvelopePostfix: "\n"

ExtractJSON

ExtractJSON is a formatter that extracts a single value from a JSON message.

Parameters
ExtractJSONDataFormatter
ExtractJSONDataFormatter defines the formatter that will be applied before the field is extracted. Set to format.Forward by default.
ExtractJSONField
ExtractJSONField defines the field to extract. This value is empty by default. If the field does not exist an empty string is returned.
ExtractJSONTrimValues
ExtractJSONTrimValues will trim whitespace from the value if enabled. Enabled by default.
ExtractJSONPrecision
ExtractJSONPrecision defines the floating point precision of number values. By default this is set to 0, i.e. all decimal places will be omitted.
Example
- "stream.Broadcast":
    Formatter: "format.ExtractJSON"
    ExtractJSONDataFormatter: "format.Forward"
    ExtractJSONField: ""
    ExtractJSONTrimValues: true
    ExtractJSONPrecision: 0

Forward

Forward is a formatter that passes a message as-is.

Hostname

Hostname is a formatter that prefixes a message with the hostname.

Parameters
HostnameDataFormatter
HostnameDataFormatter defines the formatter for the data transferred as message. By default this is set to “format.Envelope”.
HostnameSeparator
HostnameSeparator sets the separator character placed after the hostname. This is set to ” ” by default.
Example
- "stream.Broadcast":
    Formatter: "format.Hostname"
    HostnameFormatter: "format.Envelope"
    HostnameSeparator: " "

Identifier

Identifier is a formatter that will generate a (mostly) unique 64 bit identifier number from the message timestamp and sequence number. The message payload will not be encoded.

Parameters
IdentifierType

IdentifierType defines the algorithm used to generate the message id. This may be one of the following: “hash”, “time”, “seq”, “seqhex”. By default this is set to “time”.

  • When using “hash” the message payload will be hashed using fnv1a and returned as hex.
  • When using “time” the id will be formatted YYMMDDHHmmSSxxxxxxx where x denotes the sequence number modulo 10000000, i.e. 10 million messages per second are possible before there is a collision.
  • When using “seq” the id will be returned as the integer representation of the sequence number.
  • When using “seqhex” the id will be returned as the hex representation of the sequence number.
IdentifierDataFormatter
IdentifierDataFormatter defines the formatter for the data that is used to build the identifier from. By default this is set to “format.Forward” .
Example
- "stream.Broadcast":
    Formatter: "format.Identifier"
    IdentifierType: "hash"
    IdentifierDataFormatter: "format.Forward"

JSON

JSON is a formatter that passes a message encapsulated as JSON in the form {“message”:”…”}. The actual message is formatted by a nested formatter and HTML escaped.

Parameters
JSONStartState
JSONStartState defines the initial parser state when parsing a message. By default this is set to “” which will fall back to the first state used in the JSONDirectives array.
JSONTimestampRead
JSONTimestampRead defines the go timestamp format expected from fields that are parsed as “dat”. When JSONUnixTimestampRead is not set, this is set to “20060102150405” by default.
JSONUnixTimestampRead
JSONUnixTimestampRead defines the unix timestamp format expected from fields that are parsed as “dat”. May be “s”, “ms”, or “ns”, and only accepts integer values. When JSONTimestampRead is set, this is ignored.
JSONTimestampWrite
JSONTimestampWrite defines the go timestamp format that “dat” fields will be converted to. By default this is set to “2006-01-02 15:04:05 MST”.
JSONDirectives
JSONDirectives defines an array of parser directives. This setting is mandatory and has no default value. Each string must be of the following format: “State:Token:NextState:Flags:Function”. Spaces will be stripped from all fields but Token. If a field requires a colon it has to be escaped with a backslash. Other escape characters supported are \n, \r and \t.
Flags
Flags (JSONDirectives) can be a comma separated set of the following flags.
  • continue -> Prepend the token to the next match.
  • append -> Append the token to the current match and continue reading.
  • include -> Append the token to the current match.
  • push -> Push the current state to the stack.
  • pop -> Pop the stack and use the returned state if possible.
Function
Function (JSONDirectives) can hold one of the following names.
  • key -> Write the current match as a key.
  • val -> Write the current match as a value without quotes.
  • esc -> Write the current match as an escaped string value.
  • dat -> Write the current match as a timestamp value.
  • arr -> Start a new array.
  • obj -> Start a new object.
  • end -> Close an array or object.
  • arr+val -> arr followed by val.
  • arr+esc -> arr followed by esc.
  • arr+dat -> arr followed by dat.
  • val+end -> val followed by end.
  • esc+end -> esc followed by end.
  • dat+end -> dat followed by end.
Rules
Rules for storage (JSONDirectives): if a value is written without a previous key write, a key will be auto generated from the current parser state name. This does not happen when inside an array. If key is written without a previous value write, a null value will be written. This does not happen after an object has been started. A key write inside an array will cause the array to be closed. If the array is nested, all arrays will be closed.
Example
- "stream.Broadcast":
    Formatter: "format.JSON"
    JSONStartState: "findKey"
    JSONDirectives:
            - 'findKey :":  key     ::'
            - 'findKey :}:          : pop  : end'
            - 'key     :":  findVal :      : key'
            - 'findVal :\:: value   ::'

ProcessJSON

ProcessJSON is a formatter that allows modifications to fields of a given JSON message. The message is modified and returned again as JSON.

Parameters
ProcessJSONDataFormatter
ProcessJSONDataFormatter defines the formatter that will be applied before ProcessJSONDirectives are processed.
ProcessJSONGeoIPFile
ProcessJSONGeoIPFile defines a GeoIP file to load. This enables the “geoip” directive. If no file is loaded IPs will not be resolved. Files can be found e.g. at http://dev.maxmind.com/geoip/geoip2/geolite2/.
ProcessJSONDirectives

ProcessJSONDirectives defines the action to be applied to the json payload. Directives are processed in order of appearance. The directives have to be given in the form of key:operation:parameters, where operation can be one of the following.

  • split:<string>{:<key>:<key>:…} Split the value by a string and set the resulting array elements to the given fields in order of appearance.
  • replace:<old>:<new> replace a given string in the value with a new one.
  • trim:<characters> remove the given characters (not string!) from the start and end of the value.
  • rename:<old>:<new> rename a given field.
  • remove{:<string>:<string>…} remove a given field. If additional parameters are given, an array is expected. Strings given as additional parameters will be removed from that array.
  • pick:<key>:<index>:<name> Pick a specific index from an array and store it in a new field.
  • time:<read>:<write> read a timestamp and transform it into another format.
  • unixtimestamp:<read>:<write> read a unix timestamp and transform it into another format. Valid read formats are s, ms, and ns.
  • flatten{:<delimiter>} create new fields from the values in field, with new fields named field + delimiter + subfield. Delimiter defaults to “.”. Removes the original field.
  • agent:<key>{:<field>:<field>:…} Parse the value as a user agent string and extract the given fields into <key>_<field>. (“ua:agent:browser:os” would create the new fields “ua_browser” and “ua_os”). Possible values are: “mozilla”, “platform”, “os”, “localization”, “engine”, “engine_version”, “browser”, “version”.
  • ip Parse the field as an array of strings and remove all values that cannot be parsed as a valid IP. Single-string fields are supported, too, but will be converted to an array.
  • geoip:{<field>:<field>:…} like agent this directive will analyse an IP string via geoip and produce new fields. Possible values are: “country”, “city”, “continent”, “timezone”, “proxy”, “location”.
ProcessJSONTrimValues
ProcessJSONTrimValues will trim whitespace from all values if enabled. Enabled by default.
Example
- "stream.Broadcast":
    Formatter: "format.ProcessJSON"
    ProcessJSONDataFormatter: "format.Forward"
    ProcessJSONGeoIPFile: ""
    ProcessJSONDirectives:
        - "host:split: :host:@timestamp"
        - "@timestamp:time:20060102150405:2006-01-02 15\\:04\\:05"
        - "error:replace:°:\n"
        - "text:trim: \t"
        - "foo:rename:bar"
        - "foobar:remove"
        - "array:pick:0:firstOfArray"
        - "array:remove:foobar"
        - "user_agent:agent:browser:os:version"
        - "client:geoip:country:city:timezone:location"
    ProcessJSONTrimValues: true

ProcessTSV

ProcessTSV is a formatter that allows modifications to fields of a given TSV message. The message is modified and returned again as TSV.

Parameters
ProcessTSVDataFormatter
ProcessTSVDataFormatter defines the formatter that will be applied before ProcessTSVDirectives are processed.
ProcessTSVDirectives

ProcessTSVDirectives defines the action to be applied to the tsv payload. Directives are processed in order of appearance. The directives have to be given in the form of key:operation:parameters, where operation can be one of the following.

  • replace:<old>:<new> replace a given string in the value with a new one.
  • prefix:<string> add a given string to the start of the value.
  • postfix:<string> add a given string to the end of the value.
  • trim:<characters> remove the given characters (not string!) from the start and end of the value.
  • quote add a " to the start and end of the value after processing.
  • timestamp:<read>:<write> read a timestamp and transform it into another format.
  • remove remove the value.
  • agent{:<user_agent_field>:<user_agent_field>:…} Parse the value as a user agent string and extract the given fields into <key>_<user_agent_field> (“ua:agent:browser:os” would create the new fields “ua_browser” and “ua_os”).
ProcessTSVDelimiter
ProcessTSVDelimiter defines what value separator to split on. Defaults to tabs.
ProcessTSVQuotedValue
ProcessTSVQuotedValue defines if a value that starts and ends with " may contain ProcessTSVDelimiter without being split. Default is false.
Example
- "stream.Broadcast":
    Formatter: "format.processTSV"
    ProcessTSVDataFormatter: "format.Forward"
    ProcessTSVDelimiter: '\t'
    ProcessTSVQuotedValues: false
    ProcessTSVDirectives:
        - "0:time:20060102150405:2006-01-02 15\\:04\\:05"
        - "3:replace:°:\n"
        - "6:prefix:0."
        - "6:postfix:000"
        - "7:trim: "
        - "10:quote"
        - "11:remove"
        - "11:agent:browser:os:version"

Runlength

Runlength is a formatter that prepends the length of the message, followed by a “:”. The actual message is formatted by a nested formatter.

Parameters
RunlengthSeparator
RunlengthSeparator sets the separator character placed after the runlength. This is set to “:” by default.
RunlengthDataFormatter
RunlengthDataFormatter defines the formatter for the data transferred as message. By default this is set to “format.Forward” .
Example
- "stream.Broadcast":
    Formatter: "format.Runlength"
    RunlengthSeparator: ":"
    RunlengthFormatter: "format.Envelope"

Sequence

Sequence is a formatter that allows prefixing a message with the message’s sequence number.

Parameters
SequenceSeparator
SequenceSeparator sets the separator character placed after the sequence number. This is set to “:” by default.
SequenceDataFormatter
SequenceDataFormatter defines the formatter for the data transferred as message. By default this is set to “format.Forward” .
Example
- "stream.Broadcast":
    Formatter: "format.Sequence"
    SequenceFormatter: "format.Envelope"
    SequenceSeparator: ":"

Serialize

Serialize is a formatter that serializes a message for later retrieval.

Parameters
SerializeFormatter
SerializeFormatter defines the formatter for the data transferred as message. By default this is set to “format.Forward”.
SerializeStringEncode
SerializeStringEncode causes the serialized data to be base64 encoded and newline separated. This is enabled by default.
Example
- "stream.Broadcast":
    Formatter: "format.Serialize"
    SerializeFormatter: "format.Envelope"
    SerializeStringEncode: true

SplitToJSON

SplitToJSON is a formatter that splits a message by a given token and puts the result into a JSON object by using an array based mapping.

Parameters
SplitToJSONDataFormatter
SplitToJSONDataFormatter defines the formatter to apply before executing this formatter. Set to “format.Forward” by default.
SplitToJSONToken
SplitToJSONToken defines the separator character to use when processing a message. By default this is set to “|”.
SplitToJSONKeepJSON
SplitToJSONKeepJSON can be set to false to escape texts that are JSON payloads as regular strings. Otherwise JSON payload will be taken as-is and set to the corresponding key. By default set to “true”.
SplitToJSONKeys
SplitToJSONKeys defines an array of keys to apply to the tokens generated by splitting a message by SplitToJSONToken. The keys listed here are applied to the resulting token array by index. This list is empty by default.
Example
- "stream.Broadcast":
    Formatter: "format.SplitToJSON"
    SplitToJSONDataFormatter: "format.Forward"
    SplitToJSONToken: "|"
    SplitToJSONKeys:
        - "timestamp"
        - "server"
        - "error"

StreamName

StreamName is a formatter that prefixes a message with the StreamName.

Parameters
StreamNameFormatter
StreamNameFormatter defines the formatter for the data transferred as message. By default this is set to “format.Envelope”.
StreamNameHistory
StreamNameHistory can be set to true to use the previous stream name instead of the current one. This can be useful to e.g. get the name of the stream messages were dropped from. By default this is set to false.
StreamNameSeparator
StreamNameSeparator sets the separator character placed after the stream name. This is set to ” ” by default.
Example
- "stream.Broadcast":
    Formatter: "format.StreamName"
    StreamNameFormatter: "format.Envelope"
    StreamNameSeparator: " "
    StreamNameHistory: false

StreamRevert

StreamRevert is a formatter that recovers the last used stream from a message and sets it as a new target stream. Streams change whenever the Stream.Route or Message.Route function is used. This e.g. happens after a Drop call.

Parameters
StreamRevertFormatter
StreamRevertFormatter defines the formatter applied after reading the stream. This formatter is applied to the data after StreamRevertDelimiter. By default this is set to “format.Forward” .
Example
- "stream.Broadcast":
    Formatter: "format.StreamRevert"
    StreamRevertFormatter: "format.Forward"

StreamRoute

StreamRoute is a formatter that modifies a message’s stream by reading a prefix from the message’s data (and discarding it). The prefix is defined by everything before a given delimiter in the message. If no delimiter is found or the prefix is empty the message stream is not changed.

Parameters
StreamRouteFormatter
StreamRouteFormatter defines the formatter applied after reading the stream. This formatter is applied to the data after StreamRouteDelimiter. By default this is set to “format.Forward”.
StreamRouteStreamFormatter
StreamRouteStreamFormatter is used when StreamRouteFormatStream is set to true. By default this is the same value as StreamRouteFormatter.
StreamRouteDelimiter
StreamRouteDelimiter defines the delimiter to search when extracting the stream name. By default this is set to “:”.
StreamRouteFormatStream
StreamRouteFormatStream can be set to true to apply StreamRouteFormatter to both parts of the message (stream and data). Set to false by default.
Example
- "stream.Broadcast":
    Formatter: "format.StreamRoute"
    StreamRouteFormatter: "format.Forward"
    StreamRouteStreamFormatter: "format.Forward"
    StreamRouteDelimiter: ":"
    StreamRouteFormatStream: false

TemplateJSON

TemplateJSON is a formatter that evaluates a text template with an input of a JSON message.

Parameters
TemplateJSONFormatter
TemplateJSONFormatter defines the formatter that will be applied before the message is templated. Set to format.Forward by default.
TemplateJSONTemplate
TemplateJSONTemplate defines the template to execute with text/template. This value is empty by default. If the template fails to execute the output of TemplateJSONFormatter is returned.
Example
- "stream.Broadcast":
    Formatter: "format.TemplateJSON"
    TemplateJSONFormatter: "format.Forward"
    TemplateJSONTemplate: ""

Timestamp

Timestamp is a formatter that allows prefixing a message with a timestamp (time of arrival at gollum) as well as postfixing it with a delimiter string.

Parameters
Timestamp
Timestamp defines a Go time format string that is used to format the actual timestamp that prefixes the message. By default this is set to “2006-01-02 15:04:05 MST | “.
TimestampFormatter
TimestampFormatter defines the formatter for the data transferred as message. By default this is set to “format.Forward” .
Example
- "stream.Broadcast":
    Formatter: "format.Timestamp"
    TimestampFormatter: "format.Envelope"
    Timestamp: "2006-01-02T15:04:05.000 MST | "

Formatters are plugins that are embedded into streams or producers. Formatters can convert messages into another format or append additional information.

Examples

Configuration

Hello World

This example sets up a simple console consumer and producer that will echo everything you type back to the console. As messages have no newline appended by default, an Envelope formatter is used to add one before writing to the console. Make sure to start Gollum with gollum -ll 3 to see all log messages.

- "consumer.Console":
    Stream: "console"

- "producer.Console":
    Formatter: "format.Envelope"
    Stream:
        - "*"
        - "_GOLLUM_"
Hello World XML

This example extends the Hello World example by introducing a stream configuration. All messages from the console consumer will be enveloped into an XML tag, while the log messages are not. Make sure to start Gollum with gollum -ll 3 to see all log messages.

- "consumer.Console":
    Stream: "console"

- "stream.Broadcast":
    Formatter: "format.Envelope"
    EnvelopePrefix: "<message>"
    EnvelopePostfix: "</message>"
    Stream: "console"

- "producer.Console":
    Formatter: "format.Envelope"
    Stream:
        - "*"
        - "_GOLLUM_"
Hello World filtered

This example extends the previous examples by setting up a filter to only echo sentences that end with the word “gollum”. A regular expression filter is used to achieve this. Note that this filter does not apply to standard log messages. Make sure to start Gollum with gollum -ll 3 to see all log messages.

- "consumer.Console":
    Stream: "console"

- "stream.Broadcast":
    Filter: "filter.RegExp"
    FilterExpression: "gollum$"
    Stream: "console"

- "producer.Console":
    Formatter: "format.Envelope"
    Stream:
        - "*"
        - "_GOLLUM_"
Hello World splitter

This example extends the Hello World example by introducing another stream configuration. This time we will print the console output twice, encoded as XML and as JSON. Make sure to start Gollum with gollum -ll 3 to see all log messages.

- "consumer.Console":
    Stream:
        - "consoleXML"
        - "consoleJSON"

- "stream.Broadcast":
    Formatter: "format.Envelope"
    EnvelopePrefix: "<message>"
    EnvelopePostfix: "</message>"
    Stream: "consoleXML"

- "stream.Broadcast":
    Formatter: "format.Envelope"
    EnvelopePrefix: "{message:\""
    EnvelopePostfix: "\"}"
    Stream: "consoleJSON"

- "producer.Console":
    Formatter: "format.Envelope"
    Stream:
        - "*"
        - "_GOLLUM_"
Chat server

This example requires two Gollum instances to run. The first one acts as the “chat client” while the second one acts as the “chat server”. Messages entered on the client will be sent to the server using runlength encoding, where they are written to a log file and to the console. The logfile will write a standard timestamp before each message while the console will just print the message. Both instances have a standard console producer attached to print log messages to the console as well. Make sure to start Gollum with gollum -ll 3 to see all log messages.

Client

- "consumer.Console":
    Stream: "client"

- "producer.Socket":
    Address: ":5880"
    Formatter: "format.Runlength"
    Acknowledge: "OK"
    Stream: "client"

- "producer.Console":
    Formatter: "format.Envelope"
    Stream: "_GOLLUM_"

Server

- "consumer.Socket":
    Acknowledge: "OK"
    Partitioner: "ascii"
    Delimiter: ":"
    Address: ":5880"
    Stream: "server"

- "producer.File":
    Formatter: "format.Timestamp"
    TimestampFormatter: "format.Envelope"
    File: "chat.log"
    Stream: "server"

- "producer.Console":
    Formatter: "format.Envelope"
    Stream:
      - "*"
      - "_GOLLUM_"
Proxy

This configuration will set up a simple proxy for protocols that separate messages by newlines. This works well for e.g. basic redis traffic. Make sure to start Gollum with gollum -ll 3 to see all log messages.

- "consumer.Proxy":
    Address: "localhost:5880"
    Partitioner: "delimiter"
    Delimiter: "\r\n"
    Stream: "redis"

- "producer.Proxy":
    Address: "localhost:6379"
    ConnectionBufferSizeKB: 64
    Partitioner: "delimiter"
    Delimiter: "\r\n"
    Stream: "redis"

Note that the standard proxy consumer and producer cannot react to details implied by a specific protocol. While this does work for simple protocols it will have problems with more complex protocols like HTTP. In that case it is advisable to use or write a proxy plugin for this specific protocol.

Fuse

This configuration introduces a fuse to close the consumer connection if something goes wrong on the producer side. Fuses work in a broadcasting manner, i.e. one producer “breaks” a fuse and multiple consumers may react to this. The reasons for breaking a fuse may either be a dropped connection or a blocking consumer. Different consumers may react differently to “broken” fuses, depending on their context. In our case the incoming socket will be closed.

- "consumer.Socket":
    Stream: "forward"
    Fuse: "socket"
    Address: "127.0.0.1:5880"
    Acknowledge: "OK"

- "producer.Socket":
    Stream: "forward"
    Fuse: "socket"
    Address: "unix://test/test.socket"
    BatchTimeoutSec: 1
    Acknowledge: "OK"
Profiling

This configuration will test Gollum for its theoretical maximum message throughput. You can of course modify this example to test e.g. file producer performance. Make sure to start Gollum with gollum -ll 3 -ps to see all log messages as well as intermediate profiling results.

- "consumer.Profiler":
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUFVXYZ 0123456789 .,!;:-_"
    Message: "%256s"
    Stream: "profile"

- "producer.Null":
    Stream: "profile"

- "producer.Console":
    Formatter: "format.Envelope"
    Stream: "_GOLLUM_"

Writing plugins

When starting to write a plugin it’s probably a good idea to have a look at already existing plugins. A good starting point is the console plugin as it is very lightweight. If you plan to write a special purpose plugin you should place it into “contrib/yourCompanyName”. Plugins that can be used for general purposes should be placed into the main package folders like “consumer” or “producer”.

To enable a contrib plugin you will need to extend the file “contrib/loader.go”. Add an anonymous import to the list of imports like this:

import (
  _ "./yourCompanyName"                                 // this is ok for local extensions
  _ "github.com/trivago/gollum/contrib/yourCompanyName" // if you plan to contribute
)
Configuration

All plugins have to implement the “core/Plugin” interface. This interface requires a type to implement the Configure method which can be used to read data from the config file passed to Gollum. To make it possible for Gollum to instantiate an instance of your plugin by name it has to be registered. This should be done by adding a line to the init() method of the file.

import (
  "github.com/trivago/gollum/core"
  "github.com/trivago/gollum/shared"
)

type MyPlugin struct {
}

func init() {
  shared.TypeRegistry.Register(MyPlugin{}) // Register the new plugin type
}

func (cons *MyPlugin) Configure(conf core.PluginConfig) error {
  // ... read custom options ...
  return nil
}

The Configure method is also called when just testing the configuration via gollum -tc. Because of this, the function should never open sockets or other kinds of resources. This should be done when a plugin is explicitly started, so that proper closing of resources is assured, too.

If your plugin derives from another plugin it is advisable to call Configure() of the base type before checking your own configuration options. There are several convenience functions in the PluginConfig type that make it easy to obtain configuration values and set default values. Please refer to Gollum’s GoDoc API documentation for more details on this.

func (plugin *MyPlugin) Configure(conf core.PluginConfig) error {
  err := plugin.MyPluginBase.Configure(conf)
  if err != nil {
    return err
  }
  // ... read custom options ...
  return nil
}
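
As a small illustration of those convenience functions, the following sketch reads two custom options with default values inside Configure(). The option names, the struct fields and the GetInt helper are assumptions made for this example; conf.GetString is used as shown in the nested plugin example below, and the full list of available getters can be found in the PluginConfig GoDoc of your Gollum version.

type MyPlugin struct {
  MyPluginBase         // hypothetical base type providing a Configure method
  address       string // backing fields for the custom options (illustrative only)
  batchMaxCount int
}

func (plugin *MyPlugin) Configure(conf core.PluginConfig) error {
  if err := plugin.MyPluginBase.Configure(conf); err != nil {
    return err
  }

  // Read custom options, falling back to default values if they are not set.
  // "Address" and "BatchMaxCount" are made-up option names; GetInt is assumed
  // to behave like GetString (see the GoDoc for the exact set of getters).
  plugin.address = conf.GetString("Address", "localhost:5880")
  plugin.batchMaxCount = conf.GetInt("BatchMaxCount", 8192)

  return nil
}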
Configuring nested plugins

Some plugins may want to configure “nested” plugins such as a formatter or filter. These plugins can be instantiated by using the type registry, passing along the config that was given to the Configure method.

func (plugin *MyPlugin) Configure(conf core.PluginConfig) error {
  formatter, err := core.NewPluginWithType(conf.GetString("Formatter", "format.Forward"), conf)
  if err != nil {
    return err // ### return, plugin load error ###
  }
  // ... do something with your formatter ...
  return nil
}

Writing consumers

When writing a new consumer it is advisable to have a look at existing consumers. A good starting point are the Console and File consumers.

Requirements

All consumers have to implement the “core/Consumer” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/ConsumerBase” type as it will provide implementations of the most common methods required. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

ConsumerBase

Consumers deriving from “core/ConsumerBase” have to implement the “Consume” method from the “core/Consumer” interface. In addition to that, most plugins might also want to override the “Configure” function from the “core/Plugin” interface.

The Consume() function will be called as a separate go routine and should do two things:

1. Listen to the control channel
2. Process incoming data

As Consume() is called as a separate go routine you can decide whether to spawn additional go routines to handle both tasks or to let Consume() handle everything. ConsumerBase gives you two convenience loop functions to handle control commands:

ControlLoop
Will loop until a stop is received and can trigger a callback if a log rotation is requested (SIG_HUP is sent). The log rotation callback can be set e.g. in the Configure method by using the SetRollCallback function. Other possible callback functions are SetPrepareStopCallback and SetStopCallback.
TickerControlLoop
Gives you an additional callback that is triggered in regular intervals.

Both loops only cover control message handling and are blocking calls. Because of their blocking nature you will probably want to spawn a separate go routine that handles incoming messages when using these loops.

It is highly recommended to use at least one of these functions in your plugin implementation. By doing this you can be sure that changes to message streaming and control handling are automatically used by your plugin after a Gollum update.

A typical consume function will look like this:

func (cons *MyConsumer) Configure(conf core.PluginConfig) error {
  cons.SetStopCallback(cons.close) // Register close with the control message handler
  return nil
}

func (cons *MyConsumer) close() {
  cons.WorkerDone()
}

func (cons *MyConsumer) Consume(workers *sync.WaitGroup) {
  cons.AddMainWorker(workers) // New go routine = new worker
  go cons.readData()          // Run until close is called
  cons.ControlLoop()          // Blocks
}

As we want to run a new go routine we also add a new worker. As this is the first worker we use AddMainWorker(). Additional workers can be added by using AddWorker(). This enables the shutdown routine to wait until all consumers have properly stopped. However, to avoid a hang during shutdown, make sure that every worker you add is also released during the shutdown sequence.

After we made sure all workers are registered, the core function readData() is called as a separate go routine. This is necessary as the ControlLoop will block Consume() until a shutdown is requested. When a stop control message is received, the StopCallback is executed. You can use this callback to signal your readData function to stop or you can check the pluginState inside your readData function. The pluginState will switch to PluginStateStopping after a stop control has been triggered.

Configuration

If your consumer requires additional configuration options you should implement the Configure method. Please refer to the Plugin documentation for further details.

Sending messages

Messages can be sent by using either the Enqueue() or EnqueueCopy() method. Both functions will make sure that the message is sent to all streams and that the correct stream ID is set. Enqueue() will reference the data you pass to it, while EnqueueCopy() will copy the data into the new message. The latter allows you to e.g. safely recycle internal buffers without changing messages that have not yet been processed by all producers.

Both methods expect a sequence number to be passed. This sequence number is meant to be a runtime unique ID that may allow future checks on duplicate messages. The most common sequence number is an incrementing 64-bit integer.

func (cons *MyConsumer) readData() {
  var data []byte
  for cons.IsActive() {
    // ... read data
    cons.Enqueue(data, cons.sequence) // This call may block
    cons.sequence++                   // Increment your sequence number
  }
}
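
If your plugin reuses a read buffer between iterations, EnqueueCopy() is the safer choice. A sketch of the same loop, assuming a reusable local buffer and an elided read step:

func (cons *MyConsumer) readData() {
  buffer := make([]byte, 4096) // reused for every read
  for cons.IsActive() {
    n := 0
    // ... read into buffer and set n to the number of bytes read ...
    cons.EnqueueCopy(buffer[:n], cons.sequence) // copies the data, so the buffer can be reused
    cons.sequence++
  }
}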
Writing bare bone consumers

Sometimes it might be useful not to derive from ConsumerBase. If you decide to go this way please have a look at Gollum’s GoDoc API documentation as well as the source of ConsumerBase.

Writing producers

When writing a new producer it is advisable to have a look at existing producers. A good starting point are the Console and File producers.

Requirements

All producers have to implement the “core/Producer” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/ProducerBase” type as it will provide implementations of the most common methods required. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

ProducerBase

Producers deriving from core/ProducerBase have to implement the “Produce” method from the “core/Producer” interface. In addition to that, most plugins might also want to override the “Configure” function from the “core/Plugin” interface.

The Produce() function will be called as a separate go routine and should provide two things:

1. Listen to the control channel
2. Listen to incoming messages

As Produce() is called as a separate go routine you can decide whether to spawn additional go routines to handle both tasks or to let Produce() handle everything. ProducerBase gives you three convenience loop functions to handle control commands:

ControlLoop
Will only listen to control messages and trigger the corresponding callbacks that can be registered during Configure. Stop control messages will cause this loop to end.
MessageControlLoop
In addition to the functionality of ControlLoop this will also check for incoming messages. Messages from the internal message channel are passed to the given message handler. The log rotation callback can be set e.g. in the Configure method by using the SetRollCallback function. Other possible callback functions are SetPrepareStopCallback and SetStopCallback.
TickerMessageControlLoop
Gives you an additional callback that is triggered in regular intervals.

It is highly recommended to use at least one of these functions in your plugin implementation. By doing this you can be sure that changes to message streaming and control handling are automatically used by your plugin after a Gollum update. A typical produce function will look like this:

func (prod *MyProducer) close() {
  prod.CloseMessageChannel(prod.processData) // Close the internal channel and flush remaining messages
  prod.WorkerDone()                          // Signal that we're done now
}

func (prod *MyProducer) Configure(conf core.PluginConfig) error {
  prod.SetStopCallback(prod.close)  // Call close upon shutdown
  prod.SetRollCallback(prod.onRoll) // Call onRoll when SIG_HUP is sent to the process
  return nil
}

func (prod *MyProducer) processData(msg core.Message) {
  // Do something with the message
}

func (prod *MyProducer) Produce(workers *sync.WaitGroup) {
  prod.AddMainWorker(workers)
  prod.MessageControlLoop(prod.processData)
}

The framework will call the registered StopCallback function when the control loop receives a stop. As the shutdown procedure needs to wait until all messages from this producer have been sent (to avoid data loss), at least one worker should always be registered. The shutdown procedure will wait until all producer workers have finished before exiting. Because of this you have to make sure that all AddWorker calls are followed by a WorkerDone() call during shutdown. If this does not happen the shutdown procedure will block. If your producer sends messages to other producers you can manually set dependencies between receiving producers and this producer by using StreamRegistry.LinkDependencies. DropStream dependencies are automatically added during startup.

Configuration

If your producer requires additional configuration options you should implement the Configure method. Please refer to the Plugin documentation for further details.

Working with slow services

Messages are passed to the producer one-by-one. Certain services however might perform better when messages are not sent one-by-one but as a batch. Gollum gives you several tools to handle this kind of message batching. A good example for this is the socket producer. It takes advantage of the “core/MessageBatch” type, which stores messages in a double-buffered queue and provides callback based methods to flush the queue asynchronously. The following code illustrates a best practice approach on how to use the MessageBatch. You may of course change details if required.

buffer := NewMessageBatch(8192)                // Hold up to 8192*2 messages (front and backbuffer)

for {
  // Append the given message
  // - If the buffer is full call the sendBatch method and wait for flush
  // - If the producer is not active or is shutting down, pass the message to prod.Drop

  buffer.AppendOrFlush(message, prod.sendBatch, prod.IsActiveOrStopping, prod.Drop)

  // ...

  if buffer.ReachedSizeThreshold(2048) {       // Check if at least 2 KB have been written
    buffer.Flush(prod.sendBatch)               // Send all buffered messages via sendBatch
    buffer.WaitForFlush()                      // Wait until done
  }
}
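
The sendBatch callback used above is plugin specific. Its exact signature is defined by the MessageBatch type, so please verify it against the GoDoc; assuming the flush callback receives the buffered messages as a slice, a rough sketch might look like this:

// Assumed callback shape - check core.MessageBatch in the GoDoc before relying on this.
func (prod *MyProducer) sendBatch(messages []core.Message) {
  for _, msg := range messages {
    prod.connection.Write(msg.Data) // hypothetical connection field writing to the slow service
  }
}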
Filtering messages

Producers are able to filter messages just like streams do. In contrast to streams, messages are filtered before they are sent to the internal message channel, i.e. before formatting. As formatting is an implementation detail (and may not happen at all), a plugin that needs filtering after formatting has to implement it by itself.

Formatting messages

Messages are not automatically formatted when passed to the producer. If you wish to enable producer based formatting you need to call ProducerBase.Format() at an appropriate point inside your plugin. All producers deriving from ProducerBase - and that have called ProducerBase.Configure() - may have a formatter set and should thus provide this possibility.
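
A short sketch of such a call inside a message handler, assuming ProducerBase.Format mirrors the Formatter interface described in the “Writing formatters” section and returns the formatted payload plus the target stream:

func (prod *MyProducer) processData(msg core.Message) {
  data, _ := prod.Format(msg) // apply the configured formatter, if any
  prod.connection.Write(data) // hypothetical connection field writing to the service
}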

Writing bare bone producers

Sometimes it might be useful not to derive from ProducerBase. An example for this is the Null producer which is extremely lightweight. If you decide to go this way please have a look at Gollum’s GoDoc API documentation as well as the source of ProducerBase.

Writing filters

Requirements

All filters have to implement the “core/Filter” as well as the “core/Plugin” interface. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

Attention

Filters are called in a multithreaded context, so you have to make sure that any internal state is secured by either a mutex or by using atomic functions.

Filtering messages

The Accepts method is fairly simple to implement. If the method returns true the message is passed on. If it returns false the message is rejected. You can inspect the message in question via the parameter passed to the Accepts method. The following example filter will reject all messages that have no content:

func (filter *MyFilter) Accepts(msg core.Message) bool {
  return len(msg.Data) > 0
}
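
Because filters run in a multithreaded context (see the note above), any mutable state needs to be guarded. The following hypothetical filter keeps a rejection counter and protects it with sync/atomic:

import (
  "sync/atomic"

  "github.com/trivago/gollum/core"
)

type MyCountingFilter struct {
  rejected int64 // only accessed via atomic operations
}

func (filter *MyCountingFilter) Accepts(msg core.Message) bool {
  if len(msg.Data) > 0 {
    return true
  }
  atomic.AddInt64(&filter.rejected, 1) // safe when called from multiple go routines
  return false
}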

Writing formatters

Requirements

All formatters have to implement the “core/Formatter” as well as the “core/Plugin” interface. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

Attention

Formatters are called in a multithreaded context, so you have to make sure that any internal state is secured by either a mutex or by using atomic functions.

Transforming messages

The Format method is fairly simple to implement. It accepts the message to modify and returns the new content plus the stream the message should be sent to. The message itself cannot be changed directly. The following example adds a newline to each message:

func (format *MyFormatter) Format(msg core.Message) ([]byte, core.MessageStreamID) {
  return append(msg.Data, '\n'), msg.StreamID
}
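
Since formatters are called from multiple go routines as well, any state should be set up once in Configure and treated as read-only afterwards. A sketch of a hypothetical prefix formatter following this pattern; the “Prefix” option and the struct field are made up for this example:

type MyPrefixFormatter struct {
  prefix []byte // written once in Configure, read-only afterwards
}

func (format *MyPrefixFormatter) Configure(conf core.PluginConfig) error {
  format.prefix = []byte(conf.GetString("Prefix", "gollum: ")) // hypothetical option
  return nil
}

func (format *MyPrefixFormatter) Format(msg core.Message) ([]byte, core.MessageStreamID) {
  data := make([]byte, 0, len(format.prefix)+len(msg.Data))
  data = append(data, format.prefix...)
  data = append(data, msg.Data...)
  return data, msg.StreamID // keep the original target stream
}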

Writing streams

When writing a new stream it is advisable to have a look at existing streams. A good starting point is the Random stream.

Requirements

All streams have to implement the “core/Stream” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/StreamBase” type as it will provide implementations of the most common methods required as well as message metrics. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

StreamBase

Streams deriving from “core/StreamBase” have to implement a custom method that has to be hooked to the “Distribute” callback during Configure(). This allows StreamBase to check and format the message before actually distributing it. In addition to that a message count metric is updated. The following example implements a stream that sends messages only to the first producer in the list.

func (stream *MyStream) myDistribute(msg core.Message) {
  stream.StreamBase.Producers[0].Enqueue(msg)
}

func (stream *MyStream) Configure(conf core.PluginConfig) error {
  if err := stream.StreamBase.Configure(conf); err != nil {
    return err
  }
  stream.StreamBase.Distribute = stream.myDistribute
  return nil
}
Sending messages

Messages are sent directly to a producer by calling the Enqueue method. This call may block, either because the underlying channel is completely filled or because the producer plugin implements Enqueue as a blocking method.
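
For comparison with the single-producer example above, a distributor that broadcasts to every attached producer simply loops over the producer list. This is for illustration only; it mirrors what a broadcasting stream already does out of the box:

func (stream *MyStream) myBroadcast(msg core.Message) {
  for _, producer := range stream.StreamBase.Producers {
    producer.Enqueue(msg) // may block, see above
  }
}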

Streams that derive from StreamBase may also be paused. In that case messages are not passed to the custom distributor function but to a temporary function. These messages will be sent to the custom distributor function after the stream is resumed. A Pause() call is normally issued by producers that encounter a connection loss or, more generally, an unavailable resource.

License

This project is released under the terms of the Apache 2.0 license.

Copyright 2015-2016 trivago GmbH

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.