All Files ( 100.0% covered at 3.35 hits/line )
16 files in total.
353 relevant lines,
353 lines covered and
0 lines missed.
(
100.0%
)
# frozen_string_literal: true
- 1
require 'active_support'
- 1
require 'bunny'
- 1
require 'connection_pool'
- 1
require 'rabbitmq_client/version'
- 1
require 'rabbitmq_client/lifecycle'
- 1
require 'rabbitmq_client/plugin'
- 1
require 'rabbitmq_client/exchange_registry'
- 1
require 'rabbitmq_client/publisher'
# The RabbitmqClient module is a client library for RabbitMQ.
# It currently supports the following use case:
# - Publishing events to a RabbitMQ server
- 1
module RabbitmqClient
  extend ActiveSupport::Autoload
  include ActiveSupport::Configurable
  include ActiveSupport::JSON

  eager_autoload do
    autoload :LoggerBuilder
    autoload :PlainLogSubscriber
    autoload :JsonLogSubscriber
    autoload :JsonFormatter
    autoload :TextFormatter
    autoload :TagsFilter
  end

  # Registry shared by every exchange declared through add_exchange.
  @exchange_registry = ExchangeRegistry.new

  # [rabbitmq_url] URL of the RabbitMQ server
  config_accessor(:rabbitmq_url, instance_accessor: false) do
    'amqp://guest:guest@127.0.0.1:5672'
  end

  # [logger_configs] settings handed to LoggerBuilder
  #   logs_format:   'plain' by default
  #   logs_level:    :info by default
  #   logs_filename: nil by default
  #   logger:        nil by default
  config_accessor(:logger_configs, instance_accessor: false) do
    {
      logs_format: 'plain',
      logs_level: :info,
      logs_filename: nil,
      logger: nil
    }
  end

  # [session_params] defaults used when the publisher is created
  #   heartbeat_publisher:  0
  #   async_publisher:      true
  #   session_pool:         1
  #   session_pool_timeout: 5
  config_accessor(:session_params, instance_accessor: false) do
    {
      heartbeat_publisher: 0,
      async_publisher: true,
      session_pool: 1,
      session_pool_timeout: 5
    }
  end

  # [plugins] classes that setup_lifecycle instantiates once
  config_accessor(:plugins, instance_accessor: false) { [] }

  # [global_store] nil by default
  config_accessor(:global_store, instance_accessor: false) { nil }

  # [whitelist] defaults to the single symbol :'x-request-id'
  config_accessor(:whitelist, instance_accessor: false) do
    ['x-request-id'.to_sym]
  end

  class << self
    # Registers an exchange in the module-wide exchange registry.
    def add_exchange(name, type, options = {})
      @exchange_registry.add(name, type, options)
    end

    # Publishes +payload+ through the lazily created publisher.
    def publish(payload, options = {})
      publisher.publish(payload, options)
    end

    # Memoized Lifecycle instance; built (and plugins instantiated) on the
    # first call.
    def lifecycle
      @lifecycle ||= setup_lifecycle
    end

    # Memoized logger built from the :logger_configs setting.
    def logger
      @logger ||= setup_logger
    end

    private

    def setup_logger
      LoggerBuilder.new(config[:logger_configs]).build_logger
    end

    def publisher
      @publisher ||= init_publisher
    end

    def init_publisher
      # Publisher#initialize only accepts keyword arguments (**config).
      # Passing the merged Hash positionally raises ArgumentError on
      # Ruby 3, so it is splatted explicitly.
      Publisher.new(**config.merge(exchange_registry: @exchange_registry))
    end

    def setup_lifecycle
      # @lifecycle has to be assigned before the plugins are instantiated:
      # Plugin#initialize calls RabbitmqClient.lifecycle, which would
      # otherwise re-enter this method.
      @lifecycle = Lifecycle.new
      plugins.each(&:new)
      @lifecycle
    end
  end
end
# frozen_string_literal: true
- 1
module RabbitmqClient
  # Custom error raised when an unsupported callback is used.
  # +name+ is interpolated into a fixed sentence, so callers pass only the
  # offending name, not a full message.
  class InvalidCallback < RuntimeError
    def initialize(name)
      super("The Callback '#{name}' is an invalid callback and cannot be used")
    end
  end

  # Callback stores the blocks registered for one lifecycle event.
  # Supported callback types are :before and :after.
  class Callback
    def initialize
      @before = []
      @after = []
    end

    # Runs every :before callback, then the given block, then every :after
    # callback, all with the same +args+. Returns the value of the block.
    def execute(*args, &block)
      execute_before_callbacks(*args)
      result = block.call(*args)
      execute_after_callbacks(*args)
      result
    end

    # Registers +callback+ under +type+ (:before or :after).
    # Raises InvalidCallback for any other type.
    def add(type, &callback)
      case type
      when :before
        @before << callback
      when :after
        @after << callback
      else
        # InvalidCallback builds the full sentence itself; passing a complete
        # message here produced a doubly-wrapped error text.
        raise InvalidCallback, type
      end
    end

    private

    def execute_before_callbacks(*args)
      @before.each { |callback| callback.call(*args) }
    end

    def execute_after_callbacks(*args)
      @after.each { |callback| callback.call(*args) }
    end
  end
end
# frozen_string_literal: true
- 1
module RabbitmqClient
  # Exchange holds the name, type and options of one managed exchange and
  # knows how to declare it on a channel.
  class Exchange
    attr_reader :name, :type, :options

    def initialize(name, type, options)
      @name = name
      @type = type
      @options = options
    end

    # Builds a Bunny::Exchange on +channel+, emits the
    # 'created_exhange.rabbitmq_client' notification and returns the
    # exchange object.
    def create(channel)
      Bunny::Exchange.new(channel, @type, @name, @options).tap do
        ActiveSupport::Notifications.instrument(
          'created_exhange.rabbitmq_client', name: @name, type: @type
        )
      end
    end
  end
end
# frozen_string_literal: true
- 1
require_relative 'exchange'
- 1
module RabbitmqClient
  # ExchangeRegistry is a store for all managed exchanges and their details
  class ExchangeRegistry
    # Custom error raised when looking up an unknown exchange
    class ExchangeNotFound < StandardError
      def initialize(name)
        super("The Exchange '#{name}' cannot be found")
      end
    end

    def initialize
      @exchanges = {}
    end

    # Stores (or replaces) the exchange registered under +name+ and returns
    # the new Exchange.
    def add(name, type, options = {})
      @exchanges[name] = Exchange.new(name, type, options)
    end

    # Returns the exchange registered under +name+. For an unknown name it
    # emits the 'exhange_not_found.rabbitmq_client' notification and raises
    # ExchangeNotFound.
    def find(name)
      return @exchanges[name] if @exchanges.key?(name)

      ActiveSupport::Notifications.instrument(
        'exhange_not_found.rabbitmq_client', name: name
      )
      raise ExchangeNotFound, name
    end
  end
end
# frozen_string_literal: true
- 1
require 'json'
- 1
module RabbitmqClient
  # Logger formatter that renders every log entry as a single JSON line.
  #
  # The formatter keeps no per-call state: the previous implementation stored
  # the entry being built in instance variables, which lets concurrent calls
  # on the same formatter overwrite each other's data.
  class JsonFormatter
    # NOTE(review): the trailing space is part of the emitted timestamp;
    # it looks unintentional but is kept so the output does not change.
    TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%6N '

    # Formats one log entry.
    #
    # severity  - severity label, emitted under :level
    # timestamp - object responding to strftime
    # progname  - program name; converted with to_s
    # msg       - a Hash (used as the entry itself) or any other object
    #             (converted with to_s and emitted under :message)
    #
    # Keys whose value has an empty string representation are dropped.
    # Returns the JSON document followed by a newline.
    def call(severity, timestamp, progname, msg)
      entry = build_new_json(msg).merge(
        progname: progname.to_s,
        level: severity,
        timestamp: timestamp.strftime(TIME_FORMAT)
      )
      entry = entry.reject { |_key, value| value.to_s.empty? }
      "#{entry.to_json}\n"
    end

    # Builds the base entry for +msg+ and merges the tags returned by
    # TagsFilter.tags (when there are any) into it.
    def build_new_json(msg)
      # to_s keeps non-String messages (symbols, exceptions, nil, ...) from
      # raising NoMethodError on #strip.
      base = msg.is_a?(Hash) ? msg : { message: msg.to_s.strip }
      base.merge(TagsFilter.tags || {})
    end
  end
end
# frozen_string_literal: true
- 1
require_relative 'log_subscriber_base'
- 1
module RabbitmqClient
  # Manage RabbitmqClient JSON logs: every handled event is turned into a
  # Hash and handed to the logger.
  class JsonLogSubscriber < LogSubscriberBase
    def publisher_created(event)
      debug(action: 'publisher_created',
            message: 'The RabbitmqClient publisher is created',
            publisher_configs: event.payload)
    end

    # The network_error payload is emitted by PublisherJob as
    # { error:, options: }; the exchange name and message id live in
    # +options+, not at the top level of the payload.
    def network_error(event)
      payload = event.payload
      error({ action: 'network_error',
              message: 'Failed to publish a message',
              error_message: payload.fetch(:error).message }.merge(
                process_failed_payload(payload)
              ))
    end

    def overriding_configs(event)
      debug(action: 'overriding_configs',
            message: 'Overriding publisher configs',
            publisher_configs: event.payload)
    end

    def publishing_message(event)
      debug({ action: 'publishing_message',
              message: 'Publishing a new message' }.merge(
                process_payload(event.payload)
              ))
    end

    def published_message(event)
      info({ action: 'published_message',
             message: 'Published a message' }.merge(
               process_payload(event.payload)
             ))
    end

    def confirming_message(event)
      debug({ action: 'confirming_message',
              message: 'Confirming a message' }.merge(
                process_payload(event.payload)
              ))
    end

    def message_confirmed(event)
      debug({ action: 'message_confirmed',
              message: 'Confirmed a message' }.merge(
                process_payload(event.payload)
              ))
    end

    def exhange_not_found(event)
      error(action: 'exhange_not_found',
            message: 'Exhange Not Found',
            exchange_name: event.payload.fetch(:name))
    end

    def created_exhange(event)
      debug(action: 'created_exhange',
            message: 'Exhange is created successfuly',
            exchange_name: event.payload.fetch(:name))
    end

    private

    # Redefine the level helpers so that the event Hash is tagged with the
    # log source before it reaches the logger.
    %w[info debug warn error fatal unknown].each do |level|
      class_eval <<-METHOD, __FILE__, __LINE__ + 1
        def #{level}(progname = nil, &block)
          logger.#{level} rabbitmq_client_event(progname, &block) if logger
        end
      METHOD
    end

    def rabbitmq_client_event(event)
      { source: 'rabbitmq_client' }.merge(event)
    end

    # Extracts the exchange name and message id of a message event,
    # falling back to 'undefined' when a key is missing.
    def process_payload(payload)
      {
        exchange_name: payload.fetch(:exchange, 'undefined'),
        message_id: payload.fetch(:message_id, 'undefined')
      }
    end

    # Same as process_payload, but for the network_error payload: values are
    # looked up in payload[:options] first (:exchange_name / :message_id) and
    # fall back to the top-level keys used by the other events.
    def process_failed_payload(payload)
      options = payload.fetch(:options, nil) || {}
      fallback = process_payload(payload)
      {
        exchange_name: options.fetch(:exchange_name, fallback[:exchange_name]),
        message_id: options.fetch(:message_id, fallback[:message_id])
      }
    end
  end
end
# frozen_string_literal: true
- 1
require_relative 'callback'
- 1
module RabbitmqClient
  # Lifecycle defines the rabbitmq_client lifecycle events and their
  # callbacks, and manages the execution of these callbacks.
  class Lifecycle
    # Supported events mapped to the names of the arguments each one expects.
    EVENTS = {
      publish: %i[message options]
    }.freeze

    attr_reader :callbacks

    def initialize
      @callbacks = EVENTS.keys.each_with_object({}) do |key, hash|
        hash[key] = Callback.new
      end
    end

    # Registers +block+ to run before +event+.
    # Raises InvalidCallback for an unknown event.
    def before(event, &block)
      add(:before, event, &block)
    end

    # Registers +block+ to run after +event+.
    # Raises InvalidCallback for an unknown event.
    def after(event, &block)
      add(:after, event, &block)
    end

    # Runs the callbacks registered for +event+ around +block+.
    #
    # Raises InvalidCallback for an unknown event and ArgumentError when the
    # number of +args+ differs from what EVENTS declares for the event.
    def run_callbacks(event, *args, &block)
      missing_callback(event) unless @callbacks.key?(event)

      expected = EVENTS[event]
      unless expected.size == args.size
        # A single-line string: the previous backslash-newline continuation
        # glued the following line's leading text straight onto "expects".
        raise ArgumentError,
              "Callback #{event} expects #{expected.size} parameter(s): " \
              "#{expected.join(', ')}"
      end

      @callbacks[event].execute(*args, &block)
    end

    private

    def add(type, event, &block)
      missing_callback(event) unless @callbacks.key?(event)

      @callbacks[event].add(type, &block)
    end

    def missing_callback(event)
      # InvalidCallback#initialize builds the sentence around the name it
      # receives, so only the event name is passed.
      raise InvalidCallback, event
    end
  end
end
# frozen_string_literal: true
- 1
module RabbitmqClient
  # Base class of the RabbitmqClient log subscribers; routes all logging to
  # the logger exposed by RabbitmqClient.logger.
  class LogSubscriberBase < ActiveSupport::LogSubscriber
    # Memoized at class level so RabbitmqClient.logger is resolved once.
    def self.logger
      @logger ||= RabbitmqClient.logger
    end

    # Always reads the logger memoized on LogSubscriberBase itself, whichever
    # subclass the instance belongs to.
    def logger
      LogSubscriberBase.logger
    end
  end
end
# frozen_string_literal: true
- 1
module RabbitmqClient
  # LoggerBuilder creates and configures the logger used by RabbitmqClient
  # from the :logger_configs settings.
  class LoggerBuilder
    # config - Hash read through the keys :logger, :logs_format,
    #          :logs_level and :logs_filename.
    def initialize(config)
      # Work on a copy so the level/formatter set below are not applied to
      # the logger object that was passed in.
      @logger = config[:logger].clone
      @format = config[:logs_format]
      @level = config[:logs_level].to_sym
      @filename = config[:logs_filename]
    end

    # Returns the configured logger. When no logger was supplied a new one
    # is created that writes to the configured file name, or to STDOUT.
    # Also attaches the matching log subscriber to the :rabbitmq_client
    # namespace.
    def build_logger
      @logger ||= ::Logger.new(@filename || STDOUT)
      @logger.level = @level
      @logger.formatter = create_logger_formatter
      log_subscriber.attach_to(:rabbitmq_client)
      @logger
    end

    private

    def create_logger_formatter
      formatter_class = json? ? JsonFormatter : TextFormatter
      formatter_class.new
    end

    def log_subscriber
      return JsonLogSubscriber if json?

      PlainLogSubscriber
    end

    # True when the configured format reads as "json".
    def json?
      @format.to_s == 'json'
    end
  end
end
# frozen_string_literal: true
- 1
module RabbitmqClient
  # MessagePublisher publishes one JSON-encoded message to an exchange over
  # a channel and waits for the broker's confirmation.
  class MessagePublisher
    # Custom error raised when RabbitMQ does not confirm a published message
    class ConfirmationFailed < StandardError
      def initialize(exchange, nacked, unconfirmed)
        msg = 'Message confirmation on the exchange ' \
              "#{exchange} has failed (#{nacked}/#{unconfirmed})."
        super(msg)
      end
    end

    # data     - payload; serialized with to_json
    # exchange - object responding to create(channel) and name
    # channel  - channel used for publishing and confirmation
    # options  - publish options; :headers defaults to {} and always
    #            receives a :tags entry taken from TagsFilter.tags
    def initialize(data, exchange, channel, options)
      @data = data.to_json
      @exchange = exchange
      @channel = channel
      @options = { headers: {} }.merge(options)
      # Build a new headers Hash instead of writing into the caller's one
      # (merge above is shallow), and tolerate an explicit `headers: nil`.
      @options[:headers] = @options[:headers].to_h.merge(tags: TagsFilter.tags)
    end

    # Declares the exchange on the channel and publishes the message,
    # emitting 'publishing_message' before and 'published_message' after.
    def publish
      exchange = @exchange.create(@channel)
      notify('publishing_message')
      exchange.publish(@data, **@options)
      notify('published_message')
    end

    # Waits for the channel's publisher confirms.
    # Raises ConfirmationFailed when the channel reports a failure.
    def wait_for_confirms
      notify('confirming_message')
      if @channel.wait_for_confirms
        notify('message_confirmed')
        return
      end

      raise ConfirmationFailed.new(@exchange.name, @channel.nacked_set,
                                   @channel.unconfirmed_set)
    end

    private

    def notify(event)
      ActiveSupport::Notifications.instrument(
        "#{event}.rabbitmq_client",
        message_payload
      )
    end

    def message_payload
      {
        exchange: @exchange.name,
        message_id: @options[:message_id]
      }
    end
  end
end
# frozen_string_literal: true
- 1
require_relative 'log_subscriber_base'
- 1
module RabbitmqClient
  # Manage RabbitmqClient plain text logs: every handled event is rendered
  # as a single human-readable line.
  class PlainLogSubscriber < LogSubscriberBase
    def publisher_created(event)
      configs = event.payload.inspect
      debug('The RabbitmqClient publisher is created with the follwong ' \
            "configs #{configs}")
    end

    def network_error(event)
      payload = event.payload
      reason = payload.fetch(:error).message
      exchange = payload.dig(:options, :exchange_name)
      error("Failed to publish a message (#{reason}) " \
            "to exchange (#{exchange})")
    end

    def overriding_configs(event)
      configs = event.payload.inspect
      debug("Overriding the follwing configs for the created publisher #{configs}")
    end

    def publishing_message(event)
      payload = event.payload
      message_id = payload.fetch(:message_id, 'undefined')
      exchange = payload.fetch(:exchange, 'undefined')
      debug("Start>> Publishing a new message (message_id: #{message_id} ) " \
            "to the exchange (#{exchange})")
    end

    def published_message(event)
      payload = event.payload
      exchange = payload.fetch(:exchange, 'undefined')
      message_id = payload.fetch(:message_id, 'undefined')
      info("<<DONE Published a message to the exchange (#{exchange}) " \
           "with message_id: #{message_id}")
    end

    def confirming_message(event)
      message_id = event.payload.fetch(:message_id, 'undefined')
      debug("Start>> confirming a message (message_id: #{message_id})")
    end

    def message_confirmed(event)
      message_id = event.payload.fetch(:message_id, 'undefined')
      debug("<<DONE confirmed a message (message_id: #{message_id}) Successfuly")
    end

    def exhange_not_found(event)
      exchange = event.payload.fetch(:name)
      error("The Exchange '#{exchange}' cannot be found")
    end

    def created_exhange(event)
      exchange = event.payload.fetch(:name)
      debug("The #{exchange} exchange is created successfuly")
    end
  end
end
# frozen_string_literal: true
- 1
require 'active_support/core_ext/class/attribute'
- 1
module RabbitmqClient
  # Custom error raised when a plugin is defined without any callbacks
  class EmptyPlugin < RuntimeError
    def initialize(name)
      super("The Plugin '#{name}' is empty")
    end
  end

  # Plugin is the base class for all plugins that extend the
  # RabbitmqClient functionality.
  class Plugin
    class << self
      # The block registered through +callbacks+, stored per class.
      attr_accessor :callback_block

      # Registers the block that receives the lifecycle when the plugin is
      # instantiated.
      def callbacks(&block)
        self.callback_block = block
      end
    end

    # Calls the registered block with RabbitmqClient.lifecycle.
    # Raises EmptyPlugin when the class registered no block.
    def initialize
      callback_block.call(RabbitmqClient.lifecycle)
    end

    # Returns the block registered on the plugin's class, or raises
    # EmptyPlugin (carrying the class name) when there is none.
    def callback_block
      plugin_class = self.class
      registered = plugin_class.callback_block
      raise EmptyPlugin, plugin_class.to_s unless registered

      registered
    end
  end
end
# frozen_string_literal: true
- 1
require_relative 'publisher_job'
- 1
module RabbitmqClient
  # Publisher is responsible for publishing events to RabbitMQ exchanges,
  # either inline or through an asynchronous PublisherJob.
  class Publisher
    # config - keyword options; the keys read here are :exchange_registry,
    #          :session_params and :rabbitmq_url.
    def initialize(**config)
      @config = config
      @session_params = session_params
      @exchange_registry = @config.fetch(:exchange_registry, nil)
      @session_params.freeze
      @session_pool = create_connection_pool
      notify('publisher_created', @session_params)
    end

    # Publishes +data+ with +options+ through PublisherJob, asynchronously
    # when :async_publisher is set. Returns nil when no exchange registry
    # was configured.
    def publish(data, options)
      return nil unless @exchange_registry

      job_args = [@exchange_registry, @session_pool, data, options]
      return PublisherJob.perform_async(*job_args) if async

      PublisherJob.new.perform(*job_args)
    end

    private

    def async
      @config.dig(:session_params, :async_publisher) || false
    end

    # Emits 'overriding_configs' when the caller supplied a truthy value for
    # one of the settings that session_params forces to false.
    def overwritten_config_notification
      return unless overwritten_config?

      notify('overriding_configs', threaded: false,
                                   automatically_recover: false)
    end

    def overwritten_config?
      @config.dig(:session_params, :threaded) ||
        @config.dig(:session_params, :automatically_recover)
    end

    # Session parameters with :threaded and :automatically_recover forced to
    # false and :heartbeat taken from :heartbeat_publisher (0 when unset).
    def session_params
      overwritten_config_notification
      configured = @config.fetch(:session_params, {})
      heartbeat = @config.dig(:session_params, :heartbeat_publisher) || 0
      configured.merge(threaded: false,
                       automatically_recover: false,
                       heartbeat: heartbeat)
    end

    def create_connection_pool
      size = @session_params.fetch(:session_pool, 1)
      timeout = @session_params.fetch(:session_pool_timeout, 5)
      ConnectionPool.new(size: size, timeout: timeout) do
        bunny_options = { logger: RabbitmqClient.logger }.merge(@session_params)
        Bunny.new(@config[:rabbitmq_url], bunny_options)
      end
    end

    def notify(event, payload = {})
      ActiveSupport::Notifications.instrument("#{event}.rabbitmq_client", payload)
    end
  end
end
# frozen_string_literal: true
- 1
require_relative 'message_publisher'
- 1
require 'sucker_punch'
- 1
module RabbitmqClient
  # PublisherJob performs one publish: it looks up the exchange, borrows a
  # session from the pool, publishes the message on a fresh channel and
  # waits for the confirmation.
  class PublisherJob
    include SuckerPunch::Job

    # registry     - object responding to find(exchange_name)
    # session_pool - pool yielding sessions through #with
    # data         - message payload
    # options      - publish options; :exchange_name selects the exchange
    #
    # Any StandardError is reported through the 'network_error' notification
    # and then re-raised.
    def perform(registry, session_pool, data, options)
      handle_publish_event(registry, session_pool, data, options)
    rescue StandardError => e
      notify('network_error', error: e, options: options)
      raise
    end

    private

    def handle_publish_event(registry, session_pool, data, options)
      exchange = registry.find(options.fetch(:exchange_name, nil))
      session_pool.with do |session|
        session.start
        channel = session.create_channel
        begin
          channel.confirm_select
          message = MessagePublisher.new(data, exchange, channel, options)
          message.publish
          message.wait_for_confirms
        ensure
          # Close the channel on failure as well: the session goes back to
          # the pool and is reused, so a channel left open after a failed
          # publish or confirmation would accumulate on it. A channel that
          # is no longer open is skipped so cleanup does not raise on it.
          channel.close if channel.open?
        end
      end
    end

    def notify(event, payload = {})
      ActiveSupport::Notifications.instrument(
        "#{event}.rabbitmq_client",
        payload
      )
    end
  end
end
# frozen_string_literal: true
- 1
module RabbitmqClient
  # TagsFilter extracts the whitelisted entries of the configured global
  # store so they can be attached to logs and messages.
  class TagsFilter
    class << self
      # Returns the entries of config.global_store.store whose downcased,
      # symbolized key is listed in config.whitelist, or nil when no global
      # store is configured.
      def tags
        settings = RabbitmqClient.config
        store_holder = settings.global_store
        return unless store_holder

        store_holder.store.select do |key, _value|
          Array(settings.whitelist).include?(key.downcase.to_sym)
        end
      end
    end
  end
end
# frozen_string_literal: true
- 1
require 'json'
- 1
require 'English'
- 1
module RabbitmqClient
  # Formatter for text log messages: the standard Logger line layout with
  # the tags from TagsFilter prepended to the message.
  class TextFormatter < ::Logger::Formatter
    def initialize
      @datetime_format = nil
      @severity_text = nil
      @tags = nil
      super
    end

    # Renders one log line using Logger::Formatter's Format.
    # +severity+ may be a label or an Integer severity level.
    def call(severity, time, progname, msg)
      create_instance_vars(severity)
      tagged_message = "#{@tags}#{msg2str(msg)}"
      format(Format, @severity_text[0], format_datetime(time), $PID,
             @severity_text, progname, tagged_message)
    end

    private

    # Resolves the severity label and the "[key: value] " tag prefix for the
    # entry being formatted.
    def create_instance_vars(severity)
      @severity_text = severity.is_a?(Integer) ? severity_label(severity) : severity
      @tags = (TagsFilter.tags || {}).map { |key, val| "[#{key}: #{val}] " }.join
    end

    # Name of the Logger::Severity constant whose value equals +level+
    # ("" when there is none).
    def severity_label(level)
      Logger::Severity.constants(false).find do |name|
        Logger::Severity.const_get(name) == level
      end.to_s
    end
  end
end