Initial commit: MCP server wrapping tradingview-screener as MCP tools

- 11 MCP tools: get_stock_quotes, screen_market, find_top_gainers,
  find_top_losers, find_most_active, technical_scan, fundamental_scan,
  list_markets, list_fields, set_session, clear_session
- Structured filter conditions with nested And/Or groups
- Session cookie resolution: per-call > in-memory > env var
- auto-load .env via python-dotenv
- 69 unit tests covering helpers, conditions, filters, and all tools
This commit is contained in:
achmad
2026-05-17 11:09:26 +07:00
commit 38f9b980ec
6 changed files with 1015 additions and 0 deletions
+3
View File
@@ -0,0 +1,3 @@
# TradingView session ID for real-time data access
# Get it from: DevTools > Application > Cookies > tradingview.com > sessionid
TV_SESSION_ID=
+6
View File
@@ -0,0 +1,6 @@
.env
.venv/
__pycache__/
.pytest_cache/
*.pyc
.vscode/
+10
View File
@@ -0,0 +1,10 @@
[project]
name = "tradingview-screener-mcp"
version = "0.1.0"
description = "MCP server for TradingView stock screener"
requires-python = ">=3.11"
dependencies = [
"mcp>=1.0.0",
"tradingview-screener>=3.0.0",
"python-dotenv>=1.0.0",
]
+4
View File
@@ -0,0 +1,4 @@
{
"venvPath": ".",
"venv": ".venv"
}
+522
View File
@@ -0,0 +1,522 @@
from __future__ import annotations
import os
from typing import Any, cast
from dotenv import load_dotenv
load_dotenv()
from mcp.server.fastmcp import FastMCP
from tradingview_screener.column import col
from tradingview_screener.query import And, Or, Query
from tradingview_screener.screeners import (
bond,
cfd,
coin,
crypto,
crypto_dex,
forex,
futures,
options,
stocks,
)
mcp = FastMCP("tradingview-screener")
_session_cookies: dict[str, str] | None = None
def _resolve_cookies(sessionid: str | None = None) -> dict[str, str] | None:
if sessionid:
return {"sessionid": sessionid}
if _session_cookies is not None:
return _session_cookies
env = os.environ.get("TV_SESSION_ID")
if env:
return {"sessionid": env}
return None
def _exec_query(
q: Query,
sessionid: str | None = None,
) -> tuple[int, list[dict[str, Any]]]:
cookies = _resolve_cookies(sessionid)
total, df = q.get_scanner_data(cookies=cookies)
return total, cast("list[dict[str, Any]]", df.to_dict(orient="records"))
def _build_condition(f: dict[str, Any]) -> Any:
if "field" in f:
c = col(f["field"])
op = f["operator"]
if op in ("empty", "not_empty"):
return c.empty() if op == "empty" else c.not_empty()
val = f["value"]
match op:
case ">" | "greater":
return c > val
case ">=" | "egreater":
return c >= val
case "<" | "less":
return c < val
case "<=" | "eless":
return c <= val
case "==" | "equal":
return c == val
case "!=" | "nequal":
return c != val
case "between":
return c.between(val[0], val[1])
case "not_between":
return c.not_between(val[0], val[1])
case "isin":
return c.isin(val)
case "not_in":
return c.not_in(val)
case "has":
return c.has(val)
case "has_none_of":
return c.has_none_of(val)
case "like":
return c.like(val)
case "not_like":
return c.not_like(val)
case "crosses":
return c.crosses(val)
case "crosses_above":
return c.crosses_above(val)
case "crosses_below":
return c.crosses_below(val)
case "above_pct":
return c.above_pct(val[0], val[1])
case "below_pct":
return c.below_pct(val[0], val[1])
case "between_pct":
return c.between_pct(val[0], val[1], val[2])
case "in_day_range":
return c.in_day_range(val[0], val[1])
case "in_week_range":
return c.in_week_range(val[0], val[1])
case "in_month_range":
return c.in_month_range(val[0], val[1])
if "operator" in f and "filters" in f:
sub = [_build_condition(sf) for sf in f["filters"]]
if f["operator"] == "or":
return Or(*sub)
return And(*sub)
msg = f"Invalid filter: {f}"
raise ValueError(msg)
def _apply_filters(q: Query, filters: list[dict[str, Any]] | None) -> Query:
if not filters:
return q
simple: list[Any] = []
nested: list[Any] = []
for f in filters:
if "operator" in f and "filters" in f:
nested.append(_build_condition(f))
else:
simple.append(_build_condition(f))
if simple:
q = q.where(*simple)
if nested:
default_filter2 = q.query.get("filter2")
user_op = nested[0] if len(nested) == 1 else And(*nested)
user_filter2 = user_op["operation"]
if default_filter2:
q.query["filter2"] = {
"operator": "and",
"operands": [{"operation": user_filter2}, {"operation": default_filter2}],
}
else:
q.query["filter2"] = user_filter2
return q
MARKET_FACTORIES: dict[str, Any] = {
"stocks": stocks,
"crypto": crypto,
"crypto_dex": crypto_dex,
"coin": coin,
"forex": forex,
"futures": futures,
"bond": bond,
"cfd": cfd,
"options": options,
}
STOCK_COUNTRY_MARKETS: list[str] = [
"america", "argentina", "australia", "austria", "bangladesh", "belgium",
"brazil", "bulgaria", "canada", "chile", "china", "colombia", "croatia",
"cyprus", "czech", "denmark", "egypt", "estonia", "finland", "france",
"germany", "ghana", "greece", "hongkong", "hungary", "iceland", "india",
"indonesia", "ireland", "israel", "italy", "japan", "jordan", "kazakhstan",
"kenya", "kuwait", "latvia", "lithuania", "luxembourg", "malaysia",
"malta", "mexico", "morocco", "netherlands", "newzealand", "nigeria",
"norway", "oman", "pakistan", "peru", "philippines", "poland", "portugal",
"qatar", "romania", "saudiarabia", "serbia", "singapore", "slovakia",
"slovenia", "southafrica", "southkorea", "spain", "srilanka", "sweden",
"switzerland", "taiwan", "thailand", "tunisia", "turkey", "uganda",
"ukraine", "unitedarabemirates", "unitedkingdom", "vietnam",
]
FIELD_CATEGORIES: dict[str, list[dict[str, str]]] = {
"price": [
{"name": "open", "description": "Open price"},
{"name": "high", "description": "High price"},
{"name": "low", "description": "Low price"},
{"name": "close", "description": "Close price"},
{"name": "volume", "description": "Trading volume"},
{"name": "change", "description": "Percent change"},
{"name": "change_abs", "description": "Absolute change"},
{"name": "premarket_change", "description": "Premarket percent change"},
{"name": "postmarket_change", "description": "Postmarket percent change"},
{"name": "VWAP", "description": "Volume Weighted Average Price"},
{"name": "52 Week High", "description": "52-week high price"},
{"name": "52 Week Low", "description": "52-week low price"},
],
"technical": [
{"name": "RSI", "description": "Relative Strength Index (14)"},
{"name": "RSI.5", "description": "RSI (5)"},
{"name": "Stoch.K", "description": "Stochastic %K"},
{"name": "Stoch.D", "description": "Stochastic %D"},
{"name": "MACD.macd", "description": "MACD line"},
{"name": "MACD.signal", "description": "MACD signal line"},
{"name": "MACD.histogram", "description": "MACD histogram"},
{"name": "BB.upper", "description": "Bollinger Band upper"},
{"name": "BB.middle", "description": "Bollinger Band middle (SMA 20)"},
{"name": "BB.lower", "description": "Bollinger Band lower"},
{"name": "SMA", "description": "Simple Moving Average"},
{"name": "EMA", "description": "Exponential Moving Average"},
{"name": "SMA20", "description": "SMA 20"},
{"name": "SMA50", "description": "SMA 50"},
{"name": "SMA200", "description": "SMA 200"},
{"name": "EMA5", "description": "EMA 5"},
{"name": "EMA20", "description": "EMA 20"},
{"name": "EMA50", "description": "EMA 50"},
{"name": "EMA200", "description": "EMA 200"},
{"name": "ADX", "description": "Average Directional Index (14)"},
{"name": "ATR", "description": "Average True Range"},
{"name": "AO", "description": "Awesome Oscillator"},
{"name": "OBV", "description": "On Balance Volume"},
{"name": "CCI20", "description": "Commodity Channel Index (20)"},
{"name": "ROC", "description": "Rate of Change"},
{"name": "Williams %R", "description": "Williams Percent Range (14)"},
{"name": "relative_volume_10d_calc", "description": "Relative volume vs 10-day average"},
{"name": "volume_ma_10", "description": "Volume moving average (10)"},
{"name": "TechRating_1D", "description": "Technical rating (1 day)"},
],
"fundamental": [
{"name": "market_cap_basic", "description": "Market cap"},
{"name": "price_earnings_ttm", "description": "Price/Earnings (TTM)"},
{"name": "earnings_per_share_diluted_ttm", "description": "EPS diluted (TTM)"},
{"name": "earnings_per_share_diluted_yoy_growth_ttm", "description": "EPS YoY growth (TTM)"},
{"name": "dividends_yield_current", "description": "Dividend yield"},
{"name": "dividends_payout_ratio", "description": "Dividend payout ratio"},
{"name": "price_to_book_fq", "description": "Price/Book"},
{"name": "price_to_sales", "description": "Price/Sales"},
{"name": "price_to_cash_flow", "description": "Price/Cash Flow"},
{"name": "return_on_equity", "description": "Return on Equity"},
{"name": "return_on_assets", "description": "Return on Assets"},
{"name": "operating_margin", "description": "Operating margin"},
{"name": "profit_margin", "description": "Profit margin"},
{"name": "revenue_growth", "description": "Revenue growth"},
{"name": "earnings_growth", "description": "Earnings growth"},
{"name": "debt_to_equity", "description": "Debt/Equity"},
{"name": "current_ratio_fq", "description": "Current ratio"},
{"name": "beta_1_year", "description": "Beta (1 year)"},
{"name": "float", "description": "Shares float"},
{"name": "shares_outstanding", "description": "Shares outstanding"},
{"name": "short_ratio_fq", "description": "Short ratio"},
{"name": "short_float", "description": "Short % of float"},
{"name": "insider_ownership", "description": "Insider ownership"},
{"name": "institutional_ownership", "description": "Institutional ownership"},
{"name": "price_earnings_ttm", "description": "P/E ratio (TTM)"},
{"name": "AnalystRating", "description": "Analyst rating (numeric)"},
],
"general": [
{"name": "name", "description": "Short name / ticker"},
{"name": "description", "description": "Company description"},
{"name": "ticker", "description": "Full ticker (EXCHANGE:SYMBOL)"},
{"name": "exchange", "description": "Exchange name"},
{"name": "market", "description": "Market/country"},
{"name": "sector", "description": "Sector"},
{"name": "industry", "description": "Industry"},
{"name": "country", "description": "Country"},
{"name": "currency", "description": "Quote currency"},
{"name": "type", "description": "Instrument type"},
{"name": "typespecs", "description": "Instrument sub-type"},
{"name": "is_primary", "description": "Primary listing flag"},
],
}
def _get_market_factory(market_type: str, country_or_param: str | None = None) -> Query:
if market_type == "stocks" or market_type not in MARKET_FACTORIES:
fact = stocks
return fact(country_or_param or "america")
fact = MARKET_FACTORIES[market_type]
if market_type == "options":
return fact(country_or_param or "CME_MINI:ESM2026")
return fact()
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@mcp.tool()
def get_stock_quotes(
tickers: list[str],
columns: list[str] | None = None,
market_type: str = "stocks",
market_country: str | None = None,
sessionid: str | None = None,
) -> str:
"""Get quote data for specific ticker symbols across any market.
Examples:
- get_stock_quotes(tickers=["NASDAQ:NVDA", "NASDAQ:AAPL"])
- get_stock_quotes(tickers=["BINANCE:BTCUSDT"], market_type="crypto")
- get_stock_quotes(tickers=["NYSE:SPY"], columns=["name","close","volume","RSI"])
"""
q = _get_market_factory(market_type, market_country)
if columns:
q = q.select(*columns)
q = q.set_tickers(*tickers)
total, data = _exec_query(q, sessionid)
return f"Found {total} result(s)\n{data}"
@mcp.tool()
def screen_market(
columns: list[str] | None = None,
filters: list[dict[str, Any]] | None = None,
order_by: str | None = None,
ascending: bool = False,
limit: int = 50,
offset: int = 0,
market_type: str = "stocks",
market_country: str | None = None,
sessionid: str | None = None,
) -> str:
"""General-purpose screener across any market type.
Parameters:
columns: Field names to return (e.g. ["name","close","RSI","volume"]).
filters: Structured filter conditions.
Simple condition: {"field": "<name>", "operator": "<op>", "value": <value>}
Nested group: {"operator": "and"|"or", "filters": [<conditions>]}
Operators: >, >=, <, <=, ==, !=, between, not_between, isin, not_in,
has, has_none_of, like, not_like, empty, not_empty,
crosses, crosses_above, crosses_below,
above_pct, below_pct, between_pct,
in_day_range, in_week_range, in_month_range
Examples:
[{"field": "RSI", "operator": ">", "value": 70}]
[{"field": "close", "operator": "between", "value": [100, 200]}]
[{"field": "exchange", "operator": "isin", "value": ["NASDAQ","NYSE"]}]
[{"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"}]
Nested: [{"operator": "or", "filters": [
{"field": "RSI", "operator": ">", "value": 70},
{"field": "RSI", "operator": "<", "value": 30}
]}]
order_by: Field to sort by (e.g. "volume", "market_cap_basic").
ascending: Sort ascending (default False = descending).
limit: Max rows to return (1-1000, default 50).
offset: Row offset for pagination.
market_type: Market type: stocks, crypto, forex, futures, bond, cfd, coin, crypto_dex, options.
market_country: For stocks, a country name (e.g. "america", "india", "germany").
For options, the underlying symbol (e.g. "CME_MINI:ESM2026").
sessionid: Optional TradingView session ID for real-time data.
"""
q = _get_market_factory(market_type, market_country)
if columns:
q = q.select(*columns)
q = _apply_filters(q, filters)
if order_by:
q = q.order_by(order_by, ascending=ascending)
q = q.limit(limit).offset(offset)
total, data = _exec_query(q, sessionid)
return f"Total: {total}\n{data}"
@mcp.tool()
def find_top_gainers(
limit: int = 20,
min_price: float | None = None,
market_type: str = "stocks",
market_country: str | None = None,
sessionid: str | None = None,
) -> str:
"""Find top gainers by percent change in any market."""
q = _get_market_factory(market_type, market_country)
q = q.select("name", "close", "change", "change_abs", "volume")
filters = []
if min_price is not None:
filters.append({"field": "close", "operator": ">", "value": min_price})
q = _apply_filters(q, filters)
q = q.order_by("change", ascending=False).limit(limit)
total, data = _exec_query(q, sessionid)
return f"Total: {total}\n{data}"
@mcp.tool()
def find_top_losers(
limit: int = 20,
min_price: float | None = None,
market_type: str = "stocks",
market_country: str | None = None,
sessionid: str | None = None,
) -> str:
"""Find top losers by percent change in any market."""
q = _get_market_factory(market_type, market_country)
q = q.select("name", "close", "change", "change_abs", "volume")
filters = []
if min_price is not None:
filters.append({"field": "close", "operator": ">", "value": min_price})
q = _apply_filters(q, filters)
q = q.order_by("change", ascending=True).limit(limit)
total, data = _exec_query(q, sessionid)
return f"Total: {total}\n{data}"
@mcp.tool()
def find_most_active(
limit: int = 20,
min_price: float | None = None,
market_type: str = "stocks",
market_country: str | None = None,
sessionid: str | None = None,
) -> str:
"""Find most active instruments by volume."""
q = _get_market_factory(market_type, market_country)
q = q.select("name", "close", "change", "volume")
filters = []
if min_price is not None:
filters.append({"field": "close", "operator": ">", "value": min_price})
q = _apply_filters(q, filters)
q = q.order_by("volume", ascending=False).limit(limit)
total, data = _exec_query(q, sessionid)
return f"Total: {total}\n{data}"
@mcp.tool()
def technical_scan(
filters: list[dict[str, Any]],
columns: list[str] | None = None,
limit: int = 50,
market_type: str = "stocks",
market_country: str | None = None,
sessionid: str | None = None,
) -> str:
"""Screen using technical indicator conditions.
Example filters:
[{"field": "RSI", "operator": "<", "value": 30}, # oversold
{"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"}, # bullish MACD
{"field": "close", "operator": "above_pct", "value": ["SMA50", 1.02]}] # 2% above SMA50
"""
default_cols = ["name", "close", "change", "volume", "RSI", "MACD.macd", "SMA50", "VWAP"]
q = _get_market_factory(market_type, market_country)
q = q.select(*(columns or default_cols))
q = _apply_filters(q, filters)
q = q.limit(limit)
total, data = _exec_query(q, sessionid)
return f"Total: {total}\n{data}"
@mcp.tool()
def fundamental_scan(
filters: list[dict[str, Any]],
columns: list[str] | None = None,
limit: int = 50,
market_type: str = "stocks",
market_country: str | None = None,
sessionid: str | None = None,
) -> str:
"""Screen by fundamental metrics (market cap, P/E, dividend, sector, etc.).
Example filters:
[{"field": "market_cap_basic", "operator": "between", "value": [1e9, 1e11}],
{"field": "price_earnings_ttm", "operator": "<", "value": 20},
{"field": "dividends_yield_current", "operator": ">", "value": 2},
{"field": "sector", "operator": "isin", "value": ["Technology", "Healthcare"]}]
"""
default_cols = ["name", "close", "market_cap_basic", "price_earnings_ttm",
"dividends_yield_current", "sector", "industry"]
q = _get_market_factory(market_type, market_country)
q = q.select(*(columns or default_cols))
q = _apply_filters(q, filters)
q = q.limit(limit)
total, data = _exec_query(q, sessionid)
return f"Total: {total}\n{data}"
@mcp.tool()
def list_markets() -> str:
"""List all available market types and stock-country markets."""
markets = {
"asset_types": [
{"id": "stocks", "description": "Stocks (common, preferred, DRs, funds) — use market_country param"},
{"id": "crypto", "description": "Centralised-exchange crypto pairs"},
{"id": "crypto_dex", "description": "Decentralised-exchange crypto pairs (USD)"},
{"id": "coin", "description": "CoinMarketCap crypto coins"},
{"id": "forex", "description": "Forex currency pairs"},
{"id": "futures", "description": "Futures contracts"},
{"id": "bond", "description": "Bonds"},
{"id": "cfd", "description": "Contracts for Difference"},
{"id": "options", "description": "Options (use market_country param for underlying symbol)"},
],
"stock_countries": STOCK_COUNTRY_MARKETS,
}
return str(markets)
@mcp.tool()
def list_fields(category: str | None = None) -> str:
"""List available screener fields, optionally filtered by category.
Categories: price, technical, fundamental, general
If no category given, returns all categories.
"""
if category:
cat = category.lower()
if cat in FIELD_CATEGORIES:
return str(FIELD_CATEGORIES[cat])
return f"Category '{category}' not found. Available: price, technical, fundamental, general"
return str(FIELD_CATEGORIES)
@mcp.tool()
def set_session(sessionid: str) -> str:
"""Store a TradingView session ID for real-time data access.
Get your sessionid from:
1. Go to tradingview.com and log in
2. Open DevTools > Application > Cookies > tradingview.com
3. Copy the 'sessionid' cookie value
"""
global _session_cookies
_session_cookies = {"sessionid": sessionid}
return "Session ID stored successfully"
@mcp.tool()
def clear_session() -> str:
"""Clear stored TradingView session ID."""
global _session_cookies
_session_cookies = None
return "Session ID cleared"
if __name__ == "__main__":
mcp.run(transport="stdio")
+470
View File
@@ -0,0 +1,470 @@
from __future__ import annotations
import os
from unittest.mock import patch
import pytest
import server
from server import (
_apply_filters,
_build_condition,
_get_market_factory,
_resolve_cookies,
clear_session,
find_most_active,
find_top_gainers,
find_top_losers,
fundamental_scan,
get_stock_quotes,
list_fields,
list_markets,
screen_market,
set_session,
technical_scan,
)
from tradingview_screener.query import And, Or
# ---------------------------------------------------------------------------
# _resolve_cookies
# ---------------------------------------------------------------------------
class TestResolveCookies:
def test_per_call_takes_priority(self):
assert _resolve_cookies("per_call_id") == {"sessionid": "per_call_id"}
def test_in_memory_over_env(self):
server._session_cookies = {"sessionid": "mem_id"}
with patch.dict(os.environ, {"TV_SESSION_ID": "env_id"}, clear=True):
assert _resolve_cookies() == {"sessionid": "mem_id"}
server._session_cookies = None
def test_env_var_fallback(self):
server._session_cookies = None
with patch.dict(os.environ, {"TV_SESSION_ID": "env_id"}, clear=True):
assert _resolve_cookies() == {"sessionid": "env_id"}
def test_no_cookies(self):
server._session_cookies = None
with patch.dict(os.environ, {}, clear=True):
assert _resolve_cookies() is None
# ---------------------------------------------------------------------------
# _build_condition
# ---------------------------------------------------------------------------
def _assert_condition(cond: dict, left: str, operation: str, right: object = None):
"""Helper: check the inner expression dict, handling nested 'expression' keys."""
inner = cond if "expression" not in cond else cond.get("expression", cond)
assert inner.get("left") == left, f"expected left={left}, got {inner.get('left')}"
assert inner.get("operation") == operation, f"expected operation={operation}, got {inner.get('operation')}"
if right is not None or "right" in inner:
assert inner.get("right") == right, f"expected right={right}, got {inner.get('right')}"
class TestBuildCondition:
def test_greater(self):
c = _build_condition({"field": "RSI", "operator": ">", "value": 70})
_assert_condition(c, "RSI", "greater", 70)
def test_greater_alt(self):
c = _build_condition({"field": "RSI", "operator": "greater", "value": 70})
_assert_condition(c, "RSI", "greater", 70)
def test_greater_equal(self):
c = _build_condition({"field": "RSI", "operator": ">=", "value": 70})
_assert_condition(c, "RSI", "egreater", 70)
def test_less(self):
c = _build_condition({"field": "RSI", "operator": "<", "value": 30})
_assert_condition(c, "RSI", "less", 30)
def test_equal(self):
c = _build_condition({"field": "close", "operator": "==", "value": 100})
_assert_condition(c, "close", "equal", 100)
def test_not_equal(self):
c = _build_condition({"field": "sector", "operator": "!=", "value": "Finance"})
_assert_condition(c, "sector", "nequal", "Finance")
def test_between(self):
c = _build_condition({"field": "close", "operator": "between", "value": [100, 200]})
_assert_condition(c, "close", "in_range", [100, 200])
def test_not_between(self):
c = _build_condition({"field": "close", "operator": "not_between", "value": [50, 100]})
_assert_condition(c, "close", "not_in_range", [50, 100])
def test_isin(self):
c = _build_condition({"field": "exchange", "operator": "isin", "value": ["NASDAQ", "NYSE"]})
_assert_condition(c, "exchange", "in_range", ["NASDAQ", "NYSE"])
def test_not_in(self):
c = _build_condition({"field": "exchange", "operator": "not_in", "value": ["OTC"]})
_assert_condition(c, "exchange", "not_in_range", ["OTC"])
def test_has(self):
c = _build_condition({"field": "description", "operator": "has", "value": "tech"})
_assert_condition(c, "description", "has", "tech")
def test_has_none_of(self):
c = _build_condition({"field": "description", "operator": "has_none_of", "value": ["test"]})
_assert_condition(c, "description", "has_none_of", ["test"])
def test_like(self):
c = _build_condition({"field": "name", "operator": "like", "value": "AAPL"})
_assert_condition(c, "name", "match", "AAPL")
def test_not_like(self):
c = _build_condition({"field": "name", "operator": "not_like", "value": "TEST"})
_assert_condition(c, "name", "nmatch", "TEST")
def test_empty(self):
c = _build_condition({"field": "description", "operator": "empty"})
_assert_condition(c, "description", "empty", None)
def test_not_empty(self):
c = _build_condition({"field": "description", "operator": "not_empty"})
_assert_condition(c, "description", "nempty", None)
def test_crosses(self):
c = _build_condition({"field": "MACD.macd", "operator": "crosses", "value": "MACD.signal"})
_assert_condition(c, "MACD.macd", "crosses", "MACD.signal")
def test_crosses_above(self):
c = _build_condition({"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"})
_assert_condition(c, "MACD.macd", "crosses_above", "MACD.signal")
def test_crosses_below(self):
c = _build_condition({"field": "MACD.macd", "operator": "crosses_below", "value": "MACD.signal"})
_assert_condition(c, "MACD.macd", "crosses_below", "MACD.signal")
def test_above_pct(self):
c = _build_condition({"field": "close", "operator": "above_pct", "value": ["SMA50", 1.02]})
_assert_condition(c, "close", "above%", ["SMA50", 1.02])
def test_below_pct(self):
c = _build_condition({"field": "close", "operator": "below_pct", "value": ["SMA50", 0.98]})
_assert_condition(c, "close", "below%", ["SMA50", 0.98])
def test_between_pct(self):
c = _build_condition({"field": "close", "operator": "between_pct", "value": ["SMA50", 0.98, 1.02]})
_assert_condition(c, "close", "in_range%", ["SMA50", 0.98, 1.02])
def test_in_day_range(self):
c = _build_condition({"field": "RSI", "operator": "in_day_range", "value": [30, 70]})
_assert_condition(c, "RSI", "in_day_range", [30, 70])
def test_in_week_range(self):
c = _build_condition({"field": "RSI", "operator": "in_week_range", "value": [30, 70]})
_assert_condition(c, "RSI", "in_week_range", [30, 70])
def test_in_month_range(self):
c = _build_condition({"field": "RSI", "operator": "in_month_range", "value": [30, 70]})
_assert_condition(c, "RSI", "in_month_range", [30, 70])
def test_or_group(self):
c = _build_condition({
"operator": "or",
"filters": [
{"field": "RSI", "operator": ">", "value": 70},
{"field": "RSI", "operator": "<", "value": 30},
],
})
assert callable(Or)
assert "operation" in c
assert c["operation"]["operator"] == "or"
assert len(c["operation"]["operands"]) == 2
def test_and_group(self):
c = _build_condition({
"operator": "and",
"filters": [
{"field": "close", "operator": ">", "value": 100},
{"field": "volume", "operator": ">", "value": 1000000},
],
})
assert callable(And)
assert c["operation"]["operator"] == "and"
assert len(c["operation"]["operands"]) == 2
def test_nested_groups(self):
c = _build_condition({
"operator": "or",
"filters": [
{
"operator": "and",
"filters": [
{"field": "RSI", "operator": ">", "value": 70},
{"field": "volume", "operator": ">", "value": 1000000},
],
},
{"field": "close", "operator": "<", "value": 10},
],
})
op = c["operation"]
assert op["operator"] == "or"
assert len(op["operands"]) == 2
def test_invalid_filter_raises(self):
with pytest.raises(ValueError, match="Invalid filter"):
_build_condition({"invalid": "data"})
# ---------------------------------------------------------------------------
# _apply_filters
# ---------------------------------------------------------------------------
class TestApplyFilters:
def test_none_filters(self):
q = _get_market_factory("stocks", "america")
original_query = dict(q.query)
result = _apply_filters(q, None)
assert result.query == original_query
def test_empty_filters(self):
q = _get_market_factory("stocks", "america")
original_query = dict(q.query)
result = _apply_filters(q, [])
assert result.query == original_query
def test_simple_filters_only(self):
q = _get_market_factory("stocks", "america")
q = _apply_filters(q, [{"field": "close", "operator": ">", "value": 100}])
flt = q.query.get("filter", [])
assert isinstance(flt, list)
assert any(f.get("left") == "close" for f in flt)
def test_nested_filters_only(self):
q = _get_market_factory("stocks", "america")
q = _apply_filters(q, [
{"operator": "or", "filters": [
{"field": "RSI", "operator": ">", "value": 70},
{"field": "RSI", "operator": "<", "value": 30},
]},
])
f2 = q.query.get("filter2", {})
assert isinstance(f2, dict)
def test_mixed_filters(self):
q = _get_market_factory("stocks", "america")
q = _apply_filters(q, [
{"field": "close", "operator": ">", "value": 10},
{"operator": "or", "filters": [
{"field": "RSI", "operator": ">", "value": 70},
{"field": "RSI", "operator": "<", "value": 30},
]},
])
flt = q.query.get("filter", [])
assert any(f.get("left") == "close" for f in flt)
f2 = q.query.get("filter2", {})
assert "or" in str(f2)
def test_preserves_default_filter2(self):
q = _get_market_factory("stocks", "america")
q = _apply_filters(q, [
{"operator": "or", "filters": [
{"field": "RSI", "operator": ">", "value": 70},
{"field": "RSI", "operator": "<", "value": 30},
]},
])
f2 = q.query.get("filter2")
assert f2 is not None
assert f2.get("operator") == "and"
# ---------------------------------------------------------------------------
# _get_market_factory
# ---------------------------------------------------------------------------
class TestGetMarketFactory:
def test_stocks_default(self):
q = _get_market_factory("stocks", None)
assert "america" in str(q.query.get("markets", ""))
def test_stocks_with_country(self):
q = _get_market_factory("stocks", "india")
assert "india" in str(q.query.get("markets", ""))
def test_unknown_falls_back_to_stocks(self):
q = _get_market_factory("bogus", None)
assert "america" in str(q.query.get("markets", ""))
def test_crypto(self):
q = _get_market_factory("crypto", None)
m = str(q.query.get("markets", "")).lower()
assert "crypto" in m
def test_forex(self):
q = _get_market_factory("forex", None)
m = str(q.query.get("markets", "")).lower()
assert "forex" in m
def test_options(self):
q = _get_market_factory("options", None)
index_filters = q.query.get("index_filters", [])
assert any("ESM2026" in str(f) for f in index_filters)
# ---------------------------------------------------------------------------
# MCP Tools (with mocked _exec_query)
# ---------------------------------------------------------------------------
MOCK_RESULT = (1, [{"name": "TEST", "close": 150.0, "change": 2.5, "volume": 1000000}])
@pytest.fixture(autouse=True)
def reset_session():
server._session_cookies = None
yield
@patch("server._exec_query", return_value=MOCK_RESULT)
class TestMCPTools:
def test_get_stock_quotes(self, mock_exec):
result = get_stock_quotes(tickers=["NASDAQ:NVDA"])
assert "Found 1 result(s)" in result
assert "TEST" in result
mock_exec.assert_called_once()
def test_get_stock_quotes_with_columns(self, mock_exec):
result = get_stock_quotes(
tickers=["NASDAQ:AAPL"],
columns=["name", "close", "RSI"],
)
assert "Found 1 result(s)" in result
def test_get_stock_quotes_crypto(self, mock_exec):
result = get_stock_quotes(
tickers=["BINANCE:BTCUSDT"],
market_type="crypto",
)
assert "Found 1 result(s)" in result
def test_screen_market_no_filters(self, mock_exec):
result = screen_market(limit=10)
assert "Total: 1" in result
def test_screen_market_with_filters(self, mock_exec):
result = screen_market(
columns=["name", "close", "RSI"],
filters=[{"field": "RSI", "operator": ">", "value": 70}],
order_by="RSI",
limit=20,
)
assert "Total: 1" in result
def test_screen_market_nested_filters(self, mock_exec):
result = screen_market(
filters=[{"operator": "or", "filters": [
{"field": "RSI", "operator": ">", "value": 70},
{"field": "RSI", "operator": "<", "value": 30},
]}],
)
assert "Total: 1" in result
def test_screen_market_with_offset(self, mock_exec):
result = screen_market(limit=5, offset=10)
assert "Total: 1" in result
def test_screen_market_crypto(self, mock_exec):
result = screen_market(market_type="crypto", limit=5)
assert "Total: 1" in result
def test_find_top_gainers(self, mock_exec):
result = find_top_gainers(limit=10)
assert "Total: 1" in result
def test_find_top_gainers_with_min_price(self, mock_exec):
result = find_top_gainers(limit=10, min_price=10.0)
assert "Total: 1" in result
def test_find_top_losers(self, mock_exec):
result = find_top_losers(limit=10)
assert "Total: 1" in result
def test_find_most_active(self, mock_exec):
result = find_most_active(limit=10)
assert "Total: 1" in result
def test_technical_scan(self, mock_exec):
result = technical_scan(
filters=[{"field": "RSI", "operator": "<", "value": 30}],
limit=10,
)
assert "Total: 1" in result
def test_technical_scan_with_custom_columns(self, mock_exec):
result = technical_scan(
filters=[{"field": "RSI", "operator": ">", "value": 70}],
columns=["name", "close", "RSI", "MACD.macd"],
limit=5,
)
assert "Total: 1" in result
def test_fundamental_scan(self, mock_exec):
result = fundamental_scan(
filters=[{"field": "market_cap_basic", "operator": ">", "value": 1e9}],
limit=10,
)
assert "Total: 1" in result
def test_fundamental_scan_with_custom_columns(self, mock_exec):
result = fundamental_scan(
filters=[{"field": "price_earnings_ttm", "operator": "<", "value": 20}],
columns=["name", "close", "price_earnings_ttm"],
)
assert "Total: 1" in result
class TestListTools:
def test_list_markets(self):
result = list_markets()
assert "asset_types" in result
assert "stock_countries" in result
assert "america" in result
assert "crypto" in result
def test_list_fields_all(self):
result = list_fields()
assert "price" in result
assert "technical" in result
assert "fundamental" in result
assert "general" in result
def test_list_fields_price(self):
result = list_fields(category="price")
assert "close" in result
assert "volume" in result
def test_list_fields_technical(self):
result = list_fields(category="technical")
assert "RSI" in result
assert "MACD.macd" in result
def test_list_fields_fundamental(self):
result = list_fields(category="fundamental")
assert "market_cap_basic" in result
assert "price_earnings_ttm" in result
def test_list_fields_invalid_category(self):
result = list_fields(category="bogus")
assert "not found" in result
class TestSessionTools:
def test_set_and_clear_session(self):
result = set_session("test_session_123")
assert "stored" in result
assert _resolve_cookies() == {"sessionid": "test_session_123"}
result2 = clear_session()
assert "cleared" in result2
with patch.dict(os.environ, {}, clear=True):
assert _resolve_cookies() is None
def test_clear_session_when_none(self):
server._session_cookies = None
result = clear_session()
assert "cleared" in result