Add get_historical_candles tool with candlestick pattern detection

Fetches historical OHLCV data from TradingView's chart API for
candlestick pattern analysis (doji, engulfing, hammer, etc.)
and trend analysis over N candles.
This commit is contained in:
achmad
2026-05-17 13:28:31 +07:00
parent f34f77dc54
commit ac6463accb
+234
View File
@@ -1,13 +1,17 @@
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import math
import os import os
import time
from typing import Any, cast from typing import Any, cast
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
import requests
from mcp.server.fastmcp import FastMCP from mcp.server.fastmcp import FastMCP
from tradingview_screener.column import col from tradingview_screener.column import col
from tradingview_screener.query import And, Or, Query from tradingview_screener.query import And, Or, Query
@@ -269,6 +273,236 @@ def _get_market_factory(market_type: str, country_or_param: str | None = None) -
return fact() return fact()
# ---------------------------------------------------------------------------
# Historical Candles & Chart Rendering
# ---------------------------------------------------------------------------
TV_CHART_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
"Origin": "https://www.tradingview.com",
"Referer": "https://www.tradingview.com/",
}
def _resolve_symbol(ticker: str, market_type: str) -> str:
if ":" in ticker:
return ticker
if market_type == "crypto":
return f"BINANCE:{ticker}"
return ticker
def _fetch_candles(
ticker: str,
resolution: str = "D",
count: int = 30,
) -> list[dict[str, Any]]:
sym = _resolve_symbol(ticker, "stocks")
to_ts = int(time.time())
resolution_seconds = {"1": 60, "5": 300, "15": 900, "30": 1800, "60": 3600,
"240": 14400, "D": 86400, "W": 604800, "M": 2592000}
from_ts = to_ts - (count * resolution_seconds.get(resolution, 86400))
url = "https://chart-data.tradingview.com/history"
params = {"symbol": sym, "resolution": resolution, "from": from_ts, "to": to_ts}
resp = requests.get(url, headers=TV_CHART_HEADERS, params=params, timeout=15)
data = resp.json()
if data.get("s") != "ok":
return []
candles = []
for i in range(len(data["t"])):
candles.append({
"time": data["t"][i],
"open": data["o"][i],
"high": data["h"][i],
"low": data["l"][i],
"close": data["c"][i],
"volume": data["v"][i],
})
return candles
def _detect_candle_patterns(candles: list[dict[str, Any]]) -> list[str]:
patterns: list[str] = []
if len(candles) < 2:
return patterns
c = candles[-1]
body = abs(c["close"] - c["open"])
upper = c["high"] - max(c["open"], c["close"])
lower = min(c["open"], c["close"]) - c["low"]
total_range = c["high"] - c["low"]
# Doji
if total_range > 0 and body / total_range < 0.1:
patterns.append("DOJI (indecision / potential reversal)")
# Hammer / Shooting Star
if body > 0 and total_range > 0:
upper_pct = upper / total_range * 100
lower_pct = lower / total_range * 100
body_pct = body / total_range * 100
if lower_pct > 60 and upper_pct < 10 and body_pct < 30:
if c["close"] < c["open"]:
patterns.append("HAMMER (potential bullish reversal at bottom)")
else:
patterns.append("INVERTED HAMMER (potential bullish reversal at bottom)")
if upper_pct > 60 and lower_pct < 10 and body_pct < 30:
if c["close"] > c["open"]:
patterns.append("SHOOTING STAR (potential bearish reversal at top)")
else:
patterns.append("HANGING MAN (potential bearish reversal at top)")
# Marubozu
if upper < total_range * 0.05 and lower < total_range * 0.05 and body > total_range * 0.8:
if c["close"] > c["open"]:
patterns.append("MARUBOZU (strong bullish — no wicks)")
else:
patterns.append("MARUBOZU (strong bearish — no wicks)")
# Engulfing
if len(candles) >= 2:
p = candles[-2]
p_body = abs(p["close"] - p["open"])
if p_body > 0 and body > 0:
if c["close"] > c["open"] and c["open"] < p["close"] and c["close"] > p["open"] and body > p_body * 1.2:
patterns.append("BULLISH ENGULFING (strong reversal up)")
if c["close"] < c["open"] and c["open"] > p["close"] and c["close"] < p["open"] and body > p_body * 1.2:
patterns.append("BEARISH ENGULFING (strong reversal down)")
# Three white soldiers / Three black crows
if len(candles) >= 3:
c1, c2, c3 = candles[-3], candles[-2], candles[-1]
if all(x["close"] > x["open"] for x in (c1, c2, c3)):
b1, b2, b3 = c1["close"] - c1["open"], c2["close"] - c2["open"], c3["close"] - c3["open"]
if b2 > 0 and b3 > 0 and b2 >= b1 * 0.9 and b3 >= b2 * 0.9:
if c2["open"] > c1["open"] and c3["open"] > c2["open"]:
patterns.append("3 WHITE SOLDIERS (strong sustained bullish momentum)")
elif b2 > 0 and b3 > 0 and b2 < b1 * 0.7 and b3 < b2 * 0.7:
patterns.append("BULLISH IN CONCLUSION (rising but weakening momentum)")
if all(x["close"] < x["open"] for x in (c1, c2, c3)):
b1, b2, b3 = c1["open"] - c1["close"], c2["open"] - c2["close"], c3["open"] - c3["close"]
if b2 > 0 and b3 > 0 and b2 >= b1 * 0.9 and b3 >= b2 * 0.9:
if c2["open"] < c1["open"] and c3["open"] < c2["open"]:
patterns.append("3 BLACK CROWS (strong sustained bearish momentum)")
# Piercing / Dark Cloud
if len(candles) >= 2:
p = candles[-2]
if c["close"] > c["open"] and p["close"] < p["open"]:
mid_point = p["open"] - (p["open"] - p["close"]) / 2
if c["close"] > mid_point and c["open"] < p["close"]:
patterns.append("PIERCING PATTERN (bullish reversal)")
if c["close"] < c["open"] and p["close"] > p["open"]:
mid_point = p["close"] - (p["close"] - p["open"]) / 2
if c["close"] < mid_point and c["open"] > p["close"]:
patterns.append("DARK CLOUD COVER (bearish reversal)")
# Morning/Evening Star (3-candle)
if len(candles) >= 3:
c1, c2, c3 = candles[-3], candles[-2], candles[-1]
c1_body = abs(c1["close"] - c1["open"])
c2_body = abs(c2["close"] - c2["open"])
c3_body = abs(c3["close"] - c3["open"])
if c1["close"] < c1["open"] and c3["close"] > c3["open"]:
if c2_body < c1_body * 0.5 and c2_body < c3_body * 0.5:
patterns.append("MORNING STAR (bullish reversal — doji middle candle)")
if c1["close"] > c1["open"] and c3["close"] < c3["open"]:
if c2_body < c1_body * 0.5 and c2_body < c3_body * 0.5:
patterns.append("EVENING STAR (bearish reversal — doji middle candle)")
if not patterns:
patterns.append("No significant candlestick pattern detected")
return patterns
@mcp.tool()
def get_historical_candles(
ticker: str,
resolution: str = "D",
count: int = 30,
market_type: str = "stocks",
detect_patterns: bool = True,
) -> str:
"""Fetch historical OHLCV candlestick data for candle pattern and trend analysis.
Parameters:
ticker: Full ticker (e.g. "IDX:BBRI", "NASDAQ:AAPL", "IDX:BUMI").
resolution: Candle resolution — 1, 3, 5, 15, 30, 60, 240 (minutes),
D (daily, default), W (weekly), M (monthly).
count: Number of candles to fetch (max 500, default 30).
market_type: Market type (stocks, crypto, forex, etc.).
detect_patterns: Whether to detect candlestick patterns (default True).
"""
count = min(count, 500)
candles = _fetch_candles(ticker, resolution, count)
if not candles:
return f"No historical data returned for {ticker}"
# Summary stats
first, last = candles[0], candles[-1]
total_change = ((last["close"] - first["close"]) / first["close"]) * 100 if first["close"] else 0
highest = max(c["high"] for c in candles)
lowest = min(c["low"] for c in candles)
avg_vol = sum(c["volume"] for c in candles) / len(candles)
lines: list[str] = []
lines.append(f"📊 {ticker}{count}candle chart ({resolution} resolution)")
lines.append(f" Period: {time.strftime('%Y-%m-%d', time.gmtime(first['time']))}"
f"{time.strftime('%Y-%m-%d', time.gmtime(last['time']))}")
lines.append(f" Latest: O={last['open']:.2f} H={last['high']:.2f} L={last['low']:.2f} C={last['close']:.2f}")
lines.append(f" Change: {total_change:.2f}% | Range: {highest:.2f} - {lowest:.2f} | Avg Vol: {avg_vol:,.0f}")
lines.append("")
# Current candle analysis
c = last
body = abs(c["close"] - c["open"])
upper_wick = c["high"] - max(c["open"], c["close"])
lower_wick = min(c["open"], c["close"]) - c["low"]
total = c["high"] - c["low"]
lines.append("── Latest Candle ──")
lines.append(f" Type: {'🟢 BULLISH' if c['close'] >= c['open'] else '🔴 BEARISH'}")
lines.append(f" Body: {body:.2f} | Upper Wick: {upper_wick:.2f} | Lower Wick: {lower_wick:.2f}")
if total > 0:
lines.append(f" Body Ratio: {body/total*100:.0f}% | Wick Ratio: {(upper_wick+lower_wick)/total*100:.0f}%")
if detect_patterns:
lines.append("")
lines.append("── Pattern Detection ──")
for p in _detect_candle_patterns(candles):
lines.append(f"{p}")
# Simple trend analysis
if len(candles) >= 10:
lines.append("")
lines.append("── Trend (last 10 candles) ──")
recent = candles[-10:]
up_count = sum(1 for x in recent if x["close"] >= x["open"])
down_count = 10 - up_count
net = recent[-1]["close"] - recent[0]["close"]
lines.append(f" Bullish candles: {up_count} | Bearish candles: {down_count}")
lines.append(f" Net change: {net:+.2f} ({net/recent[0]['close']*100:+.2f}%)")
if up_count >= 7:
lines.append(" → Strong bullish momentum")
elif up_count >= 6:
lines.append(" → Moderate bullish bias")
elif down_count >= 7:
lines.append(" → Strong bearish momentum")
elif down_count >= 6:
lines.append(" → Moderate bearish bias")
else:
lines.append(" → Sideways / indecision")
return "\n".join(lines)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Tools # Tools
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------