Skip to Content Skip to Search
Methods
A
I
L
N
R
S

Class Public methods

new(adapter, event_loop)

# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 74
# Sets up the listener state and immediately starts the background thread
# that runs #listen.
#
# adapter    - kept in @adapter.
# event_loop - kept in @event_loop.
def initialize(adapter, event_loop)
  super()

  # Commands for the background thread are handed over through this queue.
  @queue = Queue.new
  @event_loop = event_loop
  @adapter = adapter

  @thread = Thread.new do
    # Set from inside the thread so the flag is in place before #listen runs.
    Thread.current.abort_on_exception = true
    listen
  end
end

Instance Public methods

add_channel(channel, on_success)

# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 118
# Enqueues a :listen command for +channel+.
#
# channel    - channel name, placed second in the queued command.
# on_success - callable (or nil), placed third in the queued command.
def add_channel(channel, on_success)
  @queue << [:listen, channel, on_success]
end

invoke_callback(*)

# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 126
# Defers the inherited invoke_callback by posting it to @event_loop.
#
# The bare +super+ inside the block re-sends the arguments this method
# received to the parent implementation when the posted block is run.
def invoke_callback(*)
  @event_loop.post do
    super
  end
end

listen()

# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 87
# Runs the listener loop on the connection yielded by
# @adapter.with_subscriptions_connection, until a :shutdown command is
# taken off @queue.
#
# Each pass first drains every queued command, then calls wait_for_notify
# with a timeout argument of 1 and forwards any notification to #broadcast.
def listen
  @adapter.with_subscriptions_connection do |connection|
    catch :shutdown do
      loop do
        until @queue.empty?
          # Non-blocking pop; the emptiness check above guards it.
          command, name, on_success = @queue.pop(true)

          case command
          when :listen
            connection.exec("LISTEN #{connection.escape_identifier(name)}")
            # The success callback is posted to the event loop, not run here.
            @event_loop.post(&on_success) if on_success
          when :unlisten
            connection.exec("UNLISTEN #{connection.escape_identifier(name)}")
          when :shutdown
            # Unwinds out of both loops to the enclosing catch.
            throw :shutdown
          end
        end

        # The second block argument is not used.
        connection.wait_for_notify(1) { |chan, _pid, payload| broadcast(chan, payload) }
      end
    end
  end
end

remove_channel(channel)

# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 122
# Enqueues an :unlisten command for +channel+.
#
# channel - channel name, placed second in the queued command.
def remove_channel(channel)
  @queue << [:unlisten, channel]
end

shutdown()

# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 113
# Enqueues a :shutdown command and then blocks the caller until @thread is
# no longer alive.
def shutdown
  @queue << [:shutdown]

  # Keep yielding to other threads until the background thread has exited.
  while @thread.alive?
    Thread.pass
  end
end