commit 38f9b980eceb8adf1591d5ecd1730297f5552c16 Author: achmad Date: Sun May 17 11:09:26 2026 +0700 Initial commit: MCP server wrapping tradingview-screener as MCP tools - 11 MCP tools: get_stock_quotes, screen_market, find_top_gainers, find_top_losers, find_most_active, technical_scan, fundamental_scan, list_markets, list_fields, set_session, clear_session - Structured filter conditions with nested And/Or groups - Session cookie resolution: per-call > in-memory > env var - auto-load .env via python-dotenv - 69 unit tests covering helpers, conditions, filters, and all tools diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..756a8a3 --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +# TradingView session ID for real-time data access +# Get it from: DevTools > Application > Cookies > tradingview.com > sessionid +TV_SESSION_ID= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ecacee5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +.venv/ +__pycache__/ +.pytest_cache/ +*.pyc +.vscode/ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6b9ca18 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,10 @@ +[project] +name = "tradingview-screener-mcp" +version = "0.1.0" +description = "MCP server for TradingView stock screener" +requires-python = ">=3.11" +dependencies = [ + "mcp>=1.0.0", + "tradingview-screener>=3.0.0", + "python-dotenv>=1.0.0", +] diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 0000000..8fd8643 --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,4 @@ +{ + "venvPath": ".", + "venv": ".venv" +} diff --git a/server.py b/server.py new file mode 100644 index 0000000..453ffbf --- /dev/null +++ b/server.py @@ -0,0 +1,522 @@ +from __future__ import annotations + +import os +from typing import Any, cast + +from dotenv import load_dotenv + +load_dotenv() + +from mcp.server.fastmcp import FastMCP +from tradingview_screener.column import col +from tradingview_screener.query import And, Or, Query +from tradingview_screener.screeners import ( + bond, + cfd, + coin, + crypto, + crypto_dex, + forex, + futures, + options, + stocks, +) + +mcp = FastMCP("tradingview-screener") + +_session_cookies: dict[str, str] | None = None + + +def _resolve_cookies(sessionid: str | None = None) -> dict[str, str] | None: + if sessionid: + return {"sessionid": sessionid} + if _session_cookies is not None: + return _session_cookies + env = os.environ.get("TV_SESSION_ID") + if env: + return {"sessionid": env} + return None + + +def _exec_query( + q: Query, + sessionid: str | None = None, +) -> tuple[int, list[dict[str, Any]]]: + cookies = _resolve_cookies(sessionid) + total, df = q.get_scanner_data(cookies=cookies) + return total, cast("list[dict[str, Any]]", df.to_dict(orient="records")) + + +def _build_condition(f: dict[str, Any]) -> Any: + if "field" in f: + c = col(f["field"]) + op = f["operator"] + if op in ("empty", "not_empty"): + return c.empty() if op == "empty" else c.not_empty() + val = f["value"] + match op: + case ">" | "greater": + return c > val + case ">=" | "egreater": + return c >= val + case "<" | "less": + return c < val + case "<=" | "eless": + return c <= val + case "==" | "equal": + return c == val + case "!=" | "nequal": + return c != val + case "between": + return c.between(val[0], val[1]) + case "not_between": + return c.not_between(val[0], val[1]) + case "isin": + return c.isin(val) + case "not_in": + return c.not_in(val) + case "has": + return c.has(val) + case "has_none_of": + return c.has_none_of(val) + case "like": + return c.like(val) + case "not_like": + return c.not_like(val) + case "crosses": + return c.crosses(val) + case "crosses_above": + return c.crosses_above(val) + case "crosses_below": + return c.crosses_below(val) + case "above_pct": + return c.above_pct(val[0], val[1]) + case "below_pct": + return c.below_pct(val[0], val[1]) + case "between_pct": + return c.between_pct(val[0], val[1], val[2]) + case "in_day_range": + return c.in_day_range(val[0], val[1]) + case "in_week_range": + return c.in_week_range(val[0], val[1]) + case "in_month_range": + return c.in_month_range(val[0], val[1]) + if "operator" in f and "filters" in f: + sub = [_build_condition(sf) for sf in f["filters"]] + if f["operator"] == "or": + return Or(*sub) + return And(*sub) + msg = f"Invalid filter: {f}" + raise ValueError(msg) + + +def _apply_filters(q: Query, filters: list[dict[str, Any]] | None) -> Query: + if not filters: + return q + + simple: list[Any] = [] + nested: list[Any] = [] + for f in filters: + if "operator" in f and "filters" in f: + nested.append(_build_condition(f)) + else: + simple.append(_build_condition(f)) + + if simple: + q = q.where(*simple) + + if nested: + default_filter2 = q.query.get("filter2") + user_op = nested[0] if len(nested) == 1 else And(*nested) + user_filter2 = user_op["operation"] + if default_filter2: + q.query["filter2"] = { + "operator": "and", + "operands": [{"operation": user_filter2}, {"operation": default_filter2}], + } + else: + q.query["filter2"] = user_filter2 + + return q + + +MARKET_FACTORIES: dict[str, Any] = { + "stocks": stocks, + "crypto": crypto, + "crypto_dex": crypto_dex, + "coin": coin, + "forex": forex, + "futures": futures, + "bond": bond, + "cfd": cfd, + "options": options, +} + +STOCK_COUNTRY_MARKETS: list[str] = [ + "america", "argentina", "australia", "austria", "bangladesh", "belgium", + "brazil", "bulgaria", "canada", "chile", "china", "colombia", "croatia", + "cyprus", "czech", "denmark", "egypt", "estonia", "finland", "france", + "germany", "ghana", "greece", "hongkong", "hungary", "iceland", "india", + "indonesia", "ireland", "israel", "italy", "japan", "jordan", "kazakhstan", + "kenya", "kuwait", "latvia", "lithuania", "luxembourg", "malaysia", + "malta", "mexico", "morocco", "netherlands", "newzealand", "nigeria", + "norway", "oman", "pakistan", "peru", "philippines", "poland", "portugal", + "qatar", "romania", "saudiarabia", "serbia", "singapore", "slovakia", + "slovenia", "southafrica", "southkorea", "spain", "srilanka", "sweden", + "switzerland", "taiwan", "thailand", "tunisia", "turkey", "uganda", + "ukraine", "unitedarabemirates", "unitedkingdom", "vietnam", +] + +FIELD_CATEGORIES: dict[str, list[dict[str, str]]] = { + "price": [ + {"name": "open", "description": "Open price"}, + {"name": "high", "description": "High price"}, + {"name": "low", "description": "Low price"}, + {"name": "close", "description": "Close price"}, + {"name": "volume", "description": "Trading volume"}, + {"name": "change", "description": "Percent change"}, + {"name": "change_abs", "description": "Absolute change"}, + {"name": "premarket_change", "description": "Premarket percent change"}, + {"name": "postmarket_change", "description": "Postmarket percent change"}, + {"name": "VWAP", "description": "Volume Weighted Average Price"}, + {"name": "52 Week High", "description": "52-week high price"}, + {"name": "52 Week Low", "description": "52-week low price"}, + ], + "technical": [ + {"name": "RSI", "description": "Relative Strength Index (14)"}, + {"name": "RSI.5", "description": "RSI (5)"}, + {"name": "Stoch.K", "description": "Stochastic %K"}, + {"name": "Stoch.D", "description": "Stochastic %D"}, + {"name": "MACD.macd", "description": "MACD line"}, + {"name": "MACD.signal", "description": "MACD signal line"}, + {"name": "MACD.histogram", "description": "MACD histogram"}, + {"name": "BB.upper", "description": "Bollinger Band upper"}, + {"name": "BB.middle", "description": "Bollinger Band middle (SMA 20)"}, + {"name": "BB.lower", "description": "Bollinger Band lower"}, + {"name": "SMA", "description": "Simple Moving Average"}, + {"name": "EMA", "description": "Exponential Moving Average"}, + {"name": "SMA20", "description": "SMA 20"}, + {"name": "SMA50", "description": "SMA 50"}, + {"name": "SMA200", "description": "SMA 200"}, + {"name": "EMA5", "description": "EMA 5"}, + {"name": "EMA20", "description": "EMA 20"}, + {"name": "EMA50", "description": "EMA 50"}, + {"name": "EMA200", "description": "EMA 200"}, + {"name": "ADX", "description": "Average Directional Index (14)"}, + {"name": "ATR", "description": "Average True Range"}, + {"name": "AO", "description": "Awesome Oscillator"}, + {"name": "OBV", "description": "On Balance Volume"}, + {"name": "CCI20", "description": "Commodity Channel Index (20)"}, + {"name": "ROC", "description": "Rate of Change"}, + {"name": "Williams %R", "description": "Williams Percent Range (14)"}, + {"name": "relative_volume_10d_calc", "description": "Relative volume vs 10-day average"}, + {"name": "volume_ma_10", "description": "Volume moving average (10)"}, + {"name": "TechRating_1D", "description": "Technical rating (1 day)"}, + ], + "fundamental": [ + {"name": "market_cap_basic", "description": "Market cap"}, + {"name": "price_earnings_ttm", "description": "Price/Earnings (TTM)"}, + {"name": "earnings_per_share_diluted_ttm", "description": "EPS diluted (TTM)"}, + {"name": "earnings_per_share_diluted_yoy_growth_ttm", "description": "EPS YoY growth (TTM)"}, + {"name": "dividends_yield_current", "description": "Dividend yield"}, + {"name": "dividends_payout_ratio", "description": "Dividend payout ratio"}, + {"name": "price_to_book_fq", "description": "Price/Book"}, + {"name": "price_to_sales", "description": "Price/Sales"}, + {"name": "price_to_cash_flow", "description": "Price/Cash Flow"}, + {"name": "return_on_equity", "description": "Return on Equity"}, + {"name": "return_on_assets", "description": "Return on Assets"}, + {"name": "operating_margin", "description": "Operating margin"}, + {"name": "profit_margin", "description": "Profit margin"}, + {"name": "revenue_growth", "description": "Revenue growth"}, + {"name": "earnings_growth", "description": "Earnings growth"}, + {"name": "debt_to_equity", "description": "Debt/Equity"}, + {"name": "current_ratio_fq", "description": "Current ratio"}, + {"name": "beta_1_year", "description": "Beta (1 year)"}, + {"name": "float", "description": "Shares float"}, + {"name": "shares_outstanding", "description": "Shares outstanding"}, + {"name": "short_ratio_fq", "description": "Short ratio"}, + {"name": "short_float", "description": "Short % of float"}, + {"name": "insider_ownership", "description": "Insider ownership"}, + {"name": "institutional_ownership", "description": "Institutional ownership"}, + {"name": "price_earnings_ttm", "description": "P/E ratio (TTM)"}, + {"name": "AnalystRating", "description": "Analyst rating (numeric)"}, + ], + "general": [ + {"name": "name", "description": "Short name / ticker"}, + {"name": "description", "description": "Company description"}, + {"name": "ticker", "description": "Full ticker (EXCHANGE:SYMBOL)"}, + {"name": "exchange", "description": "Exchange name"}, + {"name": "market", "description": "Market/country"}, + {"name": "sector", "description": "Sector"}, + {"name": "industry", "description": "Industry"}, + {"name": "country", "description": "Country"}, + {"name": "currency", "description": "Quote currency"}, + {"name": "type", "description": "Instrument type"}, + {"name": "typespecs", "description": "Instrument sub-type"}, + {"name": "is_primary", "description": "Primary listing flag"}, + ], +} + + +def _get_market_factory(market_type: str, country_or_param: str | None = None) -> Query: + if market_type == "stocks" or market_type not in MARKET_FACTORIES: + fact = stocks + return fact(country_or_param or "america") + fact = MARKET_FACTORIES[market_type] + if market_type == "options": + return fact(country_or_param or "CME_MINI:ESM2026") + return fact() + + +# --------------------------------------------------------------------------- +# Tools +# --------------------------------------------------------------------------- + + +@mcp.tool() +def get_stock_quotes( + tickers: list[str], + columns: list[str] | None = None, + market_type: str = "stocks", + market_country: str | None = None, + sessionid: str | None = None, +) -> str: + """Get quote data for specific ticker symbols across any market. + + Examples: + - get_stock_quotes(tickers=["NASDAQ:NVDA", "NASDAQ:AAPL"]) + - get_stock_quotes(tickers=["BINANCE:BTCUSDT"], market_type="crypto") + - get_stock_quotes(tickers=["NYSE:SPY"], columns=["name","close","volume","RSI"]) + """ + q = _get_market_factory(market_type, market_country) + if columns: + q = q.select(*columns) + q = q.set_tickers(*tickers) + total, data = _exec_query(q, sessionid) + return f"Found {total} result(s)\n{data}" + + +@mcp.tool() +def screen_market( + columns: list[str] | None = None, + filters: list[dict[str, Any]] | None = None, + order_by: str | None = None, + ascending: bool = False, + limit: int = 50, + offset: int = 0, + market_type: str = "stocks", + market_country: str | None = None, + sessionid: str | None = None, +) -> str: + """General-purpose screener across any market type. + + Parameters: + columns: Field names to return (e.g. ["name","close","RSI","volume"]). + filters: Structured filter conditions. + Simple condition: {"field": "", "operator": "", "value": } + Nested group: {"operator": "and"|"or", "filters": []} + Operators: >, >=, <, <=, ==, !=, between, not_between, isin, not_in, + has, has_none_of, like, not_like, empty, not_empty, + crosses, crosses_above, crosses_below, + above_pct, below_pct, between_pct, + in_day_range, in_week_range, in_month_range + Examples: + [{"field": "RSI", "operator": ">", "value": 70}] + [{"field": "close", "operator": "between", "value": [100, 200]}] + [{"field": "exchange", "operator": "isin", "value": ["NASDAQ","NYSE"]}] + [{"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"}] + Nested: [{"operator": "or", "filters": [ + {"field": "RSI", "operator": ">", "value": 70}, + {"field": "RSI", "operator": "<", "value": 30} + ]}] + order_by: Field to sort by (e.g. "volume", "market_cap_basic"). + ascending: Sort ascending (default False = descending). + limit: Max rows to return (1-1000, default 50). + offset: Row offset for pagination. + market_type: Market type: stocks, crypto, forex, futures, bond, cfd, coin, crypto_dex, options. + market_country: For stocks, a country name (e.g. "america", "india", "germany"). + For options, the underlying symbol (e.g. "CME_MINI:ESM2026"). + sessionid: Optional TradingView session ID for real-time data. + """ + q = _get_market_factory(market_type, market_country) + if columns: + q = q.select(*columns) + q = _apply_filters(q, filters) + if order_by: + q = q.order_by(order_by, ascending=ascending) + q = q.limit(limit).offset(offset) + total, data = _exec_query(q, sessionid) + return f"Total: {total}\n{data}" + + +@mcp.tool() +def find_top_gainers( + limit: int = 20, + min_price: float | None = None, + market_type: str = "stocks", + market_country: str | None = None, + sessionid: str | None = None, +) -> str: + """Find top gainers by percent change in any market.""" + q = _get_market_factory(market_type, market_country) + q = q.select("name", "close", "change", "change_abs", "volume") + filters = [] + if min_price is not None: + filters.append({"field": "close", "operator": ">", "value": min_price}) + q = _apply_filters(q, filters) + q = q.order_by("change", ascending=False).limit(limit) + total, data = _exec_query(q, sessionid) + return f"Total: {total}\n{data}" + + +@mcp.tool() +def find_top_losers( + limit: int = 20, + min_price: float | None = None, + market_type: str = "stocks", + market_country: str | None = None, + sessionid: str | None = None, +) -> str: + """Find top losers by percent change in any market.""" + q = _get_market_factory(market_type, market_country) + q = q.select("name", "close", "change", "change_abs", "volume") + filters = [] + if min_price is not None: + filters.append({"field": "close", "operator": ">", "value": min_price}) + q = _apply_filters(q, filters) + q = q.order_by("change", ascending=True).limit(limit) + total, data = _exec_query(q, sessionid) + return f"Total: {total}\n{data}" + + +@mcp.tool() +def find_most_active( + limit: int = 20, + min_price: float | None = None, + market_type: str = "stocks", + market_country: str | None = None, + sessionid: str | None = None, +) -> str: + """Find most active instruments by volume.""" + q = _get_market_factory(market_type, market_country) + q = q.select("name", "close", "change", "volume") + filters = [] + if min_price is not None: + filters.append({"field": "close", "operator": ">", "value": min_price}) + q = _apply_filters(q, filters) + q = q.order_by("volume", ascending=False).limit(limit) + total, data = _exec_query(q, sessionid) + return f"Total: {total}\n{data}" + + +@mcp.tool() +def technical_scan( + filters: list[dict[str, Any]], + columns: list[str] | None = None, + limit: int = 50, + market_type: str = "stocks", + market_country: str | None = None, + sessionid: str | None = None, +) -> str: + """Screen using technical indicator conditions. + + Example filters: + [{"field": "RSI", "operator": "<", "value": 30}, # oversold + {"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"}, # bullish MACD + {"field": "close", "operator": "above_pct", "value": ["SMA50", 1.02]}] # 2% above SMA50 + """ + default_cols = ["name", "close", "change", "volume", "RSI", "MACD.macd", "SMA50", "VWAP"] + q = _get_market_factory(market_type, market_country) + q = q.select(*(columns or default_cols)) + q = _apply_filters(q, filters) + q = q.limit(limit) + total, data = _exec_query(q, sessionid) + return f"Total: {total}\n{data}" + + +@mcp.tool() +def fundamental_scan( + filters: list[dict[str, Any]], + columns: list[str] | None = None, + limit: int = 50, + market_type: str = "stocks", + market_country: str | None = None, + sessionid: str | None = None, +) -> str: + """Screen by fundamental metrics (market cap, P/E, dividend, sector, etc.). + + Example filters: + [{"field": "market_cap_basic", "operator": "between", "value": [1e9, 1e11}], + {"field": "price_earnings_ttm", "operator": "<", "value": 20}, + {"field": "dividends_yield_current", "operator": ">", "value": 2}, + {"field": "sector", "operator": "isin", "value": ["Technology", "Healthcare"]}] + """ + default_cols = ["name", "close", "market_cap_basic", "price_earnings_ttm", + "dividends_yield_current", "sector", "industry"] + q = _get_market_factory(market_type, market_country) + q = q.select(*(columns or default_cols)) + q = _apply_filters(q, filters) + q = q.limit(limit) + total, data = _exec_query(q, sessionid) + return f"Total: {total}\n{data}" + + +@mcp.tool() +def list_markets() -> str: + """List all available market types and stock-country markets.""" + markets = { + "asset_types": [ + {"id": "stocks", "description": "Stocks (common, preferred, DRs, funds) — use market_country param"}, + {"id": "crypto", "description": "Centralised-exchange crypto pairs"}, + {"id": "crypto_dex", "description": "Decentralised-exchange crypto pairs (USD)"}, + {"id": "coin", "description": "CoinMarketCap crypto coins"}, + {"id": "forex", "description": "Forex currency pairs"}, + {"id": "futures", "description": "Futures contracts"}, + {"id": "bond", "description": "Bonds"}, + {"id": "cfd", "description": "Contracts for Difference"}, + {"id": "options", "description": "Options (use market_country param for underlying symbol)"}, + ], + "stock_countries": STOCK_COUNTRY_MARKETS, + } + return str(markets) + + +@mcp.tool() +def list_fields(category: str | None = None) -> str: + """List available screener fields, optionally filtered by category. + + Categories: price, technical, fundamental, general + If no category given, returns all categories. + """ + if category: + cat = category.lower() + if cat in FIELD_CATEGORIES: + return str(FIELD_CATEGORIES[cat]) + return f"Category '{category}' not found. Available: price, technical, fundamental, general" + return str(FIELD_CATEGORIES) + + +@mcp.tool() +def set_session(sessionid: str) -> str: + """Store a TradingView session ID for real-time data access. + + Get your sessionid from: + 1. Go to tradingview.com and log in + 2. Open DevTools > Application > Cookies > tradingview.com + 3. Copy the 'sessionid' cookie value + """ + global _session_cookies + _session_cookies = {"sessionid": sessionid} + return "Session ID stored successfully" + + +@mcp.tool() +def clear_session() -> str: + """Clear stored TradingView session ID.""" + global _session_cookies + _session_cookies = None + return "Session ID cleared" + + +if __name__ == "__main__": + mcp.run(transport="stdio") diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..d83ffc5 --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,470 @@ +from __future__ import annotations + +import os +from unittest.mock import patch + +import pytest +import server +from server import ( + _apply_filters, + _build_condition, + _get_market_factory, + _resolve_cookies, + clear_session, + find_most_active, + find_top_gainers, + find_top_losers, + fundamental_scan, + get_stock_quotes, + list_fields, + list_markets, + screen_market, + set_session, + technical_scan, +) +from tradingview_screener.query import And, Or + + +# --------------------------------------------------------------------------- +# _resolve_cookies +# --------------------------------------------------------------------------- + +class TestResolveCookies: + def test_per_call_takes_priority(self): + assert _resolve_cookies("per_call_id") == {"sessionid": "per_call_id"} + + def test_in_memory_over_env(self): + server._session_cookies = {"sessionid": "mem_id"} + with patch.dict(os.environ, {"TV_SESSION_ID": "env_id"}, clear=True): + assert _resolve_cookies() == {"sessionid": "mem_id"} + server._session_cookies = None + + def test_env_var_fallback(self): + server._session_cookies = None + with patch.dict(os.environ, {"TV_SESSION_ID": "env_id"}, clear=True): + assert _resolve_cookies() == {"sessionid": "env_id"} + + def test_no_cookies(self): + server._session_cookies = None + with patch.dict(os.environ, {}, clear=True): + assert _resolve_cookies() is None + + +# --------------------------------------------------------------------------- +# _build_condition +# --------------------------------------------------------------------------- + +def _assert_condition(cond: dict, left: str, operation: str, right: object = None): + """Helper: check the inner expression dict, handling nested 'expression' keys.""" + inner = cond if "expression" not in cond else cond.get("expression", cond) + assert inner.get("left") == left, f"expected left={left}, got {inner.get('left')}" + assert inner.get("operation") == operation, f"expected operation={operation}, got {inner.get('operation')}" + if right is not None or "right" in inner: + assert inner.get("right") == right, f"expected right={right}, got {inner.get('right')}" + + +class TestBuildCondition: + def test_greater(self): + c = _build_condition({"field": "RSI", "operator": ">", "value": 70}) + _assert_condition(c, "RSI", "greater", 70) + + def test_greater_alt(self): + c = _build_condition({"field": "RSI", "operator": "greater", "value": 70}) + _assert_condition(c, "RSI", "greater", 70) + + def test_greater_equal(self): + c = _build_condition({"field": "RSI", "operator": ">=", "value": 70}) + _assert_condition(c, "RSI", "egreater", 70) + + def test_less(self): + c = _build_condition({"field": "RSI", "operator": "<", "value": 30}) + _assert_condition(c, "RSI", "less", 30) + + def test_equal(self): + c = _build_condition({"field": "close", "operator": "==", "value": 100}) + _assert_condition(c, "close", "equal", 100) + + def test_not_equal(self): + c = _build_condition({"field": "sector", "operator": "!=", "value": "Finance"}) + _assert_condition(c, "sector", "nequal", "Finance") + + def test_between(self): + c = _build_condition({"field": "close", "operator": "between", "value": [100, 200]}) + _assert_condition(c, "close", "in_range", [100, 200]) + + def test_not_between(self): + c = _build_condition({"field": "close", "operator": "not_between", "value": [50, 100]}) + _assert_condition(c, "close", "not_in_range", [50, 100]) + + def test_isin(self): + c = _build_condition({"field": "exchange", "operator": "isin", "value": ["NASDAQ", "NYSE"]}) + _assert_condition(c, "exchange", "in_range", ["NASDAQ", "NYSE"]) + + def test_not_in(self): + c = _build_condition({"field": "exchange", "operator": "not_in", "value": ["OTC"]}) + _assert_condition(c, "exchange", "not_in_range", ["OTC"]) + + def test_has(self): + c = _build_condition({"field": "description", "operator": "has", "value": "tech"}) + _assert_condition(c, "description", "has", "tech") + + def test_has_none_of(self): + c = _build_condition({"field": "description", "operator": "has_none_of", "value": ["test"]}) + _assert_condition(c, "description", "has_none_of", ["test"]) + + def test_like(self): + c = _build_condition({"field": "name", "operator": "like", "value": "AAPL"}) + _assert_condition(c, "name", "match", "AAPL") + + def test_not_like(self): + c = _build_condition({"field": "name", "operator": "not_like", "value": "TEST"}) + _assert_condition(c, "name", "nmatch", "TEST") + + def test_empty(self): + c = _build_condition({"field": "description", "operator": "empty"}) + _assert_condition(c, "description", "empty", None) + + def test_not_empty(self): + c = _build_condition({"field": "description", "operator": "not_empty"}) + _assert_condition(c, "description", "nempty", None) + + def test_crosses(self): + c = _build_condition({"field": "MACD.macd", "operator": "crosses", "value": "MACD.signal"}) + _assert_condition(c, "MACD.macd", "crosses", "MACD.signal") + + def test_crosses_above(self): + c = _build_condition({"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"}) + _assert_condition(c, "MACD.macd", "crosses_above", "MACD.signal") + + def test_crosses_below(self): + c = _build_condition({"field": "MACD.macd", "operator": "crosses_below", "value": "MACD.signal"}) + _assert_condition(c, "MACD.macd", "crosses_below", "MACD.signal") + + def test_above_pct(self): + c = _build_condition({"field": "close", "operator": "above_pct", "value": ["SMA50", 1.02]}) + _assert_condition(c, "close", "above%", ["SMA50", 1.02]) + + def test_below_pct(self): + c = _build_condition({"field": "close", "operator": "below_pct", "value": ["SMA50", 0.98]}) + _assert_condition(c, "close", "below%", ["SMA50", 0.98]) + + def test_between_pct(self): + c = _build_condition({"field": "close", "operator": "between_pct", "value": ["SMA50", 0.98, 1.02]}) + _assert_condition(c, "close", "in_range%", ["SMA50", 0.98, 1.02]) + + def test_in_day_range(self): + c = _build_condition({"field": "RSI", "operator": "in_day_range", "value": [30, 70]}) + _assert_condition(c, "RSI", "in_day_range", [30, 70]) + + def test_in_week_range(self): + c = _build_condition({"field": "RSI", "operator": "in_week_range", "value": [30, 70]}) + _assert_condition(c, "RSI", "in_week_range", [30, 70]) + + def test_in_month_range(self): + c = _build_condition({"field": "RSI", "operator": "in_month_range", "value": [30, 70]}) + _assert_condition(c, "RSI", "in_month_range", [30, 70]) + + def test_or_group(self): + c = _build_condition({ + "operator": "or", + "filters": [ + {"field": "RSI", "operator": ">", "value": 70}, + {"field": "RSI", "operator": "<", "value": 30}, + ], + }) + assert callable(Or) + assert "operation" in c + assert c["operation"]["operator"] == "or" + assert len(c["operation"]["operands"]) == 2 + + def test_and_group(self): + c = _build_condition({ + "operator": "and", + "filters": [ + {"field": "close", "operator": ">", "value": 100}, + {"field": "volume", "operator": ">", "value": 1000000}, + ], + }) + assert callable(And) + assert c["operation"]["operator"] == "and" + assert len(c["operation"]["operands"]) == 2 + + def test_nested_groups(self): + c = _build_condition({ + "operator": "or", + "filters": [ + { + "operator": "and", + "filters": [ + {"field": "RSI", "operator": ">", "value": 70}, + {"field": "volume", "operator": ">", "value": 1000000}, + ], + }, + {"field": "close", "operator": "<", "value": 10}, + ], + }) + op = c["operation"] + assert op["operator"] == "or" + assert len(op["operands"]) == 2 + + def test_invalid_filter_raises(self): + with pytest.raises(ValueError, match="Invalid filter"): + _build_condition({"invalid": "data"}) + + +# --------------------------------------------------------------------------- +# _apply_filters +# --------------------------------------------------------------------------- + +class TestApplyFilters: + def test_none_filters(self): + q = _get_market_factory("stocks", "america") + original_query = dict(q.query) + result = _apply_filters(q, None) + assert result.query == original_query + + def test_empty_filters(self): + q = _get_market_factory("stocks", "america") + original_query = dict(q.query) + result = _apply_filters(q, []) + assert result.query == original_query + + def test_simple_filters_only(self): + q = _get_market_factory("stocks", "america") + q = _apply_filters(q, [{"field": "close", "operator": ">", "value": 100}]) + flt = q.query.get("filter", []) + assert isinstance(flt, list) + assert any(f.get("left") == "close" for f in flt) + + def test_nested_filters_only(self): + q = _get_market_factory("stocks", "america") + q = _apply_filters(q, [ + {"operator": "or", "filters": [ + {"field": "RSI", "operator": ">", "value": 70}, + {"field": "RSI", "operator": "<", "value": 30}, + ]}, + ]) + f2 = q.query.get("filter2", {}) + assert isinstance(f2, dict) + + def test_mixed_filters(self): + q = _get_market_factory("stocks", "america") + q = _apply_filters(q, [ + {"field": "close", "operator": ">", "value": 10}, + {"operator": "or", "filters": [ + {"field": "RSI", "operator": ">", "value": 70}, + {"field": "RSI", "operator": "<", "value": 30}, + ]}, + ]) + flt = q.query.get("filter", []) + assert any(f.get("left") == "close" for f in flt) + f2 = q.query.get("filter2", {}) + assert "or" in str(f2) + + def test_preserves_default_filter2(self): + q = _get_market_factory("stocks", "america") + q = _apply_filters(q, [ + {"operator": "or", "filters": [ + {"field": "RSI", "operator": ">", "value": 70}, + {"field": "RSI", "operator": "<", "value": 30}, + ]}, + ]) + f2 = q.query.get("filter2") + assert f2 is not None + assert f2.get("operator") == "and" + + +# --------------------------------------------------------------------------- +# _get_market_factory +# --------------------------------------------------------------------------- + +class TestGetMarketFactory: + def test_stocks_default(self): + q = _get_market_factory("stocks", None) + assert "america" in str(q.query.get("markets", "")) + + def test_stocks_with_country(self): + q = _get_market_factory("stocks", "india") + assert "india" in str(q.query.get("markets", "")) + + def test_unknown_falls_back_to_stocks(self): + q = _get_market_factory("bogus", None) + assert "america" in str(q.query.get("markets", "")) + + def test_crypto(self): + q = _get_market_factory("crypto", None) + m = str(q.query.get("markets", "")).lower() + assert "crypto" in m + + def test_forex(self): + q = _get_market_factory("forex", None) + m = str(q.query.get("markets", "")).lower() + assert "forex" in m + + def test_options(self): + q = _get_market_factory("options", None) + index_filters = q.query.get("index_filters", []) + assert any("ESM2026" in str(f) for f in index_filters) + + +# --------------------------------------------------------------------------- +# MCP Tools (with mocked _exec_query) +# --------------------------------------------------------------------------- + +MOCK_RESULT = (1, [{"name": "TEST", "close": 150.0, "change": 2.5, "volume": 1000000}]) + + +@pytest.fixture(autouse=True) +def reset_session(): + server._session_cookies = None + yield + + +@patch("server._exec_query", return_value=MOCK_RESULT) +class TestMCPTools: + def test_get_stock_quotes(self, mock_exec): + result = get_stock_quotes(tickers=["NASDAQ:NVDA"]) + assert "Found 1 result(s)" in result + assert "TEST" in result + mock_exec.assert_called_once() + + def test_get_stock_quotes_with_columns(self, mock_exec): + result = get_stock_quotes( + tickers=["NASDAQ:AAPL"], + columns=["name", "close", "RSI"], + ) + assert "Found 1 result(s)" in result + + def test_get_stock_quotes_crypto(self, mock_exec): + result = get_stock_quotes( + tickers=["BINANCE:BTCUSDT"], + market_type="crypto", + ) + assert "Found 1 result(s)" in result + + def test_screen_market_no_filters(self, mock_exec): + result = screen_market(limit=10) + assert "Total: 1" in result + + def test_screen_market_with_filters(self, mock_exec): + result = screen_market( + columns=["name", "close", "RSI"], + filters=[{"field": "RSI", "operator": ">", "value": 70}], + order_by="RSI", + limit=20, + ) + assert "Total: 1" in result + + def test_screen_market_nested_filters(self, mock_exec): + result = screen_market( + filters=[{"operator": "or", "filters": [ + {"field": "RSI", "operator": ">", "value": 70}, + {"field": "RSI", "operator": "<", "value": 30}, + ]}], + ) + assert "Total: 1" in result + + def test_screen_market_with_offset(self, mock_exec): + result = screen_market(limit=5, offset=10) + assert "Total: 1" in result + + def test_screen_market_crypto(self, mock_exec): + result = screen_market(market_type="crypto", limit=5) + assert "Total: 1" in result + + def test_find_top_gainers(self, mock_exec): + result = find_top_gainers(limit=10) + assert "Total: 1" in result + + def test_find_top_gainers_with_min_price(self, mock_exec): + result = find_top_gainers(limit=10, min_price=10.0) + assert "Total: 1" in result + + def test_find_top_losers(self, mock_exec): + result = find_top_losers(limit=10) + assert "Total: 1" in result + + def test_find_most_active(self, mock_exec): + result = find_most_active(limit=10) + assert "Total: 1" in result + + def test_technical_scan(self, mock_exec): + result = technical_scan( + filters=[{"field": "RSI", "operator": "<", "value": 30}], + limit=10, + ) + assert "Total: 1" in result + + def test_technical_scan_with_custom_columns(self, mock_exec): + result = technical_scan( + filters=[{"field": "RSI", "operator": ">", "value": 70}], + columns=["name", "close", "RSI", "MACD.macd"], + limit=5, + ) + assert "Total: 1" in result + + def test_fundamental_scan(self, mock_exec): + result = fundamental_scan( + filters=[{"field": "market_cap_basic", "operator": ">", "value": 1e9}], + limit=10, + ) + assert "Total: 1" in result + + def test_fundamental_scan_with_custom_columns(self, mock_exec): + result = fundamental_scan( + filters=[{"field": "price_earnings_ttm", "operator": "<", "value": 20}], + columns=["name", "close", "price_earnings_ttm"], + ) + assert "Total: 1" in result + + +class TestListTools: + def test_list_markets(self): + result = list_markets() + assert "asset_types" in result + assert "stock_countries" in result + assert "america" in result + assert "crypto" in result + + def test_list_fields_all(self): + result = list_fields() + assert "price" in result + assert "technical" in result + assert "fundamental" in result + assert "general" in result + + def test_list_fields_price(self): + result = list_fields(category="price") + assert "close" in result + assert "volume" in result + + def test_list_fields_technical(self): + result = list_fields(category="technical") + assert "RSI" in result + assert "MACD.macd" in result + + def test_list_fields_fundamental(self): + result = list_fields(category="fundamental") + assert "market_cap_basic" in result + assert "price_earnings_ttm" in result + + def test_list_fields_invalid_category(self): + result = list_fields(category="bogus") + assert "not found" in result + + +class TestSessionTools: + def test_set_and_clear_session(self): + result = set_session("test_session_123") + assert "stored" in result + assert _resolve_cookies() == {"sessionid": "test_session_123"} + + result2 = clear_session() + assert "cleared" in result2 + with patch.dict(os.environ, {}, clear=True): + assert _resolve_cookies() is None + + def test_clear_session_when_none(self): + server._session_cookies = None + result = clear_session() + assert "cleared" in result