loading
Generated 2020-02-25T13:02:37+00:00

All Files ( 100.0% covered at 3.35 hits/line )

16 files in total.
353 relevant lines, 353 lines covered and 0 lines missed. ( 100.0% )
File % covered Lines Relevant Lines Lines covered Lines missed Avg. Hits / Line
lib/rabbitmq_client.rb 100.00 % 108 50 50 0 1.28
lib/rabbitmq_client/callback.rb 100.00 % 47 23 23 0 2.61
lib/rabbitmq_client/exchange.rb 100.00 % 25 11 11 0 1.82
lib/rabbitmq_client/exchange_registry.rb 100.00 % 33 14 14 0 2.07
lib/rabbitmq_client/json_formatter.rb 100.00 % 29 16 16 0 2.13
lib/rabbitmq_client/json_log_subscriber.rb 100.00 % 90 29 29 0 1.59
lib/rabbitmq_client/lifecycle.rb 100.00 % 53 25 25 0 2.28
lib/rabbitmq_client/log_subscriber_base.rb 100.00 % 16 7 7 0 43.57
lib/rabbitmq_client/logger_builder.rb 100.00 % 35 20 20 0 2.35
lib/rabbitmq_client/message_publisher.rb 100.00 % 57 28 28 0 2.43
lib/rabbitmq_client/plain_log_subscriber.rb 100.00 % 64 32 32 0 2.72
lib/rabbitmq_client/plugin.rb 100.00 % 31 15 15 0 1.20
lib/rabbitmq_client/publisher.rb 100.00 % 74 33 33 0 4.48
lib/rabbitmq_client/publisher_job.rb 100.00 % 40 22 22 0 1.95
lib/rabbitmq_client/tags_filter.rb 100.00 % 16 8 8 0 8.13
lib/rabbitmq_client/text_formatter.rb 100.00 % 42 20 20 0 4.50

lib/rabbitmq_client.rb

100.0% lines covered

50 relevant lines. 50 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require 'active_support'
  3. 1 require 'bunny'
  4. 1 require 'connection_pool'
  5. 1 require 'rabbitmq_client/version'
  6. 1 require 'rabbitmq_client/lifecycle'
  7. 1 require 'rabbitmq_client/plugin'
  8. 1 require 'rabbitmq_client/exchange_registry'
  9. 1 require 'rabbitmq_client/publisher'
  10. # RabbitmqClient Module is used as a client library for Rabbitmq
  11. # This Module is supporting the following use cases
  12. # - Publish events to Rabbitmq server
  13. 1 module RabbitmqClient
  14. 1 extend ActiveSupport::Autoload
  15. 1 include ActiveSupport::Configurable
  16. 1 include ActiveSupport::JSON
  17. 1 eager_autoload do
  18. 1 autoload :LoggerBuilder
  19. 1 autoload :PlainLogSubscriber
  20. 1 autoload :JsonLogSubscriber
  21. 1 autoload :JsonFormatter
  22. 1 autoload :TextFormatter
  23. 1 autoload :TagsFilter
  24. end
  25. 1 @exchange_registry = ExchangeRegistry.new
  26. # [url] url address of rabbitmq server
  27. 1 config_accessor(:rabbitmq_url, instance_accessor: false) do
  28. 1 'amqp://guest:guest@127.0.0.1:5672'
  29. end
  30. # [logger_configs] configs for the used logger
  31. # logs_format: json, plain
  32. # logs_to_stdout: true, false
  33. # logs_level: info, debug
  34. # logs_filename: logs file name
  35. 1 config_accessor(:logger_configs, instance_accessor: false) do
  36. {
  37. 1 logs_format: 'plain',
  38. logs_level: :info,
  39. logs_filename: nil,
  40. logger: nil
  41. }
  42. end
  43. # default rabbitmq configs
  44. # heartbeat_publisher = 0
  45. # session_pool = 1
  46. # session_pool_timeout = 5
  47. 1 config_accessor(:session_params, instance_accessor: false) do
  48. {
  49. 1 heartbeat_publisher: 0,
  50. async_publisher: true,
  51. session_pool: 1,
  52. session_pool_timeout: 5
  53. }
  54. end
  55. 2 config_accessor(:plugins, instance_accessor: false) { [] }
  56. 2 config_accessor(:global_store, instance_accessor: false) { nil }
  57. 1 config_accessor(:whitelist, instance_accessor: false) do
  58. 1 ['x-request-id'.to_sym]
  59. end
  60. 1 class << self
  61. 1 def add_exchange(name, type, options = {})
  62. 1 @exchange_registry.add(name, type, options)
  63. end
  64. 1 def publish(payload, options = {})
  65. 1 publisher.publish(payload, options)
  66. end
  67. 1 def lifecycle
  68. 3 @lifecycle ||= setup_lifecycle
  69. end
  70. 1 def logger
  71. 6 @logger ||= setup_logger
  72. end
  73. 1 private
  74. 1 def setup_logger
  75. 3 LoggerBuilder.new(config[:logger_configs]).build_logger
  76. end
  77. 1 def publisher
  78. 1 @publisher ||= init_publisher
  79. end
  80. 1 def init_publisher
  81. 1 Publisher.new(config.merge(
  82. exchange_registry: @exchange_registry
  83. ))
  84. end
  85. 1 def setup_lifecycle
  86. 2 @lifecycle = Lifecycle.new
  87. 2 plugins.each(&:new)
  88. 2 @lifecycle
  89. end
  90. end
  91. end

lib/rabbitmq_client/callback.rb

100.0% lines covered

23 relevant lines. 23 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module RabbitmqClient
  3. # Custom error thrown when an unsupported callback type is used
  4. 1 class InvalidCallback < RuntimeError
  5. 1 def initialize(name)
  6. 2 super("The Callback '#{name}' is an invalid callback and cannot be used")
  7. end
  8. end
  9. # Callback object stores all plugin callbacks
  10. # Supported callback types are before and after
  11. 1 class Callback
  12. 1 def initialize
  13. 9 @before = []
  14. 9 @after = []
  15. end
  16. 1 def execute(*args, &block)
  17. 2 execute_before_callbacks(*args)
  18. 2 result = block.call(*args)
  19. 2 execute_after_callbacks(*args)
  20. 2 result
  21. end
  22. 1 def add(type, &callback)
  23. 8 case type
  24. when :before
  25. 4 @before << callback
  26. when :after
  27. 3 @after << callback
  28. else
  29. 1 raise InvalidCallback, "Invalid callback type: #{type}"
  30. end
  31. end
  32. 1 private
  33. 1 def execute_before_callbacks(*args)
  34. 3 @before.each { |callback| callback.call(*args) }
  35. end
  36. 1 def execute_after_callbacks(*args)
  37. 3 @after.each { |callback| callback.call(*args) }
  38. end
  39. end
  40. end

lib/rabbitmq_client/exchange.rb

100.0% lines covered

11 relevant lines. 11 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module RabbitmqClient
  3. # Exchange holds the name, type and options of a managed exchange and creates it on a channel
  4. 1 class Exchange
  5. 1 attr_reader :name, :type, :options
  6. 1 def initialize(name, type, options)
  7. 4 @name = name
  8. 4 @type = type
  9. 4 @options = options
  10. end
  11. 1 def create(channel)
  12. 1 exhange_obj = Bunny::Exchange.new(channel, @type, @name,
  13. @options)
  14. 1 ActiveSupport::Notifications.instrument(
  15. 'created_exhange.rabbitmq_client',
  16. name: @name,
  17. type: @type
  18. )
  19. 1 exhange_obj
  20. end
  21. end
  22. end

lib/rabbitmq_client/exchange_registry.rb

100.0% lines covered

14 relevant lines. 14 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require_relative 'exchange'
  3. 1 module RabbitmqClient
  4. # ExchangeRegistry is a store for all managed exchanges and their details
  5. 1 class ExchangeRegistry
  6. # Custom Error thrown when trying to find an unknown exchange
  7. 1 class ExchangeNotFound < StandardError
  8. 1 def initialize(name)
  9. 2 super("The Exchange '#{name}' cannot be found")
  10. end
  11. end
  12. 1 def initialize
  13. 6 @exchanges = {}
  14. end
  15. 1 def add(name, type, options = {})
  16. 4 @exchanges[name] = Exchange.new(name, type, options)
  17. end
  18. 1 def find(name)
  19. 5 @exchanges.fetch(name) do
  20. 2 ActiveSupport::Notifications.instrument(
  21. 'exhange_not_found.rabbitmq_client',
  22. name: name
  23. )
  24. 2 raise ExchangeNotFound, name
  25. end
  26. end
  27. end
  28. end

lib/rabbitmq_client/json_formatter.rb

100.0% lines covered

16 relevant lines. 16 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require 'json'
  3. 1 module RabbitmqClient
  4. # Formatter for json log messages
  5. 1 class JsonFormatter
  6. 1 TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%6N '
  7. 1 def initialize
  8. 2 @json = {}
  9. 2 @msg = ''
  10. end
  11. 1 def call(severity, timestamp, progname, msg)
  12. 2 @json = build_new_json(msg)
  13. 2 @json = @json.merge(progname: progname.to_s, level: severity,
  14. timestamp: timestamp.strftime(TIME_FORMAT))
  15. 11 @json = @json.reject { |_key, value| value.to_s.empty? }
  16. 2 @json.to_json + "\n"
  17. end
  18. 1 def build_new_json(msg)
  19. 2 @msg = msg
  20. 2 @json = @msg.is_a?(Hash) ? @msg : { message: @msg.strip }
  21. 2 @json.merge(TagsFilter.tags || {})
  22. end
  23. end
  24. end

lib/rabbitmq_client/json_log_subscriber.rb

100.0% lines covered

29 relevant lines. 29 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require_relative 'log_subscriber_base'
  3. 1 module RabbitmqClient
  4. # Manage RabbitmqClient JSON logs
  5. 1 class JsonLogSubscriber < LogSubscriberBase
  6. 1 def publisher_created(event)
  7. 1 debug(action: 'publisher_created',
  8. message: 'The RabbitmqClient publisher is created',
  9. publisher_configs: event.payload)
  10. end
  11. 1 def network_error(event)
  12. 1 payload = event.payload
  13. 1 error({ action: 'network_error',
  14. message: 'Failed to publish a message',
  15. error_message: payload.fetch(:error).message }.merge(
  16. process_payload(payload)
  17. ))
  18. end
  19. 1 def overriding_configs(event)
  20. 1 debug(action: 'overriding_configs',
  21. message: 'Overriding publisher configs',
  22. publisher_configs: event.payload)
  23. end
  24. 1 def publishing_message(event)
  25. 1 debug({ action: 'publishing_message',
  26. message: 'Publishing a new message' }.merge(
  27. process_payload(event.payload)
  28. ))
  29. end
  30. 1 def published_message(event)
  31. 1 info({ action: 'published_message',
  32. message: 'Published a message' }.merge(
  33. process_payload(event.payload)
  34. ))
  35. end
  36. 1 def confirming_message(event)
  37. 1 debug({ action: 'confirming_message',
  38. message: 'Confirming a message' }.merge(
  39. process_payload(event.payload)
  40. ))
  41. end
  42. 1 def message_confirmed(event)
  43. 1 debug({ action: 'message_confirmed',
  44. message: 'Confirmed a message' }.merge(
  45. process_payload(event.payload)
  46. ))
  47. end
  48. 1 def exhange_not_found(event)
  49. 1 error(action: 'exhange_not_found',
  50. message: 'Exhange Not Found',
  51. exchange_name: event.payload.fetch(:name))
  52. end
  53. 1 def created_exhange(event)
  54. 1 debug(action: 'created_exhange',
  55. message: 'Exhange is created successfuly',
  56. exchange_name: event.payload.fetch(:name))
  57. end
  58. 1 private
  59. 1 %w[info debug warn error fatal unknown].each do |level|
  60. 6 class_eval <<-METHOD, __FILE__, __LINE__ + 1
  61. def #{level}(progname = nil, &block)
  62. logger.#{level} rabbitmq_client_event(progname, &block) if logger
  63. end
  64. METHOD
  65. end
  66. 1 def rabbitmq_client_event(event)
  67. 9 { source: 'rabbitmq_client' }.merge(event)
  68. end
  69. 1 def process_payload(payload)
  70. {
  71. 5 exchange_name: payload.fetch(:exchange, 'undefined'),
  72. message_id: payload.fetch(:message_id, 'undefined')
  73. }
  74. end
  75. end
  76. end

lib/rabbitmq_client/lifecycle.rb

100.0% lines covered

25 relevant lines. 25 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require_relative 'callback'
  3. 1 module RabbitmqClient
  4. # Lifecycle defines the rabbitmq_client lifecycle events,
  5. # callbacks and manage the execution of these callbacks
  6. 1 class Lifecycle
  7. EVENTS = {
  8. 1 publish: %i[message options]
  9. }.freeze
  10. 1 attr_reader :callbacks
  11. 1 def initialize
  12. 6 @callbacks = EVENTS.keys.each_with_object({}) do |key, hash|
  13. 6 hash[key] = Callback.new
  14. end
  15. end
  16. 1 def before(event, &block)
  17. 4 add(:before, event, &block)
  18. end
  19. 1 def after(event, &block)
  20. 2 add(:after, event, &block)
  21. end
  22. 1 def run_callbacks(event, *args, &block)
  23. 3 missing_callback(event) unless @callbacks.key?(event)
  24. 3 event_obj = EVENTS[event]
  25. 3 event_size = event_obj.size
  26. 3 unless event_size == args.size
  27. 1 raise ArgumentError, "Callback #{event} expects\
  28. #{event_size} parameter(s): #{event_obj.join(', ')}"
  29. end
  30. 2 @callbacks[event].execute(*args, &block)
  31. end
  32. 1 private
  33. 1 def add(type, event, &block)
  34. 6 missing_callback(event) unless @callbacks.key?(event)
  35. 5 @callbacks[event].add(type, &block)
  36. end
  37. 1 def missing_callback(event)
  38. 1 raise InvalidCallback, "Unknown callback event: #{event}"
  39. end
  40. end
  41. end

lib/rabbitmq_client/log_subscriber_base.rb

100.0% lines covered

7 relevant lines. 7 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module RabbitmqClient
  3. # Log Subscriber base class
  4. 1 class LogSubscriberBase < ActiveSupport::LogSubscriber
  5. 1 class << self
  6. 1 def logger
  7. 150 @logger ||= RabbitmqClient.logger
  8. end
  9. end
  10. 1 def logger
  11. 150 LogSubscriberBase.logger
  12. end
  13. end
  14. end

lib/rabbitmq_client/logger_builder.rb

100.0% lines covered

20 relevant lines. 20 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module RabbitmqClient
  3. # LoggerBuilder builds and configures the logger used by RabbitmqClient
  4. 1 class LoggerBuilder
  5. 1 def initialize(config)
  6. 3 @logger = config[:logger].clone
  7. 3 @format = config[:logs_format]
  8. 3 @level = config[:logs_level].to_sym
  9. 3 @filename = config[:logs_filename]
  10. end
  11. 1 def build_logger
  12. 3 @logger ||= ::Logger.new(@filename || STDOUT)
  13. 3 @logger.level = @level
  14. 3 @logger.formatter = create_logger_formatter
  15. 3 log_subscriber.attach_to(:rabbitmq_client)
  16. 3 @logger
  17. end
  18. 1 private
  19. 1 def create_logger_formatter
  20. 3 json? ? JsonFormatter.new : TextFormatter.new
  21. end
  22. 1 def log_subscriber
  23. 3 json? ? JsonLogSubscriber : PlainLogSubscriber
  24. end
  25. 1 def json?
  26. 6 __method__.to_s == "#{@format}?"
  27. end
  28. end
  29. end

lib/rabbitmq_client/message_publisher.rb

100.0% lines covered

28 relevant lines. 28 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module RabbitmqClient
  3. # MessagePublisher publishes a single message to an exchange and waits for its confirmation
  4. 1 class MessagePublisher
  5. # Custom error is thrown when rabbitmq does not confirm publishing an event
  6. 1 class ConfirmationFailed < StandardError
  7. 1 def initialize(exchange, nacked, unconfirmed)
  8. msg = 'Message confirmation on the exchange ' \
  9. 1 "#{exchange} has failed (#{nacked}/#{unconfirmed})."
  10. 1 super(msg)
  11. end
  12. end
  13. 1 def initialize(data, exchange, channel, options)
  14. 3 @data = data.to_json
  15. 3 @exchange = exchange
  16. 3 @channel = channel
  17. 3 @options = { headers: {} }.merge(options)
  18. 3 @options[:headers][:tags] = TagsFilter.tags
  19. end
  20. 1 def publish
  21. 3 exchange = @exchange.create(@channel)
  22. 3 notify('publishing_message')
  23. 3 exchange.publish(@data, **@options)
  24. 3 notify('published_message')
  25. end
  26. 1 def wait_for_confirms
  27. 3 notify('confirming_message')
  28. 3 if @channel.wait_for_confirms
  29. 1 notify('message_confirmed')
  30. 1 return
  31. end
  32. 1 raise ConfirmationFailed.new(@exchange.name, @channel.nacked_set,
  33. @channel.unconfirmed_set)
  34. end
  35. 1 private
  36. 1 def notify(event)
  37. 10 ActiveSupport::Notifications.instrument(
  38. "#{event}.rabbitmq_client",
  39. message_payload
  40. )
  41. end
  42. 1 def message_payload
  43. {
  44. 10 exchange: @exchange.name,
  45. message_id: @options[:message_id]
  46. }
  47. end
  48. end
  49. end

lib/rabbitmq_client/plain_log_subscriber.rb

100.0% lines covered

32 relevant lines. 32 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require_relative 'log_subscriber_base'
  3. 1 module RabbitmqClient
  4. # Manage RabbitmqClient plain text logs
  5. 1 class PlainLogSubscriber < LogSubscriberBase
  6. 1 def publisher_created(event)
  7. msg = 'The RabbitmqClient publisher is created ' \
  8. 9 "with the follwong configs #{event.payload.inspect}"
  9. 9 debug(msg)
  10. end
  11. 1 def network_error(event)
  12. 3 payload = event.payload
  13. 3 msg = "Failed to publish a message (#{payload.fetch(:error).message}) " \
  14. "to exchange (#{payload.dig(:options, :exchange_name)})"
  15. 3 error(msg)
  16. end
  17. 1 def overriding_configs(event)
  18. msg = 'Overriding the follwing configs for ' \
  19. 3 "the created publisher #{event.payload.inspect}"
  20. 3 debug(msg)
  21. end
  22. 1 def publishing_message(event)
  23. 4 payload = event.payload
  24. msg = 'Start>> Publishing a new message ' \
  25. 4 "(message_id: #{payload.fetch(:message_id, 'undefined')} ) " \
  26. "to the exchange (#{payload.fetch(:exchange, 'undefined')})"
  27. 4 debug(msg)
  28. end
  29. 1 def published_message(event)
  30. 4 payload = event.payload
  31. msg = '<<DONE Published a message to ' \
  32. 4 "the exchange (#{payload.fetch(:exchange, 'undefined')}) " \
  33. "with message_id: #{payload.fetch(:message_id, 'undefined')}"
  34. 4 info(msg)
  35. end
  36. 1 def confirming_message(event)
  37. msg = 'Start>> confirming a message ' \
  38. 4 "(message_id: #{event.payload.fetch(:message_id, 'undefined')})"
  39. 4 debug(msg)
  40. end
  41. 1 def message_confirmed(event)
  42. 2 msg_id = event.payload.fetch(:message_id, 'undefined')
  43. msg = '<<DONE confirmed a message ' \
  44. 2 "(message_id: #{msg_id}) Successfuly"
  45. 2 debug(msg)
  46. end
  47. 1 def exhange_not_found(event)
  48. 2 error("The Exchange '#{event.payload.fetch(:name)}' cannot be found")
  49. end
  50. 1 def created_exhange(event)
  51. 2 debug("The #{event.payload.fetch(:name)} exchange is created successfuly")
  52. end
  53. end
  54. end

lib/rabbitmq_client/plugin.rb

100.0% lines covered

15 relevant lines. 15 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require 'active_support/core_ext/class/attribute'
  3. 1 module RabbitmqClient
  4. # Custom Error thrown in case of defining a plugin without any callbacks
  5. 1 class EmptyPlugin < RuntimeError
  6. 1 def initialize(name)
  7. 1 super("The Plugin '#{name}' is empty")
  8. end
  9. end
  10. # Plugin class is the base class for all Plugins that
  11. # extend RabbitmqClient functionality.
  12. 1 class Plugin
  13. 1 def initialize
  14. 2 callback_block.call(RabbitmqClient.lifecycle)
  15. end
  16. 1 def callback_block
  17. 2 klass = self.class
  18. 2 klass.callback_block || (raise EmptyPlugin, klass.to_s)
  19. end
  20. 1 class << self
  21. 1 attr_accessor :callback_block
  22. 1 def callbacks(&block)
  23. 1 @callback_block = block
  24. end
  25. end
  26. end
  27. end

lib/rabbitmq_client/publisher.rb

100.0% lines covered

33 relevant lines. 33 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require_relative 'publisher_job'
  3. 1 module RabbitmqClient
  4. # Publisher class is responsible for publishing events to rabbitmq exchanges
  5. 1 class Publisher
  6. 1 def initialize(**config)
  7. 8 @config = config
  8. 8 @session_params = session_params
  9. 8 @exchange_registry = @config.fetch(:exchange_registry, nil)
  10. 8 @session_params.freeze
  11. 8 @session_pool = create_connection_pool
  12. 8 notify('publisher_created', @session_params)
  13. end
  14. 1 def publish(data, options)
  15. 5 return nil unless @exchange_registry
  16. 4 if async
  17. 1 PublisherJob.perform_async(@exchange_registry,
  18. @session_pool, data, options)
  19. else
  20. 3 PublisherJob.new.perform(@exchange_registry,
  21. @session_pool, data, options)
  22. end
  23. end
  24. 1 private
  25. 1 def async
  26. 4 @config.dig(:session_params, :async_publisher) || false
  27. end
  28. 1 def overwritten_config_notification
  29. 8 return unless overwritten_config?
  30. 2 notify('overriding_configs',
  31. threaded: false,
  32. automatically_recover: false)
  33. end
  34. 1 def overwritten_config?
  35. 8 @config.dig(:session_params, :threaded) ||
  36. @config.dig(:session_params, :automatically_recover)
  37. end
  38. 1 def session_params
  39. 8 overwritten_config_notification
  40. 8 @config.fetch(:session_params, {})
  41. .merge(threaded: false,
  42. automatically_recover: false,
  43. heartbeat: @config.dig(
  44. :session_params, :heartbeat_publisher
  45. ) || 0)
  46. end
  47. 1 def create_connection_pool
  48. 8 pool_size = @session_params.fetch(:session_pool, 1)
  49. 8 pool_timeout = @session_params.fetch(:session_pool_timeout, 5)
  50. 8 ConnectionPool.new(size: pool_size, timeout: pool_timeout) do
  51. 3 Bunny.new(@config[:rabbitmq_url],
  52. { logger: RabbitmqClient.logger }.merge(@session_params))
  53. end
  54. end
  55. 1 def notify(event, payload = {})
  56. 10 ActiveSupport::Notifications.instrument(
  57. "#{event}.rabbitmq_client",
  58. payload
  59. )
  60. end
  61. end
  62. end

lib/rabbitmq_client/publisher_job.rb

100.0% lines covered

22 relevant lines. 22 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require_relative 'message_publisher'
  3. 1 require 'sucker_punch'
  4. 1 module RabbitmqClient
  5. # PublisherJob is the SuckerPunch job that publishes events to rabbitmq exchanges
  6. 1 class PublisherJob
  7. 1 include SuckerPunch::Job
  8. 1 def perform(registry, session_pool, data, options)
  9. 3 handle_publish_event(registry, session_pool, data, options)
  10. rescue StandardError => e
  11. 2 notify('network_error', error: e, options: options)
  12. 2 raise
  13. end
  14. 1 private
  15. 1 def handle_publish_event(registry, session_pool, data, options)
  16. 3 exchange = registry.find(options.fetch(:exchange_name, nil))
  17. 3 session_pool.with do |session|
  18. 3 session.start
  19. 3 channel = session.create_channel
  20. 3 channel.confirm_select
  21. 3 message = MessagePublisher.new(data, exchange, channel, options)
  22. 3 message.publish
  23. 3 message.wait_for_confirms
  24. 1 channel.close
  25. end
  26. end
  27. 1 def notify(event, payload = {})
  28. 2 ActiveSupport::Notifications.instrument(
  29. "#{event}.rabbitmq_client",
  30. payload
  31. )
  32. end
  33. end
  34. end

lib/rabbitmq_client/tags_filter.rb

100.0% lines covered

8 relevant lines. 8 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 module RabbitmqClient
  3. # TagsFilter selects the whitelisted tags from the configured global store
  4. 1 class TagsFilter
  5. 1 def self.tags
  6. 17 config = RabbitmqClient.config
  7. 17 global_store = config.global_store
  8. 17 return unless global_store
  9. 7 global_store.store.select do |key, _value|
  10. 4 Array(config.whitelist).include? key.downcase.to_sym
  11. end
  12. end
  13. end
  14. end

lib/rabbitmq_client/text_formatter.rb

100.0% lines covered

20 relevant lines. 20 lines covered and 0 lines missed.
    
  1. # frozen_string_literal: true
  2. 1 require 'json'
  3. 1 require 'English'
  4. 1 module RabbitmqClient
  5. # Formatter for text log messages
  6. 1 class TextFormatter < ::Logger::Formatter
  7. 1 def initialize
  8. 6 @datetime_format = nil
  9. 6 @severity_text = nil
  10. 6 @tags = nil
  11. 6 super
  12. end
  13. 1 def call(severity, time, progname, msg)
  14. 9 create_instance_vars(severity)
  15. 9 format(Format,
  16. @severity_text[0],
  17. format_datetime(time),
  18. $PID,
  19. @severity_text,
  20. progname,
  21. "#{@tags}#{msg2str(msg)}")
  22. end
  23. 1 private
  24. 1 def create_instance_vars(severity)
  25. 9 @severity_text = if severity.is_a?(Integer)
  26. 2 Logger::Severity.constants(false).select do |level|
  27. 12 Logger::Severity.const_get(level) == severity
  28. end.first.to_s
  29. else
  30. 7 severity
  31. end
  32. 9 @tags = (TagsFilter.tags || {}).collect do |key, val|
  33. 1 "[#{key}: #{val}] "
  34. end.join
  35. end
  36. end
  37. end