Files
Achmad 79d1e0e538 Add date/time range support to get_historical_candles
- _fetch_candles now accepts explicit from_ts/to_ts timestamps; falls
  back to count-based window when omitted
- get_historical_candles accepts from_date/to_date in YYYY-MM-DD,
  YYYY-MM-DD HH:MM, or YYYY-MM-DD HH:MM:SS formats (all UTC)
- Yahoo Finance fallback switched from range= to period1/period2 so
  it respects the exact window in both modes
- Added 11 tests covering date range, datetime, end-of-day bump,
  invalid format, count cap, and pattern detection toggle
- gitignore uv.lock (Dockerfile uses pip, not uv sync)
2026-05-17 08:10:03 +00:00

1109 lines
43 KiB
Python

from __future__ import annotations
import argparse
import math
import os
import time
from typing import Any, cast
from dotenv import load_dotenv
load_dotenv()
import requests
from mcp.server.fastmcp import FastMCP
from tradingview_screener.column import col
from tradingview_screener.query import And, Or, Query
from tradingview_screener.screeners import (
bond,
cfd,
coin,
crypto,
crypto_dex,
forex,
futures,
options,
stocks,
)
# FastMCP server instance named "tradingview-screener"; the @mcp.tool()
# decorators in this module register their functions on it.
mcp = FastMCP("tradingview-screener")
# Cookies assigned by set_session() and reset to None by clear_session();
# _resolve_cookies() consults this before the TV_SESSION_ID environment variable.
_session_cookies: dict[str, str] | None = None
def _resolve_cookies(sessionid: str | None = None) -> dict[str, str] | None:
if sessionid:
return {"sessionid": sessionid}
if _session_cookies is not None:
return _session_cookies
env = os.environ.get("TV_SESSION_ID")
if env:
return {"sessionid": env}
return None
def _exec_query(
    q: Query,
    sessionid: str | None = None,
) -> tuple[int, list[dict[str, Any]]]:
    """Execute ``q`` and return ``(total, rows)``.

    Cookies are chosen by ``_resolve_cookies(sessionid)``. The second value
    returned by ``get_scanner_data`` is converted with
    ``to_dict(orient="records")`` so each row becomes a plain dict.
    """
    total, frame = q.get_scanner_data(cookies=_resolve_cookies(sessionid))
    records = frame.to_dict(orient="records")
    return total, cast("list[dict[str, Any]]", records)
def _build_condition(f: dict[str, Any]) -> Any:
if "field" in f:
c = col(f["field"])
op = f["operator"]
if op in ("empty", "not_empty"):
return c.empty() if op == "empty" else c.not_empty()
val = f["value"]
match op:
case ">" | "greater":
return c > val
case ">=" | "egreater":
return c >= val
case "<" | "less":
return c < val
case "<=" | "eless":
return c <= val
case "==" | "equal":
return c == val
case "!=" | "nequal":
return c != val
case "between":
return c.between(val[0], val[1])
case "not_between":
return c.not_between(val[0], val[1])
case "isin":
return c.isin(val)
case "not_in":
return c.not_in(val)
case "has":
return c.has(val)
case "has_none_of":
return c.has_none_of(val)
case "like":
return c.like(val)
case "not_like":
return c.not_like(val)
case "crosses":
return c.crosses(val)
case "crosses_above":
return c.crosses_above(val)
case "crosses_below":
return c.crosses_below(val)
case "above_pct":
return c.above_pct(val[0], val[1])
case "below_pct":
return c.below_pct(val[0], val[1])
case "between_pct":
return c.between_pct(val[0], val[1], val[2])
case "in_day_range":
return c.in_day_range(val[0], val[1])
case "in_week_range":
return c.in_week_range(val[0], val[1])
case "in_month_range":
return c.in_month_range(val[0], val[1])
if "operator" in f and "filters" in f:
sub = [_build_condition(sf) for sf in f["filters"]]
if f["operator"] == "or":
return Or(*sub)
return And(*sub)
msg = f"Invalid filter: {f}"
raise ValueError(msg)
def _apply_filters(q: Query, filters: list[dict[str, Any]] | None) -> Query:
if not filters:
return q
simple: list[Any] = []
nested: list[Any] = []
for f in filters:
if "operator" in f and "filters" in f:
nested.append(_build_condition(f))
else:
simple.append(_build_condition(f))
if simple:
q = q.where(*simple)
if nested:
default_filter2 = q.query.get("filter2")
user_op = nested[0] if len(nested) == 1 else And(*nested)
user_filter2 = user_op["operation"]
if default_filter2:
q.query["filter2"] = {
"operator": "and",
"operands": [{"operation": user_filter2}, {"operation": default_filter2}],
}
else:
q.query["filter2"] = user_filter2
return q
# Maps each supported market_type string to the screener factory imported from
# tradingview_screener.screeners. _get_market_factory() looks market types up
# here and falls back to the stocks factory for names that are not listed.
MARKET_FACTORIES: dict[str, Any] = {
"stocks": stocks,
"crypto": crypto,
"crypto_dex": crypto_dex,
"coin": coin,
"forex": forex,
"futures": futures,
"bond": bond,
"cfd": cfd,
"options": options,
}
# Country market identifiers reported by list_markets() under "stock_countries".
# NOTE(review): presumably these are the values accepted as market_country for
# the stocks screener — confirm against tradingview_screener.
STOCK_COUNTRY_MARKETS: list[str] = [
"america", "argentina", "australia", "austria", "bangladesh", "belgium",
"brazil", "bulgaria", "canada", "chile", "china", "colombia", "croatia",
"cyprus", "czech", "denmark", "egypt", "estonia", "finland", "france",
"germany", "ghana", "greece", "hongkong", "hungary", "iceland", "india",
"indonesia", "ireland", "israel", "italy", "japan", "jordan", "kazakhstan",
"kenya", "kuwait", "latvia", "lithuania", "luxembourg", "malaysia",
"malta", "mexico", "morocco", "netherlands", "newzealand", "nigeria",
"norway", "oman", "pakistan", "peru", "philippines", "poland", "portugal",
"qatar", "romania", "saudiarabia", "serbia", "singapore", "slovakia",
"slovenia", "southafrica", "southkorea", "spain", "srilanka", "sweden",
"switzerland", "taiwan", "thailand", "tunisia", "turkey", "uganda",
"ukraine", "unitedarabemirates", "unitedkingdom", "vietnam",
]
# Catalogue of screener fields returned by list_fields(), grouped by category.
# Each entry pairs a field name with a short human-readable description.
# Field names are unique within a category (a duplicated "price_earnings_ttm"
# entry in "fundamental" was removed).
FIELD_CATEGORIES: dict[str, list[dict[str, str]]] = {
    "price": [
        {"name": "open", "description": "Open price"},
        {"name": "high", "description": "High price"},
        {"name": "low", "description": "Low price"},
        {"name": "close", "description": "Close price"},
        {"name": "volume", "description": "Trading volume"},
        {"name": "change", "description": "Percent change"},
        {"name": "change_abs", "description": "Absolute change"},
        {"name": "premarket_change", "description": "Premarket percent change"},
        {"name": "postmarket_change", "description": "Postmarket percent change"},
        {"name": "VWAP", "description": "Volume Weighted Average Price"},
        {"name": "52 Week High", "description": "52-week high price"},
        {"name": "52 Week Low", "description": "52-week low price"},
    ],
    "technical": [
        {"name": "RSI", "description": "Relative Strength Index (14)"},
        {"name": "RSI.5", "description": "RSI (5)"},
        {"name": "Stoch.K", "description": "Stochastic %K"},
        {"name": "Stoch.D", "description": "Stochastic %D"},
        {"name": "MACD.macd", "description": "MACD line"},
        {"name": "MACD.signal", "description": "MACD signal line"},
        {"name": "MACD.histogram", "description": "MACD histogram"},
        {"name": "BB.upper", "description": "Bollinger Band upper"},
        {"name": "BB.middle", "description": "Bollinger Band middle (SMA 20)"},
        {"name": "BB.lower", "description": "Bollinger Band lower"},
        {"name": "SMA", "description": "Simple Moving Average"},
        {"name": "EMA", "description": "Exponential Moving Average"},
        {"name": "SMA20", "description": "SMA 20"},
        {"name": "SMA50", "description": "SMA 50"},
        {"name": "SMA200", "description": "SMA 200"},
        {"name": "EMA5", "description": "EMA 5"},
        {"name": "EMA20", "description": "EMA 20"},
        {"name": "EMA50", "description": "EMA 50"},
        {"name": "EMA200", "description": "EMA 200"},
        {"name": "ADX", "description": "Average Directional Index (14)"},
        {"name": "ATR", "description": "Average True Range"},
        {"name": "AO", "description": "Awesome Oscillator"},
        {"name": "OBV", "description": "On Balance Volume"},
        {"name": "CCI20", "description": "Commodity Channel Index (20)"},
        {"name": "ROC", "description": "Rate of Change"},
        {"name": "Williams %R", "description": "Williams Percent Range (14)"},
        {"name": "relative_volume_10d_calc", "description": "Relative volume vs 10-day average"},
        {"name": "volume_ma_10", "description": "Volume moving average (10)"},
        {"name": "TechRating_1D", "description": "Technical rating (1 day)"},
    ],
    "fundamental": [
        {"name": "market_cap_basic", "description": "Market cap"},
        {"name": "price_earnings_ttm", "description": "Price/Earnings (TTM)"},
        {"name": "earnings_per_share_diluted_ttm", "description": "EPS diluted (TTM)"},
        {"name": "earnings_per_share_diluted_yoy_growth_ttm", "description": "EPS YoY growth (TTM)"},
        {"name": "dividends_yield_current", "description": "Dividend yield"},
        {"name": "dividends_payout_ratio", "description": "Dividend payout ratio"},
        {"name": "price_to_book_fq", "description": "Price/Book"},
        {"name": "price_to_sales", "description": "Price/Sales"},
        {"name": "price_to_cash_flow", "description": "Price/Cash Flow"},
        {"name": "return_on_equity", "description": "Return on Equity"},
        {"name": "return_on_assets", "description": "Return on Assets"},
        {"name": "operating_margin", "description": "Operating margin"},
        {"name": "profit_margin", "description": "Profit margin"},
        {"name": "revenue_growth", "description": "Revenue growth"},
        {"name": "earnings_growth", "description": "Earnings growth"},
        {"name": "debt_to_equity", "description": "Debt/Equity"},
        {"name": "current_ratio_fq", "description": "Current ratio"},
        {"name": "beta_1_year", "description": "Beta (1 year)"},
        {"name": "float", "description": "Shares float"},
        {"name": "shares_outstanding", "description": "Shares outstanding"},
        {"name": "short_ratio_fq", "description": "Short ratio"},
        {"name": "short_float", "description": "Short % of float"},
        {"name": "insider_ownership", "description": "Insider ownership"},
        {"name": "institutional_ownership", "description": "Institutional ownership"},
        {"name": "AnalystRating", "description": "Analyst rating (numeric)"},
    ],
    "general": [
        {"name": "name", "description": "Short name / ticker"},
        {"name": "description", "description": "Company description"},
        {"name": "ticker", "description": "Full ticker (EXCHANGE:SYMBOL)"},
        {"name": "exchange", "description": "Exchange name"},
        {"name": "market", "description": "Market/country"},
        {"name": "sector", "description": "Sector"},
        {"name": "industry", "description": "Industry"},
        {"name": "country", "description": "Country"},
        {"name": "currency", "description": "Quote currency"},
        {"name": "type", "description": "Instrument type"},
        {"name": "typespecs", "description": "Instrument sub-type"},
        {"name": "is_primary", "description": "Primary listing flag"},
    ],
}
def _get_market_factory(market_type: str, country_or_param: str | None = None) -> Query:
    """Create the base screener query for ``market_type``.

    ``"stocks"`` — and any market type missing from ``MARKET_FACTORIES`` —
    uses the stocks factory with ``country_or_param`` (default "america").
    ``"options"`` passes ``country_or_param`` (default "CME_MINI:ESM2026") to
    its factory. Every other factory is called without arguments.
    """
    factory = MARKET_FACTORIES.get(market_type)
    if market_type == "stocks" or factory is None:
        return stocks(country_or_param or "america")
    if market_type == "options":
        return factory(country_or_param or "CME_MINI:ESM2026")
    return factory()
# ---------------------------------------------------------------------------
# Historical Candles & Chart Rendering
# ---------------------------------------------------------------------------
# HTTP headers sent with the TradingView history requests in _fetch_candles():
# a desktop-Chrome User-Agent plus Origin/Referer pointing at tradingview.com.
# The Yahoo Finance fallback reuses only the User-Agent value.
TV_CHART_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
"Origin": "https://www.tradingview.com",
"Referer": "https://www.tradingview.com/",
}
def _resolve_symbol(ticker: str, market_type: str) -> str:
if ":" in ticker:
return ticker
if market_type == "crypto":
return f"BINANCE:{ticker}"
return ticker
def _to_yahoo_ticker(ticker: str) -> str:
"""Convert various ticker formats to Yahoo Finance format."""
if ".JK" in ticker or ".JK" in ticker.upper():
return ticker.replace(".jk", ".JK").replace(".Jk", ".JK")
if ticker.startswith("IDX:"):
return ticker.replace("IDX:", "") + ".JK"
if ":" in ticker:
return ticker.split(":", 1)[1]
return ticker
def _candles_from_tv_payload(data: Any) -> list[dict[str, Any]]:
    """Build candle dicts from a TradingView history payload; [] if unusable.

    The payload is expected to carry ``"s" == "ok"`` and parallel
    ``t/o/h/l/c/v`` arrays. A missing or non-iterable array yields ``[]`` so
    the caller can move on to the next data source.
    """
    if not isinstance(data, dict) or data.get("s") != "ok":
        return []
    try:
        columns = [data[key] for key in ("t", "o", "h", "l", "c", "v")]
        # zip() stops at the shortest array, so ragged payloads cannot raise.
        return [
            {"time": t, "open": o, "high": h, "low": lo, "close": c, "volume": v}
            for t, o, h, lo, c, v in zip(*columns)
        ]
    except (KeyError, TypeError):
        return []


def _candles_from_yahoo_payload(data: Any) -> list[dict[str, Any]]:
    """Build candle dicts from a Yahoo Finance chart payload; [] if unusable.

    Rows where any of open/high/low/close/volume is ``None`` are skipped.
    """
    if not isinstance(data, dict):
        return []
    chart = data.get("chart")
    # "result" may be null in the payload, so never index it blindly.
    results = chart.get("result") if isinstance(chart, dict) else None
    if not isinstance(results, list) or not results:
        return []
    result = results[0]
    if not isinstance(result, dict):
        return []

    timestamps = result.get("timestamp") or []
    indicators = result.get("indicators")
    quote_list = indicators.get("quote") if isinstance(indicators, dict) else None
    quotes = quote_list[0] if isinstance(quote_list, list) and quote_list else None
    if not timestamps or not isinstance(quotes, dict):
        return []

    series = [quotes.get(key) or [] for key in ("open", "high", "low", "close", "volume")]
    candles: list[dict[str, Any]] = []
    for t, o, h, lo, c, v in zip(timestamps, *series):
        if any(value is None for value in (o, h, lo, c, v)):
            continue
        candles.append({"time": t, "open": o, "high": h, "low": lo, "close": c, "volume": v})
    return candles


def _fetch_candles(
    ticker: str,
    resolution: str = "D",
    count: int = 30,
    from_ts: int | None = None,
    to_ts: int | None = None,
) -> list[dict[str, Any]]:
    """Fetch OHLCV candles, trying TradingView first and Yahoo Finance second.

    Parameters:
        ticker: Symbol to fetch; resolved with ``_resolve_symbol`` for
            TradingView and ``_to_yahoo_ticker`` for Yahoo.
        resolution: Candle size key ("1", "3", "5", "15", "30", "60", "240",
            "D", "W", "M"). Unknown keys fall back to a daily window/interval.
        count: Number of resolution-sized periods used to size the window when
            ``from_ts`` is omitted.
        from_ts: Window start as a Unix timestamp; derived from ``count`` when
            ``None``.
        to_ts: Window end as a Unix timestamp; defaults to the current time.

    Returns:
        A list of dicts with "time", "open", "high", "low", "close" and
        "volume" keys, or ``[]`` when no source yields usable data. Request
        errors, non-JSON bodies and malformed payloads are all treated as
        "no data from this source" rather than raised.
    """
    if to_ts is None:
        to_ts = int(time.time())
    if from_ts is None:
        # "3" is listed among the resolutions in get_historical_candles' docstring.
        resolution_seconds = {"1": 60, "3": 180, "5": 300, "15": 900, "30": 1800,
                              "60": 3600, "240": 14400, "D": 86400, "W": 604800,
                              "M": 2592000}
        from_ts = to_ts - (count * resolution_seconds.get(resolution, 86400))

    # Try TradingView chart API first
    sym = _resolve_symbol(ticker, "stocks")
    tv_params = {"symbol": sym, "resolution": resolution, "from": from_ts, "to": to_ts}
    tv_urls = (
        "https://chart-data.tradingview.com/history",
        "https://scanner.tradingview.com/history",
    )
    for url in tv_urls:
        try:
            resp = requests.get(url, headers=TV_CHART_HEADERS, params=tv_params, timeout=10)
            data = resp.json()
        except (requests.RequestException, ValueError):
            # ValueError covers a non-JSON body (e.g. an HTML error page).
            continue
        candles = _candles_from_tv_payload(data)
        if candles:
            return candles

    # Fallback: Yahoo Finance API
    yahoo_sym = _to_yahoo_ticker(ticker)
    interval_map = {"1": "1m", "5": "5m", "15": "15m", "30": "30m",
                    "60": "60m", "240": "4h", "D": "1d", "W": "1wk", "M": "1mo"}
    yahoo_params: dict[str, Any] = {
        "interval": interval_map.get(resolution, "1d"),
        "period1": from_ts,
        "period2": to_ts,
    }
    try:
        resp = requests.get(
            f"https://query1.finance.yahoo.com/v8/finance/chart/{yahoo_sym}",
            headers={"User-Agent": TV_CHART_HEADERS["User-Agent"]},
            params=yahoo_params,
            timeout=15,
        )
        data = resp.json()
    except (requests.RequestException, ValueError):
        return []
    return _candles_from_yahoo_payload(data)
def _detect_candle_patterns(candles: list[dict[str, Any]]) -> list[str]:
patterns: list[str] = []
if len(candles) < 2:
return patterns
c = candles[-1]
body = abs(c["close"] - c["open"])
upper = c["high"] - max(c["open"], c["close"])
lower = min(c["open"], c["close"]) - c["low"]
total_range = c["high"] - c["low"]
# Doji
if total_range > 0 and body / total_range < 0.1:
patterns.append("DOJI (indecision / potential reversal)")
# Hammer / Shooting Star
if body > 0 and total_range > 0:
upper_pct = upper / total_range * 100
lower_pct = lower / total_range * 100
body_pct = body / total_range * 100
if lower_pct > 60 and upper_pct < 10 and body_pct < 30:
if c["close"] < c["open"]:
patterns.append("HAMMER (potential bullish reversal at bottom)")
else:
patterns.append("INVERTED HAMMER (potential bullish reversal at bottom)")
if upper_pct > 60 and lower_pct < 10 and body_pct < 30:
if c["close"] > c["open"]:
patterns.append("SHOOTING STAR (potential bearish reversal at top)")
else:
patterns.append("HANGING MAN (potential bearish reversal at top)")
# Marubozu
if upper < total_range * 0.05 and lower < total_range * 0.05 and body > total_range * 0.8:
if c["close"] > c["open"]:
patterns.append("MARUBOZU (strong bullish — no wicks)")
else:
patterns.append("MARUBOZU (strong bearish — no wicks)")
# Engulfing
if len(candles) >= 2:
p = candles[-2]
p_body = abs(p["close"] - p["open"])
if p_body > 0 and body > 0:
if c["close"] > c["open"] and c["open"] < p["close"] and c["close"] > p["open"] and body > p_body * 1.2:
patterns.append("BULLISH ENGULFING (strong reversal up)")
if c["close"] < c["open"] and c["open"] > p["close"] and c["close"] < p["open"] and body > p_body * 1.2:
patterns.append("BEARISH ENGULFING (strong reversal down)")
# Three white soldiers / Three black crows
if len(candles) >= 3:
c1, c2, c3 = candles[-3], candles[-2], candles[-1]
if all(x["close"] > x["open"] for x in (c1, c2, c3)):
b1, b2, b3 = c1["close"] - c1["open"], c2["close"] - c2["open"], c3["close"] - c3["open"]
if b2 > 0 and b3 > 0 and b2 >= b1 * 0.9 and b3 >= b2 * 0.9:
if c2["open"] > c1["open"] and c3["open"] > c2["open"]:
patterns.append("3 WHITE SOLDIERS (strong sustained bullish momentum)")
elif b2 > 0 and b3 > 0 and b2 < b1 * 0.7 and b3 < b2 * 0.7:
patterns.append("BULLISH IN CONCLUSION (rising but weakening momentum)")
if all(x["close"] < x["open"] for x in (c1, c2, c3)):
b1, b2, b3 = c1["open"] - c1["close"], c2["open"] - c2["close"], c3["open"] - c3["close"]
if b2 > 0 and b3 > 0 and b2 >= b1 * 0.9 and b3 >= b2 * 0.9:
if c2["open"] < c1["open"] and c3["open"] < c2["open"]:
patterns.append("3 BLACK CROWS (strong sustained bearish momentum)")
# Piercing / Dark Cloud
if len(candles) >= 2:
p = candles[-2]
if c["close"] > c["open"] and p["close"] < p["open"]:
mid_point = p["open"] - (p["open"] - p["close"]) / 2
if c["close"] > mid_point and c["open"] < p["close"]:
patterns.append("PIERCING PATTERN (bullish reversal)")
if c["close"] < c["open"] and p["close"] > p["open"]:
mid_point = p["close"] - (p["close"] - p["open"]) / 2
if c["close"] < mid_point and c["open"] > p["close"]:
patterns.append("DARK CLOUD COVER (bearish reversal)")
# Morning/Evening Star (3-candle)
if len(candles) >= 3:
c1, c2, c3 = candles[-3], candles[-2], candles[-1]
c1_body = abs(c1["close"] - c1["open"])
c2_body = abs(c2["close"] - c2["open"])
c3_body = abs(c3["close"] - c3["open"])
if c1["close"] < c1["open"] and c3["close"] > c3["open"]:
if c2_body < c1_body * 0.5 and c2_body < c3_body * 0.5:
patterns.append("MORNING STAR (bullish reversal — doji middle candle)")
if c1["close"] > c1["open"] and c3["close"] < c3["open"]:
if c2_body < c1_body * 0.5 and c2_body < c3_body * 0.5:
patterns.append("EVENING STAR (bearish reversal — doji middle candle)")
if not patterns:
patterns.append("No significant candlestick pattern detected")
return patterns
@mcp.tool()
def get_historical_candles(
    ticker: str,
    resolution: str = "D",
    count: int = 30,
    market_type: str = "stocks",
    detect_patterns: bool = True,
    from_date: str | None = None,
    to_date: str | None = None,
) -> str:
    """Fetch historical OHLCV candlestick data for candle pattern and trend analysis.

    Parameters:
        ticker: Full ticker (e.g. "IDX:BBRI", "NASDAQ:AAPL", "IDX:BUMI").
        resolution: Candle resolution — 1, 3, 5, 15, 30, 60, 240 (minutes),
            D (daily, default), W (weekly), M (monthly).
        count: Number of candles to fetch when no date range given (max 500, default 30).
        market_type: Market type (stocks, crypto, forex, etc.). Used to add an
            exchange prefix to bare tickers (e.g. crypto -> BINANCE:).
        detect_patterns: Whether to detect candlestick patterns (default True).
        from_date: Start date/datetime (e.g. "2026-05-08" or "2026-05-17 14:00"). When
            provided, fetches candles from this point instead of counting back from now.
        to_date: End date/datetime (e.g. "2026-05-13" or "2026-05-17 14:05"). Defaults to
            now when from_date is given but to_date is omitted.

    Dates are interpreted as UTC. Raises ValueError when from_date or to_date
    is not in one of the supported formats.
    """
    import calendar

    def _parse_dt(s: str, end_of_period: bool = False) -> int:
        """Parse a UTC date/datetime string into a Unix timestamp."""
        text = s.strip()
        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
            try:
                parsed = time.strptime(text, fmt)
            except ValueError:
                continue
            ts = int(calendar.timegm(parsed))
            # For date-only strings, end_of_period bumps to 23:59:59
            if end_of_period and fmt == "%Y-%m-%d":
                ts += 86399
            return ts
        raise ValueError(f"Unrecognised date/datetime format: {s!r}. Use YYYY-MM-DD or YYYY-MM-DD HH:MM")

    from_ts: int | None = None
    to_ts: int | None = None
    if from_date:
        from_ts = _parse_dt(from_date)
    if to_date:
        to_ts = _parse_dt(to_date, end_of_period=True)
    elif from_date:
        to_ts = int(time.time())

    # Keep the count-based window within 1..500 candles.
    count = max(1, min(count, 500))
    # Resolve here so market_type is honoured; _fetch_candles leaves tickers
    # that already contain ":" untouched.
    symbol = _resolve_symbol(ticker, market_type)
    candles = _fetch_candles(symbol, resolution, count, from_ts=from_ts, to_ts=to_ts)
    if not candles:
        return f"No historical data returned for {ticker}"

    # Summary stats
    first, last = candles[0], candles[-1]
    total_change = ((last["close"] - first["close"]) / first["close"]) * 100 if first["close"] else 0
    highest = max(c["high"] for c in candles)
    lowest = min(c["low"] for c in candles)
    avg_vol = sum(c["volume"] for c in candles) / len(candles)

    lines: list[str] = []
    candle_label = f"{from_date} → {to_date or 'now'}" if from_date else f"{len(candles)} candles"
    lines.append(f"📊 {ticker} — {candle_label} ({resolution} resolution)")
    period_start = time.strftime("%Y-%m-%d", time.gmtime(first["time"]))
    period_end = time.strftime("%Y-%m-%d", time.gmtime(last["time"]))
    lines.append(f" Period: {period_start} → {period_end}")
    lines.append(f" Latest: O={last['open']:.2f} H={last['high']:.2f} L={last['low']:.2f} C={last['close']:.2f}")
    lines.append(f" Change: {total_change:.2f}% | Range: {highest:.2f} - {lowest:.2f} | Avg Vol: {avg_vol:,.0f}")
    lines.append("")

    # Current candle analysis
    c = last
    body = abs(c["close"] - c["open"])
    upper_wick = c["high"] - max(c["open"], c["close"])
    lower_wick = min(c["open"], c["close"]) - c["low"]
    total = c["high"] - c["low"]
    lines.append("── Latest Candle ──")
    lines.append(f" Type: {'🟢 BULLISH' if c['close'] >= c['open'] else '🔴 BEARISH'}")
    lines.append(f" Body: {body:.2f} | Upper Wick: {upper_wick:.2f} | Lower Wick: {lower_wick:.2f}")
    if total > 0:
        lines.append(f" Body Ratio: {body/total*100:.0f}% | Wick Ratio: {(upper_wick+lower_wick)/total*100:.0f}%")

    if detect_patterns:
        lines.append("")
        lines.append("── Pattern Detection ──")
        lines.extend(_detect_candle_patterns(candles))

    # Simple trend analysis
    if len(candles) >= 10:
        lines.append("")
        lines.append("── Trend (last 10 candles) ──")
        recent = candles[-10:]
        up_count = sum(1 for x in recent if x["close"] >= x["open"])
        down_count = 10 - up_count
        base_close = recent[0]["close"]
        net = recent[-1]["close"] - base_close
        # Same zero guard as total_change above.
        net_pct = net / base_close * 100 if base_close else 0
        lines.append(f" Bullish candles: {up_count} | Bearish candles: {down_count}")
        lines.append(f" Net change: {net:+.2f} ({net_pct:+.2f}%)")
        if up_count >= 7:
            lines.append(" → Strong bullish momentum")
        elif up_count >= 6:
            lines.append(" → Moderate bullish bias")
        elif down_count >= 7:
            lines.append(" → Strong bearish momentum")
        elif down_count >= 6:
            lines.append(" → Moderate bearish bias")
        else:
            lines.append(" → Sideways / indecision")
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@mcp.tool()
def get_stock_quotes(
    tickers: list[str],
    columns: list[str] | None = None,
    market_type: str = "stocks",
    market_country: str | None = None,
    sessionid: str | None = None,
) -> str:
    """Get quote data for specific ticker symbols across any market.

    Parameters:
        tickers: Tickers to look up.
        columns: Field names to return; the query's default selection is kept when omitted.
        market_type: Market type used to build the base query.
        market_country: Country (stocks) or underlying symbol (options).
        sessionid: Optional TradingView session ID.

    Examples:
        - get_stock_quotes(tickers=["NASDAQ:NVDA", "NASDAQ:AAPL"])
        - get_stock_quotes(tickers=["BINANCE:BTCUSDT"], market_type="crypto")
        - get_stock_quotes(tickers=["NYSE:SPY"], columns=["name","close","volume","RSI"])
    """
    query = _get_market_factory(market_type, market_country)
    if columns:
        query = query.select(*columns)
    total, rows = _exec_query(query.set_tickers(*tickers), sessionid)
    return f"Found {total} result(s)\n{rows}"
@mcp.tool()
def screen_market(
    columns: list[str] | None = None,
    filters: list[dict[str, Any]] | None = None,
    order_by: str | None = None,
    ascending: bool = False,
    limit: int = 50,
    offset: int = 0,
    market_type: str = "stocks",
    market_country: str | None = None,
    sessionid: str | None = None,
) -> str:
    """General-purpose screener across any market type.

    Parameters:
        columns: Field names to return (e.g. ["name","close","RSI","volume"]).
        filters: Structured filter conditions.
            Simple condition: {"field": "<name>", "operator": "<op>", "value": <value>}
            Nested group: {"operator": "and"|"or", "filters": [<conditions>]}
            Operators: >, >=, <, <=, ==, !=, between, not_between, isin, not_in,
                has, has_none_of, like, not_like, empty, not_empty,
                crosses, crosses_above, crosses_below,
                above_pct, below_pct, between_pct,
                in_day_range, in_week_range, in_month_range
            Examples:
                [{"field": "RSI", "operator": ">", "value": 70}]
                [{"field": "close", "operator": "between", "value": [100, 200]}]
                [{"field": "exchange", "operator": "isin", "value": ["NASDAQ","NYSE"]}]
                [{"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"}]
                Nested: [{"operator": "or", "filters": [
                    {"field": "RSI", "operator": ">", "value": 70},
                    {"field": "RSI", "operator": "<", "value": 30}
                ]}]
        order_by: Field to sort by (e.g. "volume", "market_cap_basic").
        ascending: Sort ascending (default False = descending).
        limit: Max rows to return (1-1000, default 50).
        offset: Row offset for pagination.
        market_type: Market type: stocks, crypto, forex, futures, bond, cfd, coin, crypto_dex, options.
        market_country: For stocks, a country name (e.g. "america", "india", "germany").
            For options, the underlying symbol (e.g. "CME_MINI:ESM2026").
        sessionid: Optional TradingView session ID for real-time data.
    """
    query = _get_market_factory(market_type, market_country)
    if columns:
        query = query.select(*columns)
    query = _apply_filters(query, filters)
    if order_by:
        query = query.order_by(order_by, ascending=ascending)
    page = query.limit(limit).offset(offset)
    total, rows = _exec_query(page, sessionid)
    return f"Total: {total}\n{rows}"
def _ranked_scan(
    columns: list[str],
    order_field: str,
    ascending: bool,
    limit: int,
    min_price: float | None,
    market_type: str,
    market_country: str | None,
    sessionid: str | None,
) -> str:
    """Shared body of the gainers/losers/most-active tools: select, filter by price, sort, limit."""
    q = _get_market_factory(market_type, market_country).select(*columns)
    filters: list[dict[str, Any]] = []
    if min_price is not None:
        filters.append({"field": "close", "operator": ">", "value": min_price})
    q = _apply_filters(q, filters)
    q = q.order_by(order_field, ascending=ascending).limit(limit)
    total, data = _exec_query(q, sessionid)
    return f"Total: {total}\n{data}"


@mcp.tool()
def find_top_gainers(
    limit: int = 20,
    min_price: float | None = None,
    market_type: str = "stocks",
    market_country: str | None = None,
    sessionid: str | None = None,
) -> str:
    """Find top gainers by percent change in any market.

    Returns name, close, change, change_abs and volume, sorted by "change"
    descending. ``min_price`` keeps only rows whose close is above it.
    """
    return _ranked_scan(
        ["name", "close", "change", "change_abs", "volume"],
        "change", False, limit, min_price, market_type, market_country, sessionid,
    )


@mcp.tool()
def find_top_losers(
    limit: int = 20,
    min_price: float | None = None,
    market_type: str = "stocks",
    market_country: str | None = None,
    sessionid: str | None = None,
) -> str:
    """Find top losers by percent change in any market.

    Returns name, close, change, change_abs and volume, sorted by "change"
    ascending. ``min_price`` keeps only rows whose close is above it.
    """
    return _ranked_scan(
        ["name", "close", "change", "change_abs", "volume"],
        "change", True, limit, min_price, market_type, market_country, sessionid,
    )


@mcp.tool()
def find_most_active(
    limit: int = 20,
    min_price: float | None = None,
    market_type: str = "stocks",
    market_country: str | None = None,
    sessionid: str | None = None,
) -> str:
    """Find most active instruments by volume.

    Returns name, close, change and volume, sorted by "volume" descending.
    ``min_price`` keeps only rows whose close is above it.
    """
    return _ranked_scan(
        ["name", "close", "change", "volume"],
        "volume", False, limit, min_price, market_type, market_country, sessionid,
    )
@mcp.tool()
def technical_scan(
    filters: list[dict[str, Any]],
    columns: list[str] | None = None,
    limit: int = 50,
    market_type: str = "stocks",
    market_country: str | None = None,
    sessionid: str | None = None,
) -> str:
    """Screen using technical indicator conditions.

    When ``columns`` is omitted the result includes name, close, change,
    volume, RSI, MACD.macd, SMA50 and VWAP.

    Example filters:
        [{"field": "RSI", "operator": "<", "value": 30},  # oversold
         {"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"},  # bullish MACD
         {"field": "close", "operator": "above_pct", "value": ["SMA50", 1.02]}]  # 2% above SMA50
    """
    selected = columns or ["name", "close", "change", "volume", "RSI", "MACD.macd", "SMA50", "VWAP"]
    query = _get_market_factory(market_type, market_country).select(*selected)
    query = _apply_filters(query, filters).limit(limit)
    total, rows = _exec_query(query, sessionid)
    return f"Total: {total}\n{rows}"
@mcp.tool()
def fundamental_scan(
    filters: list[dict[str, Any]],
    columns: list[str] | None = None,
    limit: int = 50,
    market_type: str = "stocks",
    market_country: str | None = None,
    sessionid: str | None = None,
) -> str:
    """Screen by fundamental metrics (market cap, P/E, dividend, sector, etc.).

    When ``columns`` is omitted the result includes name, close,
    market_cap_basic, price_earnings_ttm, dividends_yield_current, sector and
    industry.

    Example filters:
        [{"field": "market_cap_basic", "operator": "between", "value": [1e9, 1e11]},
         {"field": "price_earnings_ttm", "operator": "<", "value": 20},
         {"field": "dividends_yield_current", "operator": ">", "value": 2},
         {"field": "sector", "operator": "isin", "value": ["Technology", "Healthcare"]}]
    """
    selected = columns or [
        "name", "close", "market_cap_basic", "price_earnings_ttm",
        "dividends_yield_current", "sector", "industry",
    ]
    query = _get_market_factory(market_type, market_country).select(*selected)
    query = _apply_filters(query, filters).limit(limit)
    total, rows = _exec_query(query, sessionid)
    return f"Total: {total}\n{rows}"
@mcp.tool()
def list_markets() -> str:
    """List all available market types and stock-country markets.

    Returns the ``str()`` of a dict with "asset_types" (id/description pairs)
    and "stock_countries" (``STOCK_COUNTRY_MARKETS``).
    """
    described = (
        ("stocks", "Stocks (common, preferred, DRs, funds) — use market_country param"),
        ("crypto", "Centralised-exchange crypto pairs"),
        ("crypto_dex", "Decentralised-exchange crypto pairs (USD)"),
        ("coin", "CoinMarketCap crypto coins"),
        ("forex", "Forex currency pairs"),
        ("futures", "Futures contracts"),
        ("bond", "Bonds"),
        ("cfd", "Contracts for Difference"),
        ("options", "Options (use market_country param for underlying symbol)"),
    )
    asset_types = [{"id": ident, "description": text} for ident, text in described]
    return str({"asset_types": asset_types, "stock_countries": STOCK_COUNTRY_MARKETS})
@mcp.tool()
def list_fields(category: str | None = None) -> str:
    """List available screener fields, optionally filtered by category.

    Categories: price, technical, fundamental, general
    If no category given, returns all categories.

    The category is matched case-insensitively, ignoring surrounding
    whitespace. An unknown category returns a message listing the available
    ones.
    """
    if not category:
        return str(FIELD_CATEGORIES)
    cat = category.strip().lower()
    if cat in FIELD_CATEGORIES:
        return str(FIELD_CATEGORIES[cat])
    # Derive the list from the table so the message cannot drift out of sync.
    available = ", ".join(FIELD_CATEGORIES)
    return f"Category '{category}' not found. Available: {available}"
@mcp.tool()
def set_session(sessionid: str) -> str:
    """Store a TradingView session ID for real-time data access.

    Get your sessionid from:
    1. Go to tradingview.com and log in
    2. Open DevTools > Application > Cookies > tradingview.com
    3. Copy the 'sessionid' cookie value

    Surrounding whitespace is removed. An empty value is rejected and leaves
    any previously stored session untouched.
    """
    global _session_cookies
    cleaned = sessionid.strip() if sessionid else ""
    if not cleaned:
        # Storing {"sessionid": ""} would take precedence over TV_SESSION_ID
        # in _resolve_cookies and send a blank cookie.
        return "Session ID is empty; nothing stored"
    _session_cookies = {"sessionid": cleaned}
    return "Session ID stored successfully"
@mcp.tool()
def clear_session() -> str:
    """Clear stored TradingView session ID.

    Resets the module-level ``_session_cookies`` to ``None``.
    """
    global _session_cookies
    _session_cookies = None
    confirmation = "Session ID cleared"
    return confirmation
# ---------------------------------------------------------------------------
# Holiday Checker (api.co.id) — SQLite backed
# ---------------------------------------------------------------------------
import sqlite3
from datetime import date as dt_date, datetime
from pathlib import Path
_HOLIDAYS_API_KEY: str | None = None
_HOLIDAYS_DB: str | None = None
def _get_holidays_api_key() -> str | None:
global _HOLIDAYS_API_KEY
if _HOLIDAYS_API_KEY is None:
_HOLIDAYS_API_KEY = os.environ.get("API_CO_ID_KEY") or ""
return _HOLIDAYS_API_KEY or None
def _get_holidays_db_path() -> str:
global _HOLIDAYS_DB
if _HOLIDAYS_DB is None:
_HOLIDAYS_DB = os.environ.get("HOLIDAYS_DB_PATH") or str(Path(__file__).parent / "holidays.db")
return _HOLIDAYS_DB
def _init_holidays_db() -> sqlite3.Connection:
    """Open the holidays database and make sure its schema exists.

    Creates the ``holidays`` table and the ``idx_holidays_date`` index if
    they are missing, commits, and returns the open connection. The caller
    is responsible for closing it.
    """
    connection = sqlite3.connect(_get_holidays_db_path())
    schema_statements = (
        """
CREATE TABLE IF NOT EXISTS holidays (
date TEXT PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
is_national INTEGER NOT NULL DEFAULT 1
)
""",
        "CREATE INDEX IF NOT EXISTS idx_holidays_date ON holidays(date)",
    )
    for statement in schema_statements:
        connection.execute(statement)
    connection.commit()
    return connection
def _cache_holidays(items: list[dict]) -> None:
    """Insert or replace ``items`` in the holidays table.

    Each item must provide "date" and "name"; "type" defaults to
    "Public Holiday" and "is_national_holiday" to True. All rows are written
    in one transaction, and the connection is closed even if a row is
    malformed or the write fails.

    Raises:
        KeyError: if an item lacks "date" or "name" (nothing is written).
    """
    conn = _init_holidays_db()
    try:
        # Build every row first so a malformed item aborts before any write.
        rows = [
            (
                h["date"],
                h["name"],
                h.get("type", "Public Holiday"),
                1 if h.get("is_national_holiday", True) else 0,
            )
            for h in items
        ]
        # The connection context manager commits on success, rolls back on error.
        with conn:
            conn.executemany(
                "INSERT OR REPLACE INTO holidays (date, name, type, is_national) VALUES (?, ?, ?, ?)",
                rows,
            )
    finally:
        conn.close()
def _query_cached_holidays(
    start_date: str = "",
    end_date: str = "",
    limit: int = 0,
) -> list[dict]:
    """Read cached holidays ordered by date.

    Parameters:
        start_date: Inclusive lower bound (YYYY-MM-DD text comparison).
            Defaults to today's local date when empty.
        end_date: Inclusive upper bound; applied whenever it is given, with
            or without ``start_date``.
        limit: Maximum number of rows. As before, it is only applied when
            ``start_date`` is empty; 0 means no limit.

    Returns:
        Dicts with "date", "name", "type" and "is_national_holiday" keys.
    """
    lower_bound = start_date or datetime.now().strftime("%Y-%m-%d")
    sql = "SELECT date, name, type, is_national FROM holidays WHERE date >= ?"
    params: list[Any] = [lower_bound]
    if end_date:
        sql += " AND date <= ?"
        params.append(end_date)
    sql += " ORDER BY date"
    if limit and not start_date:
        sql += " LIMIT ?"
        params.append(limit)

    conn = _init_holidays_db()
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        # Close even when the query raises, so the handle is not leaked.
        conn.close()
    return [
        {"date": r[0], "name": r[1], "type": r[2], "is_national_holiday": bool(r[3])}
        for r in rows
    ]
def _fetch_and_cache_holidays(year: int) -> list[dict]:
    """Fetch holidays for a year from the API and cache them.

    This is best-effort: it returns an empty list when no API key is
    configured, when the request fails (network error, timeout, non-200
    status), when the body is not valid JSON, or when the API reports
    failure. Only a non-empty result is written to the cache.

    Args:
        year: Calendar year to request.

    Returns:
        The holiday items returned by the API (possibly empty).
    """
    api_key = _get_holidays_api_key()
    if not api_key:
        return []
    try:
        resp = requests.get(
            "https://use.api.co.id/holidays/indonesia/",
            headers={"x-api-co-id": api_key},
            params={"year": year},
            timeout=10,
        )
        if resp.status_code != 200:
            return []
        data = resp.json()
    except (requests.RequestException, ValueError):
        # Transport failures and malformed bodies are treated like any other
        # unsuccessful response so callers can fall back to the cache.
        return []
    if not isinstance(data, dict) or not data.get("is_success"):
        return []
    # `or []` also covers an explicit `"data": null` in the response.
    items = data.get("data") or []
    if not isinstance(items, list):
        items = [items]
    if items:
        _cache_holidays(items)
    return items
def _format_holidays(items: list[dict]) -> str:
    """Render holiday records as a fixed-width text table.

    Each row shows the date, weekday, holiday name and type, followed by a
    marker relative to today's local date (``TODAY``, ``Tomorrow``,
    ``Nd away`` or ``Nd ago``). Missing or null fields are shown as ``?``;
    a date that is not ``YYYY-MM-DD`` gets a ``?`` weekday and no marker.

    Args:
        items: Holiday dicts with optional ``date``, ``name`` and ``type`` keys.

    Returns:
        The table as a newline-joined string, or ``"No holidays found."`` when
        ``items`` is empty.
    """
    if not items:
        return "No holidays found."

    def _cell(value: object) -> str:
        # Stringify so null / non-string payload values cannot break the
        # width format specs used for the row below.
        return "?" if value is None else str(value)

    day_names = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    today = datetime.now().date()
    lines: list[str] = [
        f"{'Date':<15} {'Day':<10} {'Holiday':<45} {'Type':<20}",
        "-" * 90,
    ]
    for h in items:
        d_str = _cell(h.get("date"))
        name = _cell(h.get("name"))
        typ = _cell(h.get("type"))
        try:
            dt = datetime.strptime(d_str, "%Y-%m-%d").date()
        except ValueError:
            day_name, suffix = "?", ""
        else:
            day_name = day_names[dt.weekday()]
            delta = (dt - today).days
            if delta == 0:
                suffix = " ⬥ TODAY"
            elif delta == 1:
                suffix = " ⬥ Tomorrow"
            elif delta > 0:
                suffix = f" ({delta}d away)"
            else:
                suffix = f" ({-delta}d ago)"
        lines.append(f"{d_str:<15} {day_name:<10} {name:<45} {typ:<20}{suffix}")
    return "\n".join(lines)
@mcp.tool()
def check_holidays(
mode: str = "upcoming",
year: int | None = None,
date: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
limit: int = 10,
refresh: bool = False,
) -> str:
"""Check Indonesian public holidays (cached in SQLite).
Modes:
upcoming (default): next N holidays
year: all holidays for a given year (pass `year`)
check: check if a specific date is a holiday (pass `date`, format YYYY-MM-DD)
range: holidays in a date range (pass `start_date` & `end_date`)
Pass refresh=True to re-fetch from API instead of using cache.
Requires API_CO_ID_KEY env var to be set for API calls.
"""
match mode:
case "check":
if not date:
return "Error: `date` param required (YYYY-MM-DD)"
# Try cache first
cached = _query_cached_holidays(start_date=date, end_date=date)
if cached and not refresh:
return _format_holidays(cached)
# Fall back to API
api_key = _get_holidays_api_key()
if not api_key:
return "⚠ No cached data and API_CO_ID_KEY not set."
resp = requests.get(
"https://use.api.co.id/holidays/indonesia/check/date",
headers={"x-api-co-id": api_key},
params={"date": date},
timeout=10,
)
if resp.status_code != 200:
return f"API error: HTTP {resp.status_code}"
data = resp.json()
if not data.get("is_success"):
return f"API error: {data.get('message', 'unknown')}"
d = data.get("data", {})
if not d or not d.get("is_holiday"):
return f"{date} is NOT a holiday."
items = [d.get("holiday", {})]
return _format_holidays(items)
case "range":
if not start_date or not end_date:
return "Error: `start_date` and `end_date` required"
cached = _query_cached_holidays(start_date=start_date, end_date=end_date)
if cached and not refresh:
return _format_holidays(cached)
api_key = _get_holidays_api_key()
if not api_key:
return "⚠ No cached data and API_CO_ID_KEY not set."
resp = requests.get(
"https://use.api.co.id/holidays/indonesia/check/range",
headers={"x-api-co-id": api_key},
params={"start_date": start_date, "end_date": end_date},
timeout=10,
)
if resp.status_code != 200:
return f"API error: HTTP {resp.status_code}"
data = resp.json()
if not data.get("is_success"):
return f"API error: {data.get('message', 'unknown')}"
items = data.get("data", [])
if not isinstance(items, list):
items = [items]
return _format_holidays(items)
case "year":
y = year or datetime.now().year
cached = _query_cached_holidays(start_date=f"{y}-01-01", end_date=f"{y}-12-31")
if cached and not refresh:
return _format_holidays(cached)
items = _fetch_and_cache_holidays(y)
return _format_holidays(items)
case _: # upcoming
cached = _query_cached_holidays(limit=limit)
if cached and not refresh:
return _format_holidays(cached)
# Fetch current & next year to build cache, then return from cache
current_year = datetime.now().year
_fetch_and_cache_holidays(current_year)
_fetch_and_cache_holidays(current_year + 1)
cached = _query_cached_holidays(limit=limit)
return _format_holidays(cached) if cached else "No upcoming holidays found."
if __name__ == "__main__":
    # CLI entry point: parse transport/host/port options, apply them to the
    # FastMCP settings, then start the server.
    parser = argparse.ArgumentParser(description="TradingView Screener MCP Server")
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse", "streamable-http"],
        default="stdio",
        help="Transport protocol (default: stdio)",
    )
    parser.add_argument("--host", default=None, help="Host to bind (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=None, help="Port to bind (default: 8000)")
    args = parser.parse_args()
    if args.host:
        mcp.settings.host = args.host
        if args.host not in ("127.0.0.1", "localhost", "::1"):
            # FastMCP auto-sets restrictive transport_security for localhost at init time.
            # Reset it when binding to a non-localhost address so external hosts
            # (e.g. host.docker.internal) are not rejected with 421.
            mcp.settings.transport_security = None
    # Compare against None rather than truthiness so an explicit `--port 0`
    # is passed through instead of being silently ignored.
    if args.port is not None:
        mcp.settings.port = args.port
    mcp.run(transport=args.transport)