From fda1bf377ecf3a13be1be12647bbe50eb054e9fb Mon Sep 17 00:00:00 2001 From: Josh Pigford Date: Thu, 24 Oct 2024 09:56:38 -0500 Subject: [PATCH 1/4] Initial pass --- app/models/provider/marketstack.rb | 79 +++++++++++++++++++ app/models/security/importer.rb | 10 +++ ...1023195438_add_stock_exchange_reference.rb | 7 ++ db/schema.rb | 9 ++- 4 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 app/models/provider/marketstack.rb create mode 100644 app/models/security/importer.rb create mode 100644 db/migrate/20241023195438_add_stock_exchange_reference.rb diff --git a/app/models/provider/marketstack.rb b/app/models/provider/marketstack.rb new file mode 100644 index 00000000000..2a1f1871831 --- /dev/null +++ b/app/models/provider/marketstack.rb @@ -0,0 +1,79 @@ +class Provider::Marketstack + include Retryable + + def initialize(api_key) + @api_key = api_key + end + + def fetch_tickers(exchange_mic:) + params = {} + params[:exchange] = exchange_mic if exchange_mic.present? + + tickers = paginate("/v1/tickers", params) do |body| + body.dig("data").map do |ticker| + { + ticker: ticker.dig("symbol"), + name: ticker.dig("name"), + country_code: ticker.dig("country_code"), + stock_exchange: StockExchange.find_by(mic: ticker.dig("exchange_mic")) + } + end + end + end + + private + + attr_reader :api_key + + def base_url + "https://api.marketstack.com/v1" + end + + def fetch_page(url, offset, params = {}) + client.get(url) do |req| + req.params["access_key"] = api_key + params.each { |k, v| req.params[k.to_s] = v.to_s } + req.params["offset"] = offset + req.params["limit"] = 10000 + end + end + + def client + @client ||= Faraday.new(url: base_url) do |faraday| + faraday.params["access_key"] = api_key + end + end + + def build_error(response) + Provider::Base::ProviderError.new(<<~ERROR) + Failed to fetch data from #{self.class} + Status: #{response.status} + Body: #{response.body.inspect} + ERROR + end + + def paginate(url, params = {}) + results = [] + offset = 0 + total = nil + + loop do + response = fetch_page(url, offset, params) + + if response.success? + body = JSON.parse(response.body) + page_results = yield(body) + results.concat(page_results) + + total ||= body.dig("pagination", "total") + offset += body.dig("pagination", "limit") + + break if offset >= total + else + raise build_error(response) + end + end + + results + end +end diff --git a/app/models/security/importer.rb b/app/models/security/importer.rb new file mode 100644 index 00000000000..ead2566beb4 --- /dev/null +++ b/app/models/security/importer.rb @@ -0,0 +1,10 @@ +class Security::Importer + def initialize(provider, stock_exchange) + @provider = provider + @stock_exchange = stock_exchange + end + + def import + provider.fetch_tickers(exchange_mic: stock_exchange.mic) + end +end diff --git a/db/migrate/20241023195438_add_stock_exchange_reference.rb b/db/migrate/20241023195438_add_stock_exchange_reference.rb new file mode 100644 index 00000000000..a07588b2f60 --- /dev/null +++ b/db/migrate/20241023195438_add_stock_exchange_reference.rb @@ -0,0 +1,7 @@ +class AddStockExchangeReference < ActiveRecord::Migration[7.2] + def change + add_column :securities, :country_code, :string + add_reference :securities, :stock_exchange, type: :uuid, foreign_key: true + add_index :securities, :country_code + end +end diff --git a/db/schema.rb b/db/schema.rb index d9fb2e48df0..3153ca5c10d 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.2].define(version: 2024_10_22_221544) do +ActiveRecord::Schema[7.2].define(version: 2024_10_23_195438) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -119,7 +119,7 @@ t.boolean "is_active", default: true, null: false t.date "last_sync_date" t.uuid "institution_id" - t.virtual "classification", type: :string, as: "\nCASE\n WHEN ((accountable_type)::text = ANY ((ARRAY['Loan'::character varying, 'CreditCard'::character varying, 'OtherLiability'::character varying])::text[])) THEN 'liability'::text\n ELSE 'asset'::text\nEND", stored: true + t.virtual "classification", type: :string, as: "\nCASE\n WHEN ((accountable_type)::text = ANY (ARRAY[('Loan'::character varying)::text, ('CreditCard'::character varying)::text, ('OtherLiability'::character varying)::text])) THEN 'liability'::text\n ELSE 'asset'::text\nEND", stored: true t.uuid "import_id" t.string "mode" t.index ["accountable_id", "accountable_type"], name: "index_accounts_on_accountable_id_and_accountable_type" @@ -478,6 +478,10 @@ t.string "name" t.datetime "created_at", null: false t.datetime "updated_at", null: false + t.string "country_code" + t.uuid "stock_exchange_id" + t.index ["country_code"], name: "index_securities_on_country_code" + t.index ["stock_exchange_id"], name: "index_securities_on_stock_exchange_id" end create_table "security_prices", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| @@ -598,6 +602,7 @@ add_foreign_key "imports", "families" add_foreign_key "institutions", "families" add_foreign_key "merchants", "families" + add_foreign_key "securities", "stock_exchanges" add_foreign_key "sessions", "impersonation_sessions", column: "active_impersonator_session_id" add_foreign_key "sessions", "users" add_foreign_key "taggings", "tags" From 4a2f0b961b5cd3be5bdc5d511992afd2139f0666 Mon Sep 17 00:00:00 2001 From: Josh Pigford Date: Thu, 24 Oct 2024 11:08:40 -0500 Subject: [PATCH 2/4] Marketstack data provider --- app/models/provider/marketstack.rb | 132 +++++++++++++++++++++++------ 1 file changed, 106 insertions(+), 26 deletions(-) diff --git a/app/models/provider/marketstack.rb b/app/models/provider/marketstack.rb index 2a1f1871831..6d6958b5629 100644 --- a/app/models/provider/marketstack.rb +++ b/app/models/provider/marketstack.rb @@ -5,39 +5,111 @@ def initialize(api_key) @api_key = api_key end - def fetch_tickers(exchange_mic:) - params = {} - params[:exchange] = exchange_mic if exchange_mic.present? + def fetch_security_prices(ticker:, start_date:, end_date:) + prices = paginate("#{base_url}/eod", { + symbols: ticker, + date_from: start_date.to_s, + date_to: end_date.to_s + }) do |body| + body.dig("data").map do |price| + { + date: price["date"], + price: price["close"]&.to_f, + currency: "USD" + } + end + end - tickers = paginate("/v1/tickers", params) do |body| - body.dig("data").map do |ticker| + SecurityPriceResponse.new( + prices: prices, + success?: true, + raw_response: prices.to_json + ) + rescue StandardError => error + SecurityPriceResponse.new( + success?: false, + error: error, + raw_response: error + ) + end + + def fetch_all_tickers + response = client.get("#{base_url}/tickers") + + if response.success? + tickers = JSON.parse(response.body).dig("data").map do |ticker| { - ticker: ticker.dig("symbol"), - name: ticker.dig("name"), - country_code: ticker.dig("country_code"), - stock_exchange: StockExchange.find_by(mic: ticker.dig("exchange_mic")) + name: ticker["name"], + symbol: ticker["symbol"], + exchange: ticker.dig("stock_exchange", "mic"), + country: ticker.dig("stock_exchange", "country") } end + + TickerResponse.new( + tickers: tickers, + success?: true, + raw_response: response + ) + else + TickerResponse.new( + success?: false, + error: build_error(response), + raw_response: response + ) end + rescue StandardError => error + TickerResponse.new( + success?: false, + error: error, + raw_response: error + ) + end + + def fetch_exchange_tickers(exchange_mic:) + response = client.get("#{base_url}/exchanges/#{exchange_mic}/tickers") + + if response.success? + tickers = JSON.parse(response.body).dig("data").map do |ticker| + { + name: ticker["name"], + symbol: ticker["symbol"], + exchange: exchange_mic, + country: ticker.dig("stock_exchange", "country") + } + end + + TickerResponse.new( + tickers: tickers, + success?: true, + raw_response: response + ) + else + TickerResponse.new( + success?: false, + error: build_error(response), + raw_response: response + ) + end + rescue StandardError => error + TickerResponse.new( + success?: false, + error: error, + raw_response: error + ) end private attr_reader :api_key + SecurityPriceResponse = Struct.new(:prices, :success?, :error, :raw_response, keyword_init: true) + TickerResponse = Struct.new(:tickers, :success?, :error, :raw_response, keyword_init: true) + def base_url "https://api.marketstack.com/v1" end - def fetch_page(url, offset, params = {}) - client.get(url) do |req| - req.params["access_key"] = api_key - params.each { |k, v| req.params[k.to_s] = v.to_s } - req.params["offset"] = offset - req.params["limit"] = 10000 - end - end - def client @client ||= Faraday.new(url: base_url) do |faraday| faraday.params["access_key"] = api_key @@ -52,26 +124,34 @@ def build_error(response) ERROR end + def fetch_page(url, page, params = {}) + client.get(url) do |req| + params.each { |k, v| req.params[k.to_s] = v.to_s } + req.params["offset"] = (page - 1) * 100 # Marketstack uses offset-based pagination + req.params["limit"] = 10000 # Maximum allowed by Marketstack + end + end + def paginate(url, params = {}) results = [] - offset = 0 - total = nil + page = 1 + total_results = Float::INFINITY - loop do - response = fetch_page(url, offset, params) + while results.length < total_results + response = fetch_page(url, page, params) if response.success? body = JSON.parse(response.body) page_results = yield(body) results.concat(page_results) - total ||= body.dig("pagination", "total") - offset += body.dig("pagination", "limit") - - break if offset >= total + total_results = body.dig("pagination", "total") + page += 1 else raise build_error(response) end + + break if results.length >= total_results end results From 73b0c609829cf18f9dca0a0721483eddbf756170 Mon Sep 17 00:00:00 2001 From: Josh Pigford Date: Thu, 24 Oct 2024 14:11:31 -0500 Subject: [PATCH 3/4] Marketstack data provider --- .env.example | 4 ++ .env.local.example | 3 ++ app/controllers/securities_controller.rb | 5 +++ app/helpers/securities_helper.rb | 2 + app/jobs/securities_import_job.rb | 10 +++++ app/models/provider/marketstack.rb | 52 ++++++++---------------- app/models/security/importer.rb | 25 +++++++++++- 7 files changed, 65 insertions(+), 36 deletions(-) create mode 100644 app/controllers/securities_controller.rb create mode 100644 app/helpers/securities_helper.rb create mode 100644 app/jobs/securities_import_job.rb diff --git a/.env.example b/.env.example index fd3e93c4202..02865a5ac09 100644 --- a/.env.example +++ b/.env.example @@ -15,6 +15,10 @@ PORT=3000 # This is used to convert between different currencies in the app. In addition, it fetches US stock prices. We use Synth, which is a Maybe product. You can sign up for a free account at synthfinance.com. SYNTH_API_KEY= +# Non-US Stock Pricing API +# This is used to fetch non-US stock prices. We use Marketstack.com for this and while they offer a free tier, it is quite limited. You'll almost certainly need their Basic plan, which is $9.99 per month. +MARKETSTACK_API_KEY= + # SMTP Configuration # This is only needed if you intend on sending emails from your Maybe instance (such as for password resets or email financial reports). # Resend.com is a good option that offers a free tier for sending emails. diff --git a/.env.local.example b/.env.local.example index d393f62346d..cd38f220e0e 100644 --- a/.env.local.example +++ b/.env.local.example @@ -3,3 +3,6 @@ SELF_HOSTED=false # Enable Synth market data (careful, this will use your API credits) SYNTH_API_KEY=yourapikeyhere + +# Enable Marketstack market data (careful, this will use your API credits) +MARKETSTACK_API_KEY=yourapikeyhere diff --git a/app/controllers/securities_controller.rb b/app/controllers/securities_controller.rb new file mode 100644 index 00000000000..24356118e4b --- /dev/null +++ b/app/controllers/securities_controller.rb @@ -0,0 +1,5 @@ +class SecuritiesController < ApplicationController + def import + SecuritiesImportJob.perform_later(params[:exchange_mic]) + end +end diff --git a/app/helpers/securities_helper.rb b/app/helpers/securities_helper.rb new file mode 100644 index 00000000000..05762cb35a6 --- /dev/null +++ b/app/helpers/securities_helper.rb @@ -0,0 +1,2 @@ +module SecuritiesHelper +end diff --git a/app/jobs/securities_import_job.rb b/app/jobs/securities_import_job.rb new file mode 100644 index 00000000000..71b473f4dfe --- /dev/null +++ b/app/jobs/securities_import_job.rb @@ -0,0 +1,10 @@ +class SecuritiesImportJob < ApplicationJob + queue_as :default + + def perform(exchange_mic = nil) + market_stack_client = Provider::Marketstack.new(ENV["MARKETSTACK_API_KEY"]) + importer = Security::Importer.new(market_stack_client, exchange_mic) + + importer.import + end +end diff --git a/app/models/provider/marketstack.rb b/app/models/provider/marketstack.rb index 6d6958b5629..cbad0fedfa8 100644 --- a/app/models/provider/marketstack.rb +++ b/app/models/provider/marketstack.rb @@ -34,30 +34,22 @@ def fetch_security_prices(ticker:, start_date:, end_date:) end def fetch_all_tickers - response = client.get("#{base_url}/tickers") - - if response.success? - tickers = JSON.parse(response.body).dig("data").map do |ticker| + tickers = paginate("#{base_url}/tickers") do |body| + body.dig("data").map do |ticker| { name: ticker["name"], symbol: ticker["symbol"], exchange: ticker.dig("stock_exchange", "mic"), - country: ticker.dig("stock_exchange", "country") + country_code: ticker.dig("stock_exchange", "country_code") } end - - TickerResponse.new( - tickers: tickers, - success?: true, - raw_response: response - ) - else - TickerResponse.new( - success?: false, - error: build_error(response), - raw_response: response - ) end + + TickerResponse.new( + tickers: tickers, + success?: true, + raw_response: tickers.to_json + ) rescue StandardError => error TickerResponse.new( success?: false, @@ -67,30 +59,22 @@ def fetch_all_tickers end def fetch_exchange_tickers(exchange_mic:) - response = client.get("#{base_url}/exchanges/#{exchange_mic}/tickers") - - if response.success? - tickers = JSON.parse(response.body).dig("data").map do |ticker| + tickers = paginate("#{base_url}/tickers?exchange=#{exchange_mic}") do |body| + body.dig("data").map do |ticker| { name: ticker["name"], symbol: ticker["symbol"], exchange: exchange_mic, - country: ticker.dig("stock_exchange", "country") + country_code: ticker.dig("stock_exchange", "country_code") } end - - TickerResponse.new( - tickers: tickers, - success?: true, - raw_response: response - ) - else - TickerResponse.new( - success?: false, - error: build_error(response), - raw_response: response - ) end + + TickerResponse.new( + tickers: tickers, + success?: true, + raw_response: tickers.to_json + ) rescue StandardError => error TickerResponse.new( success?: false, diff --git a/app/models/security/importer.rb b/app/models/security/importer.rb index ead2566beb4..a41d402f6a8 100644 --- a/app/models/security/importer.rb +++ b/app/models/security/importer.rb @@ -1,10 +1,31 @@ class Security::Importer - def initialize(provider, stock_exchange) + def initialize(provider, stock_exchange = nil) @provider = provider @stock_exchange = stock_exchange end def import - provider.fetch_tickers(exchange_mic: stock_exchange.mic) + if @stock_exchange + securities = @provider.fetch_exchange_tickers(exchange_mic: @stock_exchange)&.tickers + else + securities = @provider.fetch_all_tickers&.tickers + end + + stock_exchanges = StockExchange.where(mic: securities.map { |s| s[:exchange] }).index_by(&:mic) + existing_securities = Security.where(ticker: securities.map { |s| s[:symbol] }, stock_exchange_id: stock_exchanges.values.map(&:id)).pluck(:ticker, :stock_exchange_id).to_set + + securities_to_create = securities.map do |security| + stock_exchange_id = stock_exchanges[security[:exchange]]&.id + next if existing_securities.include?([ security[:symbol], stock_exchange_id ]) + + { + name: security[:name], + ticker: security[:symbol], + stock_exchange_id: stock_exchange_id, + country_code: security[:country_code] + } + end.compact + + Security.insert_all(securities_to_create) unless securities_to_create.empty? end end From 2dfbb24a76ed989eda47edf013e1ee02df5f4a76 Mon Sep 17 00:00:00 2001 From: Josh Pigford Date: Thu, 24 Oct 2024 16:10:04 -0500 Subject: [PATCH 4/4] Refactor a bit --- app/jobs/securities_import_job.rb | 9 ++++++--- app/models/provider/marketstack.rb | 32 ++++-------------------------- app/models/security/importer.rb | 6 +----- app/models/stock_exchange.rb | 1 + 4 files changed, 12 insertions(+), 36 deletions(-) diff --git a/app/jobs/securities_import_job.rb b/app/jobs/securities_import_job.rb index 71b473f4dfe..85fef904a58 100644 --- a/app/jobs/securities_import_job.rb +++ b/app/jobs/securities_import_job.rb @@ -1,10 +1,13 @@ class SecuritiesImportJob < ApplicationJob queue_as :default - def perform(exchange_mic = nil) + def perform(country_code = nil) + exchanges = StockExchange.in_country(country_code) market_stack_client = Provider::Marketstack.new(ENV["MARKETSTACK_API_KEY"]) - importer = Security::Importer.new(market_stack_client, exchange_mic) - importer.import + exchanges.each do |exchange| + importer = Security::Importer.new(market_stack_client, exchange.mic) + importer.import + end end end diff --git a/app/models/provider/marketstack.rb b/app/models/provider/marketstack.rb index cbad0fedfa8..5a1b8de9ab5 100644 --- a/app/models/provider/marketstack.rb +++ b/app/models/provider/marketstack.rb @@ -33,38 +33,14 @@ def fetch_security_prices(ticker:, start_date:, end_date:) ) end - def fetch_all_tickers - tickers = paginate("#{base_url}/tickers") do |body| + def fetch_tickers(exchange_mic: nil) + url = exchange_mic ? "#{base_url}/tickers?exchange=#{exchange_mic}" : "#{base_url}/tickers" + tickers = paginate(url) do |body| body.dig("data").map do |ticker| { name: ticker["name"], symbol: ticker["symbol"], - exchange: ticker.dig("stock_exchange", "mic"), - country_code: ticker.dig("stock_exchange", "country_code") - } - end - end - - TickerResponse.new( - tickers: tickers, - success?: true, - raw_response: tickers.to_json - ) - rescue StandardError => error - TickerResponse.new( - success?: false, - error: error, - raw_response: error - ) - end - - def fetch_exchange_tickers(exchange_mic:) - tickers = paginate("#{base_url}/tickers?exchange=#{exchange_mic}") do |body| - body.dig("data").map do |ticker| - { - name: ticker["name"], - symbol: ticker["symbol"], - exchange: exchange_mic, + exchange: exchange_mic || ticker.dig("stock_exchange", "mic"), country_code: ticker.dig("stock_exchange", "country_code") } end diff --git a/app/models/security/importer.rb b/app/models/security/importer.rb index a41d402f6a8..4970170bb84 100644 --- a/app/models/security/importer.rb +++ b/app/models/security/importer.rb @@ -5,11 +5,7 @@ def initialize(provider, stock_exchange = nil) end def import - if @stock_exchange - securities = @provider.fetch_exchange_tickers(exchange_mic: @stock_exchange)&.tickers - else - securities = @provider.fetch_all_tickers&.tickers - end + securities = @provider.fetch_tickers(exchange_mic: @stock_exchange)&.tickers stock_exchanges = StockExchange.where(mic: securities.map { |s| s[:exchange] }).index_by(&:mic) existing_securities = Security.where(ticker: securities.map { |s| s[:symbol] }, stock_exchange_id: stock_exchanges.values.map(&:id)).pluck(:ticker, :stock_exchange_id).to_set diff --git a/app/models/stock_exchange.rb b/app/models/stock_exchange.rb index 7141198e8e7..93631426ad6 100644 --- a/app/models/stock_exchange.rb +++ b/app/models/stock_exchange.rb @@ -1,2 +1,3 @@ class StockExchange < ApplicationRecord + scope :in_country, ->(country_code) { where(country_code: country_code) } end