Compare commits
5 Commits
87d8388810
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f316e8149 | ||
|
|
ef37f17378 | ||
|
|
778bab644f | ||
|
|
7214e26168 | ||
|
|
32069f7e9d |
@@ -1,9 +1,13 @@
|
|||||||
package config;
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/docker/api/types/registry"
|
||||||
);
|
);
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@@ -15,10 +19,15 @@ type Config struct {
|
|||||||
LocalBasePath string
|
LocalBasePath string
|
||||||
BlazenaImageUrl string
|
BlazenaImageUrl string
|
||||||
RegistryAuth RegistryAuth
|
RegistryAuth RegistryAuth
|
||||||
|
EncodedRegistryAuth string
|
||||||
Constants struct{
|
Constants struct{
|
||||||
OverlayNetworkName string
|
OverlayNetworkName string
|
||||||
HelperServiceName string
|
HelperServiceName string
|
||||||
StorageContainerName string
|
StorageContainerName string
|
||||||
|
PrepullImageServiceName string
|
||||||
|
ServiceScaleTimeout time.Duration
|
||||||
|
SSHClientPKConfigName string
|
||||||
|
SSHHostSKSecretName string
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -39,7 +48,10 @@ func GetConfig()(Config, error){
|
|||||||
cfg.Constants.OverlayNetworkName = "blazenaPohar";
|
cfg.Constants.OverlayNetworkName = "blazenaPohar";
|
||||||
cfg.Constants.HelperServiceName = "blazenaHelper";
|
cfg.Constants.HelperServiceName = "blazenaHelper";
|
||||||
cfg.Constants.StorageContainerName = "blazenaStorage";
|
cfg.Constants.StorageContainerName = "blazenaStorage";
|
||||||
|
cfg.Constants.PrepullImageServiceName = "blazenaPrepull";
|
||||||
|
cfg.Constants.ServiceScaleTimeout = time.Second * 15;
|
||||||
|
cfg.Constants.SSHClientPKConfigName = "blazenaSSHClientPublicKey";
|
||||||
|
cfg.Constants.SSHHostSKSecretName = "blazenaSSHHostPrivateKey";
|
||||||
|
|
||||||
err = json.Unmarshal(rawConfig, &cfg);
|
err = json.Unmarshal(rawConfig, &cfg);
|
||||||
|
|
||||||
@@ -47,5 +59,19 @@ func GetConfig()(Config, error){
|
|||||||
return cfg, errors.New("Failed to unmarshal config: " + err.Error());
|
return cfg, errors.New("Failed to unmarshal config: " + err.Error());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
authConfig := registry.AuthConfig{
|
||||||
|
Username: cfg.RegistryAuth.Username,
|
||||||
|
Password: cfg.RegistryAuth.Password,
|
||||||
|
}
|
||||||
|
|
||||||
|
authJSON, err := json.Marshal(authConfig)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic("Failed to marshal auth config!"+ err.Error());
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.EncodedRegistryAuth = base64.StdEncoding.EncodeToString(authJSON);
|
||||||
|
|
||||||
|
|
||||||
return cfg, err;
|
return cfg, err;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,10 +2,10 @@ package docker
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/swarm"
|
"github.com/docker/docker/api/types/swarm"
|
||||||
@@ -20,44 +20,40 @@ func cleanup(w http.ResponseWriter, r *http.Request){
|
|||||||
|
|
||||||
if !bearerAuth(w, r) {return;}
|
if !bearerAuth(w, r) {return;}
|
||||||
|
|
||||||
rawBody, err := io.ReadAll(r.Body);
|
|
||||||
if err != nil {
|
|
||||||
panic("Failed to read body!");
|
|
||||||
}
|
|
||||||
|
|
||||||
var bodyDecoded struct{
|
|
||||||
ServiceId string `json:"serviceId"`
|
|
||||||
};
|
|
||||||
|
|
||||||
err = json.Unmarshal(rawBody, &bodyDecoded);
|
|
||||||
if err != nil {
|
|
||||||
panic("Failed to unmarshal json."+ err.Error());
|
|
||||||
}
|
|
||||||
|
|
||||||
listResoult, err := ApiClient.ServiceList(context.Background(), swarm.ServiceListOptions{});
|
listResoult, err := ApiClient.ServiceList(context.Background(), swarm.ServiceListOptions{});
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("Failed to list services."+ err.Error());
|
slog.Error("Failed to list services", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
var helperServiceId string;
|
var helperServiceId string;
|
||||||
|
var helperServices int;
|
||||||
|
|
||||||
for _, service := range listResoult{
|
for _, service := range listResoult{
|
||||||
if service.Spec.Labels["blazena.helper"] != "true" {
|
if service.Spec.Labels["blazena.helper"] != "true" {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
helperServiceId = service.ID;
|
helperServiceId = service.ID;
|
||||||
break;
|
helperServices ++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if helperServiceId == ""{
|
if helperServiceId == ""{
|
||||||
panic("Helper service not found!");
|
slog.Warn("Helper service wasn't found");
|
||||||
|
http.Error(w, "Internal Server Error", http.StatusInternalServerError);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if helperServices > 1{
|
||||||
|
slog.Error("There are more than 1 helper service.");
|
||||||
|
os.Exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ApiClient.ServiceRemove(context.Background(), helperServiceId);
|
err = ApiClient.ServiceRemove(context.Background(), helperServiceId);
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("Failed to remove helper service."+ err.Error());
|
panic("Failed to remove helper service."+ err.Error());
|
||||||
}
|
}
|
||||||
|
//TODO: add proper wait system
|
||||||
time.Sleep(7*time.Second);
|
time.Sleep(7*time.Second);
|
||||||
|
|
||||||
fmt.Fprint(w, bodyDecoded.ServiceId);
|
fmt.Fprint(w, helperServiceId);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -23,14 +22,16 @@ import (
|
|||||||
|
|
||||||
// Add mutex.
|
// Add mutex.
|
||||||
var ApiClient *client.Client;
|
var ApiClient *client.Client;
|
||||||
var scale sync.Map;
|
|
||||||
var token string = "12345";
|
var token string = "12345";
|
||||||
var theConfig cfg.Config;
|
var theConfig cfg.Config;
|
||||||
|
|
||||||
|
var theShutdownFuncPointer *context.CancelFunc;
|
||||||
|
|
||||||
type aService struct{
|
type aService struct{
|
||||||
ServiceId string `json:"serviceId"`;
|
ServiceId string `json:"serviceId"`;
|
||||||
VolumeNames []string `json:"volumeNames"`;
|
VolumeNames []string `json:"volumeNames"`;
|
||||||
Node string `json:"node"`;
|
Node string `json:"node"`;
|
||||||
|
Dependents []string `json:"dependents"`;
|
||||||
}
|
}
|
||||||
|
|
||||||
func Run(Config cfg.Config){
|
func Run(Config cfg.Config){
|
||||||
@@ -38,6 +39,7 @@ func Run(Config cfg.Config){
|
|||||||
theConfig = Config;
|
theConfig = Config;
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM);
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM);
|
||||||
|
theShutdownFuncPointer = &stop;
|
||||||
|
|
||||||
var err error;
|
var err error;
|
||||||
ApiClient, err = client.NewClientWithOpts(client.FromEnv);
|
ApiClient, err = client.NewClientWithOpts(client.FromEnv);
|
||||||
@@ -61,6 +63,7 @@ func Run(Config cfg.Config){
|
|||||||
Addr: ":1234",
|
Addr: ":1234",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
http.HandleFunc("/prepull", prepullImage);
|
||||||
http.HandleFunc("/services", listServices);
|
http.HandleFunc("/services", listServices);
|
||||||
http.HandleFunc("/scale/up", scaleUp);
|
http.HandleFunc("/scale/up", scaleUp);
|
||||||
http.HandleFunc("/scale/down", scaleDown);
|
http.HandleFunc("/scale/down", scaleDown);
|
||||||
@@ -159,6 +162,12 @@ func listServices(w http.ResponseWriter, r *http.Request){
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !contains(validNodeHostnames, settings["blazena.node"]) {
|
||||||
|
errMsg := "Node with hostname:'"+ settings["blazena.node"] +"' does not exist.";
|
||||||
|
slog.Warn("Invalid Service Config!", slog.String("serviceId", service.ID), slog.String("errMessage", errMsg));
|
||||||
|
continue SERVICES;
|
||||||
|
}
|
||||||
|
|
||||||
targetVolumes := strings.Split(settings["blazena.volumes"], ",");
|
targetVolumes := strings.Split(settings["blazena.volumes"], ",");
|
||||||
|
|
||||||
var validVolumeNames []string;
|
var validVolumeNames []string;
|
||||||
@@ -169,12 +178,6 @@ func listServices(w http.ResponseWriter, r *http.Request){
|
|||||||
validVolumeNames = append(validVolumeNames, mnt.Source);
|
validVolumeNames = append(validVolumeNames, mnt.Source);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !contains(validNodeHostnames, settings["blazena.node"]) {
|
|
||||||
errMsg := "Node with hostname:'"+ settings["blazena.node"] +"' does not exist.";
|
|
||||||
slog.Warn("Invalid Service Config!", slog.String("serviceId", service.ID), slog.String("errMessage", errMsg));
|
|
||||||
continue SERVICES;
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, volume := range targetVolumes{
|
for _, volume := range targetVolumes{
|
||||||
if contains(validVolumeNames, volume){
|
if contains(validVolumeNames, volume){
|
||||||
continue;
|
continue;
|
||||||
@@ -185,10 +188,36 @@ func listServices(w http.ResponseWriter, r *http.Request){
|
|||||||
continue SERVICES;
|
continue SERVICES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
var dependents []string;
|
||||||
|
|
||||||
|
if(settings["blazena.dependents"] != ""){
|
||||||
|
dependents = strings.Split(settings["blazena.dependents"], ",");
|
||||||
|
} else {
|
||||||
|
dependents = make([]string, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
var validDependents []string;
|
||||||
|
for _, x := range list{
|
||||||
|
validDependents = append(validDependents, x.Spec.Name);
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Debug("validDependents", slog.Any("value", validDependents));
|
||||||
|
slog.Debug("dependents", slog.Any("value", dependents));
|
||||||
|
|
||||||
|
for _, dependent := range dependents {
|
||||||
|
if contains(validDependents, dependent){continue;}
|
||||||
|
|
||||||
|
errMsg := "Dependent named '"+ dependent +"' was not found in this cluster!";
|
||||||
|
slog.Warn("Invalid Service Config!", slog.String("serviceId", service.ID), slog.String("errMessage", errMsg));
|
||||||
|
continue SERVICES;
|
||||||
|
}
|
||||||
|
|
||||||
services = append(services, aService{
|
services = append(services, aService{
|
||||||
ServiceId: service.ID,
|
ServiceId: service.ID,
|
||||||
VolumeNames: targetVolumes,
|
VolumeNames: targetVolumes,
|
||||||
Node: settings["blazena.node"],
|
Node: settings["blazena.node"],
|
||||||
|
Dependents: dependents,
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
package docker
|
package docker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"context"
|
"os"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/swarm"
|
"github.com/docker/docker/api/types/swarm"
|
||||||
|
|
||||||
"github.com/rony5394/blazena/shared"
|
"github.com/rony5394/blazena/shared"
|
||||||
@@ -32,23 +35,35 @@ func exchangeKeys(w http.ResponseWriter, r *http.Request){
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic("Failed to unmarshal json."+ err.Error());
|
panic("Failed to unmarshal json."+ err.Error());
|
||||||
}
|
}
|
||||||
sshPkPem := bodyDecoded.SshPkPem;
|
sshClientPkPem := bodyDecoded.SshPkPem;
|
||||||
hostKeypair := shared.GenerateSSHKeypair();
|
hostKeypair := shared.GenerateSSHKeypair();
|
||||||
|
|
||||||
encoded, err := json.Marshal(struct{HostPkPem string `json:"hostPkPem"`}{HostPkPem: hostKeypair.Public});
|
encoded, err := json.Marshal(struct{HostPkPem string `json:"hostPkPem"`}{HostPkPem: hostKeypair.Public});
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("I wonder how. I wonder why?"+err.Error());
|
slog.Error("Failed to marshal host pk into response.", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(42);
|
||||||
}
|
}
|
||||||
|
|
||||||
ApiClient.ConfigCreate(context.Background(), swarm.ConfigSpec{
|
_, err = ApiClient.ConfigCreate(context.Background(), swarm.ConfigSpec{
|
||||||
Data: []byte(sshPkPem),
|
Data: []byte(sshClientPkPem),
|
||||||
Annotations: swarm.Annotations{Name: "blazenaSSHPublicKey"},
|
Annotations: swarm.Annotations{Name: theConfig.Constants.SSHClientPKConfigName},
|
||||||
});
|
});
|
||||||
|
|
||||||
ApiClient.SecretCreate(context.Background(), swarm.SecretSpec{
|
if err != nil {
|
||||||
|
slog.Error("Failed to create a config.", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = ApiClient.SecretCreate(context.Background(), swarm.SecretSpec{
|
||||||
Data: []byte(hostKeypair.Private),
|
Data: []byte(hostKeypair.Private),
|
||||||
Annotations: swarm.Annotations{Name: "blazenaSSHHostPrivateKey"},
|
Annotations: swarm.Annotations{Name: theConfig.Constants.SSHHostSKSecretName},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Failed to create a secret.", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
fmt.Fprint(w, string(encoded));
|
fmt.Fprint(w, string(encoded));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,6 +53,7 @@ func prepare(w http.ResponseWriter, r *http.Request){
|
|||||||
pullBlazenaImage();
|
pullBlazenaImage();
|
||||||
createHelper(theConfig, labels["blazena.node"], bodyDecoded.VolumeId);
|
createHelper(theConfig, labels["blazena.node"], bodyDecoded.VolumeId);
|
||||||
|
|
||||||
|
//TODO: add proper waiting system.
|
||||||
time.Sleep(7*time.Second);
|
time.Sleep(7*time.Second);
|
||||||
|
|
||||||
fmt.Fprint(w, bodyDecoded.ServiceId);
|
fmt.Fprint(w, bodyDecoded.ServiceId);
|
||||||
@@ -129,7 +130,7 @@ func createHelper(Config cfg.Config, targetNode string, targetVolume string){
|
|||||||
stopGracePeriod := time.Second * 5;
|
stopGracePeriod := time.Second * 5;
|
||||||
helperCommand := `/usr/sbin/sshd -h /host-key -p 2222 -D`;
|
helperCommand := `/usr/sbin/sshd -h /host-key -p 2222 -D`;
|
||||||
|
|
||||||
sshKeyConfigId, err := getConfigIDByName(ApiClient, "blazenaSSHPublicKey");
|
sshKeyConfigId, err := getConfigIDByName(ApiClient, theConfig.Constants.SSHClientPKConfigName);
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("Docker needs both id and name to mount config for some reason and getting id of it failed!"+err.Error());
|
panic("Docker needs both id and name to mount config for some reason and getting id of it failed!"+err.Error());
|
||||||
@@ -163,7 +164,7 @@ func createHelper(Config cfg.Config, targetNode string, targetVolume string){
|
|||||||
Configs: []*swarm.ConfigReference{
|
Configs: []*swarm.ConfigReference{
|
||||||
&swarm.ConfigReference{
|
&swarm.ConfigReference{
|
||||||
ConfigID: sshKeyConfigId,
|
ConfigID: sshKeyConfigId,
|
||||||
ConfigName: "blazenaSSHPublicKey",
|
ConfigName: theConfig.Constants.SSHClientPKConfigName,
|
||||||
File: &swarm.ConfigReferenceFileTarget{
|
File: &swarm.ConfigReferenceFileTarget{
|
||||||
Name: "/root/.ssh/authorized_keys",
|
Name: "/root/.ssh/authorized_keys",
|
||||||
Mode: 0600,
|
Mode: 0600,
|
||||||
@@ -175,7 +176,7 @@ func createHelper(Config cfg.Config, targetNode string, targetVolume string){
|
|||||||
Secrets: []*swarm.SecretReference{
|
Secrets: []*swarm.SecretReference{
|
||||||
&swarm.SecretReference{
|
&swarm.SecretReference{
|
||||||
SecretID: sshHostKeySecretId,
|
SecretID: sshHostKeySecretId,
|
||||||
SecretName: "blazenaSSHHostPrivateKey",
|
SecretName: theConfig.Constants.SSHHostSKSecretName,
|
||||||
File: &swarm.SecretReferenceFileTarget{
|
File: &swarm.SecretReferenceFileTarget{
|
||||||
Name: "/host-key",
|
Name: "/host-key",
|
||||||
Mode: 0600,
|
Mode: 0600,
|
||||||
|
|||||||
78
docker/prepullImage.go
Normal file
78
docker/prepullImage.go
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
package docker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/docker/api/types/swarm"
|
||||||
|
)
|
||||||
|
|
||||||
|
func prepullImage(w http.ResponseWriter, r *http.Request){
|
||||||
|
if r.Method != http.MethodPost{
|
||||||
|
w.WriteHeader(http.StatusMethodNotAllowed);
|
||||||
|
fmt.Fprint(w, "Method Not Allowed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bearerAuth(w, r){return;}
|
||||||
|
|
||||||
|
sc, err := ApiClient.ServiceCreate(context.Background(), swarm.ServiceSpec{
|
||||||
|
Annotations: swarm.Annotations{
|
||||||
|
Name: theConfig.Constants.PrepullImageServiceName,
|
||||||
|
Labels: map[string]string{"blazena.prepull": "true"},
|
||||||
|
},
|
||||||
|
Mode: swarm.ServiceMode{
|
||||||
|
GlobalJob: &swarm.GlobalJob{},
|
||||||
|
},
|
||||||
|
TaskTemplate: swarm.TaskSpec{
|
||||||
|
ContainerSpec: &swarm.ContainerSpec{
|
||||||
|
Image: theConfig.BlazenaImageUrl,
|
||||||
|
Command: []string{"sleep", "1s"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, swarm.ServiceCreateOptions{
|
||||||
|
QueryRegistry: true,
|
||||||
|
EncodedRegistryAuth: theConfig.EncodedRegistryAuth,
|
||||||
|
});
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Failed to create prepull service", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("Started Prepull of blazena image.");
|
||||||
|
|
||||||
|
go func(){
|
||||||
|
CHECKLOOP: for {
|
||||||
|
time.Sleep(3*time.Second);
|
||||||
|
tasks, err := ApiClient.TaskList(context.Background(), swarm.TaskListOptions{});
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Failed to list tasks.", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, task := range tasks {
|
||||||
|
if task.ServiceID != sc.ID {continue};
|
||||||
|
|
||||||
|
if task.Status.State != "complete" {
|
||||||
|
time.Sleep(1*time.Second);
|
||||||
|
continue CHECKLOOP
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ApiClient.ServiceRemove(context.Background(), sc.ID);
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("Failed to remove prepull service.", slog.Any("propagatedError", err));
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("Prepull Finished");
|
||||||
|
break CHECKLOOP;
|
||||||
|
}
|
||||||
|
}();
|
||||||
|
}
|
||||||
@@ -3,7 +3,10 @@ package docker
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@@ -53,16 +56,16 @@ func scaleDown(w http.ResponseWriter, r *http.Request){
|
|||||||
newScale := uint64(0);
|
newScale := uint64(0);
|
||||||
updatedSpec.Mode.Replicated.Replicas = &newScale;
|
updatedSpec.Mode.Replicated.Replicas = &newScale;
|
||||||
updatedSpec.Labels["blazena.scaledDown"] = "true";
|
updatedSpec.Labels["blazena.scaledDown"] = "true";
|
||||||
|
updatedSpec.Labels["blazena.originalScale"] = strconv.FormatUint(*originalScale, 10);
|
||||||
scale.Store(serviceId, *originalScale);
|
|
||||||
|
|
||||||
_, err = ApiClient.ServiceUpdate(context.Background(), serviceId, inspectresoult.Version, updatedSpec, swarm.ServiceUpdateOptions{});
|
_, err = ApiClient.ServiceUpdate(context.Background(), serviceId, inspectresoult.Version, updatedSpec, swarm.ServiceUpdateOptions{});
|
||||||
if(err != nil){
|
if(err != nil){
|
||||||
panic("Failed to update service."+ err.Error());
|
panic("Failed to update service."+ err.Error());
|
||||||
}
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), theConfig.Constants.ServiceScaleTimeout);
|
||||||
|
defer cancel();
|
||||||
|
|
||||||
//TODO: Add proper wait system
|
waitForScale(serviceId, ctx, 0);
|
||||||
time.Sleep(15 * time.Second);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func scaleUp(w http.ResponseWriter, r *http.Request){
|
func scaleUp(w http.ResponseWriter, r *http.Request){
|
||||||
@@ -99,22 +102,67 @@ func scaleUp(w http.ResponseWriter, r *http.Request){
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
originalScale, ok := scale.Load(serviceId);
|
originalScale := inspectresoult.Spec.Labels["blazena.originalScale"];
|
||||||
if(!ok){
|
|
||||||
|
if(originalScale == ""){
|
||||||
panic("Its not okay!");
|
panic("Its not okay!");
|
||||||
}
|
}
|
||||||
|
|
||||||
originalScaleChecked, ok := originalScale.(uint64);
|
originalScaleChecked, err := strconv.ParseUint(originalScale, 10, 64);
|
||||||
if(!ok){
|
if(err != nil){
|
||||||
panic("Its very not okay!")
|
panic("Its very not okay!"+ err.Error())
|
||||||
}
|
}
|
||||||
updatedSpec := inspectresoult.Spec;
|
updatedSpec := inspectresoult.Spec;
|
||||||
|
|
||||||
updatedSpec.Mode.Replicated.Replicas = &originalScaleChecked;
|
updatedSpec.Mode.Replicated.Replicas = &originalScaleChecked;
|
||||||
delete(updatedSpec.Labels, "blazena.scaledDown");
|
delete(updatedSpec.Labels, "blazena.scaledDown");
|
||||||
|
delete(updatedSpec.Labels, "blazena.originalScale");
|
||||||
|
|
||||||
ApiClient.ServiceUpdate(context.Background(), serviceId, inspectresoult.Version, updatedSpec, swarm.ServiceUpdateOptions{});
|
_, err = ApiClient.ServiceUpdate(context.Background(), serviceId, inspectresoult.Version, updatedSpec, swarm.ServiceUpdateOptions{});
|
||||||
|
|
||||||
//TODO: Add proper wait system
|
if err != nil {
|
||||||
time.Sleep(15 * time.Second);
|
slog.Error("Failed to update/scale a service.", slog.Any("propagatedError", err), slog.String("serviceId", serviceId));
|
||||||
|
os.Exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), theConfig.Constants.ServiceScaleTimeout);
|
||||||
|
defer cancel();
|
||||||
|
|
||||||
|
waitForScale(serviceId, ctx, int(originalScaleChecked));
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForScale(serviceId string, ctx context.Context, desiredCount int){
|
||||||
|
startTime := time.Now();
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
tasks, err := ApiClient.TaskList(context.Background(), swarm.TaskListOptions{});
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Failed to list tasks.", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
var running int;
|
||||||
|
for _, task := range tasks {
|
||||||
|
if task.ServiceID != serviceId {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.Status.State == swarm.TaskStateRunning{
|
||||||
|
running ++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if running == desiredCount {
|
||||||
|
slog.Debug("Rescaled Service",
|
||||||
|
slog.String("serviceId", serviceId),
|
||||||
|
slog.Any("took", time.Since(startTime)),
|
||||||
|
slog.Any("targetScale", desiredCount),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
time.Sleep(1*time.Second);
|
||||||
|
}
|
||||||
|
if ctx.Err() == context.DeadlineExceeded{
|
||||||
|
slog.Error("Failed to rescale service in given time.", slog.Any("serviceId", serviceId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
16
host/host.go
16
host/host.go
@@ -25,6 +25,7 @@ type aService struct{
|
|||||||
ServiceId string `json:"serviceId"`;
|
ServiceId string `json:"serviceId"`;
|
||||||
VolumeNames []string `json:"volumeNames"`;
|
VolumeNames []string `json:"volumeNames"`;
|
||||||
Node string `json:"node"`;
|
Node string `json:"node"`;
|
||||||
|
Dependents []string `json:"dependents"`;
|
||||||
}
|
}
|
||||||
|
|
||||||
func Run(Config cfg.Config) {
|
func Run(Config cfg.Config) {
|
||||||
@@ -40,12 +41,19 @@ func Run(Config cfg.Config) {
|
|||||||
|
|
||||||
sshKeyPair := shared.GenerateSSHKeypair();
|
sshKeyPair := shared.GenerateSSHKeypair();
|
||||||
sshHostPkPem := exchangeKeys(Config, string(sshKeyPair.Public));
|
sshHostPkPem := exchangeKeys(Config, string(sshKeyPair.Public));
|
||||||
|
go prepullImage(Config);
|
||||||
createStorageContainer(Config, DockerClient, sshKeyPair.Private, sshHostPkPem);
|
createStorageContainer(Config, DockerClient, sshKeyPair.Private, sshHostPkPem);
|
||||||
|
|
||||||
services := getServices(Config);
|
services := getServices(Config);
|
||||||
|
|
||||||
for _, service := range services {
|
for _, service := range services {
|
||||||
|
|
||||||
|
for _, dependent := range service.Dependents {
|
||||||
|
slog.Info("Scaling Down Dependent", slog.String("serviceId", service.ServiceId), slog.String("dependentId", dependent));
|
||||||
|
scale(Config, dependent, false);
|
||||||
|
slog.Info("Done");
|
||||||
|
}
|
||||||
|
|
||||||
slog.Info("Scaling Down", slog.String("serviceId", service.ServiceId));
|
slog.Info("Scaling Down", slog.String("serviceId", service.ServiceId));
|
||||||
scale(Config, service.ServiceId, false);
|
scale(Config, service.ServiceId, false);
|
||||||
slog.Info("Done");
|
slog.Info("Done");
|
||||||
@@ -89,9 +97,16 @@ func Run(Config cfg.Config) {
|
|||||||
cleanupService(Config, service);
|
cleanupService(Config, service);
|
||||||
slog.Info("Done!");
|
slog.Info("Done!");
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("Scaling Up", slog.String("serviceId", service.ServiceId));
|
slog.Info("Scaling Up", slog.String("serviceId", service.ServiceId));
|
||||||
scale(Config, service.ServiceId, true);
|
scale(Config, service.ServiceId, true);
|
||||||
slog.Info("Done!");
|
slog.Info("Done!");
|
||||||
|
|
||||||
|
for _, dependent := range service.Dependents {
|
||||||
|
slog.Info("Scaling Up Dependent", slog.String("serviceId", service.ServiceId), slog.String("dependentId", dependent));
|
||||||
|
scale(Config, dependent, true);
|
||||||
|
slog.Info("Done");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DockerClient.ContainerRemove(context.Background(), Config.Constants.StorageContainerName, container.RemoveOptions{
|
DockerClient.ContainerRemove(context.Background(), Config.Constants.StorageContainerName, container.RemoveOptions{
|
||||||
@@ -127,6 +142,7 @@ func getServices(Config cfg.Config)[]aService{
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic("Failed to unmarshal response.");
|
panic("Failed to unmarshal response.");
|
||||||
}
|
}
|
||||||
|
slog.Debug("Services", slog.Any("value", services));
|
||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
30
host/prepullImage.go
Normal file
30
host/prepullImage.go
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
package host
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
cfg "github.com/rony5394/blazena/config"
|
||||||
|
);
|
||||||
|
|
||||||
|
func prepullImage(Config cfg.Config){
|
||||||
|
rq, err := http.NewRequest("POST", Config.DockerManagerBaseUrl + "/prepull", bytes.NewBufferString("{}"));
|
||||||
|
|
||||||
|
if err != nil{
|
||||||
|
slog.Error("Failed to create request", slog.Any("propagatedError", err), slog.String("note", "not send just create the object"));
|
||||||
|
os.Exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
rq.Header.Set("Authorization", "Bearer "+ token);
|
||||||
|
rq.Close = true;
|
||||||
|
rs, err := http.DefaultClient.Do(rq);
|
||||||
|
|
||||||
|
if err != nil{
|
||||||
|
slog.Error("Failed to send http request", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
defer rs.Body.Close();
|
||||||
|
}
|
||||||
@@ -37,7 +37,9 @@ func scale(Config cfg.Config, serviceId string, up bool)bool{
|
|||||||
rq.Header.Set("Authorization", "Bearer "+ token);
|
rq.Header.Set("Authorization", "Bearer "+ token);
|
||||||
rq.Close = true;
|
rq.Close = true;
|
||||||
rs, err := http.DefaultClient.Do(rq);
|
rs, err := http.DefaultClient.Do(rq);
|
||||||
defer rs.Body.Close();
|
if err == nil {
|
||||||
|
rs.Body.Close();
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil{
|
if err != nil{
|
||||||
panic("Failed to send http request"+ err.Error());
|
panic("Failed to send http request"+ err.Error());
|
||||||
|
|||||||
17
main.go
17
main.go
@@ -1,12 +1,16 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cfg "github.com/rony5394/blazena/config"
|
||||||
"github.com/rony5394/blazena/docker"
|
"github.com/rony5394/blazena/docker"
|
||||||
"github.com/rony5394/blazena/host"
|
"github.com/rony5394/blazena/host"
|
||||||
cfg "github.com/rony5394/blazena/config"
|
)
|
||||||
"log/slog"
|
|
||||||
);
|
//TODO: consider adding blazena.doNotTouch
|
||||||
|
|
||||||
/*
|
/*
|
||||||
If the exit code is X then it means Y:
|
If the exit code is X then it means Y:
|
||||||
@@ -39,6 +43,8 @@ func main() {
|
|||||||
|
|
||||||
slog.Debug("Config", slog.Any("Value", config));
|
slog.Debug("Config", slog.Any("Value", config));
|
||||||
|
|
||||||
|
startTime := time.Now();
|
||||||
|
|
||||||
mode := os.Args[1];
|
mode := os.Args[1];
|
||||||
switch mode {
|
switch mode {
|
||||||
case "docker":
|
case "docker":
|
||||||
@@ -47,7 +53,12 @@ func main() {
|
|||||||
case "host":
|
case "host":
|
||||||
host.Run(config);
|
host.Run(config);
|
||||||
break;
|
break;
|
||||||
|
case "pull":
|
||||||
|
os.Exit(0);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
panic("Invalid runtime mode!");
|
panic("Invalid runtime mode!");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
slog.Debug("Whole run took", slog.String("time", time.Since(startTime).String()));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,8 @@ import (
|
|||||||
"crypto/ed25519"
|
"crypto/ed25519"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
|
||||||
"golang.org/x/crypto/ssh"
|
"golang.org/x/crypto/ssh"
|
||||||
)
|
)
|
||||||
@@ -16,19 +18,22 @@ type Keypair struct {
|
|||||||
func GenerateSSHKeypair() Keypair {
|
func GenerateSSHKeypair() Keypair {
|
||||||
publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader)
|
publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
slog.Error("Failed to generate an ssh keypair.", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(42);
|
||||||
}
|
}
|
||||||
|
|
||||||
privBlock, err := ssh.MarshalPrivateKey(privateKey, "")
|
privBlock, err := ssh.MarshalPrivateKey(privateKey, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
slog.Error("Failed to marshal private key", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(42);
|
||||||
}
|
}
|
||||||
|
|
||||||
privPem := pem.EncodeToMemory(privBlock)
|
privPem := pem.EncodeToMemory(privBlock)
|
||||||
|
|
||||||
sshPubKey, err := ssh.NewPublicKey(publicKey)
|
sshPubKey, err := ssh.NewPublicKey(publicKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
slog.Error("Failed deriving public ssh key from a private one.", slog.Any("propagatedError", err));
|
||||||
|
os.Exit(42);
|
||||||
}
|
}
|
||||||
|
|
||||||
pubBytes := ssh.MarshalAuthorizedKey(sshPubKey)
|
pubBytes := ssh.MarshalAuthorizedKey(sshPubKey)
|
||||||
|
|||||||
Reference in New Issue
Block a user