Commit ffaa3c9e authored by Alexander Strizhakov's avatar Alexander Strizhakov

no drop active connections

parent 68297336
......@@ -6,17 +6,24 @@ defmodule Pleroma.Gun.Conn do
@moduledoc """
Struct for gun connection data
"""
@type gun_state :: :open | :up | :down
@type conn_state :: :init | :active | :idle
@type t :: %__MODULE__{
conn: pid(),
state: atom(),
gun_state: gun_state(),
waiting_pids: [pid()],
conn_state: conn_state(),
used_by: [pid()],
last_reference: pos_integer(),
crf: float()
}
defstruct conn: nil,
state: :open,
gun_state: :open,
waiting_pids: [],
conn_state: :init,
used_by: [],
last_reference: :os.system_time(:second),
crf: 1
end
......@@ -14,7 +14,7 @@ defmodule Pleroma.Gun.Connections do
opts: keyword()
}
defstruct conns: %{}, opts: []
defstruct conns: %{}, opts: [], queue: []
alias Pleroma.Gun.API
alias Pleroma.Gun.Conn
......@@ -36,8 +36,8 @@ def start_link(_) do
@impl true
def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
@spec get_conn(String.t(), keyword(), atom()) :: pid()
def get_conn(url, opts \\ [], name \\ :default) do
@spec checkin(String.t(), keyword(), atom()) :: pid()
def checkin(url, opts \\ [], name \\ :default) do
opts = Enum.into(opts, %{})
uri = URI.parse(url)
......@@ -62,7 +62,7 @@ def get_conn(url, opts \\ [], name \\ :default) do
GenServer.call(
name,
{:conn, %{opts: opts, uri: uri}}
{:checkin, %{opts: opts, uri: uri}}
)
end
......@@ -77,28 +77,57 @@ def get_state(name \\ :default) do
GenServer.call(name, {:state})
end
def checkout(conn, pid, name \\ :default) do
GenServer.cast(name, {:checkout, conn, pid})
end
def process_queue(name \\ :default) do
GenServer.cast(name, {:process_queue})
end
@impl true
def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do
def handle_cast({:checkout, conn_pid, pid}, state) do
{key, conn} = find_conn(state.conns, conn_pid)
used_by = List.keydelete(conn.used_by, pid, 0)
conn_state = if used_by == [], do: :idle, else: conn.conn_state
state = put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
{:noreply, state}
end
@impl true
def handle_cast({:process_queue}, state) do
case state.queue do
[{from, key, uri, opts} | _queue] ->
try_to_checkin(key, uri, from, state, Map.put(opts, :from_cast, true))
[] ->
{:noreply, state}
end
end
@impl true
def handle_call({:checkin, %{opts: opts, uri: uri}}, from, state) do
key = compose_key(uri)
case state.conns[key] do
%{conn: conn, state: conn_state, last_reference: reference, crf: last_crf} = current_conn
when conn_state == :up ->
%{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
time = current_time()
last_reference = time - reference
last_reference = time - current_conn.last_reference
current_crf = crf(last_reference, 100, last_crf)
current_crf = crf(last_reference, 100, current_conn.crf)
state =
put_in(state.conns[key], %{
current_conn
| last_reference: time,
crf: current_crf
crf: current_crf,
conn_state: :active,
used_by: [from | current_conn.used_by]
})
{:reply, conn, state}
%{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
%{gun_state: gun_state, waiting_pids: pids} when gun_state in [:open, :down] ->
state = put_in(state.conns[key].waiting_pids, [from | pids])
{:noreply, state}
......@@ -108,22 +137,7 @@ def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do
if Enum.count(state.conns) < max_connections do
open_conn(key, uri, from, state, opts)
else
[{close_key, least_used} | _conns] =
state.conns
|> Enum.filter(fn {_k, v} -> v.waiting_pids == [] end)
|> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
x.crf < y.crf and x.last_reference < y.last_reference
end)
:ok = API.close(least_used.conn)
state =
put_in(
state.conns,
Map.delete(state.conns, close_key)
)
open_conn(key, uri, from, state, opts)
try_to_checkin(key, uri, from, state, opts)
end
end
end
......@@ -131,14 +145,44 @@ def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do
@impl true
def handle_call({:state}, _from, state), do: {:reply, state, state}
defp try_to_checkin(key, uri, from, state, opts) do
unused_conns =
state.conns
|> Enum.filter(fn {_k, v} ->
v.conn_state == :idle and v.waiting_pids == [] and v.used_by == []
end)
|> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
x.crf < y.crf and x.last_reference < y.last_reference
end)
case unused_conns do
[{close_key, least_used} | _conns] ->
:ok = API.close(least_used.conn)
state =
put_in(
state.conns,
Map.delete(state.conns, close_key)
)
open_conn(key, uri, from, state, opts)
[] ->
queue =
if List.keymember?(state.queue, from, 0),
do: state.queue,
else: state.queue ++ [{from, key, uri, opts}]
state = put_in(state.queue, queue)
{:noreply, state}
end
end
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
conn_key = compose_key_gun_info(conn_pid)
{key, conn} = find_conn(state.conns, conn_pid, conn_key)
# Send to all waiting processes connection pid
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
# Update state of the current connection and set waiting_pids to empty list
time = current_time()
last_reference = time - conn.last_reference
......@@ -147,12 +191,17 @@ def handle_info({:gun_up, conn_pid, _protocol}, state) do
state =
put_in(state.conns[key], %{
conn
| state: :up,
| gun_state: :up,
waiting_pids: [],
last_reference: time,
crf: current_crf
crf: current_crf,
conn_state: :active,
used_by: conn.waiting_pids ++ conn.used_by
})
# Send to all waiting processes connection pid
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
{:noreply, state}
end
......@@ -163,7 +212,7 @@ def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
state = put_in(state.conns[key].state, :down)
state = put_in(state.conns[key].gun_state, :down)
{:noreply, state}
end
......@@ -186,7 +235,7 @@ defp find_conn(conns, conn_pid, conn_key) do
end)
end
defp open_conn(key, uri, _from, state, %{proxy: {proxy_host, proxy_port}} = opts) do
defp open_conn(key, uri, from, state, %{proxy: {proxy_host, proxy_port}} = opts) do
host = to_charlist(uri.host)
port = uri.port
......@@ -211,9 +260,15 @@ defp open_conn(key, uri, _from, state, %{proxy: {proxy_host, proxy_port}} = opts
put_in(state.conns[key], %Conn{
conn: conn,
waiting_pids: [],
state: :up
gun_state: :up,
conn_state: :active,
used_by: [from]
})
if opts[:from_cast] do
GenServer.reply(from, conn)
end
{:reply, conn, state}
else
error ->
......@@ -227,6 +282,13 @@ defp open_conn(key, uri, from, state, opts) do
port = uri.port
with {:ok, conn} <- API.open(host, port, opts) do
state =
if opts[:from_cast] do
put_in(state.queue, List.keydelete(state.queue, from, 0))
else
state
end
state =
put_in(state.conns[key], %Conn{
conn: conn,
......
......@@ -10,8 +10,7 @@ defmodule Pleroma.HTTP.Connection do
@options [
connect_timeout: 10_000,
timeout: 20_000,
pool: :federation,
version: :master
pool: :federation
]
require Logger
......@@ -61,7 +60,7 @@ def options(opts) do
end
defp get_conn_for_gun(url, options, pool) do
case Pleroma.Gun.Connections.get_conn(url, options, pool) do
case Pleroma.Gun.Connections.checkin(url, options, pool) do
nil ->
options
......
......@@ -45,15 +45,27 @@ def request(method, url, body \\ "", headers \\ [], options \\ []) do
params = Keyword.get(options, :params, [])
%{}
|> Builder.method(method)
|> Builder.url(url)
|> Builder.headers(headers)
|> Builder.opts(options)
|> Builder.add_param(:body, :body, body)
|> Builder.add_param(:query, :query, params)
|> Enum.into([])
|> (&Tesla.request(Connection.new(options), &1)).()
request =
%{}
|> Builder.method(method)
|> Builder.url(url)
|> Builder.headers(headers)
|> Builder.opts(options)
|> Builder.add_param(:body, :body, body)
|> Builder.add_param(:query, :query, params)
|> Enum.into([])
client = Connection.new(options)
response = Tesla.request(client, request)
if adapter_gun? do
%{adapter: {_, _, [adapter_options]}} = client
pool = adapter_options[:pool]
Pleroma.Gun.Connections.checkout(adapter_options[:conn], self(), pool)
Pleroma.Gun.Connections.process_queue(pool)
end
response
rescue
e ->
{:error, e}
......
This diff is collapsed.
......@@ -71,11 +71,18 @@ test "get_conn_for_gun/3" do
options = [adapter: [pool: :federation]]
assert {:ok, resp} =
Pleroma.HTTP.request(:get, "https://httpbin.org/user-agent", "", [], options)
assert {:ok, resp} = Pleroma.HTTP.get("https://httpbin.org/user-agent", [], options)
adapter_opts = resp.opts[:adapter]
assert resp.status == 200
assert adapter_opts[:url] == "https://httpbin.org/user-agent"
state = Pleroma.Gun.Connections.get_state(:federation)
conn = state.conns["https:httpbin.org:443"]
assert conn.conn_state == :idle
assert conn.used_by == []
assert state.queue == []
end
end
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment