#!/usr/bin/env ruby

require File.dirname(__FILE__) + '/../lib/amqp_utils/command'
require File.dirname(__FILE__) + '/../lib/amqp_utils/message_formatter'
require File.dirname(__FILE__) + '/../lib/amqp_utils/serialization'
require 'clio/progressbar'
require 'stringio'

# Command-line tool that publishes one message (optionally repeated) or a
# newline-separated stream of messages to a named AMQP queue.
#
# The hooks below (prepare_options, validate, execute) are invoked by the
# AmqpUtils::Command base class, which is defined outside this file.
class EnqueueCommand < AmqpUtils::Command
  # Declares the banner and the enqueue-specific command-line options.
  #
  # options - the option parser object supplied by the base class.
  def prepare_options(options)
    options.banner <<-BANNER.unindent
    Publishes a message or a series of messages to a particular queue.
    
      #{command_name} <queue>
    
    When publishing a series of messages, the messages should be separated by a
    newline, with one message per line. 
    
    Enqueue options:
    BANNER
    options.opt :content_type, 'Sets the content_type header and chooses an appropriate serializer', :type => :string, :short => :none
    options.opt :persistent, 'Mark messages as persistent.', :short => :none
    options.opt :priority, 'The priority of this message', :type => :int, :short => :none, :default => 1
    options.opt :count, 'Number of times the message should be published. If multiple messages are provided, only the first message is published multiple times.', :type => :int, :default => 1
    options.opt :header, 'A key-value pair to be set in "headers" field of the AMQP header', :short => :none, :type => :string, :multi => true
    options.opt :message, 'The message to publish to the queue. Comes from STDIN if not provided.', :type => :string, :default => nil
    options.opt :format, 'The format that the message is supplied in.', :short => :none, :default => 'message'
  end

  # Normalizes and checks the parsed options before execution.
  #
  # Converts the repeated --header "key=value" strings into a Hash, requires a
  # queue name as the first positional argument, and resolves the message
  # formatter for --format. Exits through Trollop::die on invalid input.
  def validate
    if options[:header]
      options[:header] = options[:header].inject({}) do |headers, pair|
        # Split on the first '=' only, so values may themselves contain '='.
        key, value = pair.split('=', 2)
        if value.nil? || key.empty?
          Trollop::die :header, "must be given as key=value, got #{pair.inspect}"
        end
        headers[key] = value
        headers
      end
    end
    Trollop::die "need a queue to publish to" unless args[0] && !args[0].empty?

    @formatter = AmqpUtils::MessageFormatter.for_type(options[:format])
    Trollop::die :format, "not an available type: #{AmqpUtils::MessageFormatter.types.join(", ")}" unless @formatter
  end

  # Publishes the message(s) to the queue and then stops the event loop.
  #
  # With --count > 1 the first loaded message is published that many times
  # (with a progress bar). Otherwise messages are read from the input one at a
  # time and published until the formatter returns nothing more.
  def execute
    # The queue name is the first positional argument (checked in #validate),
    # not the whole argument list.
    @queue = args[0]
    if options[:message]
      @message_io = StringIO.new(options[:message])
    else
      @message_io = STDIN
    end

    if options[:count] > 1
      # Each notification publishes once and re-notifies itself with the
      # remaining count, so publishing is spread over event-loop ticks.
      # NOTE(review): instance variables set inside the spawned blocks appear
      # to live on the spawned process object rather than on this command —
      # confirm against EM.spawn's semantics.
      publisher = EM.spawn do |queue, message, messages, options|
        @progress ||= Clio::Progressbar.new('Enqueuing', options[:count]) if options[:count] > 1

        if messages > 0
          @mq ||= MQ.new

          publish_options = {
            :persistent => options.persistent,
            :headers => options[:header],
            :priority => options.priority,
            :content_type => options.content_type
          }

          # NOTE(review): this serializes the whole loaded object, whereas the
          # streaming branch below serializes message['message'] — confirm
          # which one is intended for repeated publishing.
          serialized_message = AmqpUtils::Serialization.serialize(options.content_type, message)

          @mq.queue(queue, :durable => true, :auto_delete => false).
            publish(serialized_message, publish_options)

          @progress.inc if @progress
          publisher.notify(queue, message, messages - 1, options)
        else
          @progress.finish if @progress
          EM.next_tick { AMQP.stop { EM.stop } }
        end
      end

      # Only the first message from the input is loaded and repeated.
      message = @formatter.load(@message_io)
      publisher.notify(@queue, message, options[:count], options)
    else
      # Each notification loads and publishes one message, then re-notifies
      # itself until the formatter yields no further message.
      publisher = EM.spawn do |queue, message_io, formatter, options|
        @mq ||= MQ.new

        message = formatter.load(message_io)
        if message
          raise "expected a value for message in #{message.inspect}" unless message['message']

          publish_options = {
            :app_id => 'amqp-utils',
            :persistent => options.persistent,
            :headers => options[:header],
            :priority => options.priority,
            :content_type => options.content_type
          }
          # Per-message header fields override the defaults; an explicit
          # --priority on the command line wins over the message's priority.
          if header = message['header']
            publish_options[:message_id] = header['message_id']
            publish_options[:priority] = header['priority'] unless options.priority_given
            publish_options[:expiration] = header['expiration']
          end

          serialized_message = AmqpUtils::Serialization.serialize(options.content_type, message['message'])

          @mq.queue(queue, :durable => true, :auto_delete => false).
            publish(serialized_message, publish_options)

          publisher.notify(queue, message_io, formatter, options)
        else
          EM.next_tick { AMQP.stop { EM.stop } }
        end
      end

      publisher.notify(@queue, @message_io, @formatter, options)
    end
  end
end

# Script entry point: invoke the class-level `run`, which is not defined in
# this file (presumably inherited from AmqpUtils::Command and responsible for
# parsing options and calling validate/execute — confirm in the base class).
EnqueueCommand.run
