forked from redactedCHAD/sol-arb-st-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
118 lines (97 loc) · 4.71 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from flask import Flask, request, jsonify
import requests
from dotenv import load_dotenv
import os
import schedule
import time
from threading import Thread
load_dotenv() # Load environment variables from .env file
app = Flask(__name__)
ZETA_API_KEY = os.getenv('ZETA_API_KEY')
HELLO_MOON_API_KEY = os.getenv('HELLO_MOON_API_KEY')
BIRDEYE_API_KEY = os.getenv('BIRDEYE_API_KEY')
DRIFT_API_KEY = os.getenv('DRIFT_API_KEY')
def fetch_data():
# Fetch data from various APIs
url_jupiter = "https://public.jupiterapi.com/v1/quote"
params_jupiter = {"inputMint": "So11111111111111111111111111111111111111112", "outputMint": "USDC", "amount": "1000000", "slippage": 1, "onlyDirectRoutes": True}
response_jupiter = requests.get(url_jupiter, params=params_jupiter)
data_jupiter = response_jupiter.json()
url_zeta = "https://api.zeta.markets/price"
headers_zeta = {"Authorization": f"Bearer {ZETA_API_KEY}"}
response_zeta = requests.get(url_zeta, headers=headers_zeta)
data_zeta = response_zeta.json()
url_hello_moon = "https://api.hellomoon.io/price"
headers_hello_moon = {"Authorization": f"Bearer {HELLO_MOON_API_KEY}"}
response_hello_moon = requests.get(url_hello_moon, headers=headers_hello_moon)
data_hello_moon = response_hello_moon.json()
url_birdeye = "https://public-api.birdeye.so/price"
headers_birdeye = {"Authorization": f"Bearer {BIRDEYE_API_KEY}"}
response_birdeye = requests.get(url_birdeye, headers=headers_birdeye)
data_birdeye = response_birdeye.json()
url_drift = "https://api.drift.trade/v1/markets"
headers_drift = {"Authorization": f"Bearer {DRIFT_API_KEY}"}
response_drift = requests.get(url_drift, headers=headers_drift)
data_drift = response_drift.json()
return {
"jupiter": data_jupiter,
"zeta": data_zeta,
"hello_moon": data_hello_moon,
"birdeye": data_birdeye,
"drift": data_drift
}
def identify_arbitrage_opportunities(data):
# Placeholder for arbitrage logic
# Implement your logic to identify arbitrage opportunities
# Compare prices from different sources and calculate potential profit
opportunities = []
jupiter_prices = data['jupiter']
zeta_prices = data['zeta']
hello_moon_prices = data['hello_moon']
birdeye_prices = data['birdeye']
drift_prices = data['drift']
# Example logic to find arbitrage opportunities
# Assuming all APIs return data in a similar structure for simplicity
for token in jupiter_prices:
if token in zeta_prices and token in hello_moon_prices and token in birdeye_prices and token in drift_prices:
jupiter_price = jupiter_prices[token]['price']
zeta_price = zeta_prices[token]['price']
hello_moon_price = hello_moon_prices[token]['price']
birdeye_price = birdeye_prices[token]['price']
drift_price = drift_prices[token]['price']
prices = [jupiter_price, zeta_price, hello_moon_price, birdeye_price, drift_price]
buy_price = min(prices)
sell_price = max(prices)
if sell_price > buy_price:
potential_profit = (sell_price - buy_price) * 1 # Assuming 1 token for simplicity
opportunities.append({
"token": token,
"buy_price": buy_price,
"sell_price": sell_price,
"potential_profit": potential_profit
})
return opportunities
def execute_trade():
data = fetch_data()
opportunities = identify_arbitrage_opportunities(data)
# If profitable opportunities are found, execute the trades
for opportunity in opportunities:
print(f"Executing trade for {opportunity['token']} with potential profit: {opportunity['potential_profit']}")
@app.route('/api/get_best_route', methods=['GET'])
def get_best_route():
input_token = request.args.get('input_token')
output_token = request.args.get('output_token')
amount = request.args.get('amount')
data = fetch_data()
# Implement logic to find the best route based on fetched data
best_route = {} # Placeholder for best route logic
return jsonify(best_route)
def run_scheduler():
schedule.every(10).seconds.do(execute_trade) # Run every 10 seconds
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
thread = Thread(target=run_scheduler)
thread.start()
app.run(port=os.getenv('FLASK_RUN_PORT', 5000))