Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,79 +5,104 @@ defmodule CodeCorps.StripeService.WebhookProcessing.WebhookProcessor do

alias CodeCorps.StripeEvent
alias CodeCorps.Repo
alias CodeCorps.StripeService.WebhookProcessing.{ConnectEventHandler, PlatformEventHandler}

@api Application.get_env(:code_corps, :stripe)

@doc """
Used to process a Stripe webhook event.

Receives the event json as the first parameter.
Since a webhook can be a platform or a connect webhook,
the function requires the handler module as the second parameter.
Receives the event JSON as the first parameter.

Since a webhook can be a platform or a connect webhook, the function requires
the handler module as the second parameter.

## Returns

* `{:ok, :ignored_by_environment}` if the event was ignored due to environment mismatch
* `{:ok, :enqueued}` if the event will be handled
- `{:ok, pid}` if the event will be handled
- `{:error, :ignored_by_environment}` if the event was ignored due to
environment mismatch

## Note

Stripe events can have their `livemode` property set to `true` or `false`.
A livemode event should be handled by the production environment, while all other environments
handle non-livemode events.
A livemode `true` event should be handled by the production environment,
while all other environments handle livemode `false` events.
"""
def process_async(%{} = json, handler) do
case event_matches_environment?(json) do
true -> do_process_async(json, handler)
false -> {:ok, :ignored_by_environment}
def process_async(%{"id" => id, "livemode" => livemode, "user_id" => user_id} = json, handler) do
case event_matches_environment?(livemode) do
true -> do_process_async(id, user_id, handler, json)
false -> {:error, :ignored_by_environment}
end
end

defp do_process_async(json, handler) do
Task.Supervisor.start_child(:webhook_processor, fn -> do_process(json, handler) end)
def process_async(%{"id" => id, "livemode" => livemode} = json, handler) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you can consolidate this def with the one on line 32 unless I'm missing something:

def process_async(%{"id" => id, "livemode" => livemode} = json, handler) 
  case event_matches_environment?(livemode) 
    true -> do_process_async(id, json["user_id"], handler, json)
    false -> {:error, :ignored_by_environment}
  end
end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rossta the one on line 32 is doing pattern matching against the user_id key, so on platform events the signature will be off.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yah, but isn't it just passing the user_id value into the same place where it would be nil in the other example? So I thought json["user_id"] as the second param to the true case would handle both implementations but I could be wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rossta yeah, I'm not sure how I feel about it. On the one hand I like the explicitness of saying that there are two different ways we handle this, but I suppose it's the same outcome.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did miss your code example somehow earlier so just read "consolidate" somehow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get what you mean about being explicit. In that case, I'm usually inclined to extract an intention-revealing variable or method to help highlight the difference. That may not be the best way to handle it as I'm still learning my way around the conventions in the codebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible that this interface should be redesigned to have two public fn's, one for processing platform and one for connect events, and then the controller does some pattern matching against user_id. I believe that every connect event should have a user_id, but I could be wrong.

case event_matches_environment?(livemode) do
true -> do_process_async(id, nil, handler, json)
false -> {:error, :ignored_by_environment}
end
end

defp event_matches_environment?(%{"livemode" => livemode}) do
case Application.get_env(:code_corps, :stripe_env) do
:prod -> livemode
_ -> !livemode
end
defp do_process_async(id, user_id, handler, json) do
Task.Supervisor.start_child(:webhook_processor, fn -> do_process(id, user_id, handler, json) end)
end

defp do_process(%{"id" => event_id, "type" => event_type} = json, handler) do
with {:ok, %StripeEvent{} = event} <- find_or_create_event(event_id, event_type) do
case handler.handle_event(json) |> Tuple.to_list do
[:ok, :unhandled_event] -> event |> set_unhandled
[:ok | _results] -> event |> set_processed
[:error | _error] -> event |> set_errored
end
defp do_process(id, user_id, handler, json) do
with {:ok, %Stripe.Event{id: api_event_id, type: api_event_type, user_id: api_user_id}} <- retrieve_event_from_api(id, user_id),
{:ok, endpoint} <- infer_endpoint_from_handler(handler),
{:ok, %StripeEvent{} = event} <- find_or_create_event(api_event_id, api_event_type, api_user_id, endpoint)
do
handle_event(json, event, handler)
else
{:error, :already_processing} -> nil
end
end

defp find_or_create_event(id_from_stripe, type) do
defp event_matches_environment?(livemode) do
case Application.get_env(:code_corps, :stripe_env) do
Copy link
Contributor

@rossta rossta Dec 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The livemode value is just true/false, no? How about just the following as the method body?

livemode && Application.get_env(:code_corps, :stripe_env) == :prod

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rossta we still want to be able to process events in staging, so we would never handle events with this situation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried thinking about how to improve this because it reads strange, but think it's isolated enough to not make a difference, and is kind of addressed in the moduledoc.

:prod -> livemode
_ -> !livemode
end
end

defp find_or_create_event(id_from_stripe, type, user_id, endpoint) do
case find_event(id_from_stripe) do
%StripeEvent{status: "processing"} -> {:error, :already_processing}
%StripeEvent{} = event -> {:ok, event}
nil -> create_event(id_from_stripe, type)
nil -> create_event(id_from_stripe, endpoint, type, user_id)
end
end

defp find_event(id_from_stripe) do
Repo.get_by(StripeEvent, id_from_stripe: id_from_stripe)
end

defp create_event(id_from_stripe, type) do
%StripeEvent{} |> StripeEvent.create_changeset(%{id_from_stripe: id_from_stripe, type: type}) |> Repo.insert
defp handle_event(json, event, handler) do
case handler.handle_event(json) |> Tuple.to_list do
[:ok, :unhandled_event] -> event |> set_unhandled
[:ok | _results] -> event |> set_processed
[:error | _error] -> event |> set_errored
end
end

defp set_processed(%StripeEvent{} = event) do
event |> StripeEvent.update_changeset(%{status: "processed"}) |> Repo.update
defp infer_endpoint_from_handler(ConnectEventHandler), do: {:ok, "connect"}
defp infer_endpoint_from_handler(PlatformEventHandler), do: {:ok, "platform"}
defp infer_endpoint_from_handler(_), do: {:error, :invalid_handler}

defp retrieve_event_from_api(id, nil), do: @api.Event.retrieve(id)
defp retrieve_event_from_api(id, user_id), do: @api.Event.retrieve(id, connect_account: user_id)

defp create_event(id_from_stripe, endpoint, type, user_id) do
%StripeEvent{} |> StripeEvent.create_changeset(%{endpoint: endpoint, id_from_stripe: id_from_stripe, type: type, user_id: user_id}) |> Repo.insert
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we follow any conventions on when to split out a pipe to a new line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We nominally use credo but don't have it enabled on the repo and honestly haven't run it in CLI in awhile.

end

defp set_errored(%StripeEvent{} = event) do
event |> StripeEvent.update_changeset(%{status: "errored"}) |> Repo.update
end

defp set_processed(%StripeEvent{} = event) do
event |> StripeEvent.update_changeset(%{status: "processed"}) |> Repo.update
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I often struggle with the question of whether or not to introduce another helper method for helper methods like set_(errored|processed|unhandled) when all three have similar implementation. However, another layer of indirection isn't always helpful and I think it's fine the way you have it here with already-small methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wasn't sure about this, hemmed and hawed, but ultimately this is probably okay. Was thinking we could just have done set_status/2 with a string as a second argument.


defp set_unhandled(%StripeEvent{} = event) do
event |> StripeEvent.update_changeset(%{status: "unhandled"}) |> Repo.update
end
Expand Down
39 changes: 39 additions & 0 deletions lib/code_corps/stripe_testing/event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule CodeCorps.StripeTesting.Event do
def retrieve(id, _opts = [connect_account: _]) do
{:ok, do_retrieve_connect(id)}
end
def retrieve(id) do
{:ok, do_retrieve(id)}
end

defp do_retrieve(_) do
{:ok, created} = DateTime.from_unix(1479472835)

%Stripe.Event{
api_version: "2016-07-06",
created: created,
id: "evt_123",
livemode: false,
object: "event",
pending_webhooks: 1,
request: nil,
type: "any.event"
}
end

defp do_retrieve_connect(_) do
{:ok, created} = DateTime.from_unix(1479472835)

%Stripe.Event{
api_version: "2016-07-06",
created: created,
id: "evt_123",
livemode: false,
object: "event",
pending_webhooks: 1,
request: nil,
type: "any.event",
user_id: "acct_123"
}
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ defmodule CodeCorps.Mixfile do
{:scrivener_ecto, "~> 1.0"}, # DB query pagination
{:segment, github: "stueccles/analytics-elixir"}, # Segment analytics
{:sentry, "~> 2.0"}, # Sentry error tracking
{:stripity_stripe, "~> 2.0.0-alpha.5"}, # Stripe
{:stripity_stripe, git: "https://github.com/code-corps/stripity_stripe.git", branch: "2.0"}, # Stripe
{:timber, "~> 0.4"}, # Logging
{:timex, "~> 3.0"},
{:timex_ecto, "~> 3.0"},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"sentry": {:hex, :sentry, "2.0.2", "f08638758f7bf891e238466009f6cd702fc26d87286663af26927a78ed149346", [:mix], [{:hackney, "~> 1.6.1", [hex: :hackney, optional: false]}, {:plug, "~> 1.0", [hex: :plug, optional: true]}, {:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}, {:uuid, "~> 1.0", [hex: :uuid, optional: false]}]},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:rebar, :make], []},
"stripe_eventex": {:hex, :stripe_eventex, "1.0.0", "782016598b751c0fdb5489038c92c30a5aab034636d0d9d3a486f75a01fbf0b6", [:mix], [{:cowboy, "~> 1.0.0", [hex: :cowboy, optional: false]}, {:plug, "~> 1.0", [hex: :plug, optional: false]}, {:poison, "~> 2.0", [hex: :poison, optional: false]}]},
"stripity_stripe": {:hex, :stripity_stripe, "2.0.0-alpha.5", "ba6d4ffc6251029135c76e9c6e2dd77580713f5c6833fb82da708336023bbfa2", [:mix], [{:hackney, "~> 1.6", [hex: :hackney, optional: false]}, {:poison, "~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]},
"stripity_stripe": {:git, "https://github.com/code-corps/stripity_stripe.git", "d26b09aff994a30a17bcd35eff6863cbaadf5ec8", [branch: "2.0"]},
"timber": {:hex, :timber, "0.4.7", "df3fcd79bcb4eb4b53874d906ef5f3a212937b4bc7b7c5b244745202cc389443", [:mix], [{:ecto, "~> 2.0", [hex: :ecto, optional: true]}, {:phoenix, "~> 1.2", [hex: :phoenix, optional: true]}, {:plug, "~> 1.2", [hex: :plug, optional: true]}, {:poison, "~> 2.0 or ~> 3.0", [hex: :poison, optional: false]}]},
"timex": {:hex, :timex, "3.1.5", "413d6d8d6f0162a5d47080cb8ca520d790184ac43e097c95191c7563bf25b428", [:mix], [{:combine, "~> 0.7", [hex: :combine, optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}]},
"timex_ecto": {:hex, :timex_ecto, "3.0.5", "3ec6c25e10d2c0020958e5df64d2b5e690e441faa2c2259da8bc6bd3d7f39256", [:mix], [{:ecto, "~> 2.0", [hex: :ecto, optional: false]}, {:timex, "~> 3.0", [hex: :timex, optional: false]}]},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule CodeCorps.Repo.Migrations.AddUserIdAndEndpointToStripeEvents do
use Ecto.Migration

def change do
alter table(:stripe_events) do
add :endpoint, :string, null: false
add :user_id, :string
end
end
end
3 changes: 2 additions & 1 deletion test/controllers/stripe_connect_events_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ defmodule CodeCorps.StripeConnectEventsControllerTest do
"object" => "event",
"pending_webhooks" => 1,
"request" => nil,
"type" => type
"type" => type,
"user_id" => "acct_123"
}
end

Expand Down
15 changes: 13 additions & 2 deletions test/models/stripe_event_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule CodeCorps.StripeEventTest do
alias CodeCorps.StripeEvent

describe "create_changeset/2" do
@valid_attrs %{id_from_stripe: "evt_123", type: "any.event"}
@valid_attrs %{endpoint: "connect", id_from_stripe: "evt_123", type: "any.event"}

test "reports as valid when attributes are valid" do
changeset = StripeEvent.create_changeset(%StripeEvent{}, @valid_attrs)
Expand All @@ -15,6 +15,7 @@ defmodule CodeCorps.StripeEventTest do
changeset = StripeEvent.create_changeset(%StripeEvent{}, %{})

refute changeset.valid?
assert changeset.errors[:endpoint] == {"can't be blank", []}
assert changeset.errors[:id_from_stripe] == {"can't be blank", []}
assert changeset.errors[:type] == {"can't be blank", []}
end
Expand All @@ -27,6 +28,16 @@ defmodule CodeCorps.StripeEventTest do

assert record.status == "processing"
end

test "prevents :endpoint from being invalid" do
event = insert(:stripe_event)

attrs = %{endpoint: "random", id_from_stripe: "evt_123", type: "any.event"}
changeset = StripeEvent.create_changeset(event, attrs)

refute changeset.valid?
assert changeset.errors[:endpoint] == {"is invalid", []}
end
end

describe "update_changeset/2" do
Expand All @@ -51,7 +62,7 @@ defmodule CodeCorps.StripeEventTest do
test "prevents :status from being invalid" do
event = insert(:stripe_event)

changeset = StripeEvent.update_changeset(event, %{status: "invalid"})
changeset = StripeEvent.update_changeset(event, %{status: "random"})

refute changeset.valid?
assert changeset.errors[:status] == {"is invalid", []}
Expand Down
13 changes: 7 additions & 6 deletions test/support/factories.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,6 @@ defmodule CodeCorps.Factories do
}
end

def stripe_file_upload_factory do
%CodeCorps.StripeFileUpload{
id_from_stripe: sequence(:id_from_stripe, &"stripe_id_#{&1}"),
}
end

def stripe_connect_subscription_factory do
stripe_connect_plan = build(:stripe_connect_plan)
%CodeCorps.StripeConnectSubscription{
Expand All @@ -160,12 +154,19 @@ defmodule CodeCorps.Factories do

def stripe_event_factory do
%CodeCorps.StripeEvent{
endpoint: sequence(:endpoint, fn(_) -> Enum.random(~w{ connect platform }) end),
id_from_stripe: sequence(:id_from_stripe, &"stripe_id_#{&1}"),
status: sequence(:status, fn(_) -> Enum.random(~w{ unprocessed processed errored }) end),
type: "test.type"
}
end

def stripe_file_upload_factory do
%CodeCorps.StripeFileUpload{
id_from_stripe: sequence(:id_from_stripe, &"stripe_id_#{&1}"),
}
end

def stripe_platform_customer_factory do
%CodeCorps.StripePlatformCustomer{
created: Timex.now,
Expand Down
2 changes: 1 addition & 1 deletion web/controllers/stripe_connect_events_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule CodeCorps.StripeConnectEventsController do

def create(conn, params) do
case WebhookProcessor.process_async(params, ConnectEventHandler) do
{:ok, :ignored_by_environment} -> conn |> send_resp(400, "")
{:ok, _pid} -> conn |> send_resp(200, "")
{:error, :ignored_by_environment} -> conn |> send_resp(400, "")
end
end
end
2 changes: 1 addition & 1 deletion web/controllers/stripe_platform_events_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule CodeCorps.StripePlatformEventsController do

def create(conn, params) do
case WebhookProcessor.process_async(params, PlatformEventHandler) do
{:ok, :ignored_by_environment} -> conn |> send_resp(400, "")
{:ok, _pid} -> conn |> send_resp(200, "")
{:error, :ignored_by_environment} -> conn |> send_resp(400, "")
end
end
end
15 changes: 11 additions & 4 deletions web/models/stripe_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule CodeCorps.StripeEvent do

## Fields

* `endpoint` - "connect" or "platform"
* `id_from_stripe` - Stripe's `id`
* `status` - "unprocessed", "processed", or "errored"

Expand All @@ -21,24 +22,26 @@ defmodule CodeCorps.StripeEvent do
use CodeCorps.Web, :model

schema "stripe_events" do
field :endpoint, :string, null: false
field :id_from_stripe, :string, null: false
field :status, :string, default: "unprocessed"
field :type, :string, null: false
field :user_id, :string

timestamps()
end

@doc """
Builds a changeset for storing a new event reference into the database.
Accepts `:id_from_stripe` only. The `status` field is set to "unprocessed"
by default.
The `status` field is set to "unprocessed" by default.
"""
def create_changeset(struct, params \\ %{}) do
struct
|> cast(params, [:id_from_stripe, :type])
|> validate_required([:id_from_stripe, :type])
|> cast(params, [:endpoint, :id_from_stripe, :type, :user_id])
|> validate_required([:endpoint, :id_from_stripe, :type])
|> put_change(:status, "processing")
|> validate_inclusion(:status, states)
|> validate_inclusion(:endpoint, endpoints)
|> unique_constraint(:id_from_stripe)
end

Expand All @@ -54,6 +57,10 @@ defmodule CodeCorps.StripeEvent do
|> validate_inclusion(:status, states)
end

defp endpoints do
~w{ connect platform }
end

defp states do
~w{ errored processed processing unhandled unprocessed }
end
Expand Down