79d1e0e538
- _fetch_candles now accepts explicit from_ts/to_ts timestamps; falls back to count-based window when omitted - get_historical_candles accepts from_date/to_date in YYYY-MM-DD, YYYY-MM-DD HH:MM, or YYYY-MM-DD HH:MM:SS formats (all UTC) - Yahoo Finance fallback switched from range= to period1/period2 so it respects the exact window in both modes - Added 11 tests covering date range, datetime, end-of-day bump, invalid format, count cap, and pattern detection toggle - gitignore uv.lock (Dockerfile uses pip, not uv sync)
1109 lines
43 KiB
Python
1109 lines
43 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import math
|
|
import os
|
|
import time
|
|
from typing import Any, cast
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
import requests
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from tradingview_screener.column import col
|
|
from tradingview_screener.query import And, Or, Query
|
|
from tradingview_screener.screeners import (
|
|
bond,
|
|
cfd,
|
|
coin,
|
|
crypto,
|
|
crypto_dex,
|
|
forex,
|
|
futures,
|
|
options,
|
|
stocks,
|
|
)
|
|
|
|
mcp = FastMCP("tradingview-screener")
|
|
|
|
_session_cookies: dict[str, str] | None = None
|
|
|
|
|
|
def _resolve_cookies(sessionid: str | None = None) -> dict[str, str] | None:
|
|
if sessionid:
|
|
return {"sessionid": sessionid}
|
|
if _session_cookies is not None:
|
|
return _session_cookies
|
|
env = os.environ.get("TV_SESSION_ID")
|
|
if env:
|
|
return {"sessionid": env}
|
|
return None
|
|
|
|
|
|
def _exec_query(
|
|
q: Query,
|
|
sessionid: str | None = None,
|
|
) -> tuple[int, list[dict[str, Any]]]:
|
|
cookies = _resolve_cookies(sessionid)
|
|
total, df = q.get_scanner_data(cookies=cookies)
|
|
return total, cast("list[dict[str, Any]]", df.to_dict(orient="records"))
|
|
|
|
|
|
def _build_condition(f: dict[str, Any]) -> Any:
|
|
if "field" in f:
|
|
c = col(f["field"])
|
|
op = f["operator"]
|
|
if op in ("empty", "not_empty"):
|
|
return c.empty() if op == "empty" else c.not_empty()
|
|
val = f["value"]
|
|
match op:
|
|
case ">" | "greater":
|
|
return c > val
|
|
case ">=" | "egreater":
|
|
return c >= val
|
|
case "<" | "less":
|
|
return c < val
|
|
case "<=" | "eless":
|
|
return c <= val
|
|
case "==" | "equal":
|
|
return c == val
|
|
case "!=" | "nequal":
|
|
return c != val
|
|
case "between":
|
|
return c.between(val[0], val[1])
|
|
case "not_between":
|
|
return c.not_between(val[0], val[1])
|
|
case "isin":
|
|
return c.isin(val)
|
|
case "not_in":
|
|
return c.not_in(val)
|
|
case "has":
|
|
return c.has(val)
|
|
case "has_none_of":
|
|
return c.has_none_of(val)
|
|
case "like":
|
|
return c.like(val)
|
|
case "not_like":
|
|
return c.not_like(val)
|
|
case "crosses":
|
|
return c.crosses(val)
|
|
case "crosses_above":
|
|
return c.crosses_above(val)
|
|
case "crosses_below":
|
|
return c.crosses_below(val)
|
|
case "above_pct":
|
|
return c.above_pct(val[0], val[1])
|
|
case "below_pct":
|
|
return c.below_pct(val[0], val[1])
|
|
case "between_pct":
|
|
return c.between_pct(val[0], val[1], val[2])
|
|
case "in_day_range":
|
|
return c.in_day_range(val[0], val[1])
|
|
case "in_week_range":
|
|
return c.in_week_range(val[0], val[1])
|
|
case "in_month_range":
|
|
return c.in_month_range(val[0], val[1])
|
|
if "operator" in f and "filters" in f:
|
|
sub = [_build_condition(sf) for sf in f["filters"]]
|
|
if f["operator"] == "or":
|
|
return Or(*sub)
|
|
return And(*sub)
|
|
msg = f"Invalid filter: {f}"
|
|
raise ValueError(msg)
|
|
|
|
|
|
def _apply_filters(q: Query, filters: list[dict[str, Any]] | None) -> Query:
|
|
if not filters:
|
|
return q
|
|
|
|
simple: list[Any] = []
|
|
nested: list[Any] = []
|
|
for f in filters:
|
|
if "operator" in f and "filters" in f:
|
|
nested.append(_build_condition(f))
|
|
else:
|
|
simple.append(_build_condition(f))
|
|
|
|
if simple:
|
|
q = q.where(*simple)
|
|
|
|
if nested:
|
|
default_filter2 = q.query.get("filter2")
|
|
user_op = nested[0] if len(nested) == 1 else And(*nested)
|
|
user_filter2 = user_op["operation"]
|
|
if default_filter2:
|
|
q.query["filter2"] = {
|
|
"operator": "and",
|
|
"operands": [{"operation": user_filter2}, {"operation": default_filter2}],
|
|
}
|
|
else:
|
|
q.query["filter2"] = user_filter2
|
|
|
|
return q
|
|
|
|
|
|
MARKET_FACTORIES: dict[str, Any] = {
|
|
"stocks": stocks,
|
|
"crypto": crypto,
|
|
"crypto_dex": crypto_dex,
|
|
"coin": coin,
|
|
"forex": forex,
|
|
"futures": futures,
|
|
"bond": bond,
|
|
"cfd": cfd,
|
|
"options": options,
|
|
}
|
|
|
|
STOCK_COUNTRY_MARKETS: list[str] = [
|
|
"america", "argentina", "australia", "austria", "bangladesh", "belgium",
|
|
"brazil", "bulgaria", "canada", "chile", "china", "colombia", "croatia",
|
|
"cyprus", "czech", "denmark", "egypt", "estonia", "finland", "france",
|
|
"germany", "ghana", "greece", "hongkong", "hungary", "iceland", "india",
|
|
"indonesia", "ireland", "israel", "italy", "japan", "jordan", "kazakhstan",
|
|
"kenya", "kuwait", "latvia", "lithuania", "luxembourg", "malaysia",
|
|
"malta", "mexico", "morocco", "netherlands", "newzealand", "nigeria",
|
|
"norway", "oman", "pakistan", "peru", "philippines", "poland", "portugal",
|
|
"qatar", "romania", "saudiarabia", "serbia", "singapore", "slovakia",
|
|
"slovenia", "southafrica", "southkorea", "spain", "srilanka", "sweden",
|
|
"switzerland", "taiwan", "thailand", "tunisia", "turkey", "uganda",
|
|
"ukraine", "unitedarabemirates", "unitedkingdom", "vietnam",
|
|
]
|
|
|
|
FIELD_CATEGORIES: dict[str, list[dict[str, str]]] = {
|
|
"price": [
|
|
{"name": "open", "description": "Open price"},
|
|
{"name": "high", "description": "High price"},
|
|
{"name": "low", "description": "Low price"},
|
|
{"name": "close", "description": "Close price"},
|
|
{"name": "volume", "description": "Trading volume"},
|
|
{"name": "change", "description": "Percent change"},
|
|
{"name": "change_abs", "description": "Absolute change"},
|
|
{"name": "premarket_change", "description": "Premarket percent change"},
|
|
{"name": "postmarket_change", "description": "Postmarket percent change"},
|
|
{"name": "VWAP", "description": "Volume Weighted Average Price"},
|
|
{"name": "52 Week High", "description": "52-week high price"},
|
|
{"name": "52 Week Low", "description": "52-week low price"},
|
|
],
|
|
"technical": [
|
|
{"name": "RSI", "description": "Relative Strength Index (14)"},
|
|
{"name": "RSI.5", "description": "RSI (5)"},
|
|
{"name": "Stoch.K", "description": "Stochastic %K"},
|
|
{"name": "Stoch.D", "description": "Stochastic %D"},
|
|
{"name": "MACD.macd", "description": "MACD line"},
|
|
{"name": "MACD.signal", "description": "MACD signal line"},
|
|
{"name": "MACD.histogram", "description": "MACD histogram"},
|
|
{"name": "BB.upper", "description": "Bollinger Band upper"},
|
|
{"name": "BB.middle", "description": "Bollinger Band middle (SMA 20)"},
|
|
{"name": "BB.lower", "description": "Bollinger Band lower"},
|
|
{"name": "SMA", "description": "Simple Moving Average"},
|
|
{"name": "EMA", "description": "Exponential Moving Average"},
|
|
{"name": "SMA20", "description": "SMA 20"},
|
|
{"name": "SMA50", "description": "SMA 50"},
|
|
{"name": "SMA200", "description": "SMA 200"},
|
|
{"name": "EMA5", "description": "EMA 5"},
|
|
{"name": "EMA20", "description": "EMA 20"},
|
|
{"name": "EMA50", "description": "EMA 50"},
|
|
{"name": "EMA200", "description": "EMA 200"},
|
|
{"name": "ADX", "description": "Average Directional Index (14)"},
|
|
{"name": "ATR", "description": "Average True Range"},
|
|
{"name": "AO", "description": "Awesome Oscillator"},
|
|
{"name": "OBV", "description": "On Balance Volume"},
|
|
{"name": "CCI20", "description": "Commodity Channel Index (20)"},
|
|
{"name": "ROC", "description": "Rate of Change"},
|
|
{"name": "Williams %R", "description": "Williams Percent Range (14)"},
|
|
{"name": "relative_volume_10d_calc", "description": "Relative volume vs 10-day average"},
|
|
{"name": "volume_ma_10", "description": "Volume moving average (10)"},
|
|
{"name": "TechRating_1D", "description": "Technical rating (1 day)"},
|
|
],
|
|
"fundamental": [
|
|
{"name": "market_cap_basic", "description": "Market cap"},
|
|
{"name": "price_earnings_ttm", "description": "Price/Earnings (TTM)"},
|
|
{"name": "earnings_per_share_diluted_ttm", "description": "EPS diluted (TTM)"},
|
|
{"name": "earnings_per_share_diluted_yoy_growth_ttm", "description": "EPS YoY growth (TTM)"},
|
|
{"name": "dividends_yield_current", "description": "Dividend yield"},
|
|
{"name": "dividends_payout_ratio", "description": "Dividend payout ratio"},
|
|
{"name": "price_to_book_fq", "description": "Price/Book"},
|
|
{"name": "price_to_sales", "description": "Price/Sales"},
|
|
{"name": "price_to_cash_flow", "description": "Price/Cash Flow"},
|
|
{"name": "return_on_equity", "description": "Return on Equity"},
|
|
{"name": "return_on_assets", "description": "Return on Assets"},
|
|
{"name": "operating_margin", "description": "Operating margin"},
|
|
{"name": "profit_margin", "description": "Profit margin"},
|
|
{"name": "revenue_growth", "description": "Revenue growth"},
|
|
{"name": "earnings_growth", "description": "Earnings growth"},
|
|
{"name": "debt_to_equity", "description": "Debt/Equity"},
|
|
{"name": "current_ratio_fq", "description": "Current ratio"},
|
|
{"name": "beta_1_year", "description": "Beta (1 year)"},
|
|
{"name": "float", "description": "Shares float"},
|
|
{"name": "shares_outstanding", "description": "Shares outstanding"},
|
|
{"name": "short_ratio_fq", "description": "Short ratio"},
|
|
{"name": "short_float", "description": "Short % of float"},
|
|
{"name": "insider_ownership", "description": "Insider ownership"},
|
|
{"name": "institutional_ownership", "description": "Institutional ownership"},
|
|
{"name": "price_earnings_ttm", "description": "P/E ratio (TTM)"},
|
|
{"name": "AnalystRating", "description": "Analyst rating (numeric)"},
|
|
],
|
|
"general": [
|
|
{"name": "name", "description": "Short name / ticker"},
|
|
{"name": "description", "description": "Company description"},
|
|
{"name": "ticker", "description": "Full ticker (EXCHANGE:SYMBOL)"},
|
|
{"name": "exchange", "description": "Exchange name"},
|
|
{"name": "market", "description": "Market/country"},
|
|
{"name": "sector", "description": "Sector"},
|
|
{"name": "industry", "description": "Industry"},
|
|
{"name": "country", "description": "Country"},
|
|
{"name": "currency", "description": "Quote currency"},
|
|
{"name": "type", "description": "Instrument type"},
|
|
{"name": "typespecs", "description": "Instrument sub-type"},
|
|
{"name": "is_primary", "description": "Primary listing flag"},
|
|
],
|
|
}
|
|
|
|
|
|
def _get_market_factory(market_type: str, country_or_param: str | None = None) -> Query:
|
|
if market_type == "stocks" or market_type not in MARKET_FACTORIES:
|
|
fact = stocks
|
|
return fact(country_or_param or "america")
|
|
fact = MARKET_FACTORIES[market_type]
|
|
if market_type == "options":
|
|
return fact(country_or_param or "CME_MINI:ESM2026")
|
|
return fact()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Historical Candles & Chart Rendering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
TV_CHART_HEADERS = {
|
|
"User-Agent": (
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
|
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
|
"Chrome/120.0.0.0 Safari/537.36"
|
|
),
|
|
"Origin": "https://www.tradingview.com",
|
|
"Referer": "https://www.tradingview.com/",
|
|
}
|
|
|
|
|
|
def _resolve_symbol(ticker: str, market_type: str) -> str:
|
|
if ":" in ticker:
|
|
return ticker
|
|
if market_type == "crypto":
|
|
return f"BINANCE:{ticker}"
|
|
return ticker
|
|
|
|
|
|
def _to_yahoo_ticker(ticker: str) -> str:
|
|
"""Convert various ticker formats to Yahoo Finance format."""
|
|
if ".JK" in ticker or ".JK" in ticker.upper():
|
|
return ticker.replace(".jk", ".JK").replace(".Jk", ".JK")
|
|
if ticker.startswith("IDX:"):
|
|
return ticker.replace("IDX:", "") + ".JK"
|
|
if ":" in ticker:
|
|
return ticker.split(":", 1)[1]
|
|
return ticker
|
|
|
|
|
|
def _fetch_candles(
|
|
ticker: str,
|
|
resolution: str = "D",
|
|
count: int = 30,
|
|
from_ts: int | None = None,
|
|
to_ts: int | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
# Try TradingView chart API first
|
|
sym = _resolve_symbol(ticker, "stocks")
|
|
if to_ts is None:
|
|
to_ts = int(time.time())
|
|
if from_ts is None:
|
|
resolution_seconds = {"1": 60, "5": 300, "15": 900, "30": 1800, "60": 3600,
|
|
"240": 14400, "D": 86400, "W": 604800, "M": 2592000}
|
|
from_ts = to_ts - (count * resolution_seconds.get(resolution, 86400))
|
|
|
|
tv_urls = [
|
|
"https://chart-data.tradingview.com/history",
|
|
"https://scanner.tradingview.com/history",
|
|
]
|
|
for url in tv_urls:
|
|
try:
|
|
params = {"symbol": sym, "resolution": resolution, "from": from_ts, "to": to_ts}
|
|
resp = requests.get(url, headers=TV_CHART_HEADERS, params=params, timeout=10)
|
|
data = resp.json()
|
|
if data.get("s") == "ok" and len(data.get("t", [])) > 0:
|
|
candles = []
|
|
for i in range(len(data["t"])):
|
|
candles.append({
|
|
"time": data["t"][i],
|
|
"open": data["o"][i],
|
|
"high": data["h"][i],
|
|
"low": data["l"][i],
|
|
"close": data["c"][i],
|
|
"volume": data["v"][i],
|
|
})
|
|
return candles
|
|
except requests.RequestException:
|
|
continue
|
|
|
|
# Fallback: Yahoo Finance API
|
|
try:
|
|
yahoo_sym = _to_yahoo_ticker(ticker)
|
|
interval_map = {"1": "1m", "5": "5m", "15": "15m", "30": "30m",
|
|
"60": "60m", "240": "4h", "D": "1d", "W": "1wk", "M": "1mo"}
|
|
interval = interval_map.get(resolution, "1d")
|
|
|
|
url = f"https://query1.finance.yahoo.com/v8/finance/chart/{yahoo_sym}"
|
|
params: dict[str, Any] = {"interval": interval, "period1": from_ts, "period2": to_ts}
|
|
headers = {"User-Agent": TV_CHART_HEADERS["User-Agent"]}
|
|
resp = requests.get(url, headers=headers, params=params, timeout=15)
|
|
data = resp.json()
|
|
|
|
result = data.get("chart", {}).get("result", [None])[0]
|
|
if result:
|
|
timestamps = result.get("timestamp", [])
|
|
quotes = result.get("indicators", {}).get("quote", [None])[0]
|
|
if timestamps and quotes:
|
|
opens = quotes.get("open", [])
|
|
highs = quotes.get("high", [])
|
|
lows = quotes.get("low", [])
|
|
closes = quotes.get("close", [])
|
|
volumes = quotes.get("volume", [])
|
|
candles = []
|
|
for i in range(len(timestamps)):
|
|
o, h, l, c, v = opens[i], highs[i], lows[i], closes[i], volumes[i]
|
|
if None in (o, h, l, c, v):
|
|
continue
|
|
candles.append({
|
|
"time": timestamps[i],
|
|
"open": o, "high": h, "low": l, "close": c, "volume": v,
|
|
})
|
|
return candles
|
|
except requests.RequestException:
|
|
pass
|
|
|
|
return []
|
|
|
|
|
|
def _detect_candle_patterns(candles: list[dict[str, Any]]) -> list[str]:
|
|
patterns: list[str] = []
|
|
if len(candles) < 2:
|
|
return patterns
|
|
c = candles[-1]
|
|
body = abs(c["close"] - c["open"])
|
|
upper = c["high"] - max(c["open"], c["close"])
|
|
lower = min(c["open"], c["close"]) - c["low"]
|
|
total_range = c["high"] - c["low"]
|
|
|
|
# Doji
|
|
if total_range > 0 and body / total_range < 0.1:
|
|
patterns.append("DOJI (indecision / potential reversal)")
|
|
|
|
# Hammer / Shooting Star
|
|
if body > 0 and total_range > 0:
|
|
upper_pct = upper / total_range * 100
|
|
lower_pct = lower / total_range * 100
|
|
body_pct = body / total_range * 100
|
|
|
|
if lower_pct > 60 and upper_pct < 10 and body_pct < 30:
|
|
if c["close"] < c["open"]:
|
|
patterns.append("HAMMER (potential bullish reversal at bottom)")
|
|
else:
|
|
patterns.append("INVERTED HAMMER (potential bullish reversal at bottom)")
|
|
if upper_pct > 60 and lower_pct < 10 and body_pct < 30:
|
|
if c["close"] > c["open"]:
|
|
patterns.append("SHOOTING STAR (potential bearish reversal at top)")
|
|
else:
|
|
patterns.append("HANGING MAN (potential bearish reversal at top)")
|
|
|
|
# Marubozu
|
|
if upper < total_range * 0.05 and lower < total_range * 0.05 and body > total_range * 0.8:
|
|
if c["close"] > c["open"]:
|
|
patterns.append("MARUBOZU (strong bullish — no wicks)")
|
|
else:
|
|
patterns.append("MARUBOZU (strong bearish — no wicks)")
|
|
|
|
# Engulfing
|
|
if len(candles) >= 2:
|
|
p = candles[-2]
|
|
p_body = abs(p["close"] - p["open"])
|
|
if p_body > 0 and body > 0:
|
|
if c["close"] > c["open"] and c["open"] < p["close"] and c["close"] > p["open"] and body > p_body * 1.2:
|
|
patterns.append("BULLISH ENGULFING (strong reversal up)")
|
|
if c["close"] < c["open"] and c["open"] > p["close"] and c["close"] < p["open"] and body > p_body * 1.2:
|
|
patterns.append("BEARISH ENGULFING (strong reversal down)")
|
|
|
|
# Three white soldiers / Three black crows
|
|
if len(candles) >= 3:
|
|
c1, c2, c3 = candles[-3], candles[-2], candles[-1]
|
|
if all(x["close"] > x["open"] for x in (c1, c2, c3)):
|
|
b1, b2, b3 = c1["close"] - c1["open"], c2["close"] - c2["open"], c3["close"] - c3["open"]
|
|
if b2 > 0 and b3 > 0 and b2 >= b1 * 0.9 and b3 >= b2 * 0.9:
|
|
if c2["open"] > c1["open"] and c3["open"] > c2["open"]:
|
|
patterns.append("3 WHITE SOLDIERS (strong sustained bullish momentum)")
|
|
elif b2 > 0 and b3 > 0 and b2 < b1 * 0.7 and b3 < b2 * 0.7:
|
|
patterns.append("BULLISH IN CONCLUSION (rising but weakening momentum)")
|
|
if all(x["close"] < x["open"] for x in (c1, c2, c3)):
|
|
b1, b2, b3 = c1["open"] - c1["close"], c2["open"] - c2["close"], c3["open"] - c3["close"]
|
|
if b2 > 0 and b3 > 0 and b2 >= b1 * 0.9 and b3 >= b2 * 0.9:
|
|
if c2["open"] < c1["open"] and c3["open"] < c2["open"]:
|
|
patterns.append("3 BLACK CROWS (strong sustained bearish momentum)")
|
|
|
|
# Piercing / Dark Cloud
|
|
if len(candles) >= 2:
|
|
p = candles[-2]
|
|
if c["close"] > c["open"] and p["close"] < p["open"]:
|
|
mid_point = p["open"] - (p["open"] - p["close"]) / 2
|
|
if c["close"] > mid_point and c["open"] < p["close"]:
|
|
patterns.append("PIERCING PATTERN (bullish reversal)")
|
|
if c["close"] < c["open"] and p["close"] > p["open"]:
|
|
mid_point = p["close"] - (p["close"] - p["open"]) / 2
|
|
if c["close"] < mid_point and c["open"] > p["close"]:
|
|
patterns.append("DARK CLOUD COVER (bearish reversal)")
|
|
|
|
# Morning/Evening Star (3-candle)
|
|
if len(candles) >= 3:
|
|
c1, c2, c3 = candles[-3], candles[-2], candles[-1]
|
|
c1_body = abs(c1["close"] - c1["open"])
|
|
c2_body = abs(c2["close"] - c2["open"])
|
|
c3_body = abs(c3["close"] - c3["open"])
|
|
|
|
if c1["close"] < c1["open"] and c3["close"] > c3["open"]:
|
|
if c2_body < c1_body * 0.5 and c2_body < c3_body * 0.5:
|
|
patterns.append("MORNING STAR (bullish reversal — doji middle candle)")
|
|
if c1["close"] > c1["open"] and c3["close"] < c3["open"]:
|
|
if c2_body < c1_body * 0.5 and c2_body < c3_body * 0.5:
|
|
patterns.append("EVENING STAR (bearish reversal — doji middle candle)")
|
|
|
|
if not patterns:
|
|
patterns.append("No significant candlestick pattern detected")
|
|
return patterns
|
|
|
|
|
|
@mcp.tool()
|
|
def get_historical_candles(
|
|
ticker: str,
|
|
resolution: str = "D",
|
|
count: int = 30,
|
|
market_type: str = "stocks",
|
|
detect_patterns: bool = True,
|
|
from_date: str | None = None,
|
|
to_date: str | None = None,
|
|
) -> str:
|
|
"""Fetch historical OHLCV candlestick data for candle pattern and trend analysis.
|
|
|
|
Parameters:
|
|
ticker: Full ticker (e.g. "IDX:BBRI", "NASDAQ:AAPL", "IDX:BUMI").
|
|
resolution: Candle resolution — 1, 3, 5, 15, 30, 60, 240 (minutes),
|
|
D (daily, default), W (weekly), M (monthly).
|
|
count: Number of candles to fetch when no date range given (max 500, default 30).
|
|
market_type: Market type (stocks, crypto, forex, etc.).
|
|
detect_patterns: Whether to detect candlestick patterns (default True).
|
|
from_date: Start date/datetime (e.g. "2026-05-08" or "2026-05-17 14:00"). When
|
|
provided, fetches candles from this point instead of counting back from now.
|
|
to_date: End date/datetime (e.g. "2026-05-13" or "2026-05-17 14:05"). Defaults to
|
|
now when from_date is given but to_date is omitted.
|
|
"""
|
|
import calendar
|
|
|
|
def _parse_dt(s: str, end_of_period: bool = False) -> int:
|
|
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
|
|
try:
|
|
t = time.strptime(s, fmt)
|
|
ts = int(calendar.timegm(t))
|
|
# For date-only strings, end_of_period bumps to 23:59:59
|
|
if end_of_period and fmt == "%Y-%m-%d":
|
|
ts += 86399
|
|
return ts
|
|
except ValueError:
|
|
continue
|
|
raise ValueError(f"Unrecognised date/datetime format: {s!r}. Use YYYY-MM-DD or YYYY-MM-DD HH:MM")
|
|
|
|
from_ts: int | None = None
|
|
to_ts: int | None = None
|
|
if from_date:
|
|
from_ts = _parse_dt(from_date)
|
|
if to_date:
|
|
to_ts = _parse_dt(to_date, end_of_period=True)
|
|
elif from_date:
|
|
to_ts = int(time.time())
|
|
|
|
count = min(count, 500)
|
|
candles = _fetch_candles(ticker, resolution, count, from_ts=from_ts, to_ts=to_ts)
|
|
if not candles:
|
|
return f"No historical data returned for {ticker}"
|
|
|
|
# Summary stats
|
|
first, last = candles[0], candles[-1]
|
|
total_change = ((last["close"] - first["close"]) / first["close"]) * 100 if first["close"] else 0
|
|
highest = max(c["high"] for c in candles)
|
|
lowest = min(c["low"] for c in candles)
|
|
avg_vol = sum(c["volume"] for c in candles) / len(candles)
|
|
|
|
lines: list[str] = []
|
|
candle_label = f"{from_date} → {to_date or 'now'}" if from_date else f"{len(candles)} candles"
|
|
lines.append(f"📊 {ticker} — {candle_label} ({resolution} resolution)")
|
|
lines.append(f" Period: {time.strftime('%Y-%m-%d', time.gmtime(first['time']))}"
|
|
f" → {time.strftime('%Y-%m-%d', time.gmtime(last['time']))}")
|
|
lines.append(f" Latest: O={last['open']:.2f} H={last['high']:.2f} L={last['low']:.2f} C={last['close']:.2f}")
|
|
lines.append(f" Change: {total_change:.2f}% | Range: {highest:.2f} - {lowest:.2f} | Avg Vol: {avg_vol:,.0f}")
|
|
lines.append("")
|
|
|
|
# Current candle analysis
|
|
c = last
|
|
body = abs(c["close"] - c["open"])
|
|
upper_wick = c["high"] - max(c["open"], c["close"])
|
|
lower_wick = min(c["open"], c["close"]) - c["low"]
|
|
total = c["high"] - c["low"]
|
|
lines.append("── Latest Candle ──")
|
|
lines.append(f" Type: {'🟢 BULLISH' if c['close'] >= c['open'] else '🔴 BEARISH'}")
|
|
lines.append(f" Body: {body:.2f} | Upper Wick: {upper_wick:.2f} | Lower Wick: {lower_wick:.2f}")
|
|
if total > 0:
|
|
lines.append(f" Body Ratio: {body/total*100:.0f}% | Wick Ratio: {(upper_wick+lower_wick)/total*100:.0f}%")
|
|
|
|
if detect_patterns:
|
|
lines.append("")
|
|
lines.append("── Pattern Detection ──")
|
|
for p in _detect_candle_patterns(candles):
|
|
lines.append(f" • {p}")
|
|
|
|
# Simple trend analysis
|
|
if len(candles) >= 10:
|
|
lines.append("")
|
|
lines.append("── Trend (last 10 candles) ──")
|
|
recent = candles[-10:]
|
|
up_count = sum(1 for x in recent if x["close"] >= x["open"])
|
|
down_count = 10 - up_count
|
|
net = recent[-1]["close"] - recent[0]["close"]
|
|
lines.append(f" Bullish candles: {up_count} | Bearish candles: {down_count}")
|
|
lines.append(f" Net change: {net:+.2f} ({net/recent[0]['close']*100:+.2f}%)")
|
|
if up_count >= 7:
|
|
lines.append(" → Strong bullish momentum")
|
|
elif up_count >= 6:
|
|
lines.append(" → Moderate bullish bias")
|
|
elif down_count >= 7:
|
|
lines.append(" → Strong bearish momentum")
|
|
elif down_count >= 6:
|
|
lines.append(" → Moderate bearish bias")
|
|
else:
|
|
lines.append(" → Sideways / indecision")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tools
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
|
|
def get_stock_quotes(
|
|
tickers: list[str],
|
|
columns: list[str] | None = None,
|
|
market_type: str = "stocks",
|
|
market_country: str | None = None,
|
|
sessionid: str | None = None,
|
|
) -> str:
|
|
"""Get quote data for specific ticker symbols across any market.
|
|
|
|
Examples:
|
|
- get_stock_quotes(tickers=["NASDAQ:NVDA", "NASDAQ:AAPL"])
|
|
- get_stock_quotes(tickers=["BINANCE:BTCUSDT"], market_type="crypto")
|
|
- get_stock_quotes(tickers=["NYSE:SPY"], columns=["name","close","volume","RSI"])
|
|
"""
|
|
q = _get_market_factory(market_type, market_country)
|
|
if columns:
|
|
q = q.select(*columns)
|
|
q = q.set_tickers(*tickers)
|
|
total, data = _exec_query(q, sessionid)
|
|
return f"Found {total} result(s)\n{data}"
|
|
|
|
|
|
@mcp.tool()
|
|
def screen_market(
|
|
columns: list[str] | None = None,
|
|
filters: list[dict[str, Any]] | None = None,
|
|
order_by: str | None = None,
|
|
ascending: bool = False,
|
|
limit: int = 50,
|
|
offset: int = 0,
|
|
market_type: str = "stocks",
|
|
market_country: str | None = None,
|
|
sessionid: str | None = None,
|
|
) -> str:
|
|
"""General-purpose screener across any market type.
|
|
|
|
Parameters:
|
|
columns: Field names to return (e.g. ["name","close","RSI","volume"]).
|
|
filters: Structured filter conditions.
|
|
Simple condition: {"field": "<name>", "operator": "<op>", "value": <value>}
|
|
Nested group: {"operator": "and"|"or", "filters": [<conditions>]}
|
|
Operators: >, >=, <, <=, ==, !=, between, not_between, isin, not_in,
|
|
has, has_none_of, like, not_like, empty, not_empty,
|
|
crosses, crosses_above, crosses_below,
|
|
above_pct, below_pct, between_pct,
|
|
in_day_range, in_week_range, in_month_range
|
|
Examples:
|
|
[{"field": "RSI", "operator": ">", "value": 70}]
|
|
[{"field": "close", "operator": "between", "value": [100, 200]}]
|
|
[{"field": "exchange", "operator": "isin", "value": ["NASDAQ","NYSE"]}]
|
|
[{"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"}]
|
|
Nested: [{"operator": "or", "filters": [
|
|
{"field": "RSI", "operator": ">", "value": 70},
|
|
{"field": "RSI", "operator": "<", "value": 30}
|
|
]}]
|
|
order_by: Field to sort by (e.g. "volume", "market_cap_basic").
|
|
ascending: Sort ascending (default False = descending).
|
|
limit: Max rows to return (1-1000, default 50).
|
|
offset: Row offset for pagination.
|
|
market_type: Market type: stocks, crypto, forex, futures, bond, cfd, coin, crypto_dex, options.
|
|
market_country: For stocks, a country name (e.g. "america", "india", "germany").
|
|
For options, the underlying symbol (e.g. "CME_MINI:ESM2026").
|
|
sessionid: Optional TradingView session ID for real-time data.
|
|
"""
|
|
q = _get_market_factory(market_type, market_country)
|
|
if columns:
|
|
q = q.select(*columns)
|
|
q = _apply_filters(q, filters)
|
|
if order_by:
|
|
q = q.order_by(order_by, ascending=ascending)
|
|
q = q.limit(limit).offset(offset)
|
|
total, data = _exec_query(q, sessionid)
|
|
return f"Total: {total}\n{data}"
|
|
|
|
|
|
@mcp.tool()
|
|
def find_top_gainers(
|
|
limit: int = 20,
|
|
min_price: float | None = None,
|
|
market_type: str = "stocks",
|
|
market_country: str | None = None,
|
|
sessionid: str | None = None,
|
|
) -> str:
|
|
"""Find top gainers by percent change in any market."""
|
|
q = _get_market_factory(market_type, market_country)
|
|
q = q.select("name", "close", "change", "change_abs", "volume")
|
|
filters = []
|
|
if min_price is not None:
|
|
filters.append({"field": "close", "operator": ">", "value": min_price})
|
|
q = _apply_filters(q, filters)
|
|
q = q.order_by("change", ascending=False).limit(limit)
|
|
total, data = _exec_query(q, sessionid)
|
|
return f"Total: {total}\n{data}"
|
|
|
|
|
|
@mcp.tool()
|
|
def find_top_losers(
|
|
limit: int = 20,
|
|
min_price: float | None = None,
|
|
market_type: str = "stocks",
|
|
market_country: str | None = None,
|
|
sessionid: str | None = None,
|
|
) -> str:
|
|
"""Find top losers by percent change in any market."""
|
|
q = _get_market_factory(market_type, market_country)
|
|
q = q.select("name", "close", "change", "change_abs", "volume")
|
|
filters = []
|
|
if min_price is not None:
|
|
filters.append({"field": "close", "operator": ">", "value": min_price})
|
|
q = _apply_filters(q, filters)
|
|
q = q.order_by("change", ascending=True).limit(limit)
|
|
total, data = _exec_query(q, sessionid)
|
|
return f"Total: {total}\n{data}"
|
|
|
|
|
|
@mcp.tool()
|
|
def find_most_active(
|
|
limit: int = 20,
|
|
min_price: float | None = None,
|
|
market_type: str = "stocks",
|
|
market_country: str | None = None,
|
|
sessionid: str | None = None,
|
|
) -> str:
|
|
"""Find most active instruments by volume."""
|
|
q = _get_market_factory(market_type, market_country)
|
|
q = q.select("name", "close", "change", "volume")
|
|
filters = []
|
|
if min_price is not None:
|
|
filters.append({"field": "close", "operator": ">", "value": min_price})
|
|
q = _apply_filters(q, filters)
|
|
q = q.order_by("volume", ascending=False).limit(limit)
|
|
total, data = _exec_query(q, sessionid)
|
|
return f"Total: {total}\n{data}"
|
|
|
|
|
|
@mcp.tool()
|
|
def technical_scan(
|
|
filters: list[dict[str, Any]],
|
|
columns: list[str] | None = None,
|
|
limit: int = 50,
|
|
market_type: str = "stocks",
|
|
market_country: str | None = None,
|
|
sessionid: str | None = None,
|
|
) -> str:
|
|
"""Screen using technical indicator conditions.
|
|
|
|
Example filters:
|
|
[{"field": "RSI", "operator": "<", "value": 30}, # oversold
|
|
{"field": "MACD.macd", "operator": "crosses_above", "value": "MACD.signal"}, # bullish MACD
|
|
{"field": "close", "operator": "above_pct", "value": ["SMA50", 1.02]}] # 2% above SMA50
|
|
"""
|
|
default_cols = ["name", "close", "change", "volume", "RSI", "MACD.macd", "SMA50", "VWAP"]
|
|
q = _get_market_factory(market_type, market_country)
|
|
q = q.select(*(columns or default_cols))
|
|
q = _apply_filters(q, filters)
|
|
q = q.limit(limit)
|
|
total, data = _exec_query(q, sessionid)
|
|
return f"Total: {total}\n{data}"
|
|
|
|
|
|
@mcp.tool()
|
|
def fundamental_scan(
|
|
filters: list[dict[str, Any]],
|
|
columns: list[str] | None = None,
|
|
limit: int = 50,
|
|
market_type: str = "stocks",
|
|
market_country: str | None = None,
|
|
sessionid: str | None = None,
|
|
) -> str:
|
|
"""Screen by fundamental metrics (market cap, P/E, dividend, sector, etc.).
|
|
|
|
Example filters:
|
|
[{"field": "market_cap_basic", "operator": "between", "value": [1e9, 1e11}],
|
|
{"field": "price_earnings_ttm", "operator": "<", "value": 20},
|
|
{"field": "dividends_yield_current", "operator": ">", "value": 2},
|
|
{"field": "sector", "operator": "isin", "value": ["Technology", "Healthcare"]}]
|
|
"""
|
|
default_cols = ["name", "close", "market_cap_basic", "price_earnings_ttm",
|
|
"dividends_yield_current", "sector", "industry"]
|
|
q = _get_market_factory(market_type, market_country)
|
|
q = q.select(*(columns or default_cols))
|
|
q = _apply_filters(q, filters)
|
|
q = q.limit(limit)
|
|
total, data = _exec_query(q, sessionid)
|
|
return f"Total: {total}\n{data}"
|
|
|
|
|
|
@mcp.tool()
|
|
def list_markets() -> str:
|
|
"""List all available market types and stock-country markets."""
|
|
markets = {
|
|
"asset_types": [
|
|
{"id": "stocks", "description": "Stocks (common, preferred, DRs, funds) — use market_country param"},
|
|
{"id": "crypto", "description": "Centralised-exchange crypto pairs"},
|
|
{"id": "crypto_dex", "description": "Decentralised-exchange crypto pairs (USD)"},
|
|
{"id": "coin", "description": "CoinMarketCap crypto coins"},
|
|
{"id": "forex", "description": "Forex currency pairs"},
|
|
{"id": "futures", "description": "Futures contracts"},
|
|
{"id": "bond", "description": "Bonds"},
|
|
{"id": "cfd", "description": "Contracts for Difference"},
|
|
{"id": "options", "description": "Options (use market_country param for underlying symbol)"},
|
|
],
|
|
"stock_countries": STOCK_COUNTRY_MARKETS,
|
|
}
|
|
return str(markets)
|
|
|
|
|
|
@mcp.tool()
|
|
def list_fields(category: str | None = None) -> str:
|
|
"""List available screener fields, optionally filtered by category.
|
|
|
|
Categories: price, technical, fundamental, general
|
|
If no category given, returns all categories.
|
|
"""
|
|
if category:
|
|
cat = category.lower()
|
|
if cat in FIELD_CATEGORIES:
|
|
return str(FIELD_CATEGORIES[cat])
|
|
return f"Category '{category}' not found. Available: price, technical, fundamental, general"
|
|
return str(FIELD_CATEGORIES)
|
|
|
|
|
|
@mcp.tool()
|
|
def set_session(sessionid: str) -> str:
|
|
"""Store a TradingView session ID for real-time data access.
|
|
|
|
Get your sessionid from:
|
|
1. Go to tradingview.com and log in
|
|
2. Open DevTools > Application > Cookies > tradingview.com
|
|
3. Copy the 'sessionid' cookie value
|
|
"""
|
|
global _session_cookies
|
|
_session_cookies = {"sessionid": sessionid}
|
|
return "Session ID stored successfully"
|
|
|
|
|
|
@mcp.tool()
|
|
def clear_session() -> str:
|
|
"""Clear stored TradingView session ID."""
|
|
global _session_cookies
|
|
_session_cookies = None
|
|
return "Session ID cleared"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Holiday Checker (api.co.id) — SQLite backed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
import sqlite3
|
|
from datetime import date as dt_date, datetime
|
|
from pathlib import Path
|
|
|
|
_HOLIDAYS_API_KEY: str | None = None
|
|
_HOLIDAYS_DB: str | None = None
|
|
|
|
|
|
def _get_holidays_api_key() -> str | None:
|
|
global _HOLIDAYS_API_KEY
|
|
if _HOLIDAYS_API_KEY is None:
|
|
_HOLIDAYS_API_KEY = os.environ.get("API_CO_ID_KEY") or ""
|
|
return _HOLIDAYS_API_KEY or None
|
|
|
|
|
|
def _get_holidays_db_path() -> str:
|
|
global _HOLIDAYS_DB
|
|
if _HOLIDAYS_DB is None:
|
|
_HOLIDAYS_DB = os.environ.get("HOLIDAYS_DB_PATH") or str(Path(__file__).parent / "holidays.db")
|
|
return _HOLIDAYS_DB
|
|
|
|
|
|
def _init_holidays_db() -> sqlite3.Connection:
|
|
db_path = _get_holidays_db_path()
|
|
conn = sqlite3.connect(db_path)
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS holidays (
|
|
date TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
type TEXT NOT NULL,
|
|
is_national INTEGER NOT NULL DEFAULT 1
|
|
)
|
|
""")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_holidays_date ON holidays(date)")
|
|
conn.commit()
|
|
return conn
|
|
|
|
|
|
def _cache_holidays(items: list[dict]) -> None:
|
|
conn = _init_holidays_db()
|
|
for h in items:
|
|
conn.execute(
|
|
"INSERT OR REPLACE INTO holidays (date, name, type, is_national) VALUES (?, ?, ?, ?)",
|
|
(h["date"], h["name"], h.get("type", "Public Holiday"), 1 if h.get("is_national_holiday", True) else 0),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def _query_cached_holidays(
|
|
start_date: str = "",
|
|
end_date: str = "",
|
|
limit: int = 0,
|
|
) -> list[dict]:
|
|
conn = _init_holidays_db()
|
|
today = datetime.now().strftime("%Y-%m-%d")
|
|
if start_date and end_date:
|
|
rows = conn.execute(
|
|
"SELECT date, name, type, is_national FROM holidays WHERE date >= ? AND date <= ? ORDER BY date",
|
|
(start_date, end_date),
|
|
).fetchall()
|
|
elif start_date:
|
|
rows = conn.execute(
|
|
"SELECT date, name, type, is_national FROM holidays WHERE date >= ? ORDER BY date",
|
|
(start_date,),
|
|
).fetchall()
|
|
elif limit:
|
|
rows = conn.execute(
|
|
"SELECT date, name, type, is_national FROM holidays WHERE date >= ? ORDER BY date LIMIT ?",
|
|
(today, limit),
|
|
).fetchall()
|
|
else:
|
|
rows = conn.execute(
|
|
"SELECT date, name, type, is_national FROM holidays WHERE date >= ? ORDER BY date",
|
|
(today,),
|
|
).fetchall()
|
|
conn.close()
|
|
return [
|
|
{"date": r[0], "name": r[1], "type": r[2], "is_national_holiday": bool(r[3])}
|
|
for r in rows
|
|
]
|
|
|
|
|
|
def _fetch_and_cache_holidays(year: int) -> list[dict]:
|
|
"""Fetch holidays for a year from API and cache them."""
|
|
api_key = _get_holidays_api_key()
|
|
if not api_key:
|
|
return []
|
|
|
|
resp = requests.get(
|
|
"https://use.api.co.id/holidays/indonesia/",
|
|
headers={"x-api-co-id": api_key},
|
|
params={"year": year},
|
|
timeout=10,
|
|
)
|
|
if resp.status_code != 200:
|
|
return []
|
|
|
|
data = resp.json()
|
|
if not data.get("is_success"):
|
|
return []
|
|
|
|
items = data.get("data", [])
|
|
if not isinstance(items, list):
|
|
items = [items]
|
|
|
|
if items:
|
|
_cache_holidays(items)
|
|
return items
|
|
|
|
|
|
def _format_holidays(items: list[dict]) -> str:
|
|
if not items:
|
|
return "No holidays found."
|
|
|
|
today = datetime.now().date()
|
|
lines: list[str] = []
|
|
lines.append(f"{'Date':<15} {'Day':<10} {'Holiday':<45} {'Type':<20}")
|
|
lines.append("-" * 90)
|
|
for h in items:
|
|
d_str = h.get("date", "?")
|
|
name = h.get("name", "?")
|
|
typ = h.get("type", "?")
|
|
try:
|
|
dt = datetime.strptime(d_str, "%Y-%m-%d").date()
|
|
day_name = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()]
|
|
delta = (dt - today).days
|
|
if delta == 0:
|
|
suffix = " ⬥ TODAY"
|
|
elif delta == 1:
|
|
suffix = " ⬥ Tomorrow"
|
|
elif delta > 0:
|
|
suffix = f" ({delta}d away)"
|
|
elif delta < 0:
|
|
suffix = f" ({abs(delta)}d ago)"
|
|
else:
|
|
suffix = ""
|
|
except ValueError:
|
|
day_name = "?"
|
|
suffix = ""
|
|
lines.append(f"{d_str:<15} {day_name:<10} {name:<45} {typ:<20}{suffix}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
|
|
def check_holidays(
|
|
mode: str = "upcoming",
|
|
year: int | None = None,
|
|
date: str | None = None,
|
|
start_date: str | None = None,
|
|
end_date: str | None = None,
|
|
limit: int = 10,
|
|
refresh: bool = False,
|
|
) -> str:
|
|
"""Check Indonesian public holidays (cached in SQLite).
|
|
|
|
Modes:
|
|
upcoming (default): next N holidays
|
|
year: all holidays for a given year (pass `year`)
|
|
check: check if a specific date is a holiday (pass `date`, format YYYY-MM-DD)
|
|
range: holidays in a date range (pass `start_date` & `end_date`)
|
|
|
|
Pass refresh=True to re-fetch from API instead of using cache.
|
|
Requires API_CO_ID_KEY env var to be set for API calls.
|
|
"""
|
|
match mode:
|
|
case "check":
|
|
if not date:
|
|
return "Error: `date` param required (YYYY-MM-DD)"
|
|
# Try cache first
|
|
cached = _query_cached_holidays(start_date=date, end_date=date)
|
|
if cached and not refresh:
|
|
return _format_holidays(cached)
|
|
# Fall back to API
|
|
api_key = _get_holidays_api_key()
|
|
if not api_key:
|
|
return "⚠ No cached data and API_CO_ID_KEY not set."
|
|
resp = requests.get(
|
|
"https://use.api.co.id/holidays/indonesia/check/date",
|
|
headers={"x-api-co-id": api_key},
|
|
params={"date": date},
|
|
timeout=10,
|
|
)
|
|
if resp.status_code != 200:
|
|
return f"API error: HTTP {resp.status_code}"
|
|
data = resp.json()
|
|
if not data.get("is_success"):
|
|
return f"API error: {data.get('message', 'unknown')}"
|
|
d = data.get("data", {})
|
|
if not d or not d.get("is_holiday"):
|
|
return f"{date} is NOT a holiday."
|
|
items = [d.get("holiday", {})]
|
|
return _format_holidays(items)
|
|
|
|
case "range":
|
|
if not start_date or not end_date:
|
|
return "Error: `start_date` and `end_date` required"
|
|
cached = _query_cached_holidays(start_date=start_date, end_date=end_date)
|
|
if cached and not refresh:
|
|
return _format_holidays(cached)
|
|
api_key = _get_holidays_api_key()
|
|
if not api_key:
|
|
return "⚠ No cached data and API_CO_ID_KEY not set."
|
|
resp = requests.get(
|
|
"https://use.api.co.id/holidays/indonesia/check/range",
|
|
headers={"x-api-co-id": api_key},
|
|
params={"start_date": start_date, "end_date": end_date},
|
|
timeout=10,
|
|
)
|
|
if resp.status_code != 200:
|
|
return f"API error: HTTP {resp.status_code}"
|
|
data = resp.json()
|
|
if not data.get("is_success"):
|
|
return f"API error: {data.get('message', 'unknown')}"
|
|
items = data.get("data", [])
|
|
if not isinstance(items, list):
|
|
items = [items]
|
|
return _format_holidays(items)
|
|
|
|
case "year":
|
|
y = year or datetime.now().year
|
|
cached = _query_cached_holidays(start_date=f"{y}-01-01", end_date=f"{y}-12-31")
|
|
if cached and not refresh:
|
|
return _format_holidays(cached)
|
|
items = _fetch_and_cache_holidays(y)
|
|
return _format_holidays(items)
|
|
|
|
case _: # upcoming
|
|
cached = _query_cached_holidays(limit=limit)
|
|
if cached and not refresh:
|
|
return _format_holidays(cached)
|
|
# Fetch current & next year to build cache, then return from cache
|
|
current_year = datetime.now().year
|
|
_fetch_and_cache_holidays(current_year)
|
|
_fetch_and_cache_holidays(current_year + 1)
|
|
cached = _query_cached_holidays(limit=limit)
|
|
return _format_holidays(cached) if cached else "No upcoming holidays found."
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="TradingView Screener MCP Server")
|
|
parser.add_argument(
|
|
"--transport",
|
|
choices=["stdio", "sse", "streamable-http"],
|
|
default="stdio",
|
|
help="Transport protocol (default: stdio)",
|
|
)
|
|
parser.add_argument("--host", default=None, help="Host to bind (default: 127.0.0.1)")
|
|
parser.add_argument("--port", type=int, default=None, help="Port to bind (default: 8000)")
|
|
args = parser.parse_args()
|
|
if args.host:
|
|
mcp.settings.host = args.host
|
|
if args.host not in ("127.0.0.1", "localhost", "::1"):
|
|
# FastMCP auto-sets restrictive transport_security for localhost at init time.
|
|
# Reset it when binding to a non-localhost address so external hosts
|
|
# (e.g. host.docker.internal) are not rejected with 421.
|
|
mcp.settings.transport_security = None
|
|
if args.port:
|
|
mcp.settings.port = args.port
|
|
mcp.run(transport=args.transport)
|