Welcome to Gollum’s documentation!

[Image: Gollum logo]

What is Gollum?

Gollum is an n:m multiplexer that gathers messages from different sources and broadcasts them to a set of destinations.

Gollum originally started as a tool to MUL-tiplex LOG-files (read it backwards to get the name). It quickly evolved to a one-way router for all kinds of messages, not limited to just logs. Gollum is written in Go to make it scalable and easy to extend without the need to use a scripting language.

Terminology

The main components of Gollum are consumers, streams and producers. To explain these, it helps to look at Gollum “from the outside”.

Message: A single set of data passing over a stream is called a message.
Metadata: An optional part of a message. It can contain key/value pairs with additional information or content.
Stream: A stream defines a path between one or more consumers, routers and producers.
Consumer: A consumer creates messages by “consuming” a specific data source. This can be anything: files, ports, external services and so on.
Producer: A producer processes received messages and “produces” something with them, such as writing to files or ports, or sending to external services.
Router: A router receives messages from specific source streams and forwards them to one or more target streams.
Modulator: A modulator is a filter or formatter that “modulates” a message.
Formatter: A formatter modifies the payload of a message, e.g. converting plain text to JSON.
Filter: A filter inspects a message to decide whether to drop it or let it pass.
[Image: message flow diagram]

These main components, consumers, routers, producers, filters and formatters, are built upon a plugin architecture. This allows each component to be exchanged and configured individually with a different set of options.

Configuration

A Gollum configuration file is written in YAML and may contain any number of plugins. Multiple plugins of the same type are possible, too. The Gollum core does not make any assumptions about the type of data you are processing; plugins, however, may do so. It is therefore up to the person configuring Gollum to ensure that valid data is passed from consumers to producers. Formatters can help to achieve this.
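For example, a minimal configuration wiring a console consumer to a console producer over one stream could look like the following sketch (the plugin IDs and types are taken from the examples later in this document):

```yaml
# Read from stdin and write back to stdout via the "console" stream
StdIn:
    Type: consumer.Console
    Streams: console

StdOut:
    Type: producer.Console
    Streams: console
```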

Table of contents

Instructions

Installation

Latest Release

You can download a compressed pre-compiled binary from github releases:

# linux based example
curl -L https://github.com/trivago/gollum/releases/download/v0.4.5/gollum-0.4.5-Linux_x64.zip -o gollum.zip
unzip -o gollum.zip
chmod 0755 gollum

./gollum --help
From source

Installation from source requires the installation of the go toolchain.

Gollum needs Go version 1.7 or higher and supports the Go 1.5 vendor experiment, which is automatically enabled when using the provided makefile. With Go 1.7 and later you can also use go build directly without additional modifications. Builds with Go 1.6 or earlier versions are not officially supported and might require additional steps and modifications.

# checkout
mkdir -p $GOPATH/src/github.com/trivago
cd $GOPATH/src/github.com/trivago
git clone git@github.com:trivago/gollum.git
cd gollum

# run tests and compile
make test
./gollum --help

You can use the makefile that comes with Gollum to trigger cross-platform builds. Make will produce ready-to-deploy .zip files with the corresponding platform builds inside the dist folder.

Build

Building gollum is as easy as make or go build. If you want to do cross platform builds use make all or specify one of the following platforms instead of “all”:

current:build for current OS (default)
freebsd:build for FreeBSD
linux:build for Linux x64
mac:build for MacOS X
pi:build for Linux ARM
win:build for Windows
debug:build for current OS with debug compiler flags
clean:clean all artifacts created by the build process
Docker

The repository contains a Dockerfile which enables you to build and run gollum inside a Docker container.

docker build -t trivago/gollum .
docker run -it --rm trivago/gollum -c config/profile.conf -ps -ll 3

To use your own configuration you could run:

docker run -it --rm -v /path/to/config.conf:/etc/gollum/gollum.conf:ro trivago/gollum -c /etc/gollum/gollum.conf

Usage

Commandline

Gollum goes into an infinite loop once started. You can shut down Gollum by sending a SIG_INT (i.e. Ctrl+C), a SIG_TERM or a SIG_KILL.

Gollum has several commandline options that can be listed by starting Gollum without any parameters:

-h, -help Print this help message.
-v, -version Print version information and quit.
-r, -runtime Print runtime information and quit.
-l, -list Print plugin information and quit.
-c, -config Use a given configuration file.
-tc, -testconfig
 Test the given configuration file and exit.
-ll, -loglevel Set the loglevel [0-3] as in {0=Error, 1=+Warning, 2=+Info, 3=+Debug}.
-lc, -log-colors
 Use Logrus’s “colored” log format. One of “never”, “auto” (default), “always”
-n, -numcpu Number of CPUs to use. Set 0 for all CPUs.
-p, -pidfile Write the process id into a given file.
-m, -metrics Address to use for metric queries. Disabled by default.
-hc, -healthcheck
 Listening address ([IP]:PORT) to use for healthcheck HTTP endpoint. Disabled by default.
-pc, -profilecpu
 Write CPU profiler results to a given file.
-pm, -profilemem
 Write heap profile results to a given file.
-ps, -profilespeed
 Write msg/sec measurements to log.
-pt, -profiletrace
 Write profile trace results to a given file.
-t, -trace Write message trace results to the _TRACE_ stream.
Running Gollum

Typically you start Gollum with the configuration file that defines your pipeline.

Configuration files are written in the YAML format and have to be loaded via a command line switch. Each plugin has a different set of configuration options, which are currently described in the plugin itself; examples can be found in the github wiki.

# starts a gollum process
gollum -c path/to/your/config.yaml

Here is a minimal console example to run Gollum:

# create a minimal config
echo \
'{StdIn: {Type: consumer.Console, Streams: console}, StdOut: {Type: producer.Console, Streams: console}}' \
> example_conf.yaml

# starts a gollum process
gollum -c example_conf.yaml -ll 3

Metrics

Gollum provides various metrics that can be used for monitoring or alerting.

Collecting metrics

To collect metrics you need to start the gollum process with the “-m <address:port>” option. When gollum is running with the “-m” option, all collected metrics can be retrieved in JSON format via a TCP request.

Example request:

# start gollum on host
gollum -m 8080 -c /my/config/file.conf

# get metrics by curl and prettify response by python
curl 127.0.0.1:8080 | python -m json.tool

# alternative by netcat
nc -d 127.0.0.1 8080 | python -m json.tool

Example response

{
    "Consumers": 3,
    "GoMemoryAllocated": 21850200,
    "GoMemoryGCEnabled": 1,
    "GoMemoryNumObjects": 37238,
    "GoRoutines": 20,
    "GoVersion": 10803,
    "Messages:Discarded": 0,
    "Messages:Discarded:AvgPerSec": 0,
    "Messages:Enqueued": 13972236,
    "Messages:Enqueued:AvgPerSec": 1764931,
    "Messages:Routed": 13972233,
    "Messages:Routed:AvgPerSec": 1764930,
    "Plugins:ActiveWorkers": 3,
    "Plugins:State:Active": 4,
    "Plugins:State:Dead": 0,
    "Plugins:State:Initializing": 0,
    "Plugins:State:PrepareStop": 0,
    "Plugins:State:Stopping": 0,
    "Plugins:State:Waiting": 0,
    "ProcessStart": 1501855102,
    "Producers": 1,
    "Routers": 2,
    "Routers:Fallback": 2,
    "Stream:profile:Messages:Discarded": 0,
    "Stream:profile:Messages:Discarded:AvgPerSec": 0,
    "Stream:profile:Messages:Routed": 13972239,
    "Stream:profile:Messages:Routed:AvgPerSec": 1768878,
    "Version": 500
}
Metrics overview
Global metrics

Consumers

Number of current active consumers.

GoMemoryAllocated

Current allocated memory in bytes.

GoMemoryGCEnabled

Indicates that GC is enabled.

GoMemoryNumObjects

The number of allocated heap objects.

GoRoutines

The number of active go routines.

GoVersion

The golang version in number format.

Messages:Discarded

The overall count of discarded messages.

Messages:Discarded:AvgPerSec

The average number of discarded messages per second over the last few seconds.

Messages:Enqueued

The overall count of enqueued messages.

Messages:Enqueued:AvgPerSec

The average number of enqueued messages per second over the last few seconds.

Messages:Routed

The overall count of routed messages.

Messages:Routed:AvgPerSec

The average number of routed messages per second over the last few seconds.

Plugins:ActiveWorkers

Number of active worker (plugin) processes.

Plugins:State:<STATE>

Number of plugins in specific states. The following states are possible for plugins:

  • Active
  • Dead
  • Initializing
  • PrepareStop
  • Stopping
  • Waiting

ProcessStart

Timestamp of the process start time.

Producers

Number of current active producers.

Routers

Number of current active routers.

Routers:Fallback

Number of current active “fallback” (auto created) routers.

Version

Gollum version as numeric value.
Stream based metrics

Stream:<STREAM_NAME>:Messages:Discarded

The count of discarded messages for a specific stream.

Stream:<STREAM_NAME>:Messages:Discarded:AvgPerSec

The average number of discarded messages per second over the last few seconds for a specific stream.

Stream:<STREAM_NAME>:Messages:Routed

The count of routed messages for a specific stream.

Stream:<STREAM_NAME>:Messages:Routed:AvgPerSec

The average number of routed messages per second over the last few seconds for a specific stream.

Health checks

Gollum provides optional HTTP endpoints for health checks.

To activate the health check endpoints you need to start the gollum process with the “-hc <address:port>” option. When gollum is running with the “-hc” option, you can query different HTTP endpoints to get global and per-plugin health status.

# start gollum on host with health check endpoints
gollum -hc 8080 -c /my/config/file.conf
Endpoints

/_ALL_

Request:

curl -i 127.0.0.1:8080/_ALL_

Response:

HTTP/1.1 200 OK
Date: Fri, 04 Aug 2017 16:03:22 GMT
Content-Length: 191
Content-Type: text/plain; charset=utf-8

/pluginID-A/pluginState 200 ACTIVE: Active
/pluginID-B/pluginState 200 ACTIVE: Active
/pluginID-C/pluginState 200 ACTIVE: Active
/pluginID-D/pluginState 200 ACTIVE: Active
/_PING_ 200 PONG

/_PING_

Request:

curl -i 127.0.0.1:8080/_PING_

Response:

HTTP/1.1 200 OK
Date: Fri, 04 Aug 2017 15:46:34 GMT
Content-Length: 5
Content-Type: text/plain; charset=utf-8

PONG

/<PLUGIN_ID>/pluginState

Request:

# example request with active `producer.benchmark`
curl -i 127.0.0.1:8080/pluginID-A/pluginState

Response:

HTTP/1.1 200 OK
Date: Fri, 04 Aug 2017 15:47:45 GMT
Content-Length: 15
Content-Type: text/plain; charset=utf-8

ACTIVE: Active

Best practice

Managing your own plugins in a separate git repository

You can add your own plugin module simply by using a git submodule:

git submodule add -f https://github.com/YOUR_NAMESPACE/YOUR_REPO.git contrib/YOUR_NAMESPACE

The .gitmodules file created by git will be ignored by the gollum repository.

To activate your plugin you need to create a contrib_loader.go so that gollum can be compiled with your own plugins:

package main

// This is a stub file to enable registration of vendor specific plugins that
// are placed in sub folders of this folder.

import (
    _ "github.com/trivago/gollum/contrib/myPackage"
)

func init() {
}

You can also copy the existing contrib_loader.go.dist to contrib_loader.go and update the import path to your package:

cp contrib_loader.go.dist contrib_loader.go
# open contrib_loader.go with an editor
# update package path with your plugin's path
make build

You can also change the version string of your Gollum builds to include the version of your plugin. Set the GOLLUM_RELEASE_SUFFIX variable either in the environment or as an argument to make:

# build Gollum with myPackage version suffixed to the Gollum version
# e.g.: 0.5.3-pkg0a01d7b6
make all GOLLUM_RELEASE_SUFFIX=pkg$(git -C contrib/myPackage describe --tags --always)
Use more Gollum processes for complex pipelines

If your pipeline contains many steps, consider the separation of concerns (SoC) principle in your setup: split your configuration into smaller parts and start multiple Gollum processes to handle the individual pipeline steps.

Developing

Testing

Gollum provides unit tests, integration tests and a couple of linter checks, which also run regularly on travis-ci.

You can run the tests with:

# run tests
make test

# run unit-test only
make unit

# run integration-test only
make integration

Here is an overview of all test targets provided by the Makefile:

make test:Run go vet, golint, gofmt and go test
make unit:Run go test -tags unit
make integration:
 Run go test -tags integration
make vet:Run go vet
make lint:Run golint
make fmt-check:Run gofmt -l
make ineffassign:
 Install and run ineffassign
Debugging

If you want to use Delve for debugging you need to build gollum with some additional flags. You can use the predefined make command make debug:

# build for current OS with debug compiler flags
make debug

# or go build
# go build -ldflags='-s -linkmode=internal' -gcflags='-N -l'

With this debug build you are able to start a Delve remote debugger:

# for the gollum arguments please use this format: ./gollum -- -c my/config.conf
dlv --listen=:2345 --headless=true --api-version=2 --log exec ./gollum -- -c testing/configs/test_router.conf -ll 3
Profiling

To test Gollum you can use the internal profiler consumer and the benchmark producer.

Some optional parameters provide further information:

-ps:Profile the processed messages per second.
-ll 3:Set the log level to debug.
-m 8080:Activate the metrics endpoint on port 8080.

Here is a simple config example showing how to set up a profiler consumer with a benchmark producer. By default this test profiles the theoretical maximum throughput of 256 byte messages:

Profiler:
    Type: consumer.Profiler
    Runs: 100000
    Batches: 100
    Message: "%256s"
    Streams: profile
    KeepRunning: true

Benchmark:
    Type: producer.Benchmark
    Streams: profile

# start Gollum for profiling
gollum -ps -ll 3 -m 8080 -c config/profile.conf

# get metrics
nc -d 127.0.0.1 8080 | python -m json.tool

You can enable different producers in that config to test the write performance of these producers, too.

Dependencies

To handle external Go packages and libraries, Gollum uses dep. As in other Go projects, the vendor folder is also checked in on github.com. All dependencies can be found in the Gopkg.toml file.

To update the external dependencies, a make command is provided as well:

# update external dependencies
make update-vendor

Writing plugins

When starting to write a plugin it's probably a good idea to have a look at already existing plugins. A good starting point is the console plugin as it is very lightweight. If you plan to write a special purpose plugin you should place it into “contrib/yourCompanyName”. Plugins that can be used for general purpose should be placed into the main package folders like “consumer” or “producer”.

To enable a contrib plugin you will need to extend the file “contrib/loader.go”. Add an anonymous import to the list of imports like this:

import (
  _ "./yourCompanyName"                                 // this is ok for local extensions
  _ "github.com/trivago/gollum/contrib/yourCompanyName" // if you plan to contribute
)
Configuration

All plugins have to implement the “core/Plugin” interface. This interface requires a type to implement the Configure method, which can be used to read data from the config file passed to Gollum. To make it possible for Gollum to instantiate an instance of your plugin by name, it has to be registered. This should be done by adding a line to the init() function of the file.

import (
  "github.com/trivago/gollum/core"
  "github.com/trivago/tgo"
)

type MyPlugin struct {
}

func init() {
  core.TypeRegistry.Register(MyPlugin{}) // Register the new plugin type
}

func (cons *MyPlugin) Configure(conf core.PluginConfig) error {
  // ... read custom options ...
  return nil
}

The Configure method is also called when just testing the configuration via gollum -tc. Because of this, the function should never open any sockets or other kinds of resources. This should be done when a plugin is explicitly started, so that proper closing of resources is assured, too.

If your plugin derives from another plugin it is advisable to call Configure() of the base type before checking your own configuration options. There are several convenience functions in the PluginConfig type that make it easy to obtain configuration values and set default values. Please refer to Gollum’s GoDoc API documentation for more details on this.

func (plugin *MyPlugin) Configure(conf core.PluginConfig) error {
  err := prod.MyPluginBase.Configure(conf)
  if err != nil {
    return err
  }
  // ... read custom options ...
  return nil
}
Configuring nested plugins

Some plugins may want to configure “nested” plugins such as a formatter or filter. These plugins can be instantiated by using the type registry and the config passed to the Configure method.

func (plugin *MyPlugin) Configure(conf core.PluginConfig) error {
  formatter, err := core.NewPluginWithType(conf.GetString("Formatter", "format.Forward"), conf)
  if err != nil {
    return err // ### return, plugin load error ###
  }
  // ... do something with your formatter ...
  return nil
}
How to document plugins
Background

Documentation for each plugin is sourced from a single location - the plugin’s source code - and presented in two locations.

  • Standard godocs based documentation is targeted at developers interested in writing Gollum plugins or developing Gollum itself and describes implementation-level details of Gollum’s plugin API and internals.
  • The Plugins chapter of Gollum’s documentation on readthedocs.org describes the built-in plugins’ features and configuration from the viewpoint of an end user or operator.

The readthedocs.org documentation is generated by a custom tool (./docs/generator) that converts the godoc formatted comments into RST. Consequently, plugin source code comments must satisfy the syntax requirements of both godoc and Gollum’s rst_generator.

Syntax

Each plugin’s documentation is contained in the comment block immediately preceding the plugin’s Go struct definition, in accordance with godoc rules. Other comments in the plugin source are ignored by the RST generator.

Single-line comments (“// ….”) are used for inline documentation. For the purpose of this document, the initial “// ” sequences on each line are not part of the comment block; an “empty line” means a line solely consisting of the comment sequence and optional trailing whitespace.

The following types of elements are recognized by the RST generator:

  1. Heading: A heading is defined by text that is surrounded by blank lines. If a heading starts a documentation block, the preceding blank line is omitted. Please note that godoc only detects a heading as such if regular text follows the blank line after the heading.
  2. Enumeration: A “- ” as the first non-space sequence on a line begins an enumeration list item, which continues over zero or more further lines and is terminated by a heading or another enumeration list item. Continuation lines must match the indentation of the beginning line.
  3. Keyed enumeration: A colon (“:”) on the first line of an enumeration item specifies a keyed enumeration list item. The text between “- ” and “:” defines the item’s key; the rest is its contents.
  4. Nested enumerations: Enumerations may be nested; each level is indented by a single space. Indentation must begin at 0.
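To illustrate rules 2 and 3, here is a small standalone Go sketch that recognizes enumeration items and their keys in the way described above. This is only an illustration of the syntax rules, not the actual rst_generator code:

```go
package main

import (
	"fmt"
	"strings"
)

// parseItem splits an enumeration line of the form "- key: contents".
// It returns an empty key for unkeyed items and ok=false for non-items.
func parseItem(line string) (key, contents string, ok bool) {
	trimmed := strings.TrimLeft(line, " ")
	if !strings.HasPrefix(trimmed, "- ") {
		return "", "", false // not an enumeration item
	}
	rest := trimmed[2:]
	// A colon on the first line marks a keyed enumeration item.
	if idx := strings.Index(rest, ":"); idx >= 0 {
		return rest[:idx], strings.TrimSpace(rest[idx+1:]), true
	}
	return "", rest, true
}

func main() {
	k, c, _ := parseItem("- Pipe: Defines the pipe to read from.")
	fmt.Printf("key=%q contents=%q\n", k, c)

	_, c, _ = parseItem("- an unkeyed item")
	fmt.Printf("contents=%q\n", c)
}
```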
Contents

The contents of a documentation block consists of 4 parts:

  1. General description

    The heading of this section starts with the name of the plugin followed by its type e.g. “Console consumer”.

    The contents of this section should describe what the plugin does, including any special considerations.

  2. Metadata fields

    The heading of this section is “Metadata”.

    The contents of this section is an enumeration (of keyed and/or unkeyed elements) of all metadata fields consumed or generated by the plugin.

    The RST generator will automatically inherit the Metadata sections from plugins embedded by this type, so there is no need to duplicate their documentation in your own plugin. You can, however, override inherited fields’ documentation if needed.

    Fields that have specific keys should be documented using keyed enumerations, e.g. “- key: stores the key of the request”.

  3. Configuration parameters

    The heading of this section is “Parameters”.

    The contents of this section is a keyed enumeration of configuration parameters recognized by the plugin.

    The RST generator will automatically inherit the Parameters sections from plugins embedded by this type, so there is no need to duplicate their documentation in your own plugin. You can, however, override inherited parameters’ documentation if needed.

    Default values and units are picked up automatically from struct tags for struct properties that have the ‘config’ and one or both of ‘default’ and ‘metric’ tags.

  4. Configuration Examples

    The heading of this section is “Examples”.

    This section should contain at least one configuration example and a short description for each example provided.

    The example is meant to be a working demonstration of the plugin’s usage, not an exhaustive listing of all possible features and parameters. The user should be able to copy-paste it into a config file as-is. It doesn’t need to be self-contained and should not include any boilerplate configuration for other plugins, with the exception of nested plugins (filters and formatters), which should be contained in the following stub:

    ExampleConsumer:
      Type: consumer.Console
      Streams: console
      Modulators:
    
Struct Tags

The same Go struct tags used by Gollum to parse and set plugin configuration parameters are supported by the RST generator:

  • config:"<ConfigParameter>" maps this struct property to the <ConfigParameter> parameter
  • default:"<defval>" specifies <defval> as the parameter’s default value
  • metric:"<unit>" specifies <unit> as the parameter’s unit

To inherit an embedded struct type’s Metadata and Parameters documentation in your own plugin’s documentation, add the gollumdoc:"embed_type" tag to the embedded type declaration.

Structure

The general structure of a plugin documentation block looks like this:

// <BLOCK HEADING>
//
// <CHAPTER CONTENTS>
// <CHAPTER CONTENTS>
//
// <CHAPTER HEADING>
//
// <CHAPTER CONTENTS>
// <CHAPTER CONTENTS>
//
// <CHAPTER HEADING>
//
// <CHAPTER CONTENTS>
// <CHAPTER CONTENTS>
//
// Metadata
//
// - <METADATA KEY>: <DESCRIPTION
// DESCRIPTION CONTINUED ...>
// - <METADATA KEY>: <DESCRIPTION>
// - <VARIABLE-NAMED METADATA FIELD
// DESCRIPTION CONTINUED ...>
//
// Parameters
//
// - <PARAMETER NAME>: <PARAMETER DESCRIPTION>
// - <PARAMETER NAME>: <PARAMETER DESCRIPTION
// DESCRIPTION CONTINUED ...>
//  - <NESTED NAME>: <NESTED VALUE OR OPTION>
//  DESCRIPTION CONTINUED ...>
//  - <NESTED VALUE OR OPTION>
//  - <NESTED VALUE OR OPTION>
//
// Examples
//
//   <CONFIGURATION EXAMPLE>
//   <CONFIGURATION EXAMPLE>
Example
// Console consumer
//
// This consumer reads from stdin or a named pipe. A message is generated after
// each newline character.
//
// Metadata
//
// - pipe: name of the pipe the message was received on
//
// Parameters
//
// - Pipe: Defines the pipe to read from. This can be "stdin" or the path
// of a named pipe. A named pipe is created if it does not exist.
//
// - Permissions: Accepts an octal number string containing the unix file
// permissions used when creating a named pipe.
//
// - ExitOnEOF: Can be set to true to trigger an exit signal if the pipe is closed
// i.e. when EOF is detected.
//
// Examples
//
// This configuration reads data from standard-in.
//
//  ConsoleIn:
//    Type: consumer.Console
//    Streams: console
//    Pipe: stdin
type Console struct {
    core.SimpleConsumer `gollumdoc:"embed_type"`
    autoExit            bool   `config:"ExitOnEOF" default:"true"`
    pipeName            string `config:"Pipe" default:"stdin"`
    pipePerm            uint32 `config:"Permissions" default:"0644"`
    pipe                *os.File
}
Plugin types
Writing consumers

When writing a new consumer it is advisable to have a look at existing consumers. A good starting point are the Console and File consumers.

Requirements

All consumers have to implement the “core/Consumer” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/ConsumerBase” type as it will provide implementations of the most common methods required. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

ConsumerBase

Consumers deriving from “core/ConsumerBase” have to implement the “Consume” method from the “core/Consumer” interface. In addition to that most plugins might also want to overload the “Configure” function from the “core/Plugin” interface.

The Consume() function will be called as a separate go routine and should do two things: listen to the control channel and process incoming data.

As Consume() is called as a separate go routine you can decide whether to spawn additional go routines to handle both tasks or to let Consume() handle everything. ConsumerBase gives you two convenience loop functions to handle control commands:

ControlLoop
Will loop until a stop is received and can trigger a callback if a log rotation is requested (i.e. SIG_HUP is sent). The log rotation callback can be set e.g. in the Configure method by using the SetRollCallback function. Other possible callback setters are SetPrepareStopCallback and SetStopCallback.
TickerControlLoop
Gives you an additional callback that is triggered in regular intervals.

Both loops only cover control message handling and are blocking calls. Because of their blocking nature you will probably want to spawn a separate go routine for handling incoming messages when using these loops.

It is highly recommended to use at least one of these functions in your plugin implementation. By doing this you can be sure that changes to message streaming and control handling are automatically used by your plugin after a Gollum update.

A typical consume function will look like this:

func (cons *MyConsumer) Configure(conf core.PluginConfig) error {
  cons.SetStopCallback(cons.close) // Register close to the control message handler
  return nil
}

func (cons *MyConsumer) close() {
  cons.WorkerDone()
}

func (cons *MyConsumer) Consume(workers *sync.WaitGroup) {
  cons.AddMainWorker(workers) // New go routine = new worker
  go cons.readData()          // Run until close is called
  cons.ControlLoop()          // Blocks
}

As we want to run a new go routine we also add a new worker. As this is the first worker we use AddMainWorker(). Additional workers can be added by using AddWorker(). This enables the shutdown routine to wait until all consumers have properly stopped. However, to avoid a hang during shutdown, make sure that all workers added are properly closed during the shutdown sequence.

After we made sure all workers are registered, the core function readData() is called as a separate go routine. This is necessary as the ControlLoop will block Consume() until a shutdown is requested. When a stop control message is received, the StopCallback is executed. You can use this callback to signal your readData function to stop or you can check the pluginState inside your readData function. The pluginState will switch to PluginStateStopping after a stop control has been triggered.

Configuration

If your consumer requires additional configuration options you should implement the Configure method. Please refer to the Plugin documentation for further details.

Sending messages

Messages can be sent by using either the Enqueue() or EnqueueCopy() method. Both functions make sure that the message is sent to all streams and that the correct stream ID is set. The function Enqueue() will reference the data you pass to it, while EnqueueCopy() will copy the data to the new message. The latter allows you to e.g. safely recycle internal buffers without changing messages that have not yet been processed by all producers.

Both methods expect a sequence number to be passed. This sequence number is meant to be a runtime unique ID that may allow future checks on duplicate messages. The most common sequence number is an incrementing 64-bit integer.

func (cons *MyConsumer) readData() {
  var data []byte
  for cons.IsActive() {
    // ... read data
    cons.Enqueue(data, cons.sequence) // This call may block
    cons.sequence++                   // Increment your sequence number
  }
}
Writing bare bone consumers

Sometimes it might be useful not to derive from ConsumerBase. If you decide to go this way please have a look at Gollum’s GoDoc API documentation as well as the source of ConsumerBase.

Writing producers

When writing a new producer it is advisable to have a look at existing producers. A good starting point are the Console and File producers.

Requirements

All producers have to implement the “core/Producer” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/ProducerBase” type as it will provide implementations of the most common methods required. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

ProducerBase

Producers deriving from core/ProducerBase have to implement the “Produce” method from the “core/Producer” interface. In addition to that most plugins might also want to overload the “Configure” function from the “core/Plugin” interface.

The Produce() function will be called as a separate go routine and should do two things: listen to the control channel and listen to incoming messages.

As Produce() is called as a separate go routine you can decide whether to spawn additional go routines to handle both tasks or to let Produce() handle everything. ProducerBase gives you three convenience loop functions to handle control commands:

ControlLoop
Will only listen to control messages and trigger the corresponding callbacks that can be registered during Configure. Stop control messages will cause this loop to end.
MessageControlLoop
In addition to the functionality of ControlLoop this will also check for incoming messages. Messages from the internal message channel are passed to the given message handler. The log rotation callback can be set e.g. in the Configure method by using the SetRollCallback function. Other possible callback setters are SetPrepareStopCallback and SetStopCallback.
TickerMessageControlLoop
Gives you an additional callback that is triggered in regular intervals.

It is highly recommended to use at least one of these functions in your plugin implementation. By doing this you can be sure that changes to message streaming and control handling are automatically used by your plugin after a Gollum update. A typical produce function will look like this:

func (prod *MyProducer) close() {
  prod.CloseMessageChannel(prod.processData) // Close the internal channel and flush remaining messages
  prod.WorkerDone()                          // Signal that we're done now
}

func (prod *MyProducer) Configure(conf core.PluginConfig) error {
  prod.SetStopCallback(prod.close)  // Call close upon shutdown
  prod.SetRollCallback(prod.onRoll) // Call onRoll when SIG_HUP is sent to the process
  return nil
}

func (prod *MyProducer) processData(msg core.Message) {
  // Do something with the message
}

func (prod *MyProducer) Produce(workers *sync.WaitGroup) {
  prod.AddMainWorker(workers)
  prod.MessageControlLoop(prod.processData)
}

The framework will call the registered StopCallback function when the control loop receives a stop. As the shutdown procedure needs to wait until all messages from this producer have been sent (to avoid data loss), at least one worker should always be registered. The shutdown procedure will wait until all producer workers have finished before exiting. Because of this, you have to make sure that every AddWorker call is followed by a WorkerDone() call during shutdown. If this does not happen the shutdown procedure will block. If your producer sends messages to other producers you can manually set dependencies between receiving producers and this producer by using StreamRegistry.LinkDependencies. DropStream dependencies are automatically added during startup.
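The worker contract above can be illustrated with the plain sync.WaitGroup that the framework hands to Produce(). This is a standalone sketch, not Gollum code; the function name runWorkers is hypothetical:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers mimics the AddWorker/WorkerDone pairing: every Add must
// eventually be matched by a Done, or Wait (the shutdown procedure)
// blocks forever.
func runWorkers(n int) int64 {
	var workers sync.WaitGroup // plays the role of the WaitGroup passed to Produce()
	var flushed int64

	for i := 0; i < n; i++ {
		workers.Add(1) // corresponds to AddMainWorker / AddWorker
		go func() {
			defer workers.Done()         // corresponds to WorkerDone during shutdown
			atomic.AddInt64(&flushed, 1) // stands in for flushing remaining messages
		}()
	}

	workers.Wait() // shutdown blocks here until all workers signalled Done
	return flushed
}

func main() {
	fmt.Println(runWorkers(3))
}
```

Removing the deferred Done call makes Wait hang, which is exactly the blocked-shutdown scenario described above.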

Configuration

If your producer requires additional configuration options you should implement the Configure method. Please refer to the Plugin documentation for further details.

Working with slow services

Messages are passed to the producer one-by-one. Certain services however might perform better when messages are not sent one-by-one but as a batch of messages. Gollum gives you several tools to handle this kind of message batching. A good example for this is the socket producer. This producer takes advantage of the “core/MessageBatch” type. This allows storing messages in a double-buffered queue and provides callback based methods to flush the queue asynchronously. The following code illustrates a best practice approach on how to use the MessageBatch. You may of course change details if required.

buffer := NewMessageBatch(8192)                // Hold up to 8192*2 messages (front and back buffer)

for {
  // Append the given message
  // - If the buffer is full call the sendBatch method and wait for flush
  // - If the producer is not active or if it is shutting down pass the message to prod.Drop

  buffer.AppendOrFlush(message, prod.sendBatch, prod.IsActiveOrStopping, prod.Drop)

  // ...

  if buffer.ReachedSizeThreshold(2048) {       // Check if at least 2 KB have been written
    buffer.Flush(prod.sendBatch)               // Send all buffered messages via sendBatch
    buffer.WaitForFlush()                      // Wait until done
  }
}
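The double-buffering idea behind MessageBatch can be sketched in a few lines of plain Go. This is a simplified illustration of the technique only, not Gollum's implementation; the type and method names are hypothetical:

```go
package main

import "fmt"

// doubleBuffer keeps two slices: messages are appended to the front
// buffer while the back buffer is being flushed, so appends never have
// to wait for slow writes to finish.
type doubleBuffer struct {
	front, back []string
}

func (d *doubleBuffer) Append(msg string) {
	d.front = append(d.front, msg)
}

// Flush swaps the buffers and hands the previous front buffer to the
// writer. New messages can be appended while the write is in progress.
func (d *doubleBuffer) Flush(write func([]string)) {
	d.front, d.back = d.back[:0], d.front
	write(d.back)
}

func main() {
	var buf doubleBuffer
	buf.Append("a")
	buf.Append("b")
	buf.Flush(func(batch []string) { fmt.Println(len(batch)) }) // flushes 2 messages
	buf.Append("c")
	buf.Flush(func(batch []string) { fmt.Println(len(batch)) }) // flushes 1 message
}
```

The real MessageBatch additionally synchronizes the swap and offers WaitForFlush, but the front/back swap is the core of the mechanism.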
Filtering messages

Producers are able to filter messages like streams do, too. In contrast to streams, messages are filtered before they are sent to the internal message channel, i.e. before formatting. As formatting is an implementation detail (and may also not happen) a plugin that needs filtering after formatting has to implement it itself.

Formatting messages

Messages are not automatically formatted when passed to the producer. If you wish to enable producer based formatting you need to call ProducerBase.Format() at an appropriate point inside your plugin. All producers deriving from ProducerBase - and that have called ProducerBase.Configure() - may have a formatter set and should thus provide this possibility.

Writing bare bone producers

Sometimes it might be useful not to derive from ProducerBase. An example for this is the Null producer which is extremely lightweight. If you decide to go this way please have a look at Gollum’s GoDoc API documentation as well as the source of ProducerBase.

Writing filters
Requirements

All filters have to implement the “core/Filter” as well as the “core/Plugin” interface. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

Attention

Filters are called in a multithreaded context, so you have to make sure that any internal state is secured by either a mutex or by using atomic functions.

Filtering messages

The Accepts method is fairly simple to implement. If the method returns true the message is passed. If the method returns false the message is rejected. You can inspect the message in question via the parameter passed to the Accepts method. The following example filter will reject all messages that have no content:

func (filter *MyFilter) Accepts(msg core.Message) bool {
  return len(msg.Data) > 0
}
Writing formatters
Requirements

All formatters have to implement the “core/Formatter” as well as the “core/Plugin” interface. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

Attention

Formatters are called in a multithreaded context, so you have to make sure that any internal state is secured by either a mutex or by using atomic functions.

Transforming messages

The Format method is fairly simple to implement. It accepts the message to modify and returns the new content plus the stream the message should be sent to. The message itself cannot be changed directly. The following example adds a newline to each message:

func (format *MyFormatter) Format(msg core.Message) ([]byte, core.MessageStreamID) {
  return append(msg.Data, '\n'), msg.StreamID
}
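The newline transformation above can be tried standalone. This sketch works on plain byte slices instead of Gollum's core.Message type, and the helper name formatPayload is hypothetical:

```go
package main

import "fmt"

// formatPayload returns a copy of the payload with a trailing newline,
// mirroring the Format example above without mutating the caller's slice.
func formatPayload(data []byte) []byte {
	out := make([]byte, len(data), len(data)+1)
	copy(out, data)
	return append(out, '\n')
}

func main() {
	fmt.Printf("%q\n", formatPayload([]byte("hello"))) // prints "hello\n"
}
```

Copying before appending is a deliberate choice: appending to the original slice could overwrite data shared with other readers of the message.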
Writing routers

When writing a new router it is advisable to have a look at existing routers. A good starting point is the Random router.

Requirements

All routers have to implement the “core/Router” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/RouterBase” type as it will provide implementations of the most common methods required as well as message metrics. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.

RouterBase

Routers deriving from “core/RouterBase” have to implement a custom method that has to be hooked to the “Distribute” callback during Configure(). This allows RouterBase to check and format the message before actually distributing it. In addition to that a message count metric is updated. The following example implements a router that sends messages only to the first producer in the list.

func (router *MyRouter) myDistribute(msg core.Message) {
  router.RouterBase.Producers[0].Enqueue(msg)
}

func (router *MyRouter) Configure(conf core.PluginConfig) error {
  if err := router.RouterBase.Configure(conf); err != nil {
    return err
  }
  router.RouterBase.Distribute = router.myDistribute
  return nil
}
Sending messages

Messages are sent directly to a producer by calling the Enqueue method. This call may block if either the underlying channel is completely filled up or the producer plugin implements Enqueue as a blocking method.
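The first blocking case can be pictured with a plain buffered channel: once the buffer is full, a send blocks until a receiver drains a message. A standalone sketch, unrelated to Gollum's actual channel types:

```go
package main

import "fmt"

func main() {
	queue := make(chan string, 1) // capacity 1, like a producer's message channel

	queue <- "first" // fits into the buffer, returns immediately

	done := make(chan bool)
	go func() {
		queue <- "second" // buffer is full: this send blocks...
		done <- true
	}()

	fmt.Println(<-queue) // ...until a receiver drains one message
	<-done
	fmt.Println(<-queue)
}
```

This is why a slow or stalled producer can back-pressure the routers enqueueing into it.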

Routers that derive from RouterBase may also be paused. In that case messages are not passed to the custom distributor function but to a temporary function. These messages will be sent to the custom distributor function after the router is resumed. A Pause() call is normally done by producers that encounter a connection loss or an unavailable resource in general.

Plugins

The main components, consumers, routers, producers, filters and formatters are built upon a plugin architecture. This allows each component to be exchanged and configured individually with different sets of options.

_images/flow_800w.png

Plugin types:

Consumers

Consumers are plugins that read data from external sources. Data is packed into messages and passed to a router.

Example file consumer:

_images/consumer_w800.png

List of available consumers:

AwsKinesis

This consumer reads messages from an AWS Kinesis stream.

Parameters

Enable (default: true)

Switches this plugin on or off.

KinesisStream (default: default)

This value defines the stream to read from. By default this parameter is set to “default”.

OffsetFile

This value defines a file to store the current offset per shard. To disable this parameter, set it to “”. If the parameter is set and the file is found, consuming will start after the offset stored in the file. By default this parameter is set to “”.

RecordsPerQuery (default: 100)

This value defines the number of records to pull per query. By default this parameter is set to “100”.

RecordMessageDelimiter

This value defines the string to delimit messages within a record. To disable this parameter, set it to “”. By default this parameter is set to “”.

QuerySleepTimeMs (default: 1000, unit: ms)

This value defines the number of milliseconds to sleep before trying to pull new records from a shard that did not return any records. By default this parameter is set to “1000”.

RetrySleepTimeSec

This value defines the number of seconds to wait before retrying to connect to a shard after an error. By default this parameter is set to “4”.

CheckNewShardsSec (default: 0, unit: sec)

This value sets a timer to update shards in Kinesis. You can set this parameter to “0” for disabling. By default this parameter is set to “0”.

DefaultOffset

This value defines the message index to start reading from. Valid values are either “newest”, “oldest”, or a number. By default this parameter is set to “newest”.
Parameters (from components.AwsCredentials)

Credential/Type (default: none)

This value defines the credentials to be used when connecting to AWS. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.

environment

Retrieves credentials from the environment variables of the running process

static

Retrieves credentials value for individual credential fields

shared

Retrieves credentials from the current user’s home directory

none

Use an anonymous login to AWS

Credential/Id

Used with the “static” credential type as the AccessKeyID.

Credential/Token

Used with the “static” credential type as the SessionToken.

Credential/Secret

Used with the “static” credential type as the SecretAccessKey.

Credential/File

Used with the “shared” credential type as the path to the shared credentials file (~/.aws/credentials).

Credential/Profile (default: default)

Used with the “shared” credential type to select the profile.

Credential/AssumeRole

This value defines the IAM role to assume. By default this is set to “”.
Parameters (from components.AwsMultiClient)

Region (default: us-east-1)

This value defines the AWS region to connect to. By default this is set to “us-east-1”.

Endpoint

This value defines the AWS API endpoint to use. If no endpoint is set, the client chooses the correct endpoint for the configured region. By default this is set to “”.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of goroutines reserved for modulating messages. Setting this parameter to 0 will use as many goroutines as the specific consumer plugin uses for fetching data. Any other value will force the given number of goroutines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator goroutine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

This example consumes a Kinesis stream “myStream” and creates messages:

KinesisIn:
  Type: consumer.AwsKinesis
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: "eu-west-1"
  KinesisStream: myStream
Console

This consumer reads from stdin or a named pipe. A message is generated after each newline character.

Metadata

NOTE: The metadata will only be set if the parameter `SetMetadata` is active.

pipe

Name of the pipe the message was received on (set)
Parameters

Enable (default: true)

Switches this plugin on or off.

Pipe (default: stdin)

Defines the pipe to read from. This can be “stdin” or the path to a named pipe. If the named pipe doesn’t exist, it will be created. By default this parameter is set to “stdin”.

Permissions (default: 0644)

Defines the UNIX filesystem permissions used when creating the named pipe as an octal number. By default this parameter is set to “0644”.

ExitOnEOF (default: true)

If set to true, the plugin triggers an exit signal if the pipe is closed, i.e. when EOF is detected. By default this parameter is set to “true”.

SetMetadata (default: false)

When this value is set to “true”, the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to “false”.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of goroutines reserved for modulating messages. Setting this parameter to 0 will use as many goroutines as the specific consumer plugin uses for fetching data. Any other value will force the given number of goroutines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator goroutine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

This config reads data from stdin e.g. when starting gollum via unix pipe.

ConsoleIn:
  Type: consumer.Console
  Streams: console
  Pipe: stdin
File

The File consumer reads messages from a file, looking for a customizable delimiter sequence that marks the end of a message. If the file is part of e.g. a log rotation, the consumer can be set to read from a symbolic link pointing to the current file and (optionally) be told to reopen the file by sending a SIGHUP. A symlink to a file will automatically be reopened if the underlying file is changed.

Metadata

NOTE: The metadata will only be set if the parameter `SetMetadata` is active.

file

The file name of the consumed file (set)

dir

The directory of the consumed file (set)
Parameters

Enable (default: true)

Switches this plugin on or off.

File

This value is a mandatory setting and contains the name of the file to read. The file will be read from beginning to end and the reader will stay attached until the consumer is stopped, so appends to the file will be recognized automatically.

OffsetFile

This value defines the path to a file that stores the current offset inside the source file. If the consumer is restarted, that offset is used to continue reading from the previous position. To disable this setting, set it to “”. By default this parameter is set to “”.

Delimiter (default: "\n")

This value defines the delimiter sequence to expect at the end of each message in the file. By default this parameter is set to “\n”.

ObserveMode (default: poll)

This value selects how the source file is observed. Available values are poll and watch. NOTE: The watch implementation uses the [fsnotify/fsnotify](https://github.com/fsnotify/fsnotify) package. If your source file is rotated (moved or removed), please verify that your file system and distribution support the RENAME and REMOVE events; the consumer’s stability depends on them. By default this parameter is set to poll.

DefaultOffset

This value defines the default offset from which to start reading within the file. Valid values are “oldest” and “newest”. If OffsetFile is defined and the file exists, the DefaultOffset parameter is ignored. By default this parameter is set to “newest”.

PollingDelay

This value defines the duration the consumer waits between checking the source file for new content after hitting the end of file (EOF). The value is in milliseconds (ms). NOTE: This setting only takes effect if the consumer is running in poll mode! By default this parameter is set to “100”.

SetMetadata (default: false)

When this value is set to “true”, the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to “false”.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of goroutines reserved for modulating messages. Setting this parameter to 0 will use as many goroutines as the specific consumer plugin uses for fetching data. Any other value will force the given number of goroutines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator goroutine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

This example will read the /var/log/system.log file and create a message for each new entry.

FileIn:
  Type: consumer.File
  File: /var/log/system.log
  DefaultOffset: newest
  OffsetFile: ""
  Delimiter: "\n"
  ObserveMode: poll
  PollingDelay: 100
HTTP

This consumer opens up an HTTP 1.1 server and processes the contents of any incoming HTTP request.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address (default: :80)

Defines the TCP port and optional IP address to listen on. Sets http.Server.Addr; for details, see its Go documentation. Syntax: [hostname|address]:<port>

ReadTimeoutSec (default: 3, unit: sec)

Defines the maximum duration in seconds before timing out the HTTP read request. Sets http.Server.ReadTimeout; for details, see its Go documentation.

WithHeaders (default: true)

If true, relays the complete HTTP request to the generated Gollum message. If false, relays only the HTTP request body and ignores headers.

Htpasswd

Path to an htpasswd-formatted password file. If defined, turns on HTTP Basic Authentication in the server.

BasicRealm

Defines the Authentication Realm for HTTP Basic Authentication. Meaningful only in conjunction with Htpasswd.

Certificate

Path to an X509 formatted certificate file. If defined, turns on SSL/TLS support in the HTTP server. Requires PrivateKey to be set.

PrivateKey

Path to an X509 formatted private key file. Meaningful only in conjunction with Certificate.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of goroutines reserved for modulating messages. Setting this parameter to 0 will use as many goroutines as the specific consumer plugin uses for fetching data. Any other value will force the given number of goroutines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator goroutine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

This example listens on port 9090 and writes to the stream “http_in_00”.

"HttpIn00":
  Type: "consumer.HTTP"
  Streams: "http_in_00"
  Address: "localhost:9090"
  WithHeaders: false
Kafka

This consumer reads data from a kafka topic. It is based on the sarama library; most settings are mapped to the settings from this library.

Metadata

NOTE: The metadata will only be set if the parameter `SetMetadata` is active.

topic

Contains the name of the kafka topic

key

Contains the key of the kafka message
Parameters

Enable (default: true)

Switches this plugin on or off.

Servers

Defines the list of all kafka brokers to initially connect to when querying topic metadata. This list requires at least one broker to work and ideally contains all the brokers in the cluster. By default this parameter is set to [“localhost:9092”].

Topic (default: default)

Defines the kafka topic to read from. By default this parameter is set to “default”.

ClientId

Sets the client id used in requests by this consumer. By default this parameter is set to “gollum”.

GroupId

Sets the consumer group of this consumer. If empty, consumer groups are not used. This setting requires Kafka version >= 0.9. By default this parameter is set to “”.

Version

Defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. If the version given is not known, the closest possible version is chosen. If GroupId is set and the version given is below “0.9”, version “0.9.0.1” will be used instead. By default this parameter is set to “0.8.2”.

SetMetadata (default: false)

When this value is set to “true”, the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to “false”.

DefaultOffset

Defines the initial offset when starting to read the topic. Valid values are “oldest” and “newest”. If OffsetFile is defined and the file exists, the DefaultOffset parameter is ignored. If GroupId is defined, this setting will only be used for the first request. By default this parameter is set to “newest”.

OffsetFile

Defines the path to a file that holds the current offset of a given partition. If the consumer is restarted, reading continues from that offset. To disable this setting, set it to “”. Please note that offsets stored in the file might be outdated. In that case DefaultOffset “oldest” will be used. By default this parameter is set to “”.

FolderPermissions (default: 0755)

Used to create the path to the offset file if necessary. By default this parameter is set to “0755”.

Ordered

Forces partitions to be read one-by-one in a round robin fashion instead of reading them all in parallel. Please note that this may restore the original ordering but does not necessarily do so. The term “ordered” refers to an ordered reading of all partitions, as opposed to reading them randomly. By default this parameter is set to false.

MaxOpenRequests

Defines the number of simultaneous connections to a broker at a time. By default this parameter is set to 5.

ServerTimeoutSec

Defines the time after which a connection will time out. By default this parameter is set to 30.

MaxFetchSizeByte

Sets the maximum size of a message to fetch. Larger messages will be ignored. When set to 0, the message size is ignored. By default this parameter is set to 0.

MinFetchSizeByte

Defines the minimum amount of data to fetch from Kafka per request. If less data is available the broker will wait. By default this parameter is set to 1.

DefaultFetchSizeByte

Defines the average amount of data to fetch per request. This value must be greater than 0. By default this parameter is set to 32768.

FetchTimeoutMs

Defines the time in milliseconds to wait on reaching MinFetchSizeByte before fetching new data regardless of size. By default this parameter is set to 250.

MessageBufferCount

Sets the internal channel size for the kafka client. By default this parameter is set to 8192.

PresistTimoutMs (default: 5000, unit: ms)

Defines the interval in milliseconds in which data is written to the OffsetFile. A short duration reduces the amount of duplicate messages after a crash but increases I/O. When using GroupId this setting controls the pause time after receiving errors. By default this parameter is set to 5000.

ElectRetries

Defines how many times to retry fetching the new master partition during a leader election. By default this parameter is set to 3.

ElectTimeoutMs

Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.

MetadataRefreshMs

Defines the interval in milliseconds used for fetching kafka metadata from the cluster (e.g. number of partitions). By default this parameter is set to 10000.

TlsEnable

Defines whether to use TLS based authentication when communicating with brokers. By default this parameter is set to false.

TlsKeyLocation

Defines the path to the client’s PEM-formatted private key used for TLS based authentication. By default this parameter is set to “”.

TlsCertificateLocation

Defines the path to the client’s PEM-formatted public key used for TLS based authentication. By default this parameter is set to “”.

TlsCaLocation

Defines the path to the CA certificate(s) for verifying a broker’s key when using TLS based authentication. By default this parameter is set to “”.

TlsServerName

Defines the expected hostname used by hostname verification when using TlsInsecureSkipVerify. By default this parameter is set to “”.

TlsInsecureSkipVerify

Disables verification of the server’s certificate chain and host name when set to true. By default this parameter is set to false.

SaslEnable

Defines whether to use SASL based authentication when communicating with brokers. By default this parameter is set to false.

SaslUsername

Defines the username for SASL/PLAIN authentication. By default this parameter is set to “gollum”.

SaslPassword

Defines the password for SASL/PLAIN authentication. By default this parameter is set to “”.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of goroutines reserved for modulating messages. Setting this parameter to 0 will use as many goroutines as the specific consumer plugin uses for fetching data. Any other value will force the given number of goroutines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator goroutine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

This config reads the topic “logs” from a cluster with 4 brokers.

kafkaIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  ClientId: "gollum log reader"
  DefaultOffset: newest
  OffsetFile: /var/gollum/logs.offset
  Servers:
    - "kafka0:9092"
    - "kafka1:9092"
    - "kafka2:9092"
    - "kafka3:9092"
Profiler

The “Profiler” consumer plugin autogenerates messages in user-defined quantity, size and density. It can be used to profile producers and configurations and to provide a message source for testing.

Before startup, [TemplateCount] template payloads are generated based on the format specifier [Message], using characters from [Characters]. The length of each template is determined by format size specifier(s) in [Message].

During execution, [Batches] batches of [Runs] messages are generated, with a [DelayMs] ms delay between each message. Each message’s payload is randomly selected from the set of template payloads above.
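The template generation described above can be sketched in plain Go: a random string of the requested width is built from the configured character set and substituted into the format specifier. This is a simplified illustration with a hypothetical function name, not Gollum's actual implementation:

```go
package main

import (
	"fmt"
	"math/rand"
)

// generateTemplate builds one template payload: a random string of
// `width` characters drawn from `characters`, substituted into `format`
// (e.g. "%10s"). The width argument is expected to match the format's
// size specifier so the result has exactly that length.
func generateTemplate(format string, width int, characters string) string {
	runes := []rune(characters)
	buf := make([]rune, width)
	for i := range buf {
		buf[i] = runes[rand.Intn(len(runes))]
	}
	return fmt.Sprintf(format, string(buf))
}

func main() {
	tpl := generateTemplate("%10s", 10, "abcdefgh")
	fmt.Println(len(tpl)) // 10 characters, as requested by "%10s"
}
```

At startup the real plugin generates [TemplateCount] such payloads and then picks one at random for every emitted message.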

Parameters

Enable (default: true)

Switches this plugin on or off.

Runs (default: 10000)

Defines the number of messages per batch.

Batches (default: 10)

Defines the number of batches to generate.

TemplateCount

Defines the number of message templates to generate. Templates are generated in advance and a random message template is chosen from this set every time a message is sent.

Characters (default: abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890)

Defines the set of characters to use when generating templates.

Message (default: %256s)

Defines a go format string to use for generating the message templates. The length of the values generated is deduced from the format size parameter - “%200d” will generate a number between 0 and 200, “%10s” will generate a string with 10 characters, etc.

DelayMs (default: 0, unit: ms)

Defines the number of milliseconds to sleep between messages.

KeepRunning

If set to true, Gollum keeps running after Batches * Runs messages have been generated; if set to false, Gollum shuts down after the last batch. Keeping Gollum running can be used to e.g. read metrics after a profile run.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

Generate a short message every 0.5s, useful for testing and debugging

JunkGenerator:
  Type: "consumer.Profiler"
  Message: "%20s"
  Streams: "junkstream"
  Characters: "abcdefghijklmZ"
  KeepRunning: true
  Runs: 10000
  Batches: 3000000
  DelayMs: 500
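
The Modulators parameter described above can attach formatters or filters directly to a consumer. A hedged sketch, assuming the format.Envelope formatter shown elsewhere in this documentation and an illustrative console consumer and stream name:

```yaml
consoleIn:
  Type: consumer.Console      # illustrative consumer choice
  Streams: "console"          # illustrative stream name
  Modulators:
    - format.Envelope:
        Postfix: "\n"         # append a newline to each message
```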
Proxy

This consumer reads messages from a given socket like consumer.Socket but allows reverse communication, too. Producers which require this kind of communication can access message.GetSource to write data back to the client sending the message. See producer.Proxy as an example target producer.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address

Defines the protocol, host and port or the unix domain socket to listen to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. Only unix and tcp protocols are supported. By default this parameter is set to “:5880”.

Partitioner

Defines the algorithm used to read messages from the router. The messages will be sent as a whole, no cropping or removal will take place. By default this parameter is set to “delimiter”.

delimiter

Separates messages by looking for a delimiter string. The delimiter is removed from the message.

ascii

Reads an ASCII number at a given offset until a given delimiter is found. Everything to the left of and including the delimiter is removed from the message.

binary

Reads a binary number at a given offset and size. The number is removed from the message.

binary_le

An alias for “binary”.

binary_be

Acts like “binary” but uses big endian encoding.

fixed

Assumes fixed-size messages.

Delimiter (default: \n)

Defines the delimiter string used to separate messages if the partitioner is set to “delimiter”, or the string used to separate the message length if the partitioner is set to “ascii”. By default this parameter is set to “\n”.

Offset (default: 0)

Defines the offset in bytes at which the length information is read when using the “binary” or “ascii” partitioner. By default this parameter is set to 0.

Size (default: 4)

Defines the size of the length prefix used by partitioner “binary”, or the total message size when using partitioner “fixed”. When using partitioner “binary” this parameter can be set to 1, 2, 4 or 8 when using uint8, uint16, uint32 or uint64 length prefixes. By default this parameter is set to 4.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

This example accepts 64-bit length-encoded data on TCP port 5880.

proxyReceive:
  Type: consumer.Proxy
  Streams: proxyData
  Address: ":5880"
  Partitioner: binary
  Size: 8
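
For comparison, a sketch using the “ascii” partitioner described above, which strips an ASCII length prefix terminated by a delimiter (the stream name and delimiter choice are illustrative):

```yaml
proxyAsciiIn:
  Type: consumer.Proxy
  Streams: proxyData
  Address: "unix:///var/gollum.socket"
  Partitioner: ascii
  Offset: 0          # length prefix starts at the beginning of the message
  Delimiter: ":"     # everything up to and including ":" is removed
```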
Socket

The socket consumer reads messages as-is from a given network or filesystem socket. Messages are separated from the stream by using a specific partitioner method.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address

This value defines the protocol, host and port or socket to bind to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. Valid protocols can be derived from the golang net package documentation. Common values are “udp”, “tcp” and “unix”. By default this parameter is set to “tcp://0.0.0.0:5880”.

Permissions (default: 0770)

This value sets the filesystem permissions for UNIX domain sockets as a four-digit octal number. By default this parameter is set to “0770”.

Acknowledge

This value can be set to a non-empty value to inform the writer that data has been accepted. On success, the given string is sent. Any error will close the connection. Acknowledge does not work with UDP based sockets. By default this parameter is set to “”.

Partitioner

This value defines the algorithm used to read messages from the router. By default this is set to “delimiter”. The following options are available:

“delimiter”

Separates messages by looking for a delimiter string. The delimiter is removed from the message.

“ascii”

Reads an ASCII number at a given offset until a given delimiter is found. Everything to the left of and including the delimiter is removed from the message.

“binary”

Reads a binary number at a given offset and size.

“binary_le”

An alias for “binary”.

“binary_be”

The same as “binary” but uses big endian encoding.

“fixed”

Assumes fixed size messages.

Delimiter (default: \n)

This value defines the delimiter used by the “delimiter” and “ascii” partitioners. By default this parameter is set to “\n”.

Offset (default: 0)

This value defines the offset used by the “binary” and “ascii” partitioners. This setting is ignored by the “fixed” partitioner. By default this parameter is set to “0”.

Size

This value defines the size in bytes used by the “binary” and “fixed” partitioners. For “binary”, this can be set to 1, 2, 4 or 8. The default value is 4. For “fixed”, this defines the size of a message. By default this parameter is set to “1”.

ReconnectAfterSec (default: 2, unit: sec)

This value defines the number of seconds to wait before a connection is retried. By default this parameter is set to “2”.

AckTimeoutSec (default: 1, unit: sec)

This value defines the number of seconds to wait for acknowledges to succeed. By default this parameter is set to “1”.

ReadTimeoutSec (default: 2, unit: sec)

This value defines the number of seconds to wait for data to be received. This setting affects the maximum shutdown duration of this consumer. By default this parameter is set to “2”.

RemoveOldSocket (default: true)

If set to true, any existing file with the same name as the socket (unix://<path>) is removed prior to connecting. By default this parameter is set to “true”.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

This example opens a socket and expects messages with a fixed length of 256 bytes:

socketIn:
  Type: consumer.Socket
  Address: unix:///var/gollum.socket
  Partitioner: fixed
  Size: 256
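
A delimiter-based TCP variant with acknowledgment might look like the following sketch (the address, stream name and acknowledge string are illustrative; note that Acknowledge does not work with UDP sockets):

```yaml
socketTcpIn:
  Type: consumer.Socket
  Streams: "socketData"
  Address: "tcp://0.0.0.0:5881"
  Partitioner: delimiter
  Delimiter: "\n"       # messages are newline separated
  Acknowledge: "OK\n"   # sent back to the writer on success
```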
Syslogd

The syslogd consumer creates a syslogd-compatible log server and receives messages on a TCP or UDP port or a UNIX filesystem socket.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address

Defines the IP address or UNIX socket to listen to. This can take one of the four forms below, to listen on a TCP, UDP or UNIX domain socket. However, see the “Format” option for details on transport support by different formats.

  • [hostname|ip]:<tcp-port>
  • tcp://<hostname|ip>:<tcp-port>
  • udp://<hostname|ip>:<udp-port>
  • unix://<filesystem-path>

By default this parameter is set to “udp://0.0.0.0:514”.

Format

Defines which syslog standard the server will support. Three standards (RFC3164, RFC5424 and RFC6587) are currently available. All standards support listening to UDP and UNIX domain sockets; RFC6587 additionally supports TCP sockets.

By default this parameter is set to “RFC6587”.

SetMetadata (default: false)

When set to true, syslog based metadata will be attached to the message. The metadata fields added depend on the protocol version used. RFC3164 supports: tag, timestamp, hostname, priority, facility, severity. RFC5424 and RFC6587 support: app_name, version, proc_id , msg_id, timestamp, hostname, priority, facility, severity. By default this parameter is set to “false”.

TimestampFormat (default: 2006-01-02T15:04:05.000 MST)

When using SetMetadata this string denotes the go time format used to convert syslog timestamps into strings. By default this parameter is set to “2006-01-02T15:04:05.000 MST”.
Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples

Replace the system’s standard syslogd with Gollum

SyslogdSocketConsumer:
  Type: consumer.Syslogd
  Streams: "system_syslog"
  Address: "unix:///dev/log"
  Format: "RFC3164"

Listen on a TCP socket

SyslogdTCPSocketConsumer:
  Type: consumer.Syslogd
  Streams: "tcp_syslog"
  Address: "tcp://0.0.0.0:5599"
  Format: "RFC6587"
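
The SetMetadata option described above can be combined with either format; a sketch for RFC5424 over UDP (the stream name and port are illustrative):

```yaml
SyslogdMetadataConsumer:
  Type: consumer.Syslogd
  Streams: "udp_syslog_meta"
  Address: "udp://0.0.0.0:5514"
  Format: "RFC5424"
  SetMetadata: true    # attaches app_name, proc_id, severity, etc.
  TimestampFormat: "2006-01-02T15:04:05.000 MST"
```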

Producers

Producers are plugins that transfer messages to external services. Data arrives in the form of messages and can be converted by using a formatter.

Example producer setup:

_images/producer_w800.png

List of available producers:

AwsCloudwatchLogs

The AwsCloudwatchLogs producer plugin sends messages to the AWS CloudWatch Logs service. Credentials are obtained by gollum automatically.

Parameters

  • LogStream: Stream name in cloudwatch logs.
  • LogGroup: Group name in cloudwatch logs.
  • Region: Amazon region to stream logs into. Defaults to “eu-west-1”.
Parameters

Enable (default: true)

Switches this plugin on or off.

LogStream

(no documentation available)

LogGroup

(no documentation available)
Parameters (from components.AwsCredentials)

Credential/Type (default: none)

This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.

environment

Retrieves credentials from the environment variables of the running process

static

Retrieves credential values from the individual credential fields

shared

Retrieves credentials from the current user’s home directory

none

Uses an anonymous login to AWS

Credential/Id

is used for “static” type and is used as the AccessKeyID

Credential/Token

is used for “static” type and is used as the SessionToken

Credential/Secret

is used for “static” type and is used as the SecretAccessKey

Credential/File

is used for “shared” type and is used as the path to your shared Credentials file (~/.aws/credentials)

Credential/Profile (default: default)

is used for “shared” type and is used for the profile

Credential/AssumeRole

This value defines an IAM role to assume. By default this is set to “”.
Parameters (from components.AwsMultiClient)

Region (default: us-east-1)

This value defines the aws region to use. By default this is set to “us-east-1”.

Endpoint

This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from core.BatchedProducer)

Batch/MaxCount (default: 8192)

Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.

Batch/FlushCount (default: 4096)

Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.

Batch/TimeoutSec (default: 5, unit: sec)

Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This configuration sends messages to stream stream_name and group group_name with shared credentials.

CwLogs:
  Type: producer.AwsCloudwatchLogs
  LogStream: stream_name
  LogGroup: group_name
  Credential:
    Type: shared
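
The “static” credential type described above might be configured as follows (sketch; the key values are placeholders - never commit real keys to a config file):

```yaml
CwLogsStatic:
  Type: producer.AwsCloudwatchLogs
  LogStream: stream_name
  LogGroup: group_name
  Credential:
    Type: static
    Id: "<ACCESS_KEY_ID>"          # placeholder AccessKeyID
    Secret: "<SECRET_ACCESS_KEY>"  # placeholder SecretAccessKey
    Token: "<SESSION_TOKEN>"       # optional placeholder SessionToken
```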
AwsFirehose

This producer sends data to an AWS Firehose stream.

Parameters

Enable (default: true)

Switches this plugin on or off.

StreamMapping

This value defines a translation from gollum stream names to firehose stream names. If no mapping is given, the gollum stream name is used as the firehose stream name. By default this parameter is set to an empty mapping.

RecordMaxMessages (default: 1)

This value defines the number of messages to send in one record to aws firehose. By default this parameter is set to “1”.

RecordMessageDelimiter (default: \n)

This value defines the delimiter string to use between messages within a firehose record. By default this parameter is set to “\n”.

SendTimeframeMs (default: 1000, unit: ms)

This value defines the timeframe in milliseconds in which a second batch send can be triggered. By default this parameter is set to “1000”.
Parameters (from components.AwsCredentials)

Credential/Type (default: none)

This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.

environment

Retrieves credentials from the environment variables of the running process

static

Retrieves credential values from the individual credential fields

shared

Retrieves credentials from the current user’s home directory

none

Uses an anonymous login to AWS

Credential/Id

is used for “static” type and is used as the AccessKeyID

Credential/Token

is used for “static” type and is used as the SessionToken

Credential/Secret

is used for “static” type and is used as the SecretAccessKey

Credential/File

is used for “shared” type and is used as the path to your shared Credentials file (~/.aws/credentials)

Credential/Profile (default: default)

is used for “shared” type and is used for the profile

Credential/AssumeRole

This value defines an IAM role to assume. By default this is set to “”.
Parameters (from components.AwsMultiClient)

Region (default: us-east-1)

This value defines the aws region to use. By default this is set to “us-east-1”.

Endpoint

This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from core.BatchedProducer)

Batch/MaxCount (default: 8192)

Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.

Batch/FlushCount (default: 4096)

Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.

Batch/TimeoutSec (default: 5, unit: sec)

Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example sets up a simple aws firehose producer:

firehoseOut:
  Type: producer.AwsFirehose
  Streams: "*"
  StreamMapping:
    "*": default
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: eu-west-1
  RecordMaxMessages: 1
  RecordMessageDelimiter: "\n"
  SendTimeframeMs: 1000
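
A sketch of a non-trivial StreamMapping, routing two gollum streams to differently named firehose streams (all stream and delivery-stream names are illustrative):

```yaml
firehoseMapped:
  Type: producer.AwsFirehose
  Streams:
    - "access_log"
    - "error_log"
  StreamMapping:
    "access_log": "prod-access"   # gollum stream -> firehose stream
    "error_log": "prod-errors"
  Region: eu-west-1
```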
AwsKinesis

This producer sends data to an AWS Kinesis stream.

Parameters

Enable (default: true)

Switches this plugin on or off.

StreamMapping

This value defines a translation from gollum stream names to kinesis stream names. If no mapping is given, the gollum stream name is used as the kinesis stream name. By default this parameter is set to an empty mapping.

RecordMaxMessages (default: 1)

This value defines the maximum number of messages to join into a kinesis record. By default this parameter is set to “1”.

RecordMessageDelimiter (default: \n)

This value defines the delimiter string to use between messages within a kinesis record. By default this parameter is set to “\n”.

SendTimeframeMs (default: 1000, unit: ms)

This value defines the timeframe in milliseconds in which a second batch send can be triggered. By default this parameter is set to “1000”.
Parameters (from components.AwsCredentials)

Credential/Type (default: none)

This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.

environment

Retrieves credentials from the environment variables of the running process

static

Retrieves credential values from the individual credential fields

shared

Retrieves credentials from the current user’s home directory

none

Uses an anonymous login to AWS

Credential/Id

is used for “static” type and is used as the AccessKeyID

Credential/Token

is used for “static” type and is used as the SessionToken

Credential/Secret

is used for “static” type and is used as the SecretAccessKey

Credential/File

is used for “shared” type and is used as the path to your shared Credentials file (~/.aws/credentials)

Credential/Profile (default: default)

is used for “shared” type and is used for the profile

Credential/AssumeRole

This value defines an IAM role to assume. By default this is set to “”.
Parameters (from components.AwsMultiClient)

Region (default: us-east-1)

This value defines the aws region to use. By default this is set to “us-east-1”.

Endpoint

This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from core.BatchedProducer)

Batch/MaxCount (default: 8192)

Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.

Batch/FlushCount (default: 4096)

Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.

Batch/TimeoutSec (default: 5, unit: sec)

Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example sets up a simple aws Kinesis producer:

KinesisOut:
  Type: producer.AwsKinesis
  Streams: "*"
  StreamMapping:
    "*": default
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: eu-west-1
  RecordMaxMessages: 1
  RecordMessageDelimiter: "\n"
  SendTimeframeMs: 1000
AwsS3

This producer sends messages to Amazon S3.

Each “file” uses a configurable batch and sends its content to S3 via a multipart upload. This approach avoids temporary storage on disk.

Please keep in mind that Amazon S3 does not support appending to existing objects. Therefore rotation is mandatory in this producer.

Parameters

Enable (default: true)

Switches this plugin on or off.

Bucket

The S3 bucket to upload to.

File (default: gollum_*.log)

This value is used as a template for the final file names. The string “*” will be replaced with the active stream name. By default this parameter is set to “gollum_*.log”.
Parameters (from components.AwsCredentials)

Credential/Type (default: none)

This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.

environment

Retrieves credentials from the environment variables of the running process

static

Retrieves credential values from the individual credential fields

shared

Retrieves credentials from the current user’s home directory

none

Uses an anonymous login to AWS

Credential/Id

is used for “static” type and is used as the AccessKeyID

Credential/Token

is used for “static” type and is used as the SessionToken

Credential/Secret

is used for “static” type and is used as the SecretAccessKey

Credential/File

is used for “shared” type and is used as the path to your shared Credentials file (~/.aws/credentials)

Credential/Profile (default: default)

is used for “shared” type and is used for the profile

Credential/AssumeRole

This value defines an IAM role to assume. By default this is set to “”.
Parameters (from components.AwsMultiClient)

Region (default: us-east-1)

This value defines the aws region to use. By default this is set to “us-east-1”.

Endpoint

This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from components.BatchedWriterConfig)

Batch/TimeoutSec (default: 5, unit: sec)

This value defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to “5”.

Batch/MaxCount (default: 8192)

This value defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this parameter is set to “8192”.

Batch/FlushCount (default: 4096)

This value defines the number of messages to be buffered before they are written to disk. This setting is clamped to “BatchMaxCount”. By default this parameter is set to “BatchMaxCount / 2”.

Batch/FlushTimeoutSec (default: 0, unit: sec)

This value defines the maximum number of seconds to wait before a flush is aborted during shutdown. Setting this parameter to “0” will never abort the flushing procedure. By default this parameter is set to “0”.
Parameters (from components.RotateConfig)

Rotation/Enable (default: false)

If this value is set to “true” the logs will rotate after reaching certain thresholds. By default this parameter is set to “false”.

Rotation/TimeoutMin (default: 1440, unit: min)

This value defines a timeout in minutes after which the logs will rotate. Can be set in parallel with Rotation/SizeMB. By default this parameter is set to “1440”.

Rotation/SizeMB (default: 1024, unit: mb)

This value defines the maximum file size in MB that triggers a file rotate. Files can get bigger than this size. By default this parameter is set to “1024”.

Rotation/Timestamp (default: 2006-01-02_15)

This value sets the timestamp added to the filename when file rotation is enabled. The format is based on Go’s time.Format function. By default this parameter is set to “2006-01-02_15”.

Rotation/ZeroPadding (default: 0)

This value sets the number of leading zeros when rotating files with an existing name. Setting this parameter to 0 won’t add zeros; every other number defines the number of leading zeros to be used. By default this parameter is set to “0”.

Rotation/Compress (default: false)

This value defines if a rotated logfile is to be gzip compressed or not. By default this parameter is set to “false”.

Rotation/At

This value defines a specific time for rotation in hh:mm format. By default this parameter is set to “”.

Rotation/AtHour (default: -1)

(no documentation available)

Rotation/AtMin (default: -1)

(no documentation available)
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example sends all received messages from all streams to S3, creating a separate file for each stream:

S3Out:
  Type: producer.AwsS3
  Streams: "*"
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: eu-west-1
  Bucket: gollum-s3-test
  Batch:
    TimeoutSec: 60
    MaxCount: 1000
    FlushCount: 500
    FlushTimeoutSec: 0
  Rotation:
    Timestamp: 2006-01-02T15:04:05.999999999Z07:00
    TimeoutMin: 1
    SizeMB: 20
  Modulators:
    - format.Envelope:
      Postfix: "\n"
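
A sketch using the Rotation/At and Rotation/Compress settings described earlier, rotating once per day at midnight and gzip-compressing finished files (the bucket name is illustrative):

```yaml
S3DailyOut:
  Type: producer.AwsS3
  Streams: "*"
  Region: eu-west-1
  Bucket: my-log-bucket     # illustrative bucket name
  Rotation:
    Enable: true
    At: "00:00"             # rotate daily at midnight (hh:mm)
    Compress: true          # gzip rotated objects
```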
Benchmark

This producer is meant to provide more meaningful results in benchmark situations than producer.Null, as it is based on core.BufferedProducer.

Parameters

Enable (default: true)

Switches this plugin on or off.
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples
benchmark:
  Type: producer.Benchmark
  Streams: "*"
Console

The console producer writes messages to standard output or standard error.

Parameters

Enable (default: true)

Switches this plugin on or off.

Console

Chooses the output device; either “stdout” or “stderr”. By default this is set to “stdout”.
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples
StdErrPrinter:
  Type: producer.Console
  Streams: myerrorstream
  Console: stderr
ElasticSearch

The ElasticSearch producer sends messages to Elasticsearch using the bulk HTTP API. The producer expects a JSON payload.

Parameters

Enable (default: true)

Switches this plugin on or off.

Retry/Count

Sets the number of retries before an Elasticsearch request finally fails. By default this parameter is set to “3”.

Retry/TimeToWaitSec

This value denotes the time in seconds after which a failed dataset will be transmitted again. By default this parameter is set to “3”.

SetGzip

This value enables or disables gzip compression for Elasticsearch requests (disabled by default). This option is passed through unchanged to the underlying library. See http://godoc.org/gopkg.in/olivere/elastic.v5#SetGzip. By default this parameter is set to “false”.

Servers

This value defines a list of servers to connect to.

User

This value is used as the username for the Elasticsearch server. By default this parameter is set to “”.

Password

This value is used as the password for the Elasticsearch server. By default this parameter is set to “”.

StreamProperties

This value defines the mapping and settings for each stream. Use the stream name as the index here.

StreamProperties/<streamName>/Index

This value defines the Elasticsearch index used for the stream.

StreamProperties/<streamName>/Type

This value defines the document type used for the stream.

StreamProperties/<streamName>/TimeBasedIndex

This value can be set to “true” to append the date of the message to the index as in “<index>_<TimeBasedFormat>”. NOTE: This setting incurs a performance penalty because it is necessary to check if an index exists for each message! By default this parameter is set to “false”.

StreamProperties/<streamName>/TimeBasedFormat

This value can be set to a valid Go time format string to be used with TimeBasedIndex. By default this parameter is set to “2006-01-02”.

StreamProperties/<streamName>/Mapping

This value is a map which is used for the document field mapping. The document type defined above is reused for the field mapping. See https://www.elastic.co/guide/en/elasticsearch/reference/5.4/indices-create-index.html#mappings

StreamProperties/<streamName>/Settings

Parameters (from core.BatchedProducer)

Batch/MaxCount (default: 8192)

Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.

Batch/FlushCount (default: 4096)

Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.

Batch/TimeoutSec (default: 5, unit: sec)

Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example configures a simple Twitter example producer for a locally running Elasticsearch instance:

producerElasticSearch:
  Type: producer.ElasticSearch
  Streams: tweets_stream
  SetGzip: true
  Servers:
    - http://127.0.0.1:9200
  StreamProperties:
    tweets_stream:
      Index: twitter
      DayBasedIndex: true
      Type: tweet
      Mapping:
        # index mapping for payload
        user: keyword
        message: text
      Settings:
        number_of_shards: 1
        number_of_replicas: 1
File

The file producer writes messages to a file. This producer also allows log rotation and compression of the rotated logs. Folders in the file path will be created if necessary.

Each target file is handled with separate batch processing.

Parameters

Enable (default: true)

Switches this plugin on or off.

File

This value contains the path to the log file to write. The wildcard character “*” can be used as a placeholder for the stream name. By default this parameter is set to “/var/log/gollum.log”.

FileOverwrite

This value causes the file to be overwritten instead of appending new data to it. By default this parameter is set to “false”.

Permissions (default: 0644)

Defines the UNIX filesystem permissions used when creating the named file as an octal number. By default this parameter is set to “0644”.

FolderPermissions (default: 0755)

Defines the UNIX filesystem permissions used when creating the folders as an octal number. By default this parameter is set to “0755”.
Parameters (from components.BatchedWriterConfig)

Batch/TimeoutSec (default: 5, unit: sec)

This value defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to “5”.

Batch/MaxCount (default: 8192)

This value defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered out of other reasons, the producer will block. By default this parameter is set to “8192”.

Batch/FlushCount (default: 4096)

This value defines the number of messages to be buffered before they are written to disk. This setting is clamped to “BatchMaxCount”. By default this parameter is set to “BatchMaxCount / 2”.

Batch/FlushTimeoutSec (default: 0, unit: sec)

This value defines the maximum number of seconds to wait before a flush is aborted during shutdown. Setting this parameter to “0” disables aborting the flushing procedure. By default this parameter is set to “0”.
Parameters (from components.RotateConfig)

Rotation/Enable (default: false)

If this value is set to “true” the logs will rotate after reaching certain thresholds. By default this parameter is set to “false”.

Rotation/TimeoutMin (default: 1440, unit: min)

This value defines a timeout in minutes that will cause the logs to rotate. Can be set in parallel with RotateSizeMB. By default this parameter is set to “1440”.

Rotation/SizeMB (default: 1024, unit: mb)

This value defines the maximum file size in MB that triggers a file rotate. Files can get bigger than this size. By default this parameter is set to “1024”.

Rotation/Timestamp (default: 2006-01-02_15)

This value sets the timestamp added to the filename when file rotation is enabled. The format is based on Go’s time.Format function. By default this parameter is set to “2006-01-02_15”.

Rotation/ZeroPadding (default: 0)

This value sets the number of leading zeros when rotating files with an existing name. Setting this setting to 0 won’t add zeros, every other number defines the number of leading zeros to be used. By default this parameter is set to “0”.

Rotation/Compress (default: false)

This value defines if a rotated logfile is to be gzip compressed or not. By default this parameter is set to “false”.

Rotation/At

This value defines a specific time for rotation in hh:mm format. By default this parameter is set to “”.

Rotation/AtHour (default: -1)

(no documentation available)

Rotation/AtMin (default: -1)

(no documentation available)
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Parameters (from file.Pruner)

Prune/Count (default: 0)

This value removes old logfiles upon rotate so that only the given number of logfiles remain. Logfiles are located by the name defined by “File” and are pruned by date (followed by name). Set this value to “0” to disable pruning by count. By default this parameter is set to “0”.

Prune/AfterHours (default: 0)

This value removes old logfiles that are older than a given number of hours. Set this value to “0” to disable pruning by lifetime. By default this parameter is set to “0”.

Prune/TotalSizeMB (default: 0, unit: mb)

This value removes old logfiles upon rotate so that only the given number of MBs are used by logfiles. Logfiles are located by the name defined by “File” and are pruned by date (followed by name). Set this value to “0” to disable pruning by file size. By default this parameter is set to “0”.
Examples

This example will write the messages from all streams to /tmp/gollum.log after every 64 messages or after 60 seconds:

fileOut:
  Type: producer.File
  Streams: "*"
  File: /tmp/gollum.log
  Batch:
    MaxCount: 128
    FlushCount: 64
    TimeoutSec: 60
    FlushTimeoutSec: 3
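
Rotation and pruning can be combined with the batch settings above. The following sketch uses only parameters documented in this section; the plugin name and file path are illustrative. It rotates daily or at 512 MB, gzip-compresses rotated files, and keeps at most 7 of them:

fileRotateOut:
  Type: producer.File
  Streams: "*"
  File: /var/log/gollum/gollum.log
  Rotation:
    Enable: true
    TimeoutMin: 1440
    SizeMB: 512
    Compress: true
  Prune:
    Count: 7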
HTTPRequest

The HTTPRequest producer sends messages as HTTP requests to a given webserver.

In RawData mode, incoming messages are expected to contain complete HTTP requests in “wire format”, such as:

POST /foo/bar HTTP/1.0\n
Content-type: text/plain\n
Content-length: 24
\n
Dummy test\n
Request data\n

In this mode, the message’s contents are parsed as an HTTP request and sent to the destination server (virtually) unchanged. If the message cannot be parsed as an HTTP request, an error is logged. Only the scheme, host and port components of the “Address” URL are used; any path and query parameters are ignored. The “Encoding” parameter is ignored.

If RawData mode is off, a POST request is made to the destination server for each incoming message, using the complete URL in “Address”. The incoming message’s contents are delivered in the POST request’s body and Content-type is set to the value of “Encoding”.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address

Defines the URL to send HTTP requests to. If the value doesn’t contain “://”, it is prepended with “http://”, so short forms like “localhost:8088” are accepted. The default value is “http://localhost:80”.

RawData (default: true)

Turns “RawData” mode on. See the description above.

Encoding (default: text/plain; charset=utf-8)

Defines the payload encoding when RawData is set to false.
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples
HttpOut01:
  Type: producer.HTTPRequest
  Streams: http_01
  Address: "http://localhost:8099/test"
  RawData: true
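
With RawData disabled, each message body is instead POSTed to the full URL with the configured Content-type. A sketch of that mode, assuming a hypothetical local endpoint and stream name:

JsonPoster:
  Type: producer.HTTPRequest
  Streams: json_events
  Address: "http://localhost:8099/ingest"
  RawData: false
  Encoding: "application/json"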
InfluxDB

This producer writes data to an InfluxDB endpoint. Data is not converted to the correct InfluxDB format automatically. Proper formatting might be required.

Parameters

Enable (default: true)

Switches this plugin on or off.

Version

Defines the InfluxDB protocol version to use. This can either be 80-89 for 0.8.x, 90 for 0.9.0 or 91-100 for 0.9.1 or later. By default this parameter is set to 100.

Host

Defines the host (and port) of the InfluxDB master. By default this parameter is set to “localhost:8086”.

User

Defines the InfluxDB username to use. If this is empty, credentials are not used. By default this parameter is set to “”.

Password

Defines the InfluxDB password to use. By default this parameter is set to “”.

Database

Sets the InfluxDB database to write to. By default this parameter is set to “default”.

TimeBasedName

When set to true, the Database parameter is treated as a template for time.Format and the resulting string is used as the database name. You can e.g. use “default-2006-01-02” to switch databases each day. By default this parameter is set to “true”.

RetentionPolicy

Only available for Version 90. This setting defines the InfluxDB retention policy allowed with this protocol version. By default this parameter is set to “”.
Parameters (from core.BatchedProducer)

Batch/MaxCount (default: 8192)

Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.

Batch/FlushCount (default: 4096)

Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.

Batch/TimeoutSec (default: 5, unit: sec)

Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples
metricsToInflux:
  Type: producer.InfluxDB
  Streams: metrics
  Host: "influx01:8086"
  Database: "metrics"
  TimeBasedName: false
  Batch:
    MaxCount: 2000
    FlushCount: 100
    TimeoutSec: 5
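
With TimeBasedName enabled, the Database parameter acts as a template for Go’s time.Format, switching databases as time advances. A sketch assuming hypothetical host and database names, writing to a new database each day:

dailyMetrics:
  Type: producer.InfluxDB
  Streams: metrics
  Host: "influx01:8086"
  Database: "metrics-2006-01-02"
  TimeBasedName: true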
Kafka

This producer writes messages to a Kafka cluster. This producer is backed by the sarama library (https://github.com/Shopify/sarama), so most settings directly relate to the settings of that library.

Parameters

Enable (default: true)

Switches this plugin on or off.

Servers

Defines a list of ideally all brokers in the cluster. At least one broker is required. By default this parameter is set to an empty list.

Version

Defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. If the version given is not known, the closest possible version is chosen. If GroupId is set to a value < “0.9”, “0.9.0.1” will be used. By default this parameter is set to “0.8.2”.

Topics

Defines a stream to topic mapping. If a stream is not mapped the stream name is used as topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). By default this parameter is set to an empty list.
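
A stream-to-topic mapping including the wildcard fallback described above might look like this (stream and topic names are illustrative):

Topics:
  "errorlogs": "errors"
  "accesslogs": "access"
  "*": "default"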

ClientId (default: gollum)

Sets the kafka client id used by this producer. By default this parameter is set to “gollum”.

Partitioner

Defines the distribution algorithm to use. Valid values are: Random, Roundrobin and Hash. By default this parameter is set to “Roundrobin”.

PartitionHasher

Defines the hash algorithm to use when Partitioner is set to “Hash”. Accepted values are “fnv1-a” and “murmur2”.

KeyFrom

Defines the metadata field that contains the string to be used as the key passed to kafka. When set to an empty string no key is used. By default this parameter is set to “”.

Compression

Defines the compression algorithm to use. Possible values are “none”, “zip” and “snappy”. By default this parameter is set to “none”.

RequiredAcks

Defines the number of acknowledgements required until a message is marked as “sent”. When set to -1 all replicas must acknowledge a message. By default this parameter is set to 1.

TimeoutMs

Denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this parameter is set to 10000.

GracePeriodMs (default: 100, unit: ms)

Defines the number of milliseconds to wait for Sarama to accept a single message. After this period a message is sent to the fallback. This setting mitigates a conceptual problem in the sarama API which can lead to long blocking times during startup. By default this parameter is set to 100.

MaxOpenRequests

Defines the maximum number of simultaneous connections opened to a single broker at a time. By default this parameter is set to 5.

ServerTimeoutSec

Defines the time after which a connection is set to timed out. By default this parameter is set to 30.

SendTimeoutMs

Defines the number of milliseconds to wait for a broker before marking a message as timed out. By default this parameter is set to 250.

SendRetries

Defines how many times a message should be sent again before a broker is marked as not reachable. Please note that this setting should never be 0. See https://github.com/Shopify/sarama/issues/294. By default this parameter is set to 1.

AllowNilValue (default: false)

When enabled messages containing an empty or nil payload will not be rejected. By default this parameter is set to false.

Batch/MinCount

Sets the minimum number of messages required to send a request. By default this parameter is set to 1.

Batch/MaxCount

Defines the maximum number of messages buffered before a request is sent. A value of 0 will remove this limit. By default this parameter is set to 0.

Batch/MinSizeByte

Defines the minimum number of bytes to buffer before sending a request. By default this parameter is set to 8192.

Batch/SizeMaxKB

Defines the maximum allowed message size in KB. Messages bigger than this limit will be rejected. By default this parameter is set to 1024.

Batch/TimeoutMs

Defines the maximum time in milliseconds after which a new request will be sent, regardless of Batch/MinCount and Batch/MinSizeByte. By default this parameter is set to 3.

ElectRetries

Defines how many times a metadata request is to be retried during a leader election phase. By default this parameter is set to 3.

ElectTimeoutMs

Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.

MetadataRefreshMs

Defines the interval in milliseconds for refetching cluster metadata. By default this parameter is set to 600000.

TlsEnable

Enables TLS communication with brokers. By default this parameter is set to false.

TlsKeyLocation

Path to the client’s private key (PEM) used for TLS based authentication. By default this parameter is set to “”.

TlsCertificateLocation

Path to the client’s public key (PEM) used for TLS based authentication. By default this parameter is set to “”.

TlsCaLocation

Path to the CA certificate(s) used for verifying the broker’s key. By default this parameter is set to “”.

TlsServerName

Used to verify the hostname on the server’s certificate unless TlsInsecureSkipVerify is true. By default this parameter is set to “”.

TlsInsecureSkipVerify

Disables server certificate chain and host name verification. By default this parameter is set to false.

SaslEnable

Enables SASL based authentication. By default this parameter is set to false.

SaslUsername

Sets the user name used for SASL/PLAIN authentication. By default this parameter is set to “”.

SaslPassword

Sets the password used for SASL/PLAIN authentication. By default this parameter is set to “”.

MessageBufferCount

Sets the internal channel size for the kafka client. By default this is set to 8192.
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples
kafkaWriter:
  Type: producer.Kafka
  Streams: logs
  Compression: zip
  Servers:
    - "kafka01:9092"
    - "kafka02:9092"
    - "kafka03:9092"
    - "kafka04:9092"
Null

This producer is meant to be used as a sink for data. It will throw away all messages without notice.

Parameters

Enable (default: true)

Switches this plugin on or off.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples
TrashCan:
  Type: producer.Null
  Streams: trash
Proxy

This producer is compatible with the Proxy consumer plugin. Responses to messages sent to the given address are sent back to the original consumer if it is a compatible message source. As with consumer.Proxy, the returned messages are partitioned by common message length algorithms.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address

This value stores the identifier to connect to. This can either be any IP address and port like “localhost:5880” or a file like “unix:///var/gollum.Proxy”. By default this parameter is set to “:5880”.

ConnectionBufferSizeKB (default: 1024, unit: kb)

This value sets the connection buffer size in KB. This also defines the size of the buffer used by the message parser. By default this parameter is set to “1024”.

TimeoutSec (default: 1, unit: sec)

This value defines the maximum time in seconds a client is allowed to take for a response. By default this parameter is set to “1”.

Partitioner

This value defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this parameter is set to “delimiter”.

delimiter

separates messages by looking for a delimiter string. The delimiter is included in the left-hand message.

ascii

reads an ASCII encoded number at a given offset until a given delimiter is found.

binary

reads a binary number at a given offset and size

binary_le

is an alias for “binary”

binary_be

is the same as “binary” but uses big endian encoding

fixed

assumes fixed size messages

Delimiter

This value defines the delimiter used by the text and delimiter partitioners. By default this parameter is set to “\n”.

Offset

This value defines the offset used by the binary and text partitioner. This setting is ignored by the fixed partitioner. By default this parameter is set to “0”.

Size

This value defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1, 2, 4 or 8; for fixed this defines the size of a message. By default this parameter is set to “4” for the binary and “1” for the fixed partitioner.
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example will send 64bit length encoded data on TCP port 5880.

proxyOut:
  Type: producer.Proxy
  Address: ":5880"
  Partitioner: binary
  Size: 8
Redis

This producer sends messages to a redis server. Different redis storage types and database indexes are supported. This producer does not implement support for redis 3.0 cluster.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address

Stores the identifier to connect to. This can either be any IP address and port like “localhost:6379” or a file like “unix:///var/redis.socket”. By default this is set to “:6379”.

Database (default: 0)

Defines the redis database to connect to.

Key

Defines the redis key to store the values in. This field is ignored when “KeyFormatter” is set. By default this is set to “default”.

Storage

Defines the type of the storage to use. Valid values are: “hash”, “list”, “set”, “sortedset”, “string”. By default this is set to “hash”.

KeyFrom

Defines the name of the metadata field used as a key for messages sent to redis. If the name is an empty string no key is sent. By default this value is set to an empty string.

FieldFrom

Defines the name of the metadata field used as a field for messages sent to redis. If the name is an empty string no field is sent. By default this value is set to an empty string.

Password

(no documentation available)
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message is discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example sends messages to a local redis server and stores them in a hash under the key “mykey”:

RedisProducer00:
  Type: producer.Redis
  Address: ":6379"
  Key: "mykey"
  Storage: "hash"
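
The KeyFrom and FieldFrom parameters described above can derive the key and field from metadata instead of using a fixed key. A sketch, assuming upstream modulators set “user” and “action” metadata fields (those field names are hypothetical):

```yaml
RedisProducer01:
  Type: producer.Redis
  Address: ":6379"
  Storage: "hash"
  KeyFrom: "user"      # metadata field used as the redis key
  FieldFrom: "action"  # metadata field used as the hash field
```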
Scribe

This producer allows sending messages to Facebook’s scribe service.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address

Defines the host and port of a scribe endpoint. By default this parameter is set to “localhost:1463”.

ConnectionBufferSizeKB (default: 1024, unit: kb)

Sets the connection socket buffer size in KB. By default this parameter is set to 1024.

HeartBeatIntervalSec (default: 5, unit: sec)

Defines the interval in seconds used to query scribe for status updates. By default this parameter is set to 5.

WindowSize (default: 2048)

Defines the maximum number of messages sent to scribe in one call. The WindowSize will be reduced when scribe is returning “try later” to reduce load on the scribe server. It will slowly rise again for each successful write until WindowSize is reached again. By default this parameter is set to 2048.

ConnectionTimeoutSec (default: 5, unit: sec)

Defines the time in seconds after which a connection timeout is assumed. This can happen during writes or status reads. By default this parameter is set to 5.

Category

Maps a stream to a scribe category. You can define the wildcard stream (*) here, too. When set, all streams that do not have a specific mapping will go to this category (including reserved streams like _GOLLUM_). If no category mappings are set the stream name is used as category. By default this parameter is set to an empty list.
Parameters (from core.BatchedProducer)

Batch/MaxCount (default: 8192)

Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.

Batch/FlushCount (default: 4096)

Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.

Batch/TimeoutSec (default: 5, unit: sec)

Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples
logs:
  Type: producer.Scribe
  Streams: ["*", "_GOLLUM_"]
  Address: "scribe01:1463"
  HeartBeatIntervalSec: 10
  Category:
    "access"   : "accesslogs"
    "error"    : "errorlogs"
    "_GOLLUM_" : "gollumlogs"
Socket

The socket producer connects to a service over TCP, UDP or a UNIX domain socket.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address

Defines the address to connect to. This can either be any IP address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. By default this parameter is set to “:5880”.

ConnectionBufferSizeKB (default: 1024, unit: kb)

This value sets the connection buffer size in KB. By default this parameter is set to “1024”.

Batch/MaxCount (default: 8192)

This value defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this parameter is set to “8192”.

Batch/FlushCount (default: 4096)

This value defines the number of messages to be buffered before they are sent. This setting is clamped to Batch/MaxCount. By default this parameter is set to “Batch/MaxCount / 2”.

Batch/TimeoutSec (default: 5, unit: sec)

This value defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to “5”.

Acknowledge

This value can be set to a non-empty value to expect the given string as a response from the server after a batch has been sent. If Acknowledge is enabled and an IP address is given to Address, TCP is used to open the connection; otherwise UDP is used. By default this parameter is set to “”.

AckTimeoutMs (default: 2000, unit: ms)

This value defines the time in milliseconds to wait for a response from the server. After this timeout the send is marked as failed. By default this parameter is set to “2000”.
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message is discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example starts a socket producer on localhost port 5880:

SocketOut:
  Type: producer.Socket
  Address: ":5880"
  Batch:
    MaxCount: 1024
    FlushCount: 512
    TimeoutSec: 3
  AckTimeoutMs: 1000
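
The Acknowledge parameter described above can be combined with AckTimeoutMs to get TCP delivery with per-batch server confirmation. A minimal sketch (the host “logsink01” and the “ACK” response string are assumptions about the receiving service):

```yaml
SocketOutAcked:
  Type: producer.Socket
  Address: "logsink01:5880"  # host:port plus a non-empty Acknowledge forces TCP
  Acknowledge: "ACK"         # string expected from the server after each batch
  AckTimeoutMs: 2000         # mark the send as failed after this timeout
```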
Spooling

This producer is meant to be used as a fallback if another producer fails to send messages, e.g. because a service is down. It does not produce messages to another service itself; it buffers them on disk for a certain time and inserts them back into the system after this period.

Parameters

Enable (default: true)

Switches this plugin on or off.

Path (default: /var/run/gollum/spooling)

Sets the output directory for spooling files. Spooling files will be stored as “<path>/<stream name>/<number>.spl”. By default this parameter is set to “/var/run/gollum/spooling”.

MaxFileSizeMB (default: 512, unit: mb)

Sets the size limit in MB that causes a spool file rotation. Reading messages back into the system will start only after a file is rotated. By default this parameter is set to 512.

MaxFileAgeMin (default: 1, unit: min)

Defines the duration in minutes after which a spool file rotation is triggered (regardless of MaxFileSizeMB). Reading messages back into the system will start only after a file is rotated. By default this parameter is set to 1.

MaxMessagesSec

Sets the maximum number of messages that will be respooled per second. Setting this value to 0 will cause respooling to send as fast as possible. By default this parameter is set to 100.

RespoolDelaySec (default: 10, unit: sec)

Defines the number of seconds to wait before trying to load existing spool files from disk after a restart. This setting can be used to define a safe timeframe for gollum to set up all required connections and resources before putting additional load on it. By default this parameter is set to 10.

RevertStreamOnFallback (default: false)

When set to true, the stream of a message is reverted to its previous stream ID before the message is sent to the fallback stream. This allows the fallback to handle the message as the spooler would have. By default this parameter is set to false.

BufferSizeByte (default: 8192)

Defines the initial size of the buffer that is used to read messages from a spool file. If a message is larger than this size, the buffer will be resized. By default this parameter is set to 8192.

Batch/MaxCount (default: 100)

Defines the maximum number of messages stored in memory before a write to file is triggered. By default this parameter is set to 100.

Batch/TimeoutSec (default: 5, unit: sec)

Defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to 5.
Parameters (from components.RotateConfig)

Rotation/Enable (default: false)

If this value is set to “true” the logs will rotate after reaching certain thresholds. By default this parameter is set to “false”.

Rotation/TimeoutMin (default: 1440, unit: min)

This value defines a timeout in minutes that will cause the logs to rotate. Can be set in parallel with RotateSizeMB. By default this parameter is set to “1440”.

Rotation/SizeMB (default: 1024, unit: mb)

This value defines the maximum file size in MB that triggers a file rotate. Files can get bigger than this size. By default this parameter is set to “1024”.

Rotation/Timestamp (default: 2006-01-02_15)

This value sets the timestamp added to the filename when file rotation is enabled. The format is based on Go’s time.Format function. By default this parameter is set to “2006-01-02_15”.

Rotation/ZeroPadding (default: 0)

This value sets the number of leading zeros when rotating files with an existing name. Setting this setting to 0 won’t add zeros, every other number defines the number of leading zeros to be used. By default this parameter is set to “0”.

Rotation/Compress (default: false)

This value defines if a rotated logfile is to be gzip compressed or not. By default this parameter is set to “false”.

Rotation/At

This value defines a specific time for rotation in hh:mm format. By default this parameter is set to “”.

Rotation/AtHour (default: -1)

(no documentation available)

Rotation/AtMin (default: -1)

(no documentation available)
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message is discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example will collect messages from the fallback stream and buffer them for 10 minutes. After 10 minutes the first messages will be written back to the system as fast as possible.

spooling:
  Type: producer.Spooling
  Streams: fallback
  MaxMessagesSec: 0
  MaxFileAgeMin: 10
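
To make use of the spooler, other producers point their FallbackStream at the stream the spooler reads from. A minimal pairing sketch (the socket producer and its endpoint are assumptions for illustration):

```yaml
socketOut:
  Type: producer.Socket
  Streams: logs
  Address: "logsink01:5880"
  FallbackStream: fallback   # messages that fail delivery are spooled
spooling:
  Type: producer.Spooling
  Streams: fallback          # buffers failed messages on disk, then re-inserts them
```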
StatsdMetrics

This producer samples the messages it receives and sends metrics about them to statsd.

Parameters

Enable (default: true)

Switches this plugin on or off.

Server

Defines the server and port to send statsd metrics to. By default this parameter is set to “localhost:8125”.

Prefix

Defines a string that is prepended to every statsd metric name. By default this parameter is set to “gollum.”.

StreamMapping

Defines a translation from gollum stream to statsd metric name. If no mapping is given the gollum stream name is used as the metric name. By default this parameter is set to an empty list.

UseMessage (default: false)

Switch between just counting all messages arriving at this producer or summing up the message content. If UseMessage is set to true, the contents will be parsed as an integer, i.e. a string containing a human readable number is expected. By default the parameter is set to false.

UseGauge (default: false)

When set to true the statsd data format will switch from counter to gauge. Every stream that does not receive any messages but is listed in StreamMapping will have a gauge value of 0. By default this parameter is set to false.

Batch/MaxMessages

Defines the maximum number of messages to collect per batch. By default this parameter is set to 500.

Batch/TimeoutSec (default: 10, unit: sec)

Defines the number of seconds after which a batch is processed, regardless of MaxMessages being reached or not. By default this parameter is set to 10.
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message is discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example will collect all messages going through gollum and send metrics about the different data streams to statsd at least every 5 seconds. Metrics will be sent as “logs.streamName”.

metricsCollector:
  Type: producer.StatsdMetrics
  Streams: "*"
  Server: "stats01:8125"
  Batch:
    TimeoutSec: 5
  Prefix: "logs."
  UseGauge: true
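
The StreamMapping parameter described above can rename the emitted metrics per stream. A sketch with hypothetical stream and metric names:

```yaml
metricsMapped:
  Type: producer.StatsdMetrics
  Streams:
    - access
    - error
  Server: "stats01:8125"
  StreamMapping:
    access: "frontend.requests"  # metric name used instead of the stream name
    error: "frontend.errors"
```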
Websocket

The websocket producer opens up a websocket.

Parameters

Enable (default: true)

Switches this plugin on or off.

Address (default: :81)

This value defines the host and port to bind to. This can be any IP address/DNS name and port like “localhost:5880”. By default this parameter is set to “:81”.

Path (default: /)

This value defines the URL path to listen for. By default this parameter is set to “/”.

ReadTimeoutSec (default: 3, unit: sec)

This value specifies the maximum duration in seconds before a request read times out. By default this parameter is set to “3” seconds.

IgnoreOrigin (default: false)

Ignore origin check from websocket server. By default this parameter is set to “false”.
Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message is discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples

This example starts a default Websocket producer on port 8080:

WebsocketOut:
  Type: producer.Websocket
  Address: ":8080"

Routers

Routers manage the transfer of messages between consumers and producers by streams. Routers can act as a kind of proxy that may filter and define the distribution algorithm of messages.

The stream names can be referred to by cleartext names. These names are free to choose, but several names are reserved for internal or special purposes:

_GOLLUM_:is used for internal log messages
*:is a placeholder for “all routers but the internal routers”. In some cases “*” means “all routers” without exception. This is noted in the corresponding documentation whenever this is the case.

Basics router setups:

_images/router_800w.png

List of available routers:

Broadcast

This router implements the default behavior of routing all messages to all producers registered to the configured stream.

Parameters

Enable (default: true)

Switches this plugin on or off.
Parameters (from core.SimpleRouter)

Stream

This value specifies the name of the stream this plugin is supposed to read messages from.

Filters

This value defines an optional list of Filter plugins to connect to this router.

TimeoutMs (default: 0, unit: ms)

This value sets a timeout in milliseconds within which a message has to be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples
rateLimiter:
  Type: router.Broadcast
  Stream: errorlogs
  Filters:
    - filter.Rate:
      MessagesPerSec: 200
Distribute

The “Distribute” plugin provides 1:n stream remapping by duplicating messages.

During startup, it creates a set of streams with names listed in [TargetStreams]. During execution, it consumes messages from the stream [Stream] and enqueues copies of these messages onto each of the streams listed in [TargetStreams].

When routing to multiple routers, the incoming stream has to be listed explicitly to be used.

Parameters

Enable (default: true)

Switches this plugin on or off.

TargetStreams

List of streams to route the incoming messages to.
Parameters (from core.SimpleRouter)

Stream

This value specifies the name of the stream this plugin is supposed to read messages from.

Filters

This value defines an optional list of Filter plugins to connect to this router.

TimeoutMs (default: 0, unit: ms)

This value sets a timeout in milliseconds within which a message has to be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples

This example routes incoming messages from streamA to streamB and streamC (duplication):

JunkRouterDist:
  Type: router.Distribute
  Stream: streamA
  TargetStreams:
    - streamB
    - streamC
Metadata

This router routes the message to a stream given in a specified metadata field. If the field is not set, the message will be passed along.

Parameters

Enable (default: true)

Switches this plugin on or off.

Key (default: Stream)

The metadata field to read from. By default this parameter is set to “Stream”.
Parameters (from core.SimpleRouter)

Stream

This value specifies the name of the stream this plugin is supposed to read messages from.

Filters

This value defines an optional list of Filter plugins to connect to this router.

TimeoutMs (default: 0, unit: ms)

This value sets a timeout in milliseconds within which a message has to be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples
switchRoute:
  Type: router.Metadata
  Stream: errorlogs
  Key: key
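
In the example above, a message arriving on “errorlogs” whose “key” metadata field is set to e.g. “criticalerrors” would be routed to the “criticalerrors” stream; messages without that field pass along unchanged. A sketch of a matching downstream producer (the stream and plugin names are hypothetical):

```yaml
switchRoute:
  Type: router.Metadata
  Stream: errorlogs
  Key: key
criticalOut:
  Type: producer.Console
  Streams: criticalerrors  # receives messages whose "key" metadata is "criticalerrors"
```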
Random

The “Random” router relays each message sent to the stream [Stream] to exactly one of the producers connected to [Stream]. The receiving producer is chosen randomly for each message.

Parameters

Enable (default: true)

Switches this plugin on or off.
Parameters (from core.SimpleRouter)

Stream

This value specifies the name of the stream this plugin is supposed to read messages from.

Filters

This value defines an optional list of Filter plugins to connect to this router.

TimeoutMs (default: 0, unit: ms)

This value sets a timeout in milliseconds within which a message has to be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples

This example will randomly send messages to one of the two console producers.

randomRouter:
  Type: router.Random
  Stream: randomStream
JunkPrinter00:
  Type: producer.Console
  Streams: randomStream
  Modulators:
    - format.Envelope:
        Prefix: "[junk_00] "
JunkPrinter01:
  Type: producer.Console
  Streams: randomStream
  Modulators:
    - format.Envelope:
        Prefix: "[junk_01] "
RoundRobin

This router implements round robin routing. Messages are routed to exactly one of the producers registered to the given stream. The producer is switched in a round robin fashion after each message. This router can be useful for load balancing, e.g. when the target service does not support sharding by itself.

Parameters

Enable (default: true)

Switches this plugin on or off.
Parameters (from core.SimpleRouter)

Stream

This value specifies the name of the stream this plugin is supposed to read messages from.

Filters

This value defines an optional list of Filter plugins to connect to this router.

TimeoutMs (default: 0, unit: ms)

This value sets a timeout in milliseconds within which a message has to be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples

This example will send messages to the two console producers in an alternating fashion.

loadBalancer:
  Type: router.RoundRobin
  Stream: logs
JunkPrinter00:
  Type: producer.Console
  Streams: randomStream
  Modulators:
    - format.Envelope:
        Prefix: "[junk_00] "
JunkPrinter01:
  Type: producer.Console
  Streams: randomStream
  Modulators:
    - format.Envelope:
        Prefix: "[junk_01] "

Filters

Filters are plugins that are embedded into router plugins. Filters can analyze messages and decide whether to let them pass to a producer or to block them.

Any

This plugin takes a list of filters and applies each of them to incoming messages until an accepting filter is found. If any of the listed filters accepts the message, it is passed through; otherwise, the message is dropped.

Parameters

AnyFilters

Defines a list of filters that should be checked before filtering a message. Filters are checked in order, and if the message passes then no further filters are checked.
Parameters (from core.SimpleFilter)

FilteredStream

This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples

This example accepts valid JSON or messages from “exceptionStream”:

ExampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Any:
      AnyFilters:
        - filter.JSON
        - filter.Stream:
          Only: exceptionStream
JSON

This filter inspects fields in JSON encoded datasets and accepts or rejects messages based on their contents.

Parameters

Reject

Defines a list of field names and regular expressions. Messages are rejected if the specified field’s contents matches the given regular expression. Reject is checked before Accept. Field paths can be defined in the format accepted by tgo.MarshalMap.Path. By default this parameter is set to an empty list.

Accept

Defines a list of field names and regular expressions. Messages are accepted if the specified field’s contents matches the given regular expression. Accept is checked after Reject. Field paths can be defined in the format accepted by tgo.MarshalMap.Path. By default this parameter is set to an empty list.

ApplyTo

Defines which part of the message the filter is applied to. When set to “”, this filter is applied to the message’s payload. All other values denote a metadata key. By default this parameter is set to “”.
Parameters (from core.SimpleFilter)

FilteredStream

This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - filter.JSON:
      Reject:
        type: ^log\.
      Accept:
        source: ^www\d+\.
        data/active: true
None

This filter blocks all messages.

Parameters (from core.SimpleFilter)

FilteredStream

This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples

This example starts a Console consumer and blocks all incoming messages:

exampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - filter.None
Rate

This plugin blocks messages after a certain number of messages per second has been reached.

Parameters

MessagesPerSec (default: 100)

This value defines the maximum number of messages per second allowed to pass through this filter. By default this parameter is set to “100”.

Ignore

Defines a list of streams that should not be affected by rate limiting. This is useful for e.g. producers listening to “*”. By default this parameter is set to an empty list.
Parameters (from core.SimpleFilter)

FilteredStream

This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples

This example accepts ~10 messages per second, except for messages on the “noLimit” stream:

ExampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Rate:
      MessagesPerSec: 10
      Ignore:
        - noLimit
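
The inherited FilteredStream parameter can redirect rate-limited messages to another stream instead of discarding them. A sketch (the “ratelimited” stream name is hypothetical):

```yaml
ExampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Rate:
      MessagesPerSec: 10
      FilteredStream: ratelimited  # overflow is routed here instead of being dropped
```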
RegExp

This filter rejects or accepts messages based on regular expressions.

Parameters

Expression

Messages matching this expression are passed on. This parameter is ignored when set to “”. Expression is checked after ExpressionNot. By default this parameter is set to “”.

ExpressionNot

Messages not matching this expression are passed on. This parameter is ignored when set to “”. ExpressionNot is checked before Expression. By default this parameter is set to “”.

ApplyTo

Defines which part of the message the filter is applied to. When set to “”, this filter is applied to the message’s payload. All other values denotes a metadata key. By default this parameter is set to “”.
Parameters (from core.SimpleFilter)

FilteredStream

This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples

This example accepts only accesslog entries with a return status of 2xx or 3xx that did not originate from staging systems.

ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - filter.RegExp:
      ExpressionNot: " stage\\."
      Expression: "HTTP/1\\.1\\\" [23]\\d\\d"
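Since ApplyTo can target a metadata key, the same filter can also inspect metadata instead of the payload. A sketch, assuming a metadata field named “source” is set by an earlier modulator:

```yaml
exampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - filter.RegExp:
      ApplyTo: source
      Expression: "^www\\d+\\."
```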
Sample

This plugin can be used to pass n out of m messages (downsampling). This allows you to reduce the number of messages; within each group, the plugin starts blocking after a certain number of messages has been reached.

Parameters

SampleRatePerGroup (default: 1)

This value defines how many messages are passed through the filter in each group. By default this parameter is set to “1”.

SampleGroupSize (default: 2)

This value defines how many messages make up a group. Messages over SampleRatePerGroup within a group are filtered. By default this parameter is set to “2”.

SampleRateIgnore

This value defines a list of streams that should not be affected by sampling. This is useful for e.g. producers listening to “*”. By default this parameter is set to an empty list.
Examples

This example blocks 8 out of every 10 messages:

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Sample:
      SampleRatePerGroup: 2
      SampleGroupSize: 10
      SampleRateIgnore:
        - foo
        - bar
Stream

The “Stream” filter filters messages by applying black and white lists to the messages’ stream names.

The blacklist is applied first; messages not rejected by the blacklist are checked against the whitelist. An empty white list matches all streams.

Parameters

Block

Defines a list of stream names that are blocked. If a message’s stream is not in that list, the “Only” list is tested. By default this parameter is empty.

Only

Defines a list of streams that may pass. Messages from streams that are not in this list are blocked unless the list is empty. By default this parameter is empty.
Parameters (from core.SimpleFilter)

FilteredStream

This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples

This example accepts ALL messages except ones from stream “foo”:

ExampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Stream:
        Block:
          - foo

This example only accepts messages from stream “foo”:

ExampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Stream:
        Only:
          - foo

Formatters

Formatters are plugins that are embedded into routers or producers. Formatters can convert messages into another format or append additional information.

Aggregate

Aggregate is a formatter that groups further formatters. Its ApplyTo setting is passed on to, and overrides the setting in, each child formatter. This plugin can be useful for setting up complex configurations with metadata handling in a more readable format.

Parameters

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. This value is also used by all child modulators! By default this parameter is set to “”.

Modulators

Defines a list of child modulators to be applied to a message when it arrives at this formatter. Please use only content-based formatters and filters here! If a modulator changes the stream of a message, the message will NOT be routed to that stream anymore.
Parameters (from core.SimpleFormatter)

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example shows a useful case for the format.Aggregate plugin:

exampleConsumerA:
  Type: consumer.Console
  Streams: "foo"
  Modulators:
    - format.MetadataCopy:
        CopyToKeys: ["foo", "bar"]
    - format.Aggregate:
        ApplyTo: foo
        Modulators:
          - format.Base64Encode
          - format.Double
          - format.Envelope:
              Postfix: "\n"
    - format.Envelope:
        Postfix: "\n"
        ApplyTo: bar
# same config as
exampleConsumerB:
  Type: consumer.Console
  Streams: "bar"
  Modulators:
    - format.MetadataCopy:
        CopyToKeys: ["foo", "bar"]
    - format.Base64Encode:
        ApplyTo: foo
    - format.Double:
        ApplyTo: foo
    - format.Envelope:
        Postfix: "\n"
        ApplyTo: foo
    - format.Envelope:
        Postfix: "\n"
        ApplyTo: bar
Base64Decode

Base64Decode is a formatter that decodes base64 encoded messages. If a message is not, or is only partly, base64 encoded, an error is logged and the decodable part is returned. RFC 4648 encoding is expected.

Parameters

Base64Dictionary

This value defines the 64-character base64 lookup dictionary to use. When left empty, a dictionary as defined by RFC4648 is used. By default this parameter is set to “”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example expects base64 strings from the console and decodes them before transmitting the message payload.

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Base64Decode
Base64Encode

Base64Encode is a formatter that encodes messages as base64 strings. Custom dictionaries are supported; by default, RFC 4648 standard encoding is used.

Parameters

Base64Dictionary

Defines the 64-character base64 lookup dictionary to use. When left empty a RFC 4648 standard encoding is used. By default this parameter is set to “”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example uses RFC 4648 URL encoding to format incoming data.

ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.Base64Encode:
      Base64Dictionary: "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
Clear

This formatter erases the message payload or deletes a metadata key.

Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example removes the “pipe” key from the metadata produced by consumer.Console.

exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.Clear:
      ApplyTo: pipe
CollectdToInflux08

This formatter transforms JSON data produced by collectd to InfluxDB 0.8.x. Trailing and leading commas are removed from the Collectd message.

Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.CollectdToInflux08
CollectdToInflux09

This formatter transforms JSON data produced by collectd to InfluxDB 0.9.0. Trailing and leading commas are removed from the Collectd message.

Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.CollectdToInflux09
CollectdToInflux10

This formatter transforms JSON data produced by collectd to InfluxDB 0.9.1 or later. Trailing and leading commas are removed from the Collectd message.

Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.CollectdToInflux10
Double

Double is a formatter that appends a delimiter string and a second copy of the message’s contents to the message. Independent sets of formatters may be applied to both duplicates.

Parameters

Separator (default: :)

This value sets the separator string placed between both parts. This parameter is set to “:” by default.

UseLeftStreamID (default: false)

Use the stream id of the left side as the final stream id for the message if this value is “true”. This parameter is set to “false” by default.

Left

An optional list of formatters. The first copy of the message (left of the delimiter) is passed through these formatters. This parameter is set to an empty list by default.

Right

An optional list of formatters. The second copy of the message (right of the delimiter) is passed through these formatters. This parameter is set to an empty list by default.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example creates a message of the form “<orig>|<base64>”, where <orig> is the original console input and <base64> its Base64-encoded equivalent.

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Double:
      Separator: "|"
      Right:
        - format.Base64Encode
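To illustrate the Left list and per-side formatting as well, this sketch (a hypothetical configuration based on the parameters above) formats both copies independently:

```yaml
exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Double:
      Separator: "|"
      Left:
        - format.Envelope:
            Prefix: "raw:"
      Right:
        - format.Base64Encode
```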
Envelope

This formatter adds content to the beginning and/or end of a message.

Parameters

Prefix

Defines a string that is added to the front of the message. Special characters like \n, \r or \t can be used without additional escaping. By default this parameter is set to “”.

Postfix (default: \n)

Defines a string that is added to the end of the message. Special characters like \n, \r or \t can be used without additional escaping. By default this parameter is set to “\n”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example adds a line number and a newline character to each message printed to the console.

exampleProducer:
  Type: producer.Console
  Streams: "*"
  Modulators:
    - format.Sequence
    - format.Envelope
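To show the Prefix and Postfix parameters explicitly, this sketch wraps each message in brackets (the values are illustrative):

```yaml
exampleProducer:
  Type: producer.Console
  Streams: "*"
  Modulators:
    - format.Envelope:
      Prefix: "["
      Postfix: "]\n"
```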
ExtractJSON

This formatter extracts a specific value from a JSON payload and writes it back as a new payload or as a metadata field.

Parameters

Field

Defines the JSON key to extract. If the field does not exist an empty string is returned. Field paths can be defined in a format accepted by tgo.MarshalMap.Path. By default this parameter is set to “”.

TrimValues (default: true)

Enables trimming of whitespaces at the beginning and end of the extracted value. By default this parameter is set to true.

Precision (default: -1)

Defines the number of decimal places to use when converting Numbers into strings. If this parameter is set to -1 the shortest possible number of decimal places will be used. By default this parameter is set to -1.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.ExtractJSON:
      Field: host
      ApplyTo: host
GrokToJSON

GrokToJSON is a formatter that applies regex filters to messages. It works by combining text patterns into something that matches your logs. See https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_grok_basics for more information about Grok.

The output format is JSON.

Parameters

RemoveEmptyValues

When set to true, empty captures will not be returned. By default this parameter is set to “true”.

NamedCapturesOnly

When set to true, only named captures will be returned. By default this parameter is set to “true”.

SkipDefaultPatterns

When set to true, standard grok patterns will not be included in the list of patterns. By default this parameter is set to “true”.

Patterns

A list of grok patterns that will be applied to messages. The first matching pattern will be used to parse the message.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example transforms unstructured input into structured JSON output. Input:

us-west.servicename.webserver0.this.is.the.measurement 12.0 1497003802

Output:

{
  "datacenter": "us-west",
  "service": "servicename",
  "host": "webserver0",
  "measurement": "this.is.the.measurement",
  "value": "12.0",
  "time": "1497003802"
}

Config:

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.GrokToJSON:
      Patterns:
        - ^(?P<datacenter>[^\.]+?)\.(?P<service>[^\.]+?)\.(?P<host>[^\.]+?)\.statsd\.gauge-(?P<application>[^\.]+?)\.(?P<measurement>[^\s]+?)\s%{NUMBER:value_gauge:float}\s*%{INT:time}
        - ^(?P<datacenter>[^\.]+?)\.(?P<service>[^\.]+?)\.(?P<host>[^\.]+?)\.statsd\.latency-(?P<application>[^\.]+?)\.(?P<measurement>[^\s]+?)\s%{NUMBER:value_latency:float}\s*%{INT:time}
        - ^(?P<datacenter>[^\.]+?)\.(?P<service>[^\.]+?)\.(?P<host>[^\.]+?)\.statsd\.derive-(?P<application>[^\.]+?)\.(?P<measurement>[^\s]+?)\s%{NUMBER:value_derive:float}\s*%{INT:time}
        - ^(?P<datacenter>[^\.]+?)\.(?P<service>[^\.]+?)\.(?P<host>[^\.]+?)\.(?P<measurement>[^\s]+?)\s%{NUMBER:value:float}\s*%{INT:time}
Hostname

This formatter prefixes the message or metadata with the hostname of the machine gollum is running on.

Parameters

Separator (default: :)

Defines the separator string placed between hostname and data. By default this parameter is set to “:”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example inserts the hostname into an existing JSON payload.

exampleProducer:
  Type: producer.Console
  Streams: "*"
  Modulators:
    - format.Trim:
      LeftSeparator: "{"
      RightSeparator: "}"
    - format.Hostname:
      Separator: ","
    - format.Envelope:
      Prefix: "{\"host\":"
      Postfix: "}"
Identifier

This formatter generates a (mostly) unique 64 bit identifier number from the message payload, timestamp and/or sequence number. The number is converted to a human-readable form.

Parameters

Generator

Defines which algorithm to use when generating the identifier. This may be one of the following values. By default this parameter is set to “time”.

hash

The message payload will be hashed using fnv1a and returned as hex.

time

The id will be formatted YYMMDDHHmmSSxxxxxxx where x denotes the current sequence number modulo 10000000, i.e. up to 10,000,000 messages per second are possible before a collision occurs.

seq

The sequence number will be used.

seqhex

The hex encoded sequence number will be used.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example will generate a payload checksum and store it to a metadata field called “checksum”.

ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.Identifier:
      Generator: hash
      ApplyTo: checksum
JSONToArray

JSONToArray “flattens” a JSON object by selecting specific fields and creating a delimiter-separated string of their values.

A JSON input like {"foo":"value1","bar":"value2"} can be transformed into a list like value1,value2.

Parameters

Fields

List of strings specifying the JSON keys to retrieve from the input

Separator (default: ,)

The delimiter string to insert between each value in the generated string. By default this parameter is set to “,”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example gets the foo and bar fields from a JSON document and creates a payload of the form foo_value;bar_value:

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.JSONToArray:
      Separator: ;
      Fields:
        - foo
        - bar
JSONToInflux10

JSONToInflux10 provides a transformation from arbitrary JSON data to InfluxDB 0.9.1+ compatible line protocol data.

Parameters

TimeField (default: time)

Specifies the JSON field that holds the timestamp of the message. The timestamp is formatted as defined by TimeFormat. If the field is not found the current timestamp is assumed. By default this parameter is set to “time”.

TimeFormat (default: unix)

Specifies the format of the time field as in go’s time.Parse or “unix” if the field contains a valid unix timestamp. By default this parameter is set to “unix”.

Measurement (default: measurement)

Specifies the JSON field that holds the measurements in this message. If the field doesn’t exist, the message is discarded. By default this parameter is set to “measurement”.

Ignore

May contain a list of JSON fields that should be ignored and not sent to InfluxDB. By default this parameter is set to an empty list.

Tags

May contain a list of JSON fields to send to InfluxDB as tags. The InfluxDB 0.9 convention is that values that do not change by every request are to be considered metadata and given as tags.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
metricsToInflux:
  Type: producer.InfluxDB
  Streams: metrics
  Host: "influx01:8086"
  Database: "metrics"
  Modulators:
    - format.JSONToInflux10:
      TimeField: timestamp
      Measurement: metrics
      Tags:
        - tags
        - service
        - host
        - application
MetadataCopy

This formatter sets metadata fields by copying data from the message’s payload or from other metadata fields.

Parameters

Key

Defines the key to copy, i.e. the “source”. ApplyTo will define the target of the copy, i.e. the “destination”. An empty string will use the message payload as source. By default this parameter is set to an empty string (i.e. payload).

Mode

Defines the copy mode to use. This can be one of “append”, “prepend” or “replace”. By default this parameter is set to “replace”.

Separator

When using mode prepend or append, defines the characters inserted between source and destination. By default this parameter is set to an empty string.

CopyToKeys

DEPRECATED. A list of meta data keys to copy the payload or metadata content to. If this field contains at least one value, mode is set to replace and the key field is ignored. By default this parameter is set to an empty list.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example copies the payload to the metadata field “key” and applies a hash to it, so that “key” contains a hash over the complete payload.

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.MetadataCopy:
      ApplyTo: key
    - format.Identifier:
      Generator: hash
      ApplyTo: key
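The Mode and Separator parameters are not covered by the example above. This sketch (the field name “trail” is hypothetical) appends the payload to an existing metadata field, separated by a colon:

```yaml
exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.MetadataCopy:
      ApplyTo: trail
      Mode: append
      Separator: ":"
```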
ProcessJSON

This formatter allows modification of JSON encoded data. Each field can be processed by different directives and the result of all directives will be stored back to the original location.

Parameters

GeoIPFile

Defines a GeoIP file to load. This enables the “geoip” directive. If no file is loaded IPs will not be resolved. Files can be found e.g. at http://dev.maxmind.com/geoip/geoip2/geolite2/. By default this parameter is set to “”.

TrimValues (default: true)

Allows trimming of whitespaces from the beginning and end of each value after processing took place. By default this parameter is set to true.

Directives

Defines an array of actions to be applied to the JSON encoded data. Directives are processed in order of their appearance. Directives start with the name of the field, followed by an action followed by additional parameters if necessary. Parameters, key and action are separated by using the “:” character. By default this parameter is set to an empty list.

split

<delimiter> {<key>, <key>, …} Split the field’s value by the given delimiter, store the results to the fields listed after the delimiter.

replace

<string> <new string> Replace a given string inside the field’s value with a new one.

trim

<characters> Remove the given characters from the start and end of the field’s value.

rename

<new key> Rename a given field

remove

{<value>, <value>, …} Remove a given field. If additional parameters are given, the value is expected to be an array. The given strings will be removed from that array.

pick

<index> <key> Pick a specific index from an array and store it to the given field.

time

<from format> <to format> Read a timestamp with a given format compatible to time.Parse and transform it into another format compatible with time.Format.

unixtimestamp

<unit> <to format> Read a unix timestamp with a given unit (“s”, “ms” or “ns”) and transform it into another format compatible with time.Format.

flatten

{<delimiter>} Move all keys from a nested object to new fields named field + delimiter + subfield. If no delimiter is given “.” will be used.

agent

<prefix> {<field>, <field>, …} Parse the field’s value as a user agent string and extract the given fields into new fields named prefix + “_” + field. If no fields are given all fields are returned.

mozilla

mozilla version

platform

the platform used

os

the operating system used

localization

the language used

engine

codename of the browser engine

engine_version

version of the browser engine

browser

name of the browser

version

version of the browser

ip

Parse the field as an array of strings and remove all values that cannot be parsed as a valid IP. Single-string fields are supported, too, but will be converted to an array.

geoip

{<field>, <field>, …} Parse the field as an IP and extract the given fields into new fields named prefix + “_” + field. This action requires a valid GeoIP file to be loaded. If no fields are given all fields are returned.

country

the country code of the IP. Generates country, countryCode.

city

the city of the IP

continent

the continent of the IP. Generates continent, continentCode.

timezone

the timezone of the IP

proxy

the name of the proxy, if applicable. Generates proxy, satellite.

location

the geolocation of this IP. Generates geocoord, geohash.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.ProcessJSON:
      Directives:
        - "host:split: :host:@timestamp"
        - "@timestamp:time:20060102150405:2006-01-02 15\\:04\\:05"
        - "client:ip"
        - "client:geoip:location:country"
        - "ua:agent:ua:os:engine:engine_version"
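Directives such as rename, trim and flatten are not covered by the example above. A sketch using the documented directive syntax (the field names are hypothetical):

```yaml
exampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.ProcessJSON:
      Directives:
        - "error:rename:message"
        - "user:trim: "
        - "nested:flatten:_"
```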
ProcessTSV

This formatter allows modification of TSV encoded data. Each field can be processed by different directives and the result of all directives will be stored back to the original location.

Parameters

Delimiter (default: \t)

Defines the separator used to split values. By default this parameter is set to “\t”.

QuotedValue

When set to true, values that start and end with a quotation mark are not scanned for delimiter characters. I.e. those values will not be split even if they contain delimiter characters. By default this parameter is set to false.

Directives

Defines an array of actions to apply to the TSV encoded data. Directives are processed in order of appearance. Directives start with the index of the field, followed by an action followed by additional parameters if necessary. Parameters, key and action are separated by the “:” character. By default this parameter is set to an empty list.

replace

<string> <new string> Replace a given string inside the field’s value with another one.

prefix

<string> Prepend the given string to the field’s value

postfix

<string> Append the given string to the field’s value

trim

<characters> Remove the given characters from the start and end of the field’s value.

quote

Surround the field’s value with quotation marks after all directives have been processed.

time

<from format> <to format> Read a timestamp in the specified time.Parse-compatible format and transform it into another format compatible with time.Format.

remove

Removes the field from the result.

agent

{<field>, <field>, …} Parse the field’s value as a user agent string and insert the given fields into the TSV after the given index. If no fields are given all fields are returned.

mozilla

mozilla version

platform

the platform used

os

the operating system used

localization

the language used

engine

codename of the browser engine

engine_version

version of the browser engine

browser

name of the browser

version

version of the browser

Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.ProcessTSV:
      Delimiter: ","
      Directives:
        - "0:time:20060102150405:2006-01-02 15\\:04\\:05"
        - "2:remove"
        - "11:agent:os:engine:engine_version"
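The prefix, postfix and quote directives are not covered by the example above. A sketch with illustrative field indices:

```yaml
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.ProcessTSV:
      Delimiter: ","
      Directives:
        - "1:trim: "
        - "1:quote"
        - "3:postfix:ms"
```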
RegExp

This formatter parses a message using a regular expression, performs string (template) replacement and returns the result.

Parameters

Posix

Set to true to compile the regular expression using posix semantics. By default this parameter is set to true.

Expression

Defines the regular expression used for parsing. For details on the regexp syntax see https://golang.org/pkg/regexp/syntax. By default this parameter is set to “(.*)”

Template (default: ${1})

Defines the result string. Regexp matching groups can be referred to using “${n}”, with n being the group’s index. For other possible reference semantics, see https://golang.org/pkg/regexp/#Regexp.Expand. By default this parameter is set to “${1}”
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example extracts time and host from an imaginary log message format.

exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.RegExp:
      Expression: "^(\\d+) (\\w+): "
      Template: "time: ${1}, host: ${2}"
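
The “${n}” group references follow the semantics of Go’s regexp package. The following standalone Go sketch (an illustration, not Gollum code) reproduces the substitution from the example above; the input line is made up:

```go
package main

import (
	"fmt"
	"regexp"
)

// applyTemplate mimics the formatter: match the expression against the
// payload and expand the template using the captured groups.
func applyTemplate(expression, template, payload string) string {
	re := regexp.MustCompile(expression)
	match := re.FindStringSubmatchIndex(payload)
	if match == nil {
		return payload // no match: the payload stays unchanged
	}
	return string(re.ExpandString(nil, template, payload, match))
}

func main() {
	out := applyTemplate(`^(\d+) (\w+): `, "time: ${1}, host: ${2}",
		"1499999999 web01: disk full")
	fmt.Println(out) // time: 1499999999, host: web01
}
```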
Runlength

Runlength is a formatter that prepends the length of the message, followed by a “:”. The actual message is formatted by a nested formatter.

Parameters

Separator (default: :)

This value is used as separator. By default this parameter is set to “:”.

StoreRunlengthOnly (default: false)

If this value is set to “true” only the runlength will be stored. This option is useful to e.g. create metadata fields only containing the length of the payload. When set to “true” the Separator parameter will be ignored. By default this parameter is set to false.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example will store the length of the payload in a separate metadata field.

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.MetadataCopy:
      CopyToKeys: ["length"]
    - format.Runlength:
      ApplyTo: length
      StoreRunlengthOnly: true
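
The core behavior can be sketched in a few lines of Go (an illustration, not the actual plugin code):

```go
package main

import "fmt"

// runlength prepends the payload length and a separator, like
// format.Runlength with its default Separator ":".
func runlength(payload []byte, separator string) []byte {
	prefix := fmt.Sprintf("%d%s", len(payload), separator)
	return append([]byte(prefix), payload...)
}

func main() {
	fmt.Println(string(runlength([]byte("hello"), ":"))) // 5:hello
}
```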
Sequence

This formatter prefixes data with a sequence number managed by the formatter. All messages passing through an instance of the formatter will get a unique number. The number is not persisted, i.e. it restarts at 0 after each restart of gollum.

Parameters

Separator (default: :)

Defines the separator string placed between number and data. By default this parameter is set to “:”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example will insert the sequence number into an existing JSON payload.

exampleProducer:
  Type: producer.Console
  Streams: "*"
  Modulators:
    - format.Trim:
      LeftSeparator: "{"
      RightSeparator: "}"
    - format.Sequence:
      Separator: ","
    - format.Envelope:
      Prefix: "{\"seq\":"
      Postfix: "}"
Serialize

Serialize is a formatter that serializes a message for later retrieval. The formatter uses the internal protobuf based function from msg.Serialize().

Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example serializes all consumed messages:

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Serialize
SplitPick

This formatter splits data into an array by using the given delimiter and extracts the given index from that array. The value of that index will be written back.

Parameters

Delimiter (default: :)

Defines the delimiter to use when splitting the data. By default this parameter is set to “:”

Index (default: 0)

Defines the index to pick. By default this parameter is set to 0.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.SplitPick:
      Index: 2
      Delimiter: ":"
SplitToJSON

SplitToJSON is a formatter that splits a message by a given token and creates a JSON object of the split values by assigning each value to a predefined property.

Parameters

Keys

This value defines an array of JSON keys to which the split message’s parts should be assigned. The keys are applied to the resulting token array by index.

SplitBy (default: |)

This value defines the separator character to use when splitting a message. By default this parameter is set to “|”.

KeepJSON (default: true)

This value can be set to “false” to escape JSON payload texts as regular strings. Otherwise JSON payloads will be taken as-is and set to the corresponding key. By default this parameter is set to “true”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example will format an input of value1,value2,value3 to a JSON string of {"foo":"value1", "bar":"value2"}:

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.SplitToJSON:
      SplitBy: ","
      Keys:
        - foo
        - bar
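
For illustration, the split-and-assign behavior (with KeepJSON disabled) can be sketched in Go. This is a simplified model, not the plugin’s implementation:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// splitToJSON splits the payload and assigns the parts to the given
// keys by index, mirroring the SplitBy and Keys parameters.
func splitToJSON(payload, splitBy string, keys []string) (string, error) {
	parts := strings.Split(payload, splitBy)
	obj := make(map[string]string)
	for i, key := range keys {
		if i < len(parts) {
			obj[key] = parts[i]
		}
	}
	out, err := json.Marshal(obj)
	return string(out), err
}

func main() {
	out, _ := splitToJSON("value1,value2,value3", ",", []string{"foo", "bar"})
	fmt.Println(out) // {"bar":"value2","foo":"value1"}
}
```

Note that Go’s json.Marshal emits map keys in sorted order, so the key order may differ from the order given in the Keys list.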
StreamName

This formatter prefixes data with the name of the current or previous stream.

Parameters

UsePrevious

Set to true to use the name of the previous stream. By default this parameter is set to false.

Separator (default: :)

Defines the separator string used between stream name and data. By default this parameter is set to “:”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example prefixes the message with the most recent routing history.

exampleProducer:
  Type: producer.Console
  Streams: "*"
  Modulators:
    - format.StreamName:
      Separator: ", "
      UsePrevious: true
    - format.StreamName:
      Separator: ": "
StreamRevert

This formatter gets the previously used stream from a message and sets it as the new target stream.

Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.StreamRevert
StreamRoute

StreamRoute is a formatter that modifies a message’s stream by reading a prefix from the message’s data (and discarding it). The prefix is defined as everything before a given delimiter in the message. If no delimiter is found or the prefix is empty the message stream is not changed.

Parameters

Delimiter (default: :)

This value defines the delimiter to search when extracting the stream name. By default this parameter is set to “:”.

StreamModulator

A list of further modulators to format and filter the extracted stream name. By default this parameter is set to an empty list.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example sets the stream name of messages like <error>:a message string to error, leaving a message string as the payload:

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.StreamRoute:
      Delimiter: ":"
      StreamModulator:
        - format.Trim:
          LeftSeparator: <
          RightSeparator: >
TemplateJSON

This formatter unmarshals the given data as JSON and applies the results to the given go template. The JSON data will be replaced with the rendered template result. The template language is described in the go documentation: https://golang.org/pkg/text/template/#hdr-Actions

Parameters

Template

Defines the go template to execute with the received JSON data. If the template cannot be parsed or the JSON payload cannot be unmarshaled, the incoming JSON data is preserved. By default this parameter is set to “”.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example extracts the fields “Name” and “Surname” from a JSON encoded payload and writes them both back as a plain text result.

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.TemplateJSON:
      Template: "{{.Name}} {{.Surname}}"
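
The behavior of the example above, including the “keep the original data on error” fallback, can be sketched with a standalone Go program (an illustration, not the plugin source; the sample payload is made up):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"text/template"
)

// renderJSON unmarshals the payload and executes the template against
// the resulting data, preserving the payload on any error.
func renderJSON(tmpl, payload string) (string, error) {
	var data interface{}
	if err := json.Unmarshal([]byte(payload), &data); err != nil {
		return payload, err // invalid JSON: keep the incoming data
	}
	t, err := template.New("json").Parse(tmpl)
	if err != nil {
		return payload, err // unparsable template: keep the incoming data
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, data); err != nil {
		return payload, err
	}
	return buf.String(), nil
}

func main() {
	out, _ := renderJSON("{{.Name}} {{.Surname}}",
		`{"Name":"Jane","Surname":"Doe"}`)
	fmt.Println(out) // Jane Doe
}
```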
TextToJSON

This formatter uses a state machine to parse arbitrary text data and transform it to JSON.

Parameters

StartState

Defines the name of the initial state when parsing a message. When set to an empty string the first state from the directives array will be used. By default this parameter is set to “”.

TimestampRead

Defines a time.Parse compatible format string used to read time fields when using the “dat” directive. By default this parameter is set to “20060102150405”.

TimestampWrite (default: 2006-01-02 15:04:05 MST)

Defines a time.Format compatible format string used to write time fields when using the “dat” directive. By default this parameter is set to “2006-01-02 15:04:05 MST”.

UnixTimestampRead

Defines the unix timestamp format expected from fields that are parsed using the “dat” directive. Valid values are “s” for seconds, “ms” for milliseconds, or “ns” for nanoseconds. This parameter is ignored unless TimestampRead is set to “”. By default this parameter is set to “”.

Directives

Defines an array of directives used to parse text data. Each entry must be of the format: “State:Token:NextState:Flags:Function”. State denotes the name of the state owning this entry. Multiple entries per state are allowed. Token holds a string that triggers a state transition. NextState holds the target of the state transition. Flags is an optional field and is used to trigger special parser behavior. Flags can be comma separated if you need to use more than one. Function defines an action that is triggered upon state transition. Spaces will be stripped from all fields but Token. If a field requires a colon it has to be escaped with a backslash. Other escape characters supported are n, r and t. By default this parameter is set to an empty list.

Directive rules

There are some special cases which will cause the parser to do additional actions.

  • When writing a value without a key, the state name will become the key.
  • If two keys are written in a row the first key will hold a null value.
  • Writing a key while writing array elements will close the array.

Directive flags

Flags can modify the parser behavior and can be used to store values on a stack across multiple directives.

continue

Prepend the token to the next match.

append

Append the token to the current match and continue reading.

include

Append the token to the current match.

push

Push the current state to the stack.

pop

Pop the stack and use the returned state if possible.

Directive actions

Actions are used to write text read since the last transition to the JSON object.

key

Write the parsed section as a key.

val

Write the parsed section as a value without quotes.

esc

Write the parsed section as an escaped string value.

dat

Write the parsed section as a timestamp value.

arr

Start a new array.

obj

Start a new object.

end

Close an array or object.

arr+val

arr followed by val.

arr+esc

arr followed by esc.

arr+dat

arr followed by dat.

val+end

val followed by end.

esc+end

esc followed by end.

dat+end

dat followed by end.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

The following example parses JSON data.

ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.TextToJSON:
      Directives:
        - "findKey   :\":  key       :      :        "
        - "findKey   :}:             : pop  : end    "
        - "key       :\":  findVal   :      : key    "
        - "findVal   :\\:: value     :      :        "
        - "value     :\":  string    :      :        "
        - "value     :[:   array     : push : arr    "
        - "value     :{:   findKey   : push : obj    "
        - "value     :,:   findKey   :      : val    "
        - "value     :}:             : pop  : val+end"
        - "string    :\":  findKey   :      : esc    "
        - "array     :[:   array     : push : arr    "
        - "array     :{:   findKey   : push : obj    "
        - "array     :]:             : pop  : val+end"
        - "array     :,:   array     :      : val    "
        - "array     :\":  arrString :      :        "
        - "arrString :\":  array     :      : esc    "
Timestamp

Timestamp is a formatter that allows prefixing messages with a timestamp (time of arrival at gollum). The timestamp format is freely configurable and can e.g. contain a delimiter sequence at the end.

Parameters

Timestamp (default: 2006-01-02 15:04:05 MST | )

This value defines a Go time format string that is used to format the timestamp. By default this parameter is set to “2006-01-02 15:04:05 MST | “.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example will write a time string to the metadata field time:

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Timestamp:
      Timestamp: "2006-01-02T15:04:05.000 MST"
      ApplyTo: time
Trim

This formatter searches for separator strings and removes all data left or right of this separator.

Parameters

LeftSeparator

The string to search for. Searching starts from the left side of the data. If an empty string is given this parameter is ignored. By default this parameter is set to “”.

RightSeparator

The string to search for. Searching starts from the right side of the data. If an empty string is given this parameter is ignored. By default this parameter is set to “”.

LeftOffset (default: 0)

Defines the search start index when using LeftSeparator. By default this parameter is set to 0.

RightOffset (default: 0)

Defines the search start index when using RightSeparator. Counting starts from the right side of the message. By default this parameter is set to 0.
Parameters (from core.SimpleFormatter)

ApplyTo

This value chooses the part of the message the formatting should be applied to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.

SkipIfEmpty

When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples

This example will reduce data like “foo[bar[foo]bar]foo” to “bar[foo]bar”.

exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Trim:
      LeftSeparator: "["
      RightSeparator: "]"
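
A simplified Go sketch of the trim behavior (an illustration, not the plugin source; LeftOffset and RightOffset are omitted here):

```go
package main

import (
	"fmt"
	"strings"
)

// trim drops everything left of the first LeftSeparator and right of
// the last RightSeparator, keeping the data in between.
func trim(data, left, right string) string {
	if left != "" {
		if idx := strings.Index(data, left); idx >= 0 {
			data = data[idx+len(left):]
		}
	}
	if right != "" {
		if idx := strings.LastIndex(data, right); idx >= 0 {
			data = data[:idx]
		}
	}
	return data
}

func main() {
	fmt.Println(trim("foo[bar[foo]bar]foo", "[", "]")) // bar[foo]bar
}
```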

Aggregate plugins

To simplify complex pipeline configs you are able to aggregate plugin configurations. This means that all settings which are defined in the aggregation scope will be injected into each defined “sub-plugin”.

To define an aggregation use the keyword Aggregate as plugin type.

Parameters

Plugins

List of plugins which will be instantiated and get the aggregated settings injected.
Examples

In this example both consumers get the streams and modulator injected from the aggregation settings:

AggregatePipeline:
  Type: Aggregate
  Streams: console
  Modulators:
    - format.Envelope:
        Postfix: "\n"
  Plugins:
    consumerFoo:
      Type: consumer.File
      File: /tmp/foo.log
    consumerBar:
      Type: consumer.File
      File: /tmp/bar.log

This example shows a second use case to reuse server settings easier:

consumerConsole:
  Type: consumer.Console
  Streams: write

kafka:
  Type: Aggregate
  Servers:
    - kafka0:9092
    - kafka1:9093
    - kafka2:9094

  Plugins:
    producer:
      Type: producer.Kafka
      Streams: write
      Compression: zip
      Topics:
        write: test

    consumer:
      Type: consumer.Kafka
      Streams: read
      Topic: test
      DefaultOffset: Oldest

producerConsole:
  Type: producer.Console
  Streams: read

Examples and Cookbooks

Here you can find some examples and cookbooks on how to run Gollum.

Examples

Hello World Examples
Hello World

This example sets up a simple console consumer and producer that will simply echo everything you type back to the console. As messages have no newline appended by default, an envelope formatter is used to add one before writing to the console. Make sure to start Gollum with gollum -ll 3 to see all log messages.

'StdIn':
  Type: 'consumer.Console'
  Streams: 'console'

'StdOut':
  Type: 'producer.Console'
  Streams: 'console'
  Modulators:
  - 'format.Envelope': {}
Loadbalancer

This example extends the Hello World example by introducing a route configuration. All messages from the console consumer will be sent to a round robin loadbalancer that will forward messages to one of the two attached producers. Make sure to start Gollum with gollum -ll 3 to see all log messages.

'StdIn':
  Type: 'consumer.Console'
  Streams: 'console'

'loadbalancer':
  Type: 'router.RoundRobin'
  Stream: 'console'

'StdOut1':
  Type: 'producer.Console'
  Streams: 'console'
  Modulators:
  - 'format.Envelope':
      Prefix: '1: '

'StdOut2':
  Type: 'producer.Console'
  Streams: 'console'
  Modulators:
  - 'format.Envelope':
      Prefix: '2: '

When you remove the router from the config you will see each message reach both producers.

Hello World filtered

This example extends the previous example by setting up a filter to only echo sentences that end with the word “gollum”. A regular expression filter is used to achieve this. Note that this filter does not apply to standard log messages. Make sure to start Gollum with gollum -ll 3 to see all log messages.

'StdIn':
  Type: 'consumer.Console'
  Streams: 'console'

'loadbalancer':
  Type: 'router.RoundRobin'
  Stream: 'console'
  Filters:
  - 'filter.RegExp':
      FilterExpression: ".*gollum$"

'StdOut1':
  Type: 'producer.Console'
  Streams: 'console'
  Modulators:
  - 'format.Envelope':
      Prefix: '1: '

'StdOut2':
  Type: 'producer.Console'
  Streams: 'console'
  Modulators:
  - 'format.Envelope':
      Prefix: '2: '

You can also attach filters to the modulators section of a consumer or a producer. Please note that routers can filter but not modify messages.

Hello World splitter

This example extends the first example by introducing a stream split. This time we will print the console output twice, encoded as XML and as JSON. Make sure to start Gollum with gollum -ll 3 to see all log messages.

'StdIn':
  Type: 'consumer.Console'
  Streams: 'console'

'StdOutXML':
  Type: 'producer.Console'
  Streams: 'console'
  Modulators:
  - 'format.Envelope':
      Prefix: '<msg>'
      Postfix: '</msg>\n'

'StdOutJSON':
  Type: 'producer.Console'
  Streams: 'console'
  Modulators:
  - 'format.Envelope':
      Prefix: '{"msg":"'
      Postfix: '"}\n'

You can also do this in a slightly different way by utilizing two streams. When doing this you can filter or route both streams differently. In this extended example, every second message will be output only as JSON.

'StdIn':
  Type: 'consumer.Console'
  Streams:
  - 'consoleJSON'
  - 'consoleXML'

'xmlFilter':
  Type: 'router.Broadcast'
  Stream: 'consoleXML'
  Filters:
  - 'filter.Sample': {}

'StdOutXML':
  Type: 'producer.Console'
  Streams: 'consoleXML'
  Modulators:
  - 'format.Envelope':
      Prefix: '<msg>'
      Postfix: '</msg>\n'

'StdOutJSON':
  Type: 'producer.Console'
  Streams: 'consoleJSON'
  Modulators:
  - 'format.Envelope':
      Prefix: '{"msg":"'
      Postfix: '"}\n'
Chat server

This example requires two Gollum instances to run. The first one acts as the “chat client” while the second one acts as the “chat server”. Messages entered on the client will be sent to the server using runlength encoding. When the message reaches the server, it will be decoded and written to the console. If the server does not respond, the message will be sent to the fallback and displayed as an error. Make sure to start Gollum with gollum -ll 3 to see all log messages.

Client

'StdIn':
  Type: 'consumer.Console'
  Streams: 'console'

'SocketOut':
  Type: 'producer.Socket'
  Streams: 'console'
  Address: ':5880'
  Acknowledge: 'OK'
  FallbackStream: 'failed'
  Modulators:
  - 'format.Runlength': {}

'Failed':
  Type: 'producer.Console'
  Streams: 'failed'
  Modulators:
  - 'format.Envelope':
      Prefix: 'Failed to send: '

Server

'SocketIn':
  Type: 'consumer.Socket'
  Streams: 'socket'
  Address: ":5880"
  Acknowledge: 'OK'
  Partitioner: 'ascii'
  Delimiter: ':'

'StdOut':
  Type: 'producer.Console'
  Streams: 'socket'
  Modulators:
  - 'format.Envelope': {}
Profiling

This configuration will test Gollum for its theoretic maximum message throughput. You can of course modify this example to test e.g. file producer performance. Make sure to start Gollum with gollum -ll 3 -ps to see all log messages as well as intermediate profiling results.

'Profiler':
  Type: 'consumer.Profiler'
  Streams: 'profile'
  Runs: 100000
  Batches: 100
  Characters: 'abcdefghijklmnopqrstuvwxyz .,!;:-_'
  Message: '%256s'
  KeepRunning: false
  ModulatorRoutines: 0

'Benchmark':
  Type: 'producer.Benchmark'
  Streams: 'profile'

Cookbooks

Socket to Kafka

This example creates a unix domain socket /tmp/kafka.socket that accepts the following protocol: <topic>:<message_base64>\n

The message will be base64 decoded and written to the topic mentioned at the start of the message. This example also shows how to apply rate limiting per topic.

Configuration (v0.4.x):

# Socket accepts <topic>:<message_base64>
- "consumer.Socket":
    Stream: "raw"
    Address: "unix:///tmp/kafka.socket"
    Permissions: "0777"

# Stream "raw" to stream "<topic>" conversion
# Decoding of <message_base64> to <message>
- "stream.Broadcast":
    Stream: "raw"
    Formatter: "format.StreamRoute"
    StreamRouteDelimiter: ":"
    StreamRouteFormatter: "format.Base64Decode"

# Listening to all streams as streams are generated at runtime
# Use ChannelTimeoutMs to be non-blocking
- "producer.Kafka":
    Stream: "*"
    Filter: "filter.Rate"
    RateLimitPerSec: 100
    ChannelTimeoutMs: 10
    Servers:
        - "kafka1:9092"
        - "kafka2:9092"
        - "kafka3:9092"
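
A minimal Go client sketch for this protocol (the socket path matches the Address above; the helper names are made up for illustration):

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net"
)

// encodeFrame builds one protocol frame: <topic>:<message_base64>\n
func encodeFrame(topic string, message []byte) string {
	return fmt.Sprintf("%s:%s\n", topic,
		base64.StdEncoding.EncodeToString(message))
}

// send writes a single message to the Gollum unix domain socket.
func send(topic string, message []byte) error {
	conn, err := net.Dial("unix", "/tmp/kafka.socket")
	if err != nil {
		return err
	}
	defer conn.Close()
	_, err = conn.Write([]byte(encodeFrame(topic, message)))
	return err
}

func main() {
	fmt.Print(encodeFrame("test", []byte("hello"))) // test:aGVsbG8=
}
```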
Kafka roundtrip

This example can be used for developing or testing kafka consumers and producers.

gollum config

With the following config gollum will create a console consumer paired with a kafka producer, and a kafka consumer paired with a console producer. All data written to the console will be sent to kafka. The kafka consumer will then read all data back from kafka and send it to your console via the console producer:

gollum >= v0.5.0
consumerConsole:
    type: consumer.Console
    Streams: "write"

producerKafka:
    type: producer.Kafka
    Streams: "write"
    Compression: "zip"
    Topics:
        "write" : "test"
    Servers:
        - kafka0:9092
        - kafka1:9093
        - kafka2:9094

consumerKafka:
    type: consumer.Kafka
    Streams: "read"
    Topic: "test"
    DefaultOffset: "Oldest"
    MaxFetchSizeByte: 100
    Servers:
        - kafka0:9092
        - kafka1:9093
        - kafka2:9094

producerConsole:
    type: producer.Console
    Streams: "read"
    Modulators:
        - format.Envelope:
            Postfix: "\n"


kafka setup for docker

Here you can find a docker-compose setup which works with the gollum config example.

/etc/hosts entry

You need a valid /etc/hosts entry to be able to resolve the hostnames used above:

# you can not use 127.0.0.1 or localhost here
<YOUR PUBLIC IP> kafka0 kafka1 kafka2
docker-compose file
zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
    - "2888:2888"
    - "3888:3888"
kafkaone:
  image: wurstmeister/kafka:0.10.0.0
  ports:
    - "9092:9092"
  links:
    - zookeeper:zookeeper
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
  environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka0
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper"
    KAFKA_BROKER_ID: "21"
    KAFKA_CREATE_TOPICS: "test:1:3,Topic2:1:1:compact"
kafkatwo:
  image: wurstmeister/kafka:0.10.0.0
  ports:
    - "9093:9092"
  links:
    - zookeeper:zookeeper
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
  environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka1
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper"
    KAFKA_BROKER_ID: "22"
    KAFKA_CREATE_TOPICS: "test:1:3,Topic2:1:1:compact"
kafkathree:
  image: wurstmeister/kafka:0.10.0.0
  ports:
    - "9094:9092"
  links:
    - zookeeper:zookeeper
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
  environment:
    KAFKA_ADVERTISED_HOST_NAME: kafka2
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper"
    KAFKA_BROKER_ID: "23"
    KAFKA_CREATE_TOPICS: "test:1:3,Topic2:1:1:compact"

This docker-compose file can be run by:

docker-compose -f docker-compose-kafka.yml -p kafka010 up
Write to Elasticsearch (ElasticSearch producer)
Description

This example can be used for developing or testing the ElasticSearch producer.

gollum config

With the following config gollum will create a console consumer and an ElasticSearch producer. All data written to the console will be sent to ElasticSearch.

This payload can be used for the configured setup:

{"user" : "olivere", "message" : "It's a Raggy Waltz"}
gollum >= v0.5.0
consumerConsole:
    type: consumer.Console
    Streams: "write"

producerElastic:
    Type: producer.ElasticSearch
    Streams: write
    User: elastic
    Password: changeme
    Servers:
        - http://127.0.0.1:9200
    Retry:
        Count: 3
        TimeToWaitSec: 5
    SetGzip: true
    StreamProperties:
        write:
            Index: twitter
            DayBasedIndex: true
            Type: tweet
            Mapping:
                user: keyword
                message: text
            Settings:
                number_of_shards: 1
                number_of_replicas: 1


ElasticSearch setup for docker

Here you can find a docker-compose setup which works with the config example:

version: '2'
services:
  elasticsearch1:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.4.1
    container_name: elasticsearch1
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    mem_limit: 1g
    volumes:
      - esdata1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - esnet
  elasticsearch2:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.4.1
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - "discovery.zen.ping.unicast.hosts=elasticsearch1"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    mem_limit: 1g
    volumes:
      - esdata2:/usr/share/elasticsearch/data
    networks:
      - esnet

volumes:
  esdata1:
    driver: local
  esdata2:
    driver: local

networks:
  esnet:

This docker-compose file can be run by:

docker-compose -f docker-compose-elastic.yml up

Release Notes

Performance tests

History

All tests were executed by calling time gollum -c profile.conf -ll 1.

test              version    user     sys     cpu   msg/sec
Raw pipeline      0.5.0      11,85s   3,69s   173%  1.116.071
                  0.4.6      10,48s   3,01s   178%  1.320.132
Basic formatting  0.5.0      39,70s   6,09s   532%  1.163.602
                  0.4.6 [1]  21,84s   5,78s   206%  746.881
8 consumers       0.5.0      344,33s  28,24s  673%  1.446.157
                  0.4.6      319,44s  72,22s  574%  1.173.536
JSON pipeline     0.5.0      28,23s   6,33s   138%  40.033
                  0.4.6      28,30s   6,30s   150%  43.400

[1] this version does not use parallel formatting
v0.5.0
Raw pipeline
Intel Core i7-4770HQ CPU @ 2.20GHz, 16 GB RAM
go1.8.3 darwin/amd64
  • 11,85s user
  • 3,69s system
  • 173% cpu
  • 8,960s total
  • 1.116.071 msg/sec
"Profiler":
    Type: "consumer.Profiler"
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Streams: "profile"
    KeepRunning: false
    ModulatorRoutines: 0

"Benchmark":
    Type: "producer.Benchmark"
    Streams: "profile"
Basic formatting
Intel Core i7-4770HQ CPU @ 2.20GHz, 16 GB RAM
go1.8.3 darwin/amd64
Please note that from this version on formatting is done in parallel.
  • 39,70s user
  • 6,09s system
  • 532% cpu
  • 8,594s total
  • 1.163.602 msg/sec
"Profiler":
    Type: "consumer.Profiler"
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Streams: "profile"
    KeepRunning: false
    ModulatorRoutines: 4
    Modulators:
        - format.Envelope
        - format.Timestamp

"Benchmark":
    Type: "producer.Benchmark"
    Streams: "profile"
8 consumers with formatting
Intel Core i7-4770HQ CPU @ 2.20GHz, 16 GB RAM
go1.8.3 darwin/amd64
  • 344,33s user
  • 28,24s system
  • 673% cpu
  • 55,319s total
  • 1.446.157 msg/sec
"Profiler":
    Type: Aggregate
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Streams: "profile"
    KeepRunning: false
    ModulatorRoutines: 0
    Modulators:
        - format.Envelope
        - format.Timestamp
    Plugins:
        P01:
            Type: "consumer.Profiler"
        P02:
            Type: "consumer.Profiler"
        P03:
            Type: "consumer.Profiler"
        P04:
            Type: "consumer.Profiler"
        P05:
            Type: "consumer.Profiler"
        P06:
            Type: "consumer.Profiler"
        P07:
            Type: "consumer.Profiler"
        P08:
            Type: "consumer.Profiler"

"Benchmark":
    Type: "producer.Benchmark"
    Streams: "profile"
JSON pipeline
Intel Core i7-4770HQ CPU @ 2.20GHz, 16 GB RAM
go1.8.3 darwin/amd64
  • 28,23s user
  • 6,33s system
  • 138% cpu
  • 24,979s total
  • 40.033 msg/sec
"Profiler":
    Type: consumer.Profiler
    Runs: 10000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "{\"test\":\"%64s\",\"foo\":\"%32s|%32s\",\"bar\":\"%64s\",\"thisisquitealongstring\":\"%64s\"}"
    Streams: "profile"
    KeepRunning: false
    ModulatorRoutines: 0
    Modulators:
        - format.ProcessJSON:
            Directives:
                - "test:rename:foobar"
                - "bar:remove"
                - "foo:split:|:foo1:foo2"
        - format.ExtractJSON:
            Field: thisisquitealongstring

"Benchmark":
    Type: "producer.Benchmark"
    Streams: "profile"
v0.4.6
Raw pipeline
Intel Core i7-4770HQ CPU @ 2.20GHz, 16 GB RAM
go1.8.3 darwin/amd64
  • 10,48s user
  • 3,01s system
  • 178% cpu
  • 7,575s total
  • 1.320.132 msg/sec
- "consumer.Profiler":
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Stream: "profile"
    KeepRunning: false

- "producer.Benchmark":
    Stream: "profile"
Basic formatting
Intel Core i7-4770HQ CPU @ 2.20GHz, 16 GB RAM
go1.8.3 darwin/amd64
  • 21,84s user
  • 5,78s system
  • 206% cpu
  • 13,389s total
  • 746.881 msg/sec
- "consumer.Profiler":
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Stream: "profile"
    KeepRunning: false

- "stream.Broadcast":
    Stream: "profile"
    Formatter: format.Timestamp
    TimestampFormatter: format.Envelope

- "producer.Benchmark":
    Stream: "profile"
8 consumers with formatting
Intel Core i7-4770HQ CPU @ 2.20GHz, 16 GB RAM
go1.8.3 darwin/amd64
  • 319,44s user
  • 72,22s system
  • 574% cpu
  • 68,17s total
  • 1.173.536 msg/sec
- "consumer.Profiler":
    Instances: 8
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Stream: "profile"
    KeepRunning: false

- "stream.Broadcast":
    Stream: "profile"
    Formatter: format.Timestamp
    TimestampFormatter: format.Envelope

- "producer.Benchmark":
    Stream: "profile"
JSON pipeline
Intel Core i7-4770HQ CPU @ 2.20GHz, 16 GB RAM
go1.8.3 darwin/amd64
  • 28,30s user
  • 6,30s system
  • 150% cpu
  • 23,041s total
  • 43.400 msg/sec
- "consumer.Profiler":
    Runs: 10000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "{\"test\":\"%64s\",\"foo\":\"%32s|%32s\",\"bar\":\"%64s\",\"thisisquitealongstring\":\"%64s\"}"
    Stream: "profile"
    KeepRunning: false

- "stream.Broadcast":
    Stream: "profile"
    Formatter: format.ExtractJSON
    ExtractJSONdataFormatter: format.ProcessJSON
    ProcessJSONDirectives:
        - "test:rename:foobar"
        - "bar:remove"
        - "foo:split:|:foo1:foo2"
    ExtractJSONField: thisisquitealongstring

- "producer.Benchmark":
    Stream: "profile"

v0.5.0

Breaking changes 0.4.x to 0.5.0
Configuration

The goal of this breaking change was to make Gollum configuration files easier to maintain and easier to merge. In addition to that, several quirks and inconsistencies have been resolved.

Plugin header
This change makes configs easier to merge, which is a requirement for future features.
As of this change, a new, mandatory field “Type” has been added.

From

- "plugin.Type":
    ID: "pluginId"

To

"pluginId":
    Type: "plugin.Type"
Plural form

In previous versions, fields did not follow a clear rule on when to use plural or singular. In 0.5.0, plural means “one or more values” while singular means “only one value”.

From

- "plugin.Type":
    ID: "pluginId"
    Category:
        - "Foo"
        - "Bar"
    Streams:
        - "foo"
        - "bar"

To

"pluginId":
    Type: "plugin.Type"
    Categories:
        - "Foo"
        - "Bar"
    Streams:
        - "foo"
        - "bar"
Formatters and filters are now modulators

In earlier versions, chaining formatters was done by nesting them via options. This was confusing as the order was “upside down”. In addition, every formatter could only be used once. The new modulator concept introduces a more natural order and allows formatters to be reused as often as necessary. Furthermore, filters and formatters have been merged into the same list. This solves the problem of applying filters before or after formatters, which was previously worked around by adding fields such as “FilterAfterFormat”.

From

- "plugin.Type":
    ID: "pluginId"
    Filter: "filter.Before"
    FilterAfterFormat: "filter.After"
    Formatter: "format.SECOND"
    SECONDOption: "foobar"
    SECONDFormatter: "format.FIRST"

To

"pluginId":
    Type: "plugin.Type"
    Modulators:
        - "filter.Before"
        - "format.FIRST"
        - format.SECOND:
            Option: "foobar"
        - "filter.After"
Nested options

Some plugins had a set of options starting with the same prefix (e.g. the rotation options of producer.File). These options have now been grouped.

From

- "plugin.Type":
    ID: "pluginId"
    RotateAfterHours: 10
    RotateSizeMB: 1024
    RotateAt: "00:00"

To

"pluginId":
    Type: "plugin.Type"
    Rotate:
        AfterHours: 10
        SizeMB: 1024
        At: "00:00"
Plugins
The plugin system has been refactored to make plugins more consistent and to reduce the amount of work required to write a new plugin. This change introduced new subclasses and changed some of the basic interfaces.
The shutdown process has been revamped to give plugins a better chance to cleanly shut down and to get rid of all their messages without the system having to care about stream loops.
Renaming of streams to routers
A “stream” in 0.4.x has a double meaning: it denotes a stream of data, as well as a type of plugin that is used to route messages from one stream to another, or simply to configure a certain stream of data in terms of formatting.
To make it easier to talk about these two things, the routing/configuring part (the plugins) has been renamed to “router”.

From

- "stream.Broadcast":
    ID: "Splitter"
    Stream: "foo"

To

"Splitter":
    Type: "router.Broadcast"
    Stream: "foo"
Removal of gollum/shared

All types from the github.com/trivago/gollum/shared package have been moved to the new github.com/trivago/tgo package and subpackages. This allows us to re-use these types in other projects more easily and introduces a better structure. This package is meant to be an extension to the Golang standard library and follows a “t-prefix” naming convention. Everything that you would expect in e.g. the sync package will be placed in tgo/tsync.

From

c := shared.MaxI(a,b)
spin := shared.NewSpinner(shared.SpinPriorityLow)

To

c := tmath.MaxI(a,b)
spin := tsync.NewSpinner(tsync.SpinPriorityLow)
Base classes

In version 0.4.x and earlier not all plugins had a base class. In 0.5.0 all plugins have base classes and existing base classes have been renamed.

renamed

core.ConsumerBase -> core.SimpleConsumer
core.ProducerBase -> core.BufferedProducer
core.StreamBase   -> core.SimpleRouter

new

core.SimpleConsumer     Consumer base class
core.SimpleFilter       Filter base class
core.SimpleFormatter    Formatter base class
core.SimpleProducer     Producer base class
core.SimpleRouter       Router base class
core.DirectProducer     A producer that directly accepts messages without buffering
core.BufferedProducer   A producer that reads messages from a channel
core.BatchedProducer    A producer that collects messages and processes them in a batch
Metrics
Metrics have been moved from gollum/shared to the tgo package. As of this change, shared.Metric.* has to be replaced by tgo.Metric.*, and the package “github.com/trivago/tgo” has to be imported instead of “github.com/trivago/gollum/shared”.
Please note that “per second” metrics can now be added without additional overhead by using tgo.Metric.NewRate(metricName, rateMetricName, time.Second, 10, 3, true). All custom “per second” metrics should be replaced with this function.
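The idea behind such a derived rate metric can be sketched with the standard library alone: sample a counter at fixed intervals and report the delta per interval. This is an illustration of the concept only, not the tgo implementation; the type and method names here are made up.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// rateMetric sketches a derived "per second" metric: the application
// increments a counter, and a sampler reads the delta once per interval.
type rateMetric struct {
	counter int64 // the underlying metric, incremented by the app
	last    int64 // counter value at the previous sample
}

func (r *rateMetric) Inc(n int64) { atomic.AddInt64(&r.counter, n) }

// Sample is meant to be called once per second (e.g. from a ticker
// goroutine) and returns the rate for the elapsed interval.
func (r *rateMetric) Sample() int64 {
	now := atomic.LoadInt64(&r.counter)
	rate := now - r.last
	r.last = now
	return rate
}

func main() {
	m := &rateMetric{}
	m.Inc(500)
	fmt.Println(m.Sample()) // 500 messages in this interval
	m.Inc(1200)
	fmt.Println(m.Sample()) // 1200
}
```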
Logging

Version 0.5.0 introduces logrus-based scoped logging to give error messages a clearer context. As of this change, every plugin has a “Logger” member in its base class.

From

Log.Error.Print("MyPlugin: Something's wrong", err)

To

plugin.Logger.WithError(err).Error("Something's wrong")
Configure

Error handling has been improved so that a plugin automatically reacts on missing or invalid values. Errors are now collected in a stack attached to the config reader and processed as a batch after configure returns. In addition to that, simple types can now be configured using struct tags.

From

type Console struct {
    core.ConsumerBase
    autoExit bool
    pipeName string
    pipePerm uint32
    pipe     *os.File
}

func (cons *Console) Configure(conf core.PluginConfig) error {
    cons.autoExit = conf.GetBool("ExitOnEOF", true)
    inputConsole := conf.GetString("Console", "stdin")

    switch strings.ToLower(inputConsole) {
    case "stdin":
        cons.pipe = os.Stdin
        cons.pipeName = "stdin"
    case "stderr":
        return fmt.Errorf("Cannot read from stderr")
    default:
        cons.pipe = nil
        cons.pipeName = inputConsole

        if perm, err := strconv.ParseInt(conf.GetString("Permissions", "0664"), 8, 32); err != nil {
            Log.Error.Printf("Error parsing named pipe permissions: %s", err)
        } else {
            cons.pipePerm = uint32(perm)
        }
    }

    return cons.ConsumerBase.Configure(conf)
}

To

type Console struct {
    core.SimpleConsumer
    autoExit            bool   `config:"ExitOnEOF" default:"true"`
    pipeName            string `config:"Pipe" default:"stdin"`
    pipePerm            uint32 `config:"Permissions" default:"0664"`
    pipe                *os.File
}

func (cons *Console) Configure(conf core.PluginConfigReader) {
    switch strings.ToLower(cons.pipeName) {
    case "stdin":
        cons.pipe = os.Stdin
        cons.pipeName = "stdin"
    case "stderr":
        conf.Errors.Pushf("Cannot read from stderr")
    default:
        cons.pipe = nil
    }
}
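The struct-tag mechanism can be sketched with standard-library reflection. This is a minimal illustration of how `config:"..."` and `default:"..."` tags might drive configuration; it is not Gollum's actual config reader (which also handles unexported fields and many more types), and all names below are made up. Note the sketch uses exported fields, since plain reflection cannot set unexported ones.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
)

// applyDefaults fills the exported fields of target from a settings map,
// falling back to the `default` tag when a key is absent.
// Illustration only; not Gollum's PluginConfigReader.
func applyDefaults(target interface{}, settings map[string]string) {
	v := reflect.ValueOf(target).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		key, ok := field.Tag.Lookup("config")
		if !ok {
			continue // field is not configurable
		}
		raw, found := settings[key]
		if !found {
			raw = field.Tag.Get("default")
		}
		switch field.Type.Kind() {
		case reflect.Bool:
			b, _ := strconv.ParseBool(raw)
			v.Field(i).SetBool(b)
		case reflect.String:
			v.Field(i).SetString(raw)
		}
	}
}

type consoleConfig struct {
	AutoExit bool   `config:"ExitOnEOF" default:"true"`
	PipeName string `config:"Pipe" default:"stdin"`
}

func main() {
	cfg := consoleConfig{}
	applyDefaults(&cfg, map[string]string{"Pipe": "/tmp/mypipe"})
	fmt.Println(cfg.AutoExit, cfg.PipeName) // true /tmp/mypipe
}
```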
Message handling

Message handling has changed considerably compared to 0.4.x. Messages now support metadata and contain a copy of the “original” data next to the actual payload. In addition, messages are now backed by a memory pool and are passed around using pointers. All this is reflected in new function signatures and new message member functions.

From

func (format *Sequence) Format(msg core.Message) ([]byte, core.MessageStreamID) {
    basePayload, stream := format.base.Format(msg)
    baseLength := len(basePayload)
    sequenceStr := strconv.FormatUint(msg.Sequence, 10) + format.separator

    payload := make([]byte, len(sequenceStr)+baseLength)
    len := copy(payload, []byte(sequenceStr))
    copy(payload[len:], basePayload)

    return payload, stream
}

To

func (format *Sequence) ApplyFormatter(msg *core.Message) error {
    seq := atomic.AddInt64(format.seq, 1)
    sequenceStr := strconv.FormatInt(seq, 10)
    content := format.GetAppliedContent(msg)

    dataSize := len(sequenceStr) + len(format.separator) + len(content)
    payload := core.MessageDataPool.Get(dataSize)

    offset := copy(payload, []byte(sequenceStr))
    offset += copy(payload[offset:], format.separator)
    copy(payload[offset:], content)

    format.SetAppliedContent(msg, payload)
    return nil
}

This example shows most of the changes related to the new message structure.

  1. As the sequence number has been removed from the message struct, plugins relying on it need to implement it themselves.
  2. As messages now support metadata, you need to specify whether you want to affect metadata or the payload. In formatter plugins this is reflected by the GetAppliedContent method, which is backed by the “ApplyTo” config parameter.
  3. If you require a new payload buffer you should now utilize core.MessageDataPool.

Things that you don’t see in this example are the following:

  1. Buffers returned by core.MessageDataPool tend to be overallocated, i.e. they can be resized without reallocation in most cases. As of this change, methods to resize the payload have been added.
  2. If you need to create a copy of the complete message, use the Clone() method.
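The overallocation idea behind such a pool can be sketched in plain Go: hand out slices with spare capacity so that later growth stays within the same backing array. This is an illustration of the concept only; core.MessageDataPool works differently internally, and getBuffer is a made-up name.

```go
package main

import "fmt"

// getBuffer sketches an overallocating buffer source: the returned slice
// has extra capacity so small resizes avoid a reallocation.
func getBuffer(size int) []byte {
	const slack = 64 // spare capacity to absorb small resizes
	return make([]byte, size, size+slack)
}

func main() {
	payload := getBuffer(10)
	before := cap(payload)

	// Growing within the spare capacity does not reallocate:
	// the capacity (and backing array) stays the same.
	payload = append(payload, "extra"...)

	fmt.Println(len(payload), cap(payload) == before) // 15 true
}
```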
Formatting pipeline

In version 0.4.x you had to take care of message modifications yourself on many different occasions. With 0.5.0, the message flow has been moved completely into the core framework. As a result, you don’t need to worry about routing or about resetting data to its original state. The framework will do this for you.

From

func (prod *Redis) getValueAndKey(msg core.Message) (v []byte, k string) {
    value, _ := prod.Format(msg) // Creates a copy and we must not forget this step

    if prod.keyFormat == nil {
        return value, prod.key
    }

    if prod.keyFromParsed {     // Ordering is crucial here
        keyMsg := msg
        keyMsg.Data = value
        key, _ := prod.keyFormat.Format(keyMsg)
        return value, string(key)
    }

    key, _ := prod.keyFormat.Format(msg)
    return value, string(key)
}


func (prod *Redis) storeString(msg core.Message) {
    value, key := prod.getValueAndKey(msg)

    result := prod.client.Set(key, string(value), 0)
    if result.Err() != nil {
        Log.Error.Print("Redis: ", result.Err())
        prod.Drop(msg) // Good thing we stored a copy of the message ...
    }
}

To

func (prod *Redis) getValueFieldAndKey(msg *core.Message) (v, f, k []byte) {
    meta := msg.GetMetadata()
    key := meta.GetValue(prod.key)     // Due to metadata fields...
    field := meta.GetValue(prod.field) // ... this is now a lot easier

    return msg.GetPayload(), field, key
}

func (prod *Redis) storeString(msg *core.Message) {
    // The message arrives here after formatting
    value, _, key := prod.getValueFieldAndKey(msg)

    result := prod.client.Set(string(key), string(value), time.Duration(0))
    if result.Err() != nil {
        prod.Logger.WithError(result.Err()).Error("Failed to set value")
        prod.TryFallback(msg)          // Will send the original (unformatted) message. Always.
    }
}
New features
  • Filters and Formatters have been merged into one list
  • You can now use a filter or formatter more than once in the same plugin
  • Consumers can now do filtering and formatting, too
  • Messages can now store metadata. Formatters can affect the payload or a metadata field
  • All plugins now have an automatic log scope
  • Message payloads are now backed by a memory pool
  • Messages now store the original message, i.e. a backup of the payload state after consumer processing
  • Gollum now provides per-stream metrics
  • Plugins are now able to implement health checks that can be queried via http
  • There is a new pseudo plugin type “Aggregate” that can be used to share configuration between multiple plugins
  • New base types for producers: Direct, Buffered, Batched
  • Plugin configurations now support nested structures
  • The configuration process has been simplified a lot by adding automatic error handling and struct tags
  • Added a new formatter format.GrokToJSON
  • Added a new formatter format.JSONToInflux10
  • Added a new formatter format.Double
  • Added a new formatter format.MetadataCopy
  • Added a new formatter format.Trim
  • Consumer.File now supports filesystem events
  • Consumers can now define the number of go routines used for formatting/filtering
  • All AWS plugins now support role switching
  • All AWS plugins are now based on the same credentials code
Bugfixes
  • The plugin lifecycle has been reimplemented to avoid gollum being stuck waiting for plugins to change state
  • Any errors during the configuration phase will cause gollum to exit
  • Integration test suite added
  • Producer.HTTPRequest port handling fixed
  • The test-config command will now produce more meaningful results
  • Duplicating messages now properly duplicates the whole message and not just the struct
  • Several race conditions have been fixed
  • Producer.ElasticSearch is now based on a more up-to-date library
  • Producer.AwsS3 is now behaving more like producer.File
  • Gollum metrics can now bind to a specific address instead of just a port
Breaking changes
  • The config format has changed to improve automatic processing
  • A lot of plugins have been renamed to avoid confusion and to better reflect their behavior
  • A lot of plugin parameters have been renamed
  • The instances plugin parameter has been removed
  • Most of gollum’s metrics have been renamed
  • Plugin base types have been renamed
  • All message handling function signatures have changed to use pointers
  • Formatters no longer daisy-chain, as they can now be listed in proper order
  • Stream plugins have been renamed to Router plugins
  • Routers are not allowed to modify message content anymore
  • filter.All and format.Forward have been removed as they are not required anymore
  • Producer formatter lists dedicated to formatting a key or similar constructs have been removed
  • Logging framework switched to logrus
  • The package gollum/shared has been removed in favor of trivago/tgo
  • Fuses have been removed from all plugins
  • The general message sequence number has been removed
  • The term “drop” has been replaced by the term “fallback” to emphasize its use
  • The _DROPPED_ stream has been removed. Messages are discarded if no fallback is set
  • Formatters can still change the stream of a message but cannot trigger routing by themselves
  • Compiling contrib plugins now requires a specific loader.go to be added
  • The Docker image on Docker Hub is now a lot smaller and contains only the gollum binary

License

This project is released under the terms of the Apache 2.0 license.

Copyright 2015-2018 trivago N.V.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.