Class: CloudEvents::KafkaBinding
- Inherits:
-
Object
- Object
- CloudEvents::KafkaBinding
- Defined in:
- lib/cloud_events/kafka_binding.rb
Overview
Kafka protocol binding for CloudEvents.
This class implements the Kafka protocol binding, including decoding of events from Kafka message hashes, and encoding of events to Kafka message hashes. It supports binary (header-based) and structured (body-based) content modes that can delegate to formatters such as JSON.
Supports CloudEvents 1.0 only. See https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/kafka-protocol-binding.md.
Kafka messages are represented as plain Ruby Hashes with the keys:
key:[String, nil] — the Kafka record keyvalue:[String, nil] — the Kafka record value (body)headers:[Hash] — String => String header pairs
Constant Summary collapse
- JSON_FORMAT =
The name of the JSON decoder/encoder.
"json"- DEFAULT_KEY_MAPPER =
The default key mapper for encoding. Returns the
partitionkeyextension attribute from the event. ->(event) { event["partitionkey"] }
- DEFAULT_REVERSE_KEY_MAPPER =
The default reverse key mapper for decoding. Sets the
partitionkeyextension attribute from the Kafka record key. Returns an empty hash if the key is nil. ->(key) { key.nil? ? {} : { "partitionkey" => key } }
Instance Attribute Summary collapse
-
#default_encoder_name ⇒ String?
The name of the encoder to use if none is specified.
Class Method Summary collapse
-
.default ⇒ KafkaBinding
Returns a default Kafka binding, including support for JSON format.
Instance Method Summary collapse
-
#decode_event(message, allow_opaque: false, reverse_key_mapper: :NOT_SET, **format_args) ⇒ CloudEvents::Event
Decode an event from a Kafka message hash.
-
#encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format_args) ⇒ Hash
Encode an event into a Kafka message hash.
-
#initialize(key_mapper: DEFAULT_KEY_MAPPER, reverse_key_mapper: DEFAULT_REVERSE_KEY_MAPPER) ⇒ KafkaBinding
constructor
Create an empty Kafka binding.
-
#probable_event?(message) ⇒ boolean
Determine whether a Kafka message is likely a CloudEvent, by inspecting headers only (does not parse the value).
-
#register_formatter(formatter, encoder_name: nil) ⇒ self
Register a formatter for all operations it supports, based on which methods are implemented by the formatter object.
-
#register_formatter_methods(formatter, decode_event: false, encode_event: nil, decode_data: false, encode_data: false) ⇒ self
Registers the given formatter for the given operations.
Constructor Details
#initialize(key_mapper: DEFAULT_KEY_MAPPER, reverse_key_mapper: DEFAULT_REVERSE_KEY_MAPPER) ⇒ KafkaBinding
Create an empty Kafka binding.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/cloud_events/kafka_binding.rb', line 67 def initialize(key_mapper: DEFAULT_KEY_MAPPER, reverse_key_mapper: DEFAULT_REVERSE_KEY_MAPPER) @key_mapper = key_mapper @reverse_key_mapper = reverse_key_mapper @event_decoders = Format::Multi.new do |result| result&.key?(:event) ? result : nil end @event_encoders = {} @data_decoders = Format::Multi.new do |result| result&.key?(:data) && result.key?(:content_type) ? result : nil end @data_encoders = Format::Multi.new do |result| result&.key?(:content) && result.key?(:content_type) ? result : nil end text_format = TextFormat.new @data_decoders.formats.replace([text_format, HttpBinding::DefaultDataFormat]) @data_encoders.formats.replace([text_format, HttpBinding::DefaultDataFormat]) @default_encoder_name = nil end |
Instance Attribute Details
#default_encoder_name ⇒ String?
The name of the encoder to use if none is specified.
150 151 152 |
# File 'lib/cloud_events/kafka_binding.rb', line 150 def default_encoder_name @default_encoder_name end |
Class Method Details
.default ⇒ KafkaBinding
Returns a default Kafka binding, including support for JSON format.
47 48 49 50 51 52 53 54 |
# File 'lib/cloud_events/kafka_binding.rb', line 47 def self.default @default ||= begin kafka_binding = new kafka_binding.register_formatter(JsonFormat.new, encoder_name: JSON_FORMAT) kafka_binding.default_encoder_name = JSON_FORMAT kafka_binding end end |
Instance Method Details
#decode_event(message, allow_opaque: false, reverse_key_mapper: :NOT_SET, **format_args) ⇒ CloudEvents::Event
Decode an event from a Kafka message hash.
184 185 186 187 188 189 190 191 192 |
# File 'lib/cloud_events/kafka_binding.rb', line 184 def decode_event(, allow_opaque: false, reverse_key_mapper: :NOT_SET, **format_args) reverse_key_mapper = @reverse_key_mapper if reverse_key_mapper == :NOT_SET headers = [:headers] || {} content_type_string = headers["content-type"] content_type = ContentType.new(content_type_string) if content_type_string event = decode_content(, headers, content_type, content_type_string, allow_opaque, **format_args) apply_reverse_key_mapper(event, [:key], reverse_key_mapper) end |
#encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format_args) ⇒ Hash
Encode an event into a Kafka message hash.
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/cloud_events/kafka_binding.rb', line 207 def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format_args) key_mapper = @key_mapper if key_mapper == :NOT_SET if event.is_a?(Event::Opaque) return encode_opaque_event(event) end unless event.respond_to?(:spec_version) raise(SpecVersionError, "Unable to determine specversion") end unless event.spec_version.start_with?("1.") raise(SpecVersionError, "Unrecognized specversion: #{event.spec_version}") end if structured_format encode_structured_event(event, structured_format, key_mapper, **format_args) else encode_binary_event(event, key_mapper, **format_args) end end |
#probable_event?(message) ⇒ boolean
Determine whether a Kafka message is likely a CloudEvent, by inspecting headers only (does not parse the value).
159 160 161 162 163 164 165 166 |
# File 'lib/cloud_events/kafka_binding.rb', line 159 def probable_event?() headers = [:headers] || {} return true if headers.key?("ce_specversion") content_type_string = headers["content-type"] return false unless content_type_string content_type = ContentType.new(content_type_string) content_type.media_type == "application" && content_type.subtype_base == "cloudevents" end |
#register_formatter(formatter, encoder_name: nil) ⇒ self
Register a formatter for all operations it supports, based on which methods are implemented by the formatter object. See Format for a list of possible methods.
97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/cloud_events/kafka_binding.rb', line 97 def register_formatter(formatter, encoder_name: nil) encoder_name = encoder_name.to_s.strip.downcase if encoder_name decode_event = formatter.respond_to?(:decode_event) encode_event = encoder_name if formatter.respond_to?(:encode_event) decode_data = formatter.respond_to?(:decode_data) encode_data = formatter.respond_to?(:encode_data) register_formatter_methods(formatter, decode_event: decode_event, encode_event: encode_event, decode_data: decode_data, encode_data: encode_data) self end |
#register_formatter_methods(formatter, decode_event: false, encode_event: nil, decode_data: false, encode_data: false) ⇒ self
Registers the given formatter for the given operations. Some arguments
are activated by passing true, whereas those that rely on a format
name are activated by passing in a name string.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/cloud_events/kafka_binding.rb', line 128 def register_formatter_methods(formatter, decode_event: false, encode_event: nil, decode_data: false, encode_data: false) @event_decoders.formats.unshift(formatter) if decode_event if encode_event encoders = @event_encoders[encode_event] ||= Format::Multi.new do |result| result&.key?(:content) && result.key?(:content_type) ? result : nil end encoders.formats.unshift(formatter) end @data_decoders.formats.unshift(formatter) if decode_data @data_encoders.formats.unshift(formatter) if encode_data self end |